• This repository has been archived on 31/Aug/2022
  • Language: Java
  • License: Apache License 2.0

A lightweight workflow definition library

flo

Please note that we, at Spotify, have ceased further development of flo, so no new features will come; on the other hand, we will fix critical issues.

flo is a lightweight workflow definition library

  • It's not a workflow management system
  • It's not a workflow scheduler

Some key features

  • Programmatic Java and Scala API for expressing workflow construction (task DAG expansion)
  • Use of arbitrary program logic for DAG expansion
  • Recursive definitions
  • Lazy DAG expansion
  • DAG serialization (for 3rd party persistence)
  • Extensible DAG evaluation

Dependency

Maven:

<dependency>
  <groupId>com.spotify</groupId>
  <artifactId>flo-workflow</artifactId>
  <version>${flo.version}</version>
</dependency>

sbt (Scala):

"com.spotify" %% "flo-scala" % floVersion

JavaDocs here: http://spotify.github.io/flo/maven/latest/apidocs/

Quick Example: Fibonacci

Fibonacci serves as a good example even though it's not at all the kind of thing that flo is meant to be used for. Nevertheless, it demonstrates how a task DAG can be recursively defined with arbitrary logic governing which inputs are chosen.

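import com.spotify.flo.EvalContext;
import com.spotify.flo.Task;
import com.spotify.flo.TaskBuilder;
import com.spotify.flo.context.MemoizingContext;

class Fib {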

  static Task<Long> fib(long n) {
    TaskBuilder<Long> builder = Task.named("fib", n).ofType(Long.class);
    if (n < 2) {
      return builder
          .process(() -> n);
    } else {
      return builder
          .input(() -> fib(n - 1))
          .input(() -> fib(n - 2))
          .process((a, b) -> a + b);
    }
  }

  public static void main(String[] args) {
    Task<Long> fib92 = fib(92);
    EvalContext evalContext = MemoizingContext.composeWith(EvalContext.sync());
    EvalContext.Value<Long> value = evalContext.evaluate(fib92);

    value.consume(f92 -> System.out.println("fib(92) = " + f92));
  }
}

Scala equivalent

import java.util.function.Consumer

import com.spotify.flo._
import com.spotify.flo.context.MemoizingContext

object Fib extends App {

  def fib(n: Long): Task[Long] = defTask[Long](n) dsl (
    if (n < 2) {
      $ process n
    } else {
      $ input fib(n - 1) input fib(n - 2) process (_ + _)
    }
  )

  val fib92 = fib(92)
  val evalContext = MemoizingContext.composeWith(EvalContext.sync)
  val value = evalContext.evaluate(fib92)

  value.consume(new Consumer[Long] {
    //noinspection ScalaStyle
    override def accept(t: Long): Unit = Console.println(s"fib(92) = ${t}")
  })
}

For more details on a high-level runner implementation, see flo-runner.

Task<T>

Task<T> is one of the more central types in flo. It represents a task that evaluates to a value of type T. It has a parameterized name, zero or more input tasks, and a processing function that is executed once its inputs have been evaluated. Tasks come with a few key properties governing how they are defined, how they behave, and how they are interacted with. We'll cover these in the following sections.

Tasks are defined by regular methods

Your workflow tasks are not defined as classes that extend Task<T>. Instead, they are defined using the TaskBuilder API, as we've already seen in the Fibonacci example. In many ways this is similar to a clean class with no mutable state, only final members, and two overridden methods: one for the inputs and one for the evaluation function. There is one important difference, though: the input tasks are handled in a type-safe manner. Each input task you add further constructs the type of your evaluation function. This is how we get a clean lambda such as (a, b) -> a + b as the evaluation function in the Fibonacci example.

Here's a simple example of a flo task depending on two other tasks:

Task<Integer> myTask(String arg) {
  return Task.named("MyTask", arg).ofType(Integer.class)
      .input(() -> otherTask(arg))
      .input(() -> yetATask(arg))
      .process((otherResult, yetAResult) -> /* ... */);
}

This is how the same thing would typically look in other libraries:

class MyTask extends Task<Integer> {

  private final String arg;

  MyTask(String arg) {
    super("MyTask", arg);
    this.arg = arg;
  }

  @Override
  public List<? extends Task<?>> inputs() {
    return Arrays.asList(new OtherTask(arg), new YetATask(arg));
  }

  @Override
  public Integer process(List<Object> inputs) {
    // lose all type safety and guess your inputs
    // ...
  }
}
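
To make the idea of each input extending the type of the evaluation function more concrete, here is a minimal sketch that uses only the JDK. It is not flo's TaskBuilder: Builder0, Builder1, Builder2, ofType and lengthPlus are hypothetical names chosen for illustration, and the sketch stops at two inputs.

```java
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical illustration only; this is NOT flo's TaskBuilder implementation.
final class TypedBuilderSketch {

  // No inputs yet: the process function takes no arguments.
  static final class Builder0<R> {
    <A> Builder1<A, R> input(Supplier<A> first) {
      return new Builder1<>(first);
    }

    Supplier<R> process(Supplier<R> fn) {
      return fn;
    }
  }

  // One input of type A: the process function takes an A.
  static final class Builder1<A, R> {
    private final Supplier<A> first;

    Builder1(Supplier<A> first) {
      this.first = first;
    }

    <B> Builder2<A, B, R> input(Supplier<B> second) {
      return new Builder2<>(first, second);
    }

    Supplier<R> process(Function<A, R> fn) {
      return () -> fn.apply(first.get());
    }
  }

  // Two inputs of types A and B: the process function takes (A, B).
  static final class Builder2<A, B, R> {
    private final Supplier<A> first;
    private final Supplier<B> second;

    Builder2(Supplier<A> first, Supplier<B> second) {
      this.first = first;
      this.second = second;
    }

    Supplier<R> process(BiFunction<A, B, R> fn) {
      return () -> fn.apply(first.get(), second.get());
    }
  }

  // The Class argument only serves as a type witness for R.
  static <R> Builder0<R> ofType(Class<R> type) {
    return new Builder0<>();
  }

  static Supplier<Integer> lengthPlus() {
    return ofType(Integer.class)
        .input(() -> "hello")               // A is inferred as String
        .input(() -> 37)                    // B is inferred as Integer
        .process((s, n) -> s.length() + n); // typed as (String, Integer) -> Integer
  }

  public static void main(String[] args) {
    System.out.println(lengthPlus().get()); // prints 42
  }
}
```

Because every input call returns a builder with one more type parameter, the compiler knows the lambda's argument types without any casts, which is the property the class-based alternative above gives up.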

Task embedding

There's of course nothing stopping you from defining the task in a regular class. It might even be useful if your evaluation function is part of an existing class. flo does not force anything onto your types; it just needs to know what to run.

class SomeExistingClass {

  private final String arg;

  SomeExistingClass(String arg) {
    this.arg = arg;
  }

  Task<Integer> task() {
    return Task.named("EmbeddedTask", arg).ofType(Integer.class)
        .input(() -> otherTask(arg))
        .input(() -> yetATask(arg))
        .process(this::process);
  }

  int process(String otherResult, int yetAResult) {
    // ...
  }
}

Tasks are lazy

Creating instances of Task<T> is cheap. No matter how complex and deep the task DAG might be, creating the top level Task<T> will not cause the whole DAG to be created. This is because all inputs are declared using a Supplier<T>, utilizing their properties for deferred evaluation:

someLibrary.maybeNeedsValue(() -> expensiveCalculation());

This pattern is on its way to becoming an idiom for achieving laziness in Java 8. A good example is the set of Supplier-based methods added to the Java 8 Logger class, which let the logger decide whether the log line for a certain log level should be computed at all.
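
As a small, JDK-only illustration of that Logger behaviour (the class and method names here are hypothetical and unrelated to flo), the supplier passed to Logger.fine is only invoked when the logger's level allows FINE messages:

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Level;
import java.util.logging.Logger;

final class LazyLoggingSketch {

  // Logs one FINE message through a Supplier and reports whether the
  // message was actually computed at the given logger level.
  static boolean messageComputed(Level loggerLevel) {
    AtomicBoolean computed = new AtomicBoolean(false);
    Logger logger = Logger.getLogger("lazy-logging-sketch");
    logger.setUseParentHandlers(false); // keep the demo quiet
    logger.setLevel(loggerLevel);

    logger.fine(() -> {
      computed.set(true); // stands in for an expensive calculation
      return "expensive result";
    });
    return computed.get();
  }

  public static void main(String[] args) {
    System.out.println(messageComputed(Level.INFO)); // false: supplier never ran
    System.out.println(messageComputed(Level.FINE)); // true: supplier ran
  }
}
```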

So we can easily create an endlessly recursive task (useless, but illustrative) and still be able to construct instances of it without having to worry about how complex or resource consuming the construction might be.

Task<String> endless() {
  return Task.named("Endless").ofType(String.class)
      .input(() -> endless())
      .process((impossible) -> impossible);
}

This means that we can always refer to tasks directly by using their definition:

TaskId endlessTaskId = endless().id();
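
The same effect can be reproduced with a few lines of plain Java. The sketch below is an assumption-laden stand-in, not flo's Task<T>: LazyTask and constructionsFor are hypothetical names. It shows that constructing an endlessly recursive definition creates exactly one node, and that further nodes only appear when an input supplier is invoked.

```java
import java.util.Collections;
import java.util.List;
import java.util.function.Supplier;

final class LazyTaskSketch {

  // Counts how many LazyTask instances have been constructed.
  private static int constructions = 0;

  // Hypothetical stand-in for a task; inputs are deferred behind Suppliers.
  static final class LazyTask {
    final String id;
    final List<Supplier<LazyTask>> inputs;

    LazyTask(String id, List<Supplier<LazyTask>> inputs) {
      constructions++;
      this.id = id;
      this.inputs = inputs;
    }
  }

  // An endlessly recursive definition: its only input is itself.
  static LazyTask endless() {
    Supplier<LazyTask> self = () -> endless();
    return new LazyTask("Endless", Collections.singletonList(self));
  }

  // Builds the top-level task, expands the given number of input levels,
  // and returns how many tasks were constructed along the way.
  static int constructionsFor(int levels) {
    constructions = 0;
    LazyTask task = endless();
    for (int i = 0; i < levels; i++) {
      task = task.inputs.get(0).get();
    }
    return constructions;
  }

  public static void main(String[] args) {
    System.out.println(constructionsFor(0)); // 1: only the top-level task
    System.out.println(constructionsFor(3)); // 4: one more per expanded level
  }
}
```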

Task DAGs as data structures

A Task<T> can be transformed into a data structure when a materialized view of the task DAG is needed. In this example we have two simple tasks, where one is used as the input to the other.

Task<String> first(String arg) {
  return Task.named("First", arg).ofType(String.class)
      .process(() -> "hello " + arg);
}

Task<String> second(String arg) {
  return Task.named("Second", arg).ofType(String.class)
      .input(() -> first(arg))
      .process((firstResult) -> "well, " + firstResult);
}

void printTaskInfo() {
  Task<String> task = second("flo");
  TaskInfo taskInfo = TaskInfo.ofTask(task);
  System.out.println("taskInfo = " + taskInfo);
}

taskInfo in this example will be:

taskInfo = TaskInfo {
  id=Second(flo)#375f5234,
  isReference=false,
  inputs=[
    TaskInfo {
      id=First(flo)#65f4e738,
      isReference=false,
      inputs=[]
    }
  ]
}

The id and inputs fields should be self-explanatory. isReference is a boolean that signals whether a task has already been materialized earlier in the tree, given a depth-first, post-order traversal.
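
To illustrate what a reference looks like when two tasks share an input, here is a rough JDK-only sketch of such a materialization. It is not how TaskInfo.ofTask is implemented; Node, Info, node, diamond and materialize are hypothetical names, and the string rendering is made up for this example.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Supplier;

final class DagInfoSketch {

  // Hypothetical lazy DAG node (not flo's Task<T>).
  static final class Node {
    final String id;
    final List<Supplier<Node>> inputs;

    Node(String id, List<Supplier<Node>> inputs) {
      this.id = id;
      this.inputs = inputs;
    }
  }

  // Hypothetical materialized view (not flo's TaskInfo).
  static final class Info {
    final String id;
    final boolean isReference;
    final List<Info> inputs;

    Info(String id, boolean isReference, List<Info> inputs) {
      this.id = id;
      this.isReference = isReference;
      this.inputs = inputs;
    }

    @Override
    public String toString() {
      return isReference ? "ref:" + id : id + inputs;
    }
  }

  static Node node(String id, Node... inputs) {
    List<Supplier<Node>> suppliers = new ArrayList<>();
    for (Node input : inputs) {
      suppliers.add(() -> input);
    }
    return new Node(id, suppliers);
  }

  // D depends on B and C, which both depend on A.
  static Node diamond() {
    Node a = node("A");
    return node("D", node("B", a), node("C", a));
  }

  static Info materialize(Node root) {
    return materialize(root, new HashSet<>());
  }

  private static Info materialize(Node node, Set<String> seen) {
    if (!seen.add(node.id)) {
      // Seen earlier in the depth-first walk: emit a reference, do not expand.
      return new Info(node.id, true, Collections.emptyList());
    }
    List<Info> inputs = new ArrayList<>();
    for (Supplier<Node> input : node.inputs) {
      inputs.add(materialize(input.get(), seen)); // inputs first, then the node
    }
    return new Info(node.id, false, inputs);
  }

  public static void main(String[] args) {
    System.out.println(materialize(diamond())); // D[B[A[]], C[ref:A]]
  }
}
```

The second occurrence of A is reported as a reference and its inputs are not expanded again, which keeps the materialized view a tree even though the underlying structure is a DAG.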

Recall that the DAG expansion can choose inputs arbitrarily based on the arguments. In workflow libraries where expansion is coupled with evaluation, it's hard to know beforehand what will be evaluated. Evaluation planning and result caching/memoization become integral parts of such libraries. flo aims to expose useful information together with flexible evaluation APIs, making it a library for easily building workflow management systems rather than trying to be the can-do-it-all workflow management system itself. More about how this is achieved in the EvalContext sections.

EvalContext

EvalContext defines an interface to a context in which Task<T> instances are evaluated. The context is responsible for expanding the task DAG and invoking the task process-functions. It gives library authors a powerful abstraction to use when implementing the specific details of evaluating a task DAG. All details around wiring up dependencies between tasks, interacting with user code for DAG expansion, invoking task functions with upstream arguments, and other mundane plumbing are dealt with by flo.

These are just a few aspects of evaluation that can be implemented in an EvalContext:

  • Evaluation concurrency and thread pool management
  • Persisted memoization of previous task evaluations
  • Distributed coordination of evaluating shared DAGs
  • Short-circuiting DAG expansion of previously evaluated tasks

Since multi-worker, asynchronous evaluation is a very common prerequisite for many evaluation implementations, flo comes with a base implementation, AsyncContext, that can be extended with further behaviour.

See also SyncContext, InstrumentedContext and MemoizingContext.
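
To give a feel for what memoization and short-circuiting buy you, the following JDK-only sketch evaluates a Fibonacci-shaped DAG with a small memoizing evaluator. It is a simplified assumption of the idea and not flo's MemoizingContext or EvalContext API; SketchTask and MemoizingEvaluator are hypothetical names, and inputs are passed as an untyped list only to keep the sketch short.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.function.Supplier;

final class MemoizingEvalSketch {

  // Hypothetical stand-in for a task: id, lazy inputs and a process function.
  static final class SketchTask {
    final String id;
    final List<Supplier<SketchTask>> inputs;
    final Function<List<Long>, Long> process;

    SketchTask(String id, List<Supplier<SketchTask>> inputs,
               Function<List<Long>, Long> process) {
      this.id = id;
      this.inputs = inputs;
      this.process = process;
    }
  }

  static SketchTask fib(long n) {
    List<Supplier<SketchTask>> inputs = new ArrayList<>();
    if (n >= 2) {
      inputs.add(() -> fib(n - 1));
      inputs.add(() -> fib(n - 2));
    }
    return new SketchTask("fib(" + n + ")", inputs,
        values -> n < 2 ? n : values.get(0) + values.get(1));
  }

  // Evaluates synchronously, caching results by task id.
  static final class MemoizingEvaluator {
    private final Map<String, Long> cache = new HashMap<>();
    int processCalls = 0;

    long evaluate(SketchTask task) {
      Long cached = cache.get(task.id);
      if (cached != null) {
        return cached; // short-circuit: the inputs are never expanded again
      }
      List<Long> values = new ArrayList<>();
      for (Supplier<SketchTask> input : task.inputs) {
        values.add(evaluate(input.get()));
      }
      processCalls++;
      long result = task.process.apply(values);
      cache.put(task.id, result);
      return result;
    }
  }

  public static void main(String[] args) {
    MemoizingEvaluator evaluator = new MemoizingEvaluator();
    System.out.println("fib(92) = " + evaluator.evaluate(fib(92)));
    System.out.println("process calls = " + evaluator.processCalls); // 93
  }
}
```

Without the cache, the same evaluation would invoke the process function once per path through the DAG, which grows exponentially with n; with it, each distinct task is processed once.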
