Remember the buzz surrounding Erlang when the Pragmatic Programmers released Joe Armstrong’s book? If you’re like me, you bought the book, tried Erlang some, and maybe built a few toy apps, then found that Erlang’s quirks are hard to get over when the problem domain isn’t well tailored to Erlang’s strengths. Now, however, when your ruby program blocks the web server with work that could be done later, or maxes out just one core out of two or four or sixty-four, you get back that lovin’ feeling for Erlang’s message-passing, massively parallel charms. One solution is to show off your mad polyglot skillz; JRuby, being GIL-less, is another; but the one we’ll discuss here is AMQP. Using AMQP to pass messages between processes, we can accomplish feats of magic such as processing two weeks’ worth of data in a day without ever leaving the comfort of ruby.
The Guts
Note: I’m presenting an over-simplified view of AMQP here. If you’re already a pro, skip this. If you’re a noob, keep in mind that I’m just presenting the bare minimum to get you started.

AMQP consists of a few main parts. The server that accepts messages and gets them to their destination(s) is called a broker. Within the broker, we have (very simply):
- Queues: As you would guess, queues hold messages until we are ready to use them.
- Exchanges: Exchanges are responsible for handling the messages we send to the server. Exchanges act as a kind of router, passing the messages we send them to one or more queues.
- Bindings: Bindings are like routes for exchanges; they create a relationship between a single queue and a single exchange.
- Connections and Channels: these deal with our physical and logical connections between client and server. You’ll eventually want to learn about them, but the ruby AMQP libraries do a good job of abstracting them, so we’ll skip them for now.
By combining these pieces, we can pass messages in the following ways:
- One to One: A message is passed from a single producer to a single consumer.
- One to N: A message is passed from a single producer to many consumers.
- One to One-of-N: A message is passed from a single producer to one consumer out of a pool.
- Any of the above with multiple producers (N to One, N to N, etc.)
When you’re ready to learn more about AMQP, definitely read the OpenAMQ project’s background paper.
AMQP in Ruby
To make the following examples work, you’ll need Erlang, RabbitMQ, and the EventMachine, AMQP, and Bunny gems. Getting a decent Erlang install on OS X was quite a bear when I did it last, but I unfortunately didn’t keep any notes. Google is your friend here. RabbitMQ can be obtained with a simple
sudo port install rabbitmq-server
(I’m not sure, but I believe Ubuntu users can simply substitute aptitude or apt-get for port.)
You can get the gems from RubyForge (EventMachine is a prerequisite for the amqp gem):
sudo gem install amqp bunny
Topic Publishing
The examples here will all deal with topic exchanges. Topic exchanges route messages based on simple keyword matching, with keywords separated by dots. Using the canonical stocks example, we might have keys of the form country.stock-exchange.company, like us.nasdaq.aapl and us.nyse.ge. When we subscribe, we can get Apple’s stock price by binding our queue to us.nasdaq.aapl, all NASDAQ stocks by binding to us.nasdaq.*, or all US stocks by binding to us.# (a * matches exactly one word, while # matches zero or more words, so us.* alone would only match two-word keys like us.nasdaq). There are a few other types of exchanges, including direct, fanout, and headers exchanges, and AMQP is also used for RPC-style request/reply messaging. Be sure to check out amqp’s and bunny’s examples to get started with these.
To keep it simple, we’ll be publishing simple counters with timestamps as strings. Note that AMQP message bodies are just strings of bytes, so if you need to send arbitrary ruby objects, you’ll need to serialize them to and from strings with Marshal, JSON, or YAML.
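The wildcard rules are easy to get wrong, so here is a rough, in-process model of topic matching in plain Ruby. It is not how RabbitMQ implements routing; topic_match? and match_words are hypothetical helpers that only mirror the topic rules: words are separated by dots, * matches exactly one word, and # matches zero or more words.

```ruby
# Hypothetical helper: a simplified model of how a topic exchange decides
# whether a binding pattern matches a message's routing key.
# "*" matches exactly one word; "#" matches zero or more words.
def topic_match?(pattern, key)
  match_words(pattern.split("."), key.split("."))
end

def match_words(pattern, words)
  return words.empty? if pattern.empty?
  head, *rest = pattern
  case head
  when "#"
    # "#" may swallow zero, one, or several words; try every split point
    (0..words.length).any? { |n| match_words(rest, words.drop(n)) }
  when "*"
    # "*" must consume exactly one word
    !words.empty? && match_words(rest, words.drop(1))
  else
    # a literal word must match the next word exactly
    !words.empty? && words.first == head && match_words(rest, words.drop(1))
  end
end
```

For example, topic_match?("us.#", "us.nyse.ge") is true, while topic_match?("us.*", "us.nasdaq.aapl") is false because * stands for exactly one word.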
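A minimal round-trip sketch using only Ruby’s standard library. The quote payload and its keys are made up for illustration, and the publish/subscribe calls are left out so the snippet runs without a broker; in the examples below you would pass json_body to topic.publish and call JSON.parse on msg inside the subscribe block.

```ruby
require "json"

# A hypothetical quote payload; any Hash of JSON-friendly values will do.
quote = { "symbol" => "aapl", "price" => 93.25, "counter" => 1 }

# Publisher side: turn the Hash into a String, which is what gets published.
json_body = JSON.generate(quote)

# Consumer side: the subscribe block receives that String; parse it back.
from_json = JSON.parse(json_body)

# Marshal handles arbitrary Ruby objects, but only Ruby can read the result,
# and Marshal.load should only be used on messages from trusted producers.
marshal_body = Marshal.dump(quote)
from_marshal = Marshal.load(marshal_body)
```

JSON is the friendlier choice if non-ruby processes might ever consume the queue; Marshal saves you from writing conversion code when both ends are ruby.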
To begin, make sure RabbitMQ is running:
sudo rabbitmq-server start
If you’re on Ubuntu, apt-get/aptitude might have done this for you, so don’t worry about errors.
Now you should be able to run this example:
require "rubygems"
require "eventmachine"
require "mq"
require "amqp"
# comment out one of these to pick communication model
PUBLISH_MODE = :one_to_n
#PUBLISH_MODE = :one_to_one_of_n
START_TIME = Time.new
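def finish
  EM.forks.each {|pid| Process.kill("KILL", pid)}
  # Close the AMQP connection cleanly, then stop the reactor
  # (EM.stop ignores a block, so AMQP.stop must be the outer call)
  AMQP.stop { EM.stop }
end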
def gen_queue_name
  if PUBLISH_MODE == :one_to_n
    rand(2 ** 16).to_s(16)  # unique name: one queue per subscriber
  else
    "foobarbaz"             # shared name: all subscribers share one queue
  end
end
def timestamp
  delta_t = Time.new - START_TIME
  delta_t.to_s
end
EM.fork(3) do
  mq = MQ.new
  qname = gen_queue_name
  puts "starting queue ``#{qname}'', PID: #{Process.pid.to_s}"
  # Bind this queue to the "test" topic exchange with the key "test.key"
  q = mq.queue(qname).bind(mq.topic("test"), :key => "test.key")
  # EventMachine runs this block whenever a message arrives; no polling needed
  q.subscribe do |msg|
    puts "[queue #{qname} (pid #{Process.pid.to_s})] received msg: #{msg} @ #{timestamp}"
  end
end
AMQP.start do
  i = 0
  topic = MQ.new.topic("test")
  # Publish one message every 0.1 seconds and shut down after the tenth
  EM.add_periodic_timer(0.1) do
    i += 1
    topic.publish("[#{i}] " + timestamp, :key => "test.key")
    finish if i >= 10
  end
end
The code begins by forking three new processes with EM.fork. This is essentially just a wrapper for Kernel.fork that additionally runs the provided block inside EM.run. Inside each fork, we create a new queue and bind it to the topic exchange “test” with the routing key “test.key”. Then we subscribe to the queue so we can receive the messages.
The important part here is how we switch between the one to N and one to one-of-N communication models. When all of the subscriber processes use unique queue names, they have unique queues and each queue gets a copy of the message. When the subscribers all share the same queue name, they are subscribed to one shared queue, and each message is received by only one of the subscribers. Also note that, through the magic of EventMachine, we don’t need to explicitly poll for messages. When they arrive, EventMachine takes care of executing the code in the subscribe block.
Now that our subscribers are running, we set up our publisher. Here we’re explicitly using AMQP.start to start our connection to RabbitMQ. Our subscribers called this method implicitly when we called MQ.new. Note also that we’re placing our publisher code in a block given to AMQP.start, but this isn’t strictly necessary: we could have called AMQP.start without a block, or even omitted it and let MQ.new call it implicitly. If we had done this, we would need to run the publisher code as a block given to EM.run.
For our publisher, we’re using EventMachine’s add_periodic_timer to schedule publishing every 1/10th of a second. This helps avoid some quirks that we’ll look at later. Note that we’re publishing straight to the exchange with its #publish method and ignoring queues completely.
When you run the example with PUBLISH_MODE == :one_to_n, you should see something like this:
starting queue ``3170'', PID: 2912
starting queue ``62b2'', PID: 2910
starting queue ``9828'', PID: 2911
[queue 3170 (pid 2912)] received msg: [1] 0.249223 @ 0.252471
[queue 62b2 (pid 2910)] received msg: [1] 0.249223 @ 0.252466
[queue 9828 (pid 2911)] received msg: [1] 0.249223 @ 0.253655
[queue 62b2 (pid 2910)] received msg: [2] 0.430483 @ 0.4334
[queue 3170 (pid 2912)] received msg: [2] 0.430483 @ 0.433398
[queue 9828 (pid 2911)] received msg: [2] 0.430483 @ 0.434498
...
[queue 3170 (pid 2912)] received msg: [9] 1.698417 @ 1.702544
[queue 62b2 (pid 2910)] received msg: [9] 1.698417 @ 1.702538
[queue 9828 (pid 2911)] received msg: [9] 1.698417 @ 1.703674
With PUBLISH_MODE == :one_to_one_of_n, you’ll see:
starting queue ``foobarbaz'', PID: 4911 at 0.00475
starting queue ``foobarbaz'', PID: 4910 at 0.005266
starting queue ``foobarbaz'', PID: 4909 at 0.016942
[queue foobarbaz (pid 4911)] received msg: [1] 0.355391 at 0.358376
[queue foobarbaz (pid 4909)] received msg: [2] 0.53673 at 0.539733
...
[queue foobarbaz (pid 4909)] received msg: [8] 1.623612 at 1.626233
[queue foobarbaz (pid 4910)] received msg: [9] 1.804772 at 1.807327
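The two outputs differ only in how many queues exist. As a rough mental model, here is a broker-free sketch in plain Ruby (ToyQueue and publish_all are hypothetical names, not part of the amqp or bunny APIs): the exchange copies every message into each bound queue, and a queue gives each message to just one of its consumers. RabbitMQ’s default dispatch among a shared queue’s consumers is round-robin-like, but treat the strict rotation below as a simplification.

```ruby
# Hypothetical in-process model (no broker involved).
class ToyQueue
  attr_reader :deliveries

  def initialize(consumers)
    @consumers = consumers
    @turn = 0
    @deliveries = Hash.new { |h, k| h[k] = [] }
  end

  # Hand the message to exactly one consumer, taking turns.
  def push(msg)
    consumer = @consumers[@turn % @consumers.size]
    @turn += 1
    @deliveries[consumer] << msg
  end
end

# The "exchange": every bound queue gets its own copy of every message.
def publish_all(queues, messages)
  messages.each { |m| queues.each { |q| q.push(m) } }
  queues.map(&:deliveries).reduce({}) { |acc, d| acc.merge(d) }
end

consumers = %w[pid_a pid_b pid_c]
messages = (1..6).to_a

# :one_to_n: unique queue names, so one queue per consumer
one_to_n = publish_all(consumers.map { |c| ToyQueue.new([c]) }, messages)

# :one_to_one_of_n: shared name ("foobarbaz"), so one queue, three consumers
one_of_n = publish_all([ToyQueue.new(consumers)], messages)
```

Here one_to_n gives every consumer all six messages, while one_of_n gives pid_a [1, 4], pid_b [2, 5] and pid_c [3, 6].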
Blocking EventMachine
In the example so far, we’ve used EM.add_periodic_timer to ensure that we publish messages in an orderly way. For many use cases, however, a timer won’t be a natural fit. For example, if we have all of our input data available before we start publishing, we might be tempted to use Enumerable#each, like this:
AMQP.start do
  topic = MQ.new.topic("test")
  (1..10).each do |i|
    topic.publish("[#{i}] " + timestamp, :key => "test.key")
    sleep(0.1)
  end
  # Give the subscribers time to catch up before killing them:
  EM.add_timer(2) do
    finish
  end
end
If we run this, we’ll see:
starting queue ``foobarbaz'', PID: 5438
starting queue ``foobarbaz'', PID: 5440
starting queue ``foobarbaz'', PID: 5439
[queue foobarbaz (pid 5438)] received msg: [1] 0.070594 at 1.084451
[queue foobarbaz (pid 5438)] received msg: [4] 0.37131 at 1.085436
[queue foobarbaz (pid 5440)] received msg: [2] 0.17085 at 1.085788
[queue foobarbaz (pid 5438)] received msg: [7] 0.671874 at 1.086368
[queue foobarbaz (pid 5440)] received msg: [5] 0.471554 at 1.086662
[queue foobarbaz (pid 5438)] received msg: [10] 0.972494 at 1.087264
[queue foobarbaz (pid 5440)] received msg: [8] 0.772093 at 1.087417
[queue foobarbaz (pid 5439)] received msg: [3] 0.27107 at 1.088342
[queue foobarbaz (pid 5439)] received msg: [6] 0.571704 at 1.088939
[queue foobarbaz (pid 5439)] received msg: [9] 0.872309 at 1.089509
Notice how all of the messages are received at about the same time? This happens because sleep blocks EventMachine’s reactor: topic.publish only buffers each message, and nothing is actually sent until the whole #each loop finishes and control returns to the reactor. In extreme cases, our connection to RabbitMQ might time out while the reactor is blocked, causing errors when we get around to actually publishing the messages. There are a few ways to work around this, but the simplest is to use a synchronous (non-EventMachine) AMQP library for publishing.
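To see why blocking the reactor bunches up delivery, here is a toy model in plain Ruby. ToyReactor is a hypothetical class, not EventMachine: publish only buffers a message, and the buffer is flushed each time control returns to the loop between callbacks. It also sketches one EventMachine-style workaround: instead of a blocking #each, each step publishes one message and schedules the next step as a separate callback.

```ruby
# Hypothetical toy reactor, NOT EventMachine.
class ToyReactor
  attr_reader :flushes

  def initialize
    @outbox = []   # messages buffered but not yet "sent"
    @ticks = []    # callbacks scheduled for later loop iterations
    @flushes = []  # each entry is one batch written to the "socket"
  end

  def publish(msg)
    @outbox << msg
  end

  def next_tick(&blk)
    @ticks << blk
  end

  # Each iteration flushes the buffer, then runs one scheduled callback.
  def run
    until @outbox.empty? && @ticks.empty?
      @flushes << @outbox.dup unless @outbox.empty?
      @outbox.clear
      @ticks.shift&.call
    end
  end
end

# Blocking style: the whole loop runs inside one callback, so every
# message sits in the buffer until the loop is done.
blocking = ToyReactor.new
blocking.next_tick { (1..3).each { |i| blocking.publish(i) } }
blocking.run
blocking_batches = blocking.flushes

# Stepwise style: publish one message, then hand control back to the loop.
stepped = ToyReactor.new
publish_from = lambda do |i|
  next if i > 3
  stepped.publish(i)
  stepped.next_tick { publish_from.call(i + 1) }
end
stepped.next_tick { publish_from.call(1) }
stepped.run
stepped_batches = stepped.flushes
```

Here blocking_batches is [[1, 2, 3]] (everything goes out in one burst after the loop, as in the output above), while stepped_batches is [[1], [2], [3]]. With the real amqp gem, the analogous shape would schedule each step with EventMachine’s EM.next_tick or EM.add_timer rather than looping with sleep; treat that as an untested sketch rather than a drop-in fix.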
Bunny Hop
My personal choice of synchronous AMQP library is bunny. Here’s how we’d use bunny to fix our problem with the previous example:
require "rubygems"
require "eventmachine"
require "mq"
require "amqp"
require "bunny"
#PUBLISH_MODE = :one_to_n
PUBLISH_MODE = :one_to_one_of_n
START_TIME = Time.new
def finish
  EM.forks.each {|pid| Process.kill("KILL", pid)}
end
def gen_qname
  if PUBLISH_MODE == :one_to_n
    rand(2 ** 16).to_s(16)
  else
    "foobarbaz"
  end
end
def timestamp
  delta_t = Time.new - START_TIME
  delta_t.to_s
end
# Subscribers: the same amqp/EventMachine code as in the first example
EM.fork(3) do
  mq = MQ.new
  qname = gen_qname
  puts "starting queue ``#{qname}'', PID: #{Process.pid.to_s}"
  q = mq.queue(qname).bind(mq.topic("test"), :key => "test.key")
  q.subscribe do |msg|
    puts "[queue #{qname} (pid #{Process.pid.to_s})] received msg: #{msg} @ #{timestamp}"
  end
end
# Publisher: bunny is synchronous, so no EventMachine reactor is needed here
connection = Bunny.new
connection.start
topic = connection.exchange("test", :type => :topic)
(1..10).each do |i|
  # publish sends each message before returning, so sleep no longer holds messages back
  topic.publish("[#{i}] " + timestamp, :key => "test.key")
  sleep(0.1)
end
# Kill the subscriber processes, then close the bunny connection
finish
connection.stop
The API for bunny has a few differences from the amqp library API, but it is also very simple, and bunny’s own examples are quite helpful.
The main difference with using bunny to publish the messages is that topic.publish blocks the execution of #each until the message is actually sent to the server, whereas amqp uses EventMachine’s magic to buffer the messages and continue execution right away. There are advantages to each approach, of course, but as you can see in these examples, using the blocking, synchronous library is in many ways the easier approach.
Another “+1” for this approach, as I mentioned in a previous post, is that using EventMachine within Phusion Passenger can lead to grumpiness, or at least snarky comments on the internet. For example, if you are doing something like generating large PDFs and emailing them to users based on actions in a web app, you could use AMQP to send the relevant parameters to a separate process that generates the PDFs, so the web server isn’t blocked. Using bunny instead of amqp in your web app code lets you avoid the special engineering required to get Passenger and EventMachine to play nice with each other.
Resources
- Download the examples: Demo #1, Demo #2, Demo #3
- If you have problems with the examples, give me a shout in the comments, or on twitter.
- The readme for amqp links to a huge amount of AMQP and messaging resources, including some classic critiques of the RPC paradigm.
- Follow Bunny development at its github page.
- Take a look at nanite if you’re interested in a more framework-ey approach.
- This in-depth description of AMQP is worth linking to again. Skim it a few times, then give it a full read once you’ve grokked the basics.
- (Update) I wrote a library that provides test doubles for the AMQP module and MQ classes. If you’re interested in doing TDD with AMQP, give it a try: CODE | WORDS
- (Update) I also wrote a library to make AMQP (the EventMachine one) work with web apps more easily. It checks to see if it’s running in Passenger, Thin, or Mongrel and takes care of setting up whatever worker threads and/or callbacks are needed to make AMQP “just work.” WORDS | CODE
Telegraph Key picture by Flickr user photobunny (synchronicity, baby!). CC by-nc-nd
AMQP diagram by Tony Garnock-Jones for RabbitMQ via Wikipedia.