<?xml version="1.0" encoding="utf-8"?>
<feed xmlns="http://www.w3.org/2005/Atom">
 
 <title>Doug Barth</title>
 <link href="http://dougbarth.github.com/atom.xml" rel="self"/>
 <link href="http://dougbarth.github.com/"/>
 <updated>2015-10-30T14:07:48+00:00</updated>
 <id>http://dougbarth.github.com/</id>
 <author>
   <name>Doug Barth</name>
   <email>dougbarth@github.com</email>
 </author>

 
 <entry>
   <title>Approximating Priority With RabbitMQ</title>
   <link href="http://dougbarth.github.com/2011/07/01/approximating-priority-with-rabbitmq.html"/>
   <updated>2011-07-01T00:00:00+00:00</updated>
   <id>http://dougbarth.github.com/2011/07/01/approximating-priority-with-rabbitmq</id>
   <content type="html">&lt;p&gt;As mentioned in a previous blog post, at &lt;a href=&quot;http://www.signalhq.com/&quot;&gt;Signal&lt;/a&gt; we use &lt;a href=&quot;http://www.rabbitmq.com&quot;&gt;RabbitMQ&lt;/a&gt; for all our queuing needs. Though the &lt;span class=&quot;caps&quot;&gt;AMQP&lt;/span&gt; protocol supports the concept of priority, RabbitMQ does not yet implement that feature. However, with a little bit of creative client-side code, we&amp;#8217;ve been able to approximate priority in our application in a way that will allow us to seamlessly remove that workaround once RabbitMQ implements priority internally.&lt;/p&gt;
&lt;h1&gt;AMQP&amp;#8217;s priority support&lt;/h1&gt;
&lt;p&gt;The &lt;span class=&quot;caps&quot;&gt;AMQP&lt;/span&gt; protocol supports up to 10 levels of priority, starting at zero. 0 has the lowest priority and 9 has the highest. The priority of a message is set by the publisher using the priority header. The consumer will then have messages pushed to it in priority order.&lt;/p&gt;
&lt;p&gt;Brokers that say they fully conform to the spec must implement at least 2 levels of priority. In that case, the 10 levels of priority are broken up into 2 ranges: 0-4 and 5-9. Each range is then treated as a single priority level.&lt;/p&gt;
&lt;h1&gt;Dealing with it on the client&lt;/h1&gt;
&lt;p&gt;To work around the lack of priority support in RabbitMQ, we need to split one logical queue into several physical queues based on priority. The consumer pulls messages from all queues, but gives preference to the higher priority queues if work is available.&lt;/p&gt;
&lt;p&gt;At &lt;a href=&quot;http://www.signalhq.com/&quot;&gt;Signal&lt;/a&gt;, we made this nuance transparent to our application by adding a layer of indirection. Our application code publishes using the logical queue name and priority through a common function. That function then determines the physical queue to publish to based on the priority (e.g. 0-4 goes to the slow queue, 5-9 goes to the fast queue). Consumers subscribe to both queues, but always work the higher priority queue first.&lt;/p&gt;
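&lt;p&gt;A minimal sketch of such a routing function, assuming a two-queue split. The helper name &lt;code&gt;physical_queue_for&lt;/code&gt; and the &lt;code&gt;.fast&lt;/code&gt; and &lt;code&gt;.slow&lt;/code&gt; queue suffixes are illustrative assumptions:&lt;/p&gt;
&lt;div class=&quot;highlight&quot;&gt;&lt;pre&gt;&lt;code class=&quot;language-ruby&quot; data-lang=&quot;ruby&quot;&gt;# Hypothetical mapping from AMQP priority ranges to physical queue suffixes.
PRIORITY_SUFFIXES = { (0..4) => "slow", (5..9) => "fast" }.freeze

# Returns the physical queue name for a logical queue and a priority (0-9).
def physical_queue_for(logical_name, priority)
  _range, suffix = PRIORITY_SUFFIXES.find { |range, _| range.cover?(priority) }
  raise ArgumentError, "priority must be between 0 and 9" if suffix.nil?
  "#{logical_name}.#{suffix}"
end

physical_queue_for("send_mt", 7)  # => "send_mt.fast"
physical_queue_for("send_mt", 2)  # => "send_mt.slow"&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;
&lt;p&gt;Because publishers only ever see the logical name and a priority, adding a third range later means changing this one table.&lt;/p&gt;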
&lt;p&gt;Our first implementation had two physical queues (fast &amp;amp; slow), and consumers popped messages off those queues in priority order. First, they would try to pop a message off the fast queue. If a message was returned, they processed it and then tried popping from the fast queue again. If no message was on the fast queue, they tried popping from the slow queue. If a message was on the slow queue, they processed it and then tried popping from the fast queue immediately. If no message was on the slow queue either, they slept for a short while to avoid overwhelming the broker with polling.&lt;/p&gt;
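&lt;p&gt;That polling loop might be sketched roughly as follows. The queue objects here are plain arrays standing in for broker queues (an assumption for illustration; a real consumer would issue a get against RabbitMQ instead), and the names &lt;code&gt;next_message&lt;/code&gt; and &lt;code&gt;poll_once&lt;/code&gt; are hypothetical:&lt;/p&gt;
&lt;div class=&quot;highlight&quot;&gt;&lt;pre&gt;&lt;code class=&quot;language-ruby&quot; data-lang=&quot;ruby&quot;&gt;# Returns the first message found, checking queues from highest to lowest
# priority. Each queue must respond to `shift` and return nil when empty.
def next_message(queues)
  queues.each do |queue|
    message = queue.shift
    return message unless message.nil?
  end
  nil
end

# Performs one polling pass. Every pass starts again at the highest priority
# queue, so newly arrived fast messages are picked up before slow ones.
def poll_once(queues, idle_sleep = 0.5)
  message = next_message(queues)
  if message.nil?
    sleep(idle_sleep)  # all queues were empty, so back off before polling again
  else
    yield message
  end
  message
end

fast = ["stop_reply"]
slow = ["blast_1", "blast_2"]
processed = []
3.times { poll_once([fast, slow], 0) { |message| processed.push(message) } }
processed  # => ["stop_reply", "blast_1", "blast_2"]&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;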
&lt;p&gt;This approach worked well enough, but when queues had a large backlog, we found RabbitMQ would eat up a bunch of &lt;span class=&quot;caps&quot;&gt;CPU&lt;/span&gt; processing the polling logic. Clearly, we needed to switch to a subscription model so that RabbitMQ would push messages to us with less overhead.&lt;/p&gt;
&lt;h1&gt;Dealing with the subscription flood&lt;/h1&gt;
&lt;p&gt;Switching to an &lt;span class=&quot;caps&quot;&gt;AMQP&lt;/span&gt; subscription model presents a new set of issues: namely, subscriptions in &lt;span class=&quot;caps&quot;&gt;AMQP&lt;/span&gt; push messages to consumers as quickly as the network interface is serviced. Since we process work on a different thread than &lt;a href=&quot;http://rubyeventmachine.com/&quot;&gt;EventMachine&amp;#8217;s&lt;/a&gt; reactor thread (to leave it free to publish new messages), messages will be pulled off the network interface and into memory faster than we can process them. Without a way to manage the flood of messages coming into the consumer, we could neither guarantee priority-ordered processing of messages nor keep our process from dying due to memory usage.&lt;/p&gt;
&lt;p&gt;To manage the influx of messages for standard &lt;span class=&quot;caps&quot;&gt;AMQP&lt;/span&gt; subscriptions, we lean on prefetch counts and explicit client acks. The prefetch count, set on a channel, determines how many unacked messages a channel may be sent before the broker will wait for acknowledgments.&lt;/p&gt;
&lt;p&gt;To service multiple subscriptions in priority order, we subscribe to each queue and put incoming messages on an in-memory priority queue. Each subscription is on its own channel, with an appropriate prefetch count: higher prefetch counts for higher priority queues. The sizing of the prefetch counts is crucial. If they are too low, low priority messages that are in the worker&amp;#8217;s memory will be processed ahead of higher priority messages that need to be sent from the server. In our application, we found that a good rule of thumb is to double the prefetch count for each higher level of priority.&lt;/p&gt;
&lt;p&gt;As messages are received from RabbitMQ, we put them on an in-memory priority queue. A background worker thread pulls messages from that queue (blocking if empty) and processes them. After a message is processed, an ack is sent to RabbitMQ to signal that a new message of that same priority can be sent.&lt;/p&gt;
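&lt;p&gt;The two pieces described above can be sketched in plain Ruby. This is an illustration under assumptions rather than production code: the class name &lt;code&gt;InMemoryPriorityQueue&lt;/code&gt;, the helper &lt;code&gt;prefetch_counts&lt;/code&gt; and the base prefetch of 5 are all hypothetical, and the &lt;span class=&quot;caps&quot;&gt;AMQP&lt;/span&gt; subscription and ack calls are left out.&lt;/p&gt;
&lt;div class=&quot;highlight&quot;&gt;&lt;pre&gt;&lt;code class=&quot;language-ruby&quot; data-lang=&quot;ruby&quot;&gt;# Prefetch count for each priority level (0 = lowest), doubling per level.
# The base of 5 is an arbitrary illustrative value.
def prefetch_counts(levels, base = 5)
  (0...levels).map { |level| base * (2**level) }
end

# Minimal thread-safe priority queue with a blocking pop.
class InMemoryPriorityQueue
  def initialize
    @buckets = Hash.new { |hash, priority| hash[priority] = [] }
    @size = 0
    @mutex = Mutex.new
    @ready = ConditionVariable.new
  end

  # Called from the subscription callback as messages arrive.
  def push(priority, message)
    @mutex.synchronize do
      @buckets[priority].push(message)
      @size += 1
      @ready.signal
    end
  end

  # Called from the worker thread. Blocks while the queue is empty, then
  # returns the oldest message from the highest non-empty priority level.
  def pop
    @mutex.synchronize do
      @ready.wait(@mutex) while @size.zero?
      priority = @buckets.keys.reject { |key| @buckets[key].empty? }.max
      @size -= 1
      @buckets[priority].shift
    end
  end
end

prefetch_counts(3)  # => [5, 10, 20]

queue = InMemoryPriorityQueue.new
queue.push(0, "large blast")
queue.push(1, "STOP reply")
queue.pop  # => "STOP reply"
queue.pop  # => "large blast"&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;
&lt;p&gt;In a real worker, each subscription callback would call &lt;code&gt;push&lt;/code&gt; with the priority of its queue, and the worker thread would ack the message after processing whatever &lt;code&gt;pop&lt;/code&gt; returns.&lt;/p&gt;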
&lt;h1&gt;Add a third queue&lt;/h1&gt;
&lt;p&gt;With 2 levels of priority approximation, we were able to ensure high priority outbound &lt;span class=&quot;caps&quot;&gt;SMS&lt;/span&gt; messages (e.g. responding to a &lt;span class=&quot;caps&quot;&gt;STOP&lt;/span&gt; text message) were processed before lower priority messages (e.g. a scheduled blast to a subscription list). This approach worked great until we started having clients whose lists were large enough to saturate our outbound connections for long periods of time. Our solution was to add a third priority queue and send really large blasts with the lowest priority and smaller blasts with the middle priority level. With all the plumbing in place, adding this extra queue was little more than adding the third queue and adjusting the priority ranges.&lt;/p&gt;
&lt;p&gt;After living with this priority approximation for over 2 years, we are very pleased. Other than a few incremental changes, we&amp;#8217;ve had no issues supporting this solution in production. If you&amp;#8217;re evaluating &lt;a href=&quot;http://www.rabbitmq.com&quot;&gt;RabbitMQ&lt;/a&gt; and lack of proper priority has you down, I&amp;#8217;d suggest implementing this solution.&lt;/p&gt;</content>
 </entry>
 
 <entry>
   <title>Rabbit on a Leash — Rate Limited AMQP subscriptions</title>
   <link href="http://dougbarth.github.com/2011/06/10/keeping-the-rabbit-on-a-leash.html"/>
   <updated>2011-06-10T00:00:00+00:00</updated>
   <id>http://dougbarth.github.com/2011/06/10/keeping-the-rabbit-on-a-leash</id>
   <content type="html">&lt;p&gt;&lt;a href=&quot;http://www.rabbitmq.com&quot;&gt;RabbitMQ&lt;/a&gt; is fast: &lt;a href=&quot;http://www.rabbitmq.com/faq.html#performance&quot;&gt;really fast&lt;/a&gt;. Consuming messages from a queue is extremely efficient. Consumers declare the queues they are subscribing to and the broker pushes messages to the consumer for processing as soon as they are ready. The &lt;a href=&quot;http://www.amqp.org&quot;&gt;&lt;span class=&quot;caps&quot;&gt;AMQP&lt;/span&gt; protocol&lt;/a&gt;, which RabbitMQ implements, supports the concept of limiting how many outstanding messages a consumer can be tasked with processing via the &lt;code&gt;prefetch_count&lt;/code&gt; and &lt;code&gt;no_ack&lt;/code&gt; settings, but it does not have a way to control the rate of delivery of messages to consumers.&lt;/p&gt;
&lt;p&gt;At &lt;a href=&quot;http://www.signalhq.com&quot;&gt;Signal&lt;/a&gt;, we use RabbitMQ for all our queueing infrastructure needs. Outgoing &lt;span class=&quot;caps&quot;&gt;SMS&lt;/span&gt; messages (MTs) are queued for a pool of workers to send to our aggregator. Our connection to our &lt;span class=&quot;caps&quot;&gt;SMS&lt;/span&gt; aggregator requires us to limit the rate at which we send messages to their system. It would seem that RabbitMQ is a poor fit for that use case, but we&amp;#8217;ve been able to fulfill it using a bit of client-side code and the existing &lt;span class=&quot;caps&quot;&gt;AMQP&lt;/span&gt; protocol.&lt;/p&gt;
&lt;p&gt;Using AMQP&amp;#8217;s &lt;code&gt;prefetch_count&lt;/code&gt;, client acks and a blocking token bucket, it&amp;#8217;s possible to implement rate-controlled processing of queued messages.&lt;/p&gt;
&lt;h1&gt;Token Buckets&lt;/h1&gt;
&lt;p&gt;A token bucket is an algorithm that is used to control the rate of data that flows through a system&lt;sup class=&quot;footnote&quot; id=&quot;fnr1&quot;&gt;&lt;a href=&quot;#fn1&quot;&gt;1&lt;/a&gt;&lt;/sup&gt;. Token buckets can be configured to allow traffic to burst to full speed, but they ensure that the average traffic processed is held at a configurable rate.&lt;/p&gt;
&lt;p&gt;The concept of a token bucket is rather simple. Imagine the bucket in your freezer&amp;#8217;s ice maker. Cubes of ice are added to the bucket at a certain rate (say 1 a second). The size of the bucket controls how many ice cubes (tokens) we can have waiting in the bucket before we will stop making more.&lt;/p&gt;
&lt;p&gt;In order for traffic to be processed, we need to take a token (or more) from that bucket. If the bucket is empty, that work cannot be processed. The rate at which tokens are added to the bucket controls the average speed at which work is processed. If we started with an empty bucket, we could process work only as fast as cubes of ice were added. The size of the bucket controls how much work we can burst. If the bucket held 10 tokens, we could process 10 units of work at full speed before we would be rate limited.&lt;/p&gt;
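&lt;p&gt;A blocking token bucket along these lines might be sketched as follows. This is an illustrative implementation under assumptions: the constructor is taken to accept a rate in tokens per second and a capacity, the bucket is assumed to start full, and the method names &lt;code&gt;refresh&lt;/code&gt; and &lt;code&gt;take&lt;/code&gt; simply mirror the calls used in the worker example later in this post.&lt;/p&gt;
&lt;div class=&quot;highlight&quot;&gt;&lt;pre&gt;&lt;code class=&quot;language-ruby&quot; data-lang=&quot;ruby&quot;&gt;# Illustrative blocking token bucket (a sketch, not a library API).
class TokenBucket
  # rate: tokens added per second. capacity: maximum tokens held (burst size).
  def initialize(rate, capacity)
    @rate = rate.to_f
    @capacity = capacity.to_f
    @tokens = @capacity  # assumption: start full so an initial burst is allowed
    @last_refresh = now
    @mutex = Mutex.new
  end

  # Adds the tokens earned since the last refresh, capped at the capacity.
  def refresh
    @mutex.synchronize { refill }
  end

  # Takes `count` tokens, sleeping the calling thread until enough exist.
  def take(count = 1)
    raise ArgumentError, "count exceeds bucket capacity" if count > @capacity
    loop do
      wait = @mutex.synchronize do
        refill
        if @tokens >= count
          @tokens -= count
          nil                          # success, nothing to wait for
        else
          (count - @tokens) / @rate    # seconds until enough tokens exist
        end
      end
      return true if wait.nil?
      sleep(wait)
    end
  end

  private

  def now
    Process.clock_gettime(Process::CLOCK_MONOTONIC)
  end

  # Must be called with the mutex held.
  def refill
    current = now
    @tokens = [@capacity, @tokens + (current - @last_refresh) * @rate].min
    @last_refresh = current
  end
end

bucket = TokenBucket.new(1, 5)  # 1 token per second, bursts of up to 5
5.times { bucket.take(1) }      # returns immediately: the bucket starts full
# A sixth take would block for roughly one second.&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;
&lt;p&gt;Because &lt;code&gt;take&lt;/code&gt; sleeps the calling thread, it should only be called off the reactor thread.&lt;/p&gt;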
&lt;h1&gt;Putting it all together&lt;/h1&gt;
&lt;p&gt;With a correctly working token bucket, implementing fixed rate processing is fairly straightforward. First, when subscribing to a queue, we set an explicit &lt;code&gt;prefetch_count&lt;/code&gt; on the channel and set &lt;code&gt;no_ack&lt;/code&gt; to false. The &lt;code&gt;prefetch_count&lt;/code&gt; limits how many unacked messages RabbitMQ will deliver, and disabling &lt;code&gt;no_ack&lt;/code&gt; lets us acknowledge each message once we&amp;#8217;ve finished processing it. In our application, we size the &lt;code&gt;prefetch_count&lt;/code&gt; so there are a few seconds&amp;#8217; worth of messages waiting in the worker&amp;#8217;s memory to be sent.&lt;/p&gt;
&lt;p&gt;We use the token bucket to control our rate of processing these messages from RabbitMQ. We need to take a token from the bucket before processing a message. If the bucket is empty, we block until a new token is added.&lt;/p&gt;
&lt;div class=&quot;highlight&quot;&gt;&lt;pre&gt;&lt;code class=&quot;language-ruby&quot; data-lang=&quot;ruby&quot;&gt;&lt;span class=&quot;no&quot;&gt;EM&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span class=&quot;n&quot;&gt;run&lt;/span&gt; &lt;span class=&quot;k&quot;&gt;do&lt;/span&gt;
  &lt;span class=&quot;n&quot;&gt;channel&lt;/span&gt; &lt;span class=&quot;o&quot;&gt;=&lt;/span&gt; &lt;span class=&quot;no&quot;&gt;AMQP&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;::&lt;/span&gt;&lt;span class=&quot;no&quot;&gt;Channel&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span class=&quot;n&quot;&gt;new&lt;/span&gt;

  &lt;span class=&quot;c1&quot;&gt;# Allow 10 unacked messages to be delivered to this worker.&lt;/span&gt;
  &lt;span class=&quot;n&quot;&gt;channel&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span class=&quot;n&quot;&gt;prefetch&lt;/span&gt;&lt;span class=&quot;p&quot;&gt;(&lt;/span&gt;&lt;span class=&quot;mi&quot;&gt;10&lt;/span&gt;&lt;span class=&quot;p&quot;&gt;)&lt;/span&gt;

  &lt;span class=&quot;c1&quot;&gt;# Configure this worker to send at 1 msg/s on average with occasional bursts&lt;/span&gt;
  &lt;span class=&quot;c1&quot;&gt;# up to 5 messages.&lt;/span&gt;
  &lt;span class=&quot;n&quot;&gt;token_bucket&lt;/span&gt; &lt;span class=&quot;o&quot;&gt;=&lt;/span&gt; &lt;span class=&quot;no&quot;&gt;TokenBucket&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span class=&quot;n&quot;&gt;new&lt;/span&gt;&lt;span class=&quot;p&quot;&gt;(&lt;/span&gt;&lt;span class=&quot;mi&quot;&gt;1&lt;/span&gt;&lt;span class=&quot;p&quot;&gt;,&lt;/span&gt; &lt;span class=&quot;mi&quot;&gt;5&lt;/span&gt;&lt;span class=&quot;p&quot;&gt;)&lt;/span&gt;

  &lt;span class=&quot;c1&quot;&gt;# Refresh the token bucket every second. The bucket is also refreshed&lt;/span&gt;
  &lt;span class=&quot;c1&quot;&gt;# when the take method is called.&lt;/span&gt;
  &lt;span class=&quot;no&quot;&gt;EM&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span class=&quot;n&quot;&gt;add_periodic_timer&lt;/span&gt;&lt;span class=&quot;p&quot;&gt;(&lt;/span&gt;&lt;span class=&quot;mi&quot;&gt;1&lt;/span&gt;&lt;span class=&quot;p&quot;&gt;)&lt;/span&gt; &lt;span class=&quot;p&quot;&gt;{&lt;/span&gt; &lt;span class=&quot;n&quot;&gt;token_bucket&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span class=&quot;n&quot;&gt;refresh&lt;/span&gt; &lt;span class=&quot;p&quot;&gt;}&lt;/span&gt;
 
  &lt;span class=&quot;c1&quot;&gt;# We subscribe with explicit acknowledgments so we can signal to RabbitMQ&lt;/span&gt;
  &lt;span class=&quot;c1&quot;&gt;# that more work should be delivered. Without this setting, RabbitMQ would&lt;/span&gt;
  &lt;span class=&quot;c1&quot;&gt;# send work over to us as fast as possible.&lt;/span&gt;
  &lt;span class=&quot;n&quot;&gt;channel&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span class=&quot;n&quot;&gt;queue&lt;/span&gt;&lt;span class=&quot;p&quot;&gt;(&lt;/span&gt;&lt;span class=&quot;s1&quot;&gt;&amp;#39;send_mt&amp;#39;&lt;/span&gt;&lt;span class=&quot;p&quot;&gt;)&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span class=&quot;n&quot;&gt;subscribe&lt;/span&gt;&lt;span class=&quot;p&quot;&gt;(&lt;/span&gt;&lt;span class=&quot;ss&quot;&gt;:ack&lt;/span&gt; &lt;span class=&quot;o&quot;&gt;=&amp;gt;&lt;/span&gt; &lt;span class=&quot;kp&quot;&gt;true&lt;/span&gt;&lt;span class=&quot;p&quot;&gt;)&lt;/span&gt; &lt;span class=&quot;k&quot;&gt;do&lt;/span&gt; &lt;span class=&quot;o&quot;&gt;|&lt;/span&gt;&lt;span class=&quot;n&quot;&gt;header&lt;/span&gt;&lt;span class=&quot;p&quot;&gt;,&lt;/span&gt; &lt;span class=&quot;n&quot;&gt;message&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;|&lt;/span&gt;
    &lt;span class=&quot;c1&quot;&gt;# Defer the processing to a background thread since taking a token from&lt;/span&gt;
    &lt;span class=&quot;c1&quot;&gt;# the bucket could potentially be a blocking operation and we don&amp;#39;t want to&lt;/span&gt;
    &lt;span class=&quot;c1&quot;&gt;# block the reactor.&lt;/span&gt;
    &lt;span class=&quot;no&quot;&gt;EM&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span class=&quot;n&quot;&gt;defer&lt;/span&gt;&lt;span class=&quot;p&quot;&gt;(&lt;/span&gt;
      &lt;span class=&quot;nb&quot;&gt;lambda&lt;/span&gt; &lt;span class=&quot;p&quot;&gt;{&lt;/span&gt;
        &lt;span class=&quot;c1&quot;&gt;# Takes 1 token from the bucket. If the bucket is empty, this&lt;/span&gt;
        &lt;span class=&quot;c1&quot;&gt;# method will block.&lt;/span&gt;
        &lt;span class=&quot;n&quot;&gt;token_bucket&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span class=&quot;n&quot;&gt;take&lt;/span&gt;&lt;span class=&quot;p&quot;&gt;(&lt;/span&gt;&lt;span class=&quot;mi&quot;&gt;1&lt;/span&gt;&lt;span class=&quot;p&quot;&gt;)&lt;/span&gt;

        &lt;span class=&quot;n&quot;&gt;process_message&lt;/span&gt;&lt;span class=&quot;p&quot;&gt;(&lt;/span&gt;&lt;span class=&quot;n&quot;&gt;message&lt;/span&gt;&lt;span class=&quot;p&quot;&gt;)&lt;/span&gt;

        &lt;span class=&quot;c1&quot;&gt;# Acknowledge this message, allowing RabbitMQ to send more work.&lt;/span&gt;
        &lt;span class=&quot;n&quot;&gt;header&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span class=&quot;n&quot;&gt;ack&lt;/span&gt;
      &lt;span class=&quot;p&quot;&gt;}&lt;/span&gt;
    &lt;span class=&quot;p&quot;&gt;)&lt;/span&gt;
  &lt;span class=&quot;k&quot;&gt;end&lt;/span&gt;
&lt;span class=&quot;k&quot;&gt;end&lt;/span&gt;&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;&lt;p class=&quot;footnote&quot; id=&quot;fn1&quot;&gt;&lt;a href=&quot;#fnr1&quot;&gt;&lt;sup&gt;1&lt;/sup&gt;&lt;/a&gt; &lt;a href=&quot;http://en.wikipedia.org/wiki/Token_bucket&quot; title=&quot;Wikipedia&quot;&gt;Token Buckets&lt;/a&gt;&lt;/p&gt;</content>
 </entry>
 
 
</feed>

