• Stars
    star
    14
  • Rank 1,438,076 (Top 29 %)
  • Language
    Scala
  • Created over 4 years ago
  • Updated 4 months ago

Reviews

There are no reviews yet. Be the first to send feedback to the community and the maintainers!

Repository Details

Motivation

Stream in real-time the MySQL database events, such as CRUD and DDL. That may be useful for building Data Warehouse, integration with monolith application on the database level, etc.

Example

MysqlBinlogStream
              .rawEvents[IO](binlogClient)
              .through(streamEvents[IO](transactionState))
              .evalTap(msg => logger.info(s"received $msg"))

Refer to examples for complete snippet

How to build and launch example

  • build docker image
sbt "project mysql-binlog-stream-examples" clean  "docker:publishLocal"
  • run docker compose with MySql and example service
cd mysql-binlog-stream-examples
docker-compose -f docker-compose-mysql-it-mac.yaml up
  • open new console
  • in new console
mysql -u 'root' --host=0.0.0.0 --port=3307 --password=''
  • try to insert new row in SKU or VARIANT table
use test;
insert into sku(id, sku) values(3, '123');
  • in the docker-compose console you should see
example_1  | 02:35:58.734 [ioapp-compute-2] INFO application - received EventMessage(sku,1589596558000,create,8908ecfb63e4-bin.000007,415,true,{
example_1  |   "id" : 3
example_1  | },{
example_1  |   "before" : null,
example_1  |   "after" : {
example_1  |     "id" : 3,
example_1  |     "sku" : "123"
example_1  |   }
example_1  | })

try to update some row and see what happens

Event examples

Create:

{
  "table" : "sku",
  "timestamp" : 1554754028000,
  "action" : "create",
  "fileName" : "acd03b3a873f-bin.000003",
  "offset" : 2177,
  "endOfTransaction" : false,
  "pk" : {
    "id" : 5
  },
  "row" : {
    "before" : null,
    "after" : {
      "id" : 5,
      "sku" : "sku5"
    }
  }
}

Update:

{
  "table" : "sku",
  "timestamp" : 1554754028000,
  "action" : "create",
  "fileName" : "acd03b3a873f-bin.000003",
  "offset" : 2177,
  "endOfTransaction" : false,
  "pk" : {
    "id" : 5
  },
  "row" : {
    "before" : {
      "id" : 5,
      "sku" : "sku5"
    },
    "after" : {
      "id" : 5,
      "sku" : "new sku 5"
    }
  }
}

Delete:

{
  "table" : "sku",
  "timestamp" : 1554754028000,
  "action" : "create",
  "fileName" : "acd03b3a873f-bin.000003",
  "offset" : 2177,
  "endOfTransaction" : false,
  "pk" : {
    "id" : 5
  },
  "row" : {
    "before" : {
      "id" : 5,
      "sku" : "new sku 5"
    },
    "after" : null
  }
}

Truncate:

{
  "table" : "sku",
  "timestamp" : 1554754028000,
  "action" : "truncate",
  "fileName" : "acd03b3a873f-bin.000003",
  "offset" : 2497,
  "endOfTransaction" : true,
  "pk" : null,
  "row" : null
}

Tech Stack

  • Scala
  • FS2 Functional Streams for Scala
  • circe - json streaming encoder/decoder
  • doobie - database integration layer

Support

YourKit Image

This project is supported by YourKit with monitoring and profiling Tools. YourKit supports open source with innovative and intelligent tools for monitoring and profiling Java and .NET applications. YourKit is the creator of YourKit Java Profiler, YourKit .NET Profiler, and YourKit YouMonitor.