  • Stars: 243
  • Rank: 160,877 (Top 4%)
  • Language: Go
  • License: MIT License
  • Created: 11 months ago
  • Updated: about 1 month ago

Repository Details

Structured concurrency made easy

Flowmatic is a generic Go library that provides a structured approach to concurrent programming. It lets you manage concurrent tasks in a way that is simple, yet effective and flexible.

Flowmatic has an easy-to-use API with functions for handling common concurrency patterns. It automatically handles spawning workers, collecting errors, and propagating panics.

Flowmatic requires Go 1.20+.
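
To install it, use go get. The module path below is an assumption based on the author's other repositories listed further down this page; check the repository itself for the canonical import path:

go get github.com/carlmjohnson/flowmatic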

Features

  • Has a simple API that improves readability over channels/waitgroups/mutexes
  • Handles a variety of concurrency problems, such as heterogeneous task groups, homogeneous execution of a task over a slice, and dynamic work spawning
  • Aggregates errors
  • Properly propagates panics across goroutine boundaries
  • Has helpers for context cancelation
  • Few dependencies
  • Good test coverage

How to use Flowmatic

Execute heterogeneous tasks

One problem Flowmatic solves is managing the execution of multiple independent tasks in parallel. For example, say you want to send data to three different downstream APIs, and if any of the sends fails, you want to return an error. With traditional Go concurrency, this quickly becomes complex and difficult to manage, with goroutines, channels, and sync.WaitGroups to keep track of. Flowmatic makes it simple.

To execute heterogeneous tasks, just use flowmatic.Do:

Flowmatic:

err := flowmatic.Do(
    func() error {
        return doThingA()
    },
    func() error {
        return doThingB()
    },
    func() error {
        return doThingC()
    })
The equivalent using only the standard library:

var wg sync.WaitGroup
var errs []error
errChan := make(chan error)

wg.Add(3)
go func() {
    defer wg.Done()
    if err := doThingA(); err != nil {
        errChan <- err
    }
}()
go func() {
    defer wg.Done()
    if err := doThingB(); err != nil {
        errChan <- err
    }
}()
go func() {
    defer wg.Done()
    if err := doThingC(); err != nil {
        errChan <- err
    }
}()

go func() {
    wg.Wait()
    close(errChan)
}()

for err := range errChan {
    errs = append(errs, err)
}

err := errors.Join(errs...)

To create a context for tasks that is canceled on the first error, use flowmatic.All. To create a context for tasks that is canceled on the first success, use flowmatic.Race.

// Make variables to hold responses
var pageA, pageB, pageC string
// Race the requests to see who can answer first
err := flowmatic.Race(ctx,
	func(ctx context.Context) error {
		var err error
		pageA, err = request(ctx, "A")
		return err
	},
	func(ctx context.Context) error {
		var err error
		pageB, err = request(ctx, "B")
		return err
	},
	func(ctx context.Context) error {
		var err error
		pageC, err = request(ctx, "C")
		return err
	},
)
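
flowmatic.All takes the same task signature. Here is a minimal sketch, reusing the illustrative request helper from the example above: if either request returns an error, the shared context is canceled and the other request is abandoned.

// Fetch both pages; cancel the other request on the first error
var pageA, pageB string
err := flowmatic.All(ctx,
	func(ctx context.Context) error {
		var err error
		pageA, err = request(ctx, "A")
		return err
	},
	func(ctx context.Context) error {
		var err error
		pageB, err = request(ctx, "B")
		return err
	},
)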

Execute homogeneous tasks

flowmatic.Each is useful if you need to execute the same task on each item in a slice using a worker pool:

Flowmatic:

things := []someType{thingA, thingB, thingC}

err := flowmatic.Each(numWorkers, things,
    func(thing someType) error {
        foo := thing.Frobincate()
        return foo.DoSomething()
    })
The equivalent using only the standard library:

things := []someType{thingA, thingB, thingC}

work := make(chan someType)
errs := make(chan error)

for i := 0; i < numWorkers; i++ {
    go func() {
        for thing := range work {
            // Omitted: panic handling!
            foo := thing.Frobincate()
            errs <- foo.DoSomething()
        }
    }()
}

go func() {
    for _, thing := range things {
        work <- thing
    }

    close(work)
}()

var collectedErrs []error
for i := 0; i < len(things); i++ {
    collectedErrs = append(collectedErrs, <-errs)
}

err := errors.Join(collectedErrs...)

Use flowmatic.Map to map an input slice to an output slice. In the example below, the main function is shared; compare the Flowmatic version of Google with the x/sync/errgroup version.

func main() {
	results, err := Google(context.Background(), "golang")
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		return
	}
	for _, result := range results {
		fmt.Println(result)
	}
}
Flowmatic:

func Google(ctx context.Context, query string) ([]Result, error) {
	searches := []Search{Web, Image, Video}
	return flowmatic.Map(ctx, flowmatic.MaxProcs, searches,
		func(ctx context.Context, search Search) (Result, error) {
			return search(ctx, query)
		})
}
The x/sync/errgroup equivalent:

func Google(ctx context.Context, query string) ([]Result, error) {
	g, ctx := errgroup.WithContext(ctx)

	searches := []Search{Web, Image, Video}
	results := make([]Result, len(searches))
	for i, search := range searches {
		i, search := i, search // https://golang.org/doc/faq#closures_and_goroutines
		g.Go(func() error {
			result, err := search(ctx, query)
			if err == nil {
				results[i] = result
			}
			return err
		})
	}
	if err := g.Wait(); err != nil {
		return nil, err
	}
	return results, nil
}

Manage tasks that spawn new tasks

For tasks that may spawn more work, use flowmatic.ManageTasks. Create a manager function that Flowmatic executes serially; have it save results and examine each task's output to decide whether there is more work to be done.

// Task fetches a page and extracts the URLs
task := func(u string) ([]string, error) {
    page, err := getURL(ctx, u)
    if err != nil {
        return nil, err
    }
    return getLinks(page), nil
}

// Map from page to links
// Doesn't need a lock because only the manager touches it
results := map[string][]string{}
var managerErr error

// Manager keeps track of which pages have been visited and the results graph
manager := func(req string, links []string, err error) ([]string, bool) {
    // Halt execution after the first error
    if err != nil {
        managerErr = err
        return nil, false
    }
    // Save final results in map
    results[req] = links

    // Check for new pages to scrape
    var newpages []string
    for _, link := range links {
        if _, ok := results[link]; ok {
            // Seen it, try the next link
            continue
        }
        // Add to list of new pages
        newpages = append(newpages, link)
        // Add placeholder to map to prevent double scraping
        results[link] = nil
    }
    return newpages, true
}

// Process the tasks with as many workers as GOMAXPROCS
flowmatic.ManageTasks(flowmatic.MaxProcs, task, manager, "http://example.com/")
// Check if anything went wrong
if managerErr != nil {
    fmt.Println("error", managerErr)
}

Normally, it is very difficult to keep track of concurrent code because any combination of events could occur in any order or simultaneously, and each combination has to be accounted for by the programmer. flowmatic.ManageTasks makes it simple to write concurrent code because everything follows a simple rule: tasks happen concurrently; the manager runs serially.

Centralizing control in the manager makes reasoning about the code radically simpler. When writing locking code with M states and N methods, you need to think about all M states in each of the N methods, giving you an M × N code explosion. By centralizing the logic, the M states only need to be considered in one location: the manager.

Advanced patterns with TaskPool

For very advanced uses, flowmatic.TaskPool takes the boilerplate out of managing a pool of workers. Compare Flowmatic to this example from x/sync/errgroup:

func main() {
	m, err := MD5All(context.Background(), ".")
	if err != nil {
		log.Fatal(err)
	}

	for k, sum := range m {
		fmt.Printf("%s:\t%x\n", k, sum)
	}
}
Flowmatic:

// MD5All reads all the files in the file tree rooted at root
// and returns a map from file path to the MD5 sum of the file's contents.
// If the directory walk fails or any read operation fails,
// MD5All returns an error.
func MD5All(ctx context.Context, root string) (map[string][md5.Size]byte, error) {
	// Make a pool of 20 digesters
	in, out := flowmatic.TaskPool(20, digest)

	m := make(map[string][md5.Size]byte)
	// Open two goroutines:
	// one for reading file names by walking the filesystem
	// one for recording results from the digesters in a map
	err := flowmatic.All(ctx,
		func(ctx context.Context) error {
			return walkFilesystem(ctx, root, in)
		},
		func(ctx context.Context) error {
			for r := range out {
				if r.Err != nil {
					return r.Err
				}
				m[r.In] = *r.Out
			}
			return nil
		},
	)

	return m, err
}

func walkFilesystem(ctx context.Context, root string, in chan<- string) error {
	defer close(in)

	return filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
		if err != nil {
			return err
		}
		if !info.Mode().IsRegular() {
			return nil
		}
		select {
		case in <- path:
		case <-ctx.Done():
			return ctx.Err()
		}

		return nil
	})
}

func digest(path string) (*[md5.Size]byte, error) {
	data, err := os.ReadFile(path)
	if err != nil {
		return nil, err
	}
	hash := md5.Sum(data)
	return &hash, nil
}
The x/sync/errgroup equivalent:

type result struct {
	path string
	sum  [md5.Size]byte
}

// MD5All reads all the files in the file tree rooted at root and returns a map
// from file path to the MD5 sum of the file's contents. If the directory walk
// fails or any read operation fails, MD5All returns an error.
func MD5All(ctx context.Context, root string) (map[string][md5.Size]byte, error) {
	// ctx is canceled when g.Wait() returns. When this version of MD5All returns
	// - even in case of error! - we know that all of the goroutines have finished
	// and the memory they were using can be garbage-collected.
	g, ctx := errgroup.WithContext(ctx)
	paths := make(chan string)

	g.Go(func() error {
		defer close(paths)
		return filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
			if err != nil {
				return err
			}
			if !info.Mode().IsRegular() {
				return nil
			}
			select {
			case paths <- path:
			case <-ctx.Done():
				return ctx.Err()
			}
			return nil
		})
	})

	// Start a fixed number of goroutines to read and digest files.
	c := make(chan result)
	const numDigesters = 20
	for i := 0; i < numDigesters; i++ {
		g.Go(func() error {
			for path := range paths {
				data, err := ioutil.ReadFile(path)
				if err != nil {
					return err
				}
				select {
				case c <- result{path, md5.Sum(data)}:
				case <-ctx.Done():
					return ctx.Err()
				}
			}
			return nil
		})
	}
	go func() {
		g.Wait()
		close(c)
	}()

	m := make(map[string][md5.Size]byte)
	for r := range c {
		m[r.path] = r.sum
	}
	// Check whether any of the goroutines failed. Since g is accumulating the
	// errors, we don't need to send them (or check for them) in the individual
	// results sent on the channel.
	if err := g.Wait(); err != nil {
		return nil, err
	}
	return m, nil
}

Note on panicking

In Go, if a panic in a goroutine is not recovered, the whole process is shut down. There are pros and cons to this approach. The pro is that if the panic is a symptom of a programming error in the application, no further damage can be done by the application. The con is that in many cases, this forces a shutdown in a situation that might have been recoverable.

As a result, although the standard Go HTTP server catches panics that occur in its HTTP handlers and continues serving requests, it cannot catch panics that occur in goroutines spawned by a handler, and those will take the whole server offline.

Flowmatic fixes this problem by catching a panic that occurs in one of its worker goroutines and repropagating it in the parent goroutine, so the panic can be caught and logged at the appropriate level.
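
As a rough sketch of what that enables (assuming the flowmatic and fmt packages are imported; handleRequest is just an illustrative name), a panic raised inside a task passed to flowmatic.Do resurfaces in the calling goroutine, where an ordinary deferred recover can turn it into an error:

func handleRequest() (err error) {
	// Flowmatic repropagates the worker's panic in this goroutine,
	// so a plain deferred recover is enough to keep the process up.
	defer func() {
		if r := recover(); r != nil {
			err = fmt.Errorf("recovered from panic: %v", r)
		}
	}()
	return flowmatic.Do(
		func() error {
			panic("bug in task A") // simulated programming error
		},
		func() error {
			return nil
		})
}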

More Repositories

 1. requests (Go, 1,392 stars): HTTP requests for Gophers
 2. versioninfo (Go, 234 stars): Importable package that parses version info from debug.ReadBuildInfo().
 3. heffalump (Go, 184 stars): Heffalump is an endless honeypot
 4. pomodoro (Go, 172 stars): Command line pomodoro timer
 5. be (Go, 90 stars): The Go test helper for minimalists
 6. go-cli (Go, 55 stars): Template for creating Go CLIs
 7. new (Go, 55 stars): A helper function to create a pointer to a new object in Go 1.18+
 8. workgroup (Go, 45 stars): Structured concurrency manager for Go
 9. certinfo (Go, 34 stars): Get information about the certificate used at a domain
10. truthy (Go, 33 stars): Package truthy provides truthy condition testing with Go generics
11. exembed (Go, 30 stars): Go Embed experiments
12. deque (Go, 27 stars): Generic deque container
13. get-headers (Go, 27 stars): Tool that shows headers and stats from GET-ing a URL
14. netlify-go-function-demo (HTML, 25 stars): https://blog.carlmjohnson.net/post/2020/how-to-host-golang-on-netlify-for-free/
15. shitpic (JavaScript, 24 stars): Recompresses JPEGs to make shitpics
16. resperr (Go, 23 stars): Go package to associate status codes and messages with errors
17. scattered (Go, 20 stars): Command line tool for asset hashing
18. tumblr-importr (Go, 19 stars): An importer that uses the Tumblr API to create a Hugo static site
19. decoder-ring (Go, 15 stars): CLI tool for decoding/encoding from common formats
20. syncx (Go, 14 stars): Go sync utility functions using generics
21. springerle (Go, 14 stars): A cookiecutter tool written in Go
22. errorx (Go, 13 stars): Error helpers for Go
23. opensesame (Go, 11 stars): Trivial password generator
24. bytemap (Go, 11 stars): Bytemap contains types for making maps from bytes to bool, integer, or float using a backing array
25. exitcode (Go, 10 stars): Go package to convert errors to exit codes
26. pointer (Go, 10 stars): Generic pointer helpers
27. csv (Go, 10 stars): Go CSV reader like Python's DictReader
28. flagext (Go, 9 stars): Implementations of the flag.Value interface to extend the flag package
29. json-tidy (Go, 9 stars): Pretty prints JSON from stdin, files, or URLs
30. go-run (Go, 8 stars): Shebang line equivalent for Go
31. flagx (Go, 7 stars): Extensions to the Go flag package
32. collections (Go, 6 stars)
33. sudoku (Go, 6 stars): Sudoku solver in Go
34. simple-reverse-proxy (Go, 5 stars): A simple reverse proxy written in Go
35. monterey-jack (Go, 4 stars): A friend to zippers
36. tsrproxy (Go, 4 stars): Simple Tailscale reverse proxy
37. haystack (Go, 4 stars): Pinboard search CLI
38. go-utils (Go, 4 stars): Implementation of the Fisher–Yates shuffle (Knuth shuffle) in Go
39. feed2json (Go, 3 stars): Given an Atom or RSS feed, creates a comparable JSON feed
40. whatsit (Go, 3 stars): Looks at a file and guesses its content type
41. gsize (Go, 3 stars): Utility that tells how large a file will be after gzip compression
42. stringutil (Go, 3 stars): Some Go string utilities
43. portfor (Go, 3 stars): Deterministic hash from name to a port number
44. crockford (Go, 3 stars): Go implementation of Crockford base 32 encoding
45. bletchley (Go, 2 stars): Simple command line application for basic public key crypto
46. errutil (Go, 2 stars): Helpful tools for errors in Go
47. randline (Go, 2 stars): Chooses random line(s) from a file
48. Watsuji-and-Aesthetics (HTML, 2 stars): Dissertation, Carl M. Johnson
49. nfspampurge (Go, 2 stars): Delete all spam from Netlify spam page
50. webarchive (Go, 2 stars)
51. lich (Go, 1 star): A port of Wolf Rentzsch's Lich binary file format to Go (golang)
52. rootdown (Go, 1 star): The internet demanded another Go router
53. echo-request (Go, 1 star): Simple server that echoes requests back as text
54. loggo (Go, 1 star): Logs requests to files for debugging webhooks
55. luhn (Go, 1 star): Yet another Luhn algorithm implementation
56. django-context-variables (Python, 1 star): Simple utility to help make declarative class-based views in Django
57. junix (Go, 1 star): A re-implementation of common Unix commands with output as JSON instead of plain text
58. rank-em (Go, 1 star): CLI tool for making ranking lists
59. errors (Go, 1 star): Some Go error helpers
60. jax (AppleScript, 1 star): JavaScript for Automation scripts
61. python-defunct (Python, 1 star): Defunct. See https://github.com/carlmjohnson/python-tools instead.
62. 352-interactive-design-development (HTML, 1 star)
63. randpwd (HTML, 1 star): Randomly generated passwords
64. slackhook (Go, 1 star): Simple client for Slack web hook URLs
65. dotfiles-public (Python, 1 star): Public repo for configuration files (git, Sublime Text, etc.)
66. gracefulserver (Go, 1 star): Boilerplate for starting an HTTP server in Go with graceful shutdown