• This repository has been archived on 28/Nov/2018
  • Language
    JavaScript
  • License
    MIT License
  • Created about 13 years ago
  • Updated almost 6 years ago

Repository Details

EventStream is like functional programming meets IO

EventStream

Streams are node's best and most misunderstood idea, and EventStream is a toolkit to make creating and working with streams easy.

Normally, streams are only used for IO, but in event-stream we send all kinds of objects down the pipe. If your application's input and output are streams, shouldn't the throughput be a stream too?

The EventStream functions resemble the array functions, because Streams are like Arrays, but laid out in time, rather than in memory.

All the event-stream functions return instances of Stream.

event-stream creates Node 0.8-style streams, which are compatible with Node 0.10 streams.

NOTE: I shall use the term "through stream" to refer to a stream that is writable and readable.

NOTE for Gulp users: es.merge does not work with gulp 4. Use merge-stream instead.

simple example:

//pretty.js

if(!module.parent) {
  var es = require('event-stream')
  var inspect = require('util').inspect

  process.stdin                        //connect streams together with `pipe`
    .pipe(es.split())                  //split stream to break on newlines
    .pipe(es.map(function (data, cb) { //turn this async function into a stream
      cb(null
        , inspect(JSON.parse(data)))   //render it nicely
    }))
    .pipe(process.stdout)              // pipe it to stdout !
}

run it ...

curl -sS registry.npmjs.org/event-stream | node pretty.js

node Stream documentation

through (write?, end?)

Re-emits data synchronously. Easy way to create synchronous through streams. Pass in optional write and end methods. They will be called in the context of the stream. Use this.pause() and this.resume() to manage flow. Check this.paused to see current flow state. (write always returns !this.paused)

this function is the basis for most of the synchronous streams in event-stream.

es.through(function write(data) {
    this.emit('data', data)
    //this.pause() 
  },
  function end () { //optional
    this.emit('end')
  })
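
To make the write/end contract concrete, here is a minimal sketch of a synchronous through stream built on Node's legacy Stream constructor. It is an illustration only, not the code behind es.through: the helper name miniThrough is hypothetical, and pause/resume handling is left out.

```javascript
var Stream = require('stream') // the legacy (0.8-style) Stream constructor

// Hypothetical helper: a stripped-down through stream without pause/resume.
function miniThrough (write, end) {
  var stream = new Stream()
  stream.readable = stream.writable = true

  // default behaviour: re-emit data unchanged, then emit 'end'
  write = write || function (data) { this.emit('data', data) }
  end = end || function () { this.emit('end') }

  stream.write = function (data) {
    write.call(stream, data) // called in the context of the stream
    return true              // a fuller version would return !stream.paused
  }
  stream.end = function (data) {
    if (arguments.length) stream.write(data)
    stream.writable = false
    end.call(stream)
  }
  return stream
}

// usage: upper-case every chunk
var seen = []
var upper = miniThrough(function (data) {
  this.emit('data', String(data).toUpperCase())
})
upper.on('data', function (d) { seen.push(d) })
upper.on('end', function () { seen.push('END') })

upper.write('a')
upper.write('b')
upper.end()
// seen is now ['A', 'B', 'END']
```

Because everything happens inside write and end, the data events fire before write returns, which is what "synchronous" means here.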

map (asyncFunction)

Create a through stream from an asynchronous function.

var es = require('event-stream')

es.map(function (data, callback) {
  //transform data
  // ...
  callback(null, data)
})

Each map MUST call the callback. It may call back with data, with an error, or with no arguments:

  • callback() drops this data.
    This makes the map work like filter.
    Note: callback(null, null) is not the same, and will emit null.

  • callback(null, newData) turn data into newData

  • callback(error) emit an error for this item.

Note: if a callback is not called, map will think that the item is still being processed.
Every call must be answered or the stream will not know when to end.

Also, if the callback is called more than once, every call but the first will be ignored.
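
The callback rules above can be sketched with a plain EventEmitter. This is a hypothetical helper (miniMap) written for illustration, not es.map itself: it does not model pause, output ordering, or the 'end' event.

```javascript
var EventEmitter = require('events').EventEmitter

// Hypothetical helper illustrating the callback rules only.
function miniMap (asyncFunction) {
  var stream = new EventEmitter()
  stream.write = function (data) {
    var called = false
    asyncFunction(data, function (err, newData) {
      if (called) return            // every call but the first is ignored
      called = true
      if (err) return stream.emit('error', err)
      if (newData !== undefined) stream.emit('data', newData) // callback() drops
    })
    return true
  }
  return stream
}

var out = []
var errors = []
var m = miniMap(function (n, callback) {
  if (n < 0) return callback(new Error('negative: ' + n))
  if (n === 0) return callback()    // drop zeros, like a filter
  callback(null, n * 2)
  callback(null, 'ignored')         // second call has no effect
})
m.on('data', function (d) { out.push(d) })
m.on('error', function (e) { errors.push(e.message) })

;[1, 0, -1, 2].forEach(function (n) { m.write(n) })
// out is [2, 4]; errors is ['negative: -1']
```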

mapSync (syncFunction)

Same as map, but the function is called synchronously and its return value is emitted instead of being passed to a callback. Based on es.through.

flatmapSync (syncFunction)

Map elements nested.

var es = require('event-stream')

es.flatmapSync(function (data) {
  //transform data
  // ...
  return data
})

filterSync (syncFunction)

Filter elements.

var es = require('event-stream')

es.filterSync(function (data) {
  return data > 0
})

split (matcher)

Break up a stream and reassemble it so that each line is a chunk. matcher may be a String or a RegExp.

Example, read every line in a file ...

fs.createReadStream(file, {flags: 'r'})
  .pipe(es.split())
  .pipe(es.map(function (line, cb) {
    //do something with the line 
    cb(null, line)
  }))

split takes the same arguments as String#split, except that the separator defaults to '\n' and the optional limit parameter is ignored.

NOTE - Maintaining Line Breaks
If you want to process each line of the stream, transform the data, reassemble it, and KEEP the line breaks, the example will look like this:

fs.createReadStream(file, {flags: 'r'})
  .pipe(es.split(/(\r?\n)/))
  .pipe(es.map(function (line, cb) {
    //do something with the line 
    cb(null, line)
  }))

This technique is mentioned in the underlying documentation for the split npm package.
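
A line can be cut in half by a chunk boundary, so a splitter has to hold back the last, possibly incomplete, piece until more data arrives. The sketch below shows that idea with plain strings, and also why a capturing group in the RegExp keeps the line breaks (String#split includes captured separators in its result). makeSplitter is a hypothetical helper, not the split module's code.

```javascript
// Hypothetical helper: feed arbitrary chunks in, `emit` is called once per piece.
function makeSplitter (matcher, emit) {
  var soFar = ''
  return {
    write: function (chunk) {
      var pieces = (soFar + chunk).split(matcher)
      soFar = pieces.pop()          // last piece may be an incomplete line
      pieces.forEach(emit)
    },
    end: function () {
      if (soFar !== '') emit(soFar) // flush whatever is left
      soFar = ''
    }
  }
}

var plain = []
var s1 = makeSplitter('\n', function (line) { plain.push(line) })
s1.write('first li')
s1.write('ne\nsecond line\nthi')
s1.write('rd line')
s1.end()
// plain: ['first line', 'second line', 'third line']

var kept = []
var s2 = makeSplitter(/(\r?\n)/, function (piece) { kept.push(piece) })
s2.write('a\nb\r\n')
s2.write('c')
s2.end()
// kept: ['a', '\n', 'b', '\r\n', 'c']
```

With the capturing matcher, the separators arrive as chunks of their own, so joining all chunks reproduces the original text.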

join (separator)

Create a through stream that emits separator between each chunk, just like Array#join.

(for legacy reasons, if you pass a callback instead of a string, join is a synonym for es.wait)
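
As a rough sketch of the "separator between each chunk" rule, assuming a hypothetical miniJoin helper rather than the real es.join:

```javascript
var EventEmitter = require('events').EventEmitter

// Hypothetical helper: emits `separator` before every chunk except the first.
function miniJoin (separator) {
  var stream = new EventEmitter()
  var first = true
  stream.write = function (data) {
    if (!first) stream.emit('data', separator)
    first = false
    stream.emit('data', data)
    return true
  }
  stream.end = function () { stream.emit('end') }
  return stream
}

var pieces = []
var j = miniJoin(', ')
j.on('data', function (d) { pieces.push(d) })

;['a', 'b', 'c'].forEach(function (x) { j.write(x) })
j.end()
// pieces.join('') gives the same string as ['a', 'b', 'c'].join(', ')
```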

merge (stream1,...,streamN) or merge (streamArray)

concat is an alias for merge.

Merges streams into one and returns it. Incoming data is emitted as soon as it arrives, with no ordering applied (for example: data1 data1 data2 data1 data2, where data1 and data2 are data from two streams). It counts how many streams were passed to it and emits end only when all of them have emitted end.

es.merge(
  process.stdout,
  process.stderr
).pipe(fs.createWriteStream('output.log'));

It can also take an Array of streams as input like this:

es.merge([
  fs.createReadStream('input1.txt'),
  fs.createReadStream('input2.txt')
]).pipe(fs.createWriteStream('output.log'));
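
The end-counting behaviour can be illustrated with a small sketch. miniMerge is a hypothetical helper that assumes its sources are event emitters firing 'data' and 'end'. It is not the es.merge implementation.

```javascript
var Stream = require('stream') // legacy Stream constructor, an EventEmitter

// Hypothetical helper: forwards 'data' from every source as it arrives and
// emits 'end' only once all sources have ended.
function miniMerge (sources) {
  var out = new Stream()
  out.readable = true
  var remaining = sources.length
  sources.forEach(function (source) {
    source.on('data', function (d) { out.emit('data', d) })
    source.on('end', function () {
      if (--remaining === 0) out.emit('end')
    })
  })
  return out
}

var a = new Stream()
var b = new Stream()
var log = []
var merged = miniMerge([a, b])
merged.on('data', function (d) { log.push(d) })
merged.on('end', function () { log.push('END') })

a.emit('data', 'a1')
b.emit('data', 'b1')
a.emit('data', 'a2')
a.emit('end')          // one source is still open, so no 'end' yet
b.emit('data', 'b2')
b.emit('end')          // last source ended: the merged stream ends
// log: ['a1', 'b1', 'a2', 'b2', 'END']
```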

replace (from, to)

Replace all occurrences of from with to. from may be a String or a RegExp.
Works just like string.split(from).join(to), but streaming.
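
For reference, this is the non-streaming equivalent on a plain string. The variable names are only for illustration. In a stream, a match may straddle two chunks, which is the part a streaming version has to handle on top of this.

```javascript
var text = 'one fish, two fish'

// String pattern: every occurrence is replaced
var viaSplitJoin = text.split('fish').join('bird')
// 'one bird, two bird'

// RegExp pattern
var viaRegExp = text.split(/\s*,\s*/).join(' | ')
// 'one fish | two fish'
```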

parse

Convenience function for parsing JSON chunks. For newline-separated JSON, use it with es.split. By default, parsing errors are logged with console.error. To change this, create the transform with es.parse({error: true}); it will then emit error events for exceptions thrown from JSON.parse, unmodified.

fs.createReadStream(filename)
  .pipe(es.split()) //defaults to lines.
  .pipe(es.parse())

stringify

Convert JavaScript objects into lines of text. The text will have whitespace escaped and have a \n appended, so it will be compatible with es.parse.

objectStream
  .pipe(es.stringify())
  .pipe(fs.createWriteStream(filename))
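
The reason one-object-per-line text round-trips cleanly can be shown without any streams: JSON.stringify escapes newlines inside strings, so each serialized object stays on a single line. A small sketch with made-up sample data:

```javascript
var objects = [{ id: 1, note: 'line one\nline two' }, { id: 2 }]

// one JSON document per line, as described for the stringify step
var text = objects
  .map(function (o) { return JSON.stringify(o) + '\n' })
  .join('')

// what splitting on '\n' and parsing does on the other side
// (the empty piece after the final '\n' is skipped)
var parsed = text.split('\n')
  .filter(function (line) { return line !== '' })
  .map(function (line) { return JSON.parse(line) })
// parsed is deep-equal to objects
```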

readable (asyncFunction)

Create a readable stream (that respects pause) from an async function.
While the stream is not paused,
the function will be polled with (count, callback),
with this set to the readable stream.

es.readable(function (count, callback) {
  if(streamHasEnded)
    return this.emit('end')
  
  //...
  
  this.emit('data', data) //use this way to emit multiple chunks per call.
      
  callback() // you MUST always call the callback eventually.
             // the function will not be called again until you do this.
})

You can also pass the error and the data to the callback.
You may only call the callback once.
Calling the same callback more than once will have no effect.

readArray (array)

Create a readable stream from an Array.

Just emit each item as a data event, respecting pause and resume.

  var es = require('event-stream')
    , reader = es.readArray([1,2,3])

  reader.pipe(...)

If you want the stream to behave like a 0.10 stream, you will need to wrap it using the Readable.wrap() function. Example:

  var stream = require('stream')
  var s = new stream.Readable({objectMode: true}).wrap(es.readArray([1,2,3]));

writeArray (callback)

Create a writable stream from a callback.
All data events are stored in an array, which is passed to the callback when the stream ends.

  var es = require('event-stream')
    , reader = es.readArray([1, 2, 3])
    , writer = es.writeArray(function (err, array){
      //array deepEqual [1, 2, 3]
    })

  reader.pipe(writer)
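
To show what travels through the pipe in that example, here is a sketch using Node's legacy Stream and its pipe method. miniReadArray and miniWriteArray are hypothetical stand-ins, without pause/resume support, and the explicit start() call exists only to keep the sketch synchronous.

```javascript
var Stream = require('stream') // legacy Stream constructor, provides pipe()

// Hypothetical stand-in for writeArray: collect writes, hand them over on end.
function miniWriteArray (done) {
  var stream = new Stream()
  var items = []
  stream.writable = true
  stream.write = function (data) { items.push(data); return true }
  stream.end = function () {
    stream.writable = false
    done(null, items)
  }
  return stream
}

// Hypothetical stand-in for readArray: emit each item, then 'end'.
function miniReadArray (array) {
  var stream = new Stream()
  stream.readable = true
  stream.start = function () {
    array.forEach(function (item) { stream.emit('data', item) })
    stream.emit('end')
  }
  return stream
}

var result
var reader = miniReadArray([1, 2, 3])
reader.pipe(miniWriteArray(function (err, array) { result = array }))
reader.start()
// result: [1, 2, 3]
```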

pause ()

A stream that buffers all chunks when paused.

  var ps = es.pause()
  ps.pause() //buffer the stream, also do not allow 'end' 
  ps.resume() //allow chunks through
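
A sketch of the buffering idea, assuming a hypothetical miniPause helper (this is not the pause-stream implementation): chunks and the 'end' event are held back while paused and released on resume.

```javascript
var EventEmitter = require('events').EventEmitter

// Hypothetical helper: buffers chunks (and 'end') while paused.
function miniPause () {
  var stream = new EventEmitter()
  var buffer = []
  var ended = false
  stream.paused = false

  function drain () {
    while (!stream.paused && buffer.length) stream.emit('data', buffer.shift())
    if (!stream.paused && ended && !buffer.length) {
      ended = false
      stream.emit('end')
    }
  }
  stream.write = function (data) {
    buffer.push(data)
    drain()
    return !stream.paused
  }
  stream.end = function () { ended = true; drain() }
  stream.pause = function () { stream.paused = true }
  stream.resume = function () { stream.paused = false; drain() }
  return stream
}

var events = []
var ps = miniPause()
ps.on('data', function (d) { events.push(d) })
ps.on('end', function () { events.push('END') })

ps.pause()
ps.write('a')
ps.write('b')
ps.end()
var whilePaused = events.slice() // still empty: nothing got through
ps.resume()
// events: ['a', 'b', 'END']
```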

duplex (writeStream, readStream)

Takes a writable stream and a readable stream and makes them appear as a readable writable stream.

It is assumed that the two streams are connected to each other in some way.

(This is used by pipeline and child.)

  var cp = require('child_process')
  var grep = cp.exec('grep Stream')

  es.duplex(grep.stdin, grep.stdout)

child (child_process)

Create a through stream from a child process ...

  var cp = require('child_process')

  es.child(cp.exec('grep Stream')) // a through stream

wait (callback)

Waits for the stream to emit 'end' and joins the chunks of the stream into a single string or buffer. Takes an optional callback, which will be passed the complete string/buffer when the 'end' event is received.

Also emits a single 'data' event.

readStream.pipe(es.wait(function (err, body) {
  // have complete text here.
}))
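
The "single string or buffer" behaviour can be sketched as follows. miniWait is a hypothetical helper that only models collecting chunks and calling back on end. It decides between Buffer.concat and string joining by looking at the first chunk, which is an assumption of this sketch.

```javascript
// Hypothetical helper: gather chunks, deliver them as one value on end.
function miniWait (callback) {
  var chunks = []
  return {
    write: function (chunk) { chunks.push(chunk); return true },
    end: function () {
      var body = Buffer.isBuffer(chunks[0])
        ? Buffer.concat(chunks)
        : chunks.join('')
      callback(null, body)
    }
  }
}

var text
var w1 = miniWait(function (err, body) { text = body })
w1.write('hello ')
w1.write('world')
w1.end()
// text: 'hello world'

var bytes
var w2 = miniWait(function (err, body) { bytes = body })
w2.write(Buffer.from('ab'))
w2.write(Buffer.from('cd'))
w2.end()
// bytes is a Buffer containing 'abcd'
```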

Other Stream Modules

These modules are not included as a part of EventStream but may be useful when working with streams.

reduce (syncFunction, initial)

Like Array.prototype.reduce but for streams. Given a sync reduce function and an initial value it will return a through stream that emits a single data event with the reduced value once the input stream ends.

var reduce = require("stream-reduce");
process.stdin.pipe(reduce(function(acc, data) {
  return acc + data.length;
}, 0)).on("data", function(length) {
  console.log("stdin size:", length);
});
