  • Stars: 392
  • Rank: 109,735 (Top 3 %)
  • Language: Go
  • License: Apache License 2.0
  • Created almost 10 years ago
  • Updated about 7 years ago

Repository Details

Fanout - make writing parallel code even easier

This code is a port of the sample code from the Bounded parallelism section of the Go blog post Go Concurrency Patterns: Pipelines and cancellation.

I made the fan-out sample code reusable, so you can write parallel code without worrying about fatal error: all goroutines are asleep - deadlock!, which I ran into quite often when writing parallel code and found difficult to diagnose.

From the blog post:

Multiple functions can read from the same channel until that channel is closed; this is called fan-out. This provides a way to distribute work amongst a group of workers to parallelize CPU use and I/O.

Suppose you have a big list of data and want to go through each item, process the items in parallel, and collect each result into another list for later use. You normally don't start a new goroutine for every item, because you might not know how big the list is, and if it is too big, that would use too much memory. This package makes this kind of program easier to write.

API

type Worker func(input interface{}) (result interface{}, err error)
func ParallelRun(workerNum int, w Worker, inputs []interface{}) (results []interface{}, err error)

The package has two public APIs:

  1. Worker is the custom code you want to run.
  2. ParallelRun runs the Worker over the inputs.
    • workerNum: how many goroutines to start for consuming the inputs list. It can be larger or smaller than the list size. If it is larger, some goroutines will sit idle because there is no input left for them on the channel.
    • inputs: the inputs list. You first need to convert your list to []interface{}.
    • results: the list of results returned by the Worker. You normally want to go through them and cast them back to the real type the Worker returns.
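
The two conversions described above (typed list to []interface{} on the way in, and back to a concrete type on the way out) can be sketched as follows. This is only an illustration: toInputs, toInts and countRunes are hypothetical names that are not part of the package, and a plain sequential loop stands in for the ParallelRun call so that the sketch runs without the package installed.

```go
package main

import "fmt"

// Worker mirrors the signature documented above.
type Worker func(input interface{}) (result interface{}, err error)

// toInputs is a hypothetical helper: it boxes each string into the
// []interface{} slice that ParallelRun expects as inputs.
func toInputs(words []string) []interface{} {
	inputs := make([]interface{}, 0, len(words))
	for _, w := range words {
		inputs = append(inputs, w)
	}
	return inputs
}

// toInts is a hypothetical helper: it casts results back to int and
// reports the first element that has a different type.
func toInts(results []interface{}) ([]int, error) {
	ints := make([]int, 0, len(results))
	for i, r := range results {
		n, ok := r.(int)
		if !ok {
			return nil, fmt.Errorf("result %d has type %T, want int", i, r)
		}
		ints = append(ints, n)
	}
	return ints, nil
}

// countRunes is an example Worker: string in, int out.
func countRunes(input interface{}) (interface{}, error) {
	word, ok := input.(string)
	if !ok {
		return nil, fmt.Errorf("input has type %T, want string", input)
	}
	return len([]rune(word)), nil
}

func main() {
	inputs := toInputs([]string{"go", "fanout", "worker"})

	// Assumption: this sequential loop only stands in for the
	// ParallelRun call; it is not how the package runs the Worker.
	var w Worker = countRunes
	results := make([]interface{}, 0, len(inputs))
	for _, in := range inputs {
		r, err := w(in)
		if err != nil {
			fmt.Println("error:", err)
			return
		}
		results = append(results, r)
	}

	lengths, err := toInts(results)
	fmt.Println(lengths, err) // [2 6 6] <nil>
}
```

The comma-ok form of the type assertion is used on purpose: a plain r.(int) would panic if a Worker ever returned nil or a different type.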

A chart to describe how it works

Once you call ParallelRun, it immediately starts workerNum goroutines, which work on the Inputs List simultaneously. After all workers finish without error, it returns the results list. If any worker returns an error, all the other workers stop immediately and ParallelRun returns the first error.


                                   (goroutines)
ParallelRun method:            +------------------+
                             ++|     worker 1     |++
                             | +------------------+ |
                             | |     worker 2     | |
                             | +------------------+ |
                             | |     worker 3     | |
     Inputs List             | +------------------+ |    Output List (random order)
+------------------------+   | |     worker 4     | |    +----+----+----+----+----+
| 1  | 2  | 3  | 4  | 5  | +-+ +------------------+ +--> | o1 | o2 | o3 | o4 | o5 |
+----+----+----+----+----+   | |     worker 5     | |    +----+----+----+----+----+
                             | +------------------+ |
                             | |     worker 6     | |
                             | +------------------+ |
                             | |     worker 7     | |
                             | +------------------+ |
                             | |     worker 8     | |
                             | +------------------+ |
            workerNum: 9     ++|     worker 9     |++
                               +------------------+
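
To make the chart concrete, here is a minimal sketch of the bounded parallelism pattern it describes. It is not the package's actual source: boundedRun, outcome and alwaysFail are hypothetical names, and the code is written from the description above and from the pattern in the blog post. A fixed number of goroutines read from one input channel, results arrive in arbitrary order, and the first error makes the call return and the remaining goroutines stop.

```go
package main

import (
	"errors"
	"fmt"
	"sort"
	"sync"
)

// Worker mirrors the signature documented above.
type Worker func(input interface{}) (result interface{}, err error)

// outcome carries one Worker return value pair over a channel.
type outcome struct {
	result interface{}
	err    error
}

// boundedRun is a hypothetical stand-in for ParallelRun.
func boundedRun(workerNum int, w Worker, inputs []interface{}) ([]interface{}, error) {
	done := make(chan struct{}) // closed on return so every goroutine can stop
	defer close(done)

	// Producer: feed the inputs into one channel, then close it.
	in := make(chan interface{})
	go func() {
		defer close(in)
		for _, input := range inputs {
			select {
			case in <- input:
			case <-done:
				return
			}
		}
	}()

	// Fan-out: workerNum goroutines read from the same channel.
	out := make(chan outcome)
	var wg sync.WaitGroup
	wg.Add(workerNum)
	for i := 0; i < workerNum; i++ {
		go func() {
			defer wg.Done()
			for input := range in {
				r, err := w(input)
				select {
				case out <- outcome{r, err}:
				case <-done:
					return
				}
			}
		}()
	}
	go func() {
		wg.Wait()
		close(out) // lets the collecting loop below terminate
	}()

	results := make([]interface{}, 0, len(inputs))
	for o := range out {
		if o.err != nil {
			return nil, o.err // the deferred close(done) releases the rest
		}
		results = append(results, o.result)
	}
	return results, nil
}

// alwaysFail is a Worker that returns an error for every input.
func alwaysFail(input interface{}) (interface{}, error) {
	return nil, errors.New("boom")
}

func main() {
	inputs := []interface{}{1, 2, 3, 4, 5}
	square := func(input interface{}) (interface{}, error) {
		n := input.(int)
		return n * n, nil
	}

	results, err := boundedRun(3, square, inputs)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	nums := make([]int, 0, len(results))
	for _, r := range results {
		nums = append(nums, r.(int))
	}
	sort.Ints(nums) // results arrive in arbitrary order
	fmt.Println(nums) // [1 4 9 16 25]

	_, err = boundedRun(3, alwaysFail, inputs)
	fmt.Println(err) // boom
}
```

Every channel send is paired with a receive on done, so no goroutine stays blocked after an early return. That pairing is what avoids the "all goroutines are asleep" deadlock in this sketch.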

Example: checking whether my ideal domains are available

For example, I have a text file containing Chinese words like this:

小米
翻译
老兵
电影
土豆
豆瓣
文章
圆通
做爱
迅雷
昆明
地图
... and thousands more

First I want to convert each word to Pinyin and append the .com suffix, then run the whois command on each resulting domain to see whether it is still available, so that I can quickly register any that are.

The non-parallel slow program will look like this:

for _, word := range domainWords {
	if strings.TrimSpace(word) == "" {
		continue
	}

	py := pinyin.Convert(word)
	pydowncase := strings.ToLower(py)
	domain := pydowncase + ".com"
	outr, err := domainAvailable(word, domain)
	if err != nil {
		fmt.Println("Error: ", err)
		continue
	}

	if outr.available {
		fmt.Printf("[Ohh Yeah] %s %s\n", outr.word, outr.domain)
		continue
	}

	fmt.Printf("\t\t\t %s %s %s\n", outr.word, outr.domain, outr.summary)
}

type checkResult struct {
	word      string
	domain    string
	available bool
	summary   string
}

func domainAvailable(word string, domain string) (ch checkResult, err error) {
	var summary string
	var output []byte

	ch.word = word
	ch.domain = domain

	cmd := exec.Command("whois", domain)
	output, err = cmd.Output()
	if err != nil {
		fmt.Println(err)
		return
	}

	outputstring := string(output)
	if strings.Contains(outputstring, "No match for \"") {
		ch.available = true
		return
	}

	summary = firstLineOf(outputstring, "Registrant Name") + " => "
	summary = summary + firstLineOf(outputstring, "Expiration Date")
	ch.summary = summary
	return
}

This would be ridiculously slow, because for each word the program runs the whois command and waits for it to finish before moving on to the next word. For 30000 words, for example, it would take days.

Change it to parallel with fanout

You need to install the package if you haven't:

go get github.com/sunfmin/fanout

With one call, everything runs in parallel:

inputs := []interface{}{}

for _, word := range domainWords {
	inputs = append(inputs, word)
}

results, err2 := fanout.ParallelRun(60, func(input interface{}) (interface{}, error) {
	word := input.(string)
	if strings.TrimSpace(word) == "" {
		return nil, nil
	}

	py := pinyin.Convert(word)
	pydowncase := strings.ToLower(py)
	domain := pydowncase + ".com"
	outr, err := domainAvailable(word, domain)
	if err != nil {
		fmt.Println("Error: ", err)
		return nil, err
	}

	if outr.available {
		fmt.Printf("[Ohh Yeah] %s %s\n", outr.word, outr.domain)
		return outr, nil
	}
	fmt.Printf("\t\t\t %s %s %s\n", outr.word, outr.domain, outr.summary)
	return outr, nil
}, inputs)

fmt.Println("Finished ", len(results), ", Error:", err2)
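
Once the call returns, the results still have to be unwrapped. The sketch below shows one way to do that for this example; availableDomains is a hypothetical helper and the entries in main are made-up placeholders rather than real whois output. Since the Worker above returns nil for blank words, the results can contain nil entries, which the comma-ok assertion skips.

```go
package main

import (
	"fmt"
	"sort"
)

// checkResult repeats the struct from the example above.
type checkResult struct {
	word      string
	domain    string
	available bool
	summary   string
}

// availableDomains is a hypothetical helper: it keeps only the available
// domains and sorts them by name, because results come back in no
// particular order.
func availableDomains(results []interface{}) []checkResult {
	var found []checkResult
	for _, r := range results {
		cr, ok := r.(checkResult) // ok is false for nil entries
		if !ok || !cr.available {
			continue
		}
		found = append(found, cr)
	}
	sort.Slice(found, func(i, j int) bool {
		return found[i].domain < found[j].domain
	})
	return found
}

func main() {
	// Made-up placeholder data standing in for real results.
	results := []interface{}{
		checkResult{word: "b", domain: "b.com", available: true},
		nil, // what the Worker returns for a blank word
		checkResult{word: "c", domain: "c.com", summary: "taken"},
		checkResult{word: "a", domain: "a.com", available: true},
	}
	for _, cr := range availableDomains(results) {
		fmt.Println(cr.domain)
	}
	// prints:
	// a.com
	// b.com
}
```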

The full runnable code is here: https://github.com/sunfmin/domaincheck/blob/master/main.go

Q&A

  1. Does fanout.ParallelRun block execution?

    Yes, it does. It waits until all the running goroutines finish, then collects all the results into the returned []interface{} value. You might want to unwrap and sort it for later use.

  2. How does it make the program run faster?

    For example, suppose a list contains 20 elements to process and the func that processes one element takes exactly 1 second. Run sequentially, the work takes about 20 seconds. With fanout.ParallelRun and workerNum set to 20, the total is only the longest single execution time, 1 second, so it is a 20x improvement. In reality it won't be exactly 20x, because the result depends on the number of CPU cores and on how I/O-intensive the program is. But it helps make fuller use of the CPU and of I/O throughput.
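
    The arithmetic in this answer can be written down as a small estimate. This is a back-of-the-envelope sketch, not something the package provides: idealDuration is a hypothetical function, and it assumes every element takes the same time and ignores CPU cores, I/O contention and scheduling overhead, so real runs will be slower.

```go
package main

import (
	"fmt"
	"time"
)

// idealDuration estimates the wall-clock time for n elements that each
// take perItem when they are spread over workerNum workers. It is a best
// case: the elements are processed in ceil(n / workerNum) rounds.
func idealDuration(n, workerNum int, perItem time.Duration) time.Duration {
	if n <= 0 {
		return 0
	}
	if workerNum < 1 {
		workerNum = 1
	}
	rounds := (n + workerNum - 1) / workerNum // integer ceil(n / workerNum)
	return time.Duration(rounds) * perItem
}

func main() {
	fmt.Println(idealDuration(20, 1, time.Second))  // 20s
	fmt.Println(idealDuration(20, 20, time.Second)) // 1s
	fmt.Println(idealDuration(20, 8, time.Second))  // 3s
}
```

    With 8 workers the estimate is 3 seconds rather than 2.5, because the last round still takes a full second even though only 4 workers have something to do.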

Enjoy!
