• Stars
    star
    111
  • Rank 314,510 (Top 7 %)
  • Language
    Go
  • License
    MIT License
  • Created about 10 years ago
  • Updated about 2 months ago

Reviews

There are no reviews yet. Be the first to send feedback to the community and the maintainers!

Repository Details

A generic oplog/replication system for microservices

REST Operation Log godoc license build

OpLog is used as a real-time data synchronization layer between a producer and consumers. Basically, it's a generic database replication system for micro-services or an easy way to add a public streaming capability to a public API.

Typical use cases:

  • A component handles an authoritative database and several independent components need to keep locally an up-to-date read-only view of the data (e.g.: search engine indexing, recommendation engines, multi-regions architecture, etc.) or to react to certain changes (i.e.: spam detection, analytics, etc.).
  • Implement a public streaming API to monitor changes to objects within the service's model. With the use of Server Sent Event and filtering, it might be used directly from the browser to monitor changes that occur on objects displayed on the page (à la Meteor) or from a native mobile application to keep an up-to-date offline view of the data.

The agent may run locally on same nodes/containers as the API/application producing updates to the data model. The agent receives notifications of updates (operations) via UDP from the producer application and forwards them to the central oplog's data store (a MongoDb cluster). If the central data store is not available, operations are buffered in memory, waiting for the database cluster to be available again.

Flow

The agent also exposes a Server Sent Event API for consumers to be notified in real time about model changes. Thanks to the SSE protocol, a consumer can recover a connection breakage without loosing any updates by using the Last-Event-ID HTTP header (see [Server Sent Event API] below).

A [full replication] is also supported for freshly spawned consumers that need to have a full view of the data.

Change operations are stored on a central MongoDB server. A tailable cursor on a MongoDB capped collection is used for real-time updates and final states of objects is maintained in a secondary collection for full replication support. The actual data is not stored in the OpLog's data store ; the monitored API continues to serve as the authoritative source of data and as the gatekeeper for it (authen/authz). Only modified object's type and id are stored together with the timestamp of the update and some related "parent" object references, useful for filtering. What you put in type, id and parents is up to the service, and must be meaningful to fetch the actual objects data from their API. An optional reference to the modified object can be provided by the OpLog API if the URL schema is setup (see [Starting the agent] below for more info).

As the it is highly expected that the OpLog may miss some operations from the API, a [Periodical Source Synchronization] mechanism is available

A typical deployment includes an oplogd agent running locally on every node of a cluster serving the API to monitor. The agent serves both roles: 1) ingesting operations coming from the API and 2) streaming aggregated operation of all agents to consumers. The same load balancer use to serve the API can expose the OpLog SSE endpoint:

Architecture

Another deployment choice may be to separate the SSE API from the operations ingestion. This can be very easily done by running a separated cluster of oplogd daemons:

Architecture 2

Install

To install the project, execute the following commands:

go get -u github.com/dailymotion/oplog
go build -a -o /usr/local/bin/oplogd github.com/dailymotion/oplog/cmd/oplogd
go build -a -o /usr/local/bin/oplog-sync github.com/dailymotion/oplog/cmd/oplog-sync
go build -a -o /usr/local/bin/oplog-tail github.com/dailymotion/oplog/cmd/oplog-tail

Starting the agent

To start the agent, run the following command:

oplogd --mongo-url mongodb://[username:password@]host1[:port1][,host2[:port2],...[,hostN[:portN]]]/database[?options]

The oplog_ops and oplog_states collections will be created in the specified database. It is advised to dedicate a database to this service in order to not to share locks contention with another services.

