Socialising

Outside of finance and enterprise, the internet has enabled another class of large users of messaging software: the chat systems and social networks connecting people across the world. In terms of sheer numbers of users these are among the largest networks ever built, and many of them have messaging at their core.

What is interesting about these social systems is the diversity and reach of their networks. In contrast to the more controlled environments messaging grew up in, they tend to span very large geographic areas, with a wide variety of clients, interoperating systems, and the whole raft of challenges that comes with the explosive viral growth many of them have seen.

Chat

One interesting messaging domain is the passing of messages between people - chat. From early mainframe chat programmes and dial-up bulletin boards onwards, the internet has provided a home to millions and millions of chatty people. One of the older chat systems is Internet Relay Chat, or IRC, developed in 1988 by Jarkko Oikarinen while working at the University of Oulu in Finland. Oikarinen was trying to recreate features from bulletin board systems, but the first part implemented - the real-time chat - proved the most compelling. IRC quickly spread to the universities at Tampere and Helsinki, before crossing to the US and growing to serve millions of users over the following years.

IRC provided a broker and client model of a chat service which resembled a messaging system in many ways - with echoes of both direct messaging and pub/sub. Messages could be sent directly to a given user, or to a channel where all joined users would receive them. Everyone on an IRC channel is both a potential producer and a consumer, and users are often in vastly different geographical locations, connected to different servers.
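A minimal sketch of how this looks on the wire, using only Python's standard socket module (the server address, nickname and channel here are illustrative): IRC is a line-oriented text protocol, and the same PRIVMSG command carries both channel (pub/sub style) and direct messages.

```python
import socket

# Illustrative server, nickname and channel - not a real network.
HOST, PORT = "irc.example.net", 6667

sock = socket.create_connection((HOST, PORT))

def send(line: str) -> None:
    # IRC commands are plain text lines terminated by CRLF.
    sock.sendall((line + "\r\n").encode("utf-8"))

send("NICK alice")                    # claim a nickname
send("USER alice 0 * :Alice Example") # complete registration
send("JOIN #chat")                    # subscribe to a channel (pub/sub style)
send("PRIVMSG #chat :Hello everyone") # fans out to every user in the channel
send("PRIVMSG bob :Hello, just you")  # direct message to a single user

# Servers heartbeat their local connections; reply to PINGs to stay connected.
for line in sock.makefile(encoding="utf-8", errors="replace"):
    if line.startswith("PING"):
        send("PONG " + line.split(" ", 1)[1].strip())
```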

The servers in a given IRC network are connected to each other in a tree structure. Messages are routed only along branches of the tree that have subscribers - users in a given channel - waiting at the end, much like the way multicast systems pass packets only to interested parties. The large IRC networks grew organically, with operators attaching new nodes in an ad-hoc fashion based on growing user demand and the availability of resources. Due to the non-commercial nature of IRC those resources were often in short supply, provided by ISPs and universities as a service to their users.

Users would generally connect to the server closest to them, for better response times, but the tree-based structure caused problems when servers were lost. Netsplits were familiar events to denizens of IRC channels: the network became partitioned into two parts after the loss of communication between a pair of servers, with some users in one section of the tree and some in the other. While connections were up, servers queued messages for delivery to slow clients, or to slow servers further down the tree. To track the connection status of clients and servers, links were maintained with a heartbeat - PING messages sent between local connections (e.g. from one server to another, or from a server to its client) and not propagated further than one hop. If a client stopped responding, any messages due for it were generally lost, but upstream senders could be informed that the receiver was no longer online.

Messages were only duplicated at the final delivery to clients. A message to a channel that had 200 users on a given server would only reach that server once, where it would be distributed to the connected clients. State information, such as who was online and what channels existed, was broadcast throughout the server tree to make this routing possible. IRC's success and scale are a testament to the value of simple protocol design applied to a popular need.

The next generation of chat systems was aimed more at users who had experienced chat in conjunction with the web, or walled-garden web-like services such as early AOL and CompuServe. Systems like ICQ, AIM and MSN Messenger provided instant messaging across their networks, and in the late 90s a developer named Jeremie Miller began work on a new server and protocol for instant messaging: what would become the Extensible Messaging and Presence Protocol, or XMPP.

Much like IRC, XMPP was designed with clients connecting to servers that acted as brokers, distributing messages. It was originally at the heart of an instant messaging service called Jabber. The protocol, published openly, standardised the best features of the many competing IM systems, and quickly spread outwards. It was used by Google for its Google Talk chat in 2005, added to AOL's AIM in 2008, and support appeared in Facebook Chat in 2010. Google eventually utilised XMPP for the voice chat part of its chat system as well as text, leveraging the Extensible part of the protocol to carry this different kind of message (though XMPP usage has largely been dropped in more recent versions of Talk, renamed Hangouts). These larger organisations filled the roles that ISPs and universities played in IRC - providing the resources to host servers for many clients.

XMPP is request/response based, but inherently decentralised. Rather than relying on a tree of servers, XMPP uses the domain name system to locate the appropriate server for a given user, and connections are made between the servers of different users as required to deliver messages. Servers therefore act as brokers without a central authority or hierarchy, but clients must trust the delivery and confidentiality of a message to both their own server and that of the target user.
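A small sketch of that lookup, assuming the third-party dnspython package is installed: RFC 6120 defines SRV records under the user's domain that point at the servers responsible for it (the domain here is illustrative).

```python
import dns.resolver  # third-party "dnspython" package, assumed installed

# _xmpp-server._tcp records locate the servers for server-to-server delivery;
# clients use _xmpp-client._tcp in the same way.
for record in dns.resolver.resolve("_xmpp-server._tcp.example.org", "SRV"):
    print(record.priority, record.weight, record.target, record.port)
```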

The format of the messages was based on XML, with connections between server/client and server/server handled over TCP, or more recently HTTP. The XMPP server could queue messages for any of its clients whether or not they were connected, and managed delivery of messages to remote servers for their clients. The routing was direct - messages were delivered to specific users. Much like IRC, the servers maintained and distributed the state of connections in the form of presence. In fact, XMPP maps well enough onto many messaging requirements that some servers (such as the popular ejabberd) have been deployed outside the protocol's human-to-human origins as a type of message middleware.
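A minimal sketch of what one of those XML stanzas looks like, built with the standard library (the addresses are illustrative); in a real session it would be written onto an established, authenticated stream to the local server.

```python
import xml.etree.ElementTree as ET

# A basic XMPP <message/> stanza: routing is direct, addressed to one user.
msg = ET.Element("message", attrib={
    "to": "juliet@example.org",    # illustrative addresses
    "from": "romeo@example.net",
    "type": "chat",
})
ET.SubElement(msg, "body").text = "Wherefore art thou?"

# The local server queues or forwards this on to juliet's server for delivery.
print(ET.tostring(msg, encoding="unicode"))
```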

Messaging & The Web

While IRC and Jabber clients were mainly native applications, increasingly both services found themselves used via the web, as its growth made it the easiest way to deliver services to users. Driven by such needs, messaging has been taken all the way to the browser as well, with projects like NullMQ and SockJS, which aim to offer messaging semantics and bidirectional communication to web application developers. As JavaScript is a language built around asynchronous events, features like WebSockets, Flash sockets and TCP connection handling in Chrome apps, combined with a messaging library, can be very powerful.

The desire to have messaging systems that can be addressed through familiar HTTP conventions appears in many home-grown message brokers as well. Bit.ly, the popular URL shortener, developed a distributed queue system called NSQ with its own (binary) protocol. However, the directory service that coordinates the distributed brokers can be addressed over HTTP, and for compatibility with their previous messaging system, the broker daemon also exposes an HTTP "put"-style interface for publishing messages.
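A sketch of publishing through that interface, assuming an nsqd broker running locally on its default HTTP port (4151) and the third-party requests library; current releases name the publish endpoint /pub, with /put as the older spelling.

```python
import requests  # third-party HTTP client, assumed installed

# Publish a message to the "orders" topic on a local nsqd broker.
# 4151 is nsqd's default HTTP port; /pub is the publish endpoint.
resp = requests.post(
    "http://127.0.0.1:4151/pub",
    params={"topic": "orders"},
    data=b'{"order_id": 12345, "action": "validate"}',
)
resp.raise_for_status()
```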

Similarly, the ease of use and broad adoption of the web protocol, HTTP, has inspired the design of messaging protocols and brokers themselves. STOMP was a message protocol designed by Brian McCallister in 2005 along the lines of the HTTP model, with UTF-8 text-based commands. It included colon-separated key:value headers and null-delimited message framing, and offered acknowledgements and heartbeating on top of a simple message transport. It was supported by several brokers, including HornetQ, ActiveMQ (and its successor Apollo) and RabbitMQ, and has many client libraries available in different languages, reflecting its simple syntax and ease of implementation. It could even be run over WebSockets directly from supporting web browsers.
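A sketch of that framing over a plain TCP socket, assuming a broker listening on the conventional STOMP port, 61613 (the destination and host values are illustrative): each frame is a command line, key:value headers, a blank line, an optional body and a terminating null byte.

```python
import socket

def frame(command: str, headers: dict, body: str = "") -> bytes:
    # A STOMP frame: COMMAND, header lines, blank line, body, NUL terminator.
    lines = [command] + [f"{k}:{v}" for k, v in headers.items()]
    return ("\n".join(lines) + "\n\n" + body).encode("utf-8") + b"\x00"

sock = socket.create_connection(("localhost", 61613))  # conventional STOMP port
sock.sendall(frame("CONNECT", {"accept-version": "1.2", "host": "/"}))
sock.sendall(frame("SEND",
                   {"destination": "/queue/orders",
                    "content-type": "text/plain"},
                   "validate order 12345"))
```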

STOMP and similar protocols have found some favour as edge protocols connecting into systems running more optimised messaging internally. This can mean devices like web browsers connecting into brokers, which then perform a protocol translation to send messages on in their preferred format. That said, for many users, STOMP filled a hole for a readable, straightforward message protocol that could be used for exactly the kind of control data that is often generated in web environments - things that take time and are better actioned by services other than a web server.

Job Scheduling

This type of usage was one of the key drivers behind one of the many projects to come out of the fertile mind of Brad Fitzpatrick, developer of the social network LiveJournal at Danga Interactive (and more recently at Google). The project was called Gearman (an anagram of 'manager'). Gearman aimed to solve the problem of distributing background work, and became, effectively, a dedicated store-and-forward message broker.

The Gearman service accepted messages (nominally containing work to be done) from clients, and distributed them round-robin over the available workers. When a worker completed its task, it sent a message back to the Gearman service, which could propagate it to the client if required. This was convenient for, say, farming long-running jobs out to background workers rather than handling them in a web-request process, ensuring good response times for users.

Workers could register which types of work they were able to do by defining capabilities, allowing basic routing of work types to different sets of workers. Different types of work formed different queues within the Gearman service, which could be prioritised to allow more urgent work to be done ahead of less important tasks. Queues could be made to persist to disk to allow server restarts, at the (expected) cost of throughput.
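A minimal sketch of the pattern, assuming the (now quite old) third-party python-gearman library and a gearmand server on its default port, 4730; the task name is illustrative and the exact call signatures should be checked against the library version in use.

```python
import gearman  # third-party python-gearman library, assumed installed

# --- worker process: register a capability and wait for matching jobs ---
def validate_order(gearman_worker, gearman_job):
    # gearman_job.data carries the opaque payload submitted by the client.
    print("validating", gearman_job.data)
    return "ok"

worker = gearman.GearmanWorker(["localhost:4730"])
worker.register_task("validate_order", validate_order)
# worker.work()  # blocks, pulling jobs as they arrive (run in its own process)

# --- client process: submit a job for the "validate_order" capability ---
client = gearman.GearmanClient(["localhost:4730"])
request = client.submit_job("validate_order", '{"order_id": 12345}')
print(request.result)
```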

While Gearman serves as an acceptable message broker, many of the types of task it was designed for fall under the label of job scheduling. This sometimes complex field often overlaps with messaging, as what appears to be a message routing task is often in fact a scheduling one.

For example, a common scenario might see a source publishing messages that each need some processing applied to them - say, a message signifying that a new order has been received, which needs to be validated. If order validation takes a variable amount of time, an initial approach might be to round-robin the messages through the available workers, sending the first message to worker A, the second to worker B, the third to C, the fourth back to A, and so on. If this proves unsustainable, the next step might be to look for the least loaded worker: A has 5 messages queued, B has 4 and C has 3, so C gets the next one. This could be done by having workers communicate their queue sizes, or by having workers pull messages based on their internal queue size, with the first worker to pull a message getting it. However, if consumers queue messages locally, a worker may pull several items and starve other, available workers - A is quicker off the mark, and grabs messages 1-4 before B and C have a chance to get in on the action.
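A small sketch of the two strategies in plain Python, with hypothetical workers and queue depths: round-robin ignores load entirely, while least-loaded dispatch needs visibility of each worker's outstanding queue.

```python
import itertools

workers = ["A", "B", "C"]               # hypothetical workers
outstanding = {"A": 5, "B": 4, "C": 3}  # messages currently queued per worker

# Round-robin: ignore load, just cycle through the workers in order.
rr = itertools.cycle(workers)
def dispatch_round_robin(message):
    return next(rr)

# Least-loaded: requires knowing (or being told) each worker's queue depth.
def dispatch_least_loaded(message):
    target = min(workers, key=lambda w: outstanding[w])
    outstanding[target] += 1
    return target

print(dispatch_least_loaded("validate order 12345"))  # -> "C" in this example
```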

If, occasionally, several hard-to-process orders come through at the same time, we might find that under round-robin distribution A, B and C all end up churning through these long orders - leaving a growing queue of easier ones. It might be better in that case to have all of the difficult messages go to A, with the others taking the regular ones. The ability to schedule often requires being able to estimate the cost of performing a task and then assign a weight as part of a routing strategy, or to use separate queues for separate, more uniformly sized flows of work. It also requires awareness of resources that jobs may compete over, to avoid issues such as priority inversion. This increase in complexity is what starts to distinguish task scheduling from (somewhat) simpler flow control.

This type of work also exposes a general weakness in the messaging model - queues can be a source of a number of problems. They are effectively a flat space, so if different types of work are mixed together it is easy to create a scheduling problem. While many systems model queues as flexibly managing many different loads, in practice they are often either full or empty, and rarely in an intermediate state. The performance of most messaging systems degrades notably as soon as messages start hitting the queue, as instead of just pushing data straight out, the messaging system needs to perform a series of bookkeeping steps for each message.

“If the queue is not empty then a bit more work has to be done: the messages have to actually be queued up. Initially, this too is fast and cheap as the underlying functional data structures are very fast. Nevertheless, by holding on to messages, the overall memory usage of the queue will be higher, and we are doing more work than before per message (each message is being both enqueued and dequeued now, whereas before each message was just going straight out to a consumer), so the CPU cost per message is higher. Consequently, the top speed you’ll be able to achieve with an empty queue will be higher than the top speed of a queue with a fixed N messages in it, even if N is very small.” - Matthew Sackman1

By hiding potential downstream issues, queues can slow the action of error and flow control protocols, and increase the latency of end-to-end communication (referred to as bufferbloat in the networking world). The location and management of queues often ends up being an expensive, and tricky, part of a large scale messaging deployment, particularly one which spans a wide area and many types of device.

PubSubHubbub

The web has been a fertile environment for people tackling problems of large scale content distribution, from tiny updates to huge streaming videos. One particularly interesting idea was PubSubHubbub, a publish/subscribe protocol also created by Brad Fitzpatrick: a web-service-based protocol which extended the Atom and RSS document feed formats.

RSS and Atom were designed to present feeds of updates to a website. Rather than having to go back and check whether a site had published a new post, systems (such as a user's browser) could consume the RSS and display a list of articles. Checking these feeds regularly would allow users to be notified of new posts. The problem was that this was a rather inefficient way to check for information - the client was effectively polling the website for changes or new content. Seeing the similarity between messaging and a document change notification, Brad addressed the problem with a decentralised broker system.

PubSubHubbub works by adding a reference in a publisher's existing RSS or Atom content feed that points to the feed's hub. This hub acts as the broker, and allows interested parties to subscribe by registering an endpoint address. When there is an update to the feed, the publisher signals the hub, which connects to the publisher's feed, grabs the latest entry, and then communicates via an HTTP POST to the endpoints registered as subscribers to the topic URL (usually the feed address). The model is inherently decentralised, and anyone could run a PubSubHubbub hub, though in practice there are several organisations (such as Superfeedr) which host hubs for a large number of sites.
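A sketch of the subscription step, using the third-party requests library with illustrative hub, feed and callback URLs; the hub.* form parameters are the ones defined by the protocol.

```python
import requests  # third-party HTTP client, assumed installed

# Subscribe a callback endpoint to a feed via its hub (URLs are illustrative).
requests.post("https://hub.example.com/", data={
    "hub.mode": "subscribe",
    "hub.topic": "https://blog.example.org/feed.atom",   # the feed being watched
    "hub.callback": "https://subscriber.example.net/notify",
})

# Later, when the publisher pings the hub about a new entry, the hub fetches
# the feed and delivers the new content by POSTing it to the callback URL.
```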

One of the things PubSubHubbub particularly shows is the abstraction of pub/sub over a series of point-to-point links. The addressing at the pub/sub layer is a subscription to a feed URL, but the mechanics of delivery are really conducted as a series of HTTP POST and GET requests to specific URLs for the feed source, the hub, and the interested clients. Even the document itself is not sent as part of the message to the hub; the hub is just contacted to alert it to the presence of a new document.

A> This type of build is often repeated ad-hoc, in messaging systems that have not necessarily been identified as such. For example, Twitter used to handle an incoming tweet by calling a service to expand the author's list of followers, then pushing the tweet's ID onto a list for each of those followers in Redis, a high performance datastore. This is effectively a pub/sub distribution, with the subscriptions being registered as 'follow' requests.
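A>
A> As an illustrative sketch of that fan-out pattern (not Twitter's actual code), using the third-party redis-py client and hypothetical key names:
A>
A> ```python
A> import redis  # third-party redis-py client, assumed installed
A>
A> r = redis.Redis()
A>
A> def fan_out(tweet_id, follower_ids):
A>     # One Redis list per follower acts as that follower's inbox;
A>     # LPUSH onto each list is the 'publish' step of the fan-out.
A>     pipe = r.pipeline()
A>     for follower_id in follower_ids:
A>         pipe.lpush(f"timeline:{follower_id}", tweet_id)
A>     pipe.execute()
A>
A> fan_out(987654321, [101, 102, 103])  # illustrative IDs
A> ```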

The address to which the users have subscribed is arbitrary - meaning that a publisher could move their content and still maintain existing relationships, as the addressing of PubSubHubbub is really a separate messaging layer built on top of normal RESTful HTTP. PubSubHubbub is a step in the move from a document request and response model to more of a stream of content updates.

This idea has been extended into the general concept of webhooks - dropping in callback URLs to receive messages when certain events happen. Services like GitHub, Google Drive, MailChimp and WordPress use these to spread notification events from all manner of different activities out to any service interested in them.
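A minimal sketch of the receiving side, using only the standard library: a webhook endpoint is simply an HTTP handler that accepts POSTed event payloads and acknowledges them quickly (the port and handling here are illustrative).

```python
from http.server import BaseHTTPRequestHandler, HTTPServer

class WebhookHandler(BaseHTTPRequestHandler):
    def do_POST(self):
        # Read the event payload pushed to us by the remote service.
        length = int(self.headers.get("Content-Length", 0))
        payload = self.rfile.read(length)
        print("received event:", payload[:200])
        self.send_response(200)  # acknowledge receipt promptly
        self.end_headers()

HTTPServer(("0.0.0.0", 8080), WebhookHandler).serve_forever()
```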

Moving To Data Distribution

As relational databases evolved for larger and larger scales, they moved from being purely point-in-time stores to tracking a greater amount of history, and managing an ever-changing data set. Capabilities were added to use the databases as change capture systems, emitting events as values were updated. In the social space, this kind of streaming approach to data has been extended to cover the diverse data sets being processed, and allow greater flexibility in how that data is used.

Both LinkedIn and Tumblr have published on their use of a data bus that carries the complete stream of changing data - every new post, comment, update or photo pushed to the service, and the myriad other notifications and events that occur in a system with millions of users. Individual cells of request handling services consume what they need, processing information and generating pages as required.

Each has developed systems for managing this data, such as LinkedIn's Kafka, an open source pub/sub messaging system with the interesting feature of always persisting data to disk on receipt. Recognising the problems caused by queues and persistence, they developed an optimised stream-writing system which cleverly sidesteps some of these issues, accepting a small but predictable latency cost. By streaming messages onto and off of disk using as few memory copies as possible, and storing them linearly rather than in the tree structures preferred by databases optimised for random access, Kafka extracts the maximum performance from either spinning disks or SSDs, and allows for excellent write performance with predictable latency.

“That’s why we decided to build Kafka. In summary, Kafka has the following three design principles: (1) a very simple API for both producers and consumers; (2) low overhead in network transferring as well as on-disk storage; (3) a scaled out architecture from the beginning.”2
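A sketch of the shape of that API, assuming the third-party kafka-python client and a broker on the default port, 9092; the topic and consumer group names are illustrative.

```python
from kafka import KafkaProducer, KafkaConsumer  # third-party kafka-python client

# Producer: append messages to a topic; Kafka persists them to disk on receipt.
producer = KafkaProducer(bootstrap_servers="localhost:9092")
producer.send("profile-updates", b'{"user": 42, "field": "headline"}')
producer.flush()

# Consumer: each consumer group tracks its own offset, so a new consumer can
# bootstrap itself by replaying the stream from the beginning.
consumer = KafkaConsumer("profile-updates",
                         bootstrap_servers="localhost:9092",
                         group_id="search-indexer",      # illustrative group
                         auto_offset_reset="earliest")
for record in consumer:
    print(record.offset, record.value)
```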

By building in flexible options for message delivery - bootstrapping new consumers, delivering up-to-date messages to interested clients, and providing snapshots for slow consumers - systems like this can almost flip the traditional centralised persistence model on its head. The stream of data can be used for the multiple types of task LinkedIn itself requires, such as updating search indexes, managing caches, pushing to connected clients and more. The logic of what should be consumed and what it is used for can be kept close to the consumers, which simply tweak the type of subscriptions to the databus they require.

The databus concept can be extended even outside of the social network itself. Some services, such as Twitter, have in recent years made streams of their data available via APIs to consumers. Even with some reasonable filtering, the amount of information relevant to almost any listener is large, so the lessons from finance can be applied again to this growing social firehose. For most of its existence, software interacting with the web has dealt with documents, but filtering, message distribution, routing, and processing of streams is now part of many new applications and tools for interacting with social networks.

One of the most interesting companies to have arisen from this is Nick Halstead's DataSift3. Designed around augmenting and filtering social data after Halstead's experience building the aggregator Fav.or.it and the highly popular Tweetmeme, DataSift has a diverse stack of systems that reflects the heterogeneous environments most large networks deal with.

As well as processing the Twitter firehose, they consume content from a variety of social networks, blogs and other sources. Their processing is designed around components which communicate via messaging - sometimes ZeroMQ, sometimes Kafka, sometimes Redis (which is capable of acting as a pub/sub broker as well as a datastore). Their end users want to be able to specify complex filters and receive only the social messages they desire, enhanced with additional information such as the author's social influence, the sentiment of the message, or the details of any web pages linked. By pushing messages through a series of queues at the different stages of processing, and using appropriate fan in/out and pub/sub patterns, they are able to distribute work in parallel to many different processors, each written in the programming language best fitting the specific problem. The results are streamed to a larger distributed filter system, an access control system, and to the end user for delivery. The filter system acts much like a routing component, distributing messages to consumer queues using custom content-based matching.
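As an illustrative sketch of one such fan-out stage (not DataSift's actual code), using the third-party pyzmq bindings for ZeroMQ: a PUSH socket distributes messages across however many PULL workers connect to it, with the port and payload here purely illustrative.

```python
import zmq  # third-party pyzmq bindings, assumed installed

ctx = zmq.Context()

# Distributor: PUSH fans work out across every connected worker.
sender = ctx.socket(zmq.PUSH)
sender.bind("tcp://*:5557")

# Worker: PULL receives its share of the stream (normally a separate process,
# possibly written in whichever language suits that processing step).
receiver = ctx.socket(zmq.PULL)
receiver.connect("tcp://localhost:5557")

sender.send_json({"id": 1, "text": "a social message to enrich"})
print(receiver.recv_json())
```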

To complete the loop, companies like Push Technology are developing systems based on bringing data all the way to the end user. They refer to this type of messaging as data distribution: RESTful HTTP is used to deliver a snapshot, and smaller deltas or change-updates are distributed via streaming systems. Handling this effectively requires an awareness of the capacities of the various consuming devices, so that a mobile phone on a low-bandwidth connection is not pushed a full update stream, but perhaps takes occasional samples. Minimising queue usage is a vital part of ensuring performance throughout a wide data-distribution architecture, and requires excellent snapshotting capabilities and both smart clients and servers.

In many ways, the social networks are at the cutting edge of messaging - not in terms of raw volume or throughput (where finance is still the natural leader), but in terms of messaging being the fundamental heart of their architecture, and of communicating between a huge range of devices in an equally wide range of often fairly uncontrolled situations.

  1. Matthew Sackman, Sizing Your Rabbits - http://www.rabbitmq.com/blog/2011/09/24/sizing-your-rabbits/ 

  2. Open-Sourcing Kafka - http://blog.linkedin.com/2011/01/11/open-source-linkedin-kafka/ 

  3. For an excellent overview of the architecture by CTO Nick Halstead, take a look at this presentation: http://prezi.com/ite6cbvgv6hu/datasift-architecture-presentation