Asynchronous Programming Library
Installation
$ npm install metasync
Asynchronous functions composition
metasync(fns)(data, done)
- fns - array of callback-last functions, callback contract err-first
- data - input data (optional)
- done - err-first callback
- Returns: composed callback-last / err-first function
const composed = metasync([f1, f2, f3, [[f4, f5, [f6, f7], f8]], f9]);
- Array of functions gives sequential execution:
[f1, f2, f3]
- Double brackets array of functions gives parallel execution:
[[f1, f2, f3]]
Example:
const metasync = require('metasync');
const fs = require('fs');
// Data collector (collect keys by count)
const dc = metasync.collect(4);
dc.pick('user', { name: 'Marcus Aurelius' });
fs.readFile('HISTORY.md', (err, data) => dc.collect('history', err, data));
dc.take('readme', fs.readFile, 'README.md');
setTimeout(() => dc.pick('timer', { date: new Date() }), 1000);
// Key collector (collect certain keys by names)
const kc = metasync
.collect(['user', 'history', 'readme', 'timer'])
.timeout(2000)
.distinct()
.done((err, data) => console.log(data));
kc.pick('user', { name: 'Marcus Aurelius' });
kc.take('history', fs.readFile, 'HISTORY.md');
kc.take('readme', fs.readFile, 'README.md');
setTimeout(() => kc.pick('timer', { date: new Date() }), 1000);
API
callbackify(fn)
- fn: <Function> promise-returning function
Returns: <Function>
Convert Promise-returning to callback-last / error-first contract
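The conversion described above can be illustrated with a minimal sketch. This is not metasync's implementation; `callbackifySketch`, `double` and `doubleCb` are hypothetical names, and the sketch assumes the callback is always the last argument.

```javascript
// Hypothetical sketch: wrap a Promise-returning function so that it
// follows the callback-last / error-first contract.
const callbackifySketch = (fn) => (...args) => {
  // The last argument is treated as the error-first callback
  const callback = args.pop();
  fn(...args).then(
    (value) => callback(null, value),
    (err) => callback(err),
  );
};

// Usage: a Promise-returning function exposed to callback-style callers
const double = (x) => Promise.resolve(x * 2);
const doubleCb = callbackifySketch(double);
doubleCb(21, (err, value) => {
  console.log({ err, value });
});
```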
asyncify(fn)
- fn: <Function> regular synchronous function
Returns: <Function> with contract: callback-last / error-first
Convert sync function to callback-last / error-first contract
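A rough sketch of the idea, assuming the result is delivered on a later tick and thrown exceptions are passed as the first callback argument. `asyncifySketch` and `parseAsync` are illustrative names, not part of the library.

```javascript
// Hypothetical sketch: turn a synchronous function into a
// callback-last / error-first one.
const asyncifySketch = (fn) => (...args) => {
  const callback = args.pop();
  // Defer execution so the callback never fires in the caller's own tick
  setTimeout(() => {
    let result;
    try {
      result = fn(...args);
    } catch (error) {
      callback(error);
      return;
    }
    callback(null, result);
  }, 0);
};

// Usage: JSON.parse with errors reported through the callback
const parseAsync = asyncifySketch(JSON.parse);
parseAsync('{"a":1}', (err, obj) => {
  console.dir({ err, obj });
});
```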
promiseToCallbackLast(promise, callback)
- promise: <Promise>
- callback: <Function>
Convert Promise to callback-last
promisify(fn)
- fn: <Function> callback-last function
Returns: <Function> Promise-returning function
Convert async function to Promise-returning function
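The reverse conversion could look roughly like this. It is a sketch under the assumption that the wrapped function calls its callback exactly once; `promisifySketch`, `addLater` and `addAsync` are made-up names for illustration.

```javascript
// Hypothetical sketch: wrap a callback-last / error-first function so
// that it returns a Promise instead.
const promisifySketch = (fn) => (...args) =>
  new Promise((resolve, reject) => {
    fn(...args, (err, data) => {
      if (err) reject(err);
      else resolve(data);
    });
  });

// Usage: a callback-last function consumed through a Promise
const addLater = (a, b, callback) => {
  setTimeout(() => callback(null, a + b), 10);
};
const addAsync = promisifySketch(addLater);
addAsync(2, 3).then((sum) => console.log(sum)); // prints 5
```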
promisifySync(fn)
- fn: <Function> regular synchronous function
Returns: <Function> Promise-returning function
Convert sync function to Promise object
map(items, fn, done)
- items: <Array> incoming
- fn: <Function> to be executed for each value in the array
  - current: <any> current element being processed in the array
  - callback: <Function>
- done: <Function> on done
Asynchronous map (iterate parallel)
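To make "iterate parallel" concrete, here is a small sketch of a parallel map that starts every call at once and keeps results in input order. It is an illustration only; `mapSketch` is a hypothetical name, and stopping at the first error is an assumption.

```javascript
// Hypothetical sketch of a parallel asynchronous map
const mapSketch = (items, fn, done) => {
  const len = items.length;
  if (len === 0) {
    done(null, []);
    return;
  }
  const result = new Array(len);
  let count = 0;
  let failed = false;
  items.forEach((item, index) => {
    fn(item, (err, value) => {
      if (failed) return; // an error was already reported
      if (err) {
        failed = true;
        done(err);
        return;
      }
      result[index] = value; // store by index, not by completion order
      count++;
      if (count === len) done(null, result);
    });
  });
};

// Usage: later items finish first, yet the order of results is kept
mapSketch(
  [1, 2, 3],
  (x, callback) => setTimeout(() => callback(null, x * 2), 30 - x * 10),
  (err, result) => console.dir(result), // [ 2, 4, 6 ]
);
```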
filter(items, fn, done)
- items: <Array> incoming
- fn: <Function> to be executed for each value in the array
  - value: <any> item from items array
  - callback: <Function>
- done: <Function> on done
Asynchronous filter (iterate parallel)
Example:
metasync.filter(
['data', 'to', 'filter'],
(item, callback) => callback(null, item.length > 2),
(err, result) => console.dir(result),
);
reduce(items, fn, done[, initial])
- items: <Array> incoming
- fn: <Function> to be executed for each value in array
  - previous: <any> value previously returned in the last iteration
  - current: <any> current element being processed in the array
  - callback: <Function> callback for returning value back to reduce function
  - counter: <number> index of the current element being processed in array
  - items: <Array> the array reduce was called upon
- done: <Function> on done
- initial: <any> optional value to be used as first argument in first iteration
Asynchronous reduce
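The argument order listed above (previous, current, callback, counter, items) can be seen in the following sketch. It is not the library's code; `reduceSketch` is a hypothetical name, and the behavior for an empty array without an initial value is an assumption borrowed from `Array.prototype.reduce`.

```javascript
// Hypothetical sketch of a sequential asynchronous reduce
const reduceSketch = (items, fn, done, initial) => {
  const len = items.length;
  const hasInitial = initial !== undefined;
  if (len === 0 && !hasInitial) {
    done(new TypeError('Reduce of empty array with no initial value'));
    return;
  }
  // Without an initial value the first element seeds the accumulator
  let counter = hasInitial ? 0 : 1;
  let previous = hasInitial ? initial : items[0];
  const next = (err, data) => {
    if (err) {
      done(err);
      return;
    }
    previous = data;
    counter++;
    step();
  };
  const step = () => {
    if (counter >= len) {
      done(null, previous);
      return;
    }
    fn(previous, items[counter], next, counter, items);
  };
  step();
};

// Usage: sum the array one element at a time
reduceSketch(
  [1, 2, 3, 4],
  (previous, current, callback) => callback(null, previous + current),
  (err, total) => console.log(total), // prints 10
);
```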
reduceRight(items, fn, done[, initial])
- items: <Array> incoming
- fn: <Function> to be executed for each value in array
  - previous: <any> value previously returned in the last iteration
  - current: <any> current element being processed in the array
  - callback: <Function> callback for returning value back to reduce function
  - counter: <number> index of the current element being processed in array
  - items: <Array> the array reduce was called upon
- done: <Function> on done
- initial: <any> optional value to be used as first argument in first iteration
Asynchronous reduceRight
each(items, fn, done)
- items: <Array> incoming
- fn: <Function>
  - value: <any> item from items array
  - callback: <Function>
- done: <Function> on done
Asynchronous each (iterate in parallel)
Example:
metasync.each(
['a', 'b', 'c'],
(item, callback) => {
console.dir({ each: item });
callback();
},
(err, data) => console.dir('each done'),
);
series(items, fn, done)
- items: <Array> incoming
- fn: <Function>
  - value: <any> item from items array
  - callback: <Function>
- done: <Function> on done
Asynchronous series
Example:
metasync.series(
['a', 'b', 'c'],
(item, callback) => {
console.dir({ series: item });
callback();
},
(err, data) => {
console.dir('series done');
},
);
find(items, fn, done)
- items: <Array> incoming
- fn: <Function>
  - value: <any> item from items array
  - callback: <Function>
- done: <Function> on done
Asynchronous find (iterate in series)
Example:
metasync.find(
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16],
(item, callback) => callback(null, item % 3 === 0 && item % 5 === 0),
(err, result) => {
console.dir(result);
},
);
every(items, fn, done)
- items: <Array> incoming
- fn: <Function>
  - value: <any> item from items array
  - callback: <Function>
- done: <Function> on done
Asynchronous every
some(items, fn, done)
- items: <Array> incoming
- fn: <Function>
  - value: <any> item from items array
  - callback: <Function>
- done: <Function> on done
Asynchronous some (iterate in series)
asyncMap(items, fn[, options][, done])
- items: <Array> incoming dataset
- fn: <Function>
  - item: <any>
  - index: <number>
- options: <Object> map params, optional
- done: <Function> call on done, optional
Non-blocking synchronous map
asyncIter(base)
- base: <Iterable> | <AsyncIterable> an iterable that is wrapped in <AsyncIterator>
Returns: <AsyncIterator>
Create an AsyncIterator instance
class AsyncIterator
AsyncIterator.prototype.constructor(base)
async AsyncIterator.prototype.next()
async AsyncIterator.prototype.count()
async AsyncIterator.prototype.each(fn, thisArg)
async AsyncIterator.prototype.forEach(fn, thisArg)
async AsyncIterator.prototype.parallel(fn, thisArg)
async AsyncIterator.prototype.every(predicate, thisArg)
async AsyncIterator.prototype.find(predicate, thisArg)
async AsyncIterator.prototype.includes(element)
async AsyncIterator.prototype.reduce(reducer, initialValue)
async AsyncIterator.prototype.some(predicate, thisArg)
async AsyncIterator.prototype.someCount(predicate, count, thisArg)
async AsyncIterator.prototype.collectTo(CollectionClass)
async AsyncIterator.prototype.collectWith(obj, collector)
async AsyncIterator.prototype.join(sep = ', ', prefix = '', suffix = '')
async AsyncIterator.prototype.toArray()
AsyncIterator.prototype.map(mapper, thisArg)
AsyncIterator.prototype.filter(predicate, thisArg)
AsyncIterator.prototype.flat(depth = 1)
AsyncIterator.prototype.flatMap(mapper, thisArg)
AsyncIterator.prototype.zip(...iterators)
AsyncIterator.prototype.chain(...iterators)
AsyncIterator.prototype.take(amount)
AsyncIterator.prototype.takeWhile(predicate, thisArg)
AsyncIterator.prototype.skip(amount)
AsyncIterator.prototype.throttle(percent, min)
AsyncIterator.prototype.enumerate()
collect(expected)
- expected: <number> | <string[]>
Returns: <Collector>
Create Collector instance
class Collector
Data collector
Collector.prototype.constructor(expected)
- expected: <number> | <string[]> count or keys
Data collector
Collector.prototype.collect(key, err, value)
Returns: <this>
Pick or fail key
Collector.prototype.pick(key, value)
- key: <string>
- value: <any>
Returns: <this>
Pick key
Collector.prototype.fail(key, err)
Returns: <this>
Fail key
Collector.prototype.take(key, fn, args)
- key: <string>
- fn: <Function>
- args: <Array> rest arguments, to be passed in fn
Returns: <this>
Take method result
Collector.prototype.timeout(msec)
- msec: <number>
Returns: <this>
Set timeout
Collector.prototype.done(callback)
- callback: <Function>
  - err: <Error>
  - data: <any>
Returns: <this>
Set on done listener
Collector.prototype.finalize(key, err, data)
Collector.prototype.distinct(value)
- value: <boolean>
Returns: <this>
Deny or allow unlisted keys
Collector.prototype.cancel(err)
Collector.prototype.then(fulfill, reject)
compose(flow)
- flow: <Function[]> callback-last / err-first
Returns: <Function> composed callback-last / err-first
Asynchronous functions composition
Array of functions results in sequential execution: [f1, f2, f3]
Double brackets array of functions results in parallel execution: [[f1, f2, f3]]
Example:
const composed = metasync([f1, f2, f3, [[f4, f5, [f6, f7], f8]], f9]);
class Composition
Composition.prototype.constructor()
Composition.prototype.on(name, callback)
Composition.prototype.finalize(err)
Composition.prototype.collect(err, result)
Composition.prototype.parallel()
Composition.prototype.sequential()
Composition.prototype.then(fulfill, reject)
Composition.prototype.clone()
Clone composed
Composition.prototype.pause()
Pause execution
Composition.prototype.resume()
Resume execution
Composition.prototype.timeout(msec)
- msec: <number>
Set timeout
Composition.prototype.cancel()
Cancel execution where possible
firstOf(fns, callback)
- fns: <Function[]> callback-last / err-first
- callback: <Function> on done, err-first
Executes all asynchronous functions and passes the first result to the callback
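A minimal sketch of this "first result wins" behavior, assuming that later results are simply ignored. `firstOfSketch`, `slow` and `fast` are hypothetical names used only for illustration.

```javascript
// Hypothetical sketch: start every function and report only the first
// result (or error) that arrives.
const firstOfSketch = (fns, callback) => {
  let finished = false;
  fns.forEach((fn) => {
    fn((err, data) => {
      if (finished) return; // ignore every result after the first
      finished = true;
      callback(err, data);
    });
  });
};

// Usage: the faster function determines the outcome
const slow = (cb) => setTimeout(() => cb(null, 'slow'), 50);
const fast = (cb) => setTimeout(() => cb(null, 'fast'), 5);
firstOfSketch([slow, fast], (err, winner) => console.log(winner)); // prints "fast"
```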
parallel(fns[, context], callback)
- fns: <Function[]> callback-last / err-first
- context: <Object> incoming data, optional
- callback: <Function> on done, err-first
Parallel execution
Example:
metasync.parallel([f1, f2, f3], (err, data) => {});
sequential(fns[, context], callback)
- fns: <Function[]> callback-last with err-first contract
- context: <Object> incoming data, optional
- callback: <Function> err-first on done
Sequential execution
Example:
metasync.sequential([f1, f2, f3], (err, data) => {});
runIf(condition[, defaultVal], asyncFn, ...args)
- condition: <any>
- defaultVal: <any> optional, value that will be returned to callback if condition is falsy
- asyncFn: <Function> callback-last function that will be executed if condition is truthy
- args: <any[]> args to pass to asyncFn
Run asyncFn if condition is truthy, else return defaultVal to callback.
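One way to handle the optional defaultVal is to check whether the second argument is a function. The sketch below works under that assumption and also assumes the callback is the last item of args; `runIfSketch` and `increment` are hypothetical names.

```javascript
// Hypothetical sketch of conditional execution with an optional default
const runIfSketch = (condition, defaultVal, asyncFn, ...args) => {
  if (typeof defaultVal === 'function') {
    // defaultVal was omitted: shift the arguments one position right
    args.unshift(asyncFn);
    asyncFn = defaultVal;
    defaultVal = undefined;
  }
  if (condition) {
    asyncFn(...args);
    return;
  }
  // Condition is falsy: report defaultVal asynchronously
  const callback = args[args.length - 1];
  process.nextTick(callback, null, defaultVal);
};

// Usage
const increment = (x, callback) => callback(null, x + 1);
runIfSketch(true, 0, increment, 1, (err, value) => console.log(value)); // prints 2
runIfSketch(false, 0, increment, 1, (err, value) => console.log(value)); // prints 0
```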
runIfFn(asyncFn, ...args)
- asyncFn: <Function> callback-last function that will be executed if it is provided
- args: <any[]> args to pass to asyncFn
Run asyncFn if it is provided
class do
do.prototype.constructor(fn, ...args)
toAsync(fn)
- fn: <Function> callback-last / err-first
Returns: <Function>
Convert synchronous function to asynchronous
Transform function with args arguments and callback to function with args as separate values and callback
asAsync(fn, args)
- fn: <Function> asynchronous
- args: <Array> its arguments
Wrap function adding async chain methods
of(args)
- args: <Array>
Applicative f => a -> f a
concat(fn1, fn2)
- fn1: <Function>
- fn2: <Function>
Monoid m => a -> a -> a
fmap(fn1, f)
- fn1: <Function>
- f: <Function>
Functor f => (a -> b) -> f a -> f b
ap(fn, funcA)
- fn: <Function>
- funcA: <Function>
Applicative f => f (a -> b) -> f a -> f b
memoize(fn)
- fn: <Function> sync or async
Returns: <Function> memoized
Create memoized function
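The caching idea can be sketched for the callback-last case only. This is an illustration, not the Memoized class: `memoizeSketch`, `square` and `fastSquare` are hypothetical names, and building the cache key with JSON.stringify and caching errors along with data are assumptions.

```javascript
// Hypothetical sketch: memoize a callback-last / error-first function
const memoizeSketch = (fn) => {
  const cache = new Map();
  return (...args) => {
    const callback = args.pop();
    const key = JSON.stringify(args); // assumes serializable arguments
    if (cache.has(key)) {
      const { err, data } = cache.get(key);
      callback(err, data);
      return;
    }
    fn(...args, (err, data) => {
      cache.set(key, { err, data });
      callback(err, data);
    });
  };
};

// Usage: the wrapped function runs once per distinct argument list
let calls = 0;
const square = (x, callback) => {
  calls++;
  callback(null, x * x);
};
const fastSquare = memoizeSketch(square);
fastSquare(4, (err, data) => console.log(data, calls)); // prints 16 1
fastSquare(4, (err, data) => console.log(data, calls)); // prints 16 1
```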
class Memoized
Memoized.prototype.constructor()
Memoized.prototype.clear()
Memoized.prototype.add(key, err, data)
Memoized.prototype.del(key)
Memoized.prototype.get(key, callback)
Memoized.prototype.on(eventName, listener)
- eventName: <string>
- listener: <Function> handler
Add event listener
Example:
const memoized = new Memoized();
memoized.on('memoize', (err, data) => { ... });
memoized.on('add', (key, err, data) => { ... });
memoized.on('del', (key) => { ... });
memoized.on('clear', () => { ... });
Memoized.prototype.emit(eventName, args)
- eventName: <string>
- args: <any> rest arguments
Emit Memoized events
poolify(factory, min, norm, max)
queue(concurrency)
- concurrency: <number> simultaneous and asynchronously executing tasks
Returns: <Queue>
Create Queue instance
class Queue
Queue constructor
Queue.prototype.constructor(concurrency)
- concurrency: <number> asynchronous concurrency
Queue constructor
Queue.prototype.wait(msec)
- msec: <number> wait timeout for single item
Returns: <this>
Set wait before processing timeout
Queue.prototype.throttle(count[, interval])
Returns: <this>
Throttle to limit throughput
Queue.prototype.add(item[, factor[, priority]])
- item: <Object> to be added
- factor: <number> | <string> type, source, destination or path, optional
- priority: <number> optional
Returns: <this>
Add item to queue
Queue.prototype.next(task)
- task: <Array> next task [item, factor, priority]
Returns: <this>
Process next item
Queue.prototype.takeNext()
Returns: <this>
Prepare next item for processing
Queue.prototype.pause()
Returns: <this>
Pause queue
This function is not completely implemented yet
Queue.prototype.resume()
Returns: <this>
Resume queue
This function is not completely implemented yet
Queue.prototype.clear()
Returns: <this>
Clear queue
Queue.prototype.timeout(msec, onTimeout)
- msec: <number> process timeout for single item
- onTimeout: <Function>
Returns: <this>
Set timeout interval and listener
Queue.prototype.process(fn)
- fn: <Function>
  - item: <Object>
  - callback: <Function>
Returns: <this>
Set processing function
Queue.prototype.done(fn)
- fn: <Function> done listener
Returns: <this>
Set listener on processing done
Queue.prototype.success(listener)
- listener: <Function> on success
  - item: <any>
Returns: <this>
Set listener on processing success
Queue.prototype.failure(listener)
- listener: <Function> on failure
Returns: <this>
Set listener on processing error
Queue.prototype.drain(listener)
- listener: <Function> on drain
Returns: <this>
Set listener on drain Queue
Queue.prototype.fifo()
Returns: <this>
Switch to FIFO mode (default for Queue)
Queue.prototype.lifo()
Returns: <this>
Switch to LIFO mode
Queue.prototype.priority(flag)
- flag: <boolean> default: true, false will disable priority mode
Returns: <this>
Activate or deactivate priority mode
Queue.prototype.roundRobin(flag)
- flag: <boolean> default: true, false will disable roundRobin mode
Returns: <this>
Activate or deactivate round robin mode
Queue.prototype.pipe(dest)
- dest: <Queue> destination queue
Returns: <this>
Pipe processed items to different queue
throttle(timeout, fn, ...args)
- timeout: <number> msec interval
- fn: <Function> to be throttled
- args: <Array> arguments for fn, optional
Returns: <Function>
Get throttling function, executed once per interval
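A sketch of one possible throttling strategy: the first call runs immediately, and calls arriving during the interval collapse into a single trailing call. Whether metasync uses exactly this strategy is an assumption; `throttleSketch`, `seen` and `log` are hypothetical names.

```javascript
// Hypothetical sketch: run fn at most once per interval
const throttleSketch = (timeout, fn, ...args) => {
  let timer = null;
  let waiting = null; // arguments of the latest call made during the interval
  const throttled = (...pars) => {
    if (timer) {
      waiting = pars;
      return;
    }
    fn(...args, ...pars);
    timer = setTimeout(() => {
      timer = null;
      if (waiting) {
        const latest = waiting;
        waiting = null;
        throttled(...latest);
      }
    }, timeout);
  };
  return throttled;
};

// Usage: three rapid calls produce one immediate and one trailing call
const seen = [];
const log = throttleSketch(50, (prefix, value) => seen.push(prefix + value), 'v');
log(1);
log(2);
log(3);
console.dir(seen); // [ 'v1' ]
```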
debounce(timeout, fn, ...args)
- timeout: <number> msec
- fn: <Function> to be debounced
- args: <Array> arguments for fn, optional
Debounce function, delayed execution
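Delayed execution can be sketched as follows: every call restarts the timer, so fn runs only after calls stop for the given interval. `debounceSketch`, `saves` and `save` are hypothetical names, and this is not the library's implementation.

```javascript
// Hypothetical sketch: postpone fn until calls stop for `timeout` msec
const debounceSketch = (timeout, fn, ...args) => {
  let timer = null;
  return () => {
    // Each new call cancels the previously scheduled execution
    if (timer) clearTimeout(timer);
    timer = setTimeout(() => {
      timer = null;
      fn(...args);
    }, timeout);
  };
};

// Usage: three rapid calls lead to a single execution
let saves = 0;
const save = debounceSketch(20, () => {
  saves++;
});
save();
save();
save();
setTimeout(() => console.log(saves), 100); // prints 1
```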
timeout(timeout, fn, callback)
- timeout: <number> time interval
- fn: <Function> to be executed
- callback: <Function> callback(...args), on done
  - args: <Array>
Set timeout for asynchronous function execution
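As a sketch of the idea, the wrapper below races fn against a timer and calls the callback exactly once. Reporting the timeout as an Error in the first argument is an assumption; `timeoutSketch` and `slowTask` are hypothetical names.

```javascript
// Hypothetical sketch: fail with an error if fn does not finish in time
const timeoutSketch = (timeout, fn, callback) => {
  let finished = false;
  const timer = setTimeout(() => {
    finished = true;
    callback(new Error(`Timed out after ${timeout} msec`));
  }, timeout);
  fn((...args) => {
    if (finished) return; // too late, the timeout was already reported
    finished = true;
    clearTimeout(timer);
    callback(...args);
  });
};

// Usage: the task needs 100 msec but only 10 msec are allowed
const slowTask = (cb) => setTimeout(() => cb(null, 'done'), 100);
timeoutSketch(10, slowTask, (err, data) => {
  console.log(err ? err.message : data);
});
```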
Contributors
- Timur Shemsedinov (marcusaurelius)
- See github for full contributors list