Routing & Terminal Handlers
Terminal handlers define the end-point of a procedure chain. Actyx RPC supports queries, mutations, streaming responses, and Server-Sent Events (SSE).
All handlers receive { ctx, input } as the first argument, and can also accept additional custom arguments.
Queries and Mutations
- Use
.query()for read-only actions. - Use
.mutation()for state-changing write actions.
// Mutation with additional inline arguments
export const publishPost = procedure
.input(zodResolver(z.object({ id: z.string() })))
.mutation(async ({ ctx, input }, notifyFollowers: boolean) => {
return {
id: input.id,
publishedBy: ctx.userId,
notifyFollowers,
};
});
// Call time
await publishPost({ id: "post_1" }, true);Streaming Responses (.stream())
For procedures returning an AsyncIterable, use .stream(). This is ideal for consuming generative AI streams or real-time progress updates.
const generateContent = procedure
.input(z.object({ prompt: z.string() }))
.stream(async function* ({ input }) {
yield "Thinking...";
const stream = await ai.stream(input.prompt);
for await (const chunk of stream) {
yield chunk;
}
});
// Consume on the client
for await (const chunk of generateContent({ prompt: "Write a story" })) {
console.log(chunk);
}Server-Sent Events (.sse())
For real-time, one-way data push streams (e.g., dashboard tickers, progress bars, notifications) over HTTP, use .sse().
Yield events from a generator, and use the createSSEResponse adapter to turn that stream into a standard HTTP Response.
Server Setup
import { createSSEResponse } from "@explita/actyx-rpc/adapters/next";
const watchStock = procedure
.input(z.object({ symbol: z.string() }))
.sse(async function* ({ input }) {
while (true) {
const price = await getLatestPrice(input.symbol);
yield {
event: "price-update",
data: { price, at: new Date() },
};
await new Promise((r) => setTimeout(r, 5000));
}
});
// Next.js Route Handler
export async function GET(req: Request) {
const stream = watchStock({ symbol: "AAPL" });
return createSSEResponse(stream);
}Raw JS/TS Client Consumption
Use the built-in SSEClient to consume the events on the client:
import { SSEClient } from "@explita/actyx-rpc";
const stock = await SSEClient({
url: "/api/sse",
params: { symbol: "AAPL" },
});
for await (const { event, data } of stock) {
if (event === "price-update") {
console.log("New price:", data.price);
}
}
// To stop the stream manually:
stock.close();WebSocket Subscriptions (.subscription())
For topic-based, real-time subscription feeds, use .subscription().
The handler receives { ctx, input, emit } and should return a cleanup function (e.g. to unsubscribe from database or message broker updates).
export const onRoomEvent = procedure
.input(z.object({ roomId: z.string() }))
.subscription(async ({ ctx, input, emit }) => {
// Subscribe to a topic using the built-in pubsub context
const unsubscribe = await ctx.pubsub.subscribe(
`room:${input.roomId}`,
(data) => {
emit(data); // Push event data payload to the client
}
);
return unsubscribe; // Return the cleanup function
});For client integration, see React Subscriptions. For server setups, see WebSocket Adapters.
Bi-Directional WebSockets (.ws())
For complex, raw, bi-directional socket communications (like multi-user collaboration layers or real-time drawing maps), use .ws().
The handler receives raw socket callbacks to control communications:
export const handleCanvasSync = procedure
.input(z.object({ sessionId: z.string() }))
.ws(async ({ ctx, input, send, onMessage, onClose, onError }) => {
// Access parameters and context
console.log(`User ${ctx.userId} connected to session ${input.sessionId}`);
// Register listeners
onMessage((data) => {
// Broadcast or process incoming payload
broadcastToSession(input.sessionId, data);
});
onClose(() => {
cleanupUserSession(ctx.userId);
});
});RPC Handler Pattern & Result Shape
Procedures in Actyx RPC return a standardized [data, error] tuple.
Success Shape
[
{
success: true,
id: "post_1",
title: "Hello World",
},
null,
]Error Shape
[
null,
{
success: false,
message: "Validation Error",
reason: "VALIDATION_ERROR",
errors: {
title: "Title is too short",
},
},
]Known Failure Reasons
| Reason | Description |
|---|---|
UNAUTHORIZED | Authentication required |
FORBIDDEN | Insufficient permissions |
MAINTENANCE_MODE | System is in maintenance mode |
VALIDATION_ERROR | Input validation failed |
UNEXPECTED_ERROR | Unhandled server error |
INVALID_SESSION | Session expired or invalid |
ABORTED | Request was aborted |
INVALID_CACHE_KEY | Cache key resolution failed |
TIMEOUT | Execution exceeded time limit |
RETRY_EXHAUSTED | All retry attempts exhausted |
CIRCUIT_OPEN | Circuit breaker is open |
RATE_LIMITED | Rate limit exceeded |
Rules of Thumb for Exception Handling
- Let errors throw naturally: Actyx RPC will catch thrown errors, run the global
onErrormapping, and return a clean[null, error]tuple. - Returning
{ success: false }: If you return an object withsuccess: falsefrom the handler, it will be mapped into theerrorslot of the returned tuple. - Implicit Success: Everything else is returned as
[data, null].
// ✅ Good: Let errors throw naturally
const deletePost = procedure.mutation(async ({ input }) => {
return await db.posts.delete(input.id); // throws if not found
});
// ⚠️ Optional: Manually return failed states
const createPost = procedure.mutation(async ({ input }) => {
if (input.title.length < 3) {
return {
success: false,
message: "Title too short",
reason: "VALIDATION_ERROR"
};
}
return await db.posts.create(input);
});