swarmiji
swarmiji is a framework that helps in writing distributed programs using the clojure programming language. I wrote it because our startup (Runa) needed computations to span not just clojure agents within single JVMs but across machines. This especially became the case as our load grew.
swarmiji uses the fantastic RabbitMQ as its central nervous system to communicate between service requesters and (possibly multiple) workers. It provides simple constructs to create these distributed services and makes it very easy to use them in your code. It has support for additional things like – automatic time-outs (if a worker fails), and a simple metrics collection system (to see how long services are taking, and where time is being spent). It also uses a very simple web-framework that allows clojure functions (that might use swarmiji) to be exposed as web-services.
Contributions are welcome, as are recommendations for how to improve things.
example
The easiest way to understand how to use swarmiji is to see an example. Consider the code below which is written in the traditional way:
(defn monthly-spend [month]
(total-spend-for-days (days-of-month month))
(defn total-quarterly-spend []
(let [first (monthly-spend (first-month-of-quarter))
second (monthly-spend (second-month-of-quarter))
third (monthly-spend (third-month-of-quarter))]
(+ first second third)))
Some imagination is needed – lets assume that total-spend-for-days is a long running process that calculates important stuff. Total-quarterly-spend, then takes three times as long. However, since each month’s calculation is basically independent of each other, we could run this in parallel. If you’d this parallelism to take advantage of a horizontally scalable cluster, these computations need to run on different machines.
The following code shows how the above can be converted into using swarmiji. This version runs in parallel on distributed machines:
(defsevak monthly-spend [month]
(total-spend-for-days (days-of-month month))
(defn total-quarterly-spend []
(let [first (monthly-spend (first-month-of-quarter))
second (monthly-spend (second-month-of-quarter))
third (monthly-spend (third-month-of-quarter))]
(from-swarm 1000 [first second third]
(+ (first :value) (second :value) (third :value))))
What’s happening here is that instead of using the good old defn to define monthly-spend, we use defsevak. Then, when you call the function, what is returned is not the value but a proxy object which represents the computation on another CPU. You can call as many such sevaks as you want, and when you’re ready to combine the results to produce the final result you use the from-swarm macro to synchronize.
sevaks and defsevak
In Hindi, the word sevak means servant or worker. In swarmiji, you define such workers using the defsevak macro. Sevaks can transparently run on remote machines by simply changing a configuration parameter (:distributed-mode) to true.
defsevak itself works exactly like defn.
sevak-server
A sevak-server is the process that loads up all the defined sevaks (using require, load and/or use) and calls boot-sevak-server. This basically makes all the defined sevaks available as “services” that can be called by a client by using the sevak function. The coordination happens over the rabbitmq bus. Here is an example -
;;assume sevak_file1.clj defines add-numbers and multiply-numbers as sevaks
(defsevak add-numbers [n1 n2]
(+ n1 n2))
(defsevak multiply-numbers [n1 n2]
(* n1 n2))
;;assume sevak_file2.clj defines subtract-numbers and divide-numbers as sevaks
(defsevak subtract-numbers [n1 n2]
(= n1 n2))
(defsevak divide-numbers [n1 n2]
(/ n1 n2))
;;now in the file that calls boot-sevak-server:
(use 'sevak-file1)
(use 'sevak-file2)
(boot-sevak-server)
The way to handle increasing volumes of load is to just add more sevak-servers. This can be done by starting multiple processes that call boot-sevak-server, and this can be done on multiple machines. Thus, by simply adding machines, more load can be handled. If you do have multiple sevak-servers, requests are handled in a round-robin fashion. This is the default way rabbitmq delivers messages when there are multiple subscribers – though this job allocation strategy may change in the future.
from-swarm
Sevaks can run on any machine that is available to do the work. The object returned by a call to a sevak represents this computation. Since the time taken to execute this computation is non-deterministic (depends on how much work there is, the load on the cluster, etc.) these object are essentially asynchronous in nature. Thus, when we’re ready to use results from one or more sevaks, we must use a synchronization construct. This is what the from-swarm macro does – it accepts a time-out along with a list of sevaks to synchronize on – the process will basically wait at this point until all the sevaks complete. Once the sevaks have all completed, the code block is executed. If, instead, the sevaks don’t all complete within the specified timeout (specified in milliseconds), an exception is thrown.
It can get cumbersome to always pass a timeout wherever from-swarm is called, so its idiomatic to create a macro which wraps from-swarm and uses an appropriate global timeout value.
what does calling a sevak return?
Calling a sevak looks exactly like calling a regular function. However while a regular function returns the result of the function body, a sevak returns a proxy object that is the representation of the computation happening somewhere on the cloud. This object then must be queried to get the result out. This object also has lots of other information about the computation which can also be queried for. Querying this object is easy – since this object is really a function, you just call it with a query symbol as its argument. The most commonly used one is :value and it looks like -
(object-returned-from-sevak :value) This code returns the final computed result of the sevak.
All the supported arguments supported are:
argument | what does it return? |
:value | the result of the sevak computation |
:distributed? | is this sevak running locally or on a remote computer? |
:complete? | returns true if the sevak has completed its job |
:status | returns :success if the sevak run was successful, else :error if there was a problem |
:sevak-time | returns the CPU time spent executing the sevak body |
:messaging-time | returns the time taken in the messaging overhead of sending and receiving the sevak over RabbitMQ |
:total-time | returns the total of the :sevak-time and :messaging-time |
:exception | returns the name of the exception thrown by the sevak on the remote computer |
:stacktrace | returns the stacktrace of the exception thrown by the sevak on the remote computer |
:sevak-name | returns the name of the sevak |
:args | returns the args that the sevak was called with |
installation
There are a few moving parts to swarmiji.
RabbitMQ
There are many resources on the internet about installing rabbitmq. Here are a couple Official installation instructions and rabbitmq on Mac OS X
mysql
If you want to use the diagnostics recording capability, you will need mysql. There is an init.sql and a schema.rb in the db directory which helps set up the database. The configuration file also specifies the access to this database.
configuration
environment variables
swarmiji needs two environment variables to configure itself – SWARMIJI_HOME and SWARMIJI_ENV. The first should be set to the path of where swarmiji has been installed. The directories within this should be the stuff that comes out of git – config/, and src/, and utils, etc.
config file
There is a configuration file which controls the system. This file has entries for each environment that will be created, for example – development, test, staging etc. The config looks like:
(def operation-configs {
"development" {
:swarmiji-username "amit"
:host "stomp.rabbitmq.runa.com"
:port 61613
:q-username "guest"
:q-password "guest"
:sevak-request-queue-prefix "RUNA_SWARMIJI_TRANSPORT_"
:sevak-diagnostics-queue-prefix "RUNA_SWARMIJI_DIAGNOSTICS_"
:distributed-mode true
:diagnostics-mode false
:logsdir (str swarmiji-home "/logs")
:log-to-console true }
"test" {
...
}
"staging" {
...
}
})
swarmiji-username is used to name-space control messages on the rabbitmq channels used for communicating between sevaks etc. This way, multiple developers can use the same setup, and not step on each others toes.
The host, port, q-username, q-password are for the rabbitmq server running with the STOMP adapter. This may change in the future and use AMQP directly.
sevak-request-queue-prefix and sevak-diagnostic-queue-prefix are both the prefixes used in the rabbitmq channels that are used to handle the distribution. You can concatenate more qualifiers to these in order to allow the same setup to be shared among developers or even environments (eg. test, development, staging, etc.)
distributed-mode controls whether sevaks (when called) are sent off to run remotely or are executed locally (in the same process). This works transparently – and turning distribution off is great for unit-testing.
diagnostics-mode controls whether sevak-servers send out meta-informational messages on the rabbitmq bus. These messages can be recorded into a mysql database by running an included recorder process. More information about this is below in the diagnostics section.
logsdir is where the log files will be stored. Right now, it creates a log file for each sevak-server or web-server-2 process that is created. Log files are named by concatenating the environment name along with the process ID (pid) of the process, followed by dot log (.log) Logging needs to be improved in swarmiji since there can be dozens of sevak-servers running together.
log-to-console, if true, ensures that all log messages are also printed to the console. Good for development environments.
The config file also contains a section to configure the mysql database used by the diagnostics recorder module. This section looks like:
(def swarmiji-mysql-configs {
"development" {
:classname "com.mysql.jdbc.Driver"
:subprotocol "mysql"
:user "root"
:password "override"
:subname (str "//localhost/swarmiji_development")
}
"test" {
...
}
"staging" {
...
}
"production" {
...
}
}
)
binding-for-swarmiji
If your program uses the binding form to set special variables to something, you will need to use the binding-for-swarmiji macro instead. To understand what is happening, it is useful to remember what vars are (thread-local variables) and what happens when, for example, you send a function that uses a var to an agent (a different thread). What happens is that it doesn’t work unless you rebind the var inside the function.
The binding-for-swarmiji works exactly the same way as binding (indeed, it sets up an actual binding under the covers) – but also allows the swarmiji runtime to automatically rebind the vars inside the sevak-server.
Here’s how it works – lets assume you’re using my capjure library :) to do some HBase stuff from your sevaks. Capjure requires the hbase-master and primary-keys-config vars to be set up appropriately. So you would start your sevak-server from inside a binding-for-swarmiji call like this -
(use 'some.hbase-using.sevak-functions)
(binding-for-swarmiji [*hbase-master* (valid-hbase-master-hostname) *primary-keys-config* (valid-primary-keys-config)]
(boot-sevak-server))
And that’s all – your HBase using sevaks would work just fine with no need to rebind anything.
webbing
gotchas
diagnostics
swarmiji
Swarm. Swamiji. Ahahaha.
Copyright 2009 Amit Rathore
Acknowledgements
The excellent YourKit Java Profiler
YourKit is kindly supporting open source projects with its full-featured Java Profiler. YourKit, LLC is the creator of innovative and intelligent tools for profiling Java and .NET applications. Take a look at YourKit’s leading software products: YourKit Java Profiler and YourKit .NET Profiler