• Stars
    star
    362
  • Rank 117,671 (Top 3 %)
  • Language
    Go
  • License
    Apache License 2.0
  • Created about 4 years ago
  • Updated almost 2 years ago

Reviews

There are no reviews yet. Be the first to send feedback to the community and the maintainers!

Repository Details

Machine is a zero dependency library for highly concurrent Go applications. It is inspired by errgroup.Group with extra bells & whistles

Machine GoDoc

concurrency

import "github.com/autom8ter/machine/v4"

Machine is a zero dependency library for highly concurrent Go applications. It is inspired by errgroup.Group with extra bells & whistles:

  • In memory Publish Subscribe for asynchronously broadcasting & consuming messages in memory
  • Asynchronous worker groups similar to errgroup.Group
  • Throttled max active goroutine count
  • Asynchronous error handling(see WithErrorHandler to override default error handler)
  • Asynchronous cron jobs- Cron()

Use Cases

Machine is meant to be completely agnostic and dependency free- its use cases are expected to be emergent. Really, it can be used anywhere goroutines are used.

Highly concurrent and/or asynchronous applications include:

  • gRPC streaming servers

  • websocket servers

  • pubsub servers

  • reverse proxies

  • cron jobs

  • custom database/cache

  • ETL pipelines

  • log sink

  • filesystem walker

  • code generation

// Machine is an interface for highly asynchronous Go applications
type Machine interface {
// Publish synchronously publishes the Message
Publish(ctx context.Context, msg Message)
// Subscribe synchronously subscribes to messages on a given channel,  executing the given HandlerFunc UNTIL the context cancels OR false is returned by the HandlerFunc.
// Glob matching IS supported for subscribing to multiple channels at once.
Subscribe(ctx context.Context, channel string, handler MessageHandlerFunc, opts ...SubscriptionOpt)
// Subscribers returns total number of subscribers to the given channel
Subscribers(channel string) int
// Channels returns the channel names that messages have been sent to
Channels() []string
// Go asynchronously executes the given Func
Go(ctx context.Context, fn Func)
// Cron asynchronously executes the given function on a timed interval UNTIL the context cancels OR false is returned by the CronFunc
Cron(ctx context.Context, interval time.Duration, fn CronFunc)
// Current returns the number of active jobs that are running concurrently
Current() int
// Wait blocks until all active async functions(Go, Cron) exit
Wait()
// Close blocks until all active routine's exit(calls Wait) then closes all active subscriptions
Close()
}

Example

        ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
  	defer cancel()
  	var (
  		m       = machine.New()
  		results []string
  		mu      sync.RWMutex
  	)
  	defer m.Close()
  
  	m.Go(ctx, func(ctx context.Context) error {
  		m.Subscribe(ctx, "accounting.*", func(ctx context.Context, msg machine.Message) (bool, error) {
  			mu.Lock()
  			results = append(results, fmt.Sprintf("(%s) received msg: %v\n", msg.Channel, msg.Body))
  			mu.Unlock()
  			return true, nil
  		})
  		return nil
  	})
  	m.Go(ctx, func(ctx context.Context) error {
  		m.Subscribe(ctx, "engineering.*", func(ctx context.Context, msg machine.Message) (bool, error) {
  			mu.Lock()
  			results = append(results, fmt.Sprintf("(%s) received msg: %v\n", msg.Channel, msg.Body))
  			mu.Unlock()
  			return true, nil
  		})
  		return nil
  	})
  	m.Go(ctx, func(ctx context.Context) error {
  		m.Subscribe(ctx, "human_resources.*", func(ctx context.Context, msg machine.Message) (bool, error) {
  			mu.Lock()
  			results = append(results, fmt.Sprintf("(%s) received msg: %v\n", msg.Channel, msg.Body))
  			mu.Unlock()
  			return true, nil
  		})
  		return nil
  	})
  	m.Go(ctx, func(ctx context.Context) error {
  		m.Subscribe(ctx, "*", func(ctx context.Context, msg machine.Message) (bool, error) {
  			mu.Lock()
  			results = append(results, fmt.Sprintf("(%s) received msg: %v\n", msg.Channel, msg.Body))
  			mu.Unlock()
  			return true, nil
  		})
  		return nil
  	})
  	<-time.After(1 * time.Second)
  	m.Publish(ctx, machine.Message{
  		Channel: "human_resources.chat_room6",
  		Body:    "hello world human resources",
  	})
  	m.Publish(ctx, machine.Message{
  		Channel: "accounting.chat_room2",
  		Body:    "hello world accounting",
  	})
  	m.Publish(ctx, machine.Message{
  		Channel: "engineering.chat_room1",
  		Body:    "hello world engineering",
  	})
  	m.Wait()
  	sort.Strings(results)
  	for _, res := range results {
  		fmt.Print(res)
  	}
  	// Output:
  	//(accounting.chat_room2) received msg: hello world accounting
  	//(accounting.chat_room2) received msg: hello world accounting
  	//(engineering.chat_room1) received msg: hello world engineering
  	//(engineering.chat_room1) received msg: hello world engineering
  	//(human_resources.chat_room6) received msg: hello world human resources
  	//(human_resources.chat_room6) received msg: hello world human resources