Available options:

  • --capped-collection-size=10485760: Size of the created MongoDB capped collection size in bytes (default 10MB).
  • --debug=false: Show debug log messages.
  • --listen=":8042": The address to listen on. Same address is used for both SSE(HTTP) and UDP APIs.
  • --max-queued-events=100000: Number of events to queue before starting throwing up UDP messages.
  • --mongo-url: MongoDB URL to connect to.
  • --object-url: A URL template to reference objects. If this option is set, SSE events will have an "ref" field with the URL to the object. The URL should contain {{type}} and {{id}} variables (i.e.: http://api.mydomain.com/{{type}}/{{id}})
  • --password: Password protecting the global SSE stream.
  • --ingest-password: Password protecting the HTTP ingest endpoint.

Available environment variables:

  • OPLOGD_MONGO_URL: See --mongo-url.
  • OPLOGD_PASSWORD: See --password
  • OPLOGD_INGEST_PASSWORD: See --ingest-password
  • OPLOGD_OBJECT_URL: See --object-url

Producer API: UDP and HTTP

To send operations to the agent you can either send a UDP datagram or a HTTP POST request containing a JSON object.

The default port for both protocol is 8042.

The HTTP request must be a POST on / with application/json as Content-Type.

The format of the JSON object is as follow:

{
    "event": "insert",
    "parents": ["video/xk32jd", "user/xkjdi"],
    "type": "video",
    "id": "xk32jd",
}

The following keys are required:

  • event: The type of event. Can be insert, update or delete.
  • type: The object type (i.e.: video, user, playlist, …)
  • id: The object id of the impacted object as string.

The following keys are optional:

  • parents: The list of parent objects of the modified object. The advised format for items of this list is type/id but any format is acceptable. It is generally a good idea to put a reference to the modified object itself in this list in order to easily let the consumers filter on any updates performed on the object.
  • timestamp: It must contains the date when the object has been updated as RFC 3339 representation. If not provided, the time when the operation has been received by the agent is used instead.

See examples/ directory for implementation examples in different languages.

Consumer API: Server Sent Event

The SSE API runs on the same port as UDP API but using TCP. It means that agents have both input and output roles so it is easy to scale the service by putting an agent on every node of the source API cluster and expose their HTTP port via the same load balancer as the API while each node can send their updates to the UDP port on their localhost.

The W3C SSE protocol is respected by the book. To connect to the API, a GET on / with the Accept: text/event-stream header is performed. If no Last-Event-ID HTTP header is passed, the OpLog server will start sending all future operations with no backlog. On each received operation, the client must store the last associated "event id" as operations are treated. This event id will be used to resume the stream where it has been left in the case of a disconnect. The client just has to send the last consumed "event id" using the Last-Event-ID HTTP header.

It the case that the id defined by Last-Event-ID is no longer available in the underlying oplog_ops capped collection, the agent will automatically fallback to oplog_states by converting the oplog event id into a timestamp.

The following filters can be passed as a query-string:

  • types A list of object types to filter on separated by comas (i.e.: types=video,user).
  • parents A coma separated list of parents to filter on (i.e.: parents=video/xk32jd,user/xkjdi
GET / HTTP/1.1
Accept: text/event-stream

HTTP/1.1 200 OK
Content-Type: text/event-stream; charset=utf-8

id: 545b55c7f095528dd0f3863c
event: insert
data: {"timestamp":"2014-11-06T03:04:39.041-08:00","parents":["x3kd2"],"type":"video","id":"xekw","ref":"http://api.mydomain.com/video/xekw"}

id: 545b55c8f095528dd0f3863d
event: delete
data: {"timestamp":"2014-11-06T03:04:40.091-08:00","parents":["x3kd2"],"type":"video","id":"xekw","ref":"http://api.mydomain.com/video/xekw"}

…

Full Replication

If required, a full replication with all (not deleted) objects can be performed before streaming live updates. To perform a full replication, pass 0 as value for the Last-Event-ID HTTP header. Numeric event ids with 13 digits or less are considered replication ids, which represent a milliseconds UNIX timestamp. By passing a millisecond timestamp, you are asking to replicate all objects that have been modified passed this date. Passing 0 thus ensures that every object will be replicated.

If a full replication is interrupted during the transfer, the same mechanism as for live updates is used. Once replication is complete, the stream will automatically switch to the live events stream so that the consumer does not miss any updates.

When a full replication starts, a special reset event with no data is sent to inform the consumer that it should reset its database before applying the subsequent operations.

Once the replication is complete and the OpLog switches back to the live updates, a special live event with no data is sent. This event can be useful for a consumer to know when it is safe for the consumer's service to be activated in production for instance.

Periodical Source Synchronization

There is many ways for the OpLog to miss some updates and thus have an incorrect view of the current state of the source data. In order to cope with this issue, a regular synchronization process with the source data content can be performed. The sync is a separate process which compares a dump of the real data with what the OpLog has stored within its own database. For any discrepancies which is anterior to the dump in the OpLog's database, the sync process will generate an appropriate operation in the OpLog to fix the delta on both its own database and for all consumers.

The dump must be in a streamable JSON format. Each line is a JSON object with the same schema as of the data part of the SEE API response. Dump example:

{"timestamp":"2014-11-06T03:04:39.041-08:00", "parents": ["user/xl2d"], "type":"video", "id":"x34cd"}
{"timestamp":"2014-12-24T02:03:05.167+01:00", "parents": ["user/xkwek"], "type":"video", "id":"x12ab"}
{"timestamp":"2014-12-24T01:03:05.167Z", "parents": ["user/xkwek"], "type":"video", "id":"x54cd"}

The timestamp must represent the last modification date of the object as an RFC 3339 representation.

The oplog-sync command is used with this dump in order to perform the sync. This command will connect to the database, do the comparisons and generate the necessary oplog events to fix the deltas. This command does not need an oplogd agent to be running in order to perform its task.

Note that the oplog-sync command is the perfect tool to boostrap an OpLog with an existing API.

BE CAREFUL, any object absent of the dump having a timestamp lower than the most recent timestamp present in the dump will be deleted from the OpLog.

Status Endpoint

The agent exposes a /status endpoint over HTTP to show some statistics about itself. A JSON object is returned with the following fields:

  • events_received: Total number of events received on the UDP interface
  • events_sent: Total number of events sent thru the SSE interface
  • events_ingested: Total number of events ingested into MongoDB with success
  • events_error: Total number of events received on the UDP interface with an invalid format
  • events_discarded: Total number of events discarded because the queue was full
  • queue_size: Current number of events in the ingestion queue
  • queue_max_size: Maximum number of events allowed in the ingestion queue before discarding events
  • clients: Number of clients connected to the SSE API
  • connections: Total number of connections established on the SSE API
GET /status

HTTP/1.1 200 OK
Content-Length: 144
Content-Type: application/json
Date: Thu, 06 Nov 2014 10:40:25 GMT

{
    "clients": 0,
    "connections": 0,
    "events_discarded": 0,
    "events_error": 0,
    "events_ingested": 0,
    "events_received": 0,
    "events_sent": 0,
    "queue_max_size": 100000,
    "queue_size": 0,
    "status": "OK"
}

Consumer

To write a consumer you may use any SSE library and consume the API yourself. If your consumer is written in Go, a dedicated consumer library is available (see github.com/dailymotion/oplogc).

Licenses

All source code is licensed under the MIT License.

More Repositories

1

vast-client-js

VAST (up to 6) parsing library for JavaScript
JavaScript
365
star
2

dailymotion-player-sdk-android

The SDK to play dailymotion videos in your Android apps
Kotlin
83
star
3

asteroid

Asteroid is a tool designed to help you manage your Wireguard server by letting you easily view/add/remove peers.
Go
78
star
4

allure-go

Golang port for Allure test reporting tool
Go
76
star
5

dailymotion-sdk-php

Dailymotion Graph API PHP SDK
PHP
63
star
6

dailymotion-player-sdk-ios

Dailymotion Player SDK for iOS
Objective-C
58
star
7

kinta

🛠️ Automate your mobile workflows in Kotlin 🛠️
Kotlin
47
star
8

dailymotion-sdk-js

Dailymotion JavaScript client API
JavaScript
34
star
9

dailymotion-sdk-python

Dailymotion SDK Python
Python
34
star
10

dailymotion-swift-player-sdk-ios

Dailymotion Player SDK for iOS in Swift
Objective-C
34
star
11

react-collider

Isomorphic React apps done the good way
JavaScript
33
star
12

cloudkey-php

Dailymotion Cloud PHP client API
PHP
29
star
13

ZMScrollableNavigationBar

A UINavigationController category that allows the UINavigationBar to scroll with the content (iOS 8 only)
Objective-C
28
star
14

vmap-js

A VMAP JavaScript Library
JavaScript
25
star
15

chash

Very fast consistent hashing library
C
22
star
16

lua-nginx-guard-jwt

Map JWT claims values to HTTP's request headers
Lua
16
star
17

webpack-multi-output

Webpack loader/plugin for multiple compiling with crazy performances
JavaScript
14
star
18

camcorder

An offline-capable, mobile, web application to record, watch and upload videos to dailymotion.
JavaScript
14
star
19

hands-on-diagnosing-golang-apps

How to diagnose a misbehaving Go application, with different kind of tools, from generic unix tools, to very specific go tools.
Go
13
star
20

picomongo

Python
9
star
21

mod_xgeoip

Apache 2.x module for (very) fast MaxMind geoip database lookups
C
7
star
22

lua-resty-dogstatsd

A client for DogStatsD, an extension of the StatsD metric server for Datadog. Using nginx cosocket API.
Lua
6
star
23

reactjs-meetup-12-2018-demo

Demo repository for the "Get those bottlenecks out of my front yard!" talk
JavaScript
5
star
24

android-cloud-build-sample

This is a simple project to demonstrate how to use Google Cloud Build as an Android Continuous Integration environment
Dockerfile
5
star
25

puzzles

4
star
26

angular-eve-resource

AngularJS Resource Adapter for the Python Eve REST API Framework
JavaScript
4
star
27

hands-on-kubernetes

Learn how to use Kubernetes to deploy applications
4
star
28

oplogc

A Go consumer library for OpLog server
Go
4
star
29

hpcfs

HTTP (Reverse) Proxy Cache Filesystem
C
4
star
30

aa2bk

Java
3
star
31

angular-jed

Angular wrapper for Jed
CoffeeScript
3
star
32

forensiq

Golang library for interacting with Forensiq
Go
2
star
33

vast-client-to-xml

JavaScript
2
star
34

demo-stream-urls-server

Python
2
star
35

player-sdk-android-samples

Sample apps repository for Dailymotion player sdk android
Kotlin
2
star
36

dailymotion-sdk-js-starterkit

sample js app for dailymotion : videos listing
JavaScript
1
star
37

dm-skinning-guidelines

skinning guidelines for the dailymotion site
1
star
38

homebrew-dev

Ruby
1
star
39

player-workshop-api

Base material for a workshop to introduce the player API.
HTML
1
star
40

cloudkey-fixtures

Test fixtures for cloudkey languages bindings unit tests
1
star