  • Language: Python
  • License: Apache License 2.0
  • Created over 6 years ago; updated over 1 year ago


Builds Airflow DAGs from configuration files. Powers all DAGs on the Etsy Data Platform.


boundary-layer

boundary-layer is a tool for building Airflow DAGs from human-friendly, structured, maintainable YAML configuration. It includes first-class support for various usability enhancements that are not built into Airflow itself:

  • Managed resources created and destroyed by Airflow within a DAG: for example, ephemeral DAG-scoped Hadoop clusters on Dataproc
  • Type checking and automatic preprocessing of all arguments to all operators, based on flexible schemas
  • Automatic imports of required classes
  • Distinct before and after operator groups, to make it easier to manage actions taken at the beginning or end of workflows
  • DAG pruning, for extracting or eliminating sections of the graph while maintaining dependency relationships
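To make the schema-based type checking and preprocessing concrete, here is a minimal sketch. It is not boundary-layer's implementation: the schema layout, the `to_datetime` preprocessor, and the `validate_and_preprocess` function are all hypothetical. It only illustrates the idea of rejecting unknown or mistyped arguments and converting friendly strings (such as a `start_date` of `'2018-10-31'`) into Python objects.

```python
import datetime
from typing import Any, Callable, Dict, Optional, Tuple


def to_datetime(value: Any) -> datetime.datetime:
    """Hypothetical preprocessor: turn a 'YYYY-MM-DD' string into a datetime."""
    if isinstance(value, datetime.datetime):
        return value
    return datetime.datetime.strptime(value, "%Y-%m-%d")


# Hypothetical schema: argument name -> (accepted input types, optional preprocessor).
SCHEMA: Dict[str, Tuple[Any, Optional[Callable[[Any], Any]]]] = {
    "task_id": (str, None),
    "retries": (int, None),
    "start_date": ((str, datetime.datetime), to_datetime),
}


def validate_and_preprocess(args: Dict[str, Any], schema=SCHEMA) -> Dict[str, Any]:
    """Reject unknown or mistyped arguments, then apply each argument's preprocessor."""
    unknown = set(args) - set(schema)
    if unknown:
        raise ValueError(f"unknown arguments: {sorted(unknown)}")
    result = {}
    for name, value in args.items():
        accepted, preprocess = schema[name]
        if not isinstance(value, accepted):
            raise TypeError(f"{name}: unexpected type {type(value).__name__}")
        result[name] = preprocess(value) if preprocess else value
    return result
```

For example, `validate_and_preprocess({"task_id": "my_job", "retries": 2, "start_date": "2018-10-31"})` returns a dict whose `start_date` is `datetime.datetime(2018, 10, 31, 0, 0)`. A misspelled argument name or a wrong type fails immediately, at build time rather than at deployment time.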

boundary-layer also performs various checks to catch errors that would otherwise only become visible upon deployment to an Airflow instance, such as cycles in the DAG and duplicate task names.
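Both of these checks can be done on the task graph before any Python is generated. The sketch below is illustrative and not boundary-layer's own code. It assumes the graph is given as a hypothetical mapping from each task name to its downstream task names. Duplicates are found by counting names, and cycles are detected with Kahn's topological-sort algorithm: if some nodes can never reach in-degree zero, the graph contains a cycle.

```python
from collections import Counter
from typing import Dict, Iterable, List


def find_duplicate_names(names: Iterable[str]) -> List[str]:
    """Return every task name that appears more than once, sorted."""
    return sorted(name for name, count in Counter(names).items() if count > 1)


def has_cycle(edges: Dict[str, List[str]]) -> bool:
    """Kahn's algorithm over a task -> downstream-tasks mapping."""
    nodes = set(edges) | {d for downstream in edges.values() for d in downstream}
    indegree = {n: 0 for n in nodes}
    for downstream in edges.values():
        for d in downstream:
            indegree[d] += 1
    ready = [n for n in nodes if indegree[n] == 0]
    visited = 0
    while ready:
        node = ready.pop()
        visited += 1
        for d in edges.get(node, []):
            indegree[d] -= 1
            if indegree[d] == 0:
                ready.append(d)
    # Nodes on a cycle never reach in-degree zero, so they are never visited.
    return visited < len(nodes)
```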

boundary-layer is used heavily on the Etsy Data Platform. Every DAG on our platform is defined by a boundary-layer configuration instead of in raw Python. This greatly lowers the barrier to entry for the data scientists and engineers who develop DAGs, while ensuring that best practices are always observed in the generated Python code. boundary-layer is also the core of our fully self-service deployment process, in which our CI tools test each DAG and surface errors before the DAG can be merged and deployed to our Airflow instances.

In addition, our migration from Oozie to Airflow relied heavily on boundary-layer's included conversion tool.

boundary-layer is pluggable, supporting custom configuration and extensions via plugins that are installed using pip. The core package does not contain any Etsy-specific customizations; instead, those are all defined in an internally distributed Etsy plugin package.

For more information, see our article on Etsy's Code as Craft blog.

Supported operators and Airflow versions

boundary-layer requires that each operator have a configuration file that defines its schema, the Python class it corresponds to, and so on. These configuration files are stored in the boundary-layer-default-plugin. We currently include configurations for a number of common Airflow operators (sufficient to support our needs at Etsy, plus a few more), but we know that we are missing quite a few operators that may be needed for common Airflow use cases. We are committed to adding support for more operators, and we commit to a quick turnaround on any contributed pull requests that only add support for additional operators. So please submit a pull request if something is missing, or at least open an issue to let us know.
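Because each operator's configuration names its Python class, the generator can derive the import lines automatically. The generated file shown later groups several classes from the same module onto one `from ... import ...` line. The sketch below reproduces that grouping. It assumes that class paths are plain dotted strings, and the function name `render_imports` is hypothetical. Its alphabetical ordering is a choice made for this sketch and need not match boundary-layer's output.

```python
from collections import defaultdict
from typing import Dict, Iterable, List, Set


def render_imports(class_paths: Iterable[str]) -> List[str]:
    """Group dotted class paths by module and emit one import line per module."""
    by_module: Dict[str, Set[str]] = defaultdict(set)
    for path in class_paths:
        module, _, name = path.rpartition(".")
        if module:  # skip bare names that carry no module
            by_module[module].add(name)
    return [
        f"from {module} import {', '.join(sorted(names))}"
        for module, names in sorted(by_module.items())
    ]
```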

Furthermore, because the operators and sensors differ between Airflow release versions, boundary-layer may be incompatible with some Airflow versions. All of our operators are known to work with Airflow release versions 1.9 and 1.10. Note, however, that our schemas validate against the operator arguments for 1.10, which are a superset of those for 1.9, so some parameters that we allow may not be properly used by 1.9.

Installation

boundary-layer is distributed via PyPI and can be installed using pip.

pip install boundary-layer --upgrade

We recommend installing into a virtual environment, but that's up to you.

You should now be able to run boundary-layer and view its help message:

$ boundary-layer --help

If the installation was successful, you should see output like:

usage: boundary-layer [-h] {build-dag,prune-dag,parse-oozie} ...

positional arguments:
  {build-dag,prune-dag,parse-oozie}

optional arguments:
  -h, --help            show this help message and exit

Publishing updates to PyPI (admins only)

boundary-layer is distributed via PyPI. We rely on an automated GitHub Actions build to publish updates; the build runs every time a tag is pushed to the repository. We have a script that automates the creation of these tags, making sure that they are versioned correctly and created for the intended commits.

The recommended process for publishing a relatively minor boundary-layer update is to simply run

./release.py

which will bump the patch version.

For bigger changes, you can bump the minor (or major) versions, or you can force a specific version string, via one of the following commands:

./release.py --bump minor
./release.py --bump major
./release.py --force-version a.b.c

There are a few other options supported by the release.py command, as described by the usage string:

$ ./release.py --help
usage: release.py [-h]
                  [--bump {major,minor,patch} | --force-version FORCE_VERSION]
                  [--git-remote-name GIT_REMOTE_NAME]
                  [--remote-branch-name REMOTE_BRANCH_NAME]

optional arguments:
  -h, --help            show this help message and exit
  --bump {major,minor,patch}
                        Select the portion of the version string to bump.
                        default: `patch`
  --force-version FORCE_VERSION
                        Force the new version to this value. Must be a valid
                        semver.
  --git-remote-name GIT_REMOTE_NAME
                        Name of the git remote from which to release. default:
                        `origin`
  --remote-branch-name REMOTE_BRANCH_NAME
                        Name of the remote branch to use as the basis for the
                        release. default: `master`
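The `--bump` and `--force-version` options follow ordinary semantic-versioning rules. As an illustration, the logic might look like the sketch below. This is not the contents of release.py; the function name `next_version` is hypothetical, and the sketch accepts only plain `MAJOR.MINOR.PATCH` strings, without the pre-release or build suffixes that full semver allows.

```python
import re
from typing import Optional

# Plain MAJOR.MINOR.PATCH with no leading zeros (pre-release/build tags not handled).
SEMVER = re.compile(r"^(0|[1-9]\d*)\.(0|[1-9]\d*)\.(0|[1-9]\d*)$")


def next_version(current: str, bump: str = "patch",
                 force_version: Optional[str] = None) -> str:
    """Compute the next release version, mirroring --bump / --force-version."""
    if force_version is not None:
        if not SEMVER.match(force_version):
            raise ValueError(f"not a valid semver: {force_version!r}")
        return force_version
    match = SEMVER.match(current)
    if not match:
        raise ValueError(f"not a valid semver: {current!r}")
    major, minor, patch = (int(part) for part in match.groups())
    if bump == "major":
        return f"{major + 1}.0.0"
    if bump == "minor":
        return f"{major}.{minor + 1}.0"
    if bump == "patch":
        return f"{major}.{minor}.{patch + 1}"
    raise ValueError(f"unknown bump type: {bump!r}")
```

Bumping a component resets every component to its right, so `next_version("1.2.3", "minor")` gives `1.3.0` rather than `1.3.3`.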

boundary-layer YAML configs

The primary feature of boundary-layer is its ability to build Python DAGs from simple, structured YAML files.

Below is a simple boundary-layer YAML config, used for running a Hadoop job on Google Cloud Dataproc:

name: my_dag
dag_args:
  schedule_interval: '@daily'
resources:
- name: dataproc-cluster
  type: dataproc_cluster
  properties:
    cluster_name: my-cluster-{{ execution_date.strftime('%s') }}
    num_workers: 10
    region: us-central1
default_task_args:
  owner: etsy-data-platform
  project_id: my-project-id
  retries: 2
  start_date: '2018-10-31'
  dataproc_hadoop_jars:
  - gs://my-bucket/my/path/to/my.jar
before:
- name: data-sensor
  type: gcs_object_sensor
  properties:
    bucket: my-bucket
    object: my/object
operators:
- name: my-job
  type: dataproc_hadoop
  requires_resources:
  - dataproc-cluster
  properties:
    main_class: com.etsy.my.job.ClassName
    dataproc_hadoop_properties:
      mapreduce.map.output.compress: 'true'
    arguments: [ '--date', '{{ ds }}' ]

A few interesting features:

  • The resources section of the configuration defines a transient Dataproc cluster resource that is required by the Hadoop job. When the DAG is created, boundary-layer automatically inserts the operators to create and delete this cluster, along with the dependencies between the job and the cluster.
  • The before section of the configuration defines sensors that boundary-layer inserts as prerequisites for all downstream operations in the DAG, including the creation of the transient Dataproc cluster.
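The dependency wiring described above can be reproduced for this example with a small function. The sketch below is hypothetical and deliberately simplified: it ignores dependencies between ordinary operators, after groups, and resources that depend on other resources. It derives task ids by replacing hyphens with underscores, which matches the names in the generated file shown further down but is an assumption about the general rule. Given the example config, it yields exactly the four `set_upstream` edges that appear in that file.

```python
from typing import Dict, List, Set, Tuple


def task_id(name: str) -> str:
    # Assumption: task ids are config names with '-' replaced by '_'.
    return name.replace("-", "_")


def expand_dependencies(before: List[str],
                        operators: Dict[str, List[str]],
                        resources: List[str]) -> Set[Tuple[str, str]]:
    """Return (upstream, downstream) edges; operators maps name -> required resources."""
    edges: Set[Tuple[str, str]] = set()
    before_ids = [task_id(b) for b in before]
    for resource in resources:
        r = task_id(resource)
        create, destroy, sentinel = f"{r}_create", f"{r}_destroy", f"{r}_destroy_sentinel"
        # Sensors in the before group gate the creation of the resource.
        for b in before_ids:
            edges.add((b, create))
        for op, required in operators.items():
            if resource in required:
                edges.add((create, task_id(op)))    # create before use
                edges.add((task_id(op), destroy))   # tear down after use
                edges.add((task_id(op), sentinel))  # sentinel records the outcome
    # Operators with no resources still wait for the before group.
    for op, required in operators.items():
        if not required:
            for b in before_ids:
                edges.add((b, task_id(op)))
    return edges
```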

To convert the above YAML config into a Python DAG, save it to a file (for convenience, this DAG is already stored in the examples directory) and run

$ boundary-layer build-dag readme_example.yaml > readme_example.py

If all goes well, this writes a valid Airflow DAG into the file readme_example.py. Open this file and look at its contents to get a feel for what boundary-layer is doing. In particular, after some comments at the top of the file, you should see something like this:

import os
from airflow import DAG

import datetime

from airflow.operators.dummy_operator import DummyOperator
from airflow.contrib.sensors.gcs_sensor import GoogleCloudStorageObjectSensor
from airflow.contrib.operators.dataproc_operator import DataprocClusterDeleteOperator, DataProcHadoopOperator, DataprocClusterCreateOperator

DEFAULT_TASK_ARGS = {
        'owner': 'etsy-data-platform',
        'retries': 2,
        'project_id': 'my-project-id',
        'start_date': '2018-10-31',
        'dataproc_hadoop_jars': ['gs://my-bucket/my/path/to/my.jar'],
    }

dag = DAG(
        schedule_interval = '@daily',
        catchup = True,
        max_active_runs = 1,
        dag_id = 'my_dag',
        default_args = DEFAULT_TASK_ARGS,
    )

data_sensor = GoogleCloudStorageObjectSensor(
        dag = (dag),
        task_id = 'data_sensor',
        object = 'my/object',
        bucket = 'my-bucket',
        start_date = (datetime.datetime(2018, 10, 31, 0, 0)),
    )


dataproc_cluster_create = DataprocClusterCreateOperator(
        dag = (dag),
        task_id = 'dataproc_cluster_create',
        num_workers = 10,
        region = 'us-central1',
        cluster_name = "my-cluster-{{ execution_date.strftime('%s') }}",
        start_date = (datetime.datetime(2018, 10, 31, 0, 0)),
    )

dataproc_cluster_create.set_upstream(data_sensor)

my_job = DataProcHadoopOperator(
        dag = (dag),
        task_id = 'my_job',
        dataproc_hadoop_properties = { 'mapreduce.map.output.compress': 'true' },
        region = 'us-central1',
        start_date = (datetime.datetime(2018, 10, 31, 0, 0)),
        cluster_name = "my-cluster-{{ execution_date.strftime('%s') }}",
        arguments = ['--date','{{ ds }}'],
        main_class = 'com.etsy.my.job.ClassName',
    )

my_job.set_upstream(dataproc_cluster_create)

dataproc_cluster_destroy_sentinel = DummyOperator(
        dag = (dag),
        start_date = (datetime.datetime(2018, 10, 31, 0, 0)),
        task_id = 'dataproc_cluster_destroy_sentinel',
    )

dataproc_cluster_destroy_sentinel.set_upstream(my_job)

dataproc_cluster_destroy = DataprocClusterDeleteOperator(
        dag = (dag),
        task_id = 'dataproc_cluster_destroy',
        trigger_rule = 'all_done',
        region = 'us-central1',
        cluster_name = "my-cluster-{{ execution_date.strftime('%s') }}",
        priority_weight = 50,
        start_date = (datetime.datetime(2018, 10, 31, 0, 0)),
    )

dataproc_cluster_destroy.set_upstream(my_job)

This Python DAG is now ready for ingestion directly into a running Airflow instance, following whatever procedure is appropriate for your Airflow deployments.

A few things to note:

  • boundary-layer converted the start_date parameter from a string to a Python datetime object. This is an example of boundary-layer's argument-preprocessor feature, which allows config parameters to be specified as user-friendly strings that are converted to the necessary Python data structures automatically.
  • boundary-layer added a sentinel node in parallel with the cluster-destroy node. The sentinel tells Airflow the ultimate outcome of the DAG Run. Airflow determines the DAG Run status from the leaf nodes of the DAG. Because the cluster-destroy node uses the all_done trigger rule, it always executes (irrespective of upstream failures) and will likely succeed. Without the sentinel node, a DAG with failures in critical nodes would therefore be marked as a success. The sentinel node only runs if all of its upstream dependencies succeed. Otherwise it is marked as upstream-failed, which induces a failure state for the DAG Run.
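The sentinel argument can be checked with a toy simulation. The model below is a deliberate simplification of Airflow's scheduling and is not Airflow code. It knows only two trigger rules: `all_success` and `all_done`, and any other rule is treated like `all_done`. Each task is given a fixed outcome for when it runs, and the DAG Run is marked failed if any leaf ends up `failed` or `upstream_failed`. The function name `dag_run_state` is hypothetical.

```python
from typing import Dict, List, Tuple

# task -> (upstream task ids, trigger rule, outcome if the task actually runs)
Task = Tuple[List[str], str, str]


def dag_run_state(tasks: Dict[str, Task]) -> str:
    """Toy model: resolve task states in dependency order, then judge the leaves."""
    states: Dict[str, str] = {}
    remaining = dict(tasks)
    while remaining:
        progressed = False
        for name, (upstream, rule, outcome) in list(remaining.items()):
            if not all(u in states for u in upstream):
                continue  # some upstream task has not finished yet
            if rule == "all_success" and any(states[u] != "success" for u in upstream):
                states[name] = "upstream_failed"
            else:
                states[name] = outcome  # all_done (or satisfied all_success): run it
            del remaining[name]
            progressed = True
        if not progressed:
            raise ValueError("cycle or unknown upstream task")
    has_downstream = {u for upstream, _, _ in tasks.values() for u in upstream}
    leaves = [name for name in tasks if name not in has_downstream]
    failed = any(states[leaf] in ("failed", "upstream_failed") for leaf in leaves)
    return "failed" if failed else "success"
```

With `my_job` failing and only the `all_done` destroy task as a leaf, the run is reported as a success. Adding an `all_success` sentinel downstream of `my_job` flips the result to failed.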

Oozie Migration tools

In addition to allowing us to define Airflow workflows using YAML configurations, boundary-layer also provides a module for converting Oozie XML configuration files into boundary-layer YAML configurations, which can then be used to create Airflow DAGs.

Admittedly, boundary-layer's Oozie support is currently limited: it is only capable of building DAGs that submit their Hadoop jobs to Dataproc (it does not support stand-alone Hadoop clusters, for example), and it does not support Oozie coordinators. We are open to working on improved Oozie support if there is community demand for it, and of course, we are open to community contributions toward this goal.

The following command will translate an example Oozie workflow to a boundary-layer DAG that will execute on a 64-node Dataproc cluster in GCP's us-east1 region, for the GCP project my-project-id:

boundary-layer parse-oozie example \
  --local-workflow-base-path test/data/oozie-workflows/ \
  --cluster-project-id my-project-id \
  --cluster-region us-east1 \
  --cluster-num-workers 64
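At its core, converting an Oozie workflow means turning the `<ok to="..."/>` transitions between actions into upstream dependencies. The sketch below shows only that step, using Python's standard `xml.etree.ElementTree`. It is not boundary-layer's parser. The output keys `name` and `upstream` are hypothetical and are not boundary-layer's YAML schema. The sketch handles only linear action chains, ignoring forks, joins, decisions, and action bodies.

```python
import xml.etree.ElementTree as ET
from typing import Dict, List


def oozie_actions_to_operators(xml_text: str) -> List[Dict[str, object]]:
    """Map each Oozie <action> to a dict listing the actions that must precede it."""
    root = ET.fromstring(xml_text)
    # Oozie workflows usually declare a default namespace, e.g. uri:oozie:workflow:0.5.
    ns = root.tag[: root.tag.index("}") + 1] if root.tag.startswith("{") else ""
    actions = root.findall(f"{ns}action")
    names = {action.get("name") for action in actions}
    upstream: Dict[str, List[str]] = {action.get("name"): [] for action in actions}
    for action in actions:
        ok = action.find(f"{ns}ok")
        target = ok.get("to") if ok is not None else None
        if target in names:  # transitions to <end> or <kill> nodes are dropped
            upstream[target].append(action.get("name"))
    return [{"name": a.get("name"), "upstream": upstream[a.get("name")]} for a in actions]
```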
