Spark Extension
This project provides extensions to the Apache Spark project in Scala and Python:
Diff: A diff transformation and application for Datasets that computes the differences between two datasets, i.e. which rows to add, delete or change to get from one dataset to the other.
SortedGroups: A groupByKey transformation that groups rows by a key while providing a sorted iterator for each group. Similar to Dataset.groupByKey.flatMapGroups, but with order guarantees for the iterator.
Histogram: A histogram transformation that computes the histogram DataFrame for a value column.
Global Row Number: A withRowNumbers transformation that provides the global row number w.r.t. the current order of the Dataset, or any given order. In contrast to the existing SQL function row_number, which requires a window spec, this transformation provides the row number across the entire Dataset without scaling problems.
Partitioned Writing: The writePartitionedBy action writes your Dataset partitioned and efficiently laid out with a single operation.
Inspect Parquet files: The structure of Parquet files (the metadata, not the data stored in Parquet) can be inspected similar to parquet-tools or parquet-cli by reading from a simple Spark data source. This simplifies identifying why some Parquet files cannot be split by Spark into scalable partitions.
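To make the Diff item above concrete, here is a minimal plain-Python sketch of the idea. It is not the library's implementation: two datasets are modelled as dictionaries from an id to a row, and each id is classified as a row to add, delete or change. The function name diff_rows and the action labels are made up for this illustration.

```python
def diff_rows(left, right):
    """Classify ids by the action needed to turn `left` into `right`.

    `left` and `right` map an id to a row (any comparable value).
    Hypothetical helper for illustration only.
    """
    actions = {}
    for key in sorted(left.keys() | right.keys()):
        if key not in left:
            actions[key] = "add"        # row exists only in `right`
        elif key not in right:
            actions[key] = "delete"     # row exists only in `left`
        elif left[key] != right[key]:
            actions[key] = "change"     # same id, different values
        else:
            actions[key] = "no change"  # identical in both datasets
    return actions


left = {1: ("one",), 2: ("two",), 3: ("three",)}
right = {1: ("one",), 2: ("Two",), 4: ("four",)}
print(diff_rows(left, right))
```

The Spark transformation does this at scale on Datasets; the sketch only shows which rows end up in which category.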
Fluent method call: T.call(transformation: T => R): R: Turns a transformation T => R, that is not part of T, into a fluent method call on T. This allows writing fluent code like:
import uk.co.gresearch._
i.doThis()
  .doThat()
  .call(transformation)
  .doMore()
Fluent conditional method call: T.when(condition: Boolean).call(transformation: T => T): T: Perform a transformation fluently only if the given condition is true. This allows writing fluent code like:
import uk.co.gresearch._
i.doThis()
  .doThat()
  .when(condition).call(transformation)
  .doMore()
Backticks: backticks(string: String, strings: String*): String: Encloses the given column name with backticks (`) when needed. This is a handy way to ensure column names with special characters like dots (.) work with col() or select().
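As a rough illustration of what such quoting involves, the following plain-Python sketch quotes name parts and joins them with dots. It rests on assumptions that the text above does not confirm: that "when needed" means the part contains a dot and is not already enclosed in backticks, and that multiple parts are joined with dots. The helper quote_column_name is hypothetical and is not the library's function.

```python
def quote_column_name(*parts):
    """Join name parts with dots, quoting parts that contain a dot.

    Assumption: a part needs backticks if it contains a dot and is not
    already enclosed in backticks. Hypothetical helper, not the library API.
    """
    def quote(part):
        already_quoted = (
            len(part) >= 2 and part.startswith("`") and part.endswith("`")
        )
        if "." in part and not already_quoted:
            return f"`{part}`"
        return part

    return ".".join(quote(part) for part in parts)


print(quote_column_name("a.column"))           # `a.column`
print(quote_column_name("a.struct", "field"))  # `a.struct`.field
print(quote_column_name("plain"))              # plain
```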
.Net DateTime.Ticks: Convert .Net (C#, F#, Visual Basic) DateTime.Ticks into Spark timestamps, seconds and nanoseconds.
Available methods:
// Scala
dotNetTicksToTimestamp(Column): Column // returns timestamp as TimestampType
dotNetTicksToUnixEpoch(Column): Column // returns Unix epoch seconds as DecimalType
dotNetTicksToUnixEpochNanos(Column): Column // returns Unix epoch nanoseconds as LongType
The reverse is provided by (all return LongType .Net ticks):
// Scala
timestampToDotNetTicks(Column): Column
unixEpochToDotNetTicks(Column): Column
unixEpochNanosToDotNetTicks(Column): Column
These methods are also available in Python:
# Python
dotnet_ticks_to_timestamp(column_or_name) # returns timestamp as TimestampType
dotnet_ticks_to_unix_epoch(column_or_name) # returns Unix epoch seconds as DecimalType
dotnet_ticks_to_unix_epoch_nanos(column_or_name) # returns Unix epoch nanoseconds as LongType
timestamp_to_dotnet_ticks(column_or_name)
unix_epoch_to_dotnet_ticks(column_or_name)
unix_epoch_nanos_to_dotnet_ticks(column_or_name)
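The arithmetic behind these conversions can be sketched in plain Python. This is not the library's code and does not use Spark; it only relies on the fact that .Net ticks count 100-nanosecond intervals since 0001-01-01T00:00:00, which puts the Unix epoch at tick 621355968000000000. The helper names below are hypothetical.

```python
from datetime import datetime, timezone
from decimal import Decimal

# .Net ticks count 100-nanosecond intervals since 0001-01-01T00:00:00.
# The Unix epoch (1970-01-01T00:00:00Z) corresponds to this tick value.
UNIX_EPOCH_TICKS = 621_355_968_000_000_000
NANOS_PER_TICK = 100
TICKS_PER_SECOND = 10_000_000


def ticks_to_epoch_nanos(ticks):
    """Unix epoch nanoseconds as an int."""
    return (ticks - UNIX_EPOCH_TICKS) * NANOS_PER_TICK


def ticks_to_epoch_seconds(ticks):
    """Unix epoch seconds as a Decimal, keeping sub-second precision."""
    return Decimal(ticks - UNIX_EPOCH_TICKS) / TICKS_PER_SECOND


def epoch_nanos_to_ticks(nanos):
    """Reverse direction; nanoseconds below 100 ns resolution are dropped."""
    return nanos // NANOS_PER_TICK + UNIX_EPOCH_TICKS


ticks = 637_134_336_000_000_000  # 2020-01-01T00:00:00Z
seconds = ticks_to_epoch_seconds(ticks)
print(seconds)
print(datetime.fromtimestamp(int(seconds), tz=timezone.utc))
```

A decimal type is used for seconds because a tick has finer resolution than a double-precision float can represent at this magnitude.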
Spark job description: Set Spark job description for all Spark jobs within a context:
import org.apache.spark.sql.SparkSession
import uk.co.gresearch.spark._

implicit val session: SparkSession = spark

withJobDescription("parquet file") {
  val df = spark.read.parquet("data.parquet")
  val count = appendJobDescription("count") {
    df.count
  }
  appendJobDescription("write") {
    df.write.csv("data.csv")
  }
}
[Screenshots omitted: Without job description | With job description]
Using Spark Extension
The spark-extension package is available for all Spark 3.2, 3.3 and 3.4 versions. Some earlier Spark versions may also be supported.
The package version has the following semantics: spark-extension_{SCALA_COMPAT_VERSION}-{VERSION}-{SPARK_COMPAT_VERSION}:
SCALA_COMPAT_VERSION: Scala binary compatibility (minor) version. Available are 2.12 and 2.13.
SPARK_COMPAT_VERSION: Apache Spark binary compatibility (minor) version. Available are 3.2, 3.3 and 3.4.
VERSION: The package version, e.g. 2.1.0.
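As a small sketch of how the three parts combine, the following Python helper assembles the Maven coordinates used in the sections that follow. The function maven_coordinate is hypothetical and only mirrors the naming scheme described above; the supported versions are the ones listed in this README.

```python
GROUP_ID = "uk.co.gresearch.spark"
SCALA_COMPAT_VERSIONS = ("2.12", "2.13")
SPARK_COMPAT_VERSIONS = ("3.2", "3.3", "3.4")


def maven_coordinate(scala_compat, version, spark_compat):
    """Build group:artifact:version for a Scala/Spark combination.

    Hypothetical helper for illustration only.
    """
    if scala_compat not in SCALA_COMPAT_VERSIONS:
        raise ValueError(f"unsupported Scala version: {scala_compat}")
    if spark_compat not in SPARK_COMPAT_VERSIONS:
        raise ValueError(f"unsupported Spark version: {spark_compat}")
    artifact = f"spark-extension_{scala_compat}"
    return f"{GROUP_ID}:{artifact}:{version}-{spark_compat}"


print(maven_coordinate("2.12", "2.8.0", "3.4"))
```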
SBT
Add this line to your build.sbt file:
libraryDependencies += "uk.co.gresearch.spark" %% "spark-extension" % "2.8.0-3.4"
Maven
Add this dependency to your pom.xml file:
<dependency>
  <groupId>uk.co.gresearch.spark</groupId>
  <artifactId>spark-extension_2.12</artifactId>
  <version>2.8.0-3.4</version>
</dependency>
Gradle
Add this dependency to your build.gradle file:
dependencies {
    implementation "uk.co.gresearch.spark:spark-extension_2.12:2.8.0-3.4"
}
Spark Submit
Submit your Spark app with the Spark Extension dependency (version ≥1.1.0) as follows:
spark-submit --packages uk.co.gresearch.spark:spark-extension_2.12:2.8.0-3.4 [jar]
Note: Pick the right Scala version (here 2.12) and Spark version (here 3.4) depending on your Spark version.
Spark Shell
Launch a Spark Shell with the Spark Extension dependency (version ≥1.1.0) as follows:
spark-shell --packages uk.co.gresearch.spark:spark-extension_2.12:2.8.0-3.4
Note: Pick the right Scala version (here 2.12) and Spark version (here 3.4) depending on your Spark Shell version.
Python
PySpark API
Start a PySpark session with the Spark Extension dependency (version ≥1.1.0) as follows:
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .config("spark.jars.packages", "uk.co.gresearch.spark:spark-extension_2.12:2.8.0-3.4") \
    .getOrCreate()
Note: Pick the right Scala version (here 2.12) and Spark version (here 3.4) depending on your PySpark version.
PySpark REPL
Launch the Python Spark REPL with the Spark Extension dependency (version ≥1.1.0) as follows:
pyspark --packages uk.co.gresearch.spark:spark-extension_2.12:2.8.0-3.4
Note: Pick the right Scala version (here 2.12) and Spark version (here 3.4) depending on your PySpark version.
PySpark spark-submit
Run your Python scripts that use PySpark via spark-submit:
spark-submit --packages uk.co.gresearch.spark:spark-extension_2.12:2.8.0-3.4 [script.py]
Note: Pick the right Scala version (here 2.12) and Spark version (here 3.4) depending on your Spark version.
PyPI package (local Spark cluster only)
You may want to install the pyspark-extension Python package from PyPI into your development environment.
This gives you code completion, typing and test capabilities during your development phase.
Running your Python application on a Spark cluster will still require one of the above ways to add the Scala package to the Spark environment.
pip install pyspark-extension==2.8.0.3.4
Note: Pick the right Spark version (here 3.4) depending on your PySpark version.
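Judging from the single example above, the pip version joins the package version and the Spark compatibility version with a dot instead of the hyphen used in the Maven version. The helper below is a hypothetical sketch of that mapping, inferred from this one example only.

```python
def pip_requirement(version, spark_compat):
    """Build the pip requirement string for a package and Spark version.

    Assumption: the scheme is {VERSION}.{SPARK_COMPAT_VERSION}, as in the
    example above. Hypothetical helper for illustration only.
    """
    return f"pyspark-extension=={version}.{spark_compat}"


print(pip_requirement("2.8.0", "3.4"))  # pyspark-extension==2.8.0.3.4
```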
Your favorite Data Science notebook
There are plenty of Data Science notebooks around. To use this library, add a jar dependency to your notebook using these Maven coordinates:
uk.co.gresearch.spark:spark-extension_2.12:2.8.0-3.4
Or download the jar and place it on a filesystem where it is accessible by the notebook, and reference that jar file directly.
Check the documentation of your favorite notebook to learn how to add jars to your Spark environment.
Build
You can build this project against different versions of Spark and Scala.
Switch Spark and Scala version
If you want to build for a Spark or Scala version different from what is defined in the pom.xml file, then run
sh set-version.sh [SPARK-VERSION] [SCALA-VERSION]
For example, switch to Spark 3.4.0 and Scala 2.13.8 by running sh set-version.sh 3.4.0 2.13.8.
Build the Scala project
Then execute mvn package to create a jar from the sources. It can be found in target/.
Testing
Run the Scala tests via mvn test.
Set up Python environment
In order to run the Python tests, set up a Python environment as follows (replace [SCALA-COMPAT-VERSION] and [SPARK-COMPAT-VERSION] with the respective values):
virtualenv -p python3 venv
source venv/bin/activate
pip install -r python/requirements-[SPARK-COMPAT-VERSION]_[SCALA-COMPAT-VERSION].txt
pip install pytest
Run Python tests
Run the Python tests via env PYTHONPATH=python:python/test python -m pytest python/test.
Note: you first have to build the Scala sources.
Build Python package
Run the following sequence of commands in the project root directory:
mkdir -p python/pyspark/jars/
cp -v target/spark-extension_*-*.jar python/pyspark/jars/
pip install build
Then execute python -m build python/ to create a whl from the sources. It can be found in python/dist/.