One of the biggest challenges after taking the first steps into the world of writing Apache Spark applications in Scala is taking them to production.
An application of any kind needs to be easy to run and easy to configure.
This project is trying to help developers write Spark applications focusing mainly on the application logic rather than the details of configuring the application and setting up the Spark context.
This project is also trying to create and encourage a friendly yet professional environment for developers to help each other, so please do not be shy and join through gitter, twitter, issue reports or pull requests.
At the moment there are a lot of changes happening to the spark-utils
project, hopefully for the better.
The latest stable versions, available through Maven Central are
- Spark 2.4:
0.4.2
- Spark 3.0:
0.6.2
- Spark 3.x:
1.0.0
The development version is 1.0.0-R6
which is bringing a clean separation between configuration implementation and the
core, and additionally the PureConfig based configuration module that brings the power and features of PureConfig
to increase productivity even further and allowing for a more mature configuration framework.
The new modules are:
spark-utils-io-pureconfig
for the new PureConfig implementation
We completely removed the legacy scalaz based configuration framework.
We suggest to start considering the new for the future spark-utils-io-pureconfig
.
Migrating to the new 1.0.0-RC6
is quite easy, as the configuration structure was mainly preserved.
More details are available in the RELEASE-NOTES.
For now, some of the documentation related or referenced from this project might be obsolete or outdated, but as the project will get closer to the final release, there will be more improvements.
Spark | Scala 2.12 | Scala 2.13 | Report |
---|---|---|---|
3.0.3 | YES | N/A | 3.0.3 |
3.1.3 | YES | N/A | 3.1.3 |
3.2.4 | YES | YES | 3.2.4 |
3.3.4 | YES | YES | 3.3.4 |
3.4.2 | YES | YES | 3.4.2 |
3.5.1 | YES | YES | 3.5.1 |
This project contains some basic utilities that can help setting up an Apache Spark application project.
The main point is the simplicity of writing Apache Spark applications just focusing on the logic, while providing for easy configuration and arguments passing.
The code sample bellow shows how easy can be to write a file format converter from any acceptable type, with any acceptable parsing configuration options to any acceptable format.
import org.tupol.spark._
object FormatConverterExample extends SparkApp[FormatConverterContext, DataFrame] {
override def createContext(config: Config) = FormatConverterContext.extract(config)
override def run(implicit spark: SparkSession, context: FormatConverterContext): Try[DataFrame] = {
val inputData = spark.source(context.input).read
inputData.sink(context.output).write
}
}
Optionally, the SparkFun
can be used instead of SparkApp
to make the code even more concise.
import org.tupol.spark._
object FormatConverterExample extends
SparkFun[FormatConverterContext, DataFrame](FormatConverterContext.extract) {
override def run(implicit spark: SparkSession, context: FormatConverterContext): Try[DataFrame] =
spark.source(context.input).read.sink(context.output).write
}
Creating the configuration can be as simple as defining a case class to hold the configuration and a factory, that helps extract simple and complex data types like input sources and output sinks.
import org.tupol.spark.io._
case class FormatConverterContext(input: FormatAwareDataSourceConfiguration,
output: FormatAwareDataSinkConfiguration)
There are multiple ways that the context can be easily created from configuration files. This project proposes two ways:
- the new PureConfig based framework
- the legacy ScalaZ based framework
import com.typesafe.config.Config
object FormatConverterContext {
import pureconfig.generic.auto._
import org.tupol.spark.io.pureconf._
import org.tupol.spark.io.pureconf.readers._
def extract(config: Config): Try[FormatConverterContext] = config.extract[FormatConverterContext]
}
For structured streaming applications the format converter might look like this:
object StreamingFormatConverterExample extends SparkApp[StreamingFormatConverterContext, DataFrame] {
override def createContext(config: Config) = StreamingFormatConverterContext.extract(config)
override def run(implicit spark: SparkSession, context: StreamingFormatConverterContext): Try[DataFrame] = {
val inputData = spark.source(context.input).read
inputData.streamingSink(context.output).write.awaitTermination()
}
}
The streaming configuration the configuration can be as simple as following:
import org.tupol.spark.io.streaming.structured._
case class StreamingFormatConverterContext(input: FormatAwareStreamingSourceConfiguration,
output: FormatAwareStreamingSinkConfiguration)
object StreamingFormatConverterContext {
import com.typesafe.config.Config
import pureconfig.generic.auto._
import org.tupol.spark.io.pureconf._
import org.tupol.spark.io.pureconf.streaming.structured._
def extract(config: Config): Try[StreamingFormatConverterContext] = config.extract[StreamingFormatConverterContext]
}
The SparkRunnable
and SparkApp
or
SparkFun
together with the
configuration framework
provide for easy Spark application creation with configuration that can be managed through
configuration files or application parameters.
The IO frameworks for reading and writing data frames add extra convenience for setting up batch and structured streaming jobs that transform various types of files and streams.
Last but not least, there are many utility functions that provide convenience for loading resources, dealing with schemas and so on.
Most of the common features are also implemented as decorators to main Spark classes, like
SparkContext
, DataFrame
and StructType
and they are conveniently available by importing
the org.tupol.spark.implicits._
package.
The documentation for the main utilities and frameworks available:
- SparkApp, SparkFun and SparkRunnable
- DataSource Framework for both batch and structured streaming applications
- DataSink Framework for both batch and structured streaming applications
Latest stable API documentation is available here.
An extensive tutorial and walk-through can be found here. Extensive samples and demos can be found here.
A nice example on how this library can be used can be found in the
spark-tools
project, through the implementation
of a generic format converter and a SQL processor for both batch and structured streams.
- Java 8 or higher
- Scala 2.12
- Apache Spark 3.0.X
Spark Utils is published to Maven Central and Spark Packages:
- Group id / organization:
org.tupol
- Artifact id / name:
spark-utils
- Latest stable versions:
- Spark 2.4:
0.4.2
- Spark 3.0:
0.6.2
- Spark 2.4:
Usage with SBT, adding a dependency to the latest version of tools to your sbt build definition file:
libraryDependencies += "org.tupol" %% "spark-utils-io-pureconfig" % "1.0.0-RC6"
Include this package in your Spark Applications using spark-shell
or spark-submit
$SPARK_HOME/bin/spark-shell --packages org.tupol:spark-utils_2.12:1.0.0-RC6
Note spark-utils-g8 was not yet updated for the 1.x version.
The simplest way to start a new spark-utils
is to make use of the
spark-apps.seed.g8
template project.
To fill in manually the project options run
g8 tupol/spark-apps.seed.g8
The default options look like the following:
name [My Project]:
appname [My First App]:
organization [my.org]:
version [0.0.1-SNAPSHOT]:
package [my.org.my_project]:
classname [MyFirstApp]:
scriptname [my-first-app]:
scalaVersion [2.12.12]:
sparkVersion [3.2.1]:
sparkUtilsVersion [0.4.0]:
To fill in the options in advance
g8 tupol/spark-apps.seed.g8 --name="My Project" --appname="My App" --organization="my.org" --force
1.0.0-RCX
Major library redesign
- Cross compile Scala 2.12 and 2.13
- Building with JDK 17 targeting Java 8
- Added test java options to handle the JDK 17
- Cross compile Scala 2.12 and 2.13
- Build with Spark 3.2.x and tested against Spark 3.x
- Removed the
spark-utils-io-pureconfig
module - Added configuration module based on PureConfig
DataSource
exposesreader
in addition toread
DataSink
andDataAwareSink
exposewriter
in addition towrite
- Added
SparkSessionOps.streamingSource
- Refactored
TypesafeConfigBuilder
, which has two implementations now:SimpleTypesafeConfigBuilder
andFuzzyTypesafeConfigBuilder
- Small improvements to
SharedSparkSession
- Documentation improvements
0.6.2
- Fixed
core
dependency toscala-utils
; now usingscala-utils-core
- Refactored the
core
/implicits
package to make the implicits a little more explicit
For previous versions please consult the release notes.
This code is open source software licensed under the MIT License.