A Python implementation of the Connect Protocol.
This repo contains a protoc plugin that generates server and client code, and a PyPI package with the common implementation details.
- Python 3.10 or later
You can install the protoc plugin using one of these methods:
Download the latest release from the GitHub Releases page. Pre-built binaries are available for:
- Linux (amd64, arm64)
- macOS (amd64, arm64)
- Windows (amd64, arm64)
If you have Go installed, you can install using:
go install github.com/i2y/connecpy/v2/protoc-gen-connecpy@latest
Additionally, please add the connecpy package to your project using your preferred package manager. For instance, with uv, use the command:
uv add connecpy
or
pip install connecpy
To run the server, you'll need one of the following: Uvicorn, Daphne, or Hypercorn. If your goal is to support both HTTP/1.1 and HTTP/2, you should opt for either Daphne or Hypercorn. Additionally, to test the server, you might need a client command, such as buf.
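For instance, with uv (pick the server that matches the protocols you need — Uvicorn serves HTTP/1.1 only, while Hypercorn and Daphne also speak HTTP/2):

uv add uvicorn    # HTTP/1.1
uv add hypercorn  # HTTP/1.1 and HTTP/2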
Use the protoc plugin to generate connecpy server and client code.
protoc --python_out=./ --pyi_out=./ --connecpy_out=./ ./haberdasher.proto
By default, naming follows PEP8 conventions. To use Google conventions, matching the output of grpc-python, add --connecpy_opt=naming=google.
By default, imports are generated absolutely based on the proto package name. To use relative imports, add --connecpy_opt=imports=relative.
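For example, to apply both options in a single invocation (repeated --connecpy_opt flags are combined by protoc):

protoc --python_out=./ --pyi_out=./ --connecpy_out=./ --connecpy_opt=naming=google --connecpy_opt=imports=relative ./haberdasher.proto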
# service.py
import random
from connecpy.code import Code
from connecpy.exceptions import ConnecpyException
from connecpy.request import RequestContext
from haberdasher_pb2 import Hat, Size
class HaberdasherService:
async def make_hat(self, req: Size, ctx: RequestContext) -> Hat:
print("remaining_time: ", ctx.timeout_ms())
if req.inches <= 0:
raise ConnecpyException(
Code.INVALID_ARGUMENT, "inches: I can't make a hat that small!"
)
response = Hat(
size=req.inches,
color=random.choice(["white", "black", "brown", "red", "blue"]),
)
if random.random() > 0.5:
response.name = random.choice(
["bowler", "baseball cap", "top hat", "derby"]
)
return response
# server.py
from haberdasher_connecpy import HaberdasherASGIApplication
from service import HaberdasherService
app = HaberdasherASGIApplication(
HaberdasherService()
)
Run the server with
uvicorn --port=3000 server:app
or
daphne --port=3000 server:app
or
hypercorn --bind :3000 server:app
# async_client.py
import asyncio
import httpx
from connecpy.exceptions import ConnecpyException
from haberdasher_connecpy import HaberdasherClient
from haberdasher_pb2 import Size, Hat
server_url = "http://localhost:3000"
timeout_s = 5
async def main():
async with httpx.AsyncClient(
base_url=server_url,
timeout=timeout_s,
) as session:
async with HaberdasherClient(server_url, session=session) as client:
try:
response = await client.make_hat(
Size(inches=12),
)
if not response.HasField("name"):
print("We didn't get a name!")
print(response)
except ConnecpyException as e:
print(e.code, e.message)
if __name__ == "__main__":
asyncio.run(main())
Example output:
size: 12
color: "black"
name: "bowler"
# client.py
from connecpy.exceptions import ConnecpyException
from haberdasher_connecpy import HaberdasherClientSync
from haberdasher_pb2 import Size, Hat
server_url = "http://localhost:3000"
timeout_s = 5
def main():
with HaberdasherClientSync(server_url, timeout_ms=timeout_s * 1000) as client:
try:
response = client.make_hat(
Size(inches=12),
)
if not response.HasField("name"):
print("We didn't get a name!")
print(response)
except ConnecpyException as e:
print(e.code, e.message)
if __name__ == "__main__":
main()
Connecpy supports streaming RPCs in addition to unary RPCs. Here are examples of each type:
In server streaming RPCs, the client sends a single request and receives multiple responses from the server.
# service.py
from typing import AsyncIterator
from connecpy.request import RequestContext
from haberdasher_pb2 import Hat, Size
class HaberdasherService:
async def make_similar_hats(self, req: Size, ctx: RequestContext) -> AsyncIterator[Hat]:
"""Server Streaming: Returns multiple hats of similar size"""
for i in range(3):
yield Hat(
size=req.inches + i,
color=["red", "green", "blue"][i],
name=f"hat #{i+1}"
)
# async_client.py
import asyncio
import httpx
from connecpy.exceptions import ConnecpyException
from haberdasher_connecpy import HaberdasherClient
from haberdasher_pb2 import Size, Hat
async def main():
async with httpx.AsyncClient(base_url="http://localhost:3000") as session:
async with HaberdasherClient(
"http://localhost:3000",
session=session
) as client:
# Server streaming: receive multiple responses
hats = []
stream = client.make_similar_hats(
Size(inches=12, description="summer hat")
)
async for hat in stream:
print(f"Received: {hat.color} {hat.name} (size {hat.size})")
hats.append(hat)
print(f"Total hats received: {len(hats)}")
if __name__ == "__main__":
asyncio.run(main())
# sync_client.py
import httpx
from connecpy.exceptions import ConnecpyException
from haberdasher_connecpy import HaberdasherClientSync
from haberdasher_pb2 import Size, Hat
def main():
with httpx.Client(base_url="http://localhost:3000") as session:
with HaberdasherClientSync(
"http://localhost:3000",
session=session
) as client:
# Server streaming: receive multiple responses
hats = []
stream = client.make_similar_hats(
Size(inches=12, description="winter hat")
)
for hat in stream:
print(f"Received: {hat.color} {hat.name} (size {hat.size})")
hats.append(hat)
print(f"Total hats received: {len(hats)}")
if __name__ == "__main__":
main()
In client streaming RPCs, the client sends multiple requests and receives a single response from the server.
service ExampleService {
rpc CollectSizes(stream Size) returns (Summary);
}
message Summary {
int32 total_count = 1;
float average_size = 2;
}
# service.py
from typing import AsyncIterator
from connecpy.request import RequestContext
from example_pb2 import Size, Summary
class ExampleService:
async def collect_sizes(
self,
req: AsyncIterator[Size],
ctx: RequestContext
) -> Summary:
"""Client Streaming: Collect multiple sizes and return summary"""
sizes = []
async for size_msg in req:
sizes.append(size_msg.inches)
if not sizes:
return Summary(total_count=0, average_size=0)
return Summary(
total_count=len(sizes),
average_size=sum(sizes) / len(sizes)
)
# async_client.py
import asyncio
from typing import AsyncIterator
import example_pb2
from example_connecpy import ExampleClient

async def send_sizes() -> AsyncIterator[example_pb2.Size]:
    """Generator to send multiple sizes to the server"""
    sizes_to_send = [10, 12, 14, 16, 18]
    for size in sizes_to_send:
        yield example_pb2.Size(inches=size)
        await asyncio.sleep(0.1)  # Simulate some delay

async def main():
    async with ExampleClient("http://localhost:3000") as client:
        # Client streaming: send multiple requests
        summary = await client.collect_sizes(send_sizes())
        print(f"Summary: {summary.total_count} sizes, average: {summary.average_size}")

if __name__ == "__main__":
    asyncio.run(main())
# sync_client.py
from typing import Iterator
import example_pb2
from example_connecpy import ExampleClientSync

def send_sizes() -> Iterator[example_pb2.Size]:
    """Generator to send multiple sizes to the server"""
    sizes_to_send = [10, 12, 14, 16, 18]
    for size in sizes_to_send:
        yield example_pb2.Size(inches=size)

def main():
    with ExampleClientSync("http://localhost:3000") as client:
        # Client streaming: send multiple requests
        summary = client.collect_sizes(send_sizes())
        print(f"Summary: {summary.total_count} sizes, average: {summary.average_size}")

if __name__ == "__main__":
    main()
Bidirectional streaming RPCs allow both client and server to send multiple messages to each other. Connecpy supports both:
- Full-duplex bidirectional streaming: Client and server can send and receive messages simultaneously
- Half-duplex bidirectional streaming: Client finishes sending all requests before the server starts sending responses
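As a concrete illustration, here is a minimal sketch of a full-duplex handler. It mirrors the half-duplex make_various_hats WSGI example later in this README; full-duplex delivery requires an HTTP/2 ASGI server such as hypercorn:

# service.py — a sketch of a full-duplex bidirectional streaming handler
from typing import AsyncIterator

from connecpy.request import RequestContext
from haberdasher_pb2 import Hat, Size

class HaberdasherService:
    async def make_various_hats(
        self, req: AsyncIterator[Size], ctx: RequestContext
    ) -> AsyncIterator[Hat]:
        # Respond to each size as it arrives, without waiting for the
        # client to finish sending (full-duplex)
        async for size_msg in req:
            yield Hat(size=size_msg.inches, color="custom", name=f"size-{size_msg.inches}")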
- Server Implementation:
  - HTTP/2 ASGI servers (hypercorn, daphne): support all streaming types, including full-duplex bidirectional streaming
  - HTTP/1 ASGI/WSGI servers: support unary, server streaming, client streaming, and half-duplex bidirectional streaming
  - Note: Full-duplex bidirectional streaming is not supported by WSGI servers due to protocol limitations (WSGI servers read the entire request before processing)
- Client Support:
  - Both ConnecpyClient (async) and ConnecpyClientSync support receiving streaming responses
  - Both support sending streaming requests (client streaming)
- Resource Management: When using streaming responses, resources are cleaned up when the returned iterator completes or is garbage collected.
- Error Handling: Streaming RPCs can raise exceptions during iteration:

  # Async version
  try:
      async for message in stream:
          process(message)
  except ConnecpyException as e:
      print(f"Stream error: {e.code} - {e.message}")

  # Sync version
  try:
      for message in stream:
          process(message)
  except ConnecpyException as e:
      print(f"Stream error: {e.code} - {e.message}")

- Bidirectional Streaming:
  - Both full-duplex and half-duplex modes are supported
  - In full-duplex mode, client and server can send/receive messages simultaneously
  - In half-duplex mode, the client must finish sending before the server starts responding
Of course, you can use any HTTP client to make requests to a Connecpy server. For example, commands like curl or buf curl can be used, as well as HTTP client libraries such as requests, httpx, aiohttp, and others. The examples below use curl and buf curl.
Content-Type: application/proto, HTTP/1.1
buf curl --data '{"inches": 12}' -v http://localhost:3000/i2y.connecpy.example.Haberdasher/MakeHat --schema ./haberdasher.proto
On Windows, Content-Type: application/proto, HTTP/1.1
buf curl --data '{\"inches\": 12}' -v http://localhost:3000/i2y.connecpy.example.Haberdasher/MakeHat --schema .\haberdasher.proto
Content-Type: application/proto, HTTP/2
buf curl --data '{"inches": 12}' -v http://localhost:3000/i2y.connecpy.example.Haberdasher/MakeHat --http2-prior-knowledge --schema ./haberdasher.proto
On Windows, Content-Type: application/proto, HTTP/2
buf curl --data '{\"inches\": 12}' -v http://localhost:3000/i2y.connecpy.example.Haberdasher/MakeHat --http2-prior-knowledge --schema .\haberdasher.proto
Content-Type: application/json, HTTP/1.1
curl -X POST -H "Content-Type: application/json" -d '{"inches": 12}' -v http://localhost:3000/i2y.connecpy.example.Haberdasher/MakeHat
On Windows, Content-Type: application/json, HTTP/1.1
curl -X POST -H "Content-Type: application/json" -d '{\"inches\": 12}' -v http://localhost:3000/i2y.connecpy.example.Haberdasher/MakeHat
Content-Type: application/json, HTTP/2
curl --http2-prior-knowledge -X POST -H "Content-Type: application/json" -d '{"inches": 12}' -v http://localhost:3000/i2y.connecpy.example.Haberdasher/MakeHat
On Windows, Content-Type: application/json, HTTP/2
curl --http2-prior-knowledge -X POST -H "Content-Type: application/json" -d '{\"inches\": 12}' -v http://localhost:3000/i2y.connecpy.example.Haberdasher/MakeHat
Connecpy provides full WSGI support via the ConnecpyWSGIApplication. This synchronous application adapts our service endpoints to the WSGI specification. It reads requests from the WSGI environ, processes them, and returns responses using start_response. This enables integration with WSGI servers and middleware.
Starting from version 2.1.0, WSGI applications now support streaming RPCs! This includes server streaming, client streaming, and half-duplex bidirectional streaming. Here's how to implement streaming with WSGI:
# service_sync.py
from typing import Iterator
from connecpy.request import RequestContext
from haberdasher_pb2 import Hat, Size
class HaberdasherServiceSync:
def make_hat(self, req: Size, ctx: RequestContext) -> Hat:
"""Unary RPC"""
return Hat(size=req.inches, color="red", name="fedora")
def make_similar_hats(self, req: Size, ctx: RequestContext) -> Iterator[Hat]:
"""Server Streaming RPC - returns multiple hats"""
for i in range(3):
yield Hat(
size=req.inches + i,
color=["red", "green", "blue"][i],
name=f"hat #{i+1}"
)
def collect_sizes(self, req: Iterator[Size], ctx: RequestContext) -> Hat:
"""Client Streaming RPC - receives multiple sizes, returns one hat"""
sizes = []
for size_msg in req:
sizes.append(size_msg.inches)
avg_size = sum(sizes) / len(sizes) if sizes else 0
return Hat(size=int(avg_size), color="average", name="custom")
def make_various_hats(self, req: Iterator[Size], ctx: RequestContext) -> Iterator[Hat]:
"""Bidirectional Streaming RPC (half-duplex only for WSGI)"""
# Note: In WSGI, all requests are received before sending responses
for size_msg in req:
yield Hat(size=size_msg.inches, color="custom", name=f"size-{size_msg.inches}")
# wsgi_server.py
from haberdasher_connecpy import HaberdasherWSGIApplication
from service_sync import HaberdasherServiceSync
app = HaberdasherWSGIApplication(
HaberdasherServiceSync()
)
if __name__ == "__main__":
from werkzeug.serving import run_simple
run_simple("localhost", 3000, app)
Please see the complete example in the example directory.
Connecpy supports various compression methods for both GET and POST requests/responses:
- gzip
- brotli (br)
- zstandard (zstd)
- identity (no compression)
For GET requests, specify the compression method using the compression query parameter:
curl "http://localhost:3000/service/method?compression=gzip&message=..."
For POST requests, use the Content-Encoding header:
curl -H "Content-Encoding: br" -d '{"data": "..."}' http://localhost:3000/service/method
The compression is handled directly in the request handlers, ensuring consistent behavior across HTTP methods and frameworks (ASGI/WSGI).
With Connecpy's compression features, you can automatically handle compressed requests and responses. Here are some examples:
The compression handling is built into both ASGI and WSGI applications. You don't need any additional middleware configuration - it works out of the box!
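For example, you can POST a pre-compressed body from the command line (a sketch; request.json is a hypothetical file holding the JSON payload):

gzip -c request.json > request.json.gz
curl -X POST \
  -H "Content-Type: application/json" \
  -H "Content-Encoding: gzip" \
  --data-binary @request.json.gz \
  http://localhost:3000/i2y.connecpy.example.Haberdasher/MakeHat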
For async clients:
from haberdasher_connecpy import HaberdasherClient
from haberdasher_pb2 import Size
async with HaberdasherClient(
server_url,
send_compression="br",
accept_compression=["gzip"]
) as client:
response = await client.make_hat(
Size(inches=12)
)
For synchronous clients:
from haberdasher_connecpy import HaberdasherClientSync
from haberdasher_pb2 import Size
with HaberdasherClientSync(
server_url,
send_compression="zstd", # Use Zstandard compression for request
accept_compression=["br"] # Accept Brotli compressed response
) as client:
response = client.make_hat(
Size(inches=12)
)
Connecpy automatically enables GET request support for methods marked with idempotency_level = NO_SIDE_EFFECTS in your proto files. This follows the Connect Protocol's GET specification.
Mark methods as side-effect-free in your .proto file:
service Haberdasher {
// This method will support both GET and POST requests
rpc MakeHat(Size) returns (Hat) {
option idempotency_level = NO_SIDE_EFFECTS;
}
// This method only supports POST requests (default)
rpc UpdateHat(Hat) returns (Hat);
}
When a method is marked with NO_SIDE_EFFECTS, the generated client code includes a use_get parameter:
from haberdasher_connecpy import HaberdasherClient, HaberdasherClientSync
from haberdasher_pb2 import Size
# Async client using GET request
async with HaberdasherClient(server_url, session=session) as client:
response = await client.make_hat(
Size(inches=12),
use_get=True # Use GET instead of POST
)
# Sync client using GET request
with HaberdasherClientSync(server_url) as client:
response = client.make_hat(
Size(inches=12),
use_get=True # Use GET instead of POST
)
The generated server code automatically configures GET support based on the proto definition. Methods with NO_SIDE_EFFECTS will have allowed_methods=("GET", "POST"), while others will have allowed_methods=("POST",) only.
You can also make GET requests directly using curl or other HTTP clients:
# GET request with query parameters (base64-encoded message)
curl "http://localhost:3000/i2y.connecpy.example.Haberdasher/MakeHat?encoding=proto&message=CgwI..."
# GET request with compression
curl "http://localhost:3000/i2y.connecpy.example.Haberdasher/MakeHat?encoding=proto&compression=gzip&message=..."
Note: GET support is particularly useful for:
- Cacheable operations
- Browser-friendly APIs
- Read-only operations that don't modify server state
ConnecpyASGIApp is a standard ASGI application, meaning any CORS ASGI middleware will work well with it, for example starlette.middleware.cors.CORSMiddleware. Refer to the Connect Docs for the standard headers commonly used by Connect clients for CORS negotiation and a full example using Starlette.
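As a minimal sketch (the allowed header names here are assumptions based on the Connect documentation; adjust the origins for your deployment):

# cors_server.py — wrapping the generated ASGI application with Starlette's CORSMiddleware
from starlette.middleware.cors import CORSMiddleware

from haberdasher_connecpy import HaberdasherASGIApplication
from service import HaberdasherService

app = CORSMiddleware(
    HaberdasherASGIApplication(HaberdasherService()),
    allow_origins=["https://example.com"],  # assumption: replace with your origins
    allow_methods=["GET", "POST"],
    # Headers commonly sent by Connect clients (see the Connect docs)
    allow_headers=["Content-Type", "Connect-Protocol-Version", "Connect-Timeout-Ms"],
)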
The Connecpy protoc plugin generates code based on the Connect Protocol from your .proto files.
Connecpy supports the following RPC types:
- Unary RPCs - Single request/response
- Server Streaming RPCs - Single request, multiple responses
- Client Streaming RPCs - Multiple requests, single response
- Bidirectional Streaming RPCs - Multiple requests and responses (both full-duplex and half-duplex)
Starting from version 2.0.0, protoc-gen-connecpy supports Proto Editions 2023. You can use the new editions syntax in your .proto files:
edition = "2023";
package example;
service Greeter {
rpc SayHello (HelloRequest) returns (HelloReply);
}
message HelloRequest {
string name = 1;
}
message HelloReply {
string message = 1;
}
The code generation works the same way as with proto2/proto3 syntax. Note that you'll need protoc version 26.0 or later to use editions syntax.
Connecpy applications are standard WSGI or ASGI applications. For complex routing requirements, you can use a routing framework such as werkzeug or starlette.
The generated application classes now expose a path property that returns the service's URL path, making it easier to mount multiple services:
from haberdasher_connecpy import HaberdasherWSGIApplication
haberdasher_app = HaberdasherWSGIApplication(service)
print(haberdasher_app.path) # "/package.ServiceName"
# Use with routing frameworks, e.g. werkzeug's DispatcherMiddleware
# (here `app` is an existing WSGI framework application such as Flask)
from werkzeug.middleware.dispatcher import DispatcherMiddleware

app.wsgi_app = DispatcherMiddleware(
    app.wsgi_app,
    {
        haberdasher_app.path: haberdasher_app,
    },
)
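The same idea works for ASGI with Starlette, which is mentioned above. A sketch, assuming the Connecpy ASGI application resolves paths under a mount the same way the WSGI application does under DispatcherMiddleware:

from starlette.applications import Starlette
from starlette.routing import Mount

from haberdasher_connecpy import HaberdasherASGIApplication

haberdasher_app = HaberdasherASGIApplication(service)

# Mount the service at its generated URL path
app = Starlette(routes=[Mount(haberdasher_app.path, app=haberdasher_app)])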
Connecpy supports server-side interceptors for both ASGI and WSGI applications. Interceptors allow you to add cross-cutting concerns like logging, authentication, and metrics to your services.
Connecpy provides type-safe interceptors for each RPC type:
# server.py
from typing import Awaitable, Callable, AsyncIterator
from connecpy.request import RequestContext
from haberdasher_pb2 import Size, Hat
from haberdasher_connecpy import HaberdasherASGIApplication
from service import HaberdasherService
# Single interceptor class handling multiple RPC types
class LoggingInterceptor:
async def intercept_unary(
self,
next: Callable[[Size, RequestContext], Awaitable[Hat]],
request: Size,
ctx: RequestContext,
) -> Hat:
print(f"Unary RPC: {ctx.method().name}, size={request.inches}")
response = await next(request, ctx)
print(f"Response sent: {response.color} hat")
return response
async def intercept_server_stream(
self,
next: Callable[[Size, RequestContext], AsyncIterator[Hat]],
request: Size,
ctx: RequestContext,
) -> AsyncIterator[Hat]:
print(f"Server streaming RPC: {ctx.method().name}, size={request.inches}")
async for response in next(request, ctx):
print(f"Streaming: {response.color} hat (size {response.size})")
yield response
# ASGI application with interceptors
app = HaberdasherASGIApplication(
HaberdasherService(),
interceptors=[LoggingInterceptor()] # Single interceptor handles both unary and streaming
)
For simple cross-cutting concerns that don't need access to request/response bodies, use MetadataInterceptor:
import time

from connecpy.interceptor import MetadataInterceptor
from connecpy.request import RequestContext
# Simple timing interceptor
class TimingInterceptor(MetadataInterceptor[float]):
async def on_start(self, ctx: RequestContext) -> float:
print(f"Starting {ctx.method().name}")
return time.time()
async def on_end(self, start_time: float, ctx: RequestContext) -> None:
elapsed = time.time() - start_time
print(f"{ctx.method().name} took {elapsed:.3f}s")
# Works with all RPC types!
app = HaberdasherASGIApplication(
HaberdasherService(),
interceptors=[TimingInterceptor()]
)
WSGI applications support synchronous interceptors:
import time
from typing import Callable

from connecpy.interceptor import MetadataInterceptorSync
from connecpy.request import RequestContext
from haberdasher_pb2 import Size, Hat
from haberdasher_connecpy import HaberdasherWSGIApplication
class LoggingInterceptorSync:
def intercept_unary_sync(
self,
next: Callable[[Size, RequestContext], Hat],
request: Size,
ctx: RequestContext,
) -> Hat:
print(f"Sync RPC: {ctx.method().name}, size={request.inches}")
return next(request, ctx)
class TimingInterceptorSync(MetadataInterceptorSync[float]):
def on_start_sync(self, ctx: RequestContext) -> float:
return time.time()
def on_end_sync(self, start_time: float, ctx: RequestContext) -> None:
elapsed = time.time() - start_time
print(f"{ctx.method().name} took {elapsed:.3f}s")
# WSGI application with interceptors
wsgi_app = HaberdasherWSGIApplication(
HaberdasherServiceSync(),
interceptors=[LoggingInterceptorSync(), TimingInterceptorSync()]
)
| Interceptor Type | Use Case | ASGI | WSGI |
|---|---|---|---|
| UnaryInterceptor / UnaryInterceptorSync | Unary RPCs | ✅ | ✅ |
| ClientStreamInterceptor / ClientStreamInterceptorSync | Client streaming RPCs | ✅ | ✅ |
| ServerStreamInterceptor / ServerStreamInterceptorSync | Server streaming RPCs | ✅ | ✅ |
| BidiStreamInterceptor / BidiStreamInterceptorSync | Bidirectional streaming RPCs | ✅ | ✅ |
| MetadataInterceptor / MetadataInterceptorSync | All RPC types (metadata only) | ✅ | ✅ |
Interceptors are executed in the order they are provided. For example, if you provide [A, B, C], the execution order will be:
- A.on_start → B.on_start → C.on_start → handler → C.on_end → B.on_end → A.on_end
Connecpy supports client-side interceptors, allowing you to add cross-cutting concerns to your client requests:
# async_client_with_interceptor.py
import asyncio
from connecpy.exceptions import ConnecpyException
from haberdasher_connecpy import HaberdasherClient
from haberdasher_pb2 import Size, Hat
class LoggingInterceptor:
"""Interceptor that logs all requests and responses"""
async def intercept_unary(self, next, request, ctx):
print(f"[LOG] Calling {ctx.method().name} with request: {request}")
try:
response = await next(request, ctx)
print(f"[LOG] Received response: {response}")
return response
except Exception as e:
print(f"[LOG] Error: {e}")
raise
async def main():
# Create client with interceptors
client = HaberdasherClient(
"http://localhost:3000",
interceptors=[LoggingInterceptor()]
)
try:
response = await client.make_hat(
Size(inches=12)
)
print(response)
finally:
await client.close()
if __name__ == "__main__":
asyncio.run(main())
Client interceptors support all RPC types (unary, client streaming, server streaming, and bidirectional streaming) and work with both async and sync clients.
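For sync clients, a minimal sketch — assuming the client-side sync interceptor uses the same intercept_unary_sync method name as the server-side sync interceptor shown earlier:

# sync_client_with_interceptor.py
from haberdasher_connecpy import HaberdasherClientSync
from haberdasher_pb2 import Size

class LoggingInterceptorSync:
    """Assumed sync counterpart of the async interceptor above"""
    def intercept_unary_sync(self, next, request, ctx):
        print(f"[LOG] Calling {ctx.method().name} with request: {request}")
        return next(request, ctx)

def main():
    with HaberdasherClientSync(
        "http://localhost:3000",
        interceptors=[LoggingInterceptorSync()],
    ) as client:
        print(client.make_hat(Size(inches=12)))

if __name__ == "__main__":
    main()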
Connecpy allows you to limit incoming message sizes to protect against resource exhaustion. By default, there is no limit, but you can set one by passing read_max_bytes to the application constructor:
from haberdasher_connecpy import HaberdasherASGIApplication, HaberdasherWSGIApplication
# Set maximum message size to 1MB for ASGI applications
app = HaberdasherASGIApplication(
service,
read_max_bytes=1024 * 1024 # 1MB
)
# Set maximum message size for WSGI applications
wsgi_app = HaberdasherWSGIApplication(
service_sync,
read_max_bytes=1024 * 1024 # 1MB
)
# Disable message size limit (not recommended for production)
app = HaberdasherASGIApplication(
service,
read_max_bytes=None
)
When a message exceeds the configured limit, the server will return a RESOURCE_EXHAUSTED error to the client.
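On the client side this surfaces as a ConnecpyException; a sketch of detecting it (assumes a client created as in the earlier examples):

from connecpy.code import Code
from connecpy.exceptions import ConnecpyException
from haberdasher_pb2 import Size

try:
    response = client.make_hat(Size(inches=12))
except ConnecpyException as e:
    if e.code == Code.RESOURCE_EXHAUSTED:
        print("Message exceeded the server's read_max_bytes limit")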
The initial version (1.0.0) of this software was created on January 4, 2024, by modifying https://github.com/verloop/twirpy so that it supports the Connect Protocol.