• This repository was archived on 08/Jan/2020
• Stars: 266
• Rank: 150,719 (Top 4%)
• Language: Java
• License: Apache License 2.0
• Created: almost 10 years ago
• Updated: over 3 years ago

Repository Details

Flume Source to import data from SQL Databases

flume-ng-sql-source

This project provides a Flume NG source for ingesting data from SQL databases.

Supported SQL database engines

  • Since the last update the code has been integrated with Hibernate, so all database engines supported by that technology should work.

Compilation and packaging

  $ mvn package

Deployment

Copy the flume-ng-sql-source-<version>.jar generated in the target folder into the Flume plugins directory:

  $ mkdir -p $FLUME_HOME/plugins.d/sql-source/lib $FLUME_HOME/plugins.d/sql-source/libext
  $ cp flume-ng-sql-source-0.8.jar $FLUME_HOME/plugins.d/sql-source/lib
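
The resulting layout follows Flume's standard plugins.d convention (shown here as a sketch; the 0.8 version number comes from the command above):

  $FLUME_HOME/plugins.d/sql-source/
      lib/flume-ng-sql-source-0.8.jar
      libext/    <- JDBC drivers go here (see the next section)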

Specific installation by database engine

MySQL

Download the official MySQL JDBC driver and copy it into the libext Flume plugins directory:

  $ wget https://dev.mysql.com/get/Downloads/Connector-J/mysql-connector-java-5.1.35.tar.gz
  $ tar xzf mysql-connector-java-5.1.35.tar.gz
  $ cp mysql-connector-java-5.1.35-bin.jar $FLUME_HOME/plugins.d/sql-source/libext
Microsoft SQL Server

Download the official Microsoft SQL Server 4.1 JDBC driver and copy it into the libext Flume plugins directory.
Download URL: https://www.microsoft.com/es-es/download/details.aspx?id=11774

  $ tar xzf sqljdbc_4.1.5605.100_enu.tar.gz
  $ cp sqljdbc_4.1/enu/sqljdbc41.jar $FLUME_HOME/plugins.d/sql-source/libext
IBM DB2

Download the official IBM DB2 JDBC driver and copy it into the libext Flume plugins directory.
Download URL: http://www-01.ibm.com/support/docview.wss?uid=swg21363866
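
For example, a minimal sketch assuming the downloaded driver jar is named db2jcc4.jar (the actual file name depends on the driver version you download):

  # copy the DB2 JDBC driver into the plugin's libext directory (jar name is an assumption)
  $ cp db2jcc4.jar $FLUME_HOME/plugins.d/sql-source/libext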

Configuration of SQL Source

Mandatory properties are marked (required); defaults are shown where one applies.

  • channels (required): Connected channel names.
  • type (required): The component type name; it must be org.keedio.flume.source.SQLSource.
  • hibernate.connection.url (required): URL for connecting to the remote database.
  • hibernate.connection.user (required): Username for connecting to the database.
  • hibernate.connection.password (required): Password for connecting to the database.
  • table (required): Table to export data from.
  • status.file.name (required): Local file name in which to save the last row number read.
  • status.file.path (default: /var/lib/flume): Path where the status file is saved.
  • start.from (default: 0): Start value from which to import data.
  • delimiter.entry (default: ,): Delimiter between fields of an entry.
  • enclose.by.quotes (default: true): Whether quotes are applied to all values in the output.
  • columns.to.select (default: *): Which columns of the table will be selected.
  • run.query.delay (default: 10000): Milliseconds to wait between query runs.
  • batch.size (default: 100): Batch size for sending events to the Flume channel.
  • max.rows (default: 10000): Maximum rows to import per query.
  • read.only (default: false): Opens a read-only session with the database.
  • custom.query: Custom query to force a special request to the DB; be careful. See the explanation of this property below.
  • hibernate.connection.driver_class: Driver class used by Hibernate; if not specified, the framework will auto-assign one.
  • hibernate.dialect: Dialect used by Hibernate; if not specified, the framework will auto-assign one. See https://docs.jboss.org/hibernate/orm/4.3/manual/en-US/html/ch03.html#configuration-optional-dialects for a complete list of available dialects.
  • hibernate.connection.provider_class: Set to org.hibernate.connection.C3P0ConnectionProvider to use the C3P0 connection pool (recommended for production).
  • hibernate.c3p0.min_size: Minimum connection pool size.
  • hibernate.c3p0.max_size: Maximum connection pool size.
  • default.charset.resultset (default: UTF-8): Charset to which the result set read from the DB is converted.
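
As a quick start, a minimal sketch containing only the mandatory properties; the MySQL URL, credentials, table and status file name below are placeholder values, not taken from the project's documentation:

# minimal SQL source definition: only the mandatory properties (values are placeholders)
agent.sources.sqlSource.channels = memoryChannel
agent.sources.sqlSource.type = org.keedio.flume.source.SQLSource
agent.sources.sqlSource.hibernate.connection.url = jdbc:mysql://localhost:3306/testdb
agent.sources.sqlSource.hibernate.connection.user = flume
agent.sources.sqlSource.hibernate.connection.password = secret
agent.sources.sqlSource.table = employee1
agent.sources.sqlSource.status.file.name = sqlSource.status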

Standard Query

If no custom query is set, SELECT <columns.to.select> FROM <table> will be executed every run.query.delay milliseconds.
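
For instance, with an illustrative configuration such as:

# table and column names here are placeholders for illustration
agent.sources.sqlSource.table = employee1
agent.sources.sqlSource.columns.to.select = id,name

the source would execute SELECT id,name FROM employee1 every 10 seconds under the default run.query.delay.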

Custom Query

A custom query is supported to make the entire SQL language available. This is powerful but risky; be careful with the custom queries you use.

To avoid exporting rows repeatedly, use the special token $@$ in the WHERE clause to incrementally export unprocessed rows and newly inserted ones.

IMPORTANT: For proper operation of a custom query, make sure the incremental field is returned in the first position of the query result.

Example:

agent.sources.sql-source.custom.query = SELECT incrementalField,field2 FROM table1 WHERE incrementalField > $@$ 
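
On each run the source substitutes the last value saved in the status file for $@$. Assuming the last stored value were 12345 (an illustrative number), the query above would effectively run as:

  SELECT incrementalField,field2 FROM table1 WHERE incrementalField > 12345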

Configuration example

# For each one of the sources, the type is defined
agent.sources.sqlSource.type = org.keedio.flume.source.SQLSource

agent.sources.sqlSource.hibernate.connection.url = jdbc:db2://192.168.56.70:50000/sample

# Hibernate Database connection properties
agent.sources.sqlSource.hibernate.connection.user = db2inst1
agent.sources.sqlSource.hibernate.connection.password = db2inst1
agent.sources.sqlSource.hibernate.connection.autocommit = true
agent.sources.sqlSource.hibernate.dialect = org.hibernate.dialect.DB2Dialect
agent.sources.sqlSource.hibernate.connection.driver_class = com.ibm.db2.jcc.DB2Driver

#agent.sources.sqlSource.table = employee1

# Columns to import (default *, imports the entire row)
#agent.sources.sqlSource.columns.to.select = *

# Query delay: the query is run every configured number of milliseconds
agent.sources.sqlSource.run.query.delay=10000

# The status file is used to save the last row read
agent.sources.sqlSource.status.file.path = /var/log/flume
agent.sources.sqlSource.status.file.name = sqlSource.status

# Custom query
agent.sources.sqlSource.start.from = 19700101000000000000
agent.sources.sqlSource.custom.query = SELECT * FROM (select DECIMAL(test) * 1000000 AS INCREMENTAL, EMPLOYEE1.* from employee1 UNION select DECIMAL(test) * 1000000 AS INCREMENTAL, EMPLOYEE2.* from employee2) WHERE INCREMENTAL > $@$ ORDER BY INCREMENTAL ASC

agent.sources.sqlSource.batch.size = 1000
agent.sources.sqlSource.max.rows = 1000
agent.sources.sqlSource.delimiter.entry = |

agent.sources.sqlSource.hibernate.connection.provider_class = org.hibernate.connection.C3P0ConnectionProvider
agent.sources.sqlSource.hibernate.c3p0.min_size=1
agent.sources.sqlSource.hibernate.c3p0.max_size=10

# The channel can be defined as follows.
agent.sources.sqlSource.channels = memoryChannel
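
To try this example, a typical invocation might look as follows; the configuration file name sql-agent.conf and the agent name are assumptions matching the example above:

  # start the Flume agent defined above (file and agent names are assumptions)
  $ flume-ng agent --conf $FLUME_HOME/conf --conf-file sql-agent.conf --name agent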

Known Issues

An issue with Java SQL types and Hibernate types can appear when using SQL Server databases and the SQL Server dialect that ships with Hibernate.

Something like:

org.hibernate.MappingException: No Dialect mapping for JDBC type: -15

Use org.keedio.flume.source.SQLServerCustomDialect in the Flume configuration file to solve this problem.
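
That is, assuming the dialect is set through the same hibernate.dialect property used above:

# switch to the project's custom SQL Server dialect (property name is an assumption)
agent.sources.sqlSource.hibernate.dialect = org.keedio.flume.source.SQLServerCustomDialect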

Special thanks

I used flume-ng-kafka-source (https://github.com/baniuyao/flume-ng-kafka-source.git) as a guide. Thanks to Frank Yao.

Version History

  • Version 1.5.1: the charset encoding of the result set is now configurable.
  • The stable version is 1.5.0 (compatible with Apache Flume 1.8.0).
  • The previous stable version is 1.4.3 (compatible with Apache Flume prior to 1.7.0).

More Repositories

  1. flume-ftp-source (Java, 80 stars): FTP network server as a source of events for Apache Flume
  2. buildoop (Groovy, 29 stars): Hadoop Ecosystem Builder: build, package, test and deploy your Hadoop ecosystem project
  3. kafka-hue (Mako, 20 stars): Hue application for Apache Kafka
  4. flume-taildirectory-source (Java, 17 stars): DEPRECATED
  5. avro-schema-repo (Java, 11 stars)
  6. openshift-cassandra (Shell, 10 stars)
  7. Nifi-JsonProcessor (Scala, 9 stars): Sample project developing a custom Apache NiFi processor in Scala
  8. flume-ng-kafka-avro-sink (Java, 9 stars): Apache Flume sink to produce Avro messages to Apache Kafka
  9. bigdecimal-math (Java, 8 stars): A Java Math.BigDecimal implementation of core mathematical functions
  10. openstack-log-processor (Scala, 7 stars)
  11. bing-speech-to-text-example (Java, 7 stars): Quick-and-dirty usage example of the Bing Voice Recognition RESTful API
  12. nagios-hadoop (Python, 7 stars): A collection of Nagios plugins to monitor the Hadoop ecosystem
  13. Hadoop-Watcher (Scala, 5 stars): Keedio's Hadoop Watcher is a feature for watching an HDFS path for changes
  14. storm-hue (Mako, 5 stars): Hue application for Apache Storm
  15. keedio-stacks (Python, 4 stars): Ambari stacks made by Keedio
  16. hue (Python, 4 stars): Let's Big Data. Hue is an open source web interface for analyzing data with Hadoop and Spark. http://gethue.com
  17. flume-enrichment-interceptor-skeleton (Java, 4 stars): Flume interceptor to enrich the event body to JSON format with some extra data
  18. buildoopRecipes (Shell, 3 stars): Buildoop recipes, for easy package version maintenance
  19. Flume-ng-source-VFS (Scala, 3 stars): A custom Apache Flume source component for processing files under file systems supported by Apache Commons VFS2
  20. openstack-operations-poc (JavaScript, 3 stars): OpenStack log collection PoC
  21. flume-ng-event-hub-sink (Java, 3 stars): Flume sink to publish to Microsoft Azure Event Hubs
  22. openshift-kafka (Shell, 2 stars)
  23. Flume-sink-Gelf (Java, 2 stars): Custom Flume sink for sending events via the GELF protocol
  24. opensfhit-training (JavaScript, 2 stars): Training repo for OpenShift
  25. VFS2-Monitor (Scala, 2 stars): Keedio's VFS2-Monitor is a feature for monitoring changes in directories of supported file systems
  26. nifi-azure-blob-connection-service-bundle (Java, 2 stars): Azure Blob Storage shared connection service for Apache NiFi
  27. flume-opsec-source (Java, 2 stars)
  28. tailer-example (Java, 2 stars)
  29. flume-tcp-sink (Java, 2 stars): A Flume sink to send events over a TCP connection
  30. flume-hbase-csv-serializer (Java, 2 stars): A Flume HBase sink serializer for splitting CSV events into HBase columns
  31. air-pollution-data (R, 2 stars): Scripts to process the Air Pollution Database of the European Environment Agency
  32. nodejs_training (JavaScript, 1 star): Training repo for Node.js
  33. nifi-azure-blob-storage-processor-bundle (Java, 1 star): Azure Blob Storage NiFi processors
  34. ganglia-storm (Python, 1 star)
  35. flume-cacheable-interceptor-skeleton (Java, 1 star): Skeleton of a cacheable Flume interceptor
  36. kafka-json4s-codecs (Java, 1 star): json4s encoder/decoder for Kafka
  37. flume-headers-to-avro-serializer (Java, 1 star): Serializer for building Avro files using Flume event headers
  38. flume-configurator (Java, 1 star)
  39. AvroRepoKafkaProducerTest (Java, 1 star)
  40. Storm-filterkey-bolt (Java, 1 star): This bolt filters data from a stream of tuples according to a mapped list of criteria specified in the topology's config
  41. tuneable-data-generator (Scala, 1 star)
  42. CsvToElasticSearchInjector (Scala, 1 star): Simple Apache Flink job to inject data into Elasticsearch via Keedio Stacks
  43. ambari-admin (JavaScript, 1 star): Custom ambari-admin style
  44. flume-hbase-offset-serializer (Java, 1 star): Flume HBase serializer to split the event body into chunks sent to HBase columns
  45. front-kds-logs (JavaScript, 1 star)
  46. kafka-utilities (Java, 1 star): Some Kafka tools for testing purposes
  47. flume-event-header-fields-copy-interceptor (Java, 1 star)
  48. flume-filedump-interceptor (Scala, 1 star)
  49. flume-stress-test-tools (Shell, 1 star): Some tools for testing custom Flume sources
  50. kafka-log4j-appender (Java, 1 star): Log4j appender sending log messages to Kafka topics
  51. keedio-vagrant (Ruby, 1 star): Vagrant-based test environment
  52. ambari-web (JavaScript, 1 star): Required modifications to the ambari-web JavaScript files in order to deploy a Keedio stack with Ambari