async-stream

The async stream spec.

This is a spec / manual, and that's all you need for async streams. You don't need any modules to create a valid stream, not even for convenience.

Semantics

An async stream is simply a promise-returning function. If you're familiar with other stream semantics: there are only readable streams, with no need for writables or transforms.

  • A readable is a function: const read = readable().
  • A transform is a readable that takes a stream as argument: const read = transform(readable()).
  • A writable is a while loop: let data; while (data = await read()) {}.

It's all pulling and there's simply no need for base classes.
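The three roles above fit together in a few lines. This is a minimal sketch, not part of the spec: the names fromArray, upperCase and collect are made up for illustration.

```javascript
// Hypothetical helper names (fromArray, upperCase, collect), for illustration only.

// Readable: a function that returns an async read function.
const fromArray = items => {
  let i = 0
  return async () => (i < items.length ? items[i++] : undefined)
}

// Transform: takes a read function and returns a new read function.
const upperCase = read => async () => {
  const data = await read()
  return data ? data.toUpperCase() : undefined
}

// "Writable": a loop that pulls until a falsy value signals the end.
const collect = async read => {
  const out = []
  let data
  while ((data = await read())) out.push(data)
  return out
}

collect(upperCase(fromArray(['a', 'b', 'c']))).then(result => {
  console.log(result) // [ 'A', 'B', 'C' ]
})
```

Nothing here extends a base class; each piece is a plain function that can be swapped out independently.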

Comparison to node core streams

Reading

// core
a.on('data', console.log)

// async stream
let data
while (data = await a()) console.log(data)

Piping

// core
source.pipe(transform).pipe(destination)

// async stream
const read = transform(source())
let data
while (data = await read()) {}

Error handling

// core
source
  .on('error', handle)
  .pipe(transform)
  .on('error', handle)
  .pipe(destination)

// async stream
const read = transform(source())
try {
  let data
  while (data = await read()) {}
} catch (err) {
  handle(err)
}

Implementing a source

// core
const { Readable } = require('stream')
class readable extends Readable {
  _read () {
    this.push(String(Date.now()))
  }
}

// async stream
const readable = () => async () => String(Date.now())

Implementing a transform

// core
const { Transform } = require('stream')
class transform extends Transform {
  _transform (chunk, enc, done) {
    done(null, Number(chunk).toString(16))
  }
}

// async stream
const transform = read => async () => Number(await read()).toString(16)
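The two styles compared above can also be bridged. The sketch below adapts a core Readable to the async stream shape through the Readable's async iterator. The helper name fromCore is hypothetical, and the demo assumes a Node version that has Readable.from (12.3 or later).

```javascript
const { Readable } = require('stream')

// `fromCore` is a hypothetical helper name, not part of the spec or of Node.
// It adapts a core Readable to the async stream shape via its async iterator.
const fromCore = stream => {
  const iterator = stream[Symbol.asyncIterator]()
  let ended = false
  return async end => {
    if (ended) return
    if (end) {
      ended = true
      // Signal an early exit to the iterator. Node documents that leaving
      // a for await loop early destroys the stream.
      await iterator.return()
      return
    }
    const { value, done } = await iterator.next()
    if (done) {
      ended = true
      return
    }
    return value
  }
}

;(async () => {
  const read = fromCore(Readable.from(['a', 'b', 'c']))
  let data
  while ((data = await read())) console.log(`data: ${data}`)
  console.log('done reading')
})()
```

Errors emitted by the core stream surface as rejections of the read call, so the try/catch style shown under "Error handling" applies unchanged.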

A note on iteration

The cumbersome

let data
while (data = await read()) {}

will become more elegant once the async iteration spec lands in JavaScript engines:

for await (const data of read()) {}
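Async iteration has since been standardized (ES2018), and for await works in Node 10 and later. A read function is not itself an async iterable, though, so a small adapter is one way to get the loop above. This is a sketch under that assumption; iterate and counter are hypothetical names.

```javascript
// `iterate` and `counter` are hypothetical names used only for this sketch.

// Wrap a read function in an async generator so it works with for await.
const iterate = async function * (read) {
  let done = false
  try {
    let data
    while ((data = await read())) yield data
    done = true
  } finally {
    // Reached without `done` when the consumer breaks out early or an
    // error propagates: ask the source to clean up via the end argument.
    if (!done) await read(true)
  }
}

// Example source: emits '1', '2', '3', then ends.
const counter = () => {
  let i = 0
  return async end => {
    if (end || i >= 3) return
    return String(++i)
  }
}

;(async () => {
  for await (const data of iterate(counter())) {
    console.log(`data: ${data}`)
  }
})()
```

This prints data: 1, data: 2 and data: 3. Breaking out of the loop early triggers the finally block, which passes a truthy end argument to the source.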

Examples

All following examples are wrapped inside:

(async () => {

  const sleep = dt => new Promise(resolve => setTimeout(resolve, dt))

  // ...

})()

You can run the examples with node >= 7.6:

$ node examples/<name>.js

read

On each invocation the stream should return a String or Buffer of data, or a falsy value when there's nothing more to be read and the stream is done:

// readable stream that emits 3x the current date with 1 second delay
const dates = () => {
  let i = 0
  return async () => {
    if (++i > 3) return // end after three dates
    await sleep(1000)
    return String(Date.now())
  }
}

let data
const read = dates()

while (data = await read()) {
  console.log('data: %s', data)
}

console.log('done reading')

Outputs:

$ node examples/read.js
data: 1391519193735
data: 1391519194644
data: 1391519194663
done reading
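One consequence of using a falsy value as the end signal: an empty string is falsy too, so a source that returned '' in the middle of its data would look finished to the consumer. A source can guard against that by skipping empty chunks. This is a sketch with a hypothetical fromChunks helper, not something the spec prescribes.

```javascript
// `fromChunks` is a hypothetical helper name used only for this sketch.

// Source over an in-memory list of chunks that never returns an empty one.
const fromChunks = chunks => {
  let i = 0
  return async () => {
    while (i < chunks.length) {
      const chunk = chunks[i++]
      // Skip empty chunks: '' is falsy and would be mistaken for the end.
      if (chunk.length > 0) return chunk
    }
    // Falling through returns undefined, which signals the end.
  }
}

;(async () => {
  const read = fromChunks(['a', '', 'b'])
  let data
  while ((data = await read())) console.log(`data: ${data}`)
  console.log('done reading')
})()
```

This prints data: a, data: b and done reading; without the length check the loop would stop after the first chunk.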

end

The async function may take an end argument, which when truthy tells the stream to clean up its underlying resources, like tcp connections or file descriptors.

// readable stream with cleanup logic
const dates = () => {
  let i = 0
  const cleanup = () => console.log('cleaning up')

  return async end => {
    if (end || ++i == 3) return cleanup()
    await sleep(1000)
    return String(Date.now())
  }
}

let data
const read = dates()

console.log(`data: ${await read()}`)
console.log(`data: ${await read()}`)
await read(true)
console.log('done reading')

Outputs:

$ node examples/read-end.js
data: 1391519193735
data: 1391519194644
cleaning up
done reading
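The end argument is also what lets a transform stop its source early. The sketch below passes through at most n chunks and then forwards a truthy end upstream so the source can clean up. The names take and ticks are hypothetical.

```javascript
// `take` and `ticks` are hypothetical names used only for this sketch.

// Transform: pass through at most `n` chunks, then end the source.
const take = (n, read) => {
  let seen = 0
  let ended = false
  return async end => {
    if (ended) return
    if (end || seen >= n) {
      ended = true
      await read(true) // forward the end signal so the source can clean up
      return
    }
    const data = await read()
    if (!data) {
      ended = true
      return
    }
    seen++
    return data
  }
}

// Example source that would run forever if nobody ended it.
const ticks = () => {
  let i = 0
  let open = true
  return async end => {
    if (end) {
      open = false
      console.log('cleaning up')
      return
    }
    return open ? `tick ${++i}` : undefined
  }
}

;(async () => {
  const read = take(2, ticks())
  let data
  while ((data = await read())) console.log(`data: ${data}`)
  console.log('done reading')
})()
```

This prints data: tick 1, data: tick 2, cleaning up and done reading, in that order.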

pipe

"Pipe" streams into each other by letting them read from each other. Here we add a hex stream that converts date strings from decimal to hexadecimal:

const dates = () => {
  let i = 0
  return async end => {
    if (end || ++i == 3) return
    await sleep(1000)
    return String(Date.now())
  }
}

const hex = fn => async end => {
  const str = await fn(end)
  if (!str) return
  return Number(str).toString(16)
}

let data
const read = hex(dates())

while (data = await read()) {
  console.log(`data: ${data}`)
}

console.log('done reading')

Outputs:

$ node examples/pipe.js
data: 143fd169b4d
data: 143fd169f50
done reading
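With more than one transform, piping is just nested function application. If the nesting gets hard to read, a one-line reducer can flatten it. This is an optional sketch, not something the spec requires; pipeline, numbers, double and toHex are hypothetical names.

```javascript
// `pipeline`, `numbers`, `double` and `toHex` are hypothetical names.

// Apply transforms left to right: pipeline(src, a, b) equals b(a(src)).
const pipeline = (read, ...transforms) =>
  transforms.reduce((upstream, transform) => transform(upstream), read)

// Source: emits '100', '200', '300', then ends.
const numbers = () => {
  let i = 0
  return async end => {
    if (end || ++i > 3) return
    return String(i * 100)
  }
}

// Transform: doubles each decimal string.
const double = read => async end => {
  const str = await read(end)
  if (!str) return
  return String(Number(str) * 2)
}

// Transform: converts each decimal string to hexadecimal.
const toHex = read => async end => {
  const str = await read(end)
  if (!str) return
  return Number(str).toString(16)
}

;(async () => {
  const read = pipeline(numbers(), double, toHex)
  let data
  while ((data = await read())) console.log(`data: ${data}`)
})()
```

This prints data: c8, data: 190 and data: 258. Writing toHex(double(numbers())) by hand gives the same result.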

errors

Just throw inside the async function:

const errors = () => async end => {
  throw new Error('not implemented')
}

let data
const read = errors()

while (true) {
  try {
    data = await read()
  } catch (err) {
    console.error('threw')
    break
  }
}

Outputs:

$ node examples/error.js
threw

This means that a transform stream can decide for itself whether to handle errors from its source, by wrapping calls to read in a try/catch, or to propagate them to the parent.
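Both choices can be sketched as ordinary transforms. The names flaky, recover and annotate below are hypothetical, and the recovery strategy (substituting a placeholder chunk) is just one possibility.

```javascript
// `flaky`, `recover` and `annotate` are hypothetical names for this sketch.

// Source that fails on its second read and ends after the third.
const flaky = () => {
  let i = 0
  return async end => {
    if (end || ++i > 3) return
    if (i === 2) throw new Error('read failed')
    return `chunk ${i}`
  }
}

// Transform that handles source errors itself by emitting a placeholder chunk.
const recover = read => async end => {
  try {
    return await read(end)
  } catch (err) {
    return `recovered: ${err.message}`
  }
}

// Transform that adds context and propagates the error to its consumer.
const annotate = read => async end => {
  try {
    return await read(end)
  } catch (err) {
    throw new Error(`upstream: ${err.message}`)
  }
}

;(async () => {
  const read = recover(flaky())
  let data
  while ((data = await read())) console.log(`data: ${data}`)
})()
```

This prints data: chunk 1, data: recovered: read failed and data: chunk 3. Swapping recover for annotate would instead make the second read reject with "upstream: read failed", leaving the decision to the consumer.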

high water mark / buffering

Some streams - like node's or unix pipes - have the concept of high water marks / buffering, which means that a fast readable will be asked for data even if a slow writable isn't done consuming yet. This has the advantage of being potentially faster and evening out spikes in the streams' throughputs. However, it leads to more memory usage (in node max. 16kb per stream), complicates implementations and can be very unintuitive.

An example where you wouldn't expect that behavior is this:

const http = require('http');
const fs = require('fs');

http.createServer(function(req, res){
  fs.createReadStream('/dev/random').pipe(res);
});

You'd think this would stop reading from the pseudo-random number generator /dev/random when the request ends, right? Unfortunately that's not the case. Node will read 16kb into an internal buffer first, because you might later want to pipe that read stream into another stream, and it can then immediately flush the buffer out.

In that case the buffer will be filled up pretty quickly so that's not a huge problem. But imagine your source being slow, with low throughput. For example it could tail logs of an infrequently used system. In this case, with many requests to this http handler, it will keep a great number of streams open.

Currently async-streams have no concept of high water mark / buffering.
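If evening out throughput matters for a particular pipeline, a read-ahead of one chunk could be layered on as an ordinary transform. To be clear, this is not part of the spec: the sketch below and the names readAhead and slowNumbers are assumptions, and it brings back the trade-off described above, since the source is asked for data before the consumer wants it.

```javascript
// `readAhead` and `slowNumbers` are hypothetical names; buffering is NOT part of the spec.

const sleep = dt => new Promise(resolve => setTimeout(resolve, dt))

// Transform that keeps one read in flight while the consumer is busy.
const readAhead = read => {
  let next = null // promise for the prefetched chunk, if any
  let ended = false
  return async end => {
    if (end) {
      ended = true
      const inFlight = next
      next = null
      if (inFlight) await inFlight.catch(() => {}) // let the pending read settle first
      return read(true)
    }
    if (ended) return
    const current = next || read()
    next = null
    const data = await current
    if (!data) {
      ended = true
      return
    }
    next = read() // start fetching the following chunk right away
    next.catch(() => {}) // an error, if any, is rethrown on the consumer's next call
    return data
  }
}

// Example source: emits '1', '2', '3' with a 100 ms delay each.
const slowNumbers = () => {
  let i = 0
  return async end => {
    if (end || i >= 3) return
    await sleep(100)
    return String(++i)
  }
}

;(async () => {
  const read = readAhead(slowNumbers())
  let data
  while ((data = await read())) {
    console.log(`data: ${data}`)
    await sleep(100) // simulated slow consumer; the next chunk is fetched meanwhile
  }
})()
```

Only one read is ever outstanding, so the source never sees concurrent calls. Whether concurrent reads are allowed is something the spec text above does not address.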

Sponsors

This module is proudly supported by my Sponsors!

Do you want to support modules like this to improve their quality and stability, and to weigh in on new features? Then please consider donating to my Patreon. Not sure how many of my modules you're using? Try feross/thanks!

License

MIT
