# Configure SSE streaming responses

Enable Server-Sent Events (SSE) streaming in your SDKs to deliver real-time data to clients.
Server-Sent Events (SSE) streaming enables your API to push real-time updates to clients over a single HTTP connection. Unlike WebSockets, SSE is unidirectional (server-to-client only) and uses standard HTTP, making it simpler to implement and naturally compatible with proxies, firewalls, and existing HTTP infrastructure.
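On the wire, an SSE response is a plain-text stream of events, each made of optional `event:` and `data:` lines and terminated by a blank line. A minimal illustration (the payloads here are made up):

```text
event: message
data: {"choices":[{"delta":{"content":"Hel"}}]}

data: {"choices":[{"delta":{"content":"lo!"}}]}

data: [DONE]
```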
Stainless generates SDK bindings that let your users consume streams ergonomically through `for await` loops in TypeScript and similar patterns in other languages.
## Configure a streaming method

To enable streaming on a method, add a `streaming` object to the method definition in your Stainless config:
```yaml
resources:
  my_resources:
    models:
      stream_chunk: StreamResponse
    methods:
      generate:
        endpoint: post /v1/generate
        type: http
        streaming:
          param_discriminator: stream
          stream_event_model: my_resources.stream_chunk
```

The `streaming` object accepts these properties:
| Property | Description |
|---|---|
| `param_discriminator` | The request parameter that tells the server whether to respond with SSE or JSON. Set to `null` if the endpoint always streams. |
| `stream_event_model` | The full path to the model representing each Server-Sent Event, for example `chat.completions.chat_completion_chunk`. If not specified, defaults to the response schema. |
| `params_type_name` | The base name for generated request parameter types. When set, Stainless generates `{params_type_name}NonStreaming` and `{params_type_name}Streaming` type variants. |
The `param_discriminator` works regardless of where the parameter is defined in your OpenAPI spec: the request body, query string, or elsewhere. Stainless finds the parameter by name in the method and uses it to determine the response type.
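For example, a spec that passes the discriminator as a query parameter works with the same streaming config as above (a hypothetical endpoint, shown only to illustrate parameter placement):

```yaml
# Hypothetical OpenAPI spec: the `stream` discriminator as a query parameter
paths:
  /v1/generate:
    post:
      operationId: generate
      parameters:
        - name: stream
          in: query
          schema:
            type: boolean
```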
### Custom parameter type names

For dual-mode endpoints, Stainless generates two request parameter types: one for streaming and one for non-streaming requests. By default, Stainless infers the type name from the request body's `$ref` or model name. If neither is available, you may see the diagnostic:

```
Streaming/CannotInferParamsName: No model name, $ref or streaming.param_type_name defined - using default params type name
```

To resolve this, set `params_type_name` to a descriptive name for your request parameters:
```yaml
resources:
  chat:
    subresources:
      completions:
        models:
          chat_completion: CreateChatCompletionResponse
          chat_completion_chunk: CreateChatCompletionStreamResponse
        methods:
          create:
            endpoint: post /chat/completions
            type: http
            streaming:
              stream_event_model: chat.completions.chat_completion_chunk
              param_discriminator: stream
              params_type_name: chat_completion_create_params
```

This generates clearly named types:
- `ChatCompletionCreateParamsNonStreaming`
- `ChatCompletionCreateParamsStreaming`
Without `params_type_name`, Stainless uses a default name, which may be less descriptive. Setting this property is optional but recommended for clearer SDK type names.
### Always-streaming endpoints

If your endpoint always returns a streamed response, set `param_discriminator` to `null`:
```yaml
resources:
  my_resources:
    models:
      stream_chunk: StreamResponse
    methods:
      stream_data:
        endpoint: post /v1/stream
        type: http
        streaming:
          param_discriminator: null
          stream_event_model: my_resources.stream_chunk
```
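With this config there is no `stream` flag at all; the generated method always returns a stream. A rough TypeScript sketch of how users might call it (the `streamData` method and `prompt` field are hypothetical, derived from the config above):

```typescript
// Illustrative only: actual method and param names depend on your config.
const stream = await client.myResources.streamData({ prompt: 'Hello!' });

for await (const chunk of stream) {
  console.log(chunk);
}
```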
### Dual-mode endpoints

Many APIs support both streaming and non-streaming responses on the same endpoint, controlled by a request parameter. For example, setting `stream: true` might return an SSE stream while `stream: false` returns a complete response.
In your OpenAPI spec, define the `stream` parameter as a boolean:
```yaml
# OpenAPI spec
paths:
  /chat/completions:
    post:
      operationId: createChatCompletion
      requestBody:
        content:
          application/json:
            schema:
              type: object
              properties:
                stream:
                  type: boolean
                  description: If true, partial message deltas will be sent as SSE events
```

Then configure streaming in your Stainless config:
```yaml
resources:
  chat:
    subresources:
      completions:
        models:
          chat_completion: CreateChatCompletionResponse
          chat_completion_chunk: CreateChatCompletionStreamResponse
        methods:
          create:
            endpoint: post /chat/completions
            type: http
            streaming:
              stream_event_model: chat.completions.chat_completion_chunk
              param_discriminator: stream
```

In this example:
- When users set `stream: false` (or omit it), the SDK returns a `ChatCompletion` response
- When users set `stream: true`, the SDK returns an iterable stream of `ChatCompletionChunk` events
The exact API varies by language. In some languages, users pass a `stream` parameter to the method. In other languages (like Go and Python), there are separate methods (for example, `create` and `createStreaming`) because the return types differ and the language cannot express a return type that depends on a parameter's value.
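In languages that can overload on the parameter, the two generated params types from earlier make this typable. A minimal TypeScript sketch of the idea (the type shapes are illustrative, not the generated code):

```typescript
// Illustrative stand-ins for the generated types.
type ChatCompletion = { choices: { message: { content: string } }[] };
type ChatCompletionChunk = { choices: { delta: { content?: string } }[] };
type Stream<T> = AsyncIterable<T>;
type ChatCompletionCreateParamsNonStreaming = { model: string; stream?: false };
type ChatCompletionCreateParamsStreaming = { model: string; stream: true };

// Overload signatures let one method's return type follow the `stream` flag.
interface Completions {
  create(params: ChatCompletionCreateParamsStreaming): Promise<Stream<ChatCompletionChunk>>;
  create(params: ChatCompletionCreateParamsNonStreaming): Promise<ChatCompletion>;
}
```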
## Configure event handlers

You can define how the SDK handles specific streaming events using the top-level `streaming` configuration. This controls what happens when the SDK receives different types of events or data messages.
```yaml
streaming:
  on_event:
    - data_starts_with: '[DONE]'
      handle: done
    - event_type: error
      handle: error
    - kind: fallthrough
      handle: yield
```

The `on_event` array defines event handlers that are evaluated in order. When the SDK receives a streaming event, it checks each handler sequentially until it finds a match.
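To make the ordering concrete, here is a hypothetical sequence of incoming events and the handler each would match under the config above:

```text
data: {"text":"partial"}          -> fallthrough handler: yielded to the user

event: error
data: {"message":"rate limited"}  -> event_type: error handler: raises an exception

data: [DONE]                      -> data_starts_with handler: ends the stream
```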
### Handler properties

Each event handler can match on one of these conditions:

| Property | Description |
|---|---|
| `kind` | The type of handler. Use `fallthrough` as a catch-all that matches any event, including events with no type and future event types. Place `fallthrough` handlers at the end of your list for forward compatibility. |
| `data_starts_with` | Matches events where the `data` field starts with the specified string. |
| `event_type` | Matches events with the specified SSE event type. Use `null` to match only events where the `event:` line is absent. |
### Handle actions

The `handle` property specifies what the SDK does when an event matches:

| Action | Description |
|---|---|
| `yield` | Parse the event data as JSON and return it to the user. The data must conform to the `stream_event_model` schema. At least one handler must use this action; otherwise, no data is returned to users. |
| `done` | Signal that the stream has ended normally. The SDK continues iterating the stream to fully consume it, but ignores all subsequent events. Use this when you need to ensure the HTTP connection is properly drained. |
| `error` | Parse the event as an error and raise an exception. Use with `error_property` to specify which field contains the error details. |
| `break` | Stop processing the stream immediately without consuming remaining events. Use this when you want to terminate iteration as soon as possible. |
| `continue` | Skip this event and continue to the next one. Use this for keepalive, heartbeat, or other control messages you want to ignore. |
## Example: Complete streaming configuration

Here is a complete example based on a chat completion API:
```yaml
streaming:
  on_event:
    # Handle server-sent done signal
    - data_starts_with: '[DONE]'
      handle: done
    # Handle explicit error events
    - event_type: error
      handle: error
    # Skip keepalive messages
    - event_type: [ping, heartbeat]
      handle: continue
    # Yield all other events, checking for inline errors
    - kind: fallthrough
      handle: yield
      error_property: error
```
```yaml
resources:
  chat:
    subresources:
      completions:
        models:
          chat_completion: CreateChatCompletionResponse
          chat_completion_chunk: CreateChatCompletionStreamResponse
        methods:
          create:
            endpoint: post /chat/completions
            type: http
            streaming:
              stream_event_model: chat.completions.chat_completion_chunk
              param_discriminator: stream
```

This configuration:
- Ends the stream when the data starts with `[DONE]`, continuing to consume remaining events
- Raises an exception when receiving an `error` event type
- Ignores `ping` and `heartbeat` keepalive messages
- Yields all other events to the user, using `kind: fallthrough` as a catch-all
- Checks the `error` property in yielded events to detect inline errors
## Using `break` to terminate immediately

Use `break` when you want to stop processing the stream without consuming remaining events. This is useful when the client needs to disconnect immediately:
```yaml
streaming:
  on_event:
    - event_type: fatal_error
      handle: break
    - kind: fallthrough
      handle: yield
```

When the SDK receives a `fatal_error` event, it stops iterating immediately. Unlike `done`, the SDK does not continue consuming the stream.
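Illustratively, given the config above and this hypothetical event sequence, iteration stops at the `fatal_error` event and the trailing data is never read from the connection:

```text
data: {"text":"hello"}   -> fallthrough: yielded

event: fatal_error
data: {"code":500}       -> break: iteration stops here

data: {"text":"never"}   -> never read
```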
## Error handling in streams

To handle errors within streamed events, use the `error_property` option. This tells the SDK which property in the event data contains the error details:
```yaml
streaming:
  on_event:
    - kind: fallthrough
      handle: yield
      error_property: error
```

When the SDK encounters an event where the `error` property is present and truthy, it raises an exception using that property's value as the error message or object.
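For example, under this config an event like the following (a made-up payload) would raise an exception rather than being yielded:

```text
data: {"error": {"message": "Model overloaded, retry later"}}
```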
For explicit error event types, you can omit `error_property` to use the entire event data as the error:
```yaml
streaming:
  on_event:
    - event_type: error
      handle: error # Uses full event data as the error
    - kind: fallthrough
      handle: yield
      error_property: error # Uses only the 'error' field
```
## SDK usage examples

Once streaming is configured, users can consume streams in each SDK using idiomatic patterns.
**TypeScript**

```typescript
const stream = client.chat.completions
  .create({
    model: 'gpt-4',
    messages: [{ role: 'user', content: 'Hello!' }],
    stream: true,
  })
  .on('chunk', (chunk) => {
    process.stdout.write(chunk.choices[0]?.delta?.content || '');
  });

// Or use async iteration
for await (const chunk of stream) {
  process.stdout.write(chunk.choices[0]?.delta?.content || '');
}
```

**Python**

```python
# Sync streaming
with client.chat.completions.stream(
    model="gpt-4",
    messages=[{"role": "user", "content": "Hello!"}],
) as stream:
    for chunk in stream:
        print(chunk.choices[0].delta.content or "", end="", flush=True)

# Async streaming
async with client.chat.completions.stream(
    model="gpt-4",
    messages=[{"role": "user", "content": "Hello!"}],
) as stream:
    async for chunk in stream:
        print(chunk.choices[0].delta.content or "", end="", flush=True)
```

**Go**

```go
stream := client.Chat.Completions.NewStreaming(ctx, acme.ChatCompletionNewParams{
	Model: acme.String("gpt-4"),
	Messages: []acme.ChatCompletionMessageParam{
		acme.NewUserMessage(acme.NewTextBlock("Hello!")),
	},
})

for stream.Next() {
	event := stream.Current()
	switch delta := event.AsAny().(type) {
	case acme.ChatCompletionChunk:
		if len(delta.Choices) > 0 {
			fmt.Print(delta.Choices[0].Delta.Content)
		}
	}
}

if err := stream.Err(); err != nil {
	log.Fatal(err)
}
```

**Java**

```java
ChatCompletionCreateParams params = ChatCompletionCreateParams.builder()
    .model("gpt-4")
    .addMessage(ChatCompletionMessageParam.ofUser("Hello!"))
    .stream(true)
    .build();

try (StreamResponse<ChatCompletionChunk> stream =
        client.chat().completions().createStreaming(params)) {
    stream.stream()
        .flatMap(chunk -> chunk.choices().stream())
        .map(choice -> choice.delta().content().orElse(""))
        .forEach(System.out::print);
}
```

**Kotlin**

```kotlin
val params = ChatCompletionCreateParams.builder()
    .model("gpt-4")
    .addMessage(ChatCompletionMessageParam.ofUser("Hello!"))
    .stream(true)
    .build()

client.chat().completions().createStreaming(params).use { stream ->
    stream.stream()
        .flatMap { chunk -> chunk.choices().stream() }
        .map { choice -> choice.delta().content().orElse("") }
        .forEach { print(it) }
}
```

**Ruby**

```ruby
stream = client.chat.completions.stream(
  model: "gpt-4",
  messages: [{ role: :user, content: "Hello!" }]
)

stream.each do |chunk|
  print chunk.choices.first&.delta&.content
end
```

**PHP**

```php
$stream = $client->chat->completions->create([
    'model' => 'gpt-4',
    'messages' => [['role' => 'user', 'content' => 'Hello!']],
    'stream' => true,
]);

foreach ($stream as $chunk) {
    echo $chunk->choices[0]->delta->content ?? '';
}
```

**C#**

```csharp
var stream = client.Chat.Completions.CreateStreaming(new ChatCompletionCreateParams
{
    Model = "gpt-4",
    Messages = new[] { new UserMessage("Hello!") },
    Stream = true
});

await foreach (var chunk in stream)
{
    Console.Write(chunk.Choices[0]?.Delta?.Content ?? "");
}
```

## Best practices

- Define a dedicated model for stream chunks rather than reusing your main response model
- Use a discriminated union for your stream event model when you have multiple event types; this improves deserialization performance in some languages (see the sketch after this list)
- Use meaningful termination signals, like `[DONE]`, that are easy to identify
- Consider supporting both streaming and non-streaming modes for flexibility
- Test streaming behavior with slow connections and interruptions
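As a sketch of the discriminated-union suggestion above, a stream event model in an OpenAPI spec might use `oneOf` with a `discriminator`; all schema names here are illustrative, not from the examples earlier:

```yaml
# Hypothetical stream event model: a tagged union over event kinds.
components:
  schemas:
    StreamEvent:
      oneOf:
        - $ref: '#/components/schemas/TextDelta'
        - $ref: '#/components/schemas/UsageReport'
      discriminator:
        propertyName: type
        mapping:
          text_delta: '#/components/schemas/TextDelta'
          usage: '#/components/schemas/UsageReport'
    TextDelta:
      type: object
      required: [type, text]
      properties:
        type: { type: string, enum: [text_delta] }
        text: { type: string }
    UsageReport:
      type: object
      required: [type, tokens]
      properties:
        type: { type: string, enum: [usage] }
        tokens: { type: integer }
```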