• Stars: 420
• Rank: 102,781 (Top 3 %)
• Language: JavaScript
• License: MIT License
• Created over 12 years ago
• Updated 6 months ago

servicebus

Simple service bus for sending events between processes using amqp. Allows for send/receive and publish/subscribe pattern messaging over RabbitMQ.

Configuration

Sending and Receiving

Servicebus allows simple sending and receiving of messages in a 1:1 sender:listener configuration. In the following two processes, process B sends an event message called 'my.event' every second to process A via RabbitMQ, and process A prints out each received event:

Process A:

var bus = require('servicebus').bus();
bus.listen('my.event', function (event) {
  console.log(event);
});

Process B:

var bus = require('servicebus').bus();

setInterval(function () {
  bus.send('my.event', { my: 'event' });
}, 1000);

Round-Robin Load Distribution

Simply running multiple instances of Process A, above, will cause servicebus to distribute sent messages evenly across the listeners, in a round-robin pattern.
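
The rotation can be pictured with a small in-memory sketch. This is not servicebus or RabbitMQ code: createRoundRobinQueue is a hypothetical helper that only illustrates how successive messages move from one listener to the next.

```javascript
// In-memory illustration only; the real distribution is done by RabbitMQ.
// createRoundRobinQueue is a hypothetical helper, not part of servicebus.
function createRoundRobinQueue() {
  var listeners = [];
  var next = 0; // index of the listener that receives the next message

  return {
    listen: function (handler) {
      listeners.push(handler);
    },
    send: function (message) {
      if (listeners.length === 0) {
        throw new Error('no listeners registered');
      }
      listeners[next](message);
      next = (next + 1) % listeners.length;
    }
  };
}

// Three "Process A" instances, each recording what it receives.
var queue = createRoundRobinQueue();
var received = [[], [], []];
received.forEach(function (inbox) {
  queue.listen(function (event) { inbox.push(event.n); });
});

for (var n = 1; n <= 6; n++) {
  queue.send({ n: n });
}

console.log(received); // [ [ 1, 4 ], [ 2, 5 ], [ 3, 6 ] ]
```

Each listener ends up with every third message, which is the even split described above.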

Message Acknowledgement

(Note: message acking requires use of the https://github.com/mateodelnorte/servicebus-retry middleware)

Servicebus integrates with RabbitMQ's message acknowledgement functionality, which causes messages to queue instead of being delivered until the listening process marks any previously received message as acknowledged or rejected. To use ack and reject, pass { ack: true } when defining the listening function. Messages can then be acknowledged or rejected with the following syntax:

bus.listen('my.event', { ack: true }, function (event) {
  event.handle.acknowledge(); // acknowledge a message
  event.handle.ack(); // short hand is also available
  event.handle.reject(); // reject a message
});

Message acknowledgement is suited for use in load distribution scenarios.
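
The queueing effect can be sketched in memory. The helper below, createAckQueue, is hypothetical and is not how servicebus or RabbitMQ is implemented. It assumes a listener holds at most one unacknowledged message at a time, and it treats reject the same as ack for the purpose of releasing the next message; both are simplifications made for illustration.

```javascript
// In-memory illustration only; createAckQueue is a hypothetical helper.
// Assumption: at most one unacknowledged message per listener.
function createAckQueue(handler) {
  var pending = [];
  var inFlight = false;

  function deliverNext() {
    if (inFlight || pending.length === 0) return;
    inFlight = true;
    var message = pending.shift();
    var settled = false;

    function settle() {
      if (settled) return; // ignore repeated ack/reject calls
      settled = true;
      inFlight = false;
      deliverNext();
    }

    // Attach a handle shaped like the one used in the listener above.
    var event = Object.assign({}, message, {
      handle: { acknowledge: settle, ack: settle, reject: settle }
    });
    handler(event);
  }

  return {
    send: function (message) {
      pending.push(message);
      deliverNext();
    },
    waiting: function () {
      return pending.length;
    }
  };
}

var handles = [];
var ackQueue = createAckQueue(function (event) {
  handles.push(event.handle); // keep the handle, acknowledge later
});

ackQueue.send({ n: 1 });
ackQueue.send({ n: 2 });
ackQueue.send({ n: 3 });
console.log(handles.length, ackQueue.waiting()); // 1 2

handles[0].ack();
console.log(handles.length, ackQueue.waiting()); // 2 1
```

The second message is only handed to the listener once the first has been acknowledged.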

Authentication (RabbitMQ Bus)

Fully qualified url

You may authenticate by providing url as an option when initializing the bus, or setting RABBITMQ_URL as an environment variable. RabbitMQ uses basic auth url format for authentication.

var servicebus = require('servicebus');

var bus = servicebus.bus({
  url: "amqp://user:pass@localhost:5672"
});

config options

Alternatively, you may provide a user, password, host (optional, default = 'localhost'), and port (optional, default = 5672), and servicebus will construct the url before passing it to RabbitMQ.

var bus = servicebus.bus({
  user: 'rabbitUser',
  password: 'test1234',
  host: '1.1.1.1',
  port: '5555'
});

NOTE: If url and user/password are provided, the url will be used.
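
The precedence described above could be sketched roughly as follows. resolveAmqpUrl is a hypothetical helper, not part of servicebus, and the position of RABBITMQ_URL in the order (after the url option, before the individual options) is an assumption; only "url wins over user/password" and the two defaults come from the text above.

```javascript
// Hypothetical helper, not part of servicebus.
// Assumed order: url option, then RABBITMQ_URL, then user/password/host/port.
function resolveAmqpUrl(options, env) {
  options = options || {};
  env = env || {};

  if (options.url) return options.url;
  if (env.RABBITMQ_URL) return env.RABBITMQ_URL;

  var host = options.host || 'localhost';
  var port = options.port || 5672;
  var auth = '';
  if (options.user) {
    // Encode credentials so characters such as '@' or ':' stay unambiguous.
    auth = encodeURIComponent(options.user) + ':' +
      encodeURIComponent(options.password || '') + '@';
  }
  return 'amqp://' + auth + host + ':' + port;
}

console.log(resolveAmqpUrl({
  user: 'rabbitUser',
  password: 'test1234',
  host: '1.1.1.1',
  port: '5555'
}));
// amqp://rabbitUser:test1234@1.1.1.1:5555

console.log(resolveAmqpUrl({}));
// amqp://localhost:5672
```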

Publish / Subscribe

Servicebus can also send messages from one sender to any number of receiving processes (1:N) in a fan-out architecture. In this pattern, one sender publishes a message and any number of subscribers can receive it. The pattern for usage looks very similar to send/listen:

Process A (can be run any number of times, all will receive the event):

var bus = require('servicebus').bus();
bus.subscribe('my.event', function (event) {
  console.log(event);
});

Process B:

var bus = require('servicebus').bus();

setInterval(function () {
  bus.publish('my.event', { my: 'event' });
}, 1000);

Topic Routing

To use topic routing to accept multiple events in a single handler, use publish and subscribe and the following syntax:

bus.publish('event.one', { event: 'one' });
bus.publish('event.two', { event: 'two' });

and for the listener...

bus.subscribe('event.*', function (msg) ...
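
For reference, AMQP topic patterns are made of dot-separated words, where * stands for exactly one word and # stands for zero or more words. The matching itself is done by RabbitMQ; the sketch below only illustrates those rules, and topicMatches is a hypothetical helper that is not part of servicebus.

```javascript
// Sketch of AMQP topic matching; the real matching happens in RabbitMQ.
// topicMatches and matchWords are hypothetical helpers for illustration.
function topicMatches(pattern, routingKey) {
  return matchWords(pattern.split('.'), routingKey.split('.'));
}

function matchWords(pattern, key) {
  if (pattern.length === 0) return key.length === 0;

  var head = pattern[0];
  var rest = pattern.slice(1);

  if (head === '#') {
    // '#' may consume zero or more words: try every possible split.
    for (var i = 0; i <= key.length; i++) {
      if (matchWords(rest, key.slice(i))) return true;
    }
    return false;
  }

  if (key.length === 0) return false;
  if (head === '*' || head === key[0]) {
    return matchWords(rest, key.slice(1));
  }
  return false;
}

console.log(topicMatches('event.*', 'event.one'));     // true
console.log(topicMatches('event.*', 'event.two'));     // true
console.log(topicMatches('event.*', 'event.one.two')); // false
console.log(topicMatches('event.#', 'event.one.two')); // true
```

So a subscription to 'event.*' receives both 'event.one' and 'event.two', but not a key with a third word.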

Middleware

Servicebus allows for middleware packages to enact behavior at the time a message is sent or received. They are very similar to connect middleware in their usage:

  if ( ! process.env.RABBITMQ_URL)
    throw new Error('Tests require a RABBITMQ_URL environment variable to be set, pointing to the RabbitMQ instance you wish to use.');

  var busUrl = process.env.RABBITMQ_URL

  var bus = require('../').bus({ url: busUrl });

  bus.use(bus.package());
  bus.use(bus.correlate());
  bus.use(bus.logger());

  module.exports.bus = bus;

Middleware may define one or two functions to modify incoming or outgoing messages:

...

  function logIncoming (queueName, message, options, next) {
    log('received ' + util.inspect(message));
    next(null, queueName, message, options);
  }

  function logOutgoing (queueName, message, options, next) {    
    log('sending ' + util.inspect(message));
    next(null, queueName, message, options);
  }

  return {
    handleIncoming: logIncoming,
    handleOutgoing: logOutgoing
  };

handleIncoming pipelines behavior to be enacted on an incoming message. handleOutgoing pipelines behavior to be enacted on an outgoing message. To say that the behavior is pipelined is to say that each middleware is called in succession, allowing each to enact its behavior before the next. For an incoming message, the order is: protocol -> servicebus -> middleware 1 -> middleware 2 -> servicebus -> user code.
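
A minimal sketch of such a pipeline, assuming each middleware function has the (queueName, message, options, next) signature shown above. runPipeline, first and second are hypothetical names used for illustration; this is not the servicebus internals.

```javascript
// Sketch only: runPipeline is a hypothetical helper, not servicebus code.
function runPipeline(middleware, queueName, message, options, done) {
  var index = 0;

  function next(err, q, m, o) {
    if (err) return done(err);
    if (index === middleware.length) return done(null, q, m, o);
    var handler = middleware[index++];
    handler(q, m, o, next); // each handler passes control on via next
  }

  next(null, queueName, message, options);
}

var steps = [];

function first(queueName, message, options, next) {
  steps.push('first');
  message.seenBy = ['first'];
  next(null, queueName, message, options);
}

function second(queueName, message, options, next) {
  steps.push('second');
  message.seenBy.push('second');
  next(null, queueName, message, options);
}

var result;
runPipeline([first, second], 'my.event', { my: 'event' }, {}, function (err, queueName, message) {
  if (err) throw err;
  steps.push('user code');
  result = message;
});

console.log(steps);  // [ 'first', 'second', 'user code' ]
console.log(result); // { my: 'event', seenBy: [ 'first', 'second' ] }
```

Passing an error as the first argument to next stops the chain in this sketch, mirroring the common Node.js callback convention.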

Included Middleware

Correlate

Correlate simply adds a .cid (Correlation Identity) property to any outgoing message that doesn't already have one. This is useful for following messages in logs across services.
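
As an illustration of the idea, a correlate-style middleware might look like the sketch below. correlateSketch is a hypothetical stand-in and not the bundled implementation; in particular, the id format used here is an arbitrary choice.

```javascript
// Hypothetical correlate-style middleware; not the servicebus implementation.
function correlateSketch() {
  function addCid(queueName, message, options, next) {
    if (!message.cid) {
      // Arbitrary id format for the sketch: timestamp plus random suffix.
      message.cid = Date.now().toString(36) + '-' +
        Math.random().toString(36).slice(2);
    }
    next(null, queueName, message, options);
  }
  return { handleOutgoing: addCid };
}

var correlate = correlateSketch();
var fresh = { my: 'event' };
var existing = { my: 'event', cid: 'abc-123' };
function noop() {}

correlate.handleOutgoing('my.event', fresh, {}, noop);
correlate.handleOutgoing('my.event', existing, {}, noop);

console.log(typeof fresh.cid); // string
console.log(existing.cid);     // abc-123
```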

Logger

Logger ensures that incoming and outgoing messages are logged to stdout via the debug module. Use it only in low-throughput scenarios; otherwise your logs will grow very quickly.

Package

Package repackages outgoing messages, encapsulating the original message as a .data property and adding additional properties for information like message type and datetime sent:

  // bus.publish('my:event', { my: 'event' });
  {
    my: 'event'
  };

becomes

  {
    data: {
      my: 'event'
    }
    , datetime: 'Wed, 04 Sep 2013 19:31:11 GMT'
    , type: 'my:event'
  };
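
A rough sketch of a middleware that performs this kind of repackaging is shown below. packageSketch is a hypothetical stand-in, not the bundled implementation; it assumes the type is taken from the queue name and that the datetime uses the UTC string format seen in the example above.

```javascript
// Hypothetical package-style middleware; not the servicebus implementation.
function packageSketch() {
  function wrap(queueName, message, options, next) {
    var packaged = {
      data: message,                      // original message, untouched
      datetime: new Date().toUTCString(), // e.g. 'Wed, 04 Sep 2013 19:31:11 GMT'
      type: queueName
    };
    next(null, queueName, packaged, options);
  }
  return { handleOutgoing: wrap };
}

var packagedMessage;
packageSketch().handleOutgoing('my:event', { my: 'event' }, {}, function (err, queueName, message) {
  packagedMessage = message;
});

console.log(packagedMessage.data); // { my: 'event' }
console.log(packagedMessage.type); // my:event
```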

Retry

https://github.com/mateodelnorte/servicebus-retry

Retry provides the ability to specify the maximum number of times a failing message will be retried before being placed on an error queue. The retry middleware requires the correlate middleware.

Contributing

servicebus uses semantic-release for deploys.

Commits must follow Conventional Changelog to accurately calculate new versions.
