const schema = `
type Notification {
id: ID!
message: String
}
type Query {
notifications: [Notification]
}
type Mutation {
addNotification(message: String): Notification
}
type Subscription {
notificationAdded: Notification
}
`
let idCount = 1
const notifications = [
{
id: idCount,
message: 'Notification message'
}
]
const resolvers = {
Query: {
notifications: () => notifications
},
Mutation: {
addNotification: async (_, { message }, { pubsub }) => {
const id = idCount++
const notification = {
id,
message
}
notifications.push(notification)
await pubsub.publish({
topic: 'NOTIFICATION_ADDED',
payload: {
notificationAdded: notification
}
})
return notification
}
},
Subscription: {
notificationAdded: {
// You can also subscribe to multiple topics at once using an array like this:
// pubsub.subscribe(['TOPIC1', 'TOPIC2'])
subscribe: async (root, args, { pubsub }) =>
await pubsub.subscribe('NOTIFICATION_ADDED')
}
}
}
app.register(mercurius, {
schema,
resolvers,
subscription: true
})const schema = `
type Notification {
id: ID!
message: String
}
type Query {
notifications: [Notification]
}
type Mutation {
addNotification(message: String): Notification
}
type Subscription {
notificationAdded(contains: String): Notification
}
`
let idCount = 1
const notifications = [
{
id: idCount,
message: 'Notification message'
}
]
const { withFilter } = mercurius
const resolvers = {
Query: {
notifications: () => notifications
},
Mutation: {
addNotification: async (_, { message }, { pubsub }) => {
const id = idCount++
const notification = {
id,
message
}
notifications.push(notification)
await pubsub.publish({
topic: 'NOTIFICATION_ADDED',
payload: {
notificationAdded: notification
}
})
return notification
}
},
Subscription: {
notificationAdded: {
subscribe: withFilter(
(root, args, { pubsub }) => pubsub.subscribe('NOTIFICATION_ADDED'),
(payload, { contains }) => {
if (!contains) return true
return payload.notificationAdded.message.includes(contains)
}
)
}
}
app.register(mercurius, {
schema,
resolvers,
subscription: true
})The context for the Subscription includes:
app, the Fastify applicationreply, an object that pretend to be aReplyobject from Fastify without its decorators.reply.request, the real request object from Fastify
During the connection initialization phase, all content of proerty payload of the connection_init packet
is automatically copied inside request.headers. In case an headers property is specified within payload,
that's used instead.
...
const resolvers = {
Mutation: {
sendMessage: async (_, { message, userId }, { pubsub }) => {
await pubsub.publish({
topic: userId,
payload: message
})
return "OK"
}
},
Subscription: {
receivedMessage: {
// If someone calls the sendMessage mutation with the Id of the user that was added
// to the subscription context, that user receives the message.
subscribe: (root, args, { pubsub, user }) => pubsub.subscribe(user.id)
}
}
}
app.register(mercurius, {
schema,
resolvers,
subscription: {
// Add the decoded JWT from the Authorization header to the subscription context.
context: (_, req) => ({ user: jwt.verify(req.headers["Authorization"].slice(7))})
}
})
...const redis = require('mqemitter-redis')
const emitter = redis({
port: 6579,
host: '127.0.0.1'
})
const schema = `
type Vote {
id: ID!
title: String!
ayes: Int
noes: Int
}
type Query {
votes: [Vote]
}
type Mutation {
voteAye(voteId: ID!): Vote
voteNo(voteId: ID!): Vote
}
type Subscription {
voteAdded(voteId: ID!): Vote
}
`
const votes = []
const VOTE_ADDED = 'VOTE_ADDED'
const resolvers = {
Query: {
votes: async () => votes
},
Mutation: {
voteAye: async (_, { voteId }, { pubsub }) => {
if (voteId <= votes.length) {
votes[voteId - 1].ayes++
await pubsub.publish({
topic: `VOTE_ADDED_${voteId}`,
payload: {
voteAdded: votes[voteId - 1]
}
})
return votes[voteId - 1]
}
throw new Error('Invalid vote id')
},
voteNo: async (_, { voteId }, { pubsub }) => {
if (voteId <= votes.length) {
votes[voteId - 1].noes++
await pubsub.publish({
topic: `VOTE_ADDED_${voteId}`,
payload: {
voteAdded: votes[voteId - 1]
}
})
return votes[voteId - 1]
}
throw new Error('Invalid vote id')
}
},
Subscription: {
voteAdded: {
subscribe: async (root, { voteId }, { pubsub }) => {
// subscribe only for a vote with a given id
return await pubsub.subscribe(`VOTE_ADDED_${voteId}`)
}
}
}
}
app.register(mercurius, {
schema,
resolvers,
subscription: {
emitter,
verifyClient: (info, next) => {
if (info.req.headers['x-fastify-header'] !== 'fastify is awesome !') {
return next(false) // the connection is not allowed
}
next(true) // the connection is allowed
}
}
})Note that when passing both
pubsubandemitteroptions,emitterwill be ignored.
The CustomPubSub interface allows you to implement a custom publish-subscribe mechanism for GraphQL subscriptions. This gives you complete control over how subscription data is published and delivered.
To create a custom PubSub implementation, you need to implement a class with at least the following methods:
-
subscribe(topic, queue, ...customArgs):
topic: String identifier for the subscription topicqueue: A Readable stream where subscription data will be pushed...customArgs: Optional additional parameters that can be passed from the subscription resolver- Returns: A Promise that resolves when the subscription is set up
-
publish(event, callback):
event: Object containingtopicandpayloadpropertiescallback: Function to be called when the publish operation completes- Expected behavior: Emit the payload to subscribers of the topic
The subscribe method supports passing custom arguments from your GraphQL resolvers. This allows for additional filtering or configuration at the subscription level. For example, you might pass an offset parameter to control where a subscription starts reading from, or pass filter criteria to perform server-side filtering.
Here's an example implementation using Node's EventEmitter:
class CustomPubSub {
constructor () {
this.emitter = new EventEmitter()
}
async subscribe (topic, queue, offset) {
const listener = (value) => {
queue.push(value)
}
const close = () => {
this.emitter.removeListener(topic, listener)
}
this.emitter.on(topic, listener)
queue.close.push(close)
}
publish (event, callback) {
this.emitter.emit(event.topic, event.payload)
callback()
}
}
const pubsub = new CustomPubSub()
app.register(mercurius, {
schema,
resolvers: {
Subscription: {
retrieveItems: {
subscribe: (root, args, { pubsub }) => pubsub.subscribe('RETRIEVE_ITEMS', args.offset)
}
}
},
subscription: {
pubsub
}
})Mercurius uses @fastify/websocket internally, but you can still use it by registering before mercurius plugin. If so, it is recommened to set the appropriate options.maxPayload like this:
const fastifyWebsocket = require('@fastify/websocket')
app.register(fastifyWebsocket, {
options: {
maxPayload: 1048576
}
})
app.register(mercurius, {
schema,
resolvers,
subscription: true
})
app.get('/', { websocket: true }, (connection, req) => {
connection.socket.on('message', message => {
connection.socket.send('hi from server')
})
})