Resque with EventMachine

Introduction

SponsorPay is a global cross-platform advertising solution for the monetization of premium content or virtual currency.

This means that our application sits between advertising networks, publishers and users. A critical part of this operation is communicating the users’ actions to the publishers and advertising networks. These communications (callbacks) are, more often than not, slow and unreliable, so the natural solution is to offload them to background jobs.

Problem: Processing a large number of external HTTP requests, bound by database access.
Our Solution: Non-blocking, EventMachine-driven jobs inside Resque.
Technology Stack: Ruby, Ruby on Rails, MySQL and all the wonderful things

First approach

Our first approach was based on DelayedJob: we saved every callback that needed to be executed to a single MySQL table. Since we have a lot of traffic, our database server had trouble handling all the connections, selects and updates from the separate workers.

Every job consists of checking a couple of things in the database and then making an HTTP request. The remote server might be slow to respond, so a worker usually spent most of its time waiting for the answer. This led to an increase in the number of workers needed and, since every new worker adds more load to the database, the approach stopped scaling beyond a certain amount of traffic.
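A rough way to see the scaling problem is Little's law: the number of jobs in flight equals the arrival rate multiplied by the time each job takes. With one blocking job per worker, that number is also the worker count and the database connection count. The sketch below uses made-up traffic and latency figures purely for illustration; the helper name is hypothetical.

```ruby
# Little's law: jobs in flight = arrival rate * time per job.
# With one blocking job per worker, this is also the number of workers
# (and database connections) required. All inputs below are hypothetical.
def blocking_workers_needed(jobs_per_second, seconds_per_job)
  (jobs_per_second * seconds_per_job).ceil
end

[0.5, 2.0, 5.0].each do |latency|
  workers = blocking_workers_needed(40, latency)
  puts "latency #{latency}s: #{workers} workers, #{workers} DB connections"
end
```

The slower the remote servers get, the more workers and connections are needed for the same traffic, even though each worker is doing almost nothing.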

New approach: Redis and Resque

The first thing to try was moving the job storage from MySQL to something else, preferably faster. The obvious choice was Defunkt’s great Resque, a Redis-backed background worker and queueing library.

One thing solved: getting work from a Redis queue is much faster than the old MySQL-based solution.

It still didn’t solve another problem: the number of database connections during peak hours. By default, Resque forks a new process for every job, and for fast-paced processing the per-job initialization adds even more database traffic than the DJ workers did. Of course, you can force Resque not to fork for new jobs:

“Non-forking Resque”
# Same as Resque's stock resque:work task, but never forks a child per job.
task "resque:work_dont_fork" => ["resque:preload", "resque:setup"] do
  queues = (ENV['QUEUES'] || ENV['QUEUE']).to_s.split(',')

  begin
    worker = Resque::Worker.new(*queues)
    worker.cant_fork = true # run every job inside the worker process itself
    worker.verbose = ENV['LOGGING'] || ENV['VERBOSE']
    worker.very_verbose = ENV['VVERBOSE']
  rescue Resque::NoQueueError
    abort "set QUEUE env var, e.g. $ QUEUE=critical,high rake resque:work_dont_fork"
  end

  # Optionally record the worker's PID so it can be monitored or stopped.
  if ENV['PIDFILE']
    File.open(ENV['PIDFILE'], 'w') { |f| f << worker.pid }
  end

  worker.log "Starting worker #{worker}"

  worker.work(ENV['INTERVAL'] || 5) # interval, will block
end

This no longer killed our database server with initialization queries, but we were still using one database connection per worker, which gets very expensive when the workers are mostly waiting for other servers to respond and touch the database only lightly.

At this point we started to wonder: could we use a single process for several workers that would share the same pool of database connections and, even better, process the callbacks asynchronously? Since our codebase is all Ruby, we started to investigate EventMachine and specifically its fiber-using sibling em-synchrony. The reactor pattern is an obvious choice for processes like our callbacks, which involve lots of I/O and almost no raw CPU processing.
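To make the idea concrete, here is a toy illustration of fibers multiplexed by a single loop. It is not EventMachine or em-synchrony code: the ToyReactor class, its methods and the "ticks" standing in for HTTP latency are all invented for this sketch, and it only uses Ruby's built-in Fiber.

```ruby
# Toy reactor: each fiber yields the number of ticks its fake HTTP call
# takes, and the loop resumes it once that many ticks have passed.
# Everything here is a simplified stand-in, not a real event loop.
class ToyReactor
  attr_reader :log

  def initialize
    @waiting = [] # [wake_at_tick, fiber] pairs
    @tick = 0
    @log = []
  end

  # Starts a "worker" that performs one fake request per latency value.
  def spawn(name, latencies)
    fiber = Fiber.new do
      latencies.each do |ticks|
        Fiber.yield(ticks)    # "send the request" and give up control
        @log << [@tick, name] # resumed: "the response has arrived"
      end
      nil                     # tells the scheduler this worker is done
    end
    schedule(fiber)
  end

  # Advances the clock until no fiber is waiting; returns the final tick.
  def run
    until @waiting.empty?
      @tick += 1
      ready, @waiting = @waiting.partition { |wake_at, _| wake_at <= @tick }
      ready.each { |_, fiber| schedule(fiber) }
    end
    @tick
  end

  private

  def schedule(fiber)
    ticks = fiber.resume
    @waiting << [@tick + ticks, fiber] if ticks
  end
end

reactor = ToyReactor.new
reactor.spawn('a', [3, 3])
reactor.spawn('b', [5])
reactor.spawn('c', [2, 2, 2])
puts "all callbacks done after #{reactor.run} ticks"
```

Run one after another, these three workers would need 17 ticks in total. Because every fiber gives up control while it waits, one process overlaps all the waiting, which is exactly the property that matters for jobs dominated by slow remote servers.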

Writing asynchronous Ruby is hard, given that most of its libraries are blocking by nature. We needed to find async versions of all the blocking libraries we were using.

All the libraries we needed can be found in the em-synchrony gem:

  • connection_pool,
  • mysql2,
  • em-http-request,
  • em-redis.

We wanted to reuse as much of the old code as possible, so we also tried the em-synchrony ActiveRecord adapter. Under heavy load, however, a lot of connections were silently dropped, so we ended up abandoning ActiveRecord and writing our own simple ORM (backed by em-synchrony’s connection pool). This was feasible because our database usage in these jobs is minimal.
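The shape of such a minimal ORM can be sketched as follows. This is not our production code and not em-synchrony's API: FakeConnection, TinyPool, the Callback class and the callbacks table are hypothetical stand-ins, chosen so that the example runs without MySQL or any gems.

```ruby
# Hypothetical stand-in for a pooled client: it records the SQL it receives
# and answers from an in-memory list of rows instead of MySQL.
class FakeConnection
  attr_reader :queries

  def initialize(rows)
    @rows = rows
    @queries = []
  end

  def query(sql)
    @queries << sql
    id = sql[/id = (\d+)/, 1].to_i
    @rows.select { |row| row['id'] == id }
  end
end

# Hypothetical fixed-size pool. It raises when empty; a fiber-aware pool
# would instead suspend the calling fiber until a connection is returned.
class TinyPool
  def initialize(size)
    @available = Array.new(size) { yield }
  end

  def execute
    conn = @available.pop or raise 'pool exhausted'
    yield conn
  ensure
    @available.push(conn) if conn
  end
end

# The "ORM": one small class per table, one explicit query per finder.
class Callback
  attr_reader :id, :url

  def self.find(pool, id)
    sql = "SELECT id, url FROM callbacks WHERE id = #{Integer(id)} LIMIT 1"
    row = pool.execute { |conn| conn.query(sql).first }
    row && new(row)
  end

  def initialize(row)
    @id = row['id']
    @url = row['url']
  end
end

rows = [{ 'id' => 7, 'url' => 'http://publisher.example/callback' }]
pool = TinyPool.new(2) { FakeConnection.new(rows) }
puts Callback.find(pool, 7).url
```

The point of the design is that a connection is borrowed only for the duration of a single query and returned immediately, so many workers can share a small pool while they wait on HTTP responses.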

Our shiny new gem

Our solution is now packaged as a RubyGem called em-resque: a non-forking Resque that can run multiple Resque workers inside Ruby fibers, sharing the same database connection pool and performing fast as hell. For now it does not support Rails and is meant for pure Ruby apps. Installing is easy:

gem install em-resque

Or if using Bundler, add the following line to your Gemfile:

Gemfile
gem 'em-resque'

And run bundle install.

The gem depends on the Resque library, and you can monitor the workers with Resque’s web interface. There is a rake task to start the workers; setting the environment variable FIBERS=N makes the process start N workers. All the needed libraries have to be loaded in the Rakefile or, if you don’t want to mess with the project’s Rakefile, you can write a simple starter script for em-resque:

script/resque_async.rb
RAILS_ENV = ENV['RAILS_ENV'] || 'development_async'
RAILS_ROOT = Dir.pwd

require 'rubygems'
require 'yaml'
require 'uri'
require 'em-resque'
require 'em-resque/worker_machine'
require 'em-resque/task_helper'
require 'resque-retry'
require 'em-synchrony'
require 'em-synchrony/connection_pool'
require 'em-synchrony/mysql2'
require 'em-synchrony/em-redis'

Dir.glob(File.join(RAILS_ROOT, 'lib', 'async_worker', '**', '*.rb')).sort.each{|f| require File.expand_path(f)}

resque_config = YAML.load_file("#{RAILS_ROOT}/config/resque.yml")
proxy_config = YAML.load_file("#{RAILS_ROOT}/config/proxy.yml")
PROXY = proxy_config ? proxy_config[RAILS_ENV] : nil

EM::Resque.redis = resque_config[RAILS_ENV]
opts = TaskHelper.parse_opts_from_env.merge(:redis => resque_config[RAILS_ENV])
EM::Resque::WorkerMachine.new(opts).start

For example, to start 50 workers, run QUEUE=* FIBERS=50 ruby script/resque_async.rb. The workers then show up in Resque’s web interface.

em-resque in the resque web interface

We wanted to show some workers working, but they are just too fast to be caught on a screenshot. :)

This thing is pretty nice for our purposes. With 100 workers we can process as many as 2500 callbacks per minute. The workers are sharing 20 database connections in our configuration and they are very lightweight. It scales well, it is easy to configure the connection pool size and it is easy to add more workers when needed.
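To put those figures in perspective, a quick back-of-the-envelope calculation helps. It uses only the numbers quoted above; the helper name is made up for this example, and the per-callback time is an upper bound that assumes the workers are never idle.

```ruby
# Derives per-worker figures from the totals quoted in the post.
# The method name and hash keys are hypothetical, chosen for this example.
def throughput_summary(workers:, callbacks_per_min:, db_connections:)
  per_worker = callbacks_per_min.to_f / workers
  {
    callbacks_per_worker_per_min: per_worker,
    # Upper bound on average time per callback: holds only if workers never idle.
    max_seconds_per_callback: 60.0 / per_worker,
    workers_per_connection: workers / db_connections
  }
end

summary = throughput_summary(workers: 100, callbacks_per_min: 2500, db_connections: 20)
summary.each { |name, value| puts "#{name}: #{value}" }
```

Each worker handles about 25 callbacks per minute, so a callback takes at most roughly 2.4 seconds on average, and five workers share each database connection. A blocking setup with the same worker count would hold 100 connections instead of 20.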

The next thing to add would be Rails support. If you want to help, please fork the project on GitHub and send us your pull requests.