This version: 0.2.3
Current development version is here: 0.2.4
A lightweight (serverless) native python parallel processing framework based on simple decorators and call graphs, supporting both control flow and dataflow execution paradigms as well as de-centralized CPU & GPU scheduling.
For a quick look at what makes Entangle special, take a look at Design Goals.
New In This Release
- Bug fixes to process.py, ssh.py
- Distributed dataflow example
- Dataflow decorator re-write. Now works with ssh for distributed dataflow. Fixes prior issues with local dataflows.
- Retry usage example
- Dockerfile provided for quick and easy experimentation.
- Workflows can now return the call graph structure upon completion. See Graph Example
- Support for workflow futures (if that's your thing) See Workflow Future Example
Quick Usage
With Entangle you can run simple, hardware parallelized code with conditional logic that looks like this.
result = add(
add(
num(6),
two() if False else one()
),
subtract(
five(),
two()
)
)
print(result())
or train two AI models in parallel using tensorflow container utilizing dedicated CPU and GPU usage.
@process
@docker(image="tensorflow/tensorflow:latest-gpu", packages=['tensorflow_datasets'])
def train_modelA():
# train it
return
@process
@docker(image="tensorflow/tensorflow:latest-gpu", packages=['tensorflow_datasets'])
def train_modelB():
# train it
return
@workflow
def train_models(*args):
# I'm training a bunch of models in parallel!
return
workflow = train_models(
train_modelA(),
train_modelB()
)
result = workflow()
Docker
To quickly get started with Entangle, build and run a docker container from the included Dockerfile.
$ docker build -t entangle .
$ docker run -it entangle:latest
root@9579336b3e34:/# python -m entangle.examples.example
Or if you have the NVIDIA Docker Environment setup you can test the numba GPU vector example.
$ docker run -it --gpus all entangle
root@13428af4a37b:/# python -m entangle.examples.example3
(0.2957176749914652, 0.41134210501331836)
Wiki Articles
Outline
- Overview
- Installation
- Design Goals
- Architecture
- Declarative Infrastructure
- Execution
- Workflows
- Process Behavior
- Composition
- Containers
- Dataflows
- Schedulers
- Distributed Flows
- Examples
- Logging
- Design Tool
Overview
Entangle is a different kind of parallel compute framework for multi-CPU/GPU environments. It allows for simple workflow design using plain old python and special decorators that control the type of parallel compute and infrastructure needed.
One key feature of entangle is fine-grained control over individual functions in a workflow. You could easily describe multiple functions running across multiple compute environments all interacting as if they were simple local python functions. No central scheduler or workflow manager is needed allowing you to choose where and how functions operate with declarative infrastructure.
Another unique quality is the use of composition to build parallel workflows dynamically.
What does "Entangle" mean?
The term is derived from a quantum physics phenomena called quantum entanglement which involves the state of a pair or group of particles affecting one another at a distance instantaneously.
In this context, it is a metaphor for how tasks send data (particles) to one another in the context of a connected microflow.
IMPORTANT NOTES!
Please keep in mind that Entangle is in development and is classified as Pre-Alpha
. Some of the functionality shown here is incomplete. If you clone this repo and want to experiment be sure to update often as things break, improve, get fixed etc. quite frequently. The main
branch will always contain the most current release. All development for the next version is done on the development branch for the next released listed at the top of this document.
Installation
NOTE: At the moment entangle only works with python 3.8 due to how coroutines work there and also shared memory features.
From PyPi
$ pip install --upgrade py-entangle
$ python -m entangle.examples.example
From repo root
python3.8
$ virtualenv --python=python3.8 venv
$ source venv/bin/activate
(venv) $ python setup.py install
(venv) $ python -m entangle.examples.example
miniconda3
- Install miniconda3 with python3.8 for linux
$ conda init
$ python setup.py install
$ python -m entangle.examples.example
Installing Numba
NOTE: Numba package is disabled by default in
setup.py
. If you want this package, just uncomment it; however some OS specific steps might be required.
On some systems you might encounter the following error when trying to install numba
.
RuntimeError: Could not find a `llvm-config` binary.
Try the following remedy (for ubuntu systems)
$ sudo apt-get install -y --no-install-recommends llvm-10 llvm-10-dev
$ export LLVM_CONFIG=/usr/bin/llvm-config-10
$ pip3 install numba
Testing
$ pytest --verbose --color=yes --disable-pytest-warnings --no-summary --pyargs entangle.tests
or if you don't have GPU
$ pytest --verbose --color=yes --pyargs entangle.tests.test_entangle
or just do this
$ python setup.py test
Cleaning
Clean all build files, directories, temp files and any files created by examples and tests.
$ python setup.py clean
Miniconda
If you are planning to run or use GPU enabled code it is recommended to set up a miniconda3 virtualenv.
Design Goals
- Small & Simple
- Easy to Understand
- API-less
- Plain Old Python
- True Parallelism
- Pluggable & Flexible
- Composition Based
- Shared-Nothing
- Serverless & Threadless
- True Dataflow Support
- CPU/GPU Scheduling
- Distributed Dataflow
Architecture
Entangle is designed without a central scheduler or workflow manager. Rather, each function is decorated with special descriptors that turn them into their own workflow managers. These decorators implement logic to parallelize and gather values from its dependent arguments, which are executed as separate processes. As each function is assigned a dedicated CPU, the workflow is thus an ensemble of parallel, independent micro-flows that resolve themselves and pass their values into queues until the workflow completes.
This offers an extreme shared nothing design that maximizes CPU usage in a multi-CPU environment.
Each function (or task) is given a process and scheduled to a CPU by the operating system. Since python Processes are native bound OS processes, this inherits the benefit of the operating system scheduler which is optimized for the underlying hardware. Arguments that satisfy the function are run in parallel in the same fashion. The parent function then uses asyncio coroutines to monitor queues for the results from the processes. This keeps the CPU usage down while the dependent functions produce their results and eliminates the need for monitor threads.
As a workflow executes, it fans out over CPUs. Each process acting as it's own scheduler to spawn new processes and resolve arguments, while also monitoring queues for incoming results asynchronously. This makes the workflow a truly emergent, dynamic computing construct vs a monolithic service managing all the pieces. Of course, this is not to say one approach is better, just that entangle takes a different approach based on its preferred tradeoffs.
Tradeoffs
Every design approach is a balance of tradeoffs. Entangle favors CPU utilization and true parallelism over resource managers, centralized (which is to say network centric) schedulers or other shared services. It favors simplicity over behavior - leaving specific extensions to you, attempting to be minimal and un-opinionated. It tries to be invisible to the end user as much as possible. It strives for the basic principle that, "if it looks like it should work, it should work."
Entangle leans on the OS scheduler to prioritize processes based on the behavior of those processes and underlying resource utilizations. It therefore does not provide its own redundant (which is to say centralized) scheduler or task manager. Because of this, top-down visibility or control of workflow processes is not as easy as with centralized task managers.
Entangle prefers the non-API approach, where it looks like regular python expressions, over strict API's or invocation idioms. This makes it easier to pick up and use and plays well with 3rd party frameworks too.
Use Cases
Because of these tradeoffs, there are certain use cases that align with entangle and others that probably do not.
If you want top-down visibility & control of workflows and tasks, Entangle is probably not ready for you.
If you have lots of CPUs, entangle could be for you! If you want easy python workflows that span local and remote cloud resources, entangle could be for you. If you want to write custom handlers that enrich or execute code in custom ways for your needs, entangle makes this easy for you.
Orchestration
One focused use case for entangle is when you want to orchestrate across different compute nodes, remote APIs and other disparate endpoints in a single workflow, with inherent parallelism.
Each step of the workflow has different parameters, needs and protocols used to communicate with it. Such a workflow might simply look like:
data = data_refinement(
get_source_data()
)
result = measure_vectors(
vector1(
data("vector1")
),
vector2(
data("vector2")
),
vector3(
data("vector3")
)
)
GPU Processing
Another use case is the need to run multiple parallel tasks that operate on matrix data using a GPU. Entangle makes this quite easy as seen in GPU Example, Docker Example and Shared Memory Example
DevOps
For devops use cases Entangle allows you to write simple, parallel workflow graphs using plain old python. This let's you write efficient parallel devops pipelines with ease. Build simple workflows that do powerful things like orchestrating across multiple clouds, services, repositories etc in an efficient dataflow parallel design.
What Entangle is not
Here are some things entangle is not, out-of-the-box. This isn't to say entangle can't do these things. In fact, entangle is designed to be a low level framework for implementing these kinds of things.
- Entangle does not yet perform fail over (TBD)
- Entangle is not a batch process framework (TBD)
- Entangle is not map/reduce
- Entangle is not a centralized task manager
Declarative Infrastructure
Entangle allows you to target specific infrastructure environments or needs using simple decorators.
For example, to specify a process run on local hardware you can use the @local decorator
@process
@local
def myfunc():
return
If you want to execute a function in AWS EC2 or fargate, you could write it as:
@process
@aws(keys=[])
@ec2(ami='ami-12345')
def myfunc():
return
@process
@aws(keys=[])
@fargate(ram='2GB', cpu='Xeon')
def myfunc():
return
or using docker containers
@process
@docker(image="tensorflow/tensorflow:latest-gpu")
def reduce_sum():
import tensorflow as tf
return tf.reduce_sum(tf.random.normal([1000, 1000]))
If you have a custom on-prem environment you can write a simple decorator that deploys the task to that and use it alongside other infrastructure decorators.
Where, What & How: Using Mixins
Entangle uses the concept of mixins to associate infrastructure needs (where) with compute (what) and concurrency needs (how). Thus, it allows you to mix and match combinations of these within a single workflow or dataflow.
For example, you might need to get data from AWS Lambda
, run GPU Algorithm on that data inside a container
then send those results to 10 CPUs in a compute cloud
for parallel analysis, then gather those results and send them to a web service
on your network for storage or rendering.
All these steps have different locations, infrastructure requirements, compute needs, processing times, and protocols.
Execution
As we mentioned above, entangle workflows will fan out during execution and occupy CPUs throughout the workflow. The OS will determine the priority of processes based on their resource needs at the time of execution. Here is a simple workflow and diagram showing how the parallel execution unfolds.
result = add(
mult(
one(),
two()
),
three()
)
This execution order applies to workflows in entange. If you use @dataflow
decorator the execution follows that of a dataflow compute model. Refer to the section Dataflows for more information.
Threads vs Processes
In Python, threads do not execute in parallel to one another, it only gives the illusion of such. Python handles the context switching between threads and is limited by the GIL. Processes on the other hand, are not controlled by a GIL and can thus truly run in parallel. The host operating system governs the sheduling of processes and entangle is designed to exploit this benefit.
Workflows
A workflow in Entangle is just a fancy term for a call graph of function processes. The example above in Execution is a simple workflow. Workflows execute in natural dependency ordering that you'd expect from any python function call. The inner most dependencies are invoked first so they can return their values to parent functions.
Note that this paradigm is pretty much the way most imperative languages operate today, but it does differ from dataflows which we talk about down below Dataflows.
Imperative vs Structured Declaration
In Entangle, there are two ways you can write your workflows, depending which is more convenient for you. Both produce in the same execution sequence and results.
Let's look at the example below:
result = add(
add(
num(6),
two() if False else one()
),
subtract(
five(),
two()
)
)
This represents the structured paradigm, based off a more strict type of lambda math notation where functions invoke functions until the top-most value is produced.
We can also write this as a sequence of imperative declarations
_five = five()
_two = two()
_sub = subtract(_five,_two)
_num = num(6)
_two2 = two() if False else one()
_add1 = add(_num,_two2)
result = add(_add1,_sub)
Process Behavior
Keyword parameters on the @process
decorator allow you to control some meta-behavior of the process.
Wait
Wait indicates how long a function should wait before its arguments arrive. It is a sibling to timeout however it is different.
@process(wait=20)
def values(*args):
values = [arg for arg in args]
return values
o = values(
one(),
train()
)
In the above example, it is saying that the values
function will wait up to 20 seconds for both one()
and train()
functions to complete and return values otherwise it will throw a ProcessTimeoutException
.
Timeout
Timeout is more self-evident. It is the wait period in seconds, entangle will allow a process to run.
# Wait at most, 3 seconds for this task to complete
@process(timeout=3)
def task():
return True
# Wait indefinitely for this task to complete
@process
def taskB():
return False
When a process times out, a ProcessTimeoutException
will be thrown by Entangle and the process will be terminated if it is still alive.
Composition
Entangle offers a couple different ways to use composition effectively: with decorators and with workflows.
Decorator Composition
You can compose your tasks by combining process and infrastructure decorators.
Again, in the example below, we are declaring a process and local infrastructure for our task to run by composing two decorators together.
@process
@local
def taskA():
return
or, specifying that the task run as a process inside AWS fargate, unchanged.
@process
@aws(keys=[])
@fargate(ram='2GB', cpu=4)
def taskA():
return
Workflow Composition
Composing workflows is just as simple. You can write code that itself constructs workflows on the fly easily.
from entangle.process import process
from entangle.http import request
from entangle.workflow import workflow
@process
@request(url='https://datausa.io/api/data', method='GET')
def mydata(data):
import json
data = json.loads(data)
return int(data['data'][0]['Year'])
@process
def two():
return 2
@process
def add(a, b):
v = int(a) + int(b)
print("ADD: *"+str(v)+"*")
return v
@workflow
def workflow1():
return add(
mydata(drilldowns='Nation', measures='Population'),
two()
)
@workflow
def workflow2(value):
return add(
value(),
two()
)
result = workflow2(workflow1)
print(result())
The key to making this work is the deferring of execution trait of Entangle which we will discuss in a later post. But essentially it allows for separation of workflow declaration from execution. Doing this allows you to treat workflows as objects and pass them around anywhere a normal python function (or workflow) is expected. Prior to execution.
Containers
Entangle supports two container technologies: docker and singularity(TBD). These are used with the associated decorators @docker
and @singularity
.
Using containers allows you to run functions that have complex OS or python depdendencies not native to your hosting environment.
For a complete example, please see Docker Example
Dataflows
Entangle supports two kinds of execution flow, dataflow[8] and workflow (or what is more traditionally called control flow). They both complete a DAG-based execution graph but in slightly different ways and with different advantages to the programmer.
As wikipedia states[8]:
Dataflow is a software paradigm based on the idea of disconnecting computational actors into stages (pipelines) that can execute concurrently. Dataflow can also be called stream processing or reactive programming.[1]
However, Merriam-Webster's simple definition[9] illuminates a key trait of dataflows - "...as data becomes available"
: a computer architecture that utilizes multiple parallel processors to perform simultaneous operations as data becomes available
Data Readiness
In many parallel data computations the arrival or readiness of some data might lag behind other data, perhaps coming from longer computations or farther away. True dataflow models allow the computation to proceed on a parallel path as far as it can go with the currently available data. This means dependent operations are not held up by control flow execution order in some cases and the overall computation is optimized.
Dataflows vs Workflows
Author, slikts [1], descibes these differences very nicely (from [1]).
Control flow refers to the path the point of execution takes in a program, and sequential programming that focuses on explicit control flow using control structures like loops or conditionals is called imperative programming. In an imperative model, data may follow the control flow, but the main question is about the order of execution.
Dataflow abstracts over explicit control flow by placing the emphasis on the routing and transformation of data and is part of the declarative programming paradigm. In a dataflow model, control follows data and computations are executed implicitly based on data availability.
Concurrency control refers to the use of explicit mechanisms like locks to synchronize interdependent concurrent computations. Dataflow is also used to abstract over explicit concurrency control.
Simple Example
Let's start with a simple workflow example:
A(B(),C())
In traditional control flow or what I call lambda based[10] execution, the programming language's dependency analysis will determine the order of execution. In this example B()
and C()
are dependencies of A()
and thus need to complete before A()
can be executed. In other words, they are inputs to A()
. Basic stuff.
This means the execution of each compute function is aware of the specific dependent functions it must resolve first. We call this control depedency [2].
Let's say the dependency was reversed. Whereby, a value computed by A()
was a dependency of both B()
and C()
. How would we write this in conventional control flow?
We might do something like this.
B(A())
C(A())
Breaking our previous single expression into multiple expressions. However, in this case, A()
is being invoked twice, which could produce different values.
So we might introduce a variable
a = A()
B = B(a)
C = C(a)
Now we have 3 expressions that must run in a proper order. We have done some of the work by making a separate expression for our dependent value a
. But for large dataflows this can be a bigger burden on the programmer to unravel all the dependencies and put them in proper order.
What if the execution of an expression was not computed using the traditional dependency analysis most languages use today but instead was defined by stricter dataflow semantics?
DAG Dataflow
In dataflow, a DAG represents the flow of values from compute nodes where each node computes its value once and the value is emitted or sent to directionally connected nodes in the DAG.
This paradigm makes it easier to express our intentions of sharing values from A()
by computing it once and sending the results to B()
and C()
. Neither B()
nor C()
explicitly depend on A()
. The dataflow DAG provides the dependency structure for all the compute nodes.
Now let's rewrite our expression if it were executed in strict dataflow order.
A(
B(),
C()
)
Here, the dataflow engine executing this expression understands the intention to compute A()
first, then in parallel compute B()
and C()
with the same result computed only once from A()
as their input.
Written imperatively, this would equate to:
a = A()
B(a)
C(a)
where B(a)
and C(a)
run in parallel.
IMPORTANT! The dataflow syntax provides the necessary graph structure for a dataflow engine to know explicitly which functions can operate in parallel.
Results Comparison
So what are the differences in the results from our workflow version and our dataflow version? It should be clear that the workflow version takes as input 2 values (B(),C()) and produces 1 value, A().
However, our dataflow version is different. It takes as input 1 value A() and produces two results, B() and C(), in parallel. So the computations are different!
Advantages of Strict Dataflow
As was pointed out in the intro to this section, dataflow provides declarative data dependency modelling for a computation. This is sometimes a more natural way of thinking about a problem for the human programmer. It allows a clean separation between the initial state of a dataflow and various desired outcomes that would be more difficult to model using control flow programming, as the programmer will have to use multiple imperative steps to introduce the proper execution order and indicating which computations are parallel is unclear from linear ordering.
Dataflow has improved efficiencies when it comes to data-centric computations as well because it only computes nodes once per DAG execution. This approach requires no caching or variables that might be required with imperative-based control flow.
Naturally Parallel
A dataflow DAG is a naturally and implicitly parallel model - by its declarative structure. For CPU-bound, data centric tasks it is simple and easy to understand for this reason.
Detailed Example
For a more detailed example of using @dataflow
in entangle see Dataflow Examples.
References
- Concurrency Glossary - https://slikts.github.io/concurrency-glossary/
- Dependency Graphs - https://en.wikipedia.org/wiki/Dependency_graph
- Dataflow Programming - https://en.wikipedia.org/wiki/Dataflow_programming
- Data-Flow vs Control-Flow for Extreme Level Computing - https://ieeexplore.ieee.org/document/6919190
- Advances in Dataflow Programming Languages - https://futureofcoding.org/notes/dataflow/advances-in-dataflow-programming-langauges.html
- Data dependency - https://en.wikipedia.org/wiki/Data_dependency
- An introduction to a formal theory of dependence analysis - https://link.springer.com/article/10.1007/BF00128174
- Dataflow - https://en.wikipedia.org/wiki/Dataflow
- Dataflow - https://www.merriam-webster.com/dictionary/dataflow
- Functional Programming/Lambda Calculus -https://www.tutorialspoint.com/functional_programming/functional_programming_lambda_calculus.htm
Schedulers
Entangle supports a composition based mechanism for attaching schedulers to workflows and functions. The scheduler class will control access to CPU resources based on its constraints. For example, if you want to run a workflow with potentially 20 parallel tasks, but only want to allocate 4 CPUs to execute the workflow, the scheduler class can ensure entangle doesn't spawn more processes than requested. Schedulers wrap individual functions and pull CPU "cookies" off a scheduler queue to hand off to processes. Each cookie contains a CPU identifier that the process then binds to. When a cookie is placed on the queue by a process it means that CPU (id) is available for use.
Parallel processes thus use the queue mechanism to self-organize around the allocated CPUs by requesting cookies, assigning their CPU affinity to that cpu id, running their behaviors and returning the cookie to the queue when complete. This approach requires no centralized scheduler server as the workfow processes all use the same multiprocessing.Queue to retrieve CPU cookies.
Pluggable Schedulers
Entangle allows you to provide your own scheduler class using the decorator.
@scheduler(cpus=4,impl='my.package.MyScheduler'}
def myfunc():
return
Currently, the scheduler class need only implement one method.
def register(self, f, cpus=12):
and return a function
or partial
that wraps the provided function with scheduler behavior.
To see the implementation of DefaultScheduler
click here.
For a workflow example using scheduler see Scheduler Example below.
Distributed Flows
Entangle allows you to pass a workflow (or dataflow) to a remote machine for execution. When combined with @scheduler
decorators this also forwards scheduler behavior to the remote machine where it manages the received workflow there.
This type of propagation requires no centralized (i.e. shared) scheduler or services and thus scales very well.
Moreover, parts of a workflow can be sent to different machines for a truly distributed workflow.
SSH Decorator
Functions or flows (graph of functions) are remoted by using the @ssh
decorator like the example below.
@ssh(user='me', host='radiant', key='/home/me/.ssh/id_rsa.pub', python='/home/me/venv/bin/python')
@scheduler(**scheduler_config)
@thread
def workflow2():
pass
In this example, we have declared a (contrived) workflow that adds the return values of 2 embedded functions and returns it.
The @ssh
decorator indicates that this workflow is to be copied and executed on the remote server myserver
as user me
using the python executable at /home/me/venv/bin/python
.
Entangle will marshall the codes to the remote server and execute them there.
Scheduler Propagation
If any dependent or subsequent functions are invoked on the remote server, any decorators that apply to those will be enforced.
If you use @scheduler
then it will utilize the scheduler queue to request CPU cookies. If you also use another @ssh
decorator then that dependent function will be shipped to a 3rd remote server and the process repeated there.
diagram here
Each time a workflow decorated with @scheduler
is sent to a remote machine, that scheduler then manages its portion of the workflow and any dependent functions that it might resolve.
This pattern forms a sort of distributed tree of schedulers that work in parallel across multiple machines, yet fully resolve to complete the root workflow.
Let's take a closer look at this example, which uses 3 different machines to solve its workflow.
@ssh(user='darren', host='miko', key='/home/darren/.ssh/id_rsa.pub', python='/home/darren/venv/bin/python')
@scheduler(**config)
@process
def two():
# run some codes
return 2
@ssh(user='darren', host='radiant', key='/home/darren/.ssh/id_rsa.pub', python='/home/darren/venv/bin/python')
@scheduler(**config)
@process
def three():
# run some codes
return 3
@scheduler(**config)
@process
def add(a, b):
v = int(a.get_result()) + int(b)
return v
@ssh(user='darren', host='phoenix', key='/home/darren/.ssh/id_rsa.pub', python='/home/darren/venv/bin/python')
@scheduler(**config)
@process
def workflow():
_add = add(
three(),
two()
)
return _add()
In the above example, the workflow()
is first sent to machine phoenix
and executed there. It wraps the function add
which also executes on phoenix
because it has no @ssh
decorator and the workflow is already there.
The add()
function requires the functions three()
and two()
be solved first. These two functions are sent to machines miko
and radiant
to be solved.
The results are returned to the add
function running on phoenix
and the result of the workflow()
is returned to the calling machine, or the machine where the workflow was executed on.
Once the workflow()
reaches phoenix
the @scheduler
attached to the workflow manages the CPU's there according to its constraints.
Since the add()
function has two dependencies that can run in parallel the @schedular
can request 2 CPUs and run them in parallel.
diagram
Examples
- GPU Example
- Shared Memory Example
- AI Example
- Docker Example
- Dataflow Examples
- Scheduler Example
- Graph Example
- Workflow Future Example
- Retry Example
- General Example
There are a variety of example workflows and dataflows you can run. In addition to the sample code provided below you can run these using the following commands.
$ python -m entangle.examples.example
$ python -m entangle.examples.example2
$ python -m entangle.examples.example3
$ python -m entangle.examples.example4
$ python -m entangle.examples.example5
$ python -m entangle.examples.example6
$ python -m entangle.examples.example_graph.py
$ python -m entangle.examples.example_graph_future.py
$ python -m entangle.examples.example_with_future.py
$ python -m entangle.examples.lambdaexample
$ python -m entangle.examples.listexample
$ python -m entangle.examples.listexample2
$ python -m entangle.examples.dataflowexample
$ python -m entangle.examples.dataflowexample2
$ python -m entangle.examples.dockerexample
$ python -m entangle.examples.aiexample
$ python -m entangle.examples.retry_example
$ python -m entangle.examples.schedulerexample
$ python -m entangle.examples.schedulerexample2
$ python -m entangle.examples.sshdatafloweexample
$ python -m entangle.examples.sshschedulerexample
$ python -m entangle.examples.timeoutexample
For a complete list of the examples source code and binders to run them please visit the wiki.
GPU Example
This example assumes you have installed nvidia-cuda-toolkit
and associated python packages along with numba
.
In this example, two vectorized functions with different sized matrices are run in parallel, and their times are gathered.
import numpy as np
from entangle.process import process
from timeit import default_timer as timer
from numba import vectorize
@process
def dovectors1():
@vectorize(['float32(float32, float32)'], target='cuda')
def pow(a, b):
return a ** b
vec_size = 100
a = b = np.array(np.random.sample(vec_size), dtype=np.float32)
c = np.zeros(vec_size, dtype=np.float32)
start = timer()
pow(a, b)
duration = timer() - start
return duration
@process
def dovectors2():
@vectorize(['float32(float32, float32)'], target='cuda')
def pow(a, b):
return a ** b
vec_size = 100000000
a = b = np.array(np.random.sample(vec_size), dtype=np.float32)
c = np.zeros(vec_size, dtype=np.float32)
start = timer()
pow(a, b)
duration = timer() - start
return duration
@process
def durations(*args):
times = [arg for arg in args]
return times
dp = durations(
dovectors1(),
dovectors2()
)
print(dp())
Which outputs something like
[0.21504536108113825, 0.3445616390090436]
Shared Memory Example
The default return value conduit in Entangle is the Queue. Task return values are marshalled back through queues where they are gathered and provided as function parameters to the parent process task. This method is not desirable for very large data sets such as matrices in GPU computations. The below example shows how Entangle uses python 3.8's shared memory feature to implicitly share volatile memory across native parallel processes.
import numpy as np
from entangle.process import process
from timeit import default_timer as timer
from numba import vectorize
@process(shared_memory=True)
def dopow(names, smm=None, sm=None):
(namea, nameb, shapea, shapeb, typea, typeb) = names
start = timer()
shma = sm(namea)
shmb = sm(nameb)
# Get matrixes from shared memory
np_shma = np.frombuffer(shma.buf, dtype=typea)
np_shmb = np.frombuffer(shmb.buf, dtype=typeb)
@vectorize(['float32(float32, float32)'], target='cuda')
def pow(a, b):
return a ** b
pow(np_shma, np_shmb)
duration = timer() - start
print("Powers Time: ", duration)
@process(shared_memory=True)
def createvectors(smm=None, sm=None):
vec_size = 100000000
start = timer()
a = b = np.array(np.random.sample(vec_size), dtype=np.float32)
c = np.zeros(vec_size, dtype=np.float32)
# create shared memory for matrices
shma = smm.SharedMemory(a.nbytes)
shmb = smm.SharedMemory(b.nbytes)
names = (shma.name, shmb.name, a.shape, b.shape, a.dtype, b.dtype)
duration = timer() - start
print("Create Vectors Time: ", duration)
return names
dp = dopow(
createvectors()
)
dp()
Outputs
Create Vectors Time: 0.8577492530457675
Powers Time: 0.8135421359911561
SharedMemoryManager & SharedMemory
In the example above, you will notice two special keywords being passed into the functions,
def createvectors(smm=None, sm=None):
smm
is a handle to the SharedMemoryManager
being used for this workflow and sm
is a handle to the SharedMemory
class needed to acquire the shared memory segments by name.
If you set shared_memory=True
then you must include these keyword arguments in your method or an error will occur.
Now, you might be asking yourself, if one of the design goals was shared-nothing then why are we talking about shared memory? When we say "shared" (in shared-nothing) we refer to resources that have to be synchronized or locked when accessed by parallel processes, thereby creating bottlenecks in the execution. The shared memory example here does not introduce any contention, rather, it is used in a pipeline fashion. In this approach, a given shared memory address is only updated by one process at a time (e.g. using it to return its data to the waiting process). Multiple shared memory segments can be created during the course of a workflow for parallel running processes.
AI Example
Here is an example that uses tensorflow to train a model and return the summary.
from entangle.logging.debug import logging
from entangle.docker import docker
from entangle.process import process
@process
@docker(image="tensorflow/tensorflow:latest-gpu", packages=['tensorflow_datasets'])
def train():
import tensorflow as tf
import tensorflow_datasets as tfds
(ds_train, ds_test), ds_info = tfds.load(
'mnist',
split=['train', 'test'],
shuffle_files=True,
as_supervised=True,
with_info=True,
)
def normalize_img(image, label):
return tf.cast(image, tf.float32) / 255., label
ds_train = ds_train.map(
normalize_img, num_parallel_calls=tf.data.experimental.AUTOTUNE)
ds_train = ds_train.cache()
ds_train = ds_train.shuffle(ds_info.splits['train'].num_examples)
ds_train = ds_train.batch(128)
ds_train = ds_train.prefetch(tf.data.experimental.AUTOTUNE)
ds_test = ds_test.map(
normalize_img, num_parallel_calls=tf.data.experimental.AUTOTUNE)
ds_test = ds_test.batch(128)
ds_test = ds_test.cache()
ds_test = ds_test.prefetch(tf.data.experimental.AUTOTUNE)
model = tf.keras.models.Sequential([
tf.keras.layers.Flatten(input_shape=(28, 28)),
tf.keras.layers.Dense(128, activation='relu'),
tf.keras.layers.Dense(10)
])
model.compile(
optimizer=tf.keras.optimizers.Adam(0.001),
loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
metrics=[tf.keras.metrics.SparseCategoricalAccuracy()],
)
model.fit(
ds_train,
epochs=6,
verbose=0,
validation_data=ds_test,
)
return model.summary()
model = train()
print(model())
Docker Example
In this example we are running a process that spawns the decorated function inside a docker container and waits for the result.
We compose this using the @process
and @docker
decorators to achieve the design. The function reduce_sum
is run inside the docker container using image tensorflow/tensorflow:latest-gpu
and the result is returned seamlessly to the workflow.
from entangle.docker import docker
from entangle.process import process
import logging
logging.basicConfig(
format='%(asctime)s : %(levelname)s : %(message)s', level=logging.INFO)
@process
@docker(image="tensorflow/tensorflow:latest-gpu")
def reduce_sum():
import tensorflow as tf
return tf.reduce_sum(tf.random.normal([1000, 1000]))
rs = reduce_sum()
print(rs())
Tensorflow GPU Container
The above example launches a GPU enabled docker on the nvidia docker
platform (running on your local machine). Tensorflow by default will consume the entire GPU for its processing, however if you want to run parallel GPU jobs that only consume GPU memory as needed, then you need to use:
@docker(image="tensorflow/tensorflow:latest-gpu", consume_gpu=False)
Dataflow Examples
The example below demonstrates the dataflow capability of Entangle. This is a different compute paradigm from workflows. Please read the section on Dataflows vs Workflows for complete explanation of the difference.
NOTE: We use threads as our execution unit in this example as it makes seeing the output possible. With
@process
you won't see the aggregate output on your console, instead it will be logged toentangle.log
file. With Entangle you decide whether to use concurrency (threads) or parallelism (processes). Entangle is itself, threadless.
import threading
import time
from entangle.logging.debug import logging
from entangle.dataflow import thread
from entangle.dataflow import process
from entangle.dataflow import dataflow
def triggered(func, result):
print("triggered: {} {}".format(func.__name__, result))
@dataflow(callback=triggered)
@thread
def printx(x):
print('printx: {}'.format(threading.current_thread().name))
return("X: {}".format(x))
@dataflow(callback=triggered)
@thread
def printy(y):
print('printy: {}'.format(threading.current_thread().name))
return("Y: {}".format(y))
@dataflow(callback=triggered)
@thread
def printz(z):
print('printz: {}'.format(threading.current_thread().name))
return("Z: {}".format(z))
@dataflow(callback=triggered)
@thread
def echo(e):
print('echo: {}'.format(threading.current_thread().name))
return "Echo! {}".format(e)
@dataflow(executor='thread', callback=triggered, maxworkers=3)
def emit(a, **kwargs):
print('emit: {}'.format(threading.current_thread().name))
return a+"!"
# Create the dataflow graph
flow = emit(
printx(
printz(
echo()
)
),
printy(
printz()
),
printy()
)
# Invoke the dataflow graph with initial input
flow('emit')
time.sleep(2)
# Call flow again with different input value
flow('HELLO')
Data-Driven Branching
It's useful to have a data flow that routes to different paths depending on input data.
Entangle makes this relatively easy. The example below embeds a lambda expression directly in the dataflow structure that chooses either printx()
or printy()
as the next compute node depending on what the input value is - after emit has generated the value.
In the snippet below emit
first produces a value based on some input, the result is emitted to either printx()
or printy()
depending on the value of the result.
Note that this is computed during the execution of the DAG, not at declaration time.
flow = emit(
lambda x: printx() if x == 'emit' else printy()
)
flow('emit')
Full example:
import threading
import time
from entangle.dataflow import thread
from entangle.dataflow import dataflow
def triggered(func, result):
print("triggered: {} {}".format(func.__name__, result))
@dataflow(callback=triggered)
@thread
def printx(x):
print('printx: {}'.format(threading.current_thread().name))
return("X: {}".format(x))
@dataflow(callback=triggered)
@thread
def printy(y):
print('printy: {}'.format(threading.current_thread().name))
return("Y: {}".format(y))
@dataflow(executor='thread', callback=triggered, maxworkers=3)
def emit(a, **kwargs):
print('emit: {}'.format(threading.current_thread().name))
return a+"!"
# Create the dataflow graph
# Defer whether we will forward to printx() or printy() depending
# on the result receive from emit. This won't be known until the data is ready.
flow = emit(
lambda x: printx() if x == 'emit' else printy()
)
# Invoke the dataflow graph with initial input
flow('emit')
time.sleep(2)
# Call flow again with different input value
flow('HELLO')
Which outputs:
printx: emit MainThread
triggered: inner X: emit
printz: ThreadPoolExecutor-3_0
triggered: inner Z: X: emit
----------------------------
printy: MainThread
triggered: inner Y: HELLO
Distributed Dataflow
In the example below, we combine @dataflow
with @ssh
to get instant distributed dataflow!
import threading
import time
from entangle.logging.debug import logging
from entangle.ssh import ssh
from entangle.process import process
from entangle.dataflow import dataflow
def triggered(func, result):
print("triggered: {} {}".format(func.__name__, result))
@dataflow(callback=triggered)
@ssh(user='darren', host='miko', key='/home/darren/.ssh/id_rsa.pub', python='/home/darren/venv/bin/python')
@process
def printz(z):
print('printz: {}'.format(threading.current_thread().name))
with open('/tmp/printz.out', 'w') as pr:
pr.write("Z: {}".format(z))
return "Z: {}".format(z)
@dataflow(callback=triggered)
@ssh(user='darren', host='radiant', key='/home/darren/.ssh/id_rsa.pub', python='/home/darren/venv/bin/python')
@process
def printx(x):
print('printx: {}'.format(threading.current_thread().name))
with open('/tmp/printx.out', 'w') as pr:
pr.write("X: {}".format(x))
return "X: {}".format(x)
@dataflow(callback=triggered)
@process
def printy(y):
print('printy: {}'.format(threading.current_thread().name))
return "Y: {}".format(y)
@dataflow(callback=triggered)
@ssh(user='darren', host='radiant', key='/home/darren/.ssh/id_rsa.pub', python='/home/darren/venv/bin/python')
@process
def echo(e):
print('echo: {}'.format(threading.current_thread().name))
with open('/tmp/echo.out', 'w') as pr:
pr.write("Echo! {}".format(e))
return "Echo! {}".format(e)
@dataflow(callback=triggered, maxworkers=3)
def emit(value):
print('emit: {}'.format(threading.current_thread().name))
return value+"!"
if __name__ == '__main__':
results = []
# Create the dataflow graph
flow = emit(
printx(
printz(
echo()
)
),
printy(
printz()
),
printy()
)
result = flow('emit')
Scheduler Example
from entangle.logging.debug import logging
from entangle.process import process
from entangle.http import request
from entangle.workflow import workflow
from entangle.scheduler import scheduler
scheduler_config = {'cpus': 2,
'impl': 'entangle.scheduler.DefaultScheduler'}
@scheduler(**scheduler_config)
@process
def two():
return 2
@scheduler(**scheduler_config)
@process
def three():
return 3
@scheduler(**scheduler_config)
@process
def add(a, b):
print("add: {} {}".format(a,b))
v = int(a) + int(b)
print("ADD: *"+str(v)+"*")
return v
@scheduler(**scheduler_config)
@workflow
def workflow2():
return add(
three(),
two()
)
result = workflow2()
print(result())
Graph Example
The follow example code shows how we can collect the call graph trace for our workflow and display it.
import json
import time
import asyncio
from entangle.logging.debug import logging
from entangle.process import process
@process
def one():
return 1
@process
def two():
return 2
@process
def five():
return 5
@process
def num(n):
return n
@process
def add(a, b):
v = int(a) + int(b)
print("ADD: *"+str(v)+"*")
return v
@process
def subtract(a, b):
return int(a) - int(b)
if __name__ == '__main__':
workflow = add(
add(
num(6),
two() if False else one()
),
subtract(
five(),
add(
subtract(
num(8),
two()
),
one()
)
)
)
result = workflow()
print(result)
graph = workflow.graph(wait=True)
print("GRAPH:",json.dumps(graph, indent=4))
Which outputs this graph structure
GRAPH: {
"add": [
{
"add": {
"num": {
"6": []
},
"one": []
}
},
{
"subtract": {
"five": [],
"add": {
"subtract": {
"num": {
"8": []
},
"two": []
},
"one": []
}
}
}
]
}
We can also use futures to wait for the graph data to arrive as a callback.
future = workflow.graph(wait=False)
def show_graph(graph):
print("GRAPH:", graph.result())
future.add_done_callback(show_graph)
future.entangle()
Workflow Future Example
Entangle allows you to use future results for workflows if the blocking method doesn't meet your use case. To do this, we alter the invocation of the workflow slightly.
def callback(result):
print("CALLBACK:", result.result())
# set up future callbacks
future = workflow.future(callback=callback)
print('Future:', future)
# Trigger workflow. Does not block
workflow(proc=True)
# Notify results when available
future.entangle() # Does not block
Retry Example
To specify how many times a function should be retried before throwing an exception with a sleep value in seconds between retries.
@process(retry=5. sleep=1)
def five():
import time
val = int(str(time.time()).split('.')[1]) % 5
if val != 0:
raise Exception("Not a FIVE!")
return 5
General Example
An example of how entangle will be used (still in development)
from entangle.process import process
from entangle.thread import thread
from entangle.task import task
from entangle.local import local
from entangle.aws import ec2
from entangle.aws import lmbda
from entangle.http import request
@process(timeout=60)
@local(cpus=4)
def add(a, b):
return a + b
@process(cache=True)
@aws(keys=[])
@ec2
def one():
return 1
@thread
@local
def two():
return 2
@lmbda(function='name')
@aws(keys=[])
def proxy():
# lambda proxy
pass
@process
def subtract(a, b):
return int(a) - int(b)
@process
def five():
return 5
@process
def num(n):
return n
@process
@request(url='http://..../', method='POST')
def request(data):
# Manipulate http response data here and return new result
return data
# 1,2,3 get passed to lambda function and result returned
result = proxy(1,2,3)
# Pass key:value params and get result from your function
result = request(key1=value, key2=value )
# parallel workflow is just "plain old python"
result = add(
add(
num(6),
two() if False else one()
),
subtract(
five(),
two()
)
)
print(result())
Logging
Logging in Entangle is intended to be convenient and provide some useful out-of-the-box defaults that "just work".
There are 3 default loggers you can import.
from entangle.logging.info import logging
from entangle.logging.debug import logging
from entangle.logging.file import logging
And the details of each are:
info
import logging
logging.basicConfig(
format='%(asctime)s : %(levelname)s : %(message)s', level=logging.INFO)
debug
import logging
logging.basicConfig(
format='%(asctime)s : %(levelname)s : %(message)s', level=logging.DEBUG)
file
import logging
logging.basicConfig(filename='entangle.log',
format='%(asctime)s : %(levelname)s : %(message)s', level=logging.DEBUG)
You can of course provide your own logging configuration, but be sure to include it at the top of your file so the various entangle modules pick it up.
Design Tool
A prototype visual design tool for Entangle is shown below. More details will be posted on thye wiki here.