  • Stars: 137
  • Rank: 266,121 (Top 6%)
  • Language: Scala
  • License: Apache License 2.0
  • Created: almost 7 years ago
  • Updated: 6 months ago


Repository Details

Kinesis Connector for Structured Streaming


NOTE: This project is NO LONGER MAINTAINED.

Ron Cremer has volunteered to maintain this project. Beginning with Spark 3.2, the new project is located here: https://github.com/roncemer/spark-sql-kinesis

Kinesis Connector for Structured Streaming

Implementation of a Kinesis Source Provider in Spark Structured Streaming. SPARK-18165 describes the need for such an implementation. More details on the implementation can be found in this blog post.

Downloading and Using the Connector

The connector is available from the Maven Central repository. It can be added to a Spark application with the --packages option or the spark.jars.packages configuration property. Use the following connector artifact:

Spark 3.0: com.qubole.spark/spark-sql-kinesis_2.12/1.2.0-spark_3.0
Spark 2.4: com.qubole.spark/spark-sql-kinesis_2.11/1.2.0-spark_2.4
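
For example, here is a minimal sketch (Spark 3.0 / Scala 2.12) that pulls the connector from Maven Central via the spark.jars.packages property while building a SparkSession. The coordinates are the artifact listed above written in group:artifact:version form; this only takes effect if set before the SparkContext is created, otherwise pass --packages on the command line.

import org.apache.spark.sql.SparkSession

// Resolve the connector from Maven Central at startup
// (equivalent to passing --packages on the command line).
val spark = SparkSession.builder()
  .appName("kinesis-connector-example")
  .config("spark.jars.packages", "com.qubole.spark:spark-sql-kinesis_2.12:1.2.0-spark_3.0")
  .getOrCreate()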

Developer Setup

Check out the kinesis-sql branch corresponding to your Spark version. Use the master branch for the latest Spark version.

Spark version 3.0.x:
git clone git@github.com:qubole/kinesis-sql.git
cd kinesis-sql
git checkout master
mvn install -DskipTests

This will create a target/spark-sql-kinesis_2.12-*.jar file containing the connector code and its dependency jars.

How to use it

Setup Kinesis

Refer to the Amazon Kinesis documentation for more options.

Create Kinesis Stream
$ aws kinesis create-stream --stream-name test --shard-count 2
Add records to the stream
$ aws kinesis put-record --stream-name test --partition-key 1 --data 'Kinesis'
$ aws kinesis put-record --stream-name test --partition-key 1 --data 'Connector'
$ aws kinesis put-record --stream-name test --partition-key 1 --data 'for'
$ aws kinesis put-record --stream-name test --partition-key 1 --data 'Apache'
$ aws kinesis put-record --stream-name test --partition-key 1 --data 'Spark'

Example Streaming Job

In the commands below, $SPARK_HOME refers to the Spark installation directory.

Open Spark-Shell
$SPARK_HOME/bin/spark-shell --jars target/spark-sql-kinesis_2.12-*.jar
Subscribe to Kinesis Source
// Subscribe to the "test" stream created above
scala> :paste
val kinesis = spark
    .readStream
    .format("kinesis")
    .option("streamName", "test")
    .option("endpointUrl", "https://kinesis.us-east-1.amazonaws.com")
    .option("awsAccessKeyId", [ACCESS_KEY])
    .option("awsSecretKey", [SECRET_KEY])
    .option("startingPosition", "TRIM_HORIZON")
    .load
Check Schema
scala> kinesis.printSchema
root
 |-- data: binary (nullable = true)
 |-- streamName: string (nullable = true)
 |-- partitionKey: string (nullable = true)
 |-- sequenceNumber: string (nullable = true)
 |-- approximateArrivalTimestamp: timestamp (nullable = true)
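Each record carries the raw payload as binary plus its Kinesis metadata. As an illustrative sketch (not part of the original walkthrough), the payload can be decoded and kept alongside the metadata columns before any aggregation:

scala> :paste
// Decode the binary payload and keep the Kinesis metadata columns
val decoded = kinesis
    .selectExpr(
      "CAST(data AS STRING) AS payload",
      "partitionKey",
      "sequenceNumber",
      "approximateArrivalTimestamp")

// Print a few decoded records to the console
decoded.writeStream.format("console").outputMode("append").start()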
Word Count
// Cast data into string and group by data column
scala> :paste
kinesis
    .selectExpr("CAST(data AS STRING)").as[String]
    .groupBy("data").count()
    .writeStream
    .format("console")
    .outputMode("complete")
    .start()
    .awaitTermination()
Output in Console
+------------+-----+
|        data|count|
+------------+-----+
|         for|    1|
|      Apache|    1|
|       Spark|    1|
|     Kinesis|    1|
|   Connector|    1|
+------------+-----+ 
Using the Kinesis Sink
// Generate a random partition key, cast data into string, and write the running counts to Kinesis
scala> :paste
kinesis
    .selectExpr("CAST(rand() AS STRING) AS partitionKey", "CAST(data AS STRING)").as[(String, String)]
    .groupBy("data").count()
    .writeStream
    .format("kinesis")
    .outputMode("update")
    .option("streamName", "spark-sink-example")
    .option("endpointUrl", "https://kinesis.us-east-1.amazonaws.com")
    .option("awsAccessKeyId", [ACCESS_KEY])
    .option("awsSecretKey", [SECRET_KEY])
    .start()
    .awaitTermination()
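
For anything beyond a quick demo, the query would normally also set a checkpoint location so it can recover its progress after a restart. This is standard Structured Streaming behaviour rather than something specific to this connector; a minimal sketch (the checkpoint path below is only a placeholder):

scala> :paste
kinesis
    .selectExpr("CAST(data AS STRING)")
    .writeStream
    .format("console")
    // standard Structured Streaming option; replace the path with a durable location
    .option("checkpointLocation", "/tmp/kinesis-example-checkpoint")
    .start()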

Kinesis Source Configuration

Option Name | Default Value | Description
streamName | - | Name of the stream in Kinesis to read from
endpointUrl | https://kinesis.us-east-1.amazonaws.com | Endpoint URL for the Kinesis stream
awsAccessKeyId | - | AWS credentials for Kinesis describe and read-record operations
awsSecretKey | - | AWS credentials for Kinesis describe and read-record operations
awsSTSRoleARN | - | AWS STS role ARN for Kinesis describe and read-record operations
awsSTSSessionName | - | AWS STS session name for Kinesis describe and read-record operations
awsUseInstanceProfile | true | Use instance-profile credentials if no credentials are provided
startingPosition | LATEST | Starting position in Kinesis to fetch data from. Possible values are "latest", "trim_horizon", "earliest" (alias for trim_horizon), or a JSON-serialized map of shardId -> KinesisPosition
failOnDataLoss | true | Fail the streaming job if any active shard is missing or expired
kinesis.executor.maxFetchTimeInMs | 1000 | Maximum time (ms) spent in the executor to fetch records from Kinesis per shard
kinesis.executor.maxFetchRecordsPerShard | 100000 | Maximum number of records to fetch per shard
kinesis.executor.maxRecordPerRead | 10000 | Maximum number of records to fetch per getRecords API call
kinesis.executor.addIdleTimeBetweenReads | false | Add a delay between two consecutive getRecords API calls
kinesis.executor.idleTimeBetweenReadsInMs | 1000 | Minimum delay (ms) between two consecutive getRecords calls
kinesis.client.describeShardInterval | 1s (1 second) | Minimum interval between two ListShards API calls to detect resharding
kinesis.client.numRetries | 3 | Maximum number of retries for Kinesis API requests
kinesis.client.retryIntervalMs | 1000 | Cool-off period (ms) before retrying a Kinesis API call
kinesis.client.maxRetryIntervalMs | 10000 | Maximum cool-off period (ms) between two retries
kinesis.client.avoidEmptyBatches | false | Avoid creating an empty micro-batch by checking upfront whether there is any unread data in the stream before the batch is started
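
All of these options are passed as plain key/value pairs on the reader, just like streamName and endpointUrl in the example above. A hedged sketch (the values below are illustrative, not the defaults):

scala> :paste
val tunedKinesis = spark
    .readStream
    .format("kinesis")
    .option("streamName", "test")
    .option("endpointUrl", "https://kinesis.us-east-1.amazonaws.com")
    .option("startingPosition", "TRIM_HORIZON")
    // throughput and robustness tuning (example values only)
    .option("kinesis.executor.maxFetchRecordsPerShard", "50000")
    .option("kinesis.client.numRetries", "5")
    .option("failOnDataLoss", "false")
    .load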

Kinesis Sink Configuration

Option Name | Default Value | Description
streamName | - | Name of the stream in Kinesis to write to
endpointUrl | https://kinesis.us-east-1.amazonaws.com | The AWS endpoint of the Kinesis stream
awsAccessKeyId | - | AWS credentials for Kinesis describe and write-record operations
awsSecretKey | - | AWS credentials for Kinesis describe and write-record operations
awsSTSRoleARN | - | AWS STS role ARN for Kinesis describe and write-record operations
awsSTSSessionName | - | AWS STS session name for Kinesis describe and write-record operations
awsUseInstanceProfile | true | Use instance-profile credentials if no credentials are provided
kinesis.executor.recordMaxBufferedTime | 1000 (ms) | Maximum time a record may stay buffered before being sent
kinesis.executor.maxConnections | 1 | Maximum number of connections to Kinesis
kinesis.executor.aggregationEnabled | true | Whether records should be aggregated before being sent to Kinesis
kinesis.executor.flushWaitTimeMillis | 100 | Wait time (ms) while flushing records to Kinesis on task end
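
Sink options are set on the writer in the same way. A hedged sketch of tuning the producer side (example values, not defaults):

scala> :paste
kinesis
    .selectExpr("CAST(rand() AS STRING) AS partitionKey", "CAST(data AS STRING)")
    .writeStream
    .format("kinesis")
    .option("streamName", "spark-sink-example")
    .option("endpointUrl", "https://kinesis.us-east-1.amazonaws.com")
    // producer tuning (example values only)
    .option("kinesis.executor.recordMaxBufferedTime", "500")
    .option("kinesis.executor.maxConnections", "4")
    .option("kinesis.executor.aggregationEnabled", "false")
    .outputMode("append")
    .start()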

Roadmap

  • We need to migrate to DataSource V2 APIs for MicroBatchExecution.
  • Maintain per-micro-batch shard commit state in DynamoDB

Acknowledgement

This connector would not have been possible without the reference implementations of the Kafka connector for Structured Streaming, the Kinesis connector for legacy Spark Streaming, and the Kinesis Client Library. The structure of some parts of the code is influenced by the excellent work done by various Apache Spark contributors.

More Repositories

1. sparklens (Scala, 562 stars): Qubole Sparklens tool for performance tuning Apache Spark
2. rubix (Java, 182 stars): Cache File System optimized for columnar formats and object stores
3. spark-on-lambda (Scala, 151 stars): Apache Spark on AWS Lambda
4. afctl (Python, 130 stars): afctl helps to manage and deploy Apache Airflow projects faster and smoother
5. presto-udfs (Java, 115 stars): Plugin for Presto to allow easy addition of user functions
6. quark (Java, 98 stars): Quark is a data virtualization engine over analytic databases
7. streamx (Java, 97 stars): kafka-connect-s3, ingesting data from Kafka to object stores (S3)
8. spark-acid (Scala, 96 stars): ACID data source for Apache Spark based on Hive ACID
9. qds-sdk-py (Python, 51 stars): Python SDK for accessing Qubole Data Service
10. uchit (Python, 29 stars)
11. streaminglens (Scala, 17 stars): Qubole Streaminglens tool for tuning Spark Structured Streaming pipelines
12. s3-sqs-connector (Scala, 17 stars): A library for reading data from Amazon S3 with optimised listing using Amazon SQS, for Spark Structured Streaming
13. spark-state-store (Scala, 16 stars): RocksDB state storage implementation for Structured Streaming
14. presto-kinesis (Java, 14 stars): Presto connector to the Amazon Kinesis service
15. kinesis-storage-handler (Java, 11 stars): Hive Storage Handler for Kinesis
16. qds-sdk-java (Java, 7 stars): A Java library that provides the tools you need to authenticate with, and use, the Qubole Data Service API
17. demotrends (Ruby, 6 stars): Code required to set up the demo trends website (http://demotrends.qubole.com)
18. qubole-terraform (HCL, 6 stars)
19. space-ui (JavaScript, 5 stars): UI Ember components based on Space design specs
20. caching-metastore-client (Java, 5 stars): A metastore client that caches objects
21. rubix-admin (Python, 5 stars): Admin scripts for Rubix
22. tco (Python, 4 stars)
23. qds-sdk-R (Python, 4 stars): R extension to execute Hive commands through the Qubole Data Service Python SDK
24. docker-images (Dockerfile, 4 stars): Qubole Docker images
25. tableau-qubole-connector (JavaScript, 3 stars)
26. metriks-addons (Ruby, 3 stars): Utilities for collecting metrics in a Rails application
27. qds-sdk-ruby (Ruby, 3 stars): Ruby SDK for the Qubole API
28. qubole-log-datasets (3 stars)
29. hubot-qubole (CoffeeScript, 3 stars): Interaction with Qubole Data Services APIs via the Hubot framework
30. customer-success (HCL, 2 stars)
31. bootstrap-functions (Shell, 2 stars): Useful functions for Qubole cluster bootstraps
32. qubole-jar-test (Java, 2 stars): A Maven project to test that Qubole jars can be listed as dependencies
33. etl-examples (Scala, 2 stars)
34. perf-kit-queries (2 stars)
35. tuning-paper (TeX, 2 stars)
36. blogs (1 star)
37. jupyter (1 star)
38. presto-event-listeners (1 star)
39. qubole-rstudio-example (1 star)
40. presto (Java, 1 star): Presto
41. qubole.github.io (1 star): Qubole OSS Page
42. quboletsdb (Python, 1 star): Set up OpenTSDB using Qubole