Durable Object Groups

Features

  • Supports Replica workloads using the HTTP and/or WS protocols
  • Creates or reuses a Replica based on configured connection limit
  • Includes Replica-to-Replica (peer-to-peer) communication
  • Ready for strongly-typed, strict TypeScript usage
  • Allows an active connection to:
    • broadcast messages to the entire cluster
    • emit messages to Replica-owned connections
    • send a whisper to a single connection within the cluster

Overview

With DOG, it's easy to set up named clusters of related Durable Objects. Each cluster is controlled by a Group, which directs an incoming Request to a specific Replica instance. A Group adheres to the user-defined limit of active connections per Replica and, in doing so, will reuse existing or create new Replica instances as necessary.

DOG includes convenience methods that allow a Replica to directly communicate with another Replica belonging to the same Group, effectively forming a peer-to-peer/gossip network. Additionally, when dealing with active client connections, a Replica class allows you to:

  • broadcast a message to all active connections within the entire cluster
  • emit a message only to active connections owned by the Replica itself
  • whisper a message to a single, targeted connection (via your own identification system), even if it's owned by another Replica instance
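
The three delivery scopes above can be pictured with a small in-memory model. This is only a sketch: `ModelReplica`, `ModelCluster`, and `Connection` are hypothetical names invented for illustration and are not part of DOG's API. The model delivers to every connection in scope and does not model whether the sender itself is included.

```typescript
// Hypothetical in-memory model; none of these names are part of DOG's API.
interface Connection {
  uid: string;
  inbox: string[];
}

class ModelReplica {
  connections: Connection[] = [];

  add(uid: string): Connection {
    const conn: Connection = { uid, inbox: [] };
    this.connections.push(conn);
    return conn;
  }
}

class ModelCluster {
  replicas: ModelReplica[] = [];

  // emit: only the connections owned by one Replica
  emit(owner: ModelReplica, msg: string): void {
    for (const conn of owner.connections) conn.inbox.push(msg);
  }

  // broadcast: every connection on every Replica in the cluster
  broadcast(msg: string): void {
    for (const replica of this.replicas) this.emit(replica, msg);
  }

  // whisper: a single targeted connection, whichever Replica owns it
  whisper(uid: string, msg: string): boolean {
    for (const replica of this.replicas) {
      for (const conn of replica.connections) {
        if (conn.uid === uid) {
          conn.inbox.push(msg);
          return true;
        }
      }
    }
    return false; // no connection with that uid
  }
}

const cluster = new ModelCluster();
const r1 = new ModelReplica();
const r2 = new ModelReplica();
cluster.replicas.push(r1, r2);

const alice = r1.add('alice');
const bob = r1.add('bob');
const carol = r2.add('carol');

cluster.emit(r1, 'hello r1');     // reaches alice and bob
cluster.broadcast('hello all');   // reaches alice, bob, and carol
cluster.whisper('carol', 'psst'); // reaches carol only
```

In this model, `carol` never sees the `emit` from the first Replica, while a whisper finds her even though another Replica owns her connection.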

Group and Replica are both abstract classes, which means that you're allowed (and required) to extend them with your own application needs. You may define your own class methods, add your own state properties, or use Durable Storage to fulfill your needs.

Please see Usage, the API docs, and the example application for further information!

Install

$ npm install dog

Usage

Refer to the /example for a complete Chat Room application.

import { identify, Group, Replica } from 'dog';

// deployed as `POOL` binding
export class Pool extends Group {
  limit = 50; // each Replica handles 50 connections max

  link(env: Bindings) {
    return {
      child: env.TASK, // receiving Replica
      self: env.POOL, // self-identifier
    };
  }
}

// deployed as `TASK` binding
export class Task extends Replica {
  link(env: Bindings) {
    return {
      parent: env.POOL, // parent Group
      self: env.TASK, // self-identifier
    };
  }

  async onmessage(socket, data) {
    let message = JSON.parse(data);
    console.log('[task] onmessage', message);

    if (message.type === 'crawl:url') {
      let { url } = message;
      // ...
      let output = { url, done: true };
      // alert everyone that the task is complete
      return socket.broadcast(JSON.stringify(output), true);
    }

    // other events
  }

  receive(req) {
    // Receive & handle the request
    // NOTE: This is the original, forwarded request
    let { pathname } = new URL(req.url);

    // Rely on internal util for WebSocket upgrade
    if (pathname === '/ws') return this.connect(req);

    // Any other custom routing behavior(s)
    if (pathname === '/') return new Response('OK');

    return toError('Unknown path', 404);
  }
}

function toError(msg, status) {
  return new Response(msg, { status });
}

// Module Worker
export default {
  async fetch(req, env, ctx) {
    // Accept: /tasks/<taskname>
    let match = /[/]tasks[/]([^/]+)[/]?/.exec(req.url);
    if (match == null) return toError('Missing task name', 404);

    let taskname = match[1].trim();
    if (taskname.length < 1) return toError('Invalid task name', 400);

    // Generate Durable Object ID from taskname
    let group = env.POOL.idFromName(taskname);

    // Custom request identifier logic
    let reqid = req.headers.get('x-request-id');

    // Identify the `Replica` stub to use
    let replica = await identify(group, reqid, {
      parent: env.POOL,
      child: env.TASK,
    });

    // (Optional) Save reqid -> replica.id
    // await KV.put(`req::${reqid}`, replica.id.toString());

    // Send request to the Replica instance
    return replica.fetch(req);
  }
}
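
The task-name extraction in the Worker above can be isolated into a helper for testing. This is a minimal sketch: `parseTaskName` is a hypothetical name, and it folds the Worker's two error cases (404 for a missing name, 400 for an empty one) into a single `null` result.

```typescript
// Hypothetical helper; the pattern is the one used in the Worker above.
function parseTaskName(url: string): string | null {
  const match = /[/]tasks[/]([^/]+)[/]?/.exec(url);
  if (match == null) return null; // no "/tasks/<taskname>" segment
  const taskname = match[1].trim();
  return taskname.length > 0 ? taskname : null;
}

const plain = parseTaskName('https://example.com/tasks/crawler');
const nested = parseTaskName('https://example.com/tasks/crawler/ws');
const missing = parseTaskName('https://example.com/other');
const withQuery = parseTaskName('https://example.com/tasks/crawler?x=1');
```

Because the pattern runs against the full URL string, a query string that directly follows the name is captured with it (`withQuery` is `crawler?x=1`). Matching against the `pathname` of a parsed URL would avoid that.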

API

identify

Note: Refer to the TypeScript definitions for more information.

A utility function that identifies the Replica to use, creating a new Replica if none is available. Returns the Replica stub directly.

Group

Note: Refer to the TypeScript definitions for more information.

Required:

  • limit: number – the maximum number of active connections a Replica can handle
  • link(env: Bindings): { self, child } – define the relationships between this Group and its Replica child class

A Group is the initial coordinator for the cluster. It receives a user-supplied request identifier, ReqID, and replies with the Durable Object ID for the Replica instance to be used. If the ReqID has been seen before, the Group will attempt to target the same Replica that the ReqID was previously connected to. If the ReqID is unknown, the Group will send the request to the least-utilized Replica instance or generate a new Replica ID to be used.

When targeting an existing Replica instance, the Group verifies that the Replica actually has availability for the request, as determined by the user-supplied limit value. If a new Replica instance needs to be created, the Group's clusterize() method is called to generate a new Replica instance identifier. You may override this method with your own logic (for example, to include a jurisdiction), but by default, the Group calls newUniqueId() for a system-guaranteed identifier.
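
The routing decision described above can be sketched as a pure function over a small state object. This is an illustrative model, not DOG's actual implementation: `GroupState`, `ReplicaState`, `pickReplica`, and `nextId` (a stand-in for clusterize()) are hypothetical names. It also assumes that a returning ReqID whose previous Replica is full falls through to the least-utilized Replica, which the text above does not spell out.

```typescript
// Hypothetical model of the routing decision; not DOG's actual implementation.
interface ReplicaState {
  id: string;
  active: number; // current number of active connections
}

interface GroupState {
  limit: number;                  // max active connections per Replica
  replicas: ReplicaState[];
  sticky: Record<string, string>; // ReqID -> id of the Replica last used
  nextId: () => string;           // stand-in for clusterize()
}

function pickReplica(group: GroupState, reqid: string): ReplicaState {
  // 1. A known ReqID prefers the Replica it used before, if it has room.
  if (Object.prototype.hasOwnProperty.call(group.sticky, reqid)) {
    const previous = group.sticky[reqid];
    for (const replica of group.replicas) {
      if (replica.id === previous && replica.active < group.limit) return replica;
    }
  }

  // 2. Otherwise take the least-utilized Replica that still has room.
  let best: ReplicaState | null = null;
  for (const replica of group.replicas) {
    if (replica.active >= group.limit) continue;
    if (best === null || replica.active < best.active) best = replica;
  }

  // 3. If every Replica is full (or none exists), create a new one.
  if (best === null) {
    best = { id: group.nextId(), active: 0 };
    group.replicas.push(best);
  }

  group.sticky[reqid] = best.id;
  return best;
}

let counter = 0;
const group: GroupState = {
  limit: 2,
  replicas: [],
  sticky: {},
  nextId: () => 'replica-' + (++counter),
};

const first = pickReplica(group, 'req-a');  // no Replica yet: creates replica-1
first.active++;
const second = pickReplica(group, 'req-b'); // replica-1 still has room
second.active++;
const third = pickReplica(group, 'req-c');  // replica-1 is full: creates replica-2
```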

The number of active connections within each Replica instance is automatically tracked and shared between the Replica and its Group parent. The Replica's count is decremented when the connection is closed. This means that when a Replica works with WebSockets, open connections continue to reserve Replica quota until closed. Non-upgraded HTTP connections close and decrement the Replica count as soon as a Response is returned.
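
The difference between the two connection types can be pictured with a small counter. DOG performs this bookkeeping internally; `SlotCounter` and its methods are hypothetical names used only to illustrate when a slot is held and released, and the sketch covers synchronous handlers only.

```typescript
// Hypothetical bookkeeping model; DOG tracks these counts internally.
class SlotCounter {
  active = 0;

  constructor(readonly limit: number) {}

  hasRoom(): boolean {
    return this.active < this.limit;
  }

  // A connection starts: reserve one slot of the Replica's quota.
  open(): void {
    if (!this.hasRoom()) throw new Error('Replica is at its connection limit');
    this.active++;
  }

  // A connection ends: give the slot back.
  close(): void {
    if (this.active > 0) this.active--;
  }

  // Plain HTTP: the slot is held only while the handler runs.
  http<T>(handler: () => T): T {
    this.open();
    try {
      return handler();
    } finally {
      this.close();
    }
  }
}

const slots = new SlotCounter(2);
const httpResult = slots.http(() => 'OK'); // slot is released once the result exists
slots.open();                              // a WebSocket connects and keeps its slot
slots.open();                              // a second WebSocket fills the Replica
const full = !slots.hasRoom();             // stays full until a socket closes
slots.close();                             // one socket closes and frees its slot
```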

Important: Do not define your own fetch() method!
Doing so requires that super.fetch() be called appropriately, otherwise the entire cluster's inter-communication will fail.

You may attach any additional state and/or methods to your Group class extension.

Replica

Note: Refer to the TypeScript definitions for more information.

Required:

  • link(env: Bindings): { parent, self } – define the relationships between this Replica and its Group parent class
  • receive(req: Request): Promise<Response> | Response – a user-supplied method to handle an incoming Request

A Replica is the cluster's terminating node. In other words, it's your workhorse and is where the bulk of your application logic will reside. By default, a Replica actually does nothing and requires your user-supplied code to become useful. It does, however, provide you with utilities, lifecycle hooks, and event listeners to organize and structure your logic.

A Replica can only receive a Request from its parent Group or from its Replica siblings/peers. Because of this, you cannot define a fetch() method in your Replica class extension, otherwise all internal routing and inter-communication will break.

However, this does not mean that you cannot deploy your own external-facing routing solution!

If an incoming request to a Replica is not an internal DOG event, the request is passed to your receive method, which receives the original Request without any modifications. This means that the execution order for a client request looks like this:

client request
└──> dog.identify(...)
      │   ├──> Group#fetch (internal)
      │   └──> Group#clusterize (optional)
      └──> Replica
          └──> Replica.fetch (user)
              └──> Replica#receive

Your receive method is the final handler and decides what the Replica actually does.

If you'd like to remain in the HTTP protocol, you can treat receive() as if it were the underlying fetch() method. Otherwise, to upgrade the HTTP connection into a WebSocket connection, reach for the Replica.connect() method, which handles the upgrade and unlocks the rest of the Replica abstractions.

Internally, a Socket interface is instantiated and passed to the WebSocket event listeners that you choose to define. For example, to handle incoming messages or to react to a new connection, your Replica class may include the following:

import { Replica } from 'dog';

export class Counter extends Replica {
  #counts = new Map<string, number>();

  onopen(socket) {
    // via dog.identify
    // ~> your own ReqID
    let reqid = socket.uid;
    this.#counts.set(reqid, 0);

    // notify others ONLY in Replica
    socket.emit(`"${reqid}" has joined`);
  }

  onmessage(socket, data) {
    let reqid = socket.uid;
    let count = this.#counts.get(reqid) ?? 0;

    // data is always a string
    let msg = JSON.parse(data);

    if (msg.type === '+1') count++;
    else if (msg.type === '-1') count--;
    else return; // unknown msg type

    this.#counts.set(reqid, count);

    // tell EVERYONE in cluster about new count
    socket.broadcast(`"${reqid}" now has ${count}`);
  }

  receive(req) {
    // Only accept the "/ws" pathname (req.url is an absolute URL)
    let { pathname } = new URL(req.url);
    let isWS = /^[/]ws[/]?$/.test(pathname);
    // Handle upgrade, return 101 status
    if (isWS) return this.connect(req);
    // Otherwise return a 404
    return new Response('Invalid', { status: 404 });
  }
}

The Replica class allows you to optionally define event listeners for the underlying WebSocket events. Whether or not you define onclose and/or onerror listeners, the Replica will always notify the Group parent when the WebSocket connection is closed. The event listeners may be asynchronous and their names follow the browser's WebSocket event names:

  • onopen – the Replica established a WebSocket connection
  • onmessage – the Replica received a message from the WebSocket connection
  • onerror – the WebSocket connection terminated due to an error
  • onclose – the WebSocket connection was closed

Note: If defined, the onclose listener will be called in the absence of an onerror listener.

Finally, a Replica may communicate directly with its Replica peers in the cluster. This does not rely on WebSockets nor does it require you to use them! It can, however, be leveraged at any point during your HTTP and/or WebSocket handlers.

In DOG, this peer-to-peer communication is called gossip, because Replicas are typically talking about their connections but without involving the connections; AKA, behind their backs!

In order for a Replica to hear gossip, it must define an ongossip method handler. The handler receives a decoded JSON object and must return a new JSON object so that DOG can serialize it and deliver it to the sender. In practice, this internal communication happens over HTTP, which means that each Gossip.Message must represent point-in-time information.
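
The "point-in-time" nature of a gossip reply can be illustrated with a JSON round trip. This is a sketch under stated assumptions: `deliver` is a hypothetical stand-in for the HTTP hop between Replicas, and `answerGossip` plays the role of an ongossip handler. Neither name is part of DOG.

```typescript
// Hypothetical model of one gossip exchange. `deliver` stands in for the
// HTTP hop between Replicas; it is not a DOG function.
interface AskScores {
  type: 'ask:scores';
}
type Entry = [string, number]; // [reqid, count]

function deliver<T>(payload: T): T {
  // Serializing and parsing produces an independent copy of the value.
  return JSON.parse(JSON.stringify(payload)) as T;
}

function answerGossip(counts: Record<string, number>, msg: AskScores): Entry[] {
  const entries: Entry[] = [];
  if (msg.type === 'ask:scores') {
    for (const uid of Object.keys(counts)) entries.push([uid, counts[uid]]);
  }
  return entries;
}

const counts: Record<string, number> = { alice: 3 };
const question = deliver<AskScores>({ type: 'ask:scores' });
const snapshot = deliver(answerGossip(counts, question));

counts['alice'] = 4; // a later change on the answering side is not seen by the asker
```

The reply is a snapshot: `snapshot` still holds `["alice", 3]` after the answering side moves on. Note also that a `Map` serializes to `{}` under `JSON.stringify`, so a handler has to return plain arrays or objects, such as a list of tuples.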

Returning to the Counter example, suppose the Counter objects need to coordinate with one another to determine a leaderboard. Refreshing this leaderboard could be done through a new refresh:leaderboard message, for example:

import { Replica } from 'dog';

export class Counter extends Replica {
  #counts = new Map<string, number>();
  #lastupdate = 0; // timestamp
  #scores: [string, number][] = []; // top entries as [reqid, count]

  // NOTE: now `async` method
  async onmessage(socket, data) {
    let reqid = socket.uid;
    let current = this.#counts.get(reqid);

    // data is always a string
    let msg = JSON.parse(data);

    // ...

    if (msg.type === 'refresh:leaderboard') {
      // Only gossip if cache is older than 60s
      if (Date.now() - this.#lastupdate > 60e3) {
        // `ongossip` returns Array<[string,number][]>
        let results = await this.gossip({ type: 'ask:scores' });
        let leaders = results.flat(1); // [ [reqid,count], [reqid,count], ... ]

        // sort by highest scores, keep top 10 only
        this.#scores = leaders.sort((a, b) => b[1] - a[1]).slice(0, 10);
        this.#lastupdate = Date.now();
      }

      // Tell EVERYONE in cluster
      return socket.broadcast({
        leaders: this.#scores,
        timestamp: this.#lastupdate,
      });
    }
  }

  ongossip(msg) {
    // Return array of tuples: Array<[string, number]>
    if (msg.type === 'ask:scores') return [...this.#counts];
    throw new Error(`Missing "${msg.type}" handler in ongossip`);
  }

  // ...
}
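
The merge step inside the `refresh:leaderboard` branch can be tested on its own. The helper below is a minimal sketch: `topScores` and `Score` are hypothetical names, and the input is assumed to be one array of `[reqid, count]` tuples per responding Replica, as the comments in the example describe.

```typescript
type Score = [string, number]; // [reqid, count]

// Hypothetical helper: merges per-Replica replies into a top-N leaderboard.
function topScores(replies: Score[][], size: number): Score[] {
  const all: Score[] = [];
  for (const reply of replies) {
    for (const score of reply) all.push(score);
  }
  // Highest count first, then keep only the first `size` entries.
  return all.sort((a, b) => b[1] - a[1]).slice(0, size);
}

const replies: Score[][] = [
  [['alice', 3], ['bob', 7]], // reply from one Replica
  [['carol', 5]],             // reply from another Replica
  [],                         // a Replica with no connections
];
const leaders = topScores(replies, 2);
```

Here `leaders` contains the entries for `bob` and `carol`, in that order.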

License

MIT © Cloudflare
