
streamx

An iteration of the Node.js core streams with a series of improvements.

npm install streamx

Main improvements from Node.js core stream

Proper lifecycle support.

Streams have an _open function that is called before any read/write operation and a _destroy function that is always run as the last part of the stream.

This makes it easy to maintain state.

Easy error handling

Fully integrates a .destroy() function. When called the stream will wait for any pending operation to finish and call the stream destroy logic.

Close is always the last event emitted and destroy is always run.

pipe() handles errors

pipe accepts a callback that is called when the pipeline is fully drained. It also handles errors on the streams provided and destroys both streams if either of them fails.

All streams are both binary and object mode streams

A map function can be provided to map your input data into buffers or other formats. To indicate how much buffer space each data item takes, a byteLength function can be provided as well.

This removes the need for two modes of streams.

Simplicity

This is a full rewrite, all contained in one file.

Lots of stream methods are simplified based on how I and devs I work with actually use streams in the wild.

Backwards compat

streamx aims to be compatible with Node.js streams whenever it is reasonable to do so.

This means that streamx streams behave a lot like Node.js streams from the outside but still provide the improvements above.

Smaller browser footprint

streamx has a much smaller footprint when compiled for the browser:

$ for x in stream{,x}; do echo $x: $(browserify -r $x | wc -c) bytes; done
stream: 173844 bytes
streamx: 46943 bytes

With optimizations turned on, the difference is even more stark:

$ for x in stream{,x}; do echo $x: $(browserify -r $x -p tinyify | wc -c) bytes; done
stream: 62649 bytes
streamx: 8460 bytes
$ for x in stream{,x}; do echo $x: $(browserify -r $x -p tinyify | gzip | wc -c) "bytes (gzipped)"; done
stream: 18053 bytes (gzipped)
streamx: 2806 bytes (gzipped)

AbortSignal support

To make it easier to integrate streams in an async/await flow, all streams support a signal option that accepts an AbortSignal as an alternative means to .destroy streams.

Usage

const { Readable } = require('streamx')

const rs = new Readable({
  read (cb) {
    this.push('Cool data')
    cb(null)
  }
})

rs.on('data', data => console.log('data:', data))

API

This streamx package contains 4 streams similar to Node.js core.

Readable Stream

rs = new stream.Readable([options])

Create a new readable stream.

Options include:

{
  highWaterMark: 16384, // max buffer size in bytes
  map: (data) => data, // optional function to map input data
  byteLength: (data) => size, // optional function that calculates the byte size of input data
  signal: abortController.signal, // optional AbortSignal that triggers `.destroy` on `abort`
  eagerOpen: false // eagerly open the stream
}

In addition you can pass the open, read, and destroy functions as shorthands in the constructor instead of overriding the methods below.

The default byteLength function returns the byte length of buffers and 1024 for any other object. With the default highWaterMark of 16384 this means a full buffer holds around 16 non-buffer items, or 16 KB worth of buffers.

rs._read(cb)

This function is called when the stream wants you to push new data. Overwrite this and add your own read logic. You should call the callback when you are fully done with the read.

Can also be set using options.read in the constructor.

Note that this function differs from Node.js streams in that it takes the "read finished" callback.

drained = rs.push(data)

Push new data to the stream. Returns true if the buffer is not full and you should push more data if you can.

If you call rs.push(null) you signal to the stream that no more data will be pushed and that you want to end the stream.

data = rs.read()

Read a piece of data from the stream buffer. If the buffer is currently empty null will be returned and you should wait for readable to be emitted before trying again. If the stream has been ended it will also return null.

Note that this method differs from Node.js streams in that it does not accept an optional number of bytes to consume.

rs.unshift(data)

Add a piece of data to the front of the buffer. Use this if you read too much data using the rs.read() function.

rs._open(cb)

This function is called once before the first read is issued. Use this function to implement your own open logic.

Can also be set using options.open in the constructor.

rs._destroy(cb)

This function is called just before the stream is fully destroyed. You should use this to implement whatever teardown logic you need. The final part of the stream life cycle is always to call destroy itself, so this function will always be called whether the stream ends gracefully or forcefully.

