

RSMQ-Worker

Join the chat at https://gitter.im/mpneuried/rsmq-worker

Helper to simply implement a worker around RSMQ ( Redis Simple Message Queue ).

⚠️ Note: RSMQ uses the Redis EVAL command (Lua scripts), so the minimum Redis version is 2.6.

Install

  npm install rsmq-worker

Initialize

  new RSMQWorker( queuename, options );

Example:

  var RSMQWorker = require( "rsmq-worker" );
  var worker = new RSMQWorker( "myqueue" );

  worker.on( "message", function( msg, next, id ){
  	// process your message
  	console.log("Message id : " + id);
  	console.log(msg);
  	next()
  });

  // optional error listeners
  worker.on('error', function( err, msg ){
      console.log( "ERROR", err, msg.id );
  });
  worker.on('exceeded', function( msg ){
      console.log( "EXCEEDED", msg.id );
  });
  worker.on('timeout', function( msg ){
      console.log( "TIMEOUT", msg.id, msg.rc );
  });

  worker.start();

Config

  • queuename: ( String required ) The name of the queue to pull messages from
  • options ( Object optional ) The configuration object
    • options.interval: ( Number|Number[] optional; default = [ 0, 1, 5, 10 ] ) A single wait time or an Array of increasing wait times in seconds. See the section Options interval for details.
    • options.maxReceiveCount: ( Number optional; default = 10 ) Receive count after which a message is treated as exceeded
    • options.invisibletime: ( Number optional; default = 30 ) A time in seconds to hide a message after it has been received.
    • options.defaultDelay: ( Number optional; default = 1 ) The default delay in seconds for sending new messages to the queue.
    • options.autostart: ( Boolean optional; default = false ) Autostart the worker on init
    • options.timeout: ( Number optional; default = 3000 ) Message processing timeout in ms. You have to call the message's next() method within this time (e.g. within 3000 ms). If set to 0 the worker waits indefinitely.
    • options.customExceedCheck: ( Function optional ) A custom function that receives the raw message (see Raw message format) as its argument and implements a custom exceed check. If it returns true the message will not exceed. If it returns false the regular maxReceiveCount check is used.
    • options.alwaysLogErrors: ( Boolean optional; default = false ) An error will be logged to the console even if an error listener has been attached.
    • options.rsmq: ( RedisSMQ optional; default = null ) An already existing rsmq instance to use instead of creating a new client
    • options.redis: ( RedisClient optional; default = null ) An already existing redis client instance to use if no rsmq instance has been defined
    • options.redisPrefix: ( String optional; default = "" ) The redis prefix/namespace for rsmq if no rsmq instance has been defined. This has to match the option ns of RSMQ.
    • options.host: ( String optional; default = "localhost" ) Host to connect to redis if no rsmq or redis instance has been defined
    • options.port: ( Number optional; default = 6379 ) Port to connect to redis if no rsmq or redis instance has been defined
    • options.options: ( Object optional; default = {} ) Options to connect to redis if no rsmq or redis instance has been defined
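
Note that options.timeout is given in milliseconds while options.invisibletime is given in seconds, which is easy to mix up. The following is a minimal sketch of a sanity check for the two values. `timeoutFitsInvisibleTime` is a hypothetical helper and not part of rsmq-worker, and treating "the timeout should end before the message becomes visible again" as the desired relation is an assumption.

```javascript
// Hypothetical helper, not part of rsmq-worker: compares the two timing options.
// options.timeout is given in milliseconds, options.invisibletime in seconds.
function timeoutFitsInvisibleTime( options ){
	// Fall back to the documented defaults ( 3000 ms and 30 s ).
	var timeoutMs = ( options.timeout != null ) ? options.timeout : 3000;
	var invisibleS = ( options.invisibletime != null ) ? options.invisibletime : 30;
	if( timeoutMs === 0 ){
		// 0 disables the timeout, so processing may take arbitrarily long.
		return false;
	}
	return timeoutMs <= invisibleS * 1000;
}

console.log( timeoutFitsInvisibleTime( { timeout: 5000, invisibletime: 2 } ) ); // false
```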

Raw message format

A raw message ( e.g. as passed to the data event or to customExceedCheck ) contains the following keys:

  • msg.message : ( String ) The queue message content. You can use complex content by using a stringified JSON.
  • msg.id : ( String ) The rsmq internal message id
  • msg.sent : ( Number ) Timestamp of when this message was sent / created.
  • msg.fr : ( Number ) Timestamp of when this message was first received.
  • msg.rc : ( Number ) Number of times this message was received.
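
These fields are enough to write a custom exceed check. The following is a minimal sketch and not part of rsmq-worker: `keepWhileYoung` and `MAX_AGE_MS` are hypothetical names, and it assumes that msg.sent is a timestamp in milliseconds (the unit is not stated above).

```javascript
// Hypothetical limit: keep retrying messages that are younger than 60 seconds.
var MAX_AGE_MS = 60 * 1000;

// Returns true  -> the message will not exceed ( see options.customExceedCheck ).
// Returns false -> the regular maxReceiveCount check is used.
// `now` is an optional argument so the function can be tested without a clock.
function keepWhileYoung( msg, now ){
	var current = ( typeof now === "number" ) ? now : Date.now();
	// Assumption: msg.sent is a timestamp in milliseconds.
	return ( current - msg.sent ) < MAX_AGE_MS;
}

// Usage sketch ( needs a reachable redis, so it is left as a comment ):
// new RSMQWorker( "myqueue", { customExceedCheck: keepWhileYoung } );
```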

Methods

.start()

If you haven't set the config option autostart to true you have to call the .start() method.

Return

( Self ): The instance itself for chaining.

.stop()

Just stops the receive interval. This will not close the connection to rsmq/redis. If you want your script to end, call .quit().

Return

( Self ): The instance itself for chaining.

.send( msg [, delay ][, cb ] )

Helper function to simply send a message to the configured queue.

Arguments

  • msg : ( String required ): The rsmq message. Best practice is to use a stringified JSON with additional data.
  • delay : ( Number optional; default = 0 ): The message delay to hide this message for the next x seconds.
  • cb : ( Function optional ): An optional callback to confirm that the message was sent successfully.

Return

( Self ): The instance itself for chaining.
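
Because msg has to be a String, structured data is usually stringified before sending. A minimal sketch of that step follows. `buildMessage` is a hypothetical helper, the `{ type, data }` layout is only one possible convention, and the callback in the usage comment assumes a Node-style error-first signature.

```javascript
// Hypothetical helper: wraps a task type and its data into a string message.
function buildMessage( type, data ){
	return JSON.stringify( { type: type, data: data } );
}

var payload = buildMessage( "writefile", { filename: "./test.txt", txt: "Foo Bar" } );

// Usage sketch ( needs a worker instance, so it is left as a comment ):
// send the payload with a delay of 5 seconds.
// worker.send( payload, 5, function( err ){
// 	if( err ){ console.error( "send failed", err ); }
// });
```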

.del( id [, cb ] )

Helper function to simply delete a message after it has been processed.

Arguments

  • id : ( String required ): The rsmq message id.
  • cb : ( Function optional ): An optional callback to confirm that the message was deleted successfully.

Return

( Self ): The instance itself for chaining.

.changeInterval( interval )

Change the interval timeouts while the worker is running.

Arguments

  • interval : ( Number|Array required ): The new interval.

Return

( Self ): The instance itself for chaining.

.quit()

Stop the worker and close the connection. After this it's no longer possible to reuse the worker-instance. It's just intended to kill all timers and connections so your script will end.

.info( cb )

Get the current queue attributes. This is just a shortcut to rsmq.getQueueAttributes.

Arguments

  • cb : ( Function ): Callback with ( err, attributes ). See rsmq-docs for details.

.size( [hidden=false], cb )

Get the current queue size.

Arguments

  • hidden : ( Boolean optional; default = false ): The count of messages including the currently hidden/"in flight" messages.
  • cb : ( Function optional ): Callback with ( err, size ). The size is a Number and represents the number of messages in the queue. If hidden=true you will receive the number of currently hidden messages.

Return

( Self ): The instance itself for chaining.

Events

message

Main event to catch and process a message. If you do not set a handler for this Event nothing will happen.

Example:

worker.on( "message", function( message, next, msgid ){
	// process message ... 
	next();
});

Arguments

  • message : ( String ) The queue message content to process. You can use complex content by using a stringified JSON.
  • next : ( Function ) A function you have to call when your message has been processed.
    Arguments
    • delete: ( Boolean|Error optional; default = true ) Error: If you pass an error it will be emitted as an error event. Boolean: Pass false to prevent the worker from auto-deleting the message when processing ends. This is useful if you want a message to be received multiple times. To implement this, see the config option options.customExceedCheck.
  • msgid : ( String ) The message id. This is useful if you want to delete a message manually.
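
The arguments above can be combined into a handler that parses a JSON message and reports parse errors through next. This is a minimal sketch: `handleMessage` is a hypothetical name and the processing step is left open.

```javascript
// Hypothetical handler: expects the message to be a stringified JSON value.
function handleMessage( message, next, msgid ){
	var data;
	try {
		data = JSON.parse( message );
	} catch( err ){
		// An Error passed to next() is emitted as an "error" event.
		return next( err );
	}
	// ... process `data` here ...
	// Calling next() without arguments uses the default ( delete = true ).
	next();
}

// Usage sketch ( needs a worker instance, so it is left as a comment ):
// worker.on( "message", handleMessage );
```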

ready

Fired once the worker is connected to rsmq/redis and has been initialized with the given queuename.

data

The raw event when a message has been received.

Arguments

  • msg : ( Object ) The raw rsmq message. ( See section Raw message format )

deleted

Fired after a message has been deleted.

Arguments

  • id : ( String ) The rsmq message id

exceeded

Fired after a message has exceeded its receive limit. The message will be deleted immediately.

Arguments

  • msg : ( Object ) The raw rsmq message. ( See section Raw message format )

timeout

Fired if a message processing exceeds the configured timeout.

Arguments

  • msg : ( Object ) The raw rsmq message. ( See section Raw message format )

error

Fired if a message processing throws an error.

Arguments

  • err : ( Error|Any ) The thrown error
  • msg : ( Object ) The raw rsmq message. ( See section Raw message format )

Advanced example

This is an advanced example showing some features in action.

	var fs = require( "fs" );
	var RSMQWorker = require( "rsmq-worker" );

	var fnCheck = function( msg ){
		// check function to not exceed the message if the content is `createmessages`
		if( msg.message === "createmessages" ){
			return true
		}
		return false
	}

	
	var worker = new RSMQWorker( "myqueue", {
		interval: [ .1, 1 ],				// wait 100ms between every receive and step up to 1s on empty receives
		invisibletime: 2,						// hide received message for 2 sec
		maxReceiveCount: 2,					// only receive a message 2 times until delete
		autostart: true,						// start worker on init
		customExceedCheck: fnCheck	// set the custom exceed check
	});

	// Listen to errors
	worker.on('error', function( err, msg ){
	    console.log( "ERROR", err, msg.id );
	});
	worker.on('timeout', function( msg ){
	    console.log( "TIMEOUT", msg.id, msg.rc );
	});
	
	// handle exceeded messages
	// grab the internal rsmq instance
	var rsmq = worker._getRsmq();
	worker.on('exceeded', function( msg ){
		console.log( "EXCEEDED", msg.id );
		// NOTE: make sure this queue exists
		rsmq.sendMessage( { qname: "YOUR_EXCEEDED_QUEUE", message: msg.message }, function( err, resp ){
			if( err ){
				console.error( "write-to-exceeded-queue", err )
			}
		});
	});

	// listen to messages
	worker.on( "message", function( message, next, id ){
		
		console.log( "message", message );
		
		if( message === "createmessages" ){
			next( false )
			worker.send( JSON.stringify( { type: "writefile", filename: "./test.txt", txt: "Foo Bar" } ) );
			worker.send( JSON.stringify( { type: "deletefile", filename: "./test.txt" } ) );
			return	
		}

		var _data = JSON.parse( message )
		switch( _data.type ){
			case "writefile": 
				fs.writeFile( _data.filename, _data.txt, function( err ){
					if( err ){
						next( err );
					}else{
						next()
					}
				});
				break;
			case "deletefile": 
				fs.unlink( _data.filename, function( err ){
					if( err ){
						next( err );
					}else{
						next()
					}
				});
				break;
		}
		
	});

	worker.send( "createmessages" );

Details

Options interval

The option interval can:

A.) be a Number so the worker will poll the queue every n seconds (e.g. interval: .5 = twice a second)

B.) be an Array of Numbers. On start interval[0] is the time to poll the queue. Every time the worker receives an empty response (queue is empty) the next entry of the array is used as wait time before the next poll, until the last entry interval[ n ] is reached. On every received message the wait time is reset to interval[0].

E.g: interval: [ .2, 1, 3 ]

  • 1st poll -> no message -> wait .2s = 200ms
  • 2nd poll -> no message -> wait 1s
  • 3rd poll -> no message -> wait 3s
  • 4th poll -> no message -> wait 3s
  • 5th poll -> 1 message -> wait .2s
  • 6th poll -> no message -> wait 1s
  • 7th poll -> 1 message -> wait .2s
  • 8th poll -> 1 message -> wait .2s
  • 9th poll -> no message -> wait .2s
  • 10th poll -> no message -> wait 1s ...
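
The stepping rule can be sketched as a small function. This is a simplified model written for illustration and not the library's actual code. `waitAfterPoll` and `emptyPolls` are hypothetical names, and which array entry applies to the first empty poll after a message is an assumption here.

```javascript
// Simplified model of the wait-time selection, not the library's actual code.
// `interval` is a Number or an Array of Numbers ( seconds ).
// `emptyPolls` is the number of consecutive empty polls before this one.
function waitAfterPoll( interval, emptyPolls, gotMessage ){
	if( !Array.isArray( interval ) ){
		return interval; // a plain Number means a fixed poll interval
	}
	if( gotMessage ){
		return interval[ 0 ]; // every received message resets the wait time
	}
	// Step through the array and stay on the last entry once it is reached.
	var idx = Math.min( emptyPolls, interval.length - 1 );
	return interval[ idx ];
}

// Four empty polls in a row with interval: [ .2, 1, 3 ]
var steps = [ 0, 1, 2, 3 ].map( function( n ){
	return waitAfterPoll( [ .2, 1, 3 ], n, false );
});
console.log( steps ); // [ 0.2, 1, 3, 3 ]
```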

Todos/Ideas

  • MORE tests!
  • Optional parallel execution. To do multiple receives in parallel.
  • Automatic write of exceeded messages to a configured queue.

Release History

Version Date Description
0.5.2 2016-10-24 Optimized README and updated dependencies
0.5.1 2016-08-22 Fixed reconnect error Issue#20. Thanks to mstduff; updated deps; removed generated code docs from repo
0.5.0 2016-07-14 Added methods .info(cb) (Issue#17) and .size( [hidden,] cb )
0.4.3 2016-06-20 Optimized event listeners Issue#15. Thanks to Kevin Turner
0.4.2 2016-05-06 Added the .quit() function Issue#11. Thanks to Sam Fung
0.4.1 2016-04-05 Fixed missing isNumber function
0.4.0 2016-03-30 Updated dependencies (especially lodash to 4.x). Fixed a config bug caused by the array merge from extend Issue#7. Thanks to Peter Hanneman
0.3.8 2015-11-04 Fixed stop behavior. Pull#5. Thanks to Exinferis
0.3.7 2015-09-02 Added tests to check the behavior during errors within message processing; Added option alwaysLogErrors to prevent console logs if an error event handler was attached. Issue #3
0.3.6 2015-09-02 Updated dependencies; optimized readme (thanks to Tobias Lidskog for the pull #4)
0.3.5 2015-04-27 again ... fixed argument dispatch for .send()
0.3.4 2015-04-27 fixed argument dispatch for .send() and added optional cb for .del()
0.3.3 2015-03-27 added changeInterval to modify the interval in operation
0.3.2 2015-02-23 changed default prefix/namespace;
0.3.0 2015-02-16 It's now possible to return an error as first argument of next. This will lead to an error emit + optimized readme
0.2.2 2015-01-27 added option defaultDelay and optimized arguments of the send method; fixed travis.yml
0.2.0 2015-01-27 Added timeout, better error handling and send callback
0.1.2 2015-01-20 Reorganized code, added code docs and optimized readme
0.1.1 2015-01-17 Added test scripts and optimized repository file list
0.1.0 2015-01-16 First working and documented version
0.0.1 2015-01-14 Initial commit

Initially Generated with generator-mpnodemodule

Other projects

Name Description
rsmq A really simple message queue based on Redis
rsmq-cli a terminal client for rsmq
rest-rsmq REST interface for.
redis-notifications A redis based notification engine. It implements the rsmq-worker to safely create notifications and recurring reports.
node-cache Simple and fast NodeJS internal caching. Node internal in memory cache like memcached.
redis-sessions An advanced session store for NodeJS and Redis
obj-schema Simple module to validate an object by a predefined schema
connect-redis-sessions A connect or express middleware to simply use the redis sessions. With redis sessions you can handle multiple sessions per user_id.
systemhealth Node module to run simple custom checks for your machine or its connections. It will use redis-heartbeat to send the current state to redis.
task-queue-worker A powerful tool for background processing of tasks that are run by making standard http requests.
soyer Soyer is small lib for serverside use of Google Closure Templates with node.js.
grunt-soy-compile Compile Google Closure Templates ( SOY ) templates including the handling of XLIFF language files.
backlunr A solution to bring Backbone Collections together with the browser fulltext search engine Lunr.js

The MIT License (MIT)

Copyright © 2015 Mathias Peter, http://www.tcs.de

Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the “Software”), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