Extended Examples

All examples are < 500 lines of code(excluding code generation)

More Repositories

1

dagger

dagger is a fast, concurrency safe, mutable, in-memory directed graph library. Also includes a number of generic, concurrency safe data-structures
Go
306
star
2

goproxy

a reverse proxy authentication server (golang)
Go
28
star
3

engine

a plugin based grpc framework
Go
16
star
4

myjson

MyJSON is an embedded relational document store built on top of pluggable key value storage
Go
13
star
5

protoc-gen-authorize

A protoc plugin and library for authorizing gRPC requests using expression based rules. It allows developers to specify authorization rules in the proto file itself, instead of in the application code.
Go
11
star
6

oauth-graphql-ide

An oauth protected graphQL IDE
Go
9
star
7

slasher

slasher makes it easy to write http servers that respond to slack slash commands
Go
7
star
8

memeblast

annoy your friends with smsblasts
Go
4
star
9

mappy

persistant, concurrency safe, chain-able hash tables
Go
4
star
10

openid

Complete OpenID Connect http handlers
Go
4
star
11

kubego

a simple wrapper around kubernetes-client-go & istio-client-go, & helm-client-go
Go
4
star
12

helmgate

secure grpc/graphQL/REST API for managing k8s applications with helm
Go
3
star
13

async

utilities for async processing in Go
Go
3
star
14

grpcutil

serve grpc/json on the same port.
Go
2
star
15

goproxyrpc

GoProxyRPC- a highly configurable rest-to-grpc gateway/authentication server
Go
2
star
16

getter-render

getter-render extends Hashicorps go-getter library/cli by adding template rendering functionality.
Go
2
star
17

clientz

a plethora of golang clients
Go
2
star
18

goconnect

Google Cloud Platform, Kubernetes, Twilio, Stripe, Slack, and SendGrid client set and grpc server
Go
2
star
19

objectify

a golang utilities library used across many Autom8ter projects
Go
1
star
20

goflix

torrent streaming client and http handler
Go
1
star
21

subscribe

Go
1
star
22

authCache

Go
1
star
23

fire

Go
1
star
24

deployer

Go
1
star
25

gosaas

Go
1
star
26

morpheus

Go
1
star
27

tasks

task api server. api/protocol docs: https://autom8ter.github.io/tasks/.
Go
1
star
28

builder

Go
1
star
29

api

autom8ter protobuf library docs: https://autom8ter.github.io/api/.
C#
1
star
30

azure

Go
1
star
31

pkce

TypeScript
1
star
32

cobraslack

create a slack slash command handler that executes a cobra command
Go
1
star
33

advent_of_code_2019

JavaScript
1
star
34

lua

learning how to write lua
Lua
1
star
35

queuerpc

a protoc plugin to generate type safe RPC client and server code that use a message queue for transport/service discovery.
Go
1
star