mercurius

Subscriptions

Subscription support (simple)

const schema = `
  type Notification {
    id: ID!
    message: String
  }

  type Query {
    notifications: [Notification]
  }

  type Mutation {
    addNotification(message: String): Notification
  }

  type Subscription {
    notificationAdded: Notification
  }
`

let idCount = 1
const notifications = [
  {
    id: idCount,
    message: 'Notification message'
  }
]

const resolvers = {
  Query: {
    notifications: () => notifications
  },
  Mutation: {
    addNotification: async (_, { message }, { pubsub }) => {
      const id = idCount++
      const notification = {
        id,
        message
      }
      notifications.push(notification)
      await pubsub.publish({
        topic: 'NOTIFICATION_ADDED',
        payload: {
          notificationAdded: notification
        }
      })

      return notification
    }
  },
  Subscription: {
    notificationAdded: {
      // You can also subscribe to multiple topics at once using an array like this:
      //  pubsub.subscribe(['TOPIC1', 'TOPIC2'])
      subscribe: async (root, args, { pubsub }) =>
        await pubsub.subscribe('NOTIFICATION_ADDED')
    }
  }
}

app.register(mercurius, {
  schema,
  resolvers,
  subscription: true
})

Subscription filters

const schema = `
  type Notification {
    id: ID!
    message: String
  }

  type Query {
    notifications: [Notification]
  }

  type Mutation {
    addNotification(message: String): Notification
  }

  type Subscription {
    notificationAdded(contains: String): Notification
  }
`

let idCount = 1
const notifications = [
  {
    id: idCount,
    message: 'Notification message'
  }
]

const { withFilter } = mercurius

const resolvers = {
  Query: {
    notifications: () => notifications
  },
  Mutation: {
    addNotification: async (_, { message }, { pubsub }) => {
      const id = idCount++
      const notification = {
        id,
        message
      }
      notifications.push(notification)
      await pubsub.publish({
        topic: 'NOTIFICATION_ADDED',
        payload: {
          notificationAdded: notification
        }
      })

      return notification
    }
  },
  Subscription: {
    notificationAdded: {
      subscribe: withFilter(
        (root, args, { pubsub }) => pubsub.subscribe('NOTIFICATION_ADDED'),
        (payload, { contains }) => {
          if (!contains) return true
          return payload.notificationAdded.message.includes(contains)
        }
      )
    }
  }
}

app.register(mercurius, {
  schema,
  resolvers,
  subscription: true
})
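
The second function passed to withFilter above is a plain predicate over the published payload and the subscription arguments, so it can be tried out in isolation. The sketch below assumes the payload shape used in this example; the name notificationMatches and the null guard on message are illustrative additions and are not part of Mercurius.

```javascript
// Same predicate as in the withFilter call above, pulled out so it can be
// exercised on its own. The guard on `message` is an addition, since the
// schema declares `message` as a nullable String.
function notificationMatches (payload, { contains }) {
  if (!contains) return true
  const { message } = payload.notificationAdded
  return typeof message === 'string' && message.includes(contains)
}

const sample = { notificationAdded: { id: 2, message: 'Build finished' } }

// No `contains` argument: every notification is delivered.
console.log(notificationMatches(sample, {}))
// Substring present: delivered.
console.log(notificationMatches(sample, { contains: 'Build' }))
// Substring absent: dropped for this subscriber.
console.log(notificationMatches(sample, { contains: 'Deploy' }))
```

Returning false only skips the event for the subscriber whose arguments did not match; other subscribers to the same topic are evaluated separately.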

Subscription Context

The context for the Subscription includes:

  • app, the Fastify application
  • reply, an object that mimics a Fastify Reply object but does not carry its decorators
  • reply.request, the real request object from Fastify

During the connection initialization phase, the entire content of the payload property of the connection_init packet is automatically copied into request.headers. If payload itself contains a headers property, that property is used instead.
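
As a rough illustration of the client side of this handshake, the sketch below builds the connection_init message a client might send right after the WebSocket opens. The helper name buildConnectionInit, the token value, and the choice of an authorization key are assumptions made for the example; only the connection_init type and the payload property come from the description above.

```javascript
// Hypothetical helper: serializes the first message sent by the client.
// Whatever keys are placed in `payload` are what the server copies into
// request.headers during connection initialization.
function buildConnectionInit (token) {
  return JSON.stringify({
    type: 'connection_init',
    payload: {
      // Key name chosen for illustration; use whatever key the server reads.
      authorization: `Bearer ${token}`
    }
  })
}

const initMessage = buildConnectionInit('my-token')
console.log(initMessage)
```

On the server, the value would then be looked up under the same key on the request headers, for instance from a subscription context function.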

Build a custom GraphQL context object for subscriptions

...
const resolvers = {
  Mutation: {
    sendMessage: async (_, { message, userId }, { pubsub }) => {
      await pubsub.publish({
        topic: userId,
        payload: message
      })

      return "OK"
    }
  },
  Subscription: {
    receivedMessage: {
      // If someone calls the sendMessage mutation with the Id of the user that was added
      // to the subscription context, that user receives the message.
      subscribe: (root, args, { pubsub, user }) => pubsub.subscribe(user.id)
    }
  }
}

app.register(mercurius, {
  schema,
  resolvers,
  subscription: {
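      // Add the decoded JWT from the Authorization header to the subscription context.
      // Assumes `jwt` is the jsonwebtoken package and the secret is in JWT_SECRET.
      // slice(7) strips the leading 'Bearer ' prefix.
      context: (_, req) => ({ user: jwt.verify(req.headers["Authorization"].slice(7), process.env.JWT_SECRET) })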
  }
})
...

Subscription support (with redis)

const redis = require('mqemitter-redis')
const emitter = redis({
  port: 6579,
  host: '127.0.0.1'
})

const schema = `
  type Vote {
    id: ID!
    title: String!
    ayes: Int
    noes: Int
  }

  type Query {
    votes: [Vote]
  }

  type Mutation {
    voteAye(voteId: ID!): Vote
    voteNo(voteId: ID!): Vote
  }

  type Subscription {
    voteAdded(voteId: ID!): Vote
  }
`
const votes = []
const VOTE_ADDED = 'VOTE_ADDED'

const resolvers = {
  Query: {
    votes: async () => votes
  },
  Mutation: {
    voteAye: async (_, { voteId }, { pubsub }) => {
      if (voteId <= votes.length) {
        votes[voteId - 1].ayes++
        await pubsub.publish({
          topic: `VOTE_ADDED_${voteId}`,
          payload: {
            voteAdded: votes[voteId - 1]
          }
        })

        return votes[voteId - 1]
      }

      throw new Error('Invalid vote id')
    },
    voteNo: async (_, { voteId }, { pubsub }) => {
      if (voteId <= votes.length) {
        votes[voteId - 1].noes++
        await pubsub.publish({
          topic: `VOTE_ADDED_${voteId}`,
          payload: {
            voteAdded: votes[voteId - 1]
          }
        })

        return votes[voteId - 1]
      }

      throw new Error('Invalid vote id')
    }
  },
  Subscription: {
    voteAdded: {
      subscribe: async (root, { voteId }, { pubsub }) => {
        // subscribe only for a vote with a given id
        return await pubsub.subscribe(`VOTE_ADDED_${voteId}`)
      }
    }
  }
}

app.register(mercurius, {
  schema,
  resolvers,
  subscription: {
    emitter,
    verifyClient: (info, next) => {
      if (info.req.headers['x-fastify-header'] !== 'fastify is awesome !') {
        return next(false) // the connection is not allowed
      }
      next(true) // the connection is allowed
    }
  }
})

Subscriptions with custom PubSub

Note that when passing both pubsub and emitter options, emitter will be ignored.

The CustomPubSub interface allows you to implement a custom publish-subscribe mechanism for GraphQL subscriptions. This gives you complete control over how subscription data is published and delivered.

CustomPubSub Interface

To create a custom PubSub implementation, you need to implement a class with at least the following methods:

  • subscribe(topic, queue, ...customArgs):

    • topic: String identifier for the subscription topic
    • queue: A Readable stream where subscription data will be pushed
    • ...customArgs: Optional additional parameters that can be passed from the subscription resolver
    • Returns: A Promise that resolves when the subscription is set up
  • publish(event, callback):

    • event: Object containing topic and payload properties
    • callback: Function to be called when the publish operation completes
    • Expected behavior: Emit the payload to subscribers of the topic

Custom Arguments in subscribe method

The subscribe method supports passing custom arguments from your GraphQL resolvers. This allows for additional filtering or configuration at the subscription level. For example, you might pass an offset parameter to control where a subscription starts reading from, or pass filter criteria to perform server-side filtering.

Here's an example implementation using Node's EventEmitter:

const { EventEmitter } = require('events')

class CustomPubSub {
  constructor () {
    this.emitter = new EventEmitter()
  }

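  // `offset` is a custom argument passed from the resolver; this minimal
  // implementation accepts it but does not use it.
  async subscribe (topic, queue, offset) {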
    const listener = (value) => {
      queue.push(value)
    }

    const close = () => {
      this.emitter.removeListener(topic, listener)
    }

    this.emitter.on(topic, listener)
    queue.close.push(close)
  }

  publish (event, callback) {
    this.emitter.emit(event.topic, event.payload)
    callback()
  }
}

const pubsub = new CustomPubSub()

app.register(mercurius, {
  schema,
  resolvers: {
    Subscription: {
      retrieveItems: {
        subscribe: (root, args, { pubsub }) => pubsub.subscribe('RETRIEVE_ITEMS', args.offset)
      }
    }
  },
  subscription: {
    pubsub
  }
})
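
To show how a custom argument could actually influence delivery, here is a minimal sketch of a variant that drops events below the requested offset. It assumes each payload carries a numeric offset field, which is not stated in the example above; OffsetPubSub and createFakeQueue are hypothetical names, and the fake queue only models the push method and close array used by the implementation above, not the real stream Mercurius supplies.

```javascript
const { EventEmitter } = require('events')

// Variant of CustomPubSub: the third argument is treated as a minimum
// offset, and payloads below it never reach the subscriber's queue.
class OffsetPubSub {
  constructor () {
    this.emitter = new EventEmitter()
  }

  async subscribe (topic, queue, offset = 0) {
    const listener = (value) => {
      if (value.offset >= offset) queue.push(value)
    }
    this.emitter.on(topic, listener)
    // Registered so the listener is detached when the subscription closes.
    queue.close.push(() => this.emitter.removeListener(topic, listener))
  }

  publish (event, callback) {
    this.emitter.emit(event.topic, event.payload)
    callback()
  }
}

// Stand-in for the queue: records pushed values in `items`.
function createFakeQueue () {
  const items = []
  return { items, close: [], push: (value) => items.push(value) }
}

async function demo () {
  const pubsub = new OffsetPubSub()
  const queue = createFakeQueue()
  await pubsub.subscribe('RETRIEVE_ITEMS', queue, 2)

  for (const offset of [1, 2, 3]) {
    pubsub.publish({ topic: 'RETRIEVE_ITEMS', payload: { offset } }, () => {})
  }

  // Simulate the subscription ending, then publish once more.
  queue.close.forEach((close) => close())
  pubsub.publish({ topic: 'RETRIEVE_ITEMS', payload: { offset: 4 } }, () => {})

  return queue.items.map((item) => item.offset)
}

demo().then((offsets) => console.log(offsets))
```

Filtering inside subscribe keeps unwanted events out of the queue entirely, whereas withFilter evaluates each event after it has been delivered to the subscription.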

Subscriptions with @fastify/websocket

Mercurius uses @fastify/websocket internally, but you can still register it yourself, provided you do so before registering the mercurius plugin. In that case, it is recommended to set an appropriate options.maxPayload, like this:

const fastifyWebsocket = require('@fastify/websocket')

app.register(fastifyWebsocket, {
  options: {
    maxPayload: 1048576
  }
})

app.register(mercurius, {
  schema,
  resolvers,
  subscription: true
})

app.get('/', { websocket: true }, (connection, req) => {
  connection.socket.on('message', message => {
    connection.socket.send('hi from server')
  })
})