  • Stars: 268
  • Rank: 153,144 (Top 4 %)
  • Language: TypeScript
  • License: ISC License
  • Created almost 8 years ago
  • Updated 3 months ago

Repository Details

A very flexible and simple websocket library for rxjs

rxjs-websockets

An rxjs websocket library with a simple and flexible implementation. Supports the browser and node.js.

Comparisons to other rxjs websocket libraries:

  • observable-socket
    • observable-socket provides an input subject for the user; rxjs-websockets instead takes the input stream as a parameter, so the user can choose an observable with semantics appropriate to their own use case (queueing-subject can be used to achieve the same semantics as observable-socket).
    • With observable-socket the WebSocket object must be created and managed by the user; rxjs-websockets manages the WebSocket(s) for the user, lazily, according to subscriptions to the messages observable.
    • With observable-socket the connection status must be detected by listening for plain old events on the WebSocket object; rxjs-websockets presents the connection status through observables.
  • rxjs built-in websocket subject
    • Implemented as a Subject, so it lacks the flexibility that rxjs-websockets and observable-socket provide.
    • Does not provide any ability to monitor the web socket connection state.

Installation

npm install -S rxjs-websockets
# or
yarn add rxjs-websockets

For rxjs 6 support, rxjs-websockets 8 is needed.

npm install -S rxjs-websockets@8
# or
yarn add rxjs-websockets@8

Simple usage

import { QueueingSubject } from 'queueing-subject'
import { Observable, Subscription } from 'rxjs'
import { share, switchMap } from 'rxjs/operators'
import makeWebSocketObservable, {
  GetWebSocketResponses,
  // WebSocketPayload = string | ArrayBuffer | Blob
  WebSocketPayload,
  normalClosureMessage,
} from 'rxjs-websockets'

// this subject queues as necessary to ensure every message is delivered
const input$ = new QueueingSubject<string>()

// queue up a request to be sent when the websocket connects
input$.next('some data')

// create the websocket observable, does *not* open the websocket connection
const socket$ = makeWebSocketObservable('ws://localhost/websocket-path')

const messages$: Observable<WebSocketPayload> = socket$.pipe(
  // the observable produces a value once the websocket has been opened
  switchMap((getResponses: GetWebSocketResponses) => {
    console.log('websocket opened')
    return getResponses(input$)
  }),
  share(),
)

const messagesSubscription: Subscription = messages$.subscribe({
  next: (message: WebSocketPayload) => {
    console.log('received message:', message)
    // respond to server
    input$.next('i got your message')
  },
  error: (error: Error) => {
    const { message } = error
    if (message === normalClosureMessage) {
      console.log('server closed the websocket connection normally')
    } else {
      console.log('socket was disconnected due to error:', message)
    }
  },
  complete: () => {
    // The clean termination only happens in response to the last
    // subscription to the observable being unsubscribed, any
    // other closure is considered an error.
    console.log('the connection was closed in response to the user')
  },
})

function closeWebsocket() {
  // this also causes the websocket connection to be closed
  messagesSubscription.unsubscribe()
}

setTimeout(closeWebsocket, 2000)

The observable returned by makeWebSocketObservable is cold: the websocket connection is attempted lazily, as subscriptions are made to it. Advanced users of this library will find it important to understand the distinction between hot and cold observables; for most it will be sufficient to use the share operator as shown in the example above. The share operator ensures that at most one websocket connection is attempted regardless of the number of subscriptions to the observable, and that the socket is closed when the last subscription is unsubscribed. When only one subscription is made the operator has no effect.
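
The effect of share can be observed without a real server. The sketch below replaces the websocket with a hypothetical cold observable, fakeSocket$, that counts how often it is subscribed to and torn down. The counters and variable names are illustrative only and are not part of rxjs-websockets.

```typescript
import { Observable } from 'rxjs'
import { share } from 'rxjs/operators'

// Hypothetical counters standing in for real websocket connections.
let connectionsOpened = 0
let connectionsClosed = 0

// A cold observable: the body runs once per subscription, in the same way
// that a websocket connection would be attempted once per subscription.
const fakeSocket$ = new Observable<string>((subscriber) => {
  connectionsOpened += 1
  subscriber.next('hello')
  // teardown, runs when this subscription ends
  return () => {
    connectionsClosed += 1
  }
})

// With share(), two subscribers use a single underlying subscription.
const shared$ = fakeSocket$.pipe(share())
const first = shared$.subscribe()
const second = shared$.subscribe()
const openedWhileShared = connectionsOpened // 1

first.unsubscribe()
const closedAfterFirstUnsubscribe = connectionsClosed // 0, one subscriber remains
second.unsubscribe()
const closedAfterLastUnsubscribe = connectionsClosed // 1

// Without share(), each subscriber triggers its own "connection".
const a = fakeSocket$.subscribe()
const b = fakeSocket$.subscribe()
const openedInTotal = connectionsOpened // 3
a.unsubscribe()
b.unsubscribe()
```

Note that share does not replay: in this sketch the second subscriber joins after 'hello' has been emitted and never sees it.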

By default the websocket supports binary messages, so the payload type is string | ArrayBuffer | Blob. When you only need string messages, the generic parameter to makeWebSocketObservable can be used:

const socket$ = makeWebSocketObservable<string>('ws://localhost/websocket-path')
const input$ = new QueueingSubject<string>()

const messages$: Observable<string> = socket$.pipe(
  switchMap((getResponses: GetWebSocketResponses<string>) => getResponses(input$)),
  share(),
)

Reconnecting on unexpected connection closures

This can be done with the built-in rxjs operator retryWhen:

import { Subject } from 'rxjs'
import { delay, retryWhen, switchMap } from 'rxjs/operators'
import makeWebSocketObservable from 'rxjs-websockets'

const input$ = new Subject<string>()

const socket$ = makeWebSocketObservable('ws://localhost/websocket-path')

const messages$ = socket$.pipe(
  switchMap((getResponses) => getResponses(input$)),
  retryWhen((errors) => errors.pipe(delay(1000))),
)
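
A fixed one-second delay retries at the same pace for as long as the server stays down. One possible variation, sketched here under the assumption that a capped exponential backoff is wanted, is shown below. The names backoffDelay and retryWithBackoff and their default values are hypothetical and are not provided by rxjs or rxjs-websockets.

```typescript
import { Observable, timer } from 'rxjs'
import { mergeMap, retryWhen } from 'rxjs/operators'

// Hypothetical helper: the delay before retry number `attempt` (0-based),
// doubling each time and capped at `maxMs`.
function backoffDelay(attempt: number, baseMs: number, maxMs: number): number {
  return Math.min(maxMs, baseMs * Math.pow(2, attempt))
}

// Hypothetical operator: resubscribes after an error, waiting longer each time.
function retryWithBackoff<T>(baseMs = 1000, maxMs = 30000) {
  return (source: Observable<T>): Observable<T> =>
    source.pipe(
      retryWhen((errors) =>
        errors.pipe(
          // the second argument of the project function is the 0-based error count
          mergeMap((_error, attempt) => timer(backoffDelay(attempt, baseMs, maxMs))),
        ),
      ),
    )
}
```

It could take the place of the retryWhen line in the example above, for instance as retryWithBackoff<string>(). In this sketch the attempt counter is never reset after a successful reconnection, so a long-lived stream would eventually settle at the maximum delay.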

Alternate WebSocket implementations

A custom websocket factory function can be supplied that takes a URL and returns an object that is compatible with WebSocket:

import makeWebSocketObservable, { WebSocketOptions } from 'rxjs-websockets'

const options: WebSocketOptions = {
  // this is used to create the websocket compatible object,
  // the default is shown here
  makeWebSocket: (url: string, protocols?: string | string[]) => new WebSocket(url, protocols),

  // optional argument, passed to `makeWebSocket`
  // protocols: '...',
}

const socket$ = makeWebSocketObservable('ws://127.0.0.1:4201/ws', options)
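
A factory of this kind can also be useful in tests. The following is a minimal sketch of an in-memory stand-in that echoes every sent message back, assuming the library only relies on send, close and the on* handler properties of the object it is given. EchoSocket and makeEchoSocket are hypothetical names, and the event shapes are simplified.

```typescript
type Handler<E> = ((event: E) => void) | null

// Hypothetical in-memory stand-in for a WebSocket: whatever is sent
// is delivered straight back through onmessage.
class EchoSocket {
  onopen: Handler<{ type: 'open' }> = null
  onmessage: Handler<{ data: string }> = null
  onclose: Handler<{ code: number; reason: string }> = null
  onerror: Handler<{ type: 'error' }> = null
  private closed = false

  constructor(public readonly url: string) {
    // Real sockets open asynchronously, so defer the open event to give
    // the caller a chance to attach handlers first.
    Promise.resolve().then(() => {
      if (!this.closed) this.onopen?.({ type: 'open' })
    })
  }

  send(data: string): void {
    if (this.closed) throw new Error('socket is closed')
    this.onmessage?.({ data })
  }

  close(): void {
    if (this.closed) return
    this.closed = true
    // 1000 is the WebSocket status code for a normal closure
    this.onclose?.({ code: 1000, reason: '' })
  }
}

const makeEchoSocket = (url: string): EchoSocket => new EchoSocket(url)
```

Passing makeEchoSocket as the makeWebSocket option may require a type cast, depending on how strictly the option's return type is declared.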

JSON messages and responses

This example shows how to use the map operator to handle JSON encoding of outgoing messages and parsing of responses:

import { Observable } from 'rxjs'
import { map } from 'rxjs/operators'
import makeWebSocketObservable, { WebSocketOptions, GetWebSocketResponses } from 'rxjs-websockets'

function makeJsonWebSocketObservable(
  url: string,
  options?: WebSocketOptions,
): Observable<unknown> {
  const socket$ = makeWebSocketObservable<string>(url, options)
  return socket$.pipe(
    map(
      (getResponses: GetWebSocketResponses<string>) => (input$: Observable<object>) =>
        getResponses(input$.pipe(map((request) => JSON.stringify(request)))).pipe(
          map((response) => JSON.parse(response)),
        ),
    ),
  )
}

The function above can be used in the same way as makeWebSocketObservable, except that requests and responses are transparently encoded and decoded.
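
JSON.parse throws on malformed input, and an exception thrown inside map is delivered as an error on the resulting observable, which ends the stream. Where a single bad message should not have that effect, one option is a tolerant parser. This is only a sketch: safeJsonParse and ParseResult are hypothetical names, not part of rxjs-websockets.

```typescript
// Hypothetical result type: either the parsed value, or the raw text
// together with the error that JSON.parse raised.
type ParseResult =
  | { ok: true; value: unknown }
  | { ok: false; raw: string; error: Error }

function safeJsonParse(raw: string): ParseResult {
  try {
    return { ok: true, value: JSON.parse(raw) }
  } catch (error) {
    return {
      ok: false,
      raw,
      error: error instanceof Error ? error : new Error(String(error)),
    }
  }
}
```

Using map((response) => safeJsonParse(response)) in place of the JSON.parse call would then let subscribers check the ok flag and decide for themselves how to handle a malformed message.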
