automi

A data stream processing API for Go (alpha)



Automi is an API for processing streams of data using idiomatic Go. Using Automi, programs can process streams of data chunks by composing stages of operations that are applied to each element of the stream.

Concept

Automi streaming concepts


The Automi API expresses a stream with four primitives:

  • An emitter: an in-memory, network, or file resource that can emit elements for streaming
  • The stream: represents a conduit within which data elements are streamed
  • Stream operations: code which can be attached to the stream to process streamed elements
  • A collector: an in-memory, network, or file resource that can collect streamed data

Automi streams use Go channels internally to route data. This means Automi streams automatically support features such as buffering, automatic back-pressure queuing, and concurrency safety.
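To make the channel-based model concrete, here is a minimal sketch that wires an emitter, one operation, and a collector together using only plain Go channels. It does not use Automi and is not how Automi is implemented internally; the function names emit, toUpper, and collect, and the buffer size of 2, are assumptions chosen for illustration. The buffered channels show where back-pressure comes from: a sender blocks as soon as the downstream buffer is full.

```go
package main

import (
	"fmt"
	"strings"
)

// emit plays the role of an emitter (hypothetical name): it sends each
// element of a slice into a buffered channel and closes it when done.
func emit(items []string) <-chan string {
	out := make(chan string, 2) // small buffer: the sender blocks when it is full
	go func() {
		defer close(out)
		for _, it := range items {
			out <- it
		}
	}()
	return out
}

// toUpper plays the role of a stream operation applied to every element.
func toUpper(in <-chan string) <-chan string {
	out := make(chan string, 2)
	go func() {
		defer close(out)
		for it := range in {
			out <- strings.ToUpper(it)
		}
	}()
	return out
}

// collect plays the role of a collector: it drains the channel into a slice.
func collect(in <-chan string) []string {
	var result []string
	for it := range in {
		result = append(result, it)
	}
	return result
}

func main() {
	fmt.Println(collect(toUpper(emit([]string{"a", "b", "c"}))))
}
```

Each stage runs in its own goroutine and closes its output channel when its input is exhausted, so the collector's range loop ends on its own.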

Using Automi

Now, let us explore some examples to see how easy it is to use Automi to stream and process data.

See all examples in the ./example directory.

Example: streaming from a slice into stdout

This first example shows how easy it is to compose and express stream operations with Automi. In this example, rune values are emitted from a slice and are streamed individually. The Filter operator method filters out unwanted rune values, the Map operator method converts each remaining rune to a string, and the items are then batched and sorted with the Batch and Sort operator methods. Lastly, a collector is used to collect the result into an io.Writer, which pipes it to stdout.

func main() {
	strm := stream.New([]rune("B世!ぽ@opqDQRS#$%^&*()ᅖ...O6PTnVWXѬYZbcef7ghijCklrAstvw"))

	strm.Filter(func(item rune) bool {
		return item >= 65 && item < (65+26)
	}).Map(func(item rune) string {
		return string(item) 
	}).Batch().Sort() 
	strm.Into(collectors.Writer(os.Stdout))

	if err := <-strm.Open(); err != nil {
		fmt.Println(err)
		return
	}
}

See the full source code.

How it works
  1. Create the stream with an emitter source. Automi supports several types of sources including channels, io.Reader, slices, etc. (see list of emitters below). Each element in the slice will be streamed individually.
strm := stream.New([]rune(`B世!ぽ@opqDQRS#$%^&*()ᅖ...O6PTnVWXѬYZbcef7ghijCklrAstvw`))
  2. Apply user-provided or built-in stream operations as shown below:
strm.Filter(func(item rune) bool {
    return item >= 65 && item < (65+26)
}).Map(func(item rune) string {
    return string(item)
}).Batch().Sort()
  3. Collect the result. In this example, the result is collected into an io.Writer which further streams the data into standard output:
strm.Into(collectors.Writer(os.Stdout))
  4. Lastly, open the stream once it is properly composed:
if err := <-strm.Open(); err != nil {
    fmt.Println(err)
    return
}
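For comparison, the same filter, map, and sort logic can be sketched with the standard library alone. This is not Automi code; the helper name upperSorted is hypothetical, and the sketch makes no claim about how the Automi Writer collector formats its output. It only shows which elements survive the predicate and in what order they end up.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// upperSorted (hypothetical helper) keeps only the runes 'A'..'Z',
// converts each one to a string, gathers them into one batch, and
// sorts that batch.
func upperSorted(input string) string {
	var batch []string
	for _, r := range input {
		if r >= 65 && r < 65+26 { // same predicate as the Filter step
			batch = append(batch, string(r)) // same conversion as the Map step
		}
	}
	// Sorting needs every element at once, which is presumably why
	// Batch() precedes Sort() in the Automi example.
	sort.Strings(batch)
	return strings.Join(batch, "")
}

func main() {
	fmt.Println(upperSorted("B世!ぽ@opqDQRS#$%^&*()ᅖ...O6PTnVWXѬYZbcef7ghijCklrAstvw"))
	// prints "ABCDOPQRSTVWXYZ"
}
```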

Example: streaming from an io.Reader into collector function

The next example shows how to use Automi to stream data from an io.Reader emitting buffered string values from an in-memory source in 50-byte chunks. The data is processed with the Map and Filter operator methods and the result is sent to a user-provided collector function which prints the result.

func main() {
	data := `"request", "/i/a", "00:11:51:AA", "accepted"
"response", "/i/a/", "00:11:51:AA", "served"
"response", "/i/a", "00:BB:22:DD", "served"...`

	reader := strings.NewReader(data)

	// create stream from a buffered io.Reader emitter,
	// emitting 50-byte chunks.
	stream := stream.New(emitters.Reader(reader).BufferSize(50))
	stream.Map(func(chunk []byte) string {
		str := string(chunk)
		return str
	})
	stream.Filter(func(e string) bool {
		return (strings.Contains(e, `"response"`))
	})
	stream.Into(collectors.Func(func(data interface{}) error {
		e := data.(string)
		fmt.Println(e)
		return nil
	}))

	if err := <-stream.Open(); err != nil {
		fmt.Println(err)
		return
	}
}

See complete example here.

Example: streaming using CSV files

The following example streams data from a CSV source file. Each row is mapped to a custom type, filtered, then mapped to a slice of strings which is then collected into another CSV file.

type scientist struct {
	FirstName string
	LastName  string
	Title     string
	BornYear  int
}

func main() {
    // creates a stream using a CSV emitter
    // emits each row as []string
    stream := stream.New("./data.txt")

    // Map each CSV row, []string, to type scientist
    stream.Map(func(cs []string) scientist {
        yr, _ := strconv.Atoi(cs[3])
        return scientist{
            FirstName: cs[1],
            LastName:  cs[0],
            Title:     cs[2],
            BornYear:  yr,
        }
    })
    stream.Filter(func(cs scientist) bool {
        return (cs.BornYear > 1930)
    })
    stream.Map(func(cs scientist) []string {
        return []string{cs.FirstName, cs.LastName, cs.Title}
    })
    stream.Into("./result.txt")

    // open the stream
    if err := <-stream.Open(); err != nil {
        fmt.Println(err)
        os.Exit(1)
    }
    fmt.Println("wrote result to file result.txt")
}

See complete example here.

Example: streaming HTTP requests and responses

The following example shows how to use Automi to stream and process data using HTTP requests and responses. The following HTTP server program streams data from the request Body, encodes it using base64, and streams the result into the HTTP response:

func main() {

	http.HandleFunc(
		"/",
		func(resp http.ResponseWriter, req *http.Request) {
			resp.Header().Add("Content-Type", "text/html")
			resp.WriteHeader(http.StatusOK)

			strm := stream.New(req.Body)
			strm.Process(func(data []byte) string {
				return base64.StdEncoding.EncodeToString(data)
			}).Into(resp)

			if err := <-strm.Open(); err != nil {
				resp.WriteHeader(http.StatusInternalServerError)
				log.Printf("Stream error: %s", err)
			}
		},
	)

	log.Println("Server listening on :4040")
	http.ListenAndServe(":4040", nil)
}

See complete example here.

Example: streaming gRPC service payload

The following example shows how to use Automi to stream data items from a gRPC streaming service. The following gRPC client sets up an Automi emitter to emit time values that are streamed from a gRPC time service:

// setup an Automi emitter function to stream from the gRPC service
func emitStreamFrom(client pb.TimeServiceClient) <-chan []byte {
	source := make(chan []byte)
	timeStream, err := client.GetTimeStream(context.Background(), &pb.TimeRequest{Interval: 3000})
	...
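	go func(stream pb.TimeService_GetTimeStreamClient, srcCh chan []byte) {
		defer close(srcCh)
		for {
			t, err := stream.Recv()
			if err != nil {
				// stop on io.EOF or any other receive error;
				// the deferred close then ends the emitter channel
				return
			}
			srcCh <- t.Value
		}
	}(timeStream, source)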

	return source
}

func main() {
	...
	client := pb.NewTimeServiceClient(conn)
	// create automi stream
	stream := stream.New(emitStreamFrom(client))
	stream.Map(func(item []byte) time.Time {
		secs := int64(binary.BigEndian.Uint64(item))
		return time.Unix(secs, 0)
	})
	stream.Into(collectors.Func(func(item interface{}) error {
		t := item.(time.Time) // named t to avoid shadowing the time package
		fmt.Println(t)
		return nil
	}))

	// open the stream
	if err := <-stream.Open(); err != nil {
		fmt.Println(err)
		os.Exit(1)
	}

}

See complete example here.
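The channel-emitter pattern and the 8-byte time payload can be tried out without a gRPC server. The sketch below is an assumption-laden stand-in: the receiver interface and fakeStream type are hypothetical replacements for the generated gRPC stream client, and encodeSecs assumes the service sends Unix seconds as a big-endian 64-bit value, which is what the decoding in the Map step above implies.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
	"io"
	"time"
)

// receiver is a hypothetical stand-in for the generated gRPC stream client.
type receiver interface {
	Recv() ([]byte, error)
}

// fakeStream returns a fixed list of payloads, then io.EOF.
type fakeStream struct{ payloads [][]byte }

func (f *fakeStream) Recv() ([]byte, error) {
	if len(f.payloads) == 0 {
		return nil, io.EOF
	}
	p := f.payloads[0]
	f.payloads = f.payloads[1:]
	return p, nil
}

// emitFrom forwards payloads to a channel until Recv fails, then closes
// the channel so that a consumer ranging over it terminates.
func emitFrom(s receiver) <-chan []byte {
	out := make(chan []byte)
	go func() {
		defer close(out)
		for {
			p, err := s.Recv()
			if err != nil {
				if !errors.Is(err, io.EOF) {
					fmt.Println("receive error:", err)
				}
				return
			}
			out <- p
		}
	}()
	return out
}

// encodeSecs packs Unix seconds into 8 big-endian bytes (assumed wire format).
func encodeSecs(secs int64) []byte {
	b := make([]byte, 8)
	binary.BigEndian.PutUint64(b, uint64(secs))
	return b
}

// decodeTime reverses encodeSecs, mirroring the Map step above.
func decodeTime(b []byte) time.Time {
	return time.Unix(int64(binary.BigEndian.Uint64(b)), 0)
}

func main() {
	src := &fakeStream{payloads: [][]byte{encodeSecs(0), encodeSecs(3)}}
	for p := range emitFrom(src) {
		fmt.Println(decodeTime(p).UTC())
	}
}
```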

More Examples

Examples - View a long list of examples that cover all aspects of using Automi.

Automi components

Automi comes with a set of built-in components to get you started with stream processing, including the following:

Emitters

  • Channel
  • CSV
  • io.Reader
  • io.Scanner
  • Slice

Operators

  • Stream.Filter
  • Stream.Map
  • Stream.FlatMap
  • Stream.Reduce
  • Stream.GroupByKey
  • Stream.GroupByName
  • Stream.GroupByPos
  • Stream.Sort
  • Stream.SortByKey
  • Stream.SortByName
  • Stream.SortByPos
  • Stream.SortWith
  • Stream.Sum
  • Stream.SumByKey
  • Stream.SumByName
  • Stream.SumByPos
  • Stream.SumAllKeys

Collectors

  • CSV
  • Func
  • Null
  • Slice
  • Writer

Licence

Apache 2.0
