Skip to main content
Version: Next (2.x)

Subscriptions

Baeta typechecks subscription resolvers the same way it typechecks queries and mutations. The transport (PubSub) is yours to pick: Yoga's built-in PubSub for single-instance deployments, TypedPubSub over Redis for multi-instance, or @baeta/subscriptions-cloudflare for Durable-Object-backed Workers.

PubSub setup

Here's a Yoga PubSub example:

src/lib/pubsub.ts
import { createPubSub } from "graphql-yoga";
import type { User } from "../__generated__/types.ts";

export type PubSubMap = {
"user-created": [string];
"user-updated": [User];
};

export const pubsub = createPubSub<PubSubMap>();
note

Yoga's PubSub types each channel as a tuple payload ('channel': [Payload]). The tuple lets a single channel carry multiple positional arguments. The TypedPubSub wrapper below uses bare types instead, so don't copy a PubSubMap between the two without flipping the shape.

Or use Baeta's Typed PubSub wrapper, which adds type-safety on top of graphql-subscriptions:

import { TypedPubSub } from "@baeta/subscriptions-pubsub";
import { PubSub } from "graphql-subscriptions";

export const pubsub = new TypedPubSub<PubSub, PubSubMap>(new PubSub());
note

Yoga's .subscribe(channel) returns the async iterable directly. With TypedPubSub you call .asyncIterableIterator(channel) for the iterable; .subscribe(channel, onMessage) is callback-based.

warning

The in-memory graphql-subscriptions PubSub only works on a single instance. For multi-instance deployments, use graphql-redis-subscriptions (see Production setup).

Context integration

Add the PubSub instance to your context so resolvers can publish and subscribe:

src/types/context.ts
import type { PubSub } from "graphql-yoga";
import type { PubSubMap } from "../lib/pubsub.ts";

export type Context = {
pubsub: PubSub<PubSubMap>;
};

Schema definition

Declare the subscriptions in your .gql:

src/modules/user/user.gql
type Subscription {
userCreated: User!
userUpdated: User!
}

Implementing subscriptions

A subscription is two steps: .subscribe() returns the async iterable for the channel; .resolve() maps each published payload to the response. .resolve() accepts the same chain helpers as a query resolver (.map, .key, .to, .withDefault, …).

import { db } from "../../lib/db/prisma.ts";
import { UserModule } from "./typedef.ts";

const { Subscription } = UserModule;

// Subscription with database lookup in resolve
const userCreatedSubscription = Subscription.userCreated
.subscribe(({ ctx }) => {
return ctx.pubsub.subscribe("user-created");
})
.resolve(({ source }) => {
return db.user.findFirstOrThrow({
where: { id: source },
});
});

// Direct subscription: source is passed through
const userUpdatedSubscription = Subscription.userUpdated
.subscribe(({ ctx }) => {
return ctx.pubsub.subscribe("user-updated");
})
.resolve(({ source }) => {
return source;
});

export default Subscription.$fields({
userCreated: userCreatedSubscription,
userUpdated: userUpdatedSubscription,
});

Publishing events

Publish from any resolver. Mutations are the usual fit:

// Publishing after user update
const updateUserMutation = Mutation.updateUser
.$use(async (next, { ctx }) => {
const user = await next();
if (user) {
ctx.pubsub.publish("user-updated", user);
}
return user;
})
.resolve(async ({ args }) => {
return db.user.update({
where: { id: args.where.id },
data: args.data,
});
});

// Publishing after user creation
const createUserMutation = Mutation.createUser.resolve(
async ({ args, ctx }) => {
const user = await db.user.create({
data: args.data,
});
ctx.pubsub.publish("user-created", user.id);
return user;
},
);

Subscription middlewares

A subscription has two distinct phases, and $use attaches independently to each:

  • Before .subscribe(...): wraps the subscribe phase. Runs once per subscription, around the call that returns the async iterable.
  • Between .subscribe(...) and .resolve(...): wraps the resolve phase. Runs once per published event, around the resolver that maps the payload to the response.
const userUpdatedSubscription = Subscription.userUpdated
// Subscribe-phase middleware: runs once when the client connects
.$use(async (next) => {
console.log("Subscribing to user updates");
const sub = await next();
console.log("Subscribed");
return sub;
})
.subscribe(({ ctx }) => {
return ctx.pubsub.subscribe("user-updated");
})
// Resolve-phase middleware: runs once per published event
.$use(async (next) => {
const result = await next();
console.log("Delivering update:", result);
return result;
})
.resolve(({ source }) => {
return source;
});

App-plugin helpers like auth work on both phases. See the authorization guide.

Production setup

For multi-instance deployments, swap the in-memory PubSub for graphql-redis-subscriptions wrapped in TypedPubSub:

src/lib/pubsub.ts
import { TypedPubSub } from "@baeta/subscriptions-pubsub";
import { RedisPubSub } from "graphql-redis-subscriptions";
import Redis from "ioredis";

const options = {
host: REDIS_DOMAIN_NAME,
port: PORT_NUMBER,
};

const redisPubSub = new RedisPubSub({
publisher: new Redis(options),
subscriber: new Redis(options),
});

export const pubsub = new TypedPubSub<RedisPubSub, PubSubMap>(redisPubSub);