By the early 2000s, the proliferation of messaging systems had begun to cause problems of its own. Different products were incompatible and expensive, and purchasers often found themselves facing significant costs if they wanted to switch to another vendor. Management of these different systems was generally complicated, and often required buying in consultancy from the middleware vendor. This led to pushes for standardised messaging, which would make switching easier and cheaper. Two efforts achieved traction in this area, solidifying a number of concepts from different messaging systems and providing some much needed regularity: JMS and AMQP.
JMS
First released in 2001, the Java Message Service standard was part of the J2EE enterprise stack of Sun’s Java programming language. It was designed to provide a standard interface for dealing with existing and new message-oriented middleware. It aimed to offer a “wrapper” around any compliant message broker (referred to in JMS as a provider), and hence allow writing messaging code without having to specify exactly which vendor was supplying the broker. The impact of JMS was significant: it captured the most common, reliable and best-represented features of the systems of the time, and has been used in many successful products, both commercial (from IBM, Oracle, BEA and others) and open source (such as ActiveMQ). JMS assumed the broker ‘star’ model, and accommodated both ESB and Pub/Sub bus type functionality.
JMS narrows its focus to two styles (referred to by JMS as domains) of message delivery: point-to-point and pub/sub. Point-to-point is directed communication, where one endpoint communicates with a single other endpoint, as in a traditional client-server model. As in the common ESB scenario, clients and servers communicate asynchronously via the messaging system rather than directly, allowing appropriate routing and balancing. Pub/Sub support is many-to-many publishing and subscribing. Each of the two styles is independently supported in JMS, so providers may offer either or both.
Messages are sent by message producers to ‘destinations’ - which may represent topics, queues, channels or whatever message addressing abstraction is used by the underlying provider. In JMS lingo, Queues refer to point-to-point endpoints, while Topics refer to pub/sub ones. Consumers receive messages, and JMS offers some flexibility around how consumers acknowledge that they have received a message successfully.
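For a flavour of the API, the sketch below sends a text message to a queue using the standard JMS 1.1 interfaces. The JNDI lookup names and the queue name are illustrative assumptions; in practice they come from the provider’s configuration.

    import javax.jms.*;
    import javax.naming.InitialContext;

    // Minimal JMS point-to-point send: look up the provider's connection
    // factory and destination, then send a text message to the queue.
    public class QueueSend {
        public static void main(String[] args) throws Exception {
            InitialContext ctx = new InitialContext();
            // JNDI names are deployment-specific assumptions
            ConnectionFactory factory = (ConnectionFactory) ctx.lookup("ConnectionFactory");
            Queue queue = (Queue) ctx.lookup("queue/orders");

            Connection connection = factory.createConnection();
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            MessageProducer producer = session.createProducer(queue);
            producer.send(session.createTextMessage("new order: 100 IBM"));
            connection.close();
        }
    }

Notably, nothing in the code names a particular broker - swapping providers is, in principle, a configuration change.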
Acknowledgement touches on an important aspect of how a messaging system delivers reliability. The ideal is exactly once delivery, where a message is never lost and never delivered twice. In practice this is difficult to implement - it requires acknowledgement of messages at every step, and ways of recovering if those acknowledgement messages themselves are lost. So most systems aim to provide exactly once, but otherwise fall back to at most once or at least once delivery. In the first case, consumers will receive a message once, but in a failure case might not receive it at all. In the second, consumers will receive a message, but in a failure case might receive multiple copies of the same message, and will need to de-duplicate. This often involves keeping a fixed-size hash table of previous messages, or a window of recently received items to check against, increasing memory cost on the consumer (though in a fairly limited way). Which type of delivery is preferable is often down to application specifics.
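With at least once delivery, the de-duplication just described can be as simple as a bounded set of recently seen message IDs. A minimal sketch, assuming each message carries a unique ID and the window is large enough to cover the longest plausible redelivery delay:

    import java.util.LinkedHashMap;
    import java.util.Map;

    // Bounded de-duplication window for at least once delivery: remember
    // the last WINDOW message IDs and drop anything seen before.
    public class Deduplicator {
        private static final int WINDOW = 10_000;

        private final Map<String, Boolean> seen =
            new LinkedHashMap<String, Boolean>(1024, 0.75f, false) {
                protected boolean removeEldestEntry(Map.Entry<String, Boolean> eldest) {
                    return size() > WINDOW; // evict the oldest ID once full
                }
            };

        // Returns true the first time an ID is seen, false on duplicates.
        public boolean firstTime(String messageId) {
            return seen.put(messageId, Boolean.TRUE) == null;
        }
    }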
Consumers can also choose when to acknowledge the receipt of a message. This could be after some application processing, or automatically as soon as they receive the message. The difference may seem small, but it can be important. Once a message is acknowledged, responsibility for it is effectively handed to the consumer, which, if it crashes before it has done its work, could cause data loss. By acknowledging a message only after it has been processed, a consumer can rely on the message automatically being resent or timing out if there is a problem while processing.
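In JMS terms, this is the difference between the AUTO_ACKNOWLEDGE and CLIENT_ACKNOWLEDGE session modes. A sketch of the latter, reusing the connection and queue from the earlier example (process() is a stand-in for the application’s own work):

    // Acknowledge only after processing: if process() throws, or this
    // consumer crashes first, the message remains the provider's
    // responsibility and will be redelivered or time out.
    Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
    MessageConsumer consumer = session.createConsumer(queue);
    Message message = consumer.receive();
    process(message);      // hypothetical application work
    message.acknowledge(); // responsibility now passes to this consumer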
JMS also addresses reliability with another feature common to many networking and messaging systems: time to live. By attaching an expiry date to a message, senders can be confident that their messages will not turn up in the far future and cause confusion, which in many cases is preferable to finally delivering a long-delayed message. JMS also includes the concept of message priority, described by a number from 0-9: 0-4 for gradations of normal priority, 5-9 for high priority. This signifies that some messages are more important than others and deserve a speedier transit through the system. What this means in practice, and what effect it has, is down to the provider and the users of the system - if everything is sent at maximum priority, it is usually exactly the same as everything being sent at minimum priority, for example.
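Both settings appear in the long form of MessageProducer.send(), which takes the delivery mode, priority and time to live alongside the message. The values here are illustrative:

    // Priorities run 0-9 (4 is the default); time to live is in
    // milliseconds, with 0 meaning the message never expires.
    producer.send(message,
                  DeliveryMode.PERSISTENT,
                  9,        // expedited
                  30_000L); // discard if undelivered after 30 seconds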
When receiving messages, JMS clients can choose to block further processing, either forever or until a timeout is hit, or to poll for messages by registering a callback function to be executed when new data is available. Blocking means the code halts its progress at that point until a message arrives, or some number of (milli)seconds have passed. Polling in these cases means the actual waiting is done in a background thread or the OS kernel, and a callback is triggered when there are new messages ready to receive. This polling can even be on multiple queues or topics, so one piece of code can handle messages from many different sources without overly increasing the complexity of the code. This powerful feature had been a bedrock of network programming for many years, and was present in nearly every messaging system of any size. It allowed multiplexing events from many different connections into one code path, and building more powerful application logic in a straightforward manner. Without the ability to poll, users would have to perform a receive with a short timeout one at a time over a series of connections, which would be both wasteful of CPU cycles and slower to respond. By multiplexing the streams together, systems can react as soon as there is data available.
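Both styles are brief in code. The listener variant returns immediately, and the provider invokes the callback on its own thread as messages arrive:

    // Blocking: halt this thread until a message arrives or 2 seconds
    // pass; receive() with no argument waits indefinitely.
    Message m = consumer.receive(2000);

    // Asynchronous: register a callback and carry on. One process can
    // register listeners on many queues and topics at once.
    consumer.setMessageListener(new MessageListener() {
        public void onMessage(Message message) {
            // handle the message
        }
    });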
For Pub/Sub consumers, subscriptions to topics can be marked as durable or non-durable. Durable subscriptions, in theory, survive the death of the consumer - if the consumer crashes and comes back, the message broker can continue delivery from where it left off, assuming it has persisted the messages. In the non-durable case, a reconnecting consumer will start fresh with whatever is being sent at that time. While durability sounds like a desirable feature, it puts a large amount of pressure on the broker, particularly if multiple consumers are unreliable, as the broker has to keep more and more information on the state and backlog for each durable subscription.
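In JMS, a durable subscription is identified by the combination of a client ID and a subscription name, which is what allows the broker to resume delivery after a consumer restart. A brief sketch, with illustrative names:

    // The broker remembers the subscription by client ID plus name, and
    // retains matching messages while this consumer is away.
    connection.setClientID("pricing-service"); // must be set before use
    TopicSubscriber subscriber =
        session.createDurableSubscriber(topic, "pricing-sub");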
AMQP
JMS was designed to simplify and standardise, but while it defined a standard API for writing software that interacted with message brokers, it was only for Java-based systems and it didn’t address interactions between different messaging systems. In particular, it didn’t speak to the wire protocol: the structure of messages and the agreements on how messages are moved between systems. This meant different JMS-compatible messaging products weren’t naturally interoperable, even though they could be addressed by common code. Most brokers used TCP for transport, but that still left open the specifics of the protocol that determined how messages should be communicated, and how data should be serialised from a complex structure into an opaque blob of bits for transmission across the wire.
By 2003, JPMorgan Chase, and in particular one of their executive directors, John O’Hara, had reached the conclusion that there needed to be an open, standard design that could encompass the range of messaging technology then used and let it all interoperate, with the protocol itself being either cheap or free to license. A large part of this was to make messaging software more of a commodity, and hence avoid the vendor lock-in and large switching costs previously encountered. This project was dubbed the Advanced Message Queue, and its protocol the Advanced Message Queuing Protocol (AMQP). To develop and prove the system, O’Hara hired Pieter Hintjens’ iMatix to refine and implement the protocol for the initial AMQP test implementation, which was to be used by several thousand traders within JP Morgan’s equities line of business.
“To start with, the goal of AMQP was breaking the IBM and TIBCO duopoly in enterprise messaging market and commoditisation of messaging middleware products. Both companies have charged exorbitant prices for the software and even wannabe competitor products were extremely costly. The idea was to define an interoperable messaging protocol and thus allow enterprises to switch messaging implementations without breaking the existing applications.” - Martin Sustrik [1]
The initial requirements included supporting a range of communication types - point-to-point request and response, store-and-forward queues and of course pub/sub. The system had to have support for delivery guarantees, flexible routing of messages and authentication of the endpoints of the network (the producers and consumers). AMQP captured a point in time in the evolution of messaging, and has seen wide support in some particularly successful brokers.
AMQP 0-9-1 Model
AMQP is also based on the star model, with a centralised message broker that manages the distribution of messages from producers to consumers. It handles this distribution by using the concepts of exchanges and queues. An exchange is the entity that producers send messages to, and a queue is the entity that clients consume from. Messages are routed from exchanges to queues by binding an exchange to a queue, often based on some condition that messages must satisfy in order to be delivered.
Consumers access data either via pull, in which case they request the latest messages from the queue, or push, where messages are sent over as they arrive, via an open TCP connection. Clients can gain some degree of reliability by specifically acknowledging or ACKing messages, which causes them to be removed from the queue, and queues can be defined as durable, in which case they will be persisted to a store in order to survive a restart of the broker.
Until a queue is bound to at least one exchange, it won’t receive any messages, and vice versa. The bindings aren’t just simple links though, but are associated with a routing key (to allow direct delivery to a specific queue) or with header metadata (data about data), within which arbitrary pieces of information can be stored. Metadata is stored as key/value pairs, and is used for the binding conditions. Matching rules can address specific keys and wide ranges of possible values, allowing complex matching with decent performance.
AMQP defines several types of exchange, and although the basic types allow assembling very complex message distribution patterns, vendor-specific exchanges are allowed and encouraged. The standard exchanges include a direct exchange, which routes messages to specific queues by exactly matching the routing key of a binding to the routing key in a message; a fan-out exchange, which sends a copy of each message the exchange receives to all bound queues; a topic exchange, which allows matching on wildcard topic strings (allowing Pub/Sub, with topic filtering); and a header exchange, which matches combinations of key-value headers in the message against the equivalent headers set on the binding.
The matching in the topic exchange type is based on examining a topic string associated with a message, which is formatted as a series of dot separated words, such as ‘equities.ibm.nyse’. This allows a hierarchy of topics, and hence flexible group membership, as with Tibco. The exchange looks at the routing key in the bindings for matches, but additionally allows the use of the ‘*’ character, to match any given word, and the ‘#’ character to match 0 or more words. For example ‘equities.ibm.#’ matches ‘equities.ibm.nyse’ and ‘equities.ibm’ but not ‘options.ibm.cboe’. ‘*.ibm.#’ matches all three strings.
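To make the semantics concrete, here is a purely illustrative recursive matcher for these patterns; real brokers use an optimised structure such as a trie rather than anything like this:

    // Illustrative matcher for topic-exchange patterns: '*' matches
    // exactly one dot-separated word, '#' matches zero or more.
    public static boolean matches(String[] pattern, String[] topic, int p, int t) {
        if (p == pattern.length) {
            return t == topic.length;       // both exhausted: match
        }
        if (pattern[p].equals("#")) {
            // '#' may absorb zero or more of the remaining words
            for (int skip = t; skip <= topic.length; skip++) {
                if (matches(pattern, topic, p + 1, skip)) return true;
            }
            return false;
        }
        if (t == topic.length) return false;
        return (pattern[p].equals("*") || pattern[p].equals(topic[t]))
                && matches(pattern, topic, p + 1, t + 1);
    }

    // matches("equities.ibm.#".split("\\."), "equities.ibm".split("\\."), 0, 0)
    // returns true, as in the example above.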
These models were chosen on balance for a mix of flexibility and speed, with the main alternative exchange type from other messaging systems being a more generic content based routing. In this style, queries would be registered that ran against each incoming message, usually based on a syntax not dissimilar to database query languages like SQL - for example “currency = USD and ticker = IBM”. The downside with this approach was that the cost of matching each message against a large number of queries could be prohibitive, so the system’s increased power didn’t always scale to the volumes required for some users.
Programmable Brokers
One of the things that makes AMQP so powerful is the ability for clients to program brokers directly through the API, without having to define the topology - the structure of the routing network - ahead of time. This is possible because AMQP exposes declarative methods, allowing producers and consumers to create exchanges and queues and bind them together on the message broker.
This set of methods allowed the most common use cases for a message broker to be implemented easily in AMQP. Direct point-to-point communication, such as the request/reply model, could be implemented with a pair of queues and an exchange using the direct routing model. Store-and-forward communication could be achieved by using durable queues, and pub/sub by using fan-out and topic exchanges. The fact that these could be configured directly by producers and consumers as they were needed allowed an AMQP broker to be more flexible than many other systems.
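The sketch below shows this declarative style using the open source RabbitMQ Java client: the client declares an exchange and a queue, binds them, and publishes, all at runtime. The exchange, queue and routing key names are illustrative:

    import java.nio.charset.StandardCharsets;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;

    // Declare an exchange and a queue, bind them, and publish - all from
    // client code, with nothing pre-configured on the broker.
    public class DeclareAndPublish {
        public static void main(String[] args) throws Exception {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost"); // assumed broker location
            try (Connection conn = factory.newConnection();
                 Channel channel = conn.createChannel()) {
                channel.exchangeDeclare("orders", "direct", true); // durable
                channel.queueDeclare("order-queue", true, false, false, null);
                channel.queueBind("order-queue", "orders", "new-order");
                channel.basicPublish("orders", "new-order", null,
                        "100 IBM @ 182.50".getBytes(StandardCharsets.UTF_8));
            }
        }
    }

Nothing here existed on the broker in advance; the topology exists because the client asked for it.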
The protocol has been very successfully implemented in open source, with Apache Qpid, RabbitMQ, Mule, Apache Camel and more speaking it, and clients in almost every popular programming language.
AMQP 1.0
Of course, AMQP hasn’t stood still since that version was released. Unusually, given the success of the pre-1.0 versions, the 20-odd companies that form the AMQP working group proceeded to develop a radically different AMQP 1.0 standard, which was released towards the end of 2011 through the OASIS standards body. Driven by the aim of being easier to implement on top of existing vendor systems, the basic principles moved radically, and some features, such as the ability to programmatically wire brokers, were not carried through.
The principal architectural difference is the approach to defining broker functionality, which has been almost entirely dropped. The link protocol manages the transfer of different frame types, which include types for setting up and tearing down connections, heartbeat messages that are used to signal the connection is still active, and types for transferring message data and the responsibility for the message between two nodes. The protocol allows multiple sessions to be multiplexed together onto the same underlying TCP connection between two hosts. Messages themselves contain multiple levels of annotations for messaging-system metadata, and also properties, which are key-value pairs of application-level metadata. The encapsulated data itself is presented as an opaque blob. Flow control in the link protocol is managed via a process of exchanging credits, allowing the consumer to control the rate of incoming messages by granting credits to the producer, one of which is consumed with each message sent.
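The credit mechanism is easy to picture with a small, purely illustrative sketch - this is not a protocol implementation, just the shape of the idea: the receiver grants credits, and the sender consumes one per message, blocking when none remain.

    import java.util.concurrent.Semaphore;

    // Illustrative only: credit-based flow control in the style of the
    // AMQP 1.0 link protocol. The receiver grants credits; the sender
    // consumes one per message and blocks when credit is exhausted.
    public class LinkCredit {
        private final Semaphore credit = new Semaphore(0);

        // Called by the receiver to allow n more messages.
        public void grant(int n) {
            credit.release(n);
        }

        // Called by the sender; waits if no credit remains.
        public void send(byte[] message) throws InterruptedException {
            credit.acquire();
            transmit(message); // hypothetical wire-level send
        }

        private void transmit(byte[] message) { /* ... */ }
    }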
The broker functionality, on the other hand, is described in more generic terms. Rather than an exchange or queue, messages are simply sent to and from an address - what this is backed by is left much more to the control of the underlying message broker, in order to facilitate use by systems that would not map well to the exchange/queue model of AMQP 0-9-1.
Perhaps surprisingly, there remains a strong appetite for AMQP 0-9-1 based systems, and while 1.0 is gaining traction in some areas, there are still a large number of systems on the older protocol. AMQP 1.0 no longer guarantees the same flexibility of choice in broker, but it also avoids leaking details of the broker implementation into the protocol - a trade-off that has made implementation easier for some and more difficult for others.
Message Models
All the major versions of AMQP define a standard for message metadata. AMQP allows you to signal the content type and encoding, the routing key, delivery options and expiration dates, all of which are part of the envelope around the actual message data. This is an important concern, as message formats are often a key decision in building a messaging system, affecting the size of messages on the wire and how much work is needed for clients to implement the protocol, or start processing messages. Some efforts at standardisation have addressed solely the problem of defining the format of the message data.
There have been many attempts at coming up with generic standards, with choices to be made between text-based serialisation formats like XML or JSON, and binary packed formats such as Protocol Buffers or MsgPack. The choice would often depend on how much overhead the translation incurred, and how much time it might save or add on the wire. In some cases, particularly financial market data, the emphasis was on keeping data in the original serialisation format rather than unmarshalling it into a more software-friendly structure, and writing clever code that can pull the data right out of the serialised format.
Designing message data formats can be complicated, but they’re an important part of the performance and effectiveness of the system. In 1992, Chris Morstatt and Robert Lamoureux needed to facilitate exchanging trading data between Fidelity (also the first client of TIB, as it happens) and Salomon Brothers. The format they designed, FIX (Financial Information Exchange), became a de facto standard thanks to its brevity and relative flexibility, and was used by the vast majority of firms by the mid-2000s.
FIX messages are composed of a header, a body, and a trailer, each consisting of several fields separated by the non-printable Start of Heading character. Each field number is defined in the official standard, such as field 8 for the start of a message. The header usually contains the protocol version, the size of the body, and the type of the message. The types of messages are similarly defined in the spec, starting at 0 for the heartbeat, and including types for quotes, order execution and acknowledgements.
This session-level traffic is mixed in with the S-type messages, which carry the ever-changing stream of quote information. This combination of different types of communication in-band (within the same stream) is slightly unusual. In many protocols, the enrolment phase of the communication is both more error-prone (with common errors such as failure of connection establishment, authentication issues and so on) and less latency-sensitive (as it is only processed once per connection), and so may be more verbose and readable. It’s usually more of a conversation, with handshakes and request/response communication. The main data exchange is usually more optimised, designed to reduce the processing required, and often consists of one or more one-way streams rather than a conversational system. Intermingling the two is usually most beneficial when communications are short-duration: in that case, minimising the number of round trips between the two parties can have a significant effect on the pace of the overall communication.
Message bodies in FIX contain the details of the message, such as the exchange targeted, a timestamp, and any transaction data. The format is the same field number to value pairing, with many specialised fields to control things such as the type of acknowledgement required, the sender of the message and other useful features. Values are either plain text or a length field followed by that number of bytes of binary data.
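Splitting such a message into fields is deliberately simple, which was part of FIX’s appeal. A minimal sketch of the tag=value parsing (it ignores repeating groups and length-prefixed binary fields, which real parsers must handle):

    import java.util.LinkedHashMap;
    import java.util.Map;

    // Split a FIX message into tag -> value pairs on the SOH (0x01)
    // delimiter. Naive: repeated tags from repeating groups would
    // overwrite earlier ones, and binary values are not handled.
    public static Map<Integer, String> parseFix(String raw) {
        Map<Integer, String> fields = new LinkedHashMap<>();
        for (String field : raw.split("\u0001")) {
            int eq = field.indexOf('=');
            if (eq > 0) {
                fields.put(Integer.parseInt(field.substring(0, eq)),
                           field.substring(eq + 1));
            }
        }
        return fields;
    }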
This illustrates one of the two approaches to message data - either the types of data that can be carried are defined ahead of time, or the body is just an opaque blob of bits. The benefit of the blob is straightforward - the message is agnostic to the type of information it carries, much like network protocols such as TCP or UDP. The downside is that the messaging layer cannot make any decisions based on the contents, such as combining messages or dropping near-duplicates, without understanding what the contents represent. Having the fixed fields gives that power, but at the cost of a constraint on the format, and some mixing of functionality between what may be viewed as different layers - effectively the application has to speak the messaging format.
Some systems choose an intermediate ground - TIBCO messages were structured as a series of typed values from a restricted vocabulary, but the types were reasonably generic, allowing the application developer wide leeway in encoding the data how they saw fit. This allowed some degree of exploration of the data, while not requiring a complicated implementation of a detailed spec to be able to read and write data.
FIX also has a trailer, which normally contains field 10, a checksum used to determine whether the contents were mangled in transit. This process of affixing some form of check to ensure the message was received properly often depends on the guarantees from the lower level. If the protocol the message is being sent over includes checksums on its content, there is no need to duplicate the work. If it doesn’t, or there is an issue with the algorithm used, then it makes sense to include some verification. Some systems include a cryptographic signature as a trailer that verifies the (application-defined) body.
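The FIX checksum itself is deliberately cheap to compute: the sum of every byte of the message up to and including the SOH that precedes the checksum field, modulo 256, rendered as exactly three digits. A sketch:

    // FIX checksum (tag 10): sum of every byte up to and including the
    // SOH before the checksum field, modulo 256, as three digits.
    public static String fixChecksum(byte[] messageBeforeTrailer) {
        int sum = 0;
        for (byte b : messageBeforeTrailer) {
            sum += b & 0xFF;
        }
        return String.format("%03d", sum % 256);
    }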
“For example, a message carrying a quote for USD/CAD at 15:25:20 GMT on July 31, 2007 looked like this: 8=FIX.4.2 | 9=309 | 35=S | 49=ML-FIX-FX | 56=ECHO2-QTS-TEST | 34=5015 | 52=20070731-15:25:20 | 131=1185895365 | 117=ECHO2-QTS-TEST.00043690C8A8D6B9.00043690D14044C6 | 301=0 | 55=USD/CAD | 167=FOR | 15=USD | 132=1.065450 | 133=1.065850 | 134=5000000.0 | 135=5000000.0 | 647=2000001.0 | 648=2000001.0 | 188=1.06545 | 190=1.06585 | 60=20070731-15:25:20 | 40=H | 64=20070801 | 10=178” - Irene Aldridge, High Frequency Trading
Error correction can be a complex field, particularly when there are multiple streams of information running across the same TCP connection. A phenomenon known as head-of-line blocking can occur when a corrupted packet is received - no other packets can be delivered to the application until the broken packet is fixed, which means a single bad sub-stream can block several healthy ones. Protocols such as Google’s QUIC have attempted to work around this on multiple levels. They use UDP to avoid head-of-line blocking, and include Forward Error Correction (FEC) packets which can replace any one of the packets in the group that preceded them, in order to allow recovery without retransmission. They also allow multiple methods of acknowledgement, either signalling the receipt of a number of packets explicitly, or implicitly by requesting the retransmission of specific lost data.
By 2004, the volume of trading information had reached levels where FIX itself was proving too costly and too verbose, so a slimmed-down version known as FAST (FIX Adapted for Streaming) was proposed, and developed over the following five years. The protocol removed the inherent duplication between messages - if fields are duplicated from message to message, they can be omitted after the first time, and there is a syntax for indicating fields whose value is 1 higher than in the previous message (as you would see with sequence numbers). It does this via the use of a pmap, or presence map, which is a bit string indicating which fields are contained in the message that follows. By modifying the format, FAST could be up to 40% smaller than the equivalent FIX messages carrying the same information, which yielded significant benefits for the performance of the networks carrying those messages. This type of optimisation is only possible because of the deep understanding of the type of data carried within the format. Successful data formats often require trading off performance and flexibility, readability and expressiveness, and many other requirements.
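A much-simplified sketch of the idea, with one ‘copy’ field and one ‘increment’ field; the real FAST encoding, with its templates and stop-bit encoded integers, is considerably more involved:

    import java.util.Iterator;

    // Much-simplified illustration of FAST's presence map: 'symbol' uses
    // the copy operator (absent means repeat the previous value), and
    // 'seqNum' uses increment (absent means previous value + 1).
    public class FastishDecoder {
        private String symbol = "";
        private long seqNum = 0;

        public String decode(boolean[] pmap, Iterator<String> values) {
            if (pmap[0]) {
                symbol = values.next();     // field present: take new value
            }                               // absent: keep previous (copy)
            if (pmap[1]) {
                seqNum = Long.parseLong(values.next());
            } else {
                seqNum = seqNum + 1;        // absent: increment
            }
            return seqNum + " " + symbol;
        }
    }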
-
[1] Martin Sustrik, The Merits of AMQP pt 1 - http://www.250bpm.com/blog:11