Software Architecture for ML engineers

Machine Learning Design patterns

Pipeline

A pipeline is about processing some data sequentially using an arbitrary number of functions. It's useful for data preprocessing or within the context of an inference framework.

For example you may want to do preprocess -> inference -> postprocess

from typing import Union

# Image, Video, Audio and Tensor are placeholder types standing in for
# whatever media and tensor types your framework provides
def preprocess(input: Union[str, Image, Video, Audio]) -> Tensor:
    ...  # implementation

def inference(input: Tensor) -> Tensor:
    ...  # implementation

def postprocess(input: Tensor) -> Union[str, Image, Video, Audio]:
    ...  # implementation

And then you'd run your pipeline by saying

pipeline = [preprocess, inference, postprocess]

input = ...
for step in pipeline:
    input = step(input)

# input now holds the final output

An important detail is that the output type of each function in a pipeline needs to match the input type of the next one.
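If you prefer one-liners, the same loop can be written with functools.reduce from the standard library; a minimal sketch using the pipeline list above:

from functools import reduce

# reduce threads each step's output into the next step's input
output = reduce(lambda acc, step: step(acc), pipeline, input)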

This pattern isn't limited to inference frameworks. A framework like Keras explicitly has a concept of a layer, so if you were to implement it from scratch a grossly simplified version would be something like.

class KerasModel:
    def __init__(self):
        self.layers = []
    
    def add_layer(self, layer):
        self.layers.append(layer)
    
    def forward(self, input):
        for layer in self.layers:
            input = layer(input)
        return input
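A quick usage sketch with stand-in lambda layers:

model = KerasModel()
model.add_layer(lambda x: x + 1)
model.add_layer(lambda x: x * 2)
print(model.forward(1))  # (1 + 1) * 2 = 4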

An exercise for the reader is to make the above work with a batch of examples.

Workflow

A workflow is a more complex version of a pipeline that isn't restricted to purely sequential behavior. The more general pattern is a Directed Acyclic Graph (DAG). This is what DAG providers like Airflow, Metaflow and the ensemble support in TorchServe implement.

# Dag example
graph = {
    'input': ['a'],
    'a': ['b', 'e'],
    'b': ['c', 'd'],
    'd': ['e']}

In the example above we use a Python dictionary where each key is a node that arrows point out of and each value lists the nodes those arrows point into. If you don't like Python dictionaries you can also create a DAG using YAML or Python decorators.

Now imagine every node was some Python function or even a PyTorch model, how would you go about executing this DAG?

class WorkflowEngine:
    def __init__(self, dag):
        self.dag = dag

    def execute(self):
        for node, children in self.dag.items():
            step = Step(node, children, dependencies_met=False)
            step.execute()

class Step:
    def __init__(self, inputs, outputs, dependencies_met):
        self.inputs = inputs
        self.outputs = outputs
        self.dependencies_met = dependencies_met
        self.resources = {"cpu": 2, "gpu": 1}

    def execute(self):
        if self.dependencies_met:
            ...  # Execute the step

A real world orchestrator would need to take care of dependency management, scheduling and resource allocation.
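For a taste of what dependency management involves, here's a minimal sketch that executes the DAG above in dependency order using the standard library's graphlib; the funcs mapping from node names to plain Python callables is an assumption for illustration:

from graphlib import TopologicalSorter  # stdlib since Python 3.9

def run_dag(graph, funcs):
    # graphlib expects {node: predecessors}, our dict is {node: successors},
    # so invert it first
    predecessors = {node: set() for node in funcs}
    for node, children in graph.items():
        for child in children:
            predecessors.setdefault(child, set()).add(node)

    # static_order() yields each node only after all of its parents
    for node in TopologicalSorter(predecessors).static_order():
        funcs[node]()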

Function as data

Function as data is something LISP programmers talk a lot about. The main idea is you could have a function like

;; Add 1 and 2
(+ 1 2)

But if you add a quote at the beginning of it then it becomes data, an unevaluated list, instead of being evaluated

;; The unevaluated list (+ 1 2)
'(+ 1 2)

This is powerful because now a separate program could analyze the expression (+ 1 2), realize that the inputs never change and the function is pure, and conclude that the whole expression can be replaced by 3.
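Python can pull the same trick on its own source with the standard ast module; a toy sketch of the constant folding just described:

import ast

# Parse "1 + 2" into data (a syntax tree) instead of evaluating it
tree = ast.parse("1 + 2", mode="eval")
node = tree.body

# If the operator is + and both operands are literals, the whole
# expression can be folded down to a constant
if (isinstance(node, ast.BinOp) and isinstance(node.op, ast.Add)
        and isinstance(node.left, ast.Constant)
        and isinstance(node.right, ast.Constant)):
    print(node.left.value + node.right.value)  # 3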

PyTorch also has a similar idea but first let's define a very simple toy model.

import torch

class MyModel(torch.nn.Module):
    def __init__(self):
        super().__init__()
        self.linear = torch.nn.Linear(100, 100)

    def forward(self, input):
        output = self.linear(input)
        return output

Create model = MyModel() and run an inference with model(torch.randn(100)), so it's a function! But if you inspect model.state_dict() you get the weights of the model, so it's also data. So function = data.

This is also made clearer if you've ever pickled a model, which essentially serializes a Python object to bytes on disk, so again function = data

import pickle

model = MyModel()
pickle.dumps(model)
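And the bytes round-trip back into something callable; a small sketch:

# Round-trip: the serialized bytes become a callable model again
blob = pickle.dumps(model)
restored = pickle.loads(blob)
restored(torch.randn(100))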

Iterator design pattern

You've already used iterators every time you've written a loop like

for i in range(10):
    print(i)

But a more useful operation would be something like

for batch in dataset:
    model(batch)

So how do you make something like for _ in _ available for your own classes? You do this by implementing the __iter__() and __next__() functions.

from typing import List

class Dataset:
    def __init__(self, data: List[str], batch: int = 0):
        self.data = data
        self.batch = batch
        self.elements = 0  # index of the next element to return

    def __iter__(self):
        # An iterator returns itself, not a data element
        self.elements = 0
        return self

    def __next__(self):
        if self.elements >= len(self.data):
            raise StopIteration

        # Return a batch of examples
        if self.batch > 0:
            out = self.data[self.elements : self.elements + self.batch]
            self.elements += self.batch
            return out

        # Return a single example
        out = self.data[self.elements]
        self.elements += 1
        return out
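Now the for _ in _ syntax works on our class:

dataset = Dataset(["a", "b", "c", "d"], batch=2)
for batch in dataset:
    print(batch)  # ['a', 'b'] then ['c', 'd']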

Job queues

Let's say we have a service that needs to pick one of n PyTorch models to run on some input

from dataclasses import dataclass
from typing import Callable, List, Tuple, Union

@dataclass
class Job:
    model: Callable  # the loaded model to run, so that job.model(job.input) works
    input: Union[str, Image, Audio, Video]  # Image, Audio and Video are placeholder types as before
    endpoint: Tuple[str, int]  # (url, port)

class JobProcessor:
    def __init__(self):
        self.jobs: List[Job] = []

    def process_job(self):
        job = self.jobs.pop()
        self.execute(job)

    def execute(self, job):
        output = job.model(job.input)
        self.expose(output, job.endpoint)

    def expose(self, output, endpoint):
        ...  # Use FastAPI or something else
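A minimal usage sketch, where my_model is assumed to be some loaded callable model:

processor = JobProcessor()
processor.jobs.append(Job(model=my_model, input="hello", endpoint=("localhost", 8080)))
processor.process_job()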

With only a couple of lines of code we've designed a multi-model inference framework. And if you're not using Python to build this job manager, you can still just spawn a Python process, run the inference, write the result either to disk or stdout and pick it back up from the other language.

Callbacks

Many trainer loops will implement callbacks where you can trigger some behavior if some condition is fulfilled, for example

on_training_ends -> do_something
on_epoch_end -> do_something

def do_something():
    save_logs_to_tensorboard()
    change_learning_rate()
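At its simplest, a trainer can keep a dict of hooks and fire them by name; a minimal sketch reusing do_something from above:

# Register callbacks under event names and fire them when the event happens
callbacks = {"on_epoch_end": [do_something]}

def fire(event):
    for callback in callbacks.get(event, []):
        callback()

fire("on_epoch_end")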

A callback is a particular case of something called the Observer pattern, so let's implement that. Code paraphrased from https://refactoring.guru/design-patterns/observer/python/example#lang-features

So an observer needs to subscribe to some subject whose state changes

class ModelSubject:
    def __init__(self):
        # A trainer includes a model, which epoch it's on, loss, model weights...
        self.state: Trainer = None
        self.observers: List[ModelObserver] = []

    def attach(self, observer: ModelObserver):
        self.observers.append(observer)

    def detach(self, observer: ModelObserver):
        self.observers.remove(observer)

    def notify(self):
        for observer in self.observers:
            observer.update(self.state)

The observer is notified of all state changes of the subject and then needs to do something when that happens.

At a high level an Observer is an abstract class that implements a function called update

from abc import abstractmethod, ABC

class Observer(ABC):

    @abstractmethod
    def update(self, new_state):
        """
        Implement your own observer here
        """
        pass

We can then build specific kinds of observers by implementing the update() function. In the example below we build an observer that adjusts the learning rate of a model when the loss increases.

from typing import Optional

# TrainerState is a placeholder for whatever state object your trainer emits
class ChangeLearningRateObserver(Observer):
    def __init__(self):
        self.state: Optional[TrainerState] = None

    def update(self, new_state):
        # Do not use this in production code, this is educational only
        if self.state is not None and new_state.loss > self.state.loss:
            new_state.lr = new_state.lr * 0.1
        self.state = new_state

But this is a powerful framework and we can also implement something like logging without changing the library code.

import os
from typing import Dict, Optional

class LogObserver(Observer):
    def __init__(self, log_dir='/logs/'):
        self.state: Optional[Dict] = None
        self.log_dir: str = log_dir

    def update(self, new_state: Dict):  # Assume the new state is a dictionary
        # "state.log" is a placeholder filename
        with open(os.path.join(self.log_dir, "state.log"), "a") as f:
            for key, value in new_state.items():
                f.write(f"{key}:{value}\n")
        self.state = new_state
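Wiring it together, the subject notifies whoever happens to be attached; new_trainer_state here is a hypothetical state object produced by the trainer:

subject = ModelSubject()
subject.attach(ChangeLearningRateObserver())
subject.attach(LogObserver())

# Somewhere in the training loop, after the trainer produces fresh state
subject.state = new_trainer_state
subject.notify()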

So the benefit of this approach is that you can extend the functionality of a library without changing its core code, which would otherwise mean getting a PR merged by the core team and risks making the core code unmaintainable as it accumulates every use case people care about. The observer pattern is primarily a way to extend code, which is why it's very popular in training frameworks like fast.ai or PyTorch Lightning.

Learner pattern

The learner pattern was popularized by frameworks like scikit-learn, which pioneered an approach to modeling as simple as

model.fit(data)

But implementing code for this, at least within the context of neural networks, is something you already do if you've used vanilla PyTorch without a training framework.

# data[0][0] means the first input example
# data[1][5] means the label for the 5th input example
data = [[inputs], [labels]]

class Model:
    def __init__(self):
        self.model = nn_model()  # nn_model is a placeholder for some neural network
        self.loss_function = square_loss  # or l1, cross entropy etc.

    def fit(self, data):
        inputs, labels = data

        # 1. Compute forward function
        output = self.model(inputs)

        # 2. Get loss
        loss = self.loss_function(output, labels)

        # 3. Update model
        self.update(loss)

    def update(self, loss):
        # 1. Compute gradients with autograd
        # 2. Apply them to the weights
        self.model.weights = ...
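The same three steps with real PyTorch pieces, as a rough sketch:

import torch

net = torch.nn.Linear(10, 1)
loss_fn = torch.nn.MSELoss()
opt = torch.optim.SGD(net.parameters(), lr=0.01)

def fit(inputs, labels):
    output = net(inputs)            # 1. forward
    loss = loss_fn(output, labels)  # 2. loss
    opt.zero_grad()
    loss.backward()                 # 3. gradients via autograd
    opt.step()                      # apply them to the weights

fit(torch.randn(32, 10), torch.randn(32, 1))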

Batch processing

So suppose you'd like to run model.forward() on two different inputs. The naive way of doing this is running

model.forward(input_1)
model.forward(input_2)

But this becomes painfully slow if you start dealing with a large number of examples

# model.forward is called O(inputs)
for input in inputs:
    model.forward(input)

Generally in numerical code you should avoid for loops like the plague and as much as possible try to replace them with batch operations.

So instead rewrite your code as

# Stack the inputs into one batched tensor
batch = torch.stack(inputs)

# model.forward is called once
model.forward(batch)

Remember GPUs aren't that great at doing many small operations because there's an overhead to sending data to them, so as much as possible it's better to batch jobs into large ones to take advantage of speedups. (Technically this can be worked around with CUDA graphs but that's still a relatively new feature)

Vectorization on CPU is another technique to eliminate for loops by operating over chunks of data concurrently. For example, some newer Intel CPUs will turn matrices into long vectors and do math on them using the wide AVX-512 instruction set.
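The same principle shows up in NumPy, whose array operations are vectorized under the hood; a small sketch:

import numpy as np

a = np.random.rand(1_000_000)
b = np.random.rand(1_000_000)

# One vectorized call instead of a million-iteration Python for loop
c = a + b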

Decorator

Decorators are a technique to add functionality to a function or class without modifying its code. You may have already heard of or used decorators like @memoize, @lru_cache, @profile, @step

As an example let's take a look at how to implement a @profile decorator borrowing code from https://medium.com/uncountable-engineering/pythons-line-profiler-32df2b07b290

from line_profiler import LineProfiler

profiler = LineProfiler()

# A decorator is just a python function that takes in a function
def profile(func):
    # Inner function takes in unnamed and named arguments
    def inner(*args, **kwargs):
        # New code decorator adds
        profiler.add_function(func)
        profiler.enable_by_count()

        # Running the decorated function
        return func(*args, **kwargs)
    return inner

So now you can just run

@profile
def my_slow_func():
    ...  # some terrible code here
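After calling the decorated function you can dump the collected timings; print_stats() is part of line_profiler's API:

my_slow_func()
profiler.print_stats()  # per-line timings for my_slow_func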

In the above decorator we ran some commands before returning func, but we could also change func or its arguments, or do whatever we please. This is another one of those patterns, like callbacks, that lets you extend some code without modifying it.

One of the most interesting decorators is the FastAPI one https://github.com/tiangolo/fastapi

@app.get("/")
def read_root():
    return {"Hello": "World"}

The above application redirects calls to / to the read_root() function. Digging into the code a bit you'll find a function called get() in fastapi/applications.py https://github.com/tiangolo/fastapi/blob/master/fastapi/applications.py#L425

It's a complicated function but what we care about is

def get(...) -> Callable[[DecoratedCallable], DecoratedCallable]:
    return self.router.get(...)

Digging through the code a bit more we find that add_api_route() is called whenever a new @app.get() is declared, and we see func being returned in much the same way as in the plain profiling decorator https://github.com/tiangolo/fastapi/blob/87e29ec2c54ce3651939cc4d10e05d07a2f8b9ce/fastapi/applications.py#L378

The flipside of decorators is that they can lead you to a monolithic architecture where your infrastructure and deployment are tightly coupled to your implementation. This is generally fine if you're a startup but not so fine if multiple people are contributing code to the same place.

Strategy Pattern

The strategy pattern is classic object oriented programming and is generally useful when you want to let users set some particular strategy for an object without constraining them too much as a library designer.

For example suppose you're creating a new Trainer class and don't have time to implement all the optimizers people care about. So you start by adding support for an SGDOptimizer.

from abc import ABC, abstractmethod

class Trainer:
    def __init__(self):
        self.optimizer: Optimizer = SGDOptimizer()
        ...

# Create an abstract optimizer class
class Optimizer(ABC):
    # We don't want to constrain the input types for such a function
    # Return type is a tensor because values in a tensor need to be changed a bit
    @abstractmethod
    def step(self, *args, **kwargs) -> Tensor:
        pass

class SGDOptimizer(Optimizer):
    def step(self, learn_rate: float, n_iter: int, tolerance: float):
        ...  # Your SGD implementation here

So now someone else who doesn't understand how your whole trainer codebase works can create a new optimizer by just making sure to inherit from Optimizer

class AdamOptimizer(Optimizer):
    def step(self, beta_1: float, beta_2: float, epsilon: float):
        ...  # Your Adam implementation here
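Swapping strategies then requires no changes to Trainer at all:

trainer = Trainer()
trainer.optimizer = AdamOptimizer()  # plug in a different strategy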

TODO
