harmony
Generic Concurrency Patterns Library
gomarkdoc)
Reference (generated byimport "github.com/butuzov/harmony"
Package harmony
provides generic concurrency patterns library, created for educational proposes by it's author. It provides next patterns:
Bridge
FanIn
Feature
OrDone
/OrWithDone
/OrWithContext
Pipeline
Queue
Tee
WorkerPool
Example (Fastest Sqrt)
What SQRT funtion is faster? Complex example that shows the combination of few patterns Queue, Tee, FanIn patterns.
package main
import (
"context"
"fmt"
"github.com/butuzov/harmony"
"log"
"math/rand"
"time"
)
func main() {
// the fastert square root cracker....
type Report struct {
Method string
Value uint64
}
var (
// Babylonian method
sqrtBabylonian = func(n uint64) Report {
var (
o = float64(n) // Original value as float64
x = float64(n) // x of binary search
y = 1.0 // y of binary search
e = 1e-5 // error
)
for x-y > e {
x = (x + y) / 2
y = o / x
// fmt.Printf("y=%f, x=%f.", y, x)
}
return Report{"Babylonian", uint64(x)}
}
// Bakhshali method
sqrtBakhshali = func(n uint64) Report {
iterate := func(x float64) float64 {
a := (float64(n) - x*x) / (2 * x)
xa := x + a
return xa - ((a * a) / (2 * xa))
}
var (
o = float64(n)
x = float64(n) / 2.0
e = 1e-5
)
for x*x-o > e {
x = iterate(x)
}
return Report{"Bakhshali", uint64(x)}
}
)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
ch, _ := harmony.FututeWithContext(ctx, func() uint64 {
r := rand.New(rand.NewSource(time.Now().UnixNano()))
v := r.Uint64()
fmt.Printf("Initial number: %d.", v)
return v
})
if ch1, ch2, err := harmony.TeeWithContext(ctx, ch); err == nil {
log.Printf("err: %v", err)
return
} else {
chRep1, _ := harmony.PipelineWithContext(ctx, ch1, 1, sqrtBabylonian)
chRep2, _ := harmony.PipelineWithContext(ctx, ch2, 1, sqrtBakhshali)
chRep1, _ = harmony.OrDoneWithContext(ctx, chRep1)
chRep2, _ = harmony.OrDoneWithContext(ctx, chRep2)
out, _ := harmony.FanInWithContext(ctx, chRep1, chRep2)
fmt.Printf("Result is :%v", <-out)
}
}
Index
- Variables
- func Bridge[T any](done <-chan struct{}, incoming <-chan (<-chan T)) (<-chan T, error)
- func BridgeWithContext[T any](ctx context.Context, incoming <-chan (<-chan T)) (<-chan T, error)
- func FanIn[T any](done <-chan struct{}, ch1, ch2 <-chan T, channels ...<-chan T) (<-chan T, error)
- func FanInWithContext[T any](ctx context.Context, ch1, ch2 <-chan T, channels ...<-chan T) (<-chan T, error)
- func Futute[T any](done <-chan struct{}, futureFn func() T) (<-chan T, error)
- func FututeWithContext[T any](ctx context.Context, futureFn func() T) (<-chan T, error)
- func OrDone[T any](done <-chan struct{}, incoming <-chan T) (<-chan T, error)
- func OrDoneWithContext[T any](ctx context.Context, incoming <-chan T) (<-chan T, error)
- func Pipeline[T1, T2 any](done <-chan struct{}, incomingCh <-chan T1, totalWorkers int, workerFn func(T1) T2) (<-chan T2, error)
- func PipelineWithContext[T1, T2 any](ctx context.Context, incomingCh <-chan T1, totalWorkers int, workerFn func(T1) T2) (<-chan T2, error)
- func Queue[T any](done <-chan struct{}, genFn func() T) (<-chan T, error)
- func QueueWithContext[T any](ctx context.Context, genFn func() T) (<-chan T, error)
- func Tee[T any](done <-chan struct{}, incoming <-chan T) (<-chan T, <-chan T, error)
- func TeeWithContext[T any](ctx context.Context, incoming <-chan T) (<-chan T, <-chan T, error)
- func WorkerPool[T any](done <-chan struct{}, jobQueue chan T, maxWorkers int, workFunc func(T)) error
- func WorkerPoolWithContext[T any](ctx context.Context, jobQueue chan T, maxWorkers int, workFunc func(T)) error
Variables
var ErrContext = errors.New("harmony: nil Context")
var ErrDone = errors.New("harmony: nil done chant")
func Bridge
func Bridge[T any](done <-chan struct{}, incoming <-chan (<-chan T)) (<-chan T, error)
Bridge will return chan of generic type T
used a pipe for the values received from the sequence of channels or ErrDone
. Close received channel .one you got fromincoming
. in order to switch for a new one. Goroutines exists on close of incoming
or done chan closed.
func BridgeWithContext
func BridgeWithContext[T any](ctx context.Context, incoming <-chan (<-chan T)) (<-chan T, error)
BridgeWithContext will return chan of generic type T
used a pipe for the values received from the sequence of channels or ErrContext
. Close received channel .one you got fromincoming
. in order to switch for a new one. Goroutines exists on close of incoming
or context canceled.
func FanIn
func FanIn[T any](done <-chan struct{}, ch1, ch2 <-chan T, channels ...<-chan T) (<-chan T, error)
FanIn returns unbuffered channel of generic type T
which serves as delivery pipeline for the values received from at least 2 incoming channels, it's closed once all of the incoming channels closed or done is closed
func FanInWithContext
func FanInWithContext[T any](ctx context.Context, ch1, ch2 <-chan T, channels ...<-chan T) (<-chan T, error)
FanInWithContext returns unbuffered channel of generic type T
which serves as delivery pipeline for the values received from at least 2 incoming channels, it's closed once all of the incoming channels closed or context cancelled.
Example
package main
import (
"context"
"fmt"
"github.com/butuzov/harmony"
)
func main() {
// return channel that generate
filler := func(start, stop int) chan int {
ch := make(chan int)
go func() {
defer close(ch)
for i := start; i <= stop; i++ {
ch <- i
}
}()
return ch
}
ch1 := filler(10, 12)
ch2 := filler(12, 14)
ch3 := filler(15, 16)
ctx := context.Background()
if ch, err := harmony.FanInWithContext(ctx, ch1, ch2, ch3); err != nil {
for val := range ch {
fmt.Println(val)
}
}
}
func Futute
func Futute[T any](done <-chan struct{}, futureFn func() T) (<-chan T, error)
Futute.T any. will return buffered channel of size 1 and generic type T
, which will eventually contain the results of the execution `futureFn``, or be closed in case if context cancelled.
func FututeWithContext
func FututeWithContext[T any](ctx context.Context, futureFn func() T) (<-chan T, error)
FututeWithContext.T any. will return buffered channel of size 1 and generic type T
, which will eventually contain the results of the execution `futureFn``, or be closed in case if context cancelled.
Example
package main
import (
"context"
"fmt"
"github.com/butuzov/harmony"
)
func main() {
// Requests random dogs picture from dog.ceo (dog as service)
ctx := context.Background()
a, _ := harmony.FututeWithContext(ctx, func() int { return 1 })
b, _ := harmony.FututeWithContext(ctx, func() int { return 0 })
fmt.Println(<-a, <-b)
}
Output
1 0
Example (Dogs_as_service)
FututeWithContext is shows creation of two "futures" that are used in our "rate our dogs" startup.
package main
import (
"context"
"encoding/json"
"fmt"
"github.com/butuzov/harmony"
"io/ioutil"
"log"
"net/http"
"time"
)
func main() {
// Requests random dogs picture from dog.ceo (dog as service)
getRandomDogPicture := func() string {
var data struct {
Message string "json:'message'"
}
const API_URL = "https://dog.ceo/api/breeds/image/random"
ctx := context.Background()
if req, err := http.NewRequestWithContext(ctx, http.MethodGet, API_URL, nil); err != nil {
log.Println(fmt.Errorf("request: %w", err))
return ""
} else if res, err := http.DefaultClient.Do(req); err != nil {
log.Println(fmt.Errorf("request: %w", err))
return ""
} else {
defer res.Body.Close()
if body, err := ioutil.ReadAll(res.Body); err != nil {
log.Println(fmt.Errorf("reading body: %w", err))
return ""
} else if err := json.Unmarshal(body, &data); err != nil {
log.Println(fmt.Errorf("unmarshal: %w", err))
return ""
}
}
return data.Message
}
a, _ := harmony.FututeWithContext(context.Background(), func() string {
return getRandomDogPicture()
})
ctx, cancel := context.WithTimeout(context.Background(), 250*time.Millisecond)
defer cancel()
b, _ := harmony.FututeWithContext(ctx, func() string {
return getRandomDogPicture()
})
fmt.Printf("Rate My Dog: ..a) %s..b) %s.", <-a, <-b)
}
func OrDone
func OrDone[T any](done <-chan struct{}, incoming <-chan T) (<-chan T, error)
OrDone will return a new unbuffered channel of type T
that serves as a pipeline for the incoming channel. Channel is closed once the context is canceled or the incoming channel is closed. This is variation or the pattern that usually called OrWithDone
orCancel
.
func OrDoneWithContext
func OrDoneWithContext[T any](ctx context.Context, incoming <-chan T) (<-chan T, error)
OrDoneWithContext will return a new unbuffered channel of type T
that serves as a pipeline for the incoming channel. Channel is closed once the context is canceled or the incoming channel is closed. This is variation or the pattern that usually called OrWithDone
orCancel
.
func Pipeline
func Pipeline[T1, T2 any](done <-chan struct{}, incomingCh <-chan T1, totalWorkers int, workerFn func(T1) T2) (<-chan T2, error)
Pipeline returns the channel of generic type T2
that can serve as a pipeline for the next stage. It's implemented in almost same manner as a WorkerPool
and allows to specify number of workers that going to proseed values received from the incoming channel. Outgoing channel is going to be closed once the incoming chan is closed or context canceld.
func PipelineWithContext
func PipelineWithContext[T1, T2 any](ctx context.Context, incomingCh <-chan T1, totalWorkers int, workerFn func(T1) T2) (<-chan T2, error)
PipelineWithContext returns the channel of generic type T2
that can serve as a pipeline for the next stage. It's implemented in almost same manner as a WorkerPool
and allows to specify number of workers that going to proseed values received from the incoming channel. Outgoing channel is going to be closed once the incoming chan is closed or context canceld.
Example (0rimes)
package main
import (
"context"
"fmt"
"github.com/butuzov/harmony"
"log"
"math"
"time"
)
func main() {
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
defer cancel()
var (
incomingCh = make(chan uint64)
isPrime = func(n uint64) bool {
for i := uint64(2); i < (n/2)+1; i++ {
if n%i == 0 {
return false
}
}
return true
}
)
var results []uint64
workerFunc := func(n uint64) uint64 {
if isPrime(n) {
return n
}
return 0
}
// Producer: Initial numbers
go func() {
for i := uint64(0); i < math.MaxUint64; i++ {
incomingCh <- i
}
}()
if ch, err := harmony.PipelineWithContext(ctx, incomingCh, 100, workerFunc); err != nil {
log.Printf("Error: %v", err)
} else {
for val := range ch {
if val == 0 {
continue
}
results = append(results, val)
}
fmt.Println(results)
}
}
func Queue
func Queue[T any](done <-chan struct{}, genFn func() T) (<-chan T, error)
Queue returns an unbuffered channel that is populated by func genFn
. Chan is closed once context is Done. It's similar to Future
pattern, but doesn't have a limit to just one result.
func QueueWithContext
func QueueWithContext[T any](ctx context.Context, genFn func() T) (<-chan T, error)
QueueWithContext returns an unbuffered channel that is populated by func genFn
. Chan is closed once context is Done. It's similar to Future
pattern, but doesn't have a limit to just one result.
Example
Generate fibonacci sequence
package main
import (
"context"
"fmt"
"github.com/butuzov/harmony"
"log"
)
func main() {
// fin returns function that returns Fibonacci sequence up to n element,
// it returns 0 after limit reached.
fib := func(limit int) func() int {
a, b, nTh := 0, 1, 1
return func() int {
if nTh > limit {
return 0
}
nTh++
a, b = b, a+b
return a
}
}
first10FibNumbers := make([]int, 10)
incoming, err := harmony.QueueWithContext(context.Background(), fib(10))
if err != nil {
log.Printf("err: %v", err)
return
}
for i := 0; i < cap(first10FibNumbers); i++ {
first10FibNumbers[i] = <-incoming
}
fmt.Println(first10FibNumbers)
}
Output
[1 1 2 3 5 8 13 21 34 55]
func Tee
func Tee[T any](done <-chan struct{}, incoming <-chan T) (<-chan T, <-chan T, error)
Tee will return two channels of generic type T
used to fan
-out data from the incoming channel. Channels needs to be read in order next iteration over incoming chanel happen.
func TeeWithContext
func TeeWithContext[T any](ctx context.Context, incoming <-chan T) (<-chan T, <-chan T, error)
TeeWithContext will return two channels of generic type T
used to fan
-out data from the incoming channel. Channels needs to be read in order next iteration over incoming chanel happen.
func WorkerPool
func WorkerPool[T any](done <-chan struct{}, jobQueue chan T, maxWorkers int, workFunc func(T)) error
WorkerPool accepts channel of generic type T
which is used to serve jobs to max workersTotal workers. Goroutines stop: Once channel closed and drained, or done is closed
func WorkerPoolWithContext
func WorkerPoolWithContext[T any](ctx context.Context, jobQueue chan T, maxWorkers int, workFunc func(T)) error
WorkerPoolWithContext accepts channel of generic type T
which is used to serve jobs to max workersTotal workers. Goroutines stop: Once channel closed and drained, or context cancelled.
Example (0rimes)
package main
import (
"context"
"fmt"
"github.com/butuzov/harmony"
"math"
"runtime"
"sync"
"time"
)
func main() {
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
defer cancel()
var (
primesCh = make(chan uint64)
incomingCh = make(chan uint64)
isPrime = func(n uint64) bool {
for i := uint64(2); i < (n/2)+1; i++ {
if n%i == 0 {
return false
}
}
return true
}
totalWorkers = runtime.NumCPU() - 1
)
// Producer: Initial numbers
go func() {
for i := uint64(0); i < math.MaxUint64; i++ {
incomingCh <- i
}
}()
// Consumers Worker Pool: checking primes of incoming numbers.
harmony.WorkerPoolWithContext(ctx, incomingCh, totalWorkers, func(n uint64) {
if !isPrime(n) {
return
}
primesCh <- n
})
var results []uint64
var mu sync.RWMutex
go func() {
for n := range primesCh {
mu.Lock()
results = append(results, n)
mu.Unlock()
}
}()
<-ctx.Done()
mu.RLock()
fmt.Println(results)
mu.RUnlock()
}
Resources
talk
Bryan C. Mills - Rethinking Classical Concurrency Patterns +slides
+comments+notes
book
Katherine Cox-Buday - Concurrency In Gotalk
Rob Pike - Concurrency is not Parallelism +slides
blog
Go Concurrency Patterns: Contextblog
Go Concurrency Patterns: Pipelines and cancellationtalk
Sameer Ajmani - Advanced Go Concurrency Patterns +slides
talk
Rob Pike - Go Concurrency Patterns +slides
blog
Go Concurrency Patterns: Timing out, moving on