Can also be set using options.destroy in the constructor.

Note that the _destroy might be called without the open function being called in case no read was ever performed on the stream.

rs._predestroy()

A simple hook that is called as soon as the first stream.destroy() call is invoked.

Use this in case you need to cancel pending reads (if possible) instead of waiting for them to finish.

Can also be set using options.predestroy in the constructor.

rs.destroy([error])

Forcefully destroy the stream. Will call _destroy as soon as all pending reads have finished. Once the stream is fully destroyed close will be emitted.

If you pass an error this error will be emitted just before close is, signifying a reason as to why this stream was destroyed.

rs.pause()

Pauses the stream. You will only need to call this if you want to pause a resumed stream.

Returns this stream instance.

rs.resume()

Will start reading data from the stream as fast as possible.

If you do not call this, you need to read data yourself with the read() method, pipe the stream somewhere else with pipe(), or attach a data handler.

If none of these options are used the stream will stay paused.

Returns this stream instance.

bool = Readable.isPaused(rs)

Returns true if the stream is paused, else false.

writableStream = rs.pipe(writableStream, [callback])

Efficiently pipe the readable stream to a writable stream (can be a Node.js core stream or a stream from this package). If you provide a callback, it is called when the pipeline has fully finished, with an optional error in case it failed.

To cancel the pipeline destroy either of the streams.

rs.on('readable')

Emitted when data is pushed to the stream if the buffer was previously empty.

rs.on('data', data)

Emitted when data is being read from the stream. If you attach a data handler you are implicitly resuming the stream.

rs.on('end')

Emitted when the readable stream has ended and no data is left in its buffer.

rs.on('close')

Emitted when the readable stream has fully closed (i.e. its destroy function has completed).

rs.on('error', err)

Emitted if any of the stream operations fail with an error. close is always emitted right after this.

rs.on('piping', dest)

Emitted when the readable stream is piping to a destination.

rs.destroyed

Boolean property indicating whether or not this stream has been destroyed.

bool = Readable.isBackpressured(rs)

Static method to check if a readable stream is currently under backpressure.

stream = Readable.from(arrayOrBufferOrStringOrAsyncIterator)

Static method to turn an array or buffer or string or AsyncIterator into a readable stream.

Writable Stream

ws = new stream.Writable([options])

Create a new writable stream.

Options include:

{
  highWaterMark: 16384, // max buffer size in bytes
  map: (data) => data, // optional function to map input data
  byteLength: (data) => size, // optional function that calculates the byte size of input data
  signal: abortController.signal // optional AbortSignal that triggers `.destroy` on `abort`
}

In addition you can pass the open, write, final, and destroy functions as shorthands in the constructor instead of overriding the methods below.

The default byteLength function returns the byte length of buffers and 1024 for any other object. With the default highWaterMark of 16384 this means a full buffer holds around 16 non-buffer items, or 16 KB worth of buffers.

ws._open(cb)

This function is called once before the first write is issued. Use this function to implement your own open logic.

Can also be set using options.open in the constructor.

ws._destroy(cb)

This function is called just before the stream is fully destroyed. You should use this to implement whatever teardown logic you need. The final part of the stream life cycle is always to call destroy itself, so this function will always be called whether the stream ends gracefully or forcefully.

Can also be set using options.destroy in the constructor.

Note that the _destroy might be called without the open function being called in case no write was ever performed on the stream.

ws._predestroy()

A simple hook that is called as soon as the first stream.destroy() call is invoked.

Use this in case you need to cancel pending writes (if possible) instead of waiting for them to finish.

Can also be set using options.predestroy in the constructor.

ws.destroy([error])

Forcefully destroy the stream. Will call _destroy as soon as all pending writes have finished. Once the stream is fully destroyed close will be emitted.

If you pass an error this error will be emitted just before close is, signifying a reason as to why this stream was destroyed.

drained = ws.write(data)

Write a piece of data to the stream. Returns true if the stream buffer is not full and you should keep writing to it if you can. If false is returned the stream will emit drain once its buffer is fully drained.

