• Stars: 154
• Rank: 242,095 (Top 5%)
• Language: Scala
• License: Apache License 2.0
• Created: over 5 years ago
• Updated: over 1 year ago

Spark Binlog Library

A library for querying Binlog with Apache Spark Structured Streaming, for Spark SQL, DataFrames and MLSQL.

  1. jianshu: How spark-binlog works
  2. medium: How spark-binlog works

Requirements

This library requires Spark 2.4+ (tested). Some older versions of Spark may work too but they are not officially supported.

Linking

You can link against this library in your program at the following coordinates:

Scala 2.11

These are the latest stable versions.

MySQL Binlog:

groupId: tech.mlsql
artifactId: mysql-binlog_2.11
version: 1.0.4

HBase WAL:

groupId: tech.mlsql
artifactId: hbase-wal_2.11
version: 1.0.4
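
For example, with sbt these coordinates translate to the following (a sketch; since the artifact ids already carry the Scala suffix, plain % is used):

libraryDependencies += "tech.mlsql" % "mysql-binlog_2.11" % "1.0.4"
// or, for the HBase WAL source:
libraryDependencies += "tech.mlsql" % "hbase-wal_2.11" % "1.0.4"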

Limitation

  1. mysql-binlog only supports insert/update/delete events. All other events are ignored.
  2. hbase-wal only supports Put/Delete events. All other events are ignored.

MySQL Binlog Usage

The examples below are designed to work with delta-plus.

MLSQL Code:

set streamName="binlog";

load binlog.`` where 
host="127.0.0.1"
and port="3306"
and userName="xxxxx"
and password="xxxxx"
and databaseNamePattern="mlsql_console"
and tableNamePattern="script_file"
as table1;

save append table1  
as rate.`mysql_{db}.{table}` 
options mode="Append"
and idCols="id"
and duration="5"
and syncType="binlog"
and checkpointLocation="/tmp/cpl-binlog2";

DataFrame Code:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("Binlog2DeltaTest")
  .getOrCreate()

val df = spark.readStream
  .format("org.apache.spark.sql.mlsql.sources.MLSQLBinLogDataSource")
  .option("host", "127.0.0.1")
  .option("port", "3306")
  .option("userName", "root")
  .option("password", "123456")
  .option("databaseNamePattern", "test")
  .option("tableNamePattern", "mlsql_binlog")
  .load()

val query = df.writeStream
  .format("org.apache.spark.sql.delta.sources.MLSQLDeltaDataSource")
  .option("__path__", "/tmp/datahouse/{db}/{table}")
  .option("path", "{db}/{table}")
  .option("mode", "Append")
  .option("idCols", "id")
  .option("duration", "3")
  .option("syncType", "binlog")
  .option("checkpointLocation", "/tmp/cpl-binlog2")
  .outputMode("append")
  .trigger(Trigger.ProcessingTime("3 seconds"))
  .start()

query.awaitTermination()

Before you run the streaming application, make sure you have fully synced the table:

MLSQL Code:

connect jdbc where
 url="jdbc:mysql://127.0.0.1:3306/mlsql_console?characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&tinyInt1isBit=false"
 and driver="com.mysql.jdbc.Driver"
 and user="xxxxx"
 and password="xxxx"
 as db_cool;
 
load jdbc.`db_cool.script_file`  as script_file;

run script_file as TableRepartition.`` where partitionNum="2" and partitionType="range" and partitionCols="id"
as rep_script_file;

save overwrite rep_script_file as delta.`mysql_mlsql_console.script_file` ;

load delta.`mysql_mlsql_console.script_file`  as output;

DataFrame Code:

import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("wow")
  .getOrCreate()

val mysqlConf = Map(
  "url" -> "jdbc:mysql://localhost:3306/mlsql_console?characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&tinyInt1isBit=false",
  "driver" -> "com.mysql.jdbc.Driver",
  "user" -> "xxxxx",
  "password" -> "xxxx",
  "dbtable" -> "script_file"
)

import org.apache.spark.sql.functions.col
var df = spark.read.format("jdbc").options(mysqlConf).load()
df = df.repartitionByRange(2, col("id"))
df.write
  .format("org.apache.spark.sql.delta.sources.MLSQLDeltaDataSource")
  .mode("overwrite")
  .save("/tmp/datahouse/mlsql_console/script_file")
spark.close()

HBase WAL Usage

DataFrame Code:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("HBase WAL Sync")
  .getOrCreate()

val df = spark.readStream
  .format("org.apache.spark.sql.mlsql.sources.hbase.MLSQLHBaseWALDataSource")
  .option("walLogPath", "/Users/allwefantasy/Softwares/hbase-2.1.8/WALs")
  .option("oldWALLogPath", "/Users/allwefantasy/Softwares/hbase-2.1.8/oldWALs")
  .option("startTime", "1")
  .option("databaseNamePattern", "test")
  .option("tableNamePattern", "mlsql_binlog")
  .load()

val query = df.writeStream
  .format("console")
  .option("mode", "Append")
  .option("truncate", "false")
  .option("numRows", "100000")
  .option("checkpointLocation", "/tmp/cpl-binlog25")
  .outputMode("append")
  .trigger(Trigger.ProcessingTime("10 seconds"))
  .start()

query.awaitTermination()

RoadMap

We hope to support more databases in the future, including traditional databases such as Oracle and NoSQL stores such as HBase (WAL), ES, and Cassandra.

How to get the initial offset

You can manually set the binlog offset. For example:

bingLogNamePrefix="mysql-bin"
binlogIndex="4"
binlogFileOffset="4"

Try using a command like the following to get the offset you want:

mysql> show master status;
+------------------+----------+--------------+------------------+-------------------+
| File             | Position | Binlog_Do_DB | Binlog_Ignore_DB | Executed_Gtid_Set |
+------------------+----------+--------------+------------------+-------------------+
| mysql-bin.000014 | 34913156 |              |                  |                   |
+------------------+----------+--------------+------------------+-------------------+
1 row in set (0.04 sec)

In this example, we know that the binlog file name mysql-bin.000014 splits at the dot into the prefix and the index:

bingLogNamePrefix   binlogIndex   binlogFileOffset
mysql-bin           000014        34913156

This means you should configure the parameters like this:

bingLogNamePrefix="mysql-bin"
binlogIndex="14"
binlogFileOffset="34913156"
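
When using the DataFrame API, the same values go in as reader options; a sketch extending the readStream example shown earlier (the option names are the ones used in the test code under Questions below):

val df = spark.readStream
  .format("org.apache.spark.sql.mlsql.sources.MLSQLBinLogDataSource")
  // connection options (host, port, userName, password, patterns) as shown earlier
  .option("bingLogNamePrefix", "mysql-bin")
  .option("binlogIndex", "14")
  .option("binlogFileOffset", "34913156")
  .load()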

Or you can use the mysqlbinlog command:

mysqlbinlog \
  --start-datetime="2019-06-19 01:00:00" \
  --stop-datetime="2019-06-20 23:00:00" \
  --base64-output=decode-rows \
  -vv master-bin.000004

Questions

Q1

You may see log output like the following:

Trying to restore lost connection to .....
Connected to ....

Please check that server_id is configured in the my.cnf of your MySQL server.
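
A minimal my.cnf sketch (the exact values are illustrative assumptions; what matters is a non-zero server_id, and row-based format, which produces the insert/update/delete row events this library reads):

[mysqld]
# a unique, non-zero server id is required for binlog clients to connect
server_id=1
# enable the binlog; the file prefix here is what bingLogNamePrefix refers to
log-bin=mysql-bin
# row-based format emits the row-level insert/update/delete events
binlog_format=ROW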

Q2

You have started your stream to consume the binlog, but nothing seems to happen, or it just prints:

Batch: N
-------------------------------------------
+-----+
|value|
+-----+
+-----+

Please check the Spark log:

20/06/18 11:57:00 INFO MicroBatchExecution: Streaming query made progress: {
  "id" : "e999af90-8d0a-48e2-b9fc-fcf1e140f622",
  "runId" : "547ce891-468a-43c5-bb62-614b38f60c39",
  "name" : null,
  "timestamp" : "2020-06-18T03:57:00.002Z",
  "batchId" : 1,
  "numInputRows" : 1,
  "inputRowsPerSecond" : 0.4458314757021846,
  "processedRowsPerSecond" : 2.9673590504451037,
  "durationMs" : {
    "addBatch" : 207,
    "getBatch" : 3,
    "getOffset" : 15,
    "queryPlanning" : 10,
    "triggerExecution" : 337,
    "walCommit" : 63
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "MLSQLBinLogSource(ExecutorBinlogServer(192.168.111.14,52612),....",
    "startOffset" : 160000000004104,
    "endOffset" : 170000000000154,
    "numInputRows" : 0,
    "inputRowsPerSecond" : 0,
    "processedRowsPerSecond" : 0
  } ],
  "sink" : {
    "description" : "org.apache.spark.sql.execution.streaming.ConsoleSinkProvider@4f82b82f"
  }
}

As we can see, the startOffset/endOffset values are changing but numInputRows does not change. Please try a table with a simple schema to make sure the binlog connection works fine.

If the simple-schema table works fine, the problem may be caused by some special SQL type. Please file an issue and paste the Spark log and your target table schema.
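
For instance, a deliberately simple table for such a test might look like this (a hypothetical schema; the database and table names match the test code below):

CREATE TABLE wow.users (
  id BIGINT PRIMARY KEY AUTO_INCREMENT,
  first_name VARCHAR(255)
);

INSERT INTO wow.users (first_name) VALUES ('test');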

You can use code like the following to test on your local machine:

package tech.mlsql.test.binlogserver

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

object Main {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("MySQL B Sync")
      .getOrCreate()

    val df = spark.readStream
      .format("org.apache.spark.sql.mlsql.sources.MLSQLBinLogDataSource")
      .option("host", "127.0.0.1")
      .option("port", "3306")
      .option("userName", "xxxx")
      .option("password", "xxxx")
      .option("databaseNamePattern", "wow")
      .option("tableNamePattern", "users")
      .option("bingLogNamePrefix", "mysql-bin")
      .option("binlogIndex", "16")
      .option("binlogFileOffset", "3869")
      .option("binlog.field.decode.first_name", "UTF-8")
      .load()

    // print the binlog (json format)
    val query = df.writeStream
      .format("console")
      .option("mode", "Append")
      .option("truncate", "false")
      .option("numRows", "100000")
      .option("checkpointLocation", "/tmp/cpl-mysql6")
      .outputMode("append")
      .trigger(Trigger.ProcessingTime("10 seconds"))
      .start()

    query.awaitTermination()
  }
}

More Repositories

  1. my-life: Sort of a résumé.... (GCC Machine Description, 592 stars)
  2. ServiceFramework: Java MVC framework, agile, fast, rich domain model, made especially for the server side of mobile applications (Java, 545 stars)
  3. auto-coder (Python, 393 stars)
  4. byzer-llm: Easy, fast, and cheap pretraining, finetuning, and serving for everyone (Python, 226 stars)
  5. delta-plus: A library based on delta for Spark and MLSQL (Scala, 61 stars)
  6. pyjava: This library is an ongoing effort towards bringing the data exchanging ability between Java/Scala and Python. PyJava introduces Apache Arrow as the exchanging data format. (Python, 46 stars)
  7. BYZER-RETRIEVAL: Byzer-retrieval is a distributed retrieval system designed as a backend for LLM RAG (Retrieval Augmented Generation). The system supports both the BM25 retrieval algorithm and vector retrieval. (Java, 42 stars)
  8. sql-code-intelligence: SQL code autocomplete (Scala, 38 stars)
  9. mammuthus-yarn-docker-scheduler: Container scheduler based on YARN (37 stars)
  10. ServiceframeworkDispatcher: A simple but flexible module engine for Serviceframework (Scala, 31 stars)
  11. byzer-agent (Jupyter Notebook, 31 stars)
  12. mongomongo: Java ODM framework for MongoDB (Java, 25 stars)
  13. mlsql-web: mlsql-web based on Vue; provides a SQL editor (JavaScript, 25 stars)
  14. active_orm: An ActiveRecord-style ORM implementation on Hibernate (Java, 23 stars)
  15. mlsql-api-console (Scala, 22 stars)
  16. mlsql-web-console (JavaScript, 18 stars)
  17. elasticsearch-deep: ElasticSearch in depth (17 stars)
  18. mlsql-plugins (Scala, 14 stars)
  19. spark-hbase: A HBase datasource implementation for Spark and [MLSQL](http://www.mlsql.tech) (Scala, 13 stars)
  20. mammuthus-yarn-client: A project with most code extracted from the spark-yarn module to make building YARN programs easier (Scala, 13 stars)
  21. mlsql: New repo: https://github.com/byzer-org/kolo-lang (JavaScript, 12 stars)
  22. csdn_common: Common lib like Apache Commons (Java, 11 stars)
  23. godear: ServiceFramework example project (Java, 10 stars)
  24. auto-coder.example: auto-coder.example (TypeScript, 10 stars)
  25. spark-adhoc-kafka: A datasource implementation for quick queries on Kafka with Spark (Scala, 9 stars)
  26. stuq-example: Course example code (Scala, 9 stars)
  27. mlsql-store (Rust, 8 stars)
  28. spark-ml-example: Course demo project (Scala, 8 stars)
  29. spark-submitter-console: A web application for submitting Spark applications (JavaScript, 8 stars)
  30. byzer-shell (Rust, 6 stars)
  31. streamingpro-editor2 (Java, 6 stars)
  32. common-utils: common-utils (Java, 5 stars)
  33. byzer-data-as-form (Scala, 5 stars)
  34. mlsql-jdbc: MLSQL JDBC driver (Scala, 4 stars)
  35. web-platform (Scala, 4 stars)
  36. mlsql-lang-example-project (Jupyter Notebook, 3 stars)
  37. ar_runtime_web_console (JavaScript, 3 stars)
  38. QuickSand: Extract data from databases like SQLServer and MySQL to HBase, MongoDB, files, or standard output, supporting Thrift and RESTful APIs with a WebUI (Java, 3 stars)
  39. spark-deep-learning-toy (Python, 3 stars)
  40. simple-schema: A library providing an easier way to describe DataFrame schemas for Spark and [MLSQL](http://www.mlsql.tech) (Scala, 3 stars)
  41. mlsql-docs (CSS, 2 stars)
  42. mlsql-ps-service: A library for starting a service in the Executor at startup (Scala, 2 stars)
  43. sql-toolkit: A library using the Spark/Druid analyzer to extract tables and columns from SQL (Scala, 2 stars)
  44. mlsql-example (2 stars)
  45. mlsql-lang-vscode-plugin (TypeScript, 2 stars)
  46. mlsql-deploy (Go, 1 star)
  47. user-system (Scala, 1 star)
  48. mlsqlscheduler: mlsqlscheduler (Scala, 1 star)
  49. serviceframework-api-ui (JavaScript, 1 star)
  50. auto-coder.example_01 (TypeScript, 1 star)
  51. app_runtime_with_db (Scala, 1 star)
  52. mlsql-native (C++, 1 star)
  53. mlsql-build (Shell, 1 star)
  54. byzer-visualization-example: Byzer visualization example (1 star)
  55. lib-core (Python, 1 star)
  56. byzer-ml-example (1 star)
  57. baseweb: serviceframework project template (Shell, 1 star)
  58. mammuthus-nginx: Provides a REST API for managing Nginx configuration files (Shell, 1 star)
  59. byzer-python-example: Byzer-python example code (1 star)
  60. mammuthus-dynamic-deploy: Schedules Java/Docker containers based on mammuthus-yarn-client, which is an adaptor on YARN (Scala, 1 star)
  61. byzerperf (Python, 1 star)