RabbitMQ Spark Streaming receiver

RabbitMQ-Receiver is a library that allows the user to read data with Apache Spark Streaming from RabbitMQ.

Requirements

This library requires Spark 2.0+, Scala 2.11+, and RabbitMQ 3.5+.

Using the library

There are two ways to use the RabbitMQ-Receiver library.

The first is to add the following dependency to your pom.xml:

<dependency>
  <groupId>com.stratio.receiver</groupId>
  <artifactId>spark-rabbitmq</artifactId>
  <version>LATEST</version>
</dependency>

The other is to clone the full repository and build the project:

git clone https://github.com/Stratio/spark-rabbitmq.git
cd spark-rabbitmq
mvn clean install

This library includes two implementations for consuming messages from RabbitMQ with Spark Streaming: the distributed approach and the receiver-based approach, both described below.

Build

mvn clean package

Distributed Approach

This advanced consumer is implemented by extending the Spark InputDStream class. With this approach the user can consume messages from multiple RabbitMQ clusters or multiple RabbitMQ queues. It is also possible to parallelize consumption on one node by starting more than one consumer, one for each Spark RDD partition.

  • Single-Parallelized: one executor with multiple parallelized consumers from one queue
  • Multiple-Parallelized: one or more executors with multiple consumers from multiple queues
  • Cluster: cluster consumer

The consumption options determine the number of partitions of the RDD generated by the RabbitMQDStream. Each Spark node consumes messages, and these messages make up the data in its partitions. It is not necessary to save the data into the Spark Block Manager; subsequent transformations and actions use the data stored on each executor.

When the stream starts, each time window is divided between consuming and computing. By default, the time for consuming messages from RabbitMQ is 0.9 times the Spark window. This time can be limited to improve performance by setting the "maxReceiveTime" configuration parameter, in milliseconds. The number of consumed messages can also be limited with the "maxMessagesPerPartition" configuration parameter.
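The split of the window described above can be illustrated with a little arithmetic. The following is a minimal sketch, not the library's code: `ReceiveBudget` and `receiveTimeMs` are hypothetical names, and it assumes that a "maxReceiveTime" of 0 means "use the default fraction of 0.9".

```scala
object ReceiveBudget {
  // Fraction of the batch window spent consuming when no explicit limit is set
  // (0.9, as stated in the text above).
  val DefaultFraction: Double = 0.9

  /** Milliseconds available for consuming messages in one batch window.
    * Assumption: maxReceiveTimeMs == 0 means "automatic", i.e. the default fraction applies.
    */
  def receiveTimeMs(batchWindowMs: Long, maxReceiveTimeMs: Long = 0L): Long =
    if (maxReceiveTimeMs > 0L) maxReceiveTimeMs
    else math.round(batchWindowMs * DefaultFraction)

  def main(args: Array[String]): Unit = {
    // 10-second window, no explicit limit: 9000 ms to consume, the rest to compute.
    println(receiveTimeMs(10000L))
    // Same window with an explicit limit of 2500 ms.
    println(receiveTimeMs(10000L, 2500L))
  }
}
```

With a 10-second window, lowering "maxReceiveTime" from the default leaves more of each window for computing at the cost of consuming fewer messages per batch.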

This receiver provides optimized implementations of the RDD functions count and countApprox.

Each executor has one connection pool, and its connections are reused on each streaming batch window for better performance. The current Kafka direct approach implemented by Spark does not have a connection pool, so on each iteration the RDDs create a new Kafka connection.

This consumer has one limitation: the minimum storage level for this RabbitMQDStream is MEMORY_ONLY. The user can't select NONE, because the RDD would then be re-computed on each Spark action.

Scala API

  • String
val receiverStream = RabbitMQUtils.createDistributedStream[String](sparkStreamingContext, params, distributedKeys)
  • Generic user Type
val receiverStream = RabbitMQUtils.createDistributedStream[R](sparkStreamingContext, params, distributedKeys, Array[Byte] => R)
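
The `Array[Byte] => R` argument is a function that turns the raw message body into the user type. A minimal sketch of such a function follows; `Reading`, `ReadingParser`, and the comma-separated payload format are hypothetical and only serve the illustration.

```scala
import java.nio.charset.StandardCharsets

// Hypothetical user type, for illustration only.
final case class Reading(sensor: String, value: Double)

object ReadingParser {
  // Assumes UTF-8 payloads of the form "sensor-1,21.5".
  // Throws on malformed input; a real job may prefer to skip or log bad messages.
  val fromBytes: Array[Byte] => Reading = { bytes =>
    val text = new String(bytes, StandardCharsets.UTF_8)
    text.split(",", 2) match {
      case Array(sensor, value) => Reading(sensor.trim, value.trim.toDouble)
      case _ => throw new IllegalArgumentException(s"Unexpected payload: $text")
    }
  }
}
```

A function like `ReadingParser.fromBytes` could then be passed in the position of the `Array[Byte] => R` placeholder, with `R` set to `Reading`.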

Java API

JavaReceiverInputDStream receiverStream = RabbitMQUtils.createJavaDistributedStream[R](javaSparkStreamingContext, params, JFunction[Array[Byte], R]);

Spark Parameters Options

| Parameter               | Description                          | Optional                |
|-------------------------|--------------------------------------|-------------------------|
| maxMessagesPerPartition | Maximum number of messages           | Yes                     |
| levelParallelism        | Number of partitions per executor    | Yes (default: 1)        |
| maxReceiveTime          | Maximum time to receive messages     | Yes (default: 0, auto)  |
| rememberDuration        | Remember duration for Spark DStreams | Yes (default: 60s)      |
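
The options above can be collected into the `params` value passed to the stream factory. The sketch below assumes that `params` is a `Map[String, String]`, which this document does not state, and uses the `maxReceiveTime` spelling from the prose. `DistributedOptions` is a hypothetical helper, not part of the library, and `rememberDuration` is left out because its value format is not described here.

```scala
// Hypothetical typed view over the Spark-side options; not part of the library.
final case class DistributedOptions(
  maxMessagesPerPartition: Option[Int] = None, // no limit unless set
  levelParallelism: Int = 1,                   // documented default
  maxReceiveTime: Long = 0L                    // documented default (auto), in milliseconds
) {
  // Assumption: options are handed over as string key/value pairs.
  def toParams: Map[String, String] =
    Map(
      "levelParallelism" -> levelParallelism.toString,
      "maxReceiveTime" -> maxReceiveTime.toString
    ) ++ maxMessagesPerPartition.map(n => "maxMessagesPerPartition" -> n.toString)
}

object DistributedOptionsExample {
  def main(args: Array[String]): Unit = {
    val params = DistributedOptions(
      maxMessagesPerPartition = Some(500),
      levelParallelism = 2,
      maxReceiveTime = 3000L
    ).toParams
    params.toSeq.sortBy(_._1).foreach { case (k, v) => println(s"$k=$v") }
  }
}
```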

Receiver-based Approach

This is a basic consumer. When the Streaming Context starts, Spark runs one process in one executor to consume messages from RabbitMQ. This consumer has one singleton consumer instance that consumes messages asynchronously; on each Spark window the consumer receives messages and saves the received blocks in Spark's block memory. All the data is replicated to other nodes. The receiver extends an Akka Actor, so the receiver-based implementation depends on Akka. In future versions of Spark the Akka dependency will be removed.

Scala API

  • String
val receiverStream = RabbitMQUtils.createStream[String](sparkStreamingContext, params)
  • Generic user Type
val receiverStream = RabbitMQUtils.createStream[R](sparkStreamingContext, params, Array[Byte] => R)

Java API

JavaReceiverInputDStream receiverStream = RabbitMQUtils.createJavaStream[R](javaSparkStreamingContext, params, JFunction[Array[Byte], R]);

RabbitMQ Parameters Options

| Parameter                 | Description                  | Optional                    |
|---------------------------|------------------------------|-----------------------------|
| hosts                     | RabbitMQ hosts               | Yes (default: localhost)    |
| virtualHost               | RabbitMQ virtual host        | Yes                         |
| queueName                 | Queue name                   | Yes                         |
| exchangeName              | Exchange name                | Yes                         |
| exchangeType              | Exchange type                | Yes                         |
| routingKeys               | Routing keys comma separated | Yes                         |
| userName                  | RabbitMQ username            | Yes                         |
| password                  | RabbitMQ password            | Yes                         |
| durable                   | durable                      | Yes (default: true)         |
| exclusive                 | exclusive                    | Yes (default: false)        |
| autoDelete                | autoDelete                   | Yes (default: false)        |
| ackType                   | basic/auto                   | Yes (default: basic)        |
| fairDispatch              | fairDispatch                 | Yes (default: false)        |
| prefetchCount             | prefetchCount                | Yes (default: 1)            |
| storageLevel              | Apache Spark storage level   | Yes (default: MEMORY_ONLY)  |
| x-max-length              | RabbitMQ queue property      | Yes                         |
| x-message-ttl             | RabbitMQ queue property      | Yes                         |
| x-expires                 | RabbitMQ queue property      | Yes                         |
| x-max-length-bytes        | RabbitMQ queue property      | Yes                         |
| x-dead-letter-exchange    | RabbitMQ queue property      | Yes                         |
| x-dead-letter-routing-key | RabbitMQ queue property      | Yes                         |
| x-max-priority            | RabbitMQ queue property      | Yes                         |
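
Because every parameter is optional, a misspelled key is easy to miss. The sketch below builds a `params` value and checks its keys against the names listed above. It assumes `params` is a `Map[String, String]`; `RabbitParams` and `unknownKeys` are hypothetical helpers, and the queue and exchange names are made up. The call to `RabbitMQUtils.createStream` is left as a comment so that the snippet runs without Spark or the library on the classpath.

```scala
object RabbitParams {
  // Parameter names copied from the RabbitMQ options listed above.
  val KnownKeys: Set[String] = Set(
    "hosts", "virtualHost", "queueName", "exchangeName", "exchangeType",
    "routingKeys", "userName", "password", "durable", "exclusive",
    "autoDelete", "ackType", "fairDispatch", "prefetchCount", "storageLevel",
    "x-max-length", "x-message-ttl", "x-expires", "x-max-length-bytes",
    "x-dead-letter-exchange", "x-dead-letter-routing-key", "x-max-priority"
  )

  // Keys that do not appear in the documented list, e.g. typos.
  def unknownKeys(params: Map[String, String]): Set[String] =
    params.keySet -- KnownKeys

  def main(args: Array[String]): Unit = {
    val params = Map(
      "hosts" -> "localhost",
      "queueName" -> "events",             // hypothetical queue name
      "exchangeName" -> "events-exchange", // hypothetical exchange name
      "exchangeType" -> "direct",
      "routingKeys" -> "created,updated",  // comma separated
      "ackType" -> "auto",
      "storageLevel" -> "MEMORY_ONLY"
    )
    require(unknownKeys(params).isEmpty, s"Unknown keys: ${unknownKeys(params)}")
    // With Spark and the library available, the map would then be used as:
    // val receiverStream = RabbitMQUtils.createStream[String](sparkStreamingContext, params)
    println(s"Validated ${params.size} parameters")
  }
}
```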

License

Licensed to STRATIO (C) under one or more contributor license agreements. See the NOTICE file distributed with this work for additional information regarding copyright ownership. The STRATIO (C) licenses this file to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
