  • Stars: 161
  • Rank 232,119 (Top 5 %)
  • Language: Python
  • License: Apache License 2.0
  • Created about 1 year ago
  • Updated about 2 months ago

Repository Details

A Python Library to support running data quality rules while the spark job is running⚡

Spark-Expectations

Spark Expectations is a specialized tool designed to maintain data integrity within your processing pipeline. By identifying malformed or incorrect data and preventing it from reaching the target destination, it ensures that only quality data is passed through. Erroneous records are not simply ignored but are filtered into a separate error table, allowing for detailed analysis and reporting. Spark Expectations also provides statistics on the filtered content, giving you insight into your data quality.


The documentation for spark-expectations can be found here

Contributors

Thanks to all the contributors who have helped ideate, develop, and bring the project to its current state.

Contributing

We're delighted that you're interested in contributing to our project! To get started, please carefully read and follow the guidelines provided in our contributing document.

What is Spark Expectations?

Spark Expectations is a data quality framework built in PySpark as a solution for the following problem statements:

  1. Existing data quality tools validate the data in a table at rest and provide success and error metrics. Users need to check the metrics manually to identify the error records.
  2. The error data is not quarantined to an error table, and no corrective action is taken to send only the valid data downstream.
  3. Users further downstream must either consume the same incorrect data or perform additional calculations to eliminate records that don't comply with the data quality rules.
  4. Another process is required as a corrective action to rectify the errors in the data, and a lot of planning is usually required for this activity.

Spark Expectations solves these issues using the following principles:

  1. All records that fail one or more data quality rules are, by default, quarantined in an _error table along with metadata on the rules that failed, job information, etc. This makes it easier for analysts or product teams to view the incorrect data and collaborate with the teams responsible for correcting and reprocessing it.
  2. Aggregated metrics are provided for the raw data and the cleansed data for each run, along with the required metadata, so that they do not need to be recalculated.
  3. Data that doesn't meet the data quality contract or standards is not moved to the next level or iteration unless otherwise specified.
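
The principles above can be illustrated with a minimal plain-Python sketch. This is not the library's implementation: the rule names, the `split_records` helper, and the `failed_rules` field are hypothetical and only show the idea of quarantining failed records with rule metadata while computing per-run counts.

```python
from typing import Any, Callable, Dict, List, Tuple

Record = Dict[str, Any]
Rule = Callable[[Record], bool]

# Hypothetical rules for illustration only; in Spark Expectations the
# rules are read from a rules table, not defined as Python lambdas.
RULES: Dict[str, Rule] = {
    "order_id_not_null": lambda r: r.get("order_id") is not None,
    "quantity_positive": lambda r: isinstance(r.get("quantity"), int)
    and r["quantity"] > 0,
}


def split_records(
    records: List[Record], rules: Dict[str, Rule]
) -> Tuple[List[Record], List[Record], Dict[str, int]]:
    """Split records into clean and quarantined sets and count both."""
    clean: List[Record] = []
    errors: List[Record] = []
    for record in records:
        # Collect the names of every rule this record fails.
        failed = [name for name, rule in rules.items() if not rule(record)]
        if failed:
            # Keep the bad record, annotated with the rules it failed.
            errors.append({**record, "failed_rules": failed})
        else:
            clean.append(record)
    stats = {
        "input_count": len(records),
        "output_count": len(clean),
        "error_count": len(errors),
    }
    return clean, errors, stats


orders = [
    {"order_id": 1, "quantity": 3},
    {"order_id": None, "quantity": 2},
    {"order_id": 3, "quantity": 0},
]
clean, errors, stats = split_records(orders, RULES)
```

Here only `clean` would be passed downstream, while `errors` plays the role of the error table and `stats` the role of the aggregated metrics.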

Features Of Spark Expectations

Please find the spark-expectations flow and feature diagrams below

Spark - Expectations Setup

Configurations

To set the global configuration for Spark Expectations, define a dictionary variable and fill in the required fields, as in the example below.

from spark_expectations.config.user_config import Constants as user_config

se_user_conf = {
    user_config.se_notifications_enable_email: False,
    user_config.se_notifications_email_smtp_host: "mailhost.nike.com",
    user_config.se_notifications_email_smtp_port: 25,
    user_config.se_notifications_email_from: "<sender_email_id>",
    user_config.se_notifications_email_to_other_nike_mail_id: "<receiver_email_id's>",
    user_config.se_notifications_email_subject: "spark expectations - data quality - notifications", 
    user_config.se_notifications_enable_slack: True,
    user_config.se_notifications_slack_webhook_url: "<slack-webhook-url>", 
    user_config.se_notifications_on_start: True, 
    user_config.se_notifications_on_completion: True,
    user_config.se_notifications_on_fail: True,
    user_config.se_notifications_on_error_drop_exceeds_threshold_breach: True, 
    user_config.se_notifications_on_error_drop_threshold: 15,
    # Optional: enable the two parameters below to capture detailed stats
    # in the <stats_table_name>_detailed table.
    # user_config.enable_query_dq_detailed_result: True,
    # user_config.enable_agg_dq_detailed_result: True,
}
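
To make the error-drop threshold setting more concrete, here is a small sketch of how such a check could be evaluated. It rests on assumptions not confirmed by the text above: that the threshold value (15 in the configuration) is a percentage of input records, and that a notification fires only when the percentage is strictly greater than the threshold. The function names are hypothetical and are not part of the library.

```python
def error_drop_percentage(input_count: int, error_count: int) -> float:
    """Return the share of input records that were dropped, in percent."""
    if input_count == 0:
        # Avoid division by zero: an empty input has nothing to drop.
        return 0.0
    return 100.0 * error_count / input_count


def should_notify(input_count: int, error_count: int, threshold_percent: float) -> bool:
    """Assumed semantics: notify only when the drop strictly exceeds the threshold."""
    return error_drop_percentage(input_count, error_count) > threshold_percent


# 40 of 200 records dropped is 20 %, which exceeds a 15 % threshold.
notify = should_notify(input_count=200, error_count=40, threshold_percent=15)
```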

Spark Expectations Initialization

The import and the SparkExpectations class instantiation below are mandatory for all the examples that follow.

  1. Instantiate the SparkExpectations class, which has all the required functions for running data quality rules
from spark_expectations.core.expectations import SparkExpectations, WrappedDataFrameWriter
from pyspark.sql import SparkSession

spark: SparkSession = SparkSession.builder.getOrCreate()
writer = WrappedDataFrameWriter().mode("append").format("delta")
# writer = WrappedDataFrameWriter().mode("append").format("iceberg")
# product_id should match with the "product_id" in the rules table
se: SparkExpectations = SparkExpectations(
    product_id="your_product",
    rules_df=spark.table("dq_spark_local.dq_rules"),
    stats_table="dq_spark_local.dq_stats",
    stats_table_writer=writer,
    target_and_error_table_writer=writer,
    debugger=False,
    # stats_streaming_options={user_config.se_enable_streaming: False},
)
  2. Decorate the function with the @se.with_expectations decorator
from spark_expectations.config.user_config import *
from pyspark.sql import DataFrame
import os


@se.with_expectations(
    target_table="dq_spark_local.customer_order",
    write_to_table=True,
    user_conf=se_user_conf,
    target_table_view="order",
)
def build_new() -> DataFrame:
    # Return the dataframe on which Spark-Expectations needs to be run
    _df_order: DataFrame = (
        spark.read.option("header", "true")
        .option("inferSchema", "true")
        .csv(os.path.join(os.path.dirname(__file__), "resources/order.csv"))
    )
    _df_order.createOrReplaceTempView("order")

    return _df_order 
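
For readers unfamiliar with the decorator pattern used above, the following plain-Python sketch shows the general mechanism: a decorator factory takes configuration, wraps a function, and post-processes whatever that function returns. It is a toy analogy under stated assumptions, not how `with_expectations` is implemented; `with_checks`, `error_sink`, and `build_orders` are hypothetical names, and lists of dictionaries stand in for DataFrames.

```python
import functools
from typing import Any, Callable, Dict, List

Record = Dict[str, Any]


def with_checks(rule: Callable[[Record], bool], error_sink: List[Record]):
    """Hypothetical decorator factory that filters the wrapped function's output."""

    def decorator(func: Callable[..., List[Record]]) -> Callable[..., List[Record]]:
        @functools.wraps(func)  # preserve the wrapped function's name and docstring
        def wrapper(*args: Any, **kwargs: Any) -> List[Record]:
            records = func(*args, **kwargs)
            passed: List[Record] = []
            for record in records:
                # Route each record to the result or to the quarantine list.
                if rule(record):
                    passed.append(record)
                else:
                    error_sink.append(record)
            return passed

        return wrapper

    return decorator


quarantined: List[Record] = []


@with_checks(rule=lambda r: r["amount"] >= 0, error_sink=quarantined)
def build_orders() -> List[Record]:
    # Return the data on which the checks need to be run.
    return [{"id": 1, "amount": 10}, {"id": 2, "amount": -5}]


result = build_orders()
```

The caller only ever sees the filtered result, which mirrors how the decorated `build_new` function returns a DataFrame and leaves validation to the decorator.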
