  • Language: Scala
  • Created over 10 years ago
  • Updated about 10 years ago


Repository Details

This project combines Apache Spark and Elasticsearch to enable mining & prediction for Elasticsearch.

Dr. Krusche & Partner PartG

Integration of Elasticsearch with Spark

This project shows how to easily integrate Apache Spark, a fast and general purpose engine for large-scale data processing, with Elasticsearch, a real-time distributed search and analytics engine.

Spark is an in-memory processing framework that can run programs up to 100 times faster than Hadoop MapReduce. Spark is accompanied by

  • MLlib, a scalable machine learning library,
  • Spark SQL, a unified access platform for structured big data,
  • Spark Streaming, a library to build scalable fault-tolerant streaming applications.

If you are more interested in an Elasticsearch plugin that brings the power of Predictiveworks. to Elasticsearch, please refer to Elasticinsight.

Elasticinsight. Overview

Predictiveworks. is an ensemble of dedicated predictive engines that covers a wide range of today's analytics requirements, from Association Analysis and Context-Aware Recommendations to Text Analysis. Elasticinsight. enables Elasticsearch to seamlessly use these engines.


Machine Learning with Elasticsearch

Beyond linguistic and semantic enrichment, there is an increasing demand to apply knowledge discovery, data mining, and even predictive analytics to the data in a search index, in order to gain deeper insights and further increase its business value.

One of the key prerequisites is to easily connect existing data sources to state-of-the-art machine learning and predictive analytics frameworks.

In this project, we show how to connect Elasticsearch, a powerful distributed search engine, to Apache Spark and benefit from the growing number of available machine learning algorithms.

The figure shows the integration pattern for Elasticsearch and Spark from an architectural perspective and also indicates how to proceed with the enriched content (i.e. the way back to the search index).

Elasticsearch and Spark

The Scala snippet below shows that a few lines are sufficient to read from Elasticsearch and provide data for further mining and prediction tasks:

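import org.apache.hadoop.io.{MapWritable, Text}
import org.elasticsearch.hadoop.mr.EsInputFormat

/* Each hit is a (document id, document fields) pair read from the index configured in conf */
val source = sc.newAPIHadoopRDD(conf, classOf[EsInputFormat[Text, MapWritable]], classOf[Text], classOf[MapWritable])
val docs = source.map(hit => {
  new EsDocument(hit._1.toString,toMap(hit._2))
})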
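The snippet relies on two helpers of this project, EsDocument and toMap, whose definitions are not shown. Below is a minimal sketch of what they could look like, assuming a document is just its id plus a map from field names to string values. It exploits the fact that Hadoop's MapWritable implements java.util.Map, so one generic conversion covers it; both definitions are hypothetical stand-ins, not the project's code.

```scala
/** Hypothetical stand-in for EsDocument: the document id plus its fields as strings. */
final case class EsDocument(id: String, data: Map[String, String])

object EsConversions {

  /**
   * Hypothetical stand-in for toMap: copies any java.util.Map (MapWritable is one)
   * into an immutable Scala map, calling toString on keys and values.
   * Null values become empty strings.
   */
  def toMap[K, V](fields: java.util.Map[K, V]): Map[String, String] = {
    val builder = Map.newBuilder[String, String]
    val it = fields.entrySet().iterator()
    while (it.hasNext) {
      val e = it.next()
      builder += e.getKey.toString -> Option(e.getValue).fold("")(_.toString)
    }
    builder.result()
  }
}
```

Converting to plain strings early keeps EsDocument free of Hadoop types, which simplifies the downstream mining code.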

Document Segmentation with KMeans

Starting from the RDD[EsDocument] extracted from Elasticsearch, it takes just a few lines of Scala to segment these documents by their geo location (latitude, longitude).

From these segments, a heatmap can be drawn to visualize which regions of the world most of the documents come from. The image below shows a multi-colored heatmap, where the colors red, yellow, green and blue indicate different heat ranges.

Heatmap from Piwik Data
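The heatmap itself is not part of the code shown here. As a rough sketch of the underlying idea, assuming documents are reduced to (latitude, longitude) pairs, points can be binned into a regular grid and each cell's count mapped to one of the four heat ranges. The cell size and the quartile-style thresholds below are hypothetical choices, not taken from the project.

```scala
object HeatmapSketch {

  /** Counts points per grid cell of cellDeg degrees; math.floor keeps negative coordinates in the right cell. */
  def cellCounts(points: Seq[(Double, Double)], cellDeg: Double): Map[(Int, Int), Int] =
    points
      .groupBy { case (lat, lon) => (math.floor(lat / cellDeg).toInt, math.floor(lon / cellDeg).toInt) }
      .map { case (cell, ps) => cell -> ps.size }

  /** Assigns one of four heat ranges relative to the hottest cell (hypothetical thresholds). */
  def heatColor(count: Int, maxCount: Int): String = {
    require(maxCount > 0, "maxCount must be positive")
    val ratio = count.toDouble / maxCount
    if (ratio > 0.75) "red"
    else if (ratio > 0.5) "yellow"
    else if (ratio > 0.25) "green"
    else "blue"
  }
}
```

With one-degree cells, two documents from Berlin and one from Munich fall into two cells; the Berlin cell lands in the red range and the Munich cell in the green one.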

Segmenting documents into specific target groups is not restricted to their geo location. Time of day, product or service categories, total revenue, and other parameters may be used as well.

For segmentation, the K-Means clustering implementation of MLlib is used:

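import org.apache.hadoop.conf.Configuration
import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.rdd.RDD

def cluster(documents:RDD[EsDocument],esConf:Configuration):RDD[(Int,EsDocument)] = {

  /* Fields used as features, e.g. "latitude,longitude"; toVector turns them into an MLlib Vector */
  val fields = esConf.get("es.fields").split(",")
  /* K-Means makes several passes over the data, so cache the feature vectors */
  val vectors = documents.map(doc => toVector(doc.data,fields)).cache()

  val clusters = esConf.get("es.clusters").toInt
  val iterations = esConf.get("es.iterations").toInt

  /* Train model */
  val model = KMeans.train(vectors, clusters, iterations)

  /* Apply model: pair each document with the index of its nearest cluster center */
  documents.map(doc => (model.predict(toVector(doc.data,fields)),doc))

}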
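To make the two MLlib calls less of a black box, here is a plain-Scala sketch of what training and prediction amount to: Lloyd's algorithm alternates between assigning each vector to its nearest center and moving each center to the mean of its members. This is not MLlib's implementation (MLlib uses k-means|| initialization by default, while this sketch simply starts from the first k points), and the toVector below is a hypothetical stand-in for the project's helper that reads the configured fields as doubles.

```scala
object KMeansSketch {

  type Point = Array[Double]

  /** Hypothetical stand-in for toVector: reads the given fields (e.g. latitude, longitude) as doubles. */
  def toVector(data: Map[String, String], fields: Array[String]): Point =
    fields.map(f => data(f).trim.toDouble)

  private def sqDist(a: Point, b: Point): Double =
    a.zip(b).map { case (x, y) => (x - y) * (x - y) }.sum

  /** Index of the nearest center, the same kind of result model.predict returns. */
  def predict(centers: Seq[Point], p: Point): Int =
    centers.indices.minBy(i => sqDist(centers(i), p))

  /** Lloyd's algorithm, initialized with the first k points. */
  def train(points: Seq[Point], k: Int, iterations: Int): Seq[Point] = {
    require(k > 0 && k <= points.size, "need at least k points")
    var centers: Seq[Point] = points.take(k)
    for (_ <- 1 to iterations) {
      // Assignment step: group the points by their nearest center
      val groups = points.groupBy(p => predict(centers, p))
      // Update step: move every center to the mean of its members
      centers = centers.indices.map { i =>
        groups.get(i) match {
          case Some(members) =>
            Array.tabulate(members.head.length)(d => members.map(_(d)).sum / members.size)
          case None => centers(i) // an empty cluster keeps its previous center
        }
      }
    }
    centers
  }
}
```

The result depends on the initial centers, which is why MLlib invests in a smarter initialization than this sketch does.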

Clustering Elasticsearch data with K-Means is a first and simple example of how to immediately benefit from the integration with Spark. Other business cases may cover recommendations:

Suppose Elasticsearch is used to index e-commerce transactions on a per-user basis. Then it is straightforward to build a recommendation system in just two steps:

  • First, implicit user-item ratings are derived from the e-commerce transactions, and
  • Second, item similarities are calculated from these ratings to provide a recommendation model.
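The two steps can be illustrated with plain Scala collections. The sketch below assumes a hypothetical Transaction(user, item) record, takes the number of purchases as the implicit rating, and uses cosine similarity between the items' rating vectors; the project's actual rating and similarity measures may differ.

```scala
object ItemSimilaritySketch {

  /** Hypothetical e-commerce transaction: one purchase of an item by a user. */
  final case class Transaction(user: String, item: String)

  /** Step 1: implicit rating of (user, item) = number of times the user bought the item. */
  def implicitRatings(txs: Seq[Transaction]): Map[(String, String), Double] =
    txs.groupBy(t => (t.user, t.item)).map { case (key, ts) => key -> ts.size.toDouble }

  /** Step 2: cosine similarity for every item pair (a, b) with a < b that shares at least one user. */
  def itemSimilarities(ratings: Map[(String, String), Double]): Map[(String, String), Double] = {
    // item -> (user -> rating)
    val byItem: Map[String, Map[String, Double]] =
      ratings
        .groupBy { case ((_, item), _) => item }
        .map { case (item, rs) => item -> rs.map { case ((user, _), r) => user -> r } }
    val norms = byItem.map { case (item, v) => item -> math.sqrt(v.values.map(r => r * r).sum) }
    val items = byItem.keys.toSeq.sorted
    val pairs = for {
      a <- items
      b <- items
      if a < b
      dot = byItem(a).collect { case (u, r) if byItem(b).contains(u) => r * byItem(b)(u) }.sum
      if dot > 0
    } yield (a, b) -> dot / (norms(a) * norms(b))
    pairs.toMap
  }
}
```

Item pairs without a common buyer are left out, so the similarity model stays sparse; recommendations for a user would then be drawn from items similar to the ones already bought.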

For more information, please read here.

Insights from Elasticsearch with SQL

Spark SQL allows relational queries expressed in SQL to be executed using Spark. This makes it possible to query Spark data structures and also Spark data streams (see below).

As SQL queries produce Spark data structures, SQL and native Spark operations can also be mixed, providing a sophisticated mechanism to compute valuable insights from data in real time.

The code example below illustrates how to apply SQL queries on a Spark data structure (RDD) and provide further insight by mixing with native Spark operations.

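import org.apache.hadoop.conf.Configuration

/*
 * Elasticsearch specific configuration; es.nodes, es.port, es.resource
 * and es.query are es-hadoop settings, while es.table and es.sql are
 * read by this project's own query method
 */
val esConf = new Configuration()

esConf.set("es.nodes","localhost")
esConf.set("es.port","9200")

esConf.set("es.resource", "enron/mails")
esConf.set("es.query", "?q=*:*")

esConf.set("es.table", "docs")
esConf.set("es.sql", "select subject from docs")

...

/*
 * Read from ES and provide some insight with Spark & SparkSQL,
 * thereby mixing SQL and other Spark operations: keep only the
 * subjects that contain "Re"
 */
val documents = es.documentsAsJson(esConf)
val subjects = es.query(documents, esConf).filter(row => row.getString(0).contains("Re"))

...

/*
 * sqlc is an SQLContext (Spark 1.0-era API; SchemaRDD was replaced by
 * DataFrame in Spark 1.3). jsonRDD infers a schema from the JSON documents,
 * and the result is registered under the configured table name so that
 * the configured SQL statement can refer to it
 */
def query(documents:RDD[String], esConfig:Configuration):SchemaRDD = {

  val query = esConfig.get("es.sql")
  val name  = esConfig.get("es.table")

  val table = sqlc.jsonRDD(documents)
  table.registerAsTable(name)

  sqlc.sql(query)

}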

Real-Time Stream Processing and Elasticsearch

Real-time analytics is a very popular topic, and a wide range of application areas gain tremendous business value from it, for example:

  • High frequency trading (finance),
  • Real-time bidding (adtech),
  • Real-time social activity (social networks),
  • Real-time sensor data (Internet of Things),
  • Real-time user behavior,

and more. There are many popular frameworks for aggregating data in real time, such as Apache Storm, Apache S4, Apache Samza, Akka Streams, and SQLStream, to name just a few.

Spark Streaming, which can process about 400,000 records per node per second for simple aggregations on small records, significantly outperforms other popular streaming systems. This is mainly because Spark Streaming groups messages into small batches, which are then processed together.

Moreover, in case of failure, each Spark Streaming batch is processed exactly once, which greatly simplifies the logic (e.g. making sure that values are not counted multiple times).

Spark Streaming is a layer on top of Spark that receives data streams from various sources, such as Kafka, Twitter or ZeroMQ, and turns them into a sequence of Spark RDDs (Resilient Distributed Datasets), one per batch interval; window operations can additionally combine the RDDs of several batches over a sliding window. These RDDs can then be manipulated using normal Spark operations.
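The batching model described above can be mimicked on an in-memory collection: events are cut into consecutive batches of a fixed interval, and a windowed aggregation combines the last few batches, moving forward one batch at a time. This is only an illustration with a hypothetical Event type; it is not Spark Streaming's API, where the equivalent would be a DStream with window operations.

```scala
object MicroBatchSketch {

  /** Hypothetical event: a timestamp in milliseconds and a key to count. */
  final case class Event(timeMs: Long, key: String)

  /** Cuts events into consecutive batches of batchMs milliseconds; empty batches are kept. */
  def toBatches(events: Seq[Event], batchMs: Long): Seq[Seq[Event]] = {
    require(batchMs > 0, "batch interval must be positive")
    if (events.isEmpty) Seq.empty
    else {
      val start = events.map(_.timeMs).min
      val grouped = events.groupBy(e => (e.timeMs - start) / batchMs)
      (0L to grouped.keys.max).map(i => grouped.getOrElse(i, Seq.empty[Event]))
    }
  }

  /** Counts keys over a window of `window` batches that slides forward one batch at a time. */
  def slidingCounts(batches: Seq[Seq[Event]], window: Int): Seq[Map[String, Int]] =
    batches.sliding(window).map { ws =>
      ws.flatten.groupBy(_.key).map { case (k, es) => k -> es.size }
    }.toList
}
```

Each batch is a self-contained unit of work, which is what allows a failed batch to be recomputed as a whole.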

This project provides a real-time data integration pattern based on Apache Kafka, Spark Streaming and Elasticsearch:

Apache Kafka is a distributed publish-subscribe messaging system that may also be seen as a real-time integration system. For example, web tracking events are easily sent to Kafka and may then be consumed by a set of different consumers.
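The publish-subscribe idea can be illustrated with a toy, in-memory topic: producers append to a log, and each consumer group keeps its own read offset, so several independent consumers (e.g. an indexer and an analytics job) each see every message. TopicLog is a hypothetical class for illustration only, not Kafka's API; Kafka additionally partitions, replicates and persists topics.

```scala
import scala.collection.mutable

/** Hypothetical in-memory topic: an append-only log plus one read offset per consumer group. */
final class TopicLog[A] {
  private val log = mutable.ArrayBuffer.empty[A]
  private val offsets = mutable.Map.empty[String, Int]

  /** Producers append messages; nothing is removed when a consumer reads them. */
  def publish(message: A): Unit = log += message

  /** Returns the messages this group has not seen yet and advances only this group's offset. */
  def poll(group: String): List[A] = {
    val from = offsets.getOrElse(group, 0)
    offsets(group) = log.size
    log.slice(from, log.size).toList
  }
}
```

Because reading does not delete anything, adding another consumer (such as a Spark Streaming job) does not affect the existing ones.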

In this project, we use Spark Streaming as a consumer and aggregator of data streams such as these tracking events, and index them live. As Spark Streaming is also able to compute new insights directly from data streams, this data integration pattern may serve as a starting point for real-time data analytics and enrichment before search indexing.

The figure below illustrates the architecture of this pattern. For completeness, Spray is also part of it: Spray is an open-source toolkit for building REST/HTTP-based integration layers on top of Scala and Akka. Being asynchronous, actor-based, fast, lightweight, and modular, it is an easy way to connect Scala applications to the Web.

Real-time Data Integration and Analytics

The code example below illustrates that such an integration pattern may be implemented with just a few lines of Scala code:

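import kafka.serializer.StringDecoder
import org.apache.hadoop.io.{MapWritable, NullWritable}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kafka.KafkaUtils
import org.elasticsearch.hadoop.mr.EsOutputFormat

/* Receiver-based Kafka stream of (key, message) pairs; keep only the decoded messages */
val stream = KafkaUtils.createStream[String,Message,StringDecoder,MessageDecoder](ssc, kafkaConfig, kafkaTopics, StorageLevel.MEMORY_AND_DISK).map(_._2)
stream.foreachRDD(messageRDD => {
  /**
   * Live indexing of Kafka messages; note that this is also
   * an appropriate place to integrate further message analysis.
   * prepare (a project helper) must return (NullWritable, MapWritable)
   * pairs, as expected by EsOutputFormat
   */
  val messages = messageRDD.map(prepare)
  messages.saveAsNewAPIHadoopFile("-",classOf[NullWritable],classOf[MapWritable],classOf[EsOutputFormat],esConfig)

})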

Most Frequent Items from Streams

The architecture illustrated above not only makes it possible to apply Spark to data streams; it also opens real-time streams to other data processing libraries such as Twitter's Algebird.

Algebird brings, as the name indicates, algebraic algorithms to streaming data. An important representative is the Count-Min Sketch, which approximates item frequencies with little memory and thus makes it possible to compute the most frequent items of a stream. The code example below shows how to apply the CountMinSketchMonoid (Algebird) to compute the most frequent messages of a Kafka stream with respect to the messages' classification:


import com.twitter.algebird.CountMinSketchMonoid
import org.apache.spark.streaming.dstream.DStream

object EsCountMinSketch {

  /**
   * Registers a per-batch computation of the approximate top k message
   * classifiers; onTopK is called on the driver after every batch, because
   * results only exist once the streaming context is running
   */
  def findTopK(stream:DStream[Message], onTopK:Seq[(Long,Long)] => Unit):Unit = {

    val DELTA = 1E-3
    val EPS   = 0.01

    val SEED = 1
    val PERC = 0.001

    val k = 5

    /* The constructor expects (eps, delta, seed, heavyHittersPct) */
    val cms = new CountMinSketchMonoid(EPS, DELTA, SEED, PERC)
    var globalCMS = cms.zero

    /* message.clas is expected to be a Long classifier */
    val clases = stream.map(message => message.clas)
    val approxTopClases = clases.mapPartitions(part => part.map(clas => cms.create(clas))).reduce(_ ++ _)

    /* foreachRDD runs on the driver once per batch, so the global sketch can be updated here */
    approxTopClases.foreachRDD(rdd => {
      if (rdd.count() != 0) globalCMS ++= rdd.first()

      /*
       * Retrieve the top k message classifiers together with their
       * estimated frequencies from the global sketch
       */
      val globalTopK = globalCMS.heavyHitters.map(clas => (clas, globalCMS.frequency(clas).estimate))
        .toSeq.sortBy(_._2).reverse.slice(0, k)

      onTopK(globalTopK)
    })

  }
}
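For readers unfamiliar with the data structure, here is a minimal plain-Scala Count-Min Sketch; it is not Algebird's implementation and replaces its heavy-hitter tracking with an explicit list of candidate items (an assumption of this sketch). A depth x width table of counters is updated with one hash function per row, and an item's estimate is the minimum of its counters, so it never underestimates. The sizing formulas are the standard ones (width = ceil(e / eps), depth = ceil(ln(1 / delta))); deriving the row hashes by varying the MurmurHash3 seed is a simplification.

```scala
import scala.util.hashing.MurmurHash3

/** Minimal Count-Min Sketch over Long items (an illustration, not Algebird). */
final class CountMinSketch(eps: Double, delta: Double, val seed: Int) {
  val width: Int = math.ceil(math.E / eps).toInt
  val depth: Int = math.ceil(math.log(1.0 / delta)).toInt
  private val counts = Array.ofDim[Long](depth, width)

  // One hash function per row, obtained by varying the MurmurHash3 seed
  private def bucket(row: Int, item: Long): Int =
    Math.floorMod(MurmurHash3.stringHash(item.toString, seed + row), width)

  def add(item: Long, count: Long = 1L): Unit =
    for (row <- 0 until depth) counts(row)(bucket(row, item)) += count

  /** Never below the true count; above it by at most eps * total with probability >= 1 - delta. */
  def estimate(item: Long): Long =
    (0 until depth).map(row => counts(row)(bucket(row, item))).min

  /** Merging sketches with identical parameters is element-wise addition (the monoid "plus"). */
  def ++(other: CountMinSketch): CountMinSketch = {
    require(width == other.width && depth == other.depth && seed == other.seed, "incompatible sketches")
    val merged = new CountMinSketch(eps, delta, seed)
    for (r <- 0 until depth; c <- 0 until width)
      merged.counts(r)(c) = counts(r)(c) + other.counts(r)(c)
    merged
  }

  /** Ranks the given candidate items by their estimated frequency. */
  def topK(candidates: Iterable[Long], k: Int): Seq[(Long, Long)] =
    candidates.toSeq.distinct.map(item => (item, estimate(item))).sortBy { case (_, n) => -n }.take(k)
}
```

The element-wise merge is what makes reduce(_ ++ _) across partitions valid: per-partition sketches can be combined in any order and still describe the whole batch.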

