Lucidworks Spark/Solr Integration

Tools for reading data from Solr as a Spark RDD and indexing objects from Spark into Solr using SolrJ.

Version Compatibility

The spark-solr project has several releases, each of which supports different versions of Spark and Solr. The compatibility chart below shows the versions supported across past releases. 'Connector' refers to the spark-solr library.

Connector  Spark  Solr
4.0.2      3.1.2  8.11.0
4.0.1      3.1.2  8.11.0
4.0.0      3.1.2  8.9.0
3.10.0     2.4.5  8.9.0
3.9.0      2.4.5  8.3.0
3.8.1      2.4.5  8.3.0
3.8.0      2.4.5  8.3.0
3.7.0      2.4.4  8.3.0
3.6.6      2.4.3  8.2.0
3.6.0      2.4.0  7.5.0
3.5.19     2.3.2  7.6.0
3.4.5      2.2.1  7.2.1
3.3.4      2.2.0  7.1.0
3.2.2      2.2.0  6.6.1
3.1.1      2.1.1  6.6.0
3.0.4      2.1.1  6.5.1
2.4.0      1.6.3  6.4.2
2.3.4      1.6.3  6.3.0
2.2.3      1.6.2  6.1.0
2.1.0      1.6.2  6.1.0
2.0.4      1.6.1  5.5.2

Getting started

Import jar File via spark-shell

cd $SPARK_HOME
./bin/spark-shell --jars spark-solr-{version}-shaded.jar

The shaded jar can be downloaded from Maven Central or built from the respective branch.

Connect to your SolrCloud Instance

via DataFrame

val options = Map(
  "collection" -> "{solr_collection_name}",
  "zkhost" -> "{zk_connect_string}"
)
val df = spark.read.format("solr")
  .options(options)
  .load
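
Once loaded, the DataFrame behaves like any other Spark DataFrame. A short usage sketch follows; the author_s field is only an example and may not exist in your collection:

// inspect the schema inferred from the Solr collection
df.printSchema()
// filter and preview a few rows; the field name below is illustrative only
df.filter(df("author_s") === "shakespeare").show(10)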

via RDD

import com.lucidworks.spark.rdd.SelectSolrRDD
val solrRDD = new SelectSolrRDD(zkHost, collectionName, sc)

SelectSolrRDD is an RDD of SolrDocument

via RDD (Java)

import com.lucidworks.spark.rdd.SolrJavaRDD;
import org.apache.spark.api.java.JavaRDD;

SolrJavaRDD solrRDD = SolrJavaRDD.get(zkHost, collection, jsc.sc());
JavaRDD<SolrDocument> resultsRDD = solrRDD.queryShards(solrQuery);

Download/Build the jar Files

Maven Central

The released jar files (1.1.2, 2.0.0, etc.) can be downloaded from the Maven Central repository. Maven Central also holds the shaded, sources, and javadoc .jars for each release.

<dependency>
   <groupId>com.lucidworks.spark</groupId>
   <artifactId>spark-solr</artifactId>
   <version>{latestVersion}</version>
</dependency>

Snapshots

Snapshots of spark-solr are built for every commit on the master branch. The snapshots can be accessed from OSS Sonatype.

Build from Source

mvn clean package -DskipTests

This will build 2 jars in the target directory:

  • spark-solr-${VERSION}.jar

  • spark-solr-${VERSION}-shaded.jar

${VERSION} will be something like 3.5.6-SNAPSHOT for development builds.

The first .jar is what you’d want to use if you were using spark-solr in your own project. The second is what you’d use to submit one of the included example apps to Spark.

Features

  • Send objects from Spark (Streaming or DataFrames) into Solr.

  • Read the results from a Solr query as a Spark RDD or DataFrame.

  • Stream documents from Solr using the /export handler (only works for exporting fields that have docValues enabled).

  • Read large result sets from Solr using cursors or the /export handler.

  • Data locality. If Spark workers and Solr processes are co-located on the same nodes, the partitions are placed on the nodes where the replicas are located.

Querying

Cursors

Cursors are used by default to pull documents out of Solr, and the number of tasks allocated will equal the number of shards available for the collection.

If your Spark cluster has more available executor slots than the number of shards, then you can increase parallelism when reading from Solr by splitting each shard into sub-ranges using a split field. A good candidate for the split field is the _version_ field that is attached to every document by the shard leader during indexing. See the splits section to enable and configure intra-shard splitting.

Cursors won’t work if the index changes while the query is running. Constrain your query to a static index by passing additional Solr parameters via solr.params, as in the sketch below.
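
A minimal sketch of a cursor-based read constrained to a static slice of the index via solr.params; the collection, zkhost, and timestamp field values are placeholders:

val staticReadOptions = Map(
  "collection" -> "{solr_collection_name}",
  "zkhost" -> "{zk_connect_string}",
  // filter to documents indexed before a fixed cutoff so the cursor sees a stable index
  "solr.params" -> "fq=timestamp_tdt:[* TO 2021-01-01T00:00:00Z]"
)
val staticDF = spark.read.format("solr")
  .options(staticReadOptions)
  .load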

Streaming API (/export)

If the fields being queried have docValues enabled, then the Streaming API can be used to pull documents from Solr in a true streaming fashion. This method is 8-10x faster than cursors. The request_handler option enables the Streaming API for DataFrame reads, as shown below.
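
A minimal sketch of an /export-based read; it assumes every listed field has docValues enabled, and the collection and zkhost values are placeholders:

val exportOptions = Map(
  "collection" -> "{solr_collection_name}",
  "zkhost" -> "{zk_connect_string}",
  // /export requires an explicit field list, and every field must have docValues
  "fields" -> "id,author_s,favorited_b",
  "request_handler" -> "/export"
)
val exportDF = spark.read.format("solr")
  .options(exportOptions)
  .load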

Indexing

Objects can be sent to Solr via Spark Streaming or DataFrames. The schema is inferred from the DataFrame, and any fields that do not exist in the Solr schema will be added via the Schema API. See ManagedIndexSchemaFactory.
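
A minimal sketch of indexing a DataFrame through the same "solr" data source; the collection, zkhost, and field names are placeholders, and commit_within is set only so the documents become searchable quickly:

import spark.implicits._

// a tiny DataFrame to index; id is assumed to be the collection's unique key field
val toIndex = Seq(("1", "spark loves solr"), ("2", "solr loves spark")).toDF("id", "body_t")

val indexOptions = Map(
  "collection" -> "{solr_collection_name}",
  "zkhost" -> "{zk_connect_string}",
  // ask Solr to make the documents visible within 5 seconds
  "commit_within" -> "5000"
)
toIndex.write.format("solr")
  .options(indexOptions)
  .mode("overwrite")
  .save()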

See Index parameters for configuration and tuning.

Configuration and Tuning

The Solr DataSource supports a number of optional parameters that allow you to optimize performance when reading data from Solr. The only required parameters for the DataSource are zkhost and collection.

Query Parameters

query

Probably the most obvious option is to specify a Solr query that limits the rows you want to load into Spark. For instance, if we only wanted to load documents that mention "solr", we would do:

Usage: option("query","body_t:solr")

Default: *:*

If you don’t specify the "query" option, then all rows are read using the "match all documents" query (*:*).

fields

You can use the fields option to specify a subset of fields to retrieve for each document in your results:

Usage: option("fields","id,author_s,favorited_b,…​")

By default, all stored fields for each document are pulled back from Solr.

You can also specify an alias for a field using Solr’s field alias syntax, e.g. author:author_s. If you want to invoke a function query, such as rord(), then you’ll need to provide an alias, e.g. ord_user:ord(user_id). If the return type of the function query is something other than int or long, then you’ll need to specify the return type after the function query, e.g. foo:div(sum(x,100),max(y,1)):double.

Tip
If you request Solr function queries, then the library must use the /select handler to make the request, because exporting function queries through /export is not supported by Solr.

filters

You can use the filters option to set filter queries on the Solr query:

Usage: option("filters","firstName:Sam,lastName:Powell")

rows

You can use the rows option to specify the number of rows to retrieve from Solr per request; do not confuse this with max_rows (see below). Behind the scenes, the implementation uses either deep-paging cursors or the Streaming API with response streaming, so it is usually safe to specify a large number of rows.

To be clear, this is not the maximum number of rows to read from Solr. All matching rows on the backend are read. The rows parameter is the page size.

By default, the implementation uses 1000 rows, but if your documents are small, you can increase this to 10000. Using too large a value can put pressure on the Solr JVM’s garbage collector.

Usage: option("rows","10000") Default: 1000

max_rows

Limits the result set to a maximum number of rows; only applies when using the /select handler. The library will issue the query from a single task and let Solr do the distributed query processing. In addition, no paging is performed, i.e. the rows param is set to max_rows when querying. Consequently, this option should not be used for large max_rows values; instead, retrieve all rows using multiple Spark tasks and then re-sort with Spark if needed.

Usage: option("max_rows", "100") Defalut: None

request_handler

Set the Solr request handler for queries. This option can be used to export results from Solr via the /export handler, which streams data out of Solr. See Exporting Result Sets for more information.

The /export handler needs fields to be explicitly specified. Please use the fields option or specify the fields in the query.

Usage: option("request_handler", "/export") Default: /select

Increase Read Parallelism using Intra-Shard Splits

If your Spark cluster has more available executor slots than the number of shards, then you can increase parallelism when reading from Solr by splitting each shard into sub-ranges using a split field. Splitting enables faster fetching from Solr by increasing the number of parallel read tasks. This should only be used if there are enough computing resources in the Spark cluster.

Shard splitting is disabled by default.

splits

Enable shard splitting on the default field _version_.

Usage: option("splits", "true")

Default: false

The above option is equivalent to option("split_field", "_version_")

split_field

The field to split on can be changed using the split_field option.

Usage: option("split_field", "id") Default: _version_

splits_per_shard

Behind the scenes, the DataSource implementation tries to split each shard into evenly sized splits using filter queries. You can also split on a string-based keyword field, but it should have sufficient variance in its values to allow for creating enough splits to be useful. In other words, if your Spark cluster can handle 10 splits per shard but there are only 3 unique values in the keyword field, then you will only get 3 splits.

Keep in mind that this is only a hint to the split calculator and you may end up with a slightly different number of splits than what was requested.

Usage: option("splits_per_shard", "30")

Default: 1
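
A minimal sketch combining the intra-shard splitting options on a read; splits_per_shard is only a hint, and the collection and zkhost values are placeholders:

val splitReadOptions = Map(
  "collection" -> "{solr_collection_name}",
  "zkhost" -> "{zk_connect_string}",
  // split each shard on the default _version_ field
  "splits" -> "true",
  // request roughly 4 splits per shard; the split calculator treats this as a hint
  "splits_per_shard" -> "4"
)
val splitDF = spark.read.format("solr")
  .options(splitReadOptions)
  .load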

flatten_multivalued

This option is enabled by default and flattens multi-valued fields from Solr.

Usage: option("flatten_multivalued", "false")

Default: true

dv

The dv option will fetch docValues fields that are indexed but not stored by using function queries. This should be used for Solr versions lower than 5.5.0.

Usage: option("dv", "true")

Default: false

skip_non_dv

The skip_non_dv option instructs the Solr DataSource to skip all fields that do not have docValues.

Usage: option("skip_non_dv", "true")

Default: false

sample_seed

The sample_seed option allows you to read a random sample of documents from Solr using the specified seed. This option can be useful if you just need to explore the data before performing operations on the full result set. By default, if this option is provided, a 10% sample size is read from Solr, but you can use the sample_pct option to control the sample size.

Usage: option("sample_seed", "5150")

Default: None

sample_pct

The sample_pct option allows you to set the size of a random sample of documents from Solr; use a value between 0 and 1.

Usage: option("sample_pct", "0.05")

Default: 0.1

solr.params

The solr.params option can be used to pass arbitrary Solr request parameters in query-string form.

Tip
Don’t use this to pass parameters that are covered by other options, such as fl (use the fields option) or sort. This option is strictly intended for parameters that are NOT covered by other options.

Usage: option("solr.params", "fq=userId:[10 TO 1000]")

Index parameters

soft_commit_secs

If specified, the soft_commit_secs option will be set via the Solr Config API during indexing.

Usage: option("soft_commit_secs", "10")

Default: None

commit_within

The commit_within param sets commitWithin on the indexing requests processed by SolrClient. The value should be in milliseconds. See commitWithin.

Usage: option("commit_within", "5000")

Default: None

batch_size

The batch_size option determines the number of documents that are sent to Solr per HTTP call during indexing. Set this option higher if the docs are small and memory is available.

Usage: option("batch_size", "10000")

Default: 500

gen_uniq_key

If documents are missing the unique key (derived from the Solr schema), then the gen_uniq_key option will generate a unique value for each document before indexing to Solr. Instead of this option, the UUIDUpdateProcessorFactory can be used to generate UUID values for documents that are missing the unique key field.

Usage: option("gen_uniq_key", "true")

Default: false

solr_field_types

This option can be used to specify the field type for fields written to Solr. It only works if the field names are not already defined in the Solr schema.

Usage: option("solr_field_types", "rating:string,title:text_en")

Querying Time Series Data

partition_by

Set this option to time in order to query multiple time-series collections partitioned according to some time period.

Usage: option("partition_by", "time")

Default: None

time_period

This should be of the form X DAYS/HOURS/MINUTES and is the time period with which the partitions are created.

Usage: option("time_period", "1MINUTES")

Default: 1DAYS

datetime_pattern

This pattern can be inferred from time_period, but this option can be used to specify it explicitly.

Usage: option("datetime_pattern", "yyyy_MM_dd_HH_mm")

Default: yyyy_MM_dd

timestamp_field_name

This option specifies the field name in the indexed documents where the timestamp is found.

Usage: option("timestamp_field_name", "ts")

Default: timestamp_tdt

timezone_id

Used to specify the timezone.

Usage: option("timezone_id", "IST")

Default: UTC

max_active_partitions

This option specifies the maximum number of partitions that are allowed at a time.

Usage: option("max_active_partitions", "100")

Default: null
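
A minimal sketch of reading from time-partitioned collections; the base collection name, zkhost, and timestamp field are placeholders:

val timeSeriesOptions = Map(
  "collection" -> "{base_collection_name}",
  "zkhost" -> "{zk_connect_string}",
  // query the time-partitioned collections derived from the base collection name
  "partition_by" -> "time",
  // one partition per day, named with the matching datetime pattern
  "time_period" -> "1DAYS",
  "datetime_pattern" -> "yyyy_MM_dd",
  // field in the indexed documents that holds the timestamp
  "timestamp_field_name" -> "timestamp_tdt"
)
val timeSeriesDF = spark.read.format("solr")
  .options(timeSeriesOptions)
  .load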

Troubleshooting Tips

Why is dataFrame.count so slow?

Solr can provide the number of matching documents nearly instantly, so why is calling count on a DataFrame backed by a Solr query so slow? The reason is that Spark reads all rows before performing any operations on a DataFrame. So when you ask Spark SQL to count the rows in a DataFrame, spark-solr has to read all matching documents from Solr and then count the rows in the RDD.

If you’re just exploring a Solr collection from Spark and need to know the number of matching rows for a query, you can use the SolrQuerySupport.getNumDocsFromSolr utility function, sketched below.
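
A hypothetical sketch of using that utility; the import path and argument order shown here are assumptions and should be verified against the spark-solr source:

// assumed package: com.lucidworks.spark.util
import com.lucidworks.spark.util.SolrQuerySupport
import org.apache.solr.client.solrj.SolrQuery

// assumed signature: getNumDocsFromSolr(collection, zkHost, Option[SolrQuery])
val numDocs = SolrQuerySupport.getNumDocsFromSolr(
  "{solr_collection_name}",
  "{zk_connect_string}",
  Some(new SolrQuery("body_t:solr"))
)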

I set rows to 10 and now my job takes forever to read 10 rows from Solr!

The rows option sets the page size, but all matching rows are read from Solr for every query. So if your query matches many documents in Solr, Spark reads them all, 10 docs per request.

Use the sample_seed option to limit the size of the results returned from Solr.

Developing a Spark Application

The com.lucidworks.spark.SparkApp provides a simple framework for implementing Spark applications in Java. The class saves you from having to duplicate boilerplate code needed to run a Spark application, giving you more time to focus on the business logic of your application.

To leverage this framework, you need to develop a concrete class that either implements RDDProcessor or extends StreamProcessor depending on the type of application you’re developing.

RDDProcessor

Implement the com.lucidworks.spark.SparkApp$RDDProcessor interface for building a Spark application that operates on a JavaRDD, such as one pulled from a Solr query (see SolrQueryProcessor as an example).

StreamProcessor

Extend the com.lucidworks.spark.SparkApp$StreamProcessor abstract class to build a Spark streaming application.

See com.lucidworks.spark.example.streaming.oneusagov.OneUsaGovStreamProcessor or com.lucidworks.spark.example.streaming.TwitterToSolrStreamProcessor for examples of how to write a StreamProcessor.

Authenticating with Solr

For background on Solr security, see: Securing Solr.

Kerberos

The Kerberos config should be set via the system property java.security.auth.login.config in extraJavaOptions for both the executor and the driver.

SparkApp

The SparkApp framework (in spark-solr) allows you to pass the path to a JAAS authentication configuration file using the -solrJaasAuthConfig option.

For example, if you need to authenticate using the "solr" Kerberos principal, you need to create a JAAS configuration file named jaas-client.conf that sets the location of your Kerberos keytab file, such as:

Client {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  keyTab="/keytabs/solr.keytab"
  storeKey=true
  useTicketCache=true
  debug=true
  principal="solr";
};

To use this configuration to authenticate to Solr, you simply need to pass the path to jaas-client.conf created above using the -solrJaasAuthConfig option, such as:

spark-submit --master yarn-server \
  --class com.lucidworks.spark.SparkApp \
  $SPARK_SOLR_PROJECT/target/spark-solr-${VERSION}-shaded.jar \
  hdfs-to-solr -zkHost $ZK -collection spark-hdfs \
  -hdfsPath /user/spark/testdata/syn_sample_50k \
  -solrJaasAuthConfig=/path/to/jaas-client.conf

Basic Auth

Basic auth can be configured via the system properties basicauth or solr.httpclient.config. These system properties have to be set on both the driver and executor JVMs.

Examples:

Using basicauth

 ./bin/spark-shell --master local[*] --jars ~/Git/spark-solr/target/spark-solr-3.0.1-SNAPSHOT-shaded.jar  --conf 'spark.driver.extraJavaOptions=-Dbasicauth=solr:SolrRocks'

Using solr.httpclient.config

 ./bin/spark-shell --master local[*] --jars ~/Git/spark-solr/target/spark-solr-3.0.1-SNAPSHOT-shaded.jar  --conf 'spark.driver.extraJavaOptions=-Dsolr.httpclient.config=/Users/kiran/spark/spark-2.1.0-bin-hadoop2.7/auth.txt'

Contents of the config file:

httpBasicAuthUser=solr
httpBasicAuthPassword=SolrRocks
