• Stars: 138
  • Rank: 264,508 (Top 6 %)
  • Language: Scala
  • License: Apache License 2.0
  • Created: over 4 years ago
  • Updated: 12 months ago

Repository Details

A library that provides useful extensions to Apache Spark and PySpark.

Spark Extension

This project provides extensions to the Apache Spark project in Scala and Python:

Diff: A diff transformation and application for Datasets that computes the differences between two datasets, i.e. which rows to add, delete or change to get from one dataset to the other.
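
A minimal sketch in Scala, assuming a Spark shell where spark.implicits._ is already imported (the example data is made up; see the project's diff documentation for all options):

import uk.co.gresearch.spark.diff._

case class Value(id: Int, value: String)

val left = Seq(Value(1, "one"), Value(2, "two"), Value(3, "three")).toDS
val right = Seq(Value(1, "one"), Value(2, "Two"), Value(4, "four")).toDS

// compares both Datasets on the "id" key column and adds a diff column
// marking each row as unchanged, changed, inserted or deleted
left.diff(right, "id").show()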

SortedGroups: A groupByKey transformation that groups rows by a key while providing a sorted iterator for each group. Similar to Dataset.groupByKey.flatMapGroups, but with order guarantees for the iterator.
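
A hedged sketch in Scala, assuming a Spark shell; the groupByKeySorted and flatMapSortedGroups names and their named parameters reflect my reading of this library's API, so check the Scaladoc for the exact signatures:

import uk.co.gresearch.spark._

case class Event(id: Int, seq: Int, value: String)
val ds = Seq(Event(1, 2, "b"), Event(1, 1, "a"), Event(2, 1, "c")).toDS

// groups by id, sorts each group by seq, and consumes the sorted iterator
ds.groupByKeySorted(key = e => e.id)(order = e => e.seq)
  .flatMapSortedGroups((id, events) => events.map(e => (id, e.seq, e.value)))
  .show()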

Histogram: A histogram transformation that computes the histogram DataFrame for a value column.
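
A sketch in Scala, assuming a Spark shell; the argument order (bin thresholds, value column, then optional aggregation columns) is my assumption, so verify it against the Scaladoc:

import uk.co.gresearch.spark._

val df = Seq(("a", 1), ("a", 7), ("b", 3), ("b", 11)).toDF("user", "value")

// buckets the "value" column using the thresholds 0, 5 and 10
// and counts rows per bucket, one output row per "user"
df.histogram(Seq(0, 5, 10), $"value", $"user").show()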

Global Row Number: A withRowNumbers transformation that provides the global row number w.r.t. the current order of the Dataset, or any given order. In contrast to the existing SQL function row_number, which requires a window spec, this transformation provides the row number across the entire Dataset without scaling problems.
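
A sketch in Scala, assuming a Spark shell; withRowNumbers with no arguments uses the current order, and an explicit order can be passed instead:

import uk.co.gresearch.spark._

val df = Seq(("a", 1), ("b", 3), ("c", 2)).toDF("id", "value")

// row numbers with respect to the Dataset's current order
df.withRowNumbers().show()

// row numbers with respect to an explicit order
df.withRowNumbers($"value".desc).show()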

Partitioned Writing: The writePartitionedBy action writes your Dataset partitioned and efficiently laid out with a single operation.
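
A hedged sketch in Scala, assuming a Spark shell; only the partition columns are passed here, and the further optional arguments (file-level columns, ordering, number of files) are left to the Scaladoc:

import uk.co.gresearch.spark._

val df = Seq((1, "2023-01-01", "a"), (2, "2023-01-02", "b")).toDF("id", "dt", "value")

// writes the Dataset partitioned by "dt" and efficiently laid out, in a single
// operation; the result behaves like a DataFrameWriter, so any format can follow
df.writePartitionedBy(Seq($"dt")).parquet("data.parquet")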

Inspect Parquet files: The structure of Parquet files (the metadata, not the data stored in Parquet) can be inspected similar to parquet-tools or parquet-cli by reading from a simple Spark data source. This simplifies identifying why some Parquet files cannot be split by Spark into scalable partitions.
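
A hedged sketch in Scala; the parquetMetadata and parquetBlocks reader methods are how I understand this data source is exposed, so treat the names and the returned columns as assumptions:

import uk.co.gresearch.spark.parquet._

// one row per Parquet file, with file-level metadata such as row counts and schema
spark.read.parquetMetadata("/path/to/parquet").show()

// one row per row group (block), which helps explain why a file cannot be split
spark.read.parquetBlocks("/path/to/parquet").show()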

Fluent method call: T.call(transformation: T => R): R: Turns a transformation T => R that is not part of T into a fluent method call on T. This allows writing fluent code like:

import uk.co.gresearch._

i.doThis()
 .doThat()
 .call(transformation)
 .doMore()

Fluent conditional method call: T.when(condition: Boolean).call(transformation: T => T): T: Perform a transformation fluently only if the given condition is true. This allows writing fluent code like:

import uk.co.gresearch._

i.doThis()
 .doThat()
 .when(condition).call(transformation)
 .doMore()

Backticks: backticks(string: String, strings: String*): String: Encloses the given column name with backticks (`) when needed. This is a handy way to ensure column names with special characters like dots (.) work with col() or select().
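
A sketch in Scala, assuming a Spark shell; the column name "a.column" is a hypothetical example of a name containing a dot:

import org.apache.spark.sql.functions.col
import uk.co.gresearch.spark._

val df = Seq((1, 2)).toDF("id", "a.column")

// backticks("a.column") returns "`a.column`", so col() resolves the literal
// column name instead of treating the dot as a struct field access
df.select(col(backticks("a.column"))).show()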

.Net DateTime.Ticks: Convert .Net (C#, F#, Visual Basic) DateTime.Ticks into Spark timestamps, seconds and nanoseconds.

Available methods:
// Scala
dotNetTicksToTimestamp(Column): Column       // returns timestamp as TimestampType
dotNetTicksToUnixEpoch(Column): Column       // returns Unix epoch seconds as DecimalType
dotNetTicksToUnixEpochNanos(Column): Column  // returns Unix epoch nanoseconds as LongType

The reverse is provided by (all return LongType .Net ticks):

// Scala
timestampToDotNetTicks(Column): Column
unixEpochToDotNetTicks(Column): Column
unixEpochNanosToDotNetTicks(Column): Column

These methods are also available in Python:

# Python
dotnet_ticks_to_timestamp(column_or_name)         # returns timestamp as TimestampType
dotnet_ticks_to_unix_epoch(column_or_name)        # returns Unix epoch seconds as DecimalType
dotnet_ticks_to_unix_epoch_nanos(column_or_name)  # returns Unix epoch nanoseconds as LongType

timestamp_to_dotnet_ticks(column_or_name)
unix_epoch_to_dotnet_ticks(column_or_name)
unix_epoch_nanos_to_dotnet_ticks(column_or_name)
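
A usage sketch in Scala, assuming a Spark shell and a LongType column named "ticks" holding .Net DateTime.Ticks values (the sample tick value is made up):

// Scala
import uk.co.gresearch.spark._

val df = Seq(638155413748959308L).toDF("ticks")

// convert ticks to a Spark timestamp and back; Spark timestamps have microsecond
// precision, so the round trip truncates sub-microsecond ticks
df.select(
  dotNetTicksToTimestamp($"ticks").as("timestamp"),
  timestampToDotNetTicks(dotNetTicksToTimestamp($"ticks")).as("round_trip_ticks")
).show(false)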

Spark job description: Set Spark job description for all Spark jobs within a context:

import uk.co.gresearch.spark._

implicit val session: SparkSession = spark

withJobDescription("parquet file") {
  val df = spark.read.parquet("data.parquet")
  val count = appendJobDescription("count") {
    df.count
  }
  appendJobDescription("write") {
    df.write.csv("data.csv")
  }
}
(Screenshots: the Spark UI job list without job descriptions vs. with job descriptions.)

Using Spark Extension

The spark-extension package is available for all Spark 3.2, 3.3 and 3.4 versions. Some earlier Spark versions may also be supported. The package version has the following semantics: spark-extension_{SCALA_COMPAT_VERSION}-{VERSION}-{SPARK_COMPAT_VERSION}:

  • SCALA_COMPAT_VERSION: Scala binary compatibility (minor) version. Available are 2.12 and 2.13.
  • SPARK_COMPAT_VERSION: Apache Spark binary compatibility (minor) version. Available are 3.2, 3.3 and 3.4.
  • VERSION: The package version, e.g. 2.1.0.

SBT

Add this line to your build.sbt file:

libraryDependencies += "uk.co.gresearch.spark" %% "spark-extension" % "2.8.0-3.4"

Maven

Add this dependency to your pom.xml file:

<dependency>
  <groupId>uk.co.gresearch.spark</groupId>
  <artifactId>spark-extension_2.12</artifactId>
  <version>2.8.0-3.4</version>
</dependency>

Gradle

Add this dependency to your build.gradle file:

dependencies {
    implementation "uk.co.gresearch.spark:spark-extension_2.12:2.8.0-3.4"
}

Spark Submit

Submit your Spark app with the Spark Extension dependency (version ≥ 1.1.0) as follows:

spark-submit --packages uk.co.gresearch.spark:spark-extension_2.12:2.8.0-3.4 [jar]

Note: Pick the right Scala version (here 2.12) and Spark version (here 3.4) depending on your Spark version.

Spark Shell

Launch a Spark Shell with the Spark Extension dependency (version ≥ 1.1.0) as follows:

spark-shell --packages uk.co.gresearch.spark:spark-extension_2.12:2.8.0-3.4

Note: Pick the right Scala version (here 2.12) and Spark version (here 3.4) depending on your Spark Shell version.

Python

PySpark API

Start a PySpark session with the Spark Extension dependency (version ≥ 1.1.0) as follows:

from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .config("spark.jars.packages", "uk.co.gresearch.spark:spark-extension_2.12:2.8.0-3.4") \
    .getOrCreate()

Note: Pick the right Scala version (here 2.12) and Spark version (here 3.4) depending on your PySpark version.

PySpark REPL

Launch the Python Spark REPL with the Spark Extension dependency (version ≥ 1.1.0) as follows:

pyspark --packages uk.co.gresearch.spark:spark-extension_2.12:2.8.0-3.4

Note: Pick the right Scala version (here 2.12) and Spark version (here 3.4) depending on your PySpark version.

PySpark spark-submit

Run your Python scripts that use PySpark via spark-submit:

spark-submit --packages uk.co.gresearch.spark:spark-extension_2.12:2.8.0-3.4 [script.py]

Note: Pick the right Scala version (here 2.12) and Spark version (here 3.4) depending on your Spark version.

PyPi package (local Spark cluster only)

You may want to install the pyspark-extension Python package from PyPi into your development environment. This provides code completion, typing and test capabilities during development.

Running your Python application on a Spark cluster will still require one of the above ways to add the Scala package to the Spark environment.

pip install pyspark-extension==2.8.0.3.4

Note: Pick the right Spark version (here 3.4) depending on your PySpark version.

Your favorite Data Science notebook

There are plenty of Data Science notebooks around. To use this library, add a jar dependency to your notebook using these Maven coordinates:

uk.co.gresearch.spark:spark-extension_2.12:2.8.0-3.4

Or download the jar and place it on a filesystem where it is accessible by the notebook, and reference that jar file directly.

Check the documentation of your favorite notebook to learn how to add jars to your Spark environment.

Build

You can build this project against different versions of Spark and Scala.

Switch Spark and Scala version

If you want to build for a Spark or Scala version different from the one defined in the pom.xml file, run

sh set-version.sh [SPARK-VERSION] [SCALA-VERSION]

For example, switch to Spark 3.4.0 and Scala 2.13.8 by running sh set-version.sh 3.4.0 2.13.8.

Build the Scala project

Then execute mvn package to create a jar from the sources. It can be found in target/.

Testing

Run the Scala tests via mvn test.

Setup Python environment

In order to run the Python tests, set up a Python environment as follows (replace [SCALA-COMPAT-VERSION] and [SPARK-COMPAT-VERSION] with the respective values):

virtualenv -p python3 venv
source venv/bin/activate
pip install -r python/requirements-[SPARK-COMPAT-VERSION]_[SCALA-COMPAT-VERSION].txt
pip install pytest

Run Python tests

Run the Python tests via env PYTHONPATH=python:python/test python -m pytest python/test.

Note: you first have to build the Scala sources.

Build Python package

Run the following sequence of commands in the project root directory:

mkdir -p python/pyspark/jars/
cp -v target/spark-extension_*-*.jar python/pyspark/jars/
pip install build

Then execute python -m build python/ to create a whl from the sources. It can be found in python/dist/.

More Repositories

1. consuldotnet: Consul.NET is a .NET client library for the Consul HTTP API (C#, 316 stars)
2. armada: A multi-cluster batch queuing system for high-throughput workloads on Kubernetes (Go, 201 stars)
3. siembol: An open-source, real-time Security Information & Event Management tool based on big data technologies, providing a scalable, advanced security analytics framework (Java, 188 stars)
4. ParquetSharp: ParquetSharp is a .NET library for reading and writing Apache Parquet files (C#, 140 stars)
5. ahocorasick_rs: Check for multiple patterns in a single string at the same time: a fast Aho-Corasick algorithm for Python (Python, 127 stars)
6. fasttrackml: Experiment tracking server focused on speed and scalability (Go, 97 stars)
7. grpc_async_examples (C++, 49 stars)
8. TypeEquality: Type equality for F# (F#, 43 stars)
9. spark-dgraph-connector: A connector for Apache Spark and PySpark to Dgraph databases (Scala, 40 stars)
10. geras: Geras provides a Thanos Store API for the OpenTSDB HTTP API. This makes it possible to query OpenTSDB via PromQL, through Thanos (Go, 38 stars)
11. prommsd (Go, 30 stars)
12. thanos-remote-read: Adapter to query Thanos StoreAPI with Prometheus remote read support (Go, 30 stars)
13. fsharp-formatting-conventions: G-Research F# code formatting guidelines (18 stars)
14. ParquetSharp.DataFrame: ParquetSharp.DataFrame is a .NET library for reading and writing Apache Parquet files into/from .NET DataFrames, using ParquetSharp (C#, 18 stars)
15. Peregrine (F#, 14 stars)
16. Tack: A DotNet tool that can be used to get filter projects and associated output assemblies from solutions (C#, 12 stars)
17. ProjectLinter: An MSBuild project file linter to validate project file as part of build process (C#, 12 stars)
18. DotNetDockerTest (C#, 12 stars)
19. SolutionValidator: A tool for validating solution files and viewing project dependencies (C#, 12 stars)
20. Bulldog: An opinionated base library for building dotnet tools (C#, 12 stars)
21. VsTestRunner: A DotNet tool which can be used to run dotnet vstest across a set of assemblies (C#, 12 stars)
22. fast-string-search (Python, 12 stars)
23. NuGetPackageChecker: An MSBuild extension to check for required packages and versions (C#, 12 stars)
24. ApiSurface (F#, 11 stars)
25. HiddenWindow (C#, 10 stars)
26. dgraph-dbpedia: Pre-processing DBpedia datasets to load into Dgraph (Scala, 10 stars)
27. fsharp-analyzers: Analyzers for F# (F#, 8 stars)
28. yunikorn-history-server: A service to store and provide historical data for K8S clusters using the Yunikorn scheduler (Go, 8 stars)
29. charts: Repository for all of G Research-hosted helm charts (Mustache, 7 stars)
30. opentsdb-tsuid-ratelimiter (Java, 7 stars)
31. DotNetPerfMonitor: Monitoring performance of the .NET ecosystem (NuGet, MsBuild, C#, F#) (PowerShell, 6 stars)
32. dgraph-lanl-csr: Project to load the "Comprehensive, Multi-Source Cyber-Security Events" dataset into a Dgraph cluster (Scala, 6 stars)
33. NuPerfMonitor: Monitoring performance of NuGet package manager (PowerShell, 5 stars)
34. fasttrackml-ui-aim: Modern Aim UI built for FastTrackML (Go, 5 stars)
35. prometheus-config-loader (Go, 4 stars)
36. PalletJack: Parquet extension (Python, 4 stars)
37. brand: G-Research branding assets (4 stars)
38. System.Net.Http.JsonExtensions (C#, 2 stars)
39. armada-jupyter (Python, 2 stars)
40. go-ntlm-auth (Go, 2 stars)
41. siembol-config: A Siembol configuration repository for a Siembol quickstart demo (2 stars)
42. tfe-plan-bot: Terraform Enterprise/Cloud Plan Bot (Go, 1 star)
43. fasttrackml-ui-mlflow: Classic MLFlow UI built for FastTrackML (Go, 1 star)
44. astral (Ruby, 1 star)
45. bearcat (Python, 1 star)