Drunken Data Quality (DDQ)

Spark package for checking data quality

Description

DDQ is a small library for checking constraints on Spark data structures. It can be used to ensure a certain level of data quality, especially when continuous imports happen.

Getting DDQ

Spark Package

DDQ is available as a Spark package. You can add it to your spark-shell, spark-submit, or pyspark call using the --packages command line option:

spark-shell --packages FRosner:drunken-data-quality:4.1.1-s_2.11
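
For spark-submit the option works the same way; for example (my-app.jar being a placeholder for your application jar):

spark-submit --packages FRosner:drunken-data-quality:4.1.1-s_2.11 my-app.jar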

Python API

DDQ also comes with a Python API. It is available on the Python Package Index, so you can install it with pip:

pip install pyddq==4.1.1

Project Dependency

In order to use DDQ in your project, you can add it as a library dependency. This can be done through the SBT spark package plugin, or you can add it using JitPack.io.
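
For example, with the sbt-spark-package plugin the dependency can be declared via spDependencies, while via JitPack it becomes a regular library dependency. This is only a sketch; the JitPack coordinates in particular are an assumption, so check jitpack.io for the exact form:

// build.sbt, using the sbt-spark-package plugin
spDependencies += "FRosner/drunken-data-quality:4.1.1-s_2.11"

// or via JitPack (coordinates are an assumption; verify on jitpack.io)
resolvers += "jitpack" at "https://jitpack.io"
libraryDependencies += "com.github.FRosner" % "drunken-data-quality" % "4.1.1"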

Keep in mind that you might need to add additional resolvers, as DDQ has some external dependencies starting from version 4.1.0:

resolvers += "lightshed-maven" at "http://dl.bintray.com/content/lightshed/maven"

If none of the above works for you, feel free to download one of the compiled artifacts from the release section. Alternatively, you can of course build from source.
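
Building from source is a minimal sbt workflow (a sketch, assuming a standard sbt setup; use sbt assembly instead if you need a fat jar and the build defines that task):

git clone https://github.com/FRosner/drunken-data-quality.git
cd drunken-data-quality
sbt package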

Using DDQ

Getting Started

Create two example tables to play with (or use your existing ones).

case class Customer(id: Int, name: String)
case class Contract(id: Int, customerId: Int, duration: Int)

val customers = spark.createDataFrame(List(
  Customer(0, "Frank"),
  Customer(1, "Alex"),
  Customer(2, "Slavo")
))

val contracts = spark.createDataFrame(List(
  Contract(0, 0, 5),
  Contract(1, 0, 10),
  Contract(0, 1, 6)
))

Run some checks and see the results on the console. Note that the contracts data intentionally contains a duplicate id (0 appears twice), so "id" alone is not a unique key for contracts, while the composite key ("id", "customerId") is.

import de.frosner.ddq.core._

Check(customers)
  .hasNumRows(_ >= 3)
  .hasUniqueKey("id")
  .run()

Check(contracts)
  .hasNumRows(_ > 0)
  .hasUniqueKey("id", "customerId")
  .satisfies("duration > 0")
  .hasForeignKey(customers, "customerId" -> "id")
  .run()

Custom Reporters

By default, the check result is printed to stdout, using ANSI escape codes to highlight the output. To get a report in another format, you can specify one or more custom reporters.

import de.frosner.ddq.reporters.MarkdownReporter

Check(customers)
  .hasNumRows(_ >= 3)
  .hasUniqueKey("id")
  .run(MarkdownReporter(System.err))
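
MarkdownReporter writes to any PrintStream, so the same report can just as well go to a file (customers-report.md is an arbitrary file name):

import java.io.{File, PrintStream}

val reportStream = new PrintStream(new File("customers-report.md"))
Check(customers)
  .hasUniqueKey("id")
  .run(MarkdownReporter(reportStream))
reportStream.close()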

Running multiple checks

You can use a runner to generate reports for multiple checks at once. It will execute all the checks and report the results to the specified reporters.

import de.frosner.ddq.reporters.ConsoleReporter
import java.io.{PrintStream, File}

val check1 = Check(customers)
  .hasNumRows(_ >= 3)
  .hasUniqueKey("id")

val check2 = Check(contracts)
  .hasNumRows(_ > 0)
  .hasUniqueKey("id", "customerId")
  .satisfies("duration > 0")
  .hasForeignKey(customers, "customerId" -> "id")

val consoleReporter = new ConsoleReporter(System.out)
val markdownMd = new PrintStream(new File("report.md"))
val markdownReporter = new MarkdownReporter(markdownMd)

Runner.run(Seq(check1, check2), Seq(consoleReporter, markdownReporter))

markdownMd.close()

Unit Tests

You can also use DDQ to write automated quality tests for your data. After running a check or a series of checks, you can inspect the results programmatically.

def allConstraintsSatisfied(checkResult: CheckResult): Boolean =
  checkResult.constraintResults.map {
    case (constraint, ConstraintSuccess(_)) => true
    case (constraint, ConstraintFailure(_)) => false
  }.reduce(_ && _)

val results = Runner.run(Seq(check1, check2), Seq.empty)
assert(allConstraintsSatisfied(results(check1)))
assert(allConstraintsSatisfied(results(check2)))
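
If you prefer, the same predicate can be written more compactly; a sketch relying on the ConstraintSuccess type used above:

def allConstraintsSatisfied(checkResult: CheckResult): Boolean =
  checkResult.constraintResults.values.forall(_.isInstanceOf[ConstraintSuccess])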

If you want the data load to fail when the number-of-rows or unique-key constraints are violated, while violations of the duration constraint are tolerated, you can write individual assertions for each constraint result.

val numRowsConstraint = Check.hasNumRows(_ >= 3)
val uniqueKeyConstraint = Check.hasUniqueKey("id", "customerId")
val durationConstraint = Check.satisfies("duration > 0")

val check = Check(contracts)
  .addConstraint(numRowsConstraint)
  .addConstraint(uniqueKeyConstraint)
  .addConstraint(durationConstraint)

val results = Runner.run(Seq(check), Seq.empty)
val constraintResults = results(check).constraintResults
assert(constraintResults(numRowsConstraint).isInstanceOf[ConstraintSuccess])
assert(constraintResults(uniqueKeyConstraint).isInstanceOf[ConstraintSuccess])

Python API

In order to use the Python API, you have to start PySpark with the DDQ jar added. Unfortunately, the --packages approach does not work in Spark < 2.0.

pyspark --driver-class-path drunken-data-quality_2.11-x.y.z.jar
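
With Spark 2.0 or later, the --packages option shown earlier should work for pyspark as well:

pyspark --packages FRosner:drunken-data-quality:4.1.1-s_2.11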

Then you can create a dummy dataframe and run a few checks.

from pyddq.core import Check

df = spark.createDataFrame([(1, "a"), (1, None), (3, "c")])
check = Check(df)
check.hasUniqueKey("_1", "_2").isNeverNull("_1").run()

Just like the Scala version of DDQ, PyDDQ supports multiple reporters. To use them, you can rely on pyddq.streams, which wraps the underlying Java output streams.

from pyddq.reporters import MarkdownReporter, ConsoleReporter
from pyddq.streams import FileOutputStream, ByteArrayOutputStream
import sys

# send the report in a console-friendly format to standard output
# and in markdown format to the byte array stream
stdout_stream = FileOutputStream(sys.stdout)
bytearray_stream = ByteArrayOutputStream()

Check(df)\
    .hasUniqueKey("_1", "_2")\
    .isNeverNull("_1")\
    .run([MarkdownReporter(bytearray_stream), ConsoleReporter(stdout_stream)])

# print markdown report
print(bytearray_stream.get_output())

Spark Version Compatibility

Although we try to maintain as much compatibility as possible across Spark versions, we cannot guarantee that everything works smoothly for every combination of DDQ and Spark versions. The following matrix shows which Spark version each DDQ version is built and tested against.

DDQ Version    Spark Version
5.x            2.2.x
4.x            2.0.x
3.x            1.6.x
2.x            1.3.x
1.x            1.3.x

Documentation

For a comprehensive list of available constraints, please refer to the Wiki.

Authors

Thanks to everyone who submitted pull requests, bug reports, and feature requests.

License

This project is licensed under the Apache License Version 2.0. For details, please see the LICENSE file.
