  • Stars: 28
  • Rank: 855,413 (Top 18 %)
  • Language: Scala
  • License: Apache License 2.0
  • Created over 6 years ago
  • Updated about 4 years ago

Repository Details

Spark package to "plug" holes in data using SQL based rules ⚡️ 🔌

Sparkplug

Spark package to "plug" holes in data using SQL based rules.

Motivation

At Indix, we work with a lot of data. Our data pipelines run a wide variety of ML models against our data. There are cases where we have to "plug" or override certain values or predictions in our data. This may be due to bugs or deficiencies in our current models, or just the inherent quality of the source/raw data.

SparkPlug is a rule-based override system that helps us fix our data. The rules also act as a central record of the "debt" that we need to pay off by improving our algorithms and models.

Design

We came up with a system that enables engineering, customer success and product management to make the necessary fixes in the data. Using rules based on SQL conditions (WHERE clause predicate), we provided a way to override and fix values / predictions in our data. Each rule has the condition along with the fields that are to be overridden with their respective override values.

SparkPlug is the core of this system. We also have an internal GUI-based tool that utilizes our internal data pipeline platform to sample data, apply the rules and provide a detailed view into how each rule affects the data.

SparkPlug leverages Spark SQL to do much of its work. It is designed to run within our internal data pipeline platform as well as in standalone Spark jobs.

Getting Started

Let's first look at what the rules that SparkPlug works with look like.

Rules

An example rule is given below in JSON:

{
  "name": "rule1",
  "version": "version1",
  "condition": "title like '%iPhone%'",
  "actions": [
    {
      "key": "title",
      "value": "Apple iPhone"
    }
  ]
}

Each rule identifies itself with a name. The current version of the rule can be identified using version. The SQL predicate in condition is used to identify the applicable rows in the data. On the selected rows, the actions (each specified via the column name in key and its override value in value) are applied to plug the data. The value is currently always specified as a string, and is internally validated and converted to the appropriate type.
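One way to read a rule is as a conditional column update. The sketch below is not SparkPlug's implementation; it only illustrates the semantics described above using standard Spark SQL functions. The helper name applyOverride and the sample data are hypothetical, and the string value is cast to the target column's type as an assumption about how type conversion could work.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, expr, lit, when}

object RuleSemanticsSketch {
  // Hypothetical helper, not part of SparkPlug: applies one condition/key/value
  // triple to a top-level column. Rows that do not match keep their old value.
  def applyOverride(df: DataFrame, condition: String, key: String, value: String): DataFrame = {
    // Assumption: the string value is cast to the existing column's type.
    val targetType = df.schema(key).dataType
    df.withColumn(
      key,
      when(expr(condition), lit(value).cast(targetType)).otherwise(col(key)))
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.master("local[*]").appName("rule-sketch").getOrCreate()
    import spark.implicits._

    val df = Seq(("iPhone 6s 64GB", "Apple"), ("Galaxy S7", "Samsung")).toDF("title", "brand")

    // Same condition and action as the example rule above.
    applyOverride(df, "title like '%iPhone%'", "title", "Apple iPhone").show(false)

    spark.stop()
  }
}
```

Only the row whose title matches the predicate is rewritten; the other row passes through unchanged.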

Rules can be fed into Sparkplug as a normal jsonlines dataset that Spark can work with.

Sparkplug comes with a helper to deserialize JSON rules into a collection of PlugRule objects, which is shown below:

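import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

// Example of creating a Spark session
implicit val spark: SparkSession = SparkSession.builder
  .config(new SparkConf())
  .enableHiveSupport()
  .master("local[*]")
  .getOrCreate()

// readPlugRulesFrom is an extension method, so SparkPlug's implicits must be in scope;
// path points to the JSON lines file that holds the rules.
val rules = spark.readPlugRulesFrom(path)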

The rules can now be fed into SparkPlug.

Creating SparkPlug instance

SparkPlug comes with a builder that helps you instantiate a SparkPlug object with the right settings. The simplest way to create one is as follows:

val sparkPlug = SparkPlug.builder.create()

Once we have the instance, we can "plug" a DataFrame with the rules:

sparkPlug.plug(df, rules)

Rules validation

The SparkPlug.validate method can be used to validate the input rules against a given schema.

sparkPlug.validate(df.schema, rules) // Returns a list of validation errors if any.
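To make the idea of schema validation concrete, here is a minimal sketch of one check such a validator might perform: that every action key names an existing column, or a field one level inside a struct. This is an illustration only. The Action class, the resolve and validate helpers and the error message format are all hypothetical, and SparkPlug's real checks may differ.

```scala
import org.apache.spark.sql.types._

object RuleValidationSketch {
  // Hypothetical stand-in for a rule action; not SparkPlug's own class.
  final case class Action(key: String, value: String)

  // Resolves "column" or "struct.field" (one level only) against the schema.
  def resolve(schema: StructType, key: String): Option[DataType] =
    key.split('.').toList match {
      case name :: Nil =>
        schema.fields.find(_.name == name).map(_.dataType)
      case parent :: child :: Nil =>
        schema.fields.find(_.name == parent).map(_.dataType)
          .collect { case nested: StructType => nested }
          .flatMap(_.fields.find(_.name == child))
          .map(_.dataType)
      case _ => None // deeper nesting is treated as unresolvable in this sketch
    }

  // Returns one error message per action whose key cannot be resolved.
  def validate(schema: StructType, ruleName: String, actions: Seq[Action]): Seq[String] =
    actions.collect {
      case Action(key, _) if resolve(schema, key).isEmpty =>
        s"[$ruleName] field '$key' not found in schema"
    }

  def main(args: Array[String]): Unit = {
    val schema = StructType(Seq(
      StructField("title", StringType),
      StructField("price", StructType(Seq(
        StructField("min", DoubleType),
        StructField("max", DoubleType))))))

    val actions = Seq(Action("title", "Apple iPhone"), Action("price.min", "100.0"), Action("cost", "1"))
    validate(schema, "rule1", actions).foreach(println)
    // [rule1] field 'cost' not found in schema
  }
}
```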

The SparkPlug object can also be created with validation enabled so that rules are validated before plugging:

val sparkPlug = SparkPlug.builder.enableRulesValidation.create()

Plug details

To track what changes are being made (or not) to each record, it is possible to add PlugDetails to every record with information on which rules were applied to it. This is disabled by default and can be enabled as follows:

val sparkPlug = SparkPlug.builder.enablePlugDetails.create()

This adds a plugDetails column of type Seq[PlugDetail] to the DataFrame. PlugDetail is a simple case class as defined below:

case class PlugDetail(name: String, version: String, fieldNames: Seq[String])

Custom plug details column

By default, plug details are added to the column plugDetails. This can be changed to a different column, say overrideDetails, as follows:

val sparkPlug = SparkPlug.builder.enablePlugDetails("overrideDetails").create()

Custom plug details schema / class

By default, plug details are of type Seq[PlugDetail]. It is possible to provide a custom type by supplying a UDF to SparkPlug that defines how the plug details information is to be populated into the custom type.

The following example shows how one can go about adding plug details of type Seq[OverrideDetail]:

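import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema

case class OverrideDetail(ruleId: Option[String],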
                          fieldNames: Seq[String],
                          ruleVersion: Option[String])

case class TestRowWithOverrideDetails(title: String,
                                      brand: String,
                                      price: Int,
                                      overrideDetails: Seq[OverrideDetail])

class CustomAddPlugDetailUDF extends AddPlugDetailUDF[OverrideDetail] {
  override def addPlugDetails(plugDetails: Seq[Row],
                              ruleName: String,
                              ruleVersion: String,
                              fields: Seq[String]) = {
    plugDetails :+ new GenericRowWithSchema(
      Array(ruleName, fields, ruleVersion),
      plugDetailSchema)
  }
}

As seen in the example above, the custom UDF inherits from AddPlugDetailUDF[T] and implements the addPlugDetails method as needed.

Working with structs

It is possible to override values within a StructType.

{
  "name": "rule1",
  "version": "version1",
  "condition": "title like '%iPhone%'",
  "actions": [
    {
      "key": "price.min",
      "value": "100.0"
    },
    {
      "key": "price.max",
      "value": "1000.0"
    }
  ]
}

Currently SparkPlug supports only one level within structs.
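In plain Spark SQL terms, overriding fields inside a struct amounts to rebuilding the struct column on the matching rows. The sketch below shows that idea for the price.min and price.max rule above. It is not SparkPlug's implementation: the overridePrice helper and the Price and Product classes are hypothetical, and the override values are passed as doubles on the assumption that the string values have already been converted.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, expr, lit, struct, when}

object StructOverrideSketch {
  // Hypothetical sample schema with a one-level struct column.
  final case class Price(min: Double, max: Double)
  final case class Product(title: String, price: Price)

  // Hypothetical helper: replaces price.min and price.max on rows matching the condition.
  def overridePrice(df: DataFrame, condition: String, min: Double, max: Double): DataFrame = {
    val replacement = struct(lit(min).as("min"), lit(max).as("max"))
    df.withColumn("price", when(expr(condition), replacement).otherwise(col("price")))
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.master("local[*]").appName("struct-sketch").getOrCreate()
    import spark.implicits._

    val df = Seq(
      Product("iPhone 6s", Price(1.0, 2.0)),
      Product("Galaxy S7", Price(3.0, 4.0))).toDF()

    overridePrice(df, "title like '%iPhone%'", 100.0, 1000.0).show(false)

    spark.stop()
  }
}
```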

SQL in values

Values can be literals such as "iPhone", "100" or "999.9". SparkPlug also allows SQL within values so that overrides can use the power of SQL and, most importantly, depend on the values of other fields. Values enclosed within ` (backticks) are treated as SQL:

{
  "name": "rule1",
  "version": "version1",
  "condition": "true",
  "actions": [
    {
      "key": "title",
      "value": "`concat(brand, ' ', title)`"
    }
  ]
}

The above rule prepends the brand to the title, separated by a space.
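The backtick convention can be pictured as a small classification step that runs before a value is applied. The sketch below is a hypothetical illustration, not SparkPlug's parser: the OverrideValue types and the classify function are invented names, and it assumes a value counts as SQL only when it both starts and ends with a backtick.

```scala
object ValueKindSketch {
  // Hypothetical model of the two kinds of override values.
  sealed trait OverrideValue
  final case class Literal(text: String) extends OverrideValue
  final case class SqlExpression(sql: String) extends OverrideValue

  // Assumption: SQL values are wrapped in exactly one leading and one trailing backtick.
  def classify(raw: String): OverrideValue =
    if (raw.length >= 2 && raw.startsWith("`") && raw.endsWith("`"))
      SqlExpression(raw.substring(1, raw.length - 1))
    else
      Literal(raw)

  def main(args: Array[String]): Unit = {
    println(classify("`concat(brand, ' ', title)`")) // SqlExpression(concat(brand, ' ', title))
    println(classify("Apple iPhone"))                 // Literal(Apple iPhone)
  }
}
```

In a Spark job, an SQL value could then be evaluated with a function such as expr, while a literal would be wrapped with lit and cast to the column type.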

Keeping track of old value

If the old value of an overridden field needs to be tracked, SparkPlug can be built with the keepOldField option set:

val sparkPlug = SparkPlug.builder.keepOldField.create()

This will add, for each action, a new column named ${actionKey}_${ruleName}_old.

Note: This feature is ideal only when SparkPlug is used with a single rule. This will add one column per action per rule and is not recommended for the production job. We use this feature internally when adding a rule so that we can see how each rule affects the dataset.
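To see why the number of extra columns grows quickly, the following sketch enumerates the column names that the ${actionKey}_${ruleName}_old pattern would produce for a small rule set. The Action and Rule classes and the second rule are hypothetical stand-ins for the rule JSON shown earlier, not SparkPlug's own types.

```scala
object OldFieldNameSketch {
  // Hypothetical stand-ins for the rule JSON; not SparkPlug's own classes.
  final case class Action(key: String, value: String)
  final case class Rule(name: String, version: String, condition: String, actions: Seq[Action])

  // One extra column per action per rule, following the naming pattern above.
  def oldFieldNames(rules: Seq[Rule]): Seq[String] =
    for {
      rule   <- rules
      action <- rule.actions
    } yield s"${action.key}_${rule.name}_old"

  def main(args: Array[String]): Unit = {
    val rules = Seq(
      Rule("rule1", "version1", "title like '%iPhone%'",
        Seq(Action("title", "Apple iPhone"), Action("brand", "Apple"))),
      Rule("rule2", "version1", "brand = 'Samsung'",
        Seq(Action("title", "Samsung Galaxy"))))

    oldFieldNames(rules).foreach(println)
    // title_rule1_old
    // brand_rule1_old
    // title_rule2_old
  }
}
```

Two rules with three actions between them already add three columns, which is why the option suits inspecting a single rule rather than a full production rule set.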
