Lucidworks Spark/Solr Integration
This project includes tools for reading data from Solr as a Spark DataFrame/RDD and indexing objects from Spark into Solr using SolrJ.
Version Compatibility
The spark-solr project has several releases, each of which support different versions of Spark and Solr. The compatibility chart below shows the versions supported across the past releases. 'Connector' refers to the 'spark-solr' library
Connector | Spark | Solr |
---|---|---|
3.1.2 |
8.11.0 |
|
3.1.2 |
8.11.0 |
|
3.1.2 |
8.9.0 |
|
2.4.5 |
8.9.0 |
|
2.4.5 |
8.3.0 |
|
2.4.5 |
8.3.0 |
|
2.4.5 |
8.3.0 |
|
2.4.4 |
8.3.0 |
|
2.4.3 |
8.2.0 |
|
2.4.0 |
7.5.0 |
|
2.3.2 |
7.6.0 |
|
2.2.1 |
7.2.1 |
|
2.2.0 |
7.1.0 |
|
2.2.0 |
6.6.1 |
|
2.1.1 |
6.6.0 |
|
2.1.1 |
6.5.1 |
|
1.6.3 |
6.4.2 |
|
1.6.3 |
6.3.0 |
|
1.6.2 |
6.1.0 |
|
1.6.2 |
6.1.0 |
|
1.6.1 |
5.5.2 |
Getting started
Import jar File via spark-shell
cd $SPARK_HOME
./bin/spark-shell --jars spark-solr-{version}-shaded.jar
The shaded jar can be downloaded from the Maven Central or built from the respective branch
Connect to your SolrCloud Instance
via DataFrame
val options = Map(
"collection" -> "{solr_collection_name}",
"zkhost" -> "{zk_connect_string}"
)
val df = spark.read.format("solr")
.options(options)
.load
via RDD
import com.lucidworks.spark.rdd.SelectSolrRDD
val solrRDD = new SelectSolrRDD(zkHost, collectionName, sc)
SelectSolrRDD is an RDD of SolrDocument
via RDD (Java)
import com.lucidworks.spark.rdd.SolrJavaRDD;
import org.apache.spark.api.java.JavaRDD;
SolrJavaRDD solrRDD = SolrJavaRDD.get(zkHost, collection, jsc.sc());
JavaRDD<SolrDocument> resultsRDD = solrRDD.queryShards(solrQuery);
Download/Build the jar Files
Maven Central
The released jar files (1.1.2, 2.0.0, etc..) can be downloaded from the Maven Central repository. Maven Central also holds the shaded, sources, and javadoc .jars for each release.
<dependency>
<groupId>com.lucidworks.spark</groupId>
<artifactId>spark-solr</artifactId>
<version>{latestVersion}</version>
</dependency>
Snapshots
Snapshots of spark-solr are built for every commit on master branch. The snapshots can be accessed from OSS Sonatype.
Build from Source
mvn clean package -DskipTests
This will build 2 jars in the target
directory:
-
spark-solr-${VERSION}.jar
-
spark-solr-${VERSION}-shaded.jar
${VERSION}
will be something like 3.5.6-SNAPSHOT, for development builds.
The first .jar is what you’d want to use if you were using spark-solr in your own project. The second is what you’d use to submit one of the included example apps to Spark.
Features
-
Send objects from a Spark (Streaming or DataFrames) into Solr.
-
Read the results from a Solr query as a Spark RDD or DataFrame.
-
Stream documents from Solr using
/export
handler (only works for exporting fields that have docValues enabled). -
Read large result sets from Solr using cursors or with
/export
handler. -
Data locality. If Spark workers and Solr processes are co-located on the same nodes, the partitions are placed on the nodes where the replicas are located.
Querying
Cursors
Cursors are used by default to pull documents out of Solr. By default, the number of tasks allocated will be the number of shards available for the collection.
If your Spark cluster has more available executor slots than the number of shards, then you can increase parallelism when reading from Solr by splitting each shard into sub ranges using a split field. A good candidate for the split field is the version field that is attached to every document by the shard leader during indexing. See splits section to enable and configure intra shard splitting.
Cursors won’t work if the index changes during the query time. Constrain your query to a static index by using additional Solr parameters using solr.params.
Streaming API (/export)
If the fields that are being queried have docValues enabled, then the Streaming API can be used to pull documents from Solr in a true Streaming fashion. This method is 8-10x faster than Cursors. The option request_handler allows you to enable Streaming API via DataFrame.
Indexing
Objects can be sent to Solr via Spark Streaming or DataFrames. The schema is inferred from the DataFrame and any fields that do not exist in Solr schema will be added via Schema API. See ManagedIndexSchemaFactory.
See Index parameters for configuration and tuning.
Configuration and Tuning
The Solr DataSource supports a number of optional parameters that allow you to optimize performance when reading data from Solr. The only required parameters for the DataSource are zkhost
and collection
.
Query Parameters
query
Probably the most obvious option is to specify a Solr query that limits the rows you want to load into Spark. For instance, if we only wanted to load documents that mention "solr", we would do:
Usage: option("query","body_t:solr")
Default: *:*
If you don’t specify the "query" option, then all rows are read using the "match all documents" query (*:*
).
fields
You can use the fields
option to specify a subset of fields to retrieve for each document in your results:
Usage: option("fields","id,author_s,favorited_b,…​")
By default, all stored fields for each document are pulled back from Solr.
You can also specify an alias for a field using Solr’s field alias syntax, e.g. author:author_s
. If you want to invoke a function query, such as rord(), then you’ll need to provide an alias, e.g. ord_user:ord(user_id)
. If the return type of the function query is something other than int
or long
, then you’ll need to specify the return type after the function query, such as:
foo:div(sum(x,100),max(y,1)):double
Tip
|
If you request Solr function queries, then the library must use the /select handler to make the request as exporting function queries through /export is not supported by Solr.
|
filters
You can use the filters
option to set filter queries on Solr query:
Usage: option("filters","firstName:Sam,lastName:Powell")
rows
You can use the rows
option to specify the number of rows to retrieve from Solr per request; do not confuse this with max_rows
(see below). Behind the scenes, the implementation uses either deep paging cursors or Streaming API and response streaming, so it is usually safe to specify a large number of rows.
To be clear, this is not the maximum number of rows to read from Solr. All matching rows on the backend are read. The rows
parameter is the page size.
By default, the implementation uses 1000 rows but if your documents are smaller, you can increase this to 10000. Using too large a value can put pressure on the Solr JVM’s garbage collector.
Usage: option("rows","10000")
Default: 1000
max_rows
Limits the result set to a maximum number of rows; only applies when using the /select
handler. The library will issue the query from a single task and let Solr do the distributed query processing. In addition, no paging is performed, i.e. the rows
param is set to max_rows
when querying. Consequently, this option should not be used for large max_rows
values, rather you should just retrieve all rows using multiple Spark tasks and then re-sort with Spark if needed.
Usage: option("max_rows", "100")
Defalut: None
request_handler
Set the Solr request handler for queries. This option can be used to export results from Solr via /export
handler which streams data out of Solr. See Exporting Result Sets for more information.
The /export
handler needs fields to be explicitly specified. Please use the fields
option or specify the fields in the query.
Usage: option("request_handler", "/export")
Default: /select
Increase Read Parallelism using Intra-Shard Splits
If your Spark cluster has more available executor slots than the number of shards, then you can increase parallelism when reading from Solr by splitting each shard into sub ranges using a split field. The sub range splitting enables faster fetching from Solr by increasing the number of tasks in Solr. This should only be used if there are enough computing resources in the Spark cluster.
Shard splitting is disabled by default.
splits
Enable shard splitting on default field _version_
.
Usage: option("splits", "true")
Default: false
The above option is equivalent to option("split_field", "_version_")
split_field
The field to split on can be changed using split_field
option.
Usage: option("split_field", "id")
Default: _version_
splits_per_shard
Behind the scenes, the DataSource implementation tries to split the shard into evenly sized splits using filter queries. You can also split on a string-based keyword field but it should have sufficient variance in the values to allow for creating enough splits to be useful. In other words, if your Spark cluster can handle 10 splits per shard, but there are only 3 unique values in a keyword field, then you will only get 3 splits.
Keep in mind that this is only a hint to the split calculator and you may end up with a slightly different number of splits than what was requested.
Usage: option("splits_per_shard", "30")
Default: 1
flatten_multivalued
This option is enabled by default and flattens multi valued fields from Solr.
Usage: option("flatten_multivalued", "false")
Default: true
dv
The dv
option will fetch the docValues that are indexed but not stored by using function queries. Should be used for Solr versions lower than 5.5.0.
Usage: option("dv", "true")
Default: false
skip_non_dv
The skip_non_dv
option instructs the solr
datasource to skip all fields that are not docValues.
Usage: option("skip_non_dv", "true")
Default: false
sample_seed
The sample_seed
option allows you to read a random sample of documents from Solr using the specified seed. This option can be useful if you just need to explore the data before performing operations on the full result set. By default, if this option is provided, a 10% sample size is read from Solr, but you can use the sample_pct
option to control the sample size.
Usage: option("sample_seed", "5150")
Default: None
sample_pct
The sample_pct
option allows you to set the size of a random sample of documents from Solr; use a value between 0 and 1.
Usage: option("sample_pct", "0.05")
Default: 0.1
solr.params
The solr.params
option can be used to specify any arbitrary Solr parameters in the form of a Solr query.
Tip
|
Don’t use this to pass parameters that are covered by other options, such as fl (use the fields option) or sort . This option is strictly intended for parameters that are NOT covered by other options.
|
Usage: option("solr.params", "fq=userId:[10 TO 1000]")
Index parameters
soft_commit_secs
If specified, the soft_commit_secs
option will be set via SolrConfig API during indexing
Usage: option("soft_commit_secs", "10")
Default: None
commit_within
The commit_within
param sets commitWithin
on the indexing requests processed by SolrClient. This value should be in milliseconds. See commitWithin
Usage: option("commit_within", "5000")
Default: None
batch_size
The batch_size
option determines the number of documents that are sent to Solr via a HTTP call during indexing. Set this option higher if the docs are small and memory is available.
Usage: option("batch_size", "10000")
Default: 500
gen_uniq_key
If the documents are missing the unique key (derived from Solr schema), then the gen_uniq_key
option will generate a unique value for each document before indexing to Solr. Instead of this option, the UUIDUpdateProcessorFactory can be used to generate UUID values for documents that are missing the unique key field
Usage: option("gen_uniq_key", "true")
Default: false
solr_field_types
This option can used to specify field type for fields written to Solr. Only works if the field names are not already defined in Solr schema
Usage: option("solr_field_types", "rating:string,title:text_en"
Querying Time Series Data
partition_by
Set this option as time, in order to query mutiple time series collections, partitioned according to some time period
Usage: option("partition_by", "time")
Default:none
time_period
This is of the form X DAYS/HOURS/MINUTES.This should be the time period with which the partitions are created.
Usage: option("time_period", "1MINUTES")
Default: 1DAYS
datetime_pattern
This pattern can be inferred from time_period. But this option can be used to explicitly specify.
Usage: option("datetime_pattern", "yyyy_MM_dd_HH_mm")
Default: yyyy_MM_dd
timestamp_field_name
This option is used to specify the field name in the indexed documents where time stamp is found.
Usage: option("timestamp_field_name", "ts")
Default: timestamp_tdt
timezone_id
Used to specify the timezone.
Usage: option("timezone_id", "IST")
Default: UTC
max_active_partitions
This option is used to specify the maximum number of partitions that must be allowed at a time.
Usage: option("max_active_partitions", "100")
Default: null
Troubleshooting Tips
Why is dataFrame.count so slow?
Solr can provide the number of matching documents nearly instantly, so why is calling count
on a DataFrame backed by a Solr query so slow? The reason is that Spark likes to read all rows before performing any operations on a DataFrame. So when you ask SparkSQL to count the rows in a DataFrame, spark-solr has to read all matching documents from Solr and then count the rows in the RDD.
If you’re just exploring a Solr collection from Spark and need to know the number of matching rows for a query, you can use SolrQuerySupport.getNumDocsFromSolr
utility function.
I set rows to 10 and now my job takes forever to read 10 rows from Solr!
The rows
option sets the page size, but all matching rows are read from Solr for every query. So if your query matches many documents in Solr, then Spark is reading them all 10 docs per request.
Use the sample_seed
option to limit the size of the results returned from Solr.
Developing a Spark Application
The com.lucidworks.spark.SparkApp
provides a simple framework for implementing Spark applications in Java. The class saves you from having to duplicate boilerplate code needed to run a Spark application, giving you more time to focus on the business logic of your application.
To leverage this framework, you need to develop a concrete class that either implements RDDProcessor or extends StreamProcessor depending on the type of application you’re developing.
RDDProcessor
Implement the com.lucidworks.spark.SparkApp$RDDProcessor
interface for building a Spark application that operates on a JavaRDD, such as one pulled from a Solr query (see SolrQueryProcessor as an example).
StreamProcessor
Extend the com.lucidworks.spark.SparkApp$StreamProcessor
abstract class to build a Spark streaming application.
See com.lucidworks.spark.example.streaming.oneusagov.OneUsaGovStreamProcessor
or com.lucidworks.spark.example.streaming.TwitterToSolrStreamProcessor
for examples of how to write a StreamProcessor.
Authenticating with Solr
For background on Solr security, see: Securing Solr.
Kerberos
The Kerberos config should be set via system param java.security.auth.login.config
on extraJavaOptions for both executor and driver.
SparkApp
The SparkApp framework (in spark-solr) allows you to pass the path to a JAAS authentication configuration file using the -solrJaasAuthConfig option
.
For example, if you need to authenticate using the "solr" Kerberos principal, you need to create a JAAS configuration file named jaas-client.conf
that sets the location of your Kerberos keytab file, such as:
Client {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
keyTab="/keytabs/solr.keytab"
storeKey=true
useTicketCache=true
debug=true
principal="solr";
};
To use this configuration to authenticate to Solr, you simply need to pass the path to jaas-client.conf
created above using the -solrJaasAuthConfig option
, such as:
spark-submit --master yarn-server \
--class com.lucidworks.spark.SparkApp \
$SPARK_SOLR_PROJECT/target/spark-solr-${VERSION}-shaded.jar \
hdfs-to-solr -zkHost $ZK -collection spark-hdfs \
-hdfsPath /user/spark/testdata/syn_sample_50k \
-solrJaasAuthConfig=/path/to/jaas-client.conf
Basic Auth
Basic auth can be configured via System properties basicauth
or solr.httpclient.config
. These system properties have to be set on Driver and Executor JVMs
Examples:
Using basicauth
./bin/spark-shell --master local[*] --jars ~/Git/spark-solr/target/spark-solr-3.0.1-SNAPSHOT-shaded.jar --conf 'spark.driver.extraJavaOptions=-Dbasicauth=solr:SolrRocks'
Using solr.httpclient.config
./bin/spark-shell --master local[*] --jars ~/Git/spark-solr/target/spark-solr-3.0.1-SNAPSHOT-shaded.jar --conf 'spark.driver.extraJavaOptions=-Dsolr.httpclient.config=/Users/kiran/spark/spark-2.1.0-bin-hadoop2.7/auth.txt'
Contents of config file
httpBasicAuthUser=solr
httpBasicAuthPassword=SolrRocks