Building Push-Triggered Sync, Part IV: The Notification Pipeline

So far in this series, we’ve chosen the Go language to build a push provider; we designed data structures that represent notifications, according to Apple’s specifications; and we’ve connected to APNs so we can send those notifications to Apple (and thus to our customers).

Along the way, though, Omni’s push provider needs to do a fair bit of preprocessing and other work for every notification it prepares to send. There’s also the chance that we won’t actually want to send some notifications, instead preferring to filter them out. In the current provider, we consider each notification for:

  • Its bundle ID – notifications need to be sent across the right connection for their bundle ID, and if the ID doesn’t match up with one of the current versions of OmniFocus, we shouldn’t send it.
  • Logging – we want to keep some debugging information around for notifications, just to make sure they’re all being delivered properly.
  • Statistics – we keep an aggregate count of all the notifications that pass through the provider.
  • Associated Omni Sync Server information – while we roll out the push provider, we want to be able to gradually enable notifications for different sync servers, so as to measure the load that push-triggered sync levies on those servers.
  • Error responses from APNs – Apple may tell us that a notification was malformed, or our connection to APNs may drop unexpectedly.

Overall, we want an architecture that can handle all these different needs in a unified manner. Each of these considerations needs to be its own component, but should have an interface roughly similar to all the others, so that we can debug them separately and also compose them easily to form the entire provider.

Luckily, Go has a wealth of documentation – not only for each API, but also in the form of more general guides and examples. One of these discusses a concurrency pattern that can be built entirely out of Go’s language builtins: that of a pipeline. (Read the full article here.)

The general idea behind a Go pipeline revolves around a combination of two important concurrency primitives: channels and goroutines. At its simplest, a channel is a thread-safe way to pass values between two different parts of a program, and a goroutine is a very lightweight way of running a function concurrently with the rest of the program, much as if it were on its own thread. (The actual implementation details, and their implications, are much more complex, but this should suffice for our purposes.)
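To make those two primitives concrete, here is a minimal sketch that is unrelated to notifications. The function name Squares is made up for illustration: it starts a goroutine that sends values over a channel, while the caller receives them with range.

```go
package main

import "fmt"

// Squares is a hypothetical example function: it starts a goroutine that
// sends the squares of 1..n on the returned channel, then closes it.
func Squares(n int) <-chan int {
	output := make(chan int)
	go func() {
		// Closing the channel lets the receiver's range loop finish.
		defer close(output)
		for i := 1; i <= n; i++ {
			output <- i * i // blocks until the receiver is ready
		}
	}()
	return output
}

func main() {
	// The sender and this loop run concurrently, handing off one value
	// at a time through the unbuffered channel.
	for v := range Squares(3) {
		fmt.Println(v)
	}
}
```

Because the channel is unbuffered, each send waits for a matching receive, so the two sides stay in step without any explicit locking.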

If we take both channels and goroutines for granted, we can start setting up a bunch of different pipeline components (called “stages” in the original article) that take in a value on a channel, do some arbitrary thing with it, and push it out on another channel. Let’s consider the example of a Notification instance being logged out for debugging – we’ll want to just print the notification in its entirety, then pass it down the pipeline unchanged. We might write this pipeline component as:

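func LogPipe(input <-chan Notification) <-chan Notification {
    output := make(chan Notification)
    go func() {
        // Blocks until a notification arrives; ends when input is closed.
        for notification := range input {
            fmt.Printf("Notification: %v\n", notification)
            output <- notification
        }
        // Tell the next component that no more notifications are coming.
        close(output)
    }()
    return output
}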

With this implementation, constructing an instance of this logging pipeline component is as simple as calling a function. It needs an existing channel as input, but gives back a new output channel; this means that we can easily chain multiple different components by passing the output channel from one function to the input of another.
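As a rough illustration of that chaining, the sketch below wires two components together. The Notification fields, the Pipe type, the Chain helper, and SequencePipe are all assumptions invented for this example, not part of the provider itself.

```go
package main

import "fmt"

// Notification is a simplified stand-in for the type from earlier in the
// series; both fields are assumptions for illustration.
type Notification struct {
	BundleID string
	Seq      int
}

// Pipe is a hypothetical name for the shape every component shares.
type Pipe func(<-chan Notification) <-chan Notification

// Chain feeds each component's output channel into the next component's
// input and returns the last output channel.
func Chain(input <-chan Notification, pipes ...Pipe) <-chan Notification {
	for _, pipe := range pipes {
		input = pipe(input)
	}
	return input
}

// SequencePipe is a hypothetical component that numbers notifications
// in the order they pass through.
func SequencePipe(input <-chan Notification) <-chan Notification {
	output := make(chan Notification)
	go func() {
		defer close(output)
		seq := 0
		for n := range input {
			seq++
			n.Seq = seq
			output <- n
		}
	}()
	return output
}

// LogPipe prints each notification, then passes it along unchanged.
func LogPipe(input <-chan Notification) <-chan Notification {
	output := make(chan Notification)
	go func() {
		defer close(output)
		for n := range input {
			fmt.Printf("Notification: %v\n", n)
			output <- n
		}
	}()
	return output
}

func main() {
	source := make(chan Notification)
	go func() {
		defer close(source)
		source <- Notification{BundleID: "com.example.app"}
		source <- Notification{BundleID: "com.example.app"}
	}()
	// Drain the end of the pipeline; closing source shuts down each
	// component in turn.
	for range Chain(source, SequencePipe, LogPipe) {
	}
}
```

Calling Chain(source, SequencePipe, LogPipe) is equivalent to LogPipe(SequencePipe(source)); the helper only saves nesting once the list of components grows.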

We run the “pump” for this component as a goroutine, so that it’s always running alongside the rest of our program. Inside, we range over the input channel, so that the goroutine blocks until a notification comes through. When the input channel closes, the for loop terminates. At that point, we close the output channel too, signaling down the pipeline that we’re done handling notifications.

For the push provider’s use, we can do a bunch of different things in each of these components – and logging is only the simplest! The provider itself is structured as a pipeline with nearly a dozen components from start to end:

  • The logging component looks very similar to the above, with just a little bit more configurability about where to send logs (and with what severity).
  • The statistics component is also similar to the above, but instead of logging out a message, it increments a few internal counters depending on the contents of the notification.
  • The OSS component uses the UserInfo map mentioned in Part II of this series, which we populate with the sender’s OSS username when building some Notification instances. If we need to drop a notification – perhaps because the sending user is on a sync server that’s under heavy load – we can simply refuse to pass it along the output channel from this component.
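The statistics and OSS components described above might look roughly like the following sketch. Everything specific here is an assumption made for illustration: the UserInfo map type, the "oss-username" key, the Stats type, and the enabled callback that decides whether a user’s sync server currently accepts push notifications.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// Notification is a simplified stand-in; the map type is an assumption.
type Notification struct {
	BundleID string
	UserInfo map[string]string
}

// usernameKey is a hypothetical key for the sender's OSS username.
const usernameKey = "oss-username"

// Stats holds a single aggregate counter, updated atomically so it can be
// read safely while the pipeline is running.
type Stats struct {
	total int64
}

// Total returns the number of notifications counted so far.
func (s *Stats) Total() int64 { return atomic.LoadInt64(&s.total) }

// StatsPipe counts every notification, then forwards it unchanged.
func StatsPipe(stats *Stats, input <-chan Notification) <-chan Notification {
	output := make(chan Notification)
	go func() {
		defer close(output)
		for n := range input {
			atomic.AddInt64(&stats.total, 1)
			output <- n
		}
	}()
	return output
}

// OSSPipe drops notifications whose sender is on a server that enabled
// rejects. Notifications without a username pass through untouched.
func OSSPipe(enabled func(username string) bool, input <-chan Notification) <-chan Notification {
	output := make(chan Notification)
	go func() {
		defer close(output)
		for n := range input {
			if username, ok := n.UserInfo[usernameKey]; ok && !enabled(username) {
				continue // dropping means never sending it downstream
			}
			output <- n
		}
	}()
	return output
}

func main() {
	source := make(chan Notification)
	go func() {
		defer close(source)
		source <- Notification{BundleID: "com.example.app", UserInfo: map[string]string{usernameKey: "alice"}}
		source <- Notification{BundleID: "com.example.app", UserInfo: map[string]string{usernameKey: "bob"}}
		source <- Notification{BundleID: "com.example.app"}
	}()
	stats := &Stats{}
	enabled := func(username string) bool { return username != "bob" }
	delivered := 0
	for range OSSPipe(enabled, StatsPipe(stats, source)) {
		delivered++
	}
	fmt.Printf("counted %d, delivered %d\n", stats.Total(), delivered) // prints: counted 3, delivered 2
}
```

Placing the statistics component ahead of the filter means the counter reflects everything that entered the pipeline; swapping the two would count only what survived filtering.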

Even the persistent connection to APNs is handled with a pair of these pipeline components. The first takes in notifications, sends them over to APNs, and forwards them unconditionally; the second then buffers the sent notifications briefly to await an error, then forwards them in turn.

Keep in mind, too, that the pipeline components can have more than one input or output channel! Omni’s provider also includes a pipeline multiplexer and demultiplexer. These components are used to split up the notification stream (based on the associated bundle ID) for transmission to APNs, then rejoin the stream later after the APNs components pass the sent notifications through.
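A minimal sketch of such a splitter and joiner follows, assuming a simplified Notification with only a BundleID field. The names Demux and Mux, and the example bundle IDs, are placeholders for illustration.

```go
package main

import (
	"fmt"
	"sync"
)

// Notification is a simplified stand-in for the real type.
type Notification struct {
	BundleID string
}

// Demux routes each notification to the channel for its bundle ID.
// Notifications with an unrecognized bundle ID are dropped.
func Demux(input <-chan Notification, bundleIDs ...string) map[string]<-chan Notification {
	outputs := make(map[string]chan Notification, len(bundleIDs))
	result := make(map[string]<-chan Notification, len(bundleIDs))
	for _, id := range bundleIDs {
		ch := make(chan Notification)
		outputs[id] = ch
		result[id] = ch
	}
	go func() {
		for n := range input {
			if ch, ok := outputs[n.BundleID]; ok {
				ch <- n
			}
		}
		// Input is done, so every per-bundle stream is done too.
		for _, ch := range outputs {
			close(ch)
		}
	}()
	return result
}

// Mux merges several streams into one and closes the merged stream once
// every input stream has closed.
func Mux(inputs ...<-chan Notification) <-chan Notification {
	output := make(chan Notification)
	var wg sync.WaitGroup
	for _, in := range inputs {
		wg.Add(1)
		go func(in <-chan Notification) {
			defer wg.Done()
			for n := range in {
				output <- n
			}
		}(in)
	}
	go func() {
		wg.Wait()
		close(output)
	}()
	return output
}

func main() {
	source := make(chan Notification)
	go func() {
		defer close(source)
		for _, id := range []string{"com.example.a", "com.example.b", "com.example.unknown"} {
			source <- Notification{BundleID: id}
		}
	}()
	streams := Demux(source, "com.example.a", "com.example.b")
	// In a real provider, each stream would pass through its own APNs
	// components before being rejoined here.
	for n := range Mux(streams["com.example.a"], streams["com.example.b"]) {
		fmt.Println("sent for", n.BundleID)
	}
}
```

One caveat with this sketch: every per-bundle stream must have a reader, because an unread stream would block the splitter and stall all the others. The merged stream also makes no promise about ordering between bundle IDs.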

This sort of pipeline architecture is how the provider can handle Omni’s volumes of push notifications. At its peak, our single push provider instance transmits close to 500 notifications every minute – while still using only a few percent of one server’s CPU.

While Go makes this sort of concurrency quick to write and efficiently scalable, it’s not the only piece involved in the puzzle. Next time, we’ll discuss some other technologies involved in the provider stack, including the backing database used to store device registrations.