• This repository has been archived on 06/Jan/2024
  • Language: Java
  • License: Other

Generic Data Ingestion & Dispersal Library for Hadoop

Marmaray

Note: For an end-to-end example of how all our components tie together, see com.uber.marmaray.common.job.JsonHoodieIngestionJob

Marmaray is a generic Hadoop data ingestion and dispersal framework and library. It is a plug-in based framework built on top of the Hadoop ecosystem where support can be added to ingest data from any source and disperse to any sink leveraging the power of Apache Spark.

Marmaray describes a number of abstractions to support the ingestion of any source to any sink. They are described at a high-level below to help developers understand the architecture and design of the overall system.

This system has been canonically used to ingest data into a Hadoop data lake and disperse data from a data lake to online data stores usually with lower latency semantics. The framework was intentionally designed, however, to not be tightly coupled to just this particular use case and can move data from any source to any sink.

End-to-End Job Flow

The figure below illustrates a high level flow of how Marmaray jobs are orchestrated, independent of the specific source or sink.

During this process, a configuration defining specific attributes for each source and sink orchestrates every step of the next job. This includes figuring out the amount of data we need to process (i.e., its Work Unit), applying forking functions to split the raw data (for example, into ‘valid’ and ‘error’ records), and converting the data to an appropriate sink format. At the end of the job, the metadata is saved or updated in the metadata manager, and metrics can be reported to track progress.

The following sections give an overview of each of the major components that enable the job flow previously illustrated.

High-Level Architecture

The architecture diagram below illustrates the fundamental building blocks and abstractions in Marmaray that enable its overall job flow. These generic components facilitate the ability to add extensions to Marmaray, letting it support new sources and sinks.

Avro Payload

The central component of Marmaray’s architecture is what we call the AvroPayload, a wrapper around Avro’s GenericRecord binary encoding format which includes relevant metadata for our data processing needs.

One of the major benefits of Avro data (GenericRecord) is that it is efficient both in its memory and network usage, as the binary encoded data can be sent over the wire with minimal schema overhead compared to JSON. Using Avro data running on top of Spark’s architecture means we can also take advantage of Spark’s data compression and encryption features. These benefits help our Spark jobs more efficiently handle data at a large scale.

To support our any-source to any-sink architecture, we require that all ingestion sources define converters from their schema format to Avro and that all dispersal sinks define converters from the Avro schema to the native sink data model (e.g., ByteBuffers for Cassandra).

Requiring that all converters either convert data to or from an AvroPayload format allows a loose and intentional coupling in our data model. Once a source and its associated transformation have been defined, the source theoretically can be dispersed to any supported sink, since all sinks are source-agnostic and only care that the data is in the intermediate AvroPayload format.

This is illustrated in the figure below:

Data Model

The central component of our architecture is a concept we call the AvroPayload. AvroPayload acts as a wrapper around Avro’s GenericRecord binary encoding format along with relevant metadata for our data processing needs.

One of the major benefits of Avro data (GenericRecord) is that once an Avro schema is registered with Spark, only the data, not the schema, is sent during internode network transfers and disk writes, which are then highly optimized. Using Avro data running on top of Spark’s architecture means we can also take advantage of Spark’s data compression and encryption features. These benefits factor heavily in helping our Spark jobs handle data at large scale more efficiently.

Avro includes a schema to specify the structure of the data being encoded while also supporting schema evolution. For large data files, we take advantage of the fact that each record is encoded with the same schema, so the schema only needs to be defined once in the file, which reduces overhead.

To support our any-source to any-sink architecture, we require that all ingestion sources define converters from their schema format to Avro and that all dispersal sinks define converters from the Avro schema to the native sink data model (e.g., ByteBuffers for Cassandra).

This allows a loose and intentional coupling in our data model: once a source and its associated transformation have been defined, the source can theoretically be dispersed to any supported sink, since all sinks are source-agnostic and only care that the data is in the intermediate AvroPayload format.
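To make the coupling concrete, here is a minimal sketch of the idea, assuming a plain map-based `Payload` class as a stand-in for the AvroPayload. Every name in it (`PayloadSketch`, `Payload`, `fromCsv`, `toJson`, `toKeyValue`) is hypothetical and is not part of Marmaray; a real job would wrap an Avro GenericRecord instead.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch; these names are illustrative and are not Marmaray classes.
final class PayloadSketch {

    // Stand-in for the intermediate format shared by all sources and sinks.
    static final class Payload {
        final Map<String, Object> fields;
        Payload(Map<String, Object> fields) { this.fields = fields; }
    }

    // Source-side converter: a "name,age" CSV line becomes a Payload.
    static Payload fromCsv(String line) {
        String[] parts = line.split(",");
        Map<String, Object> fields = new LinkedHashMap<>();
        fields.put("name", parts[0].trim());
        fields.put("age", Integer.parseInt(parts[1].trim()));
        return new Payload(fields);
    }

    // Sink-side converter 1: Payload to a JSON-like string.
    static String toJson(Payload p) {
        return "{\"name\":\"" + p.fields.get("name") + "\",\"age\":" + p.fields.get("age") + "}";
    }

    // Sink-side converter 2: Payload to a key=value string.
    static String toKeyValue(Payload p) {
        return "name=" + p.fields.get("name") + ";age=" + p.fields.get("age");
    }

    public static void main(String[] args) {
        // The same source converter feeds either sink converter unchanged.
        Payload payload = fromCsv("ada,36");
        System.out.println(toJson(payload));
        System.out.println(toKeyValue(payload));
    }
}
```

Because each sink only sees the intermediate type, adding a new source requires one converter rather than one per sink.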

Data Converters

The primary function of ingestion and dispersal jobs is to perform transformations on input records from the source to ensure they are in the desired format before the data is written to the destination sink. Marmaray allows jobs to chain converters together to perform multiple transformations as needed, with the potential to also write to multiple sinks.

A secondary but critical function of DataConverters is to produce error records with every transformation. Before data is ingested into our Hadoop data lake, it is critical that all data conforms to a schema for analytical purposes. Any data that is malformed, missing required fields, or otherwise deemed to have issues is filtered out and written to error tables. This ensures a high level of data quality in our Hadoop data lake.

This functionality is abstracted out by exposing only a convert() method to the user. The convert() method acts on a single datum from the input schema format and does one of the following:

  • Returns an output record in the desired output schema format
  • Writes the input record to the error table with an error message and other useful metadata, or discards the record
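The contract described above can be sketched roughly as follows. This is an illustration under stated assumptions, not Marmaray's actual converter API: the `ConverterSketch` and `Result` types and the integer "age" field are all hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch; these types are illustrative and are not Marmaray's API.
final class ConverterSketch {

    // Outcome of converting one datum: either an output or an error is set.
    static final class Result<O> {
        final O output;        // set on success
        final String error;    // set on failure
        final String rawInput; // kept so the bad record can go to an error table

        private Result(O output, String error, String rawInput) {
            this.output = output;
            this.error = error;
            this.rawInput = rawInput;
        }

        static <O> Result<O> success(O output) {
            return new Result<>(output, null, null);
        }

        static <O> Result<O> failure(String rawInput, String error) {
            return new Result<>(null, error, rawInput);
        }

        boolean isSuccess() { return error == null; }
    }

    // Example converter: parses a raw string into a non-negative integer age.
    static Result<Integer> convert(String raw) {
        try {
            int age = Integer.parseInt(raw.trim());
            if (age < 0) {
                return Result.failure(raw, "age must be non-negative");
            }
            return Result.success(age);
        } catch (NumberFormatException e) {
            return Result.failure(raw, "not an integer");
        }
    }

    public static void main(String[] args) {
        List<Integer> valid = new ArrayList<>();
        List<String> errors = new ArrayList<>();
        for (String raw : List.of("42", "abc", "-1")) {
            Result<Integer> result = convert(raw);
            if (result.isSuccess()) {
                valid.add(result.output);
            } else {
                errors.add(result.rawInput + ": " + result.error);
            }
        }
        System.out.println("valid=" + valid);
        System.out.println("errors=" + errors);
    }
}
```

Returning the failure as a value, rather than throwing, lets one bad record be routed to an error table without failing the whole batch.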

Using the Kafka -> Hudi (Hive) ingestion case, we use two converters:

KafkaSourceDataConverter

  • Converts Kafka messages (byte[]) to GenericRecord (wrapped in an AvroPayload as described earlier). This record is then sent to our data lake for ingestion.

HoodieSinkDataConverter

  • Converts GenericRecord (wrapped in an AvroPayload) received from the data lake into a HoodieRecord, which is needed for insertion into our Hoodie storage system.

Error Tables

Error Tables are written to by DataConverters as described in a previous section. The main purpose of error tables is to enable easy debugging of jobs and to reject records that do not have a backward-compatible schema change. Since some of this error data can contain potentially sensitive user information, we control access to the error table at an “owner” + “table” level. In addition, once the owners have fixed the data and ensured it conforms to the schema, they can push the data back into the pipeline, where it can now be successfully ingested.

WorkUnit Calculator

Marmaray moves data in mini-batches of configurable size. In order to calculate the amount of data to process, we introduced the concept of a WorkUnitCalculator. At a very high level, a work unit calculator looks at the type of input source and the previously stored checkpoint, and calculates the next work unit or batch of work. Examples of work units are offset ranges for Kafka or a collection of HDFS files for a Hive/HDFS source.

When calculating the next batch of data to process, a work unit calculator can also take into account throttling information. Examples include the maximum amount of data to read or the number of messages to read from Kafka. This is configurable per use case and gives maximum flexibility to ensure that work units are appropriately sized, especially as the amount of data increases in scale, and do not overwhelm source or sink systems.

Each WorkUnitCalculator returns an IWorkCalculatorResult, which includes the list of work units to process in the current batch as well as the new checkpoint state to save if the job succeeds in processing the input batch. We have also added functionality to calculate the cost of executing each work unit for chargeback purposes. This is useful because users can define various methods to compute cost using the number of records, the total size of the records, the Spark executors’ effective execution time, etc. Because we allow multiple ingestions in a single run (e.g., multiple Kafka topics can be ingested in a single Spark job run using separate topic-specific DAGs), having per-topic execution time helps in differentiating execution cost between topics.
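As a rough sketch of the calculation described in this section, the example below derives Kafka-style offset ranges from a stored checkpoint, the latest available offsets, and a throttle. All names (`WorkUnitSketch`, `OffsetRange`, `Result`, `calculate`) are hypothetical, and the per-partition message cap is an assumed simplification rather than Marmaray's actual throttling policy.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch of a Kafka-style work unit calculation; not Marmaray's API.
final class WorkUnitSketch {

    // One work unit: read offsets [from, until) of a single partition.
    static final class OffsetRange {
        final int partition;
        final long from;
        final long until;
        OffsetRange(int partition, long from, long until) {
            this.partition = partition;
            this.from = from;
            this.until = until;
        }
    }

    // Work units for this batch plus the checkpoint to save if the batch succeeds.
    static final class Result {
        final List<OffsetRange> workUnits;
        final Map<Integer, Long> nextCheckpoint;
        Result(List<OffsetRange> workUnits, Map<Integer, Long> nextCheckpoint) {
            this.workUnits = workUnits;
            this.nextCheckpoint = nextCheckpoint;
        }
    }

    // Assumption: the throttle is a simple cap on messages per partition.
    static Result calculate(Map<Integer, Long> checkpoint,
                            Map<Integer, Long> latestOffsets,
                            long maxMessagesPerPartition) {
        List<OffsetRange> units = new ArrayList<>();
        Map<Integer, Long> next = new LinkedHashMap<>();
        for (Map.Entry<Integer, Long> entry : latestOffsets.entrySet()) {
            int partition = entry.getKey();
            // Partitions without a checkpoint start from offset 0.
            long from = checkpoint.getOrDefault(partition, 0L);
            long until = Math.min(entry.getValue(), from + maxMessagesPerPartition);
            if (until > from) {
                units.add(new OffsetRange(partition, from, until));
            }
            next.put(partition, Math.max(from, until));
        }
        return new Result(units, next);
    }

    public static void main(String[] args) {
        Map<Integer, Long> checkpoint = new TreeMap<>(Map.of(0, 100L, 1, 50L));
        Map<Integer, Long> latest = new TreeMap<>(Map.of(0, 250L, 1, 60L, 2, 10L));
        Result result = calculate(checkpoint, latest, 100L);
        for (OffsetRange r : result.workUnits) {
            System.out.println("partition " + r.partition + ": [" + r.from + ", " + r.until + ")");
        }
        System.out.println("next checkpoint: " + result.nextCheckpoint);
    }
}
```

The next checkpoint is returned alongside the work units rather than saved immediately, so the caller can persist it only after the batch succeeds.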

Metadata Manager

All Marmaray jobs need a persistent store, known as the metadata manager, to store job-level metadata. A job can update its state during execution, and the new state replaces the previously saved state only if the current execution of the job is successful. Otherwise, any modifications to the state are rejected. We use this for storing checkpoint information (partition offsets in the case of Kafka), average record size, average number of messages, etc. The metadata store is designed to be generic, however, and can store any relevant metrics that are useful to track, describe, or collect status on jobs, depending on the use case and user needs.

When a job begins execution, an in-memory copy of the current metadata is created and shared with the appropriate job components, which update the in-memory copy during job execution. If the job fails, this in-memory copy is discarded to ensure that the next run starts from the previously saved state of the last successful run. If the job succeeds, the in-memory copy is saved to the persistent store. Because the metadata manager currently keeps an in-memory copy, there is a limit on the amount of metadata a job can store.
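The commit-on-success behaviour described above might look like the following minimal sketch. It assumes a plain `HashMap` as a stand-in for the persistent store, and the class and method names (`MetadataSketch`, `begin`, `finish`) are hypothetical rather than Marmaray's.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of commit-on-success metadata; not Marmaray's API.
final class MetadataSketch {

    // Stand-in for the persistent store (a file or database in a real system).
    private final Map<String, String> persisted = new HashMap<>();

    // In-memory working copy that job components update during a run.
    private Map<String, String> working;

    // Starts a run by copying the last successfully saved state.
    void begin() {
        working = new HashMap<>(persisted);
    }

    void set(String key, String value) {
        working.put(key, value);
    }

    String get(String key) {
        return working.get(key);
    }

    // Saves the working copy only if the run succeeded; otherwise discards it.
    void finish(boolean succeeded) {
        if (succeeded) {
            persisted.clear();
            persisted.putAll(working);
        }
        working = null;
    }

    String persistedValue(String key) {
        return persisted.get(key);
    }

    public static void main(String[] args) {
        MetadataSketch metadata = new MetadataSketch();

        metadata.begin();
        metadata.set("offset", "100");
        metadata.finish(true);  // successful run: state is saved

        metadata.begin();
        metadata.set("offset", "200");
        metadata.finish(false); // failed run: update is discarded

        System.out.println(metadata.persistedValue("offset")); // 100
    }
}
```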

Fork Operator

The main purpose of the ForkOperator is to split the input stream of records into multiple output streams. The canonical use case is to produce one stream each for valid and error records, which can then be handled in a separate and independent manner.

The internal execution engine of Spark evaluates all operations lazily. Unless an action is performed (count, foreach, etc.), no data is actually read. The ForkOperator was invented to avoid re-executing input transformations and re-reading data from the source, both of which would be very expensive.

A provided ForkFunction is used by the ForkOperator to tag each datum with a valid or error annotation. ForkOperators are called by our data converters during job execution, and users can then filter to get the desired collection of tagged records. These records are persisted in Spark to avoid having to re-read the raw input and re-apply the transformation when filtering. By default we currently use DISK_ONLY persistence to avoid memory overhead and pressure.

DataConverters use these components to split the input stream into two streams (output and error), but a ForkOperator can also split a stream into more than two streams, with overlapping records if desired. For example, we could split an input stream of integers (1 to 6) into an even number stream (2, 4, 6), an odd number stream (1, 3, 5), and a multiple-of-3 stream (3, 6).
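The integer example can be sketched in plain Java as follows. This is not the ForkOperator's real API: `ForkSketch`, `Tagged`, `tag`, `select`, and `demoRules` are hypothetical names, and an in-memory list stands in for the persisted Spark RDD.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;

// Hypothetical sketch of the fork idea; not Marmaray's ForkOperator API.
final class ForkSketch {

    // A record together with every tag whose rule matched it.
    static final class Tagged<T> {
        final T record;
        final Set<String> tags;
        Tagged(T record, Set<String> tags) {
            this.record = record;
            this.tags = tags;
        }
    }

    // Applies all rules to each record exactly once. In Spark, the tagged
    // result would be persisted so later filters do not re-read the source.
    static <T> List<Tagged<T>> tag(List<T> input, Map<String, Predicate<T>> rules) {
        List<Tagged<T>> tagged = new ArrayList<>();
        for (T record : input) {
            Set<String> tags = new LinkedHashSet<>();
            for (Map.Entry<String, Predicate<T>> rule : rules.entrySet()) {
                if (rule.getValue().test(record)) {
                    tags.add(rule.getKey());
                }
            }
            tagged.add(new Tagged<>(record, tags));
        }
        return tagged;
    }

    // Filters the already-tagged records; a record may appear in several streams.
    static <T> List<T> select(List<Tagged<T>> tagged, String tag) {
        List<T> out = new ArrayList<>();
        for (Tagged<T> t : tagged) {
            if (t.tags.contains(tag)) {
                out.add(t.record);
            }
        }
        return out;
    }

    static Map<String, Predicate<Integer>> demoRules() {
        Map<String, Predicate<Integer>> rules = new LinkedHashMap<>();
        rules.put("even", n -> n % 2 == 0);
        rules.put("odd", n -> n % 2 != 0);
        rules.put("multipleOf3", n -> n % 3 == 0);
        return rules;
    }

    public static void main(String[] args) {
        List<Tagged<Integer>> tagged = tag(List.of(1, 2, 3, 4, 5, 6), demoRules());
        System.out.println(select(tagged, "even"));        // [2, 4, 6]
        System.out.println(select(tagged, "odd"));         // [1, 3, 5]
        System.out.println(select(tagged, "multipleOf3")); // [3, 6]
    }
}
```

Tagging once and filtering many times is what keeps the input from being re-read for each output stream.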

JobDag

The JobDag component orchestrates and performs the actual execution of the Job. It does the following:

  • Initializes the MetadataManager so checkpoint information can be retrieved
  • Reads input records from the ISource to create the RDD
  • Hands over the RDD to the ISink to write the data to the destination storage system
  • Persists updated metadata and checkpoint information to the MetadataManager if the JobDag execution succeeded
  • Reports the status of the job and other metrics
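The steps above can be sketched as a single method. This is a simplified illustration only: the `Source` and `Sink` interfaces, the `run` method, and the map-backed metadata store are hypothetical stand-ins for ISource, ISink, and the MetadataManager, and a plain list stands in for the RDD.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the job flow; these interfaces are not Marmaray's.
final class JobDagSketch {

    interface Source {
        List<String> read(long fromOffset);
    }

    interface Sink {
        void write(List<String> records);
    }

    // Runs one job; the checkpoint is advanced only when every step succeeds.
    static boolean run(Map<String, Long> metadataStore, Source source, Sink sink) {
        // 1. Retrieve the checkpoint from the metadata store.
        long checkpoint = metadataStore.getOrDefault("offset", 0L);
        try {
            // 2. Read input records from the source.
            List<String> records = source.read(checkpoint);
            // 3. Hand the records to the sink.
            sink.write(records);
            // 4. Persist the new checkpoint because the run succeeded.
            metadataStore.put("offset", checkpoint + records.size());
            // 5. Report status.
            System.out.println("job succeeded, records=" + records.size());
            return true;
        } catch (RuntimeException e) {
            System.out.println("job failed: " + e.getMessage());
            return false;
        }
    }

    public static void main(String[] args) {
        Map<String, Long> store = new HashMap<>();
        List<String> written = new ArrayList<>();
        run(store, from -> List.of("a", "b"), written::addAll);
        System.out.println("checkpoint=" + store.get("offset") + ", written=" + written);
    }
}
```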

JobManager

The JobManager is responsible for running multiple JobDags. For example, a JobDag can correspond to each topic in Kafka that is ingested, and N JobDags can be run by the JobManager. What we do is group together multiple JobDags as a single logical job, all sharing the same SparkContext (and resources). The JobManager is responsible for managing each of these JobDags and can be configured to run a certain number in parallel, which results in much better resource utilization since we don’t currently take advantage of Spark’s ability to dynamically allocate resources. The ordering of jobs can be defined to ensure that longer-running and higher-priority jobs get resources first. The JobManager also handles reporting job success metrics and maintains registered reporters for reporting various metrics.
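A minimal sketch of this scheduling idea follows, assuming a fixed-size thread pool as a stand-in for shared Spark resources and a single integer priority per job. The names `JobManagerSketch`, `Job`, and `runAll` are hypothetical and do not reflect Marmaray's actual JobManager.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical sketch of running several jobs on shared resources; not Marmaray's API.
final class JobManagerSketch {

    static final class Job {
        final String name;
        final int priority;           // higher values are submitted first
        final Callable<Boolean> body; // returns true on success
        Job(String name, int priority, Callable<Boolean> body) {
            this.name = name;
            this.priority = priority;
            this.body = body;
        }
    }

    // Runs at most `parallelism` jobs at a time and returns the names of the
    // jobs that succeeded, in submission (priority) order.
    static List<String> runAll(List<Job> jobs, int parallelism) {
        List<Job> ordered = new ArrayList<>(jobs);
        ordered.sort(Comparator.comparingInt((Job j) -> j.priority).reversed());

        ExecutorService pool = Executors.newFixedThreadPool(parallelism);
        List<String> succeeded = new ArrayList<>();
        try {
            List<Future<Boolean>> futures = new ArrayList<>();
            for (Job job : ordered) {
                futures.add(pool.submit(job.body));
            }
            for (int i = 0; i < ordered.size(); i++) {
                try {
                    if (futures.get(i).get()) {
                        succeeded.add(ordered.get(i).name);
                    }
                } catch (ExecutionException e) {
                    // One failed job does not stop the others.
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        } finally {
            pool.shutdown();
        }
        return succeeded;
    }

    public static void main(String[] args) {
        List<Job> jobs = List.of(
            new Job("low", 1, () -> true),
            new Job("high", 10, () -> true),
            new Job("bad", 5, () -> { throw new IllegalStateException("boom"); }));
        System.out.println(runAll(jobs, 2)); // [high, low]
    }
}
```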

ISource & ISink

The ISource contains all the information necessary to read the source data for the requested work units, and the ISink contains all the information necessary to write to the sink. For example, a Cassandra sink would contain information about the cluster, table, partitioning keys, and clustering keys for where the data should reside. A Kafka source would contain information about the topic name, maximum messages to read, cluster information, offset initialization strategy, etc.