ws._write(data, callback)

This function is called when the stream wants to write some data. Use this to implement your own write logic. When done, call the callback and the stream will call this function again if more data exists in the buffer.

Can also be set using options.write in the constructor.

ws._writev(batch, callback)

Similar to _write but passes an array of all data in the current write buffer instead of the oldest one. Useful if the destination you are writing the data to supports batching.

Can also be set using options.writev in the constructor.

ws.end()

Gracefully end the writable stream. Call this when you no longer want to write to the stream.

Once all writes have been fully drained finish will be emitted.

Returns this stream instance.

ws._final(callback)

This function is called just before finish is emitted, i.e. when ws.end() has been called and all writes have been flushed. Use this to implement any logic that should happen after all writes but before finish.

Can also be set using options.final in the constructor.

ws.on('finish')

Emitted when the stream has been ended and all writes have been drained.

ws.on('close')

Emitted when the writable stream has fully closed (i.e. its destroy function has completed).

ws.on('error', err)

Emitted if any of the stream operations fail with an error. close is always emitted right after this.

ws.on('pipe', src)

Emitted when a readable stream is being piped to the writable one.

ws.destroyed

Boolean property indicating whether or not this stream has been destroyed.

bool = Writable.isBackpressured(ws)

Static method to check if a writable stream is currently under backpressure.

bool = await Writable.drained(ws)

Static helper to wait for a stream to drain the currently queued writes. Resolves to true if they were drained, or false if the stream was destroyed.

Duplex Stream

s = new stream.Duplex([options])

A duplex stream is a stream that is both readable and writable.

Since JS does not support multiple inheritance it inherits directly from Readable but implements the Writable API as well.

If you want to provide only a map function for the readable side use mapReadable instead. If you want to provide only a byteLength function for the readable side use byteLengthReadable instead.

Same goes for the writable side but using mapWritable and byteLengthWritable instead.

Transform Stream

A Transform stream is a duplex stream with a ._transform template method that lets you asynchronously map the input to a different output.

The transform stream overrides the _write and _read operations of Readable and Writable but still allows the setting of these options in the constructor. Usually it is unnecessary to pass in read or write/writev or to override the corresponding ._read, ._write or ._writev operation.

ts = new stream.Transform([options])

A transform stream is a duplex stream that maps the data written to it and emits that as readable data.

Has the same options as a duplex stream, except that you can also provide a transform function.

ts._transform(data, callback)

Transform the incoming data. Call callback(null, mappedData) or use ts.push(mappedData) to return data to the readable side of the stream.

By default the transform function just re-emits the incoming data, making it act as a pass-through stream.

Pipeline

pipeline lets you stream from a readable through a set of duplex streams to a writable.

const { pipeline, Readable, Transform, Writable } = require('streamx')
const lastStream = pipeline(
  Readable.from([1, 2, 3]),
  new Transform({
    transform (from, cb) {
      this.push(from.toString())
      cb()
    }
  }),
  new Writable({
    write (data, cb) {
      console.log(data)
      cb()
    }
  }),
  error => {
    // Callback once write has finished
  }
)

lastStream = stream.pipeline(...streams, [done])

Pipe all streams together and return the last stream piped to. When the last stream finishes, the pipeline has ended successfully.

If any of the streams error, whether they are Node.js core streams or streamx streams, all streams in the pipeline are shutdown.

Optionally you can pass a done callback to know when the pipeline is done.

promise = stream.pipelinePromise(...streams)

Same as normal pipeline except instead of returning the last stream it returns a promise representing the done callback. Note you should error handle this promise if you use this version.

Utilities

Streamx aims to be minimal and stable. It therefore only contains a minimal set of utilities. To help you discover other modules for building streamx apps, we link some useful utilities here:

  • stream-composer - Compose streams like Node's stream.compose and the duplexify and pumpify modules.
  • teex - Clone a readable stream into multiple new readable instances.

Contributing

If you want to contribute to streamx, a good way to start is to help write more test cases, compatibility tests, documentation, or performance benchmarks.

If in doubt open an issue :)

License

MIT
