Localjob
Localjob is a simple, self-contained background queue built on top of System V message queues (SysV Message Queue => SysV MQ for short). The SysV message queue API is implemented by the kernel. It's very stable and performant.
This means workers and the app pushing to the queue must reside on the same machine. It's the SQLite of background queues. Here's a post about how it works. You can run Localjob either as a separate process or as a thread in your app.
Localjob is for early-development situations where you don't need a full-featured background queue, but just want to get started with something simple that does not rely on any external services. The advantage of the SysV queues is that your Rails app or worker can restart at any time without losing any events.
Localjob works on Ruby >= 2.0.0 on Linux and OS X.
Add it to your Gemfile:
gem 'localjob'
Usage
Localjobs have the following format:
class EmailJob
  def initialize(user_id, email)
    @user, @email = User.find(user_id), email
  end

  def perform
    @email.deliver_to(@user)
  end
end
To queue a job, create an instance of it and push it to the queue:
queue = Localjob.new
queue << EmailJob.new(current_user.id, welcome_email)
A job is serialized with YAML and pushed onto a persistent SysV Message Queue.
This means a worker does not have to listen on the queue to push things to it.
Pops off the message queue are atomic, so each job is received by only one
worker. This means you can run multiple workers on the same machine if you
wish. A worker deserializes the message to create an instance of your object
and calls #perform on it.
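The push, pop and perform cycle described above can be sketched with the standard library alone. This is only an illustration under stated assumptions: Ruby's in-process, thread-safe Queue stands in for the SysV message queue, SquareJob and load_job are hypothetical names, and none of this is Localjob's actual implementation. Queue#close requires Ruby 2.3 or newer.

```ruby
require "yaml"

# Hypothetical job used only for illustration; any object with #perform works.
class SquareJob
  def initialize(n)
    @n = n
  end

  def perform
    @n * @n
  end
end

# Psych 4+ (Ruby 3.1+) refuses to load arbitrary classes through YAML.load,
# so use unsafe_load where it exists and fall back to load on older Rubies.
def load_job(payload)
  YAML.respond_to?(:unsafe_load) ? YAML.unsafe_load(payload) : YAML.load(payload)
end

# Stand-in for the SysV message queue: pushing needs no worker to be running.
queue = Queue.new
5.times { |n| queue << YAML.dump(SquareJob.new(n)) }
queue.close # once drained, #pop returns nil instead of blocking

# Two workers pop concurrently; each payload is handed to exactly one of them.
results = Queue.new
workers = Array.new(2) do
  Thread.new do
    while (payload = queue.pop)
      results << load_job(payload).perform
    end
  end
end
workers.each(&:join)

squares = Array.new(results.size) { results.pop }.sort
```

Because the job's instance variables are what get serialized, a job that holds only small values (such as an id) produces a smaller message than one that holds a whole record.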
Rails initializer
For easy access to your queues in Rails, you can add an initializer to set up a
constant referencing each of your queues. This allows easy access anywhere in
your app. In config/initializers/localjob.rb:
BackgroundQueue = Localjob.new
Then in your app you can simply reference the constant to push to the queue:
BackgroundQueue << EmailJob.new(current_user.id, welcome_email)
Managing workers
There are two ways to spawn workers: either as a thread inside the process
emitting events, or as a separate process managed with the localjob
command-line utility.
Thread
Spawn the worker thread in an initializer where you are initializing Localjob as well:
BackgroundQueue = Localjob.new
worker = Localjob::Worker.new(BackgroundQueue, logger: Rails.logger)
worker.work(thread: true)
Process
Spawning workers can be done with the localjob command. Run localjob work to
spawn a single worker. It takes a few arguments, the most important being
--require, which takes a path the worker will require before processing jobs.
For Rails, you can run localjob work without any arguments. localjob has a few
other commands, such as list to list all queues and size to list the size of
all queues. Run localjob help to list all commands.
Gracefully shut down workers by sending SIGQUIT to them. This makes sure the
worker completes its current job before shutting down. Jobs can still be pushed
to the queue in the meantime, and the worker will process them once it starts
again.
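One common way to get this "finish the current job, then stop" behaviour is to have the signal handler flip a flag that the worker loop checks between jobs. The sketch below assumes that approach; SketchWorker, trap_quit and request_shutdown are hypothetical names, not Localjob's API, and the demo calls request_shutdown directly instead of sending a real signal.

```ruby
# Hypothetical worker loop for illustration; not Localjob's actual code.
class SketchWorker
  attr_reader :processed

  def initialize(jobs)
    @jobs = jobs        # anything that responds to #shift, e.g. an Array
    @shutdown = false
    @processed = []
  end

  # The handler only flips a flag, so a job that is already running
  # is never interrupted halfway through.
  def trap_quit
    Signal.trap("QUIT") { request_shutdown }
  end

  def request_shutdown
    @shutdown = true
  end

  # The flag is checked between jobs: the current job always finishes first.
  def work
    until @shutdown
      job = @jobs.shift
      break if job.nil?
      @processed << job.call
    end
  end
end

worker = nil
jobs = [
  -> { :first },
  -> { worker.request_shutdown; :second }, # simulates SIGQUIT arriving mid-job
  -> { :third }
]
worker = SketchWorker.new(jobs)
worker.work
```

In a real process you would call worker.trap_quit before worker.work. Jobs left in the queue after shutdown stay there until a worker starts again.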
Testing
Create your instance of the queue as normal in your setup:
def setup
  @queue = Localjob.new
  @worker = Localjob::Worker.new(@queue)
end
In your teardown you'll want to destroy your queue:
def teardown
  @queue.destroy
end
You can get the size of your queue by calling @queue.size. You pop off the
queue with @queue.shift. Other than that, just use the normal API. You can
also read the tests for Localjob to get an idea of how to test. Sample test:
def test_pop_and_send_to_worker
  WalrusJob.any_instance.expects(:perform)
  # Use the @queue instance created in setup.
  @queue << WalrusJob.new("move")
  job = @queue.shift
  @worker.process(job)
  assert_equal 0, @queue.size
end
Multiple queues
If you wish to have multiple queues, you can have multiple Localjob objects referencing different queues. A worker can only work off a single queue at a time, so you will have to spawn one thread or process worker per queue. Example:
MailQueue = Localjob.new(0xDEADC0DE)
DefaultQueue = Localjob.new(0xDEADCAFE)
Note that Localjob takes an integer key as the queue name, written here as a hex literal. This is how the SysV adapter identifies different queues.
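A small sketch of keeping such keys in one place. QUEUE_KEYS is a hypothetical constant, not part of Localjob; the point is only that a hex literal is an ordinary Ruby Integer and that each queue needs its own distinct key.

```ruby
# Hypothetical mapping from readable names to integer queue keys.
QUEUE_KEYS = {
  mail:    0xDEADC0DE,
  default: 0xDEADCAFE
}.freeze

# A hex literal is an ordinary Integer; the notation is only for readability.
mail_key_is_integer = QUEUE_KEYS[:mail].is_a?(Integer)
mail_key_hex = format("0x%X", QUEUE_KEYS[:mail]) # back to the hex spelling

# Keys must be distinct, otherwise two names would refer to the same queue.
keys_unique = QUEUE_KEYS.values.uniq.size == QUEUE_KEYS.size
```

Each value could then be passed to Localjob.new in the same way as the literals in the example above.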