GraphQL Subscriptions with Go (gqlgen) Example

28 March 2021

GraphQL subscriptions are extremely useful in situations when a server needs to update its clients. A chat application is a great example to demonstrate this, as when new messages arrive we want to distribute them among all chat participants as quickly as possible without polling. When I first started looking at implementing GraphQL subscriptions in Go it took me a bit before I had my aha moment and everything clicked together. Also, at that time there were not a whole lot of examples that I could look at or reference. This post will go over a concrete chat application example using GraphQL subscriptions and a backend built in Go with gqlgen.

Source Code

If you would like to jump straight to the code and run the example yourself, it is available in the following repo.

Introduction

My assumption here is that you are already familiar with GraphQL and gqlgen, so let’s first go over a high-level overview of how the chat application will work. For details on GraphQL refer to the great resources at howtographql.com.

  • Clients communicate with a server to send and receive messages.
  • Clients use a GraphQL mutation to post new messages.
  • When starting up, clients open a GraphQL subscription over WebSocket to instantly receive new messages being posted.

We will use the front-end chat client and GraphQL schema from the following tutorial and replace the backend with our own Go version. Let’s begin by looking at the more interesting bits needed to put together our GraphQL server. The schema defines a Message type, and a query, mutation, subscription to interact with the messages as follows:

type Message {
  id: ID!
  user: String!
  content: String!
}
type Query {
  messages: [Message!]
}
type Mutation {
  postMessage(user: String!, content: String!): ID!
}
type Subscription {
  messages: [Message!]
}

GraphQL Server

Next, let’s look at defining our GraphQL server with gqlgen. One thing to note is we will not use NewDefaultServer when declaring the server to have more freedom adding additional transports.

...
// CORS setup
c := cors.New(cors.Options{
    AllowedOrigins:   []string{"http://localhost:4000", "http://localhost:8080"},
    AllowCredentials: true,
    Debug:            false,
})
// Use New instead of NewDefaultServer in order to have full control over defining transports
srv := handler.New(generated.NewExecutableSchema(generated.Config{Resolvers: &graph.Resolver{
    ChatMessages: []*model.Message{},
    ChatObservers: map[string]chan []*model.Message{},
}}))
srv.AddTransport(transport.POST{})
srv.AddTransport(transport.Websocket{
    KeepAlivePingInterval: 10 * time.Second,
    Upgrader: websocket.Upgrader{
        CheckOrigin: func(r *http.Request) bool {
            return true
        },
    },
})
srv.Use(extension.Introspection{})
...

GraphQL Resolvers

The last interesting bit to look at is the gqlgen generated resolvers and how we can implement them. Let’s first look at the Resolver struct which serves as dependency injection and point out that this is where we will maintain the state of active subscriptions. That is, whenever a client opens a subscription, the server needs to keep track of it so data can be pushed to it. We achieve this with a Go channel. We have a field named ChatObservers of type map[string]chan []*model.Message to maintain state, and match what the schema has defined for our subscription. The Resolver would look like this:

type Resolver struct {
    // All messages since launching the GraphQL endpoint
    ChatMessages  []*model.Message
    // All active subscriptions
    ChatObservers map[string]chan []*model.Message
    mu            sync.Mutex
}
Subscription

The Messages subscription resolver is where we implement the subscription. When a client creates a subscription, the server will create a unique identifier for it and open a Go channel on which future data updates can be pushed. The gqlgen internals will handle reading from the channel and push the updates over the WebSocket for each client. When the channel is created it is added to the ChatObservers map we defined in Resolver. All the clients need to know is that they will receive a slice of messages whenever they are available on the WebSocket subscription they opened. Perfect, that’s exactly what we want!

func (r *subscriptionResolver) Messages(ctx context.Context) (<-chan []*model.Message, error) {
    // Create an ID and channel for each active subscription. We will push changes into this channel.
    // When a new subscription is created by the client, this resolver will fire first.
    id := randString(8)
    msgs := make(chan []*model.Message, 1)

    // Start a goroutine to allow for cleaning up subscriptions that are disconnected.
    // This go routine will only get past Done() when a client terminates the subscription. This allows us
    // to only then remove the reference from the list of ChatObservers since it is no longer needed.
    go func() {
        <-ctx.Done()
        r.mu.Lock()
        delete(r.ChatObservers, id)
        r.mu.Unlock()
    }()
    r.mu.Lock()
    // Keep a reference of the channel so that we can push changes into it when new messages are posted.
    r.ChatObservers[id] = msgs
    r.mu.Unlock()
    // This is optional, and this allows newly subscribed clients to get a list of all the messages that have been
    // posted so far. Upon subscribing the client will be pushed the messages once, further changes are handled
    // in the PostMessage mutation.
    r.ChatObservers[id] <- r.ChatMessages
    return msgs, nil
}
Mutation

The PostMessage mutation is how each client posts new messages into the chat room. Here is where we have data changes (mutations) and is a good place for the server to notify its subscribers of those changes. Client posts a message -> server processes it -> notifies subscribed clients of message changes -> repeat.

func (r *mutationResolver) PostMessage(ctx context.Context, user string, content string) (int, error) {
    // Construct the newly sent message and append it to the existing messages
    msg := model.Message{
        ID:      len(r.ChatMessages),
        User:    user,
        Content: content,
    }
    r.ChatMessages = append(r.ChatMessages, &msg)
    r.mu.Lock()
    // Notify all active subscriptions that a new message has been posted by posted. In this case we push the now
    // updated ChatMessages array to all clients that care about it.
    for _, observer := range r.ChatObservers {
        observer <- r.ChatMessages
    }
    r.mu.Unlock()
    return msg.ID, nil
}

Run The Code

Clone the code for this example from the following repo.

Start Server
cd server
go run server.go
Start Clients
cd client
yarn
yarn start

A client will be available on http://localhost:8080. Launch multiple clients in different windows to simulate a chat.

Observations

You may have noticed that when a new message is posted, the server will send the entire slice of messages recorded since starting up. This of course is not very efficient as the number of messages grows, but for simplicity’s sake, it is sufficient for showing how a GraphQL subscription in Go with gqlgen works. A possible improvement could be to change the schema for the subscription to return a single Message each time and add a Query for getting all messages. That way the client can query once for all the messages when it starts up, and only receive new messages over the wire on its subscription.

Acknowledgments

The front-end React client in this example is from Jack Herrington’s video tutorial available here.


comments powered by Disqus