  • Stars: 392
  • Rank 107,449 (Top 3 %)
  • Language: Go
  • License: Apache License 2.0
  • Created over 9 years ago
  • Updated over 6 years ago

Repository Details

Fanout - make writing parallel code even easier

This code is a port of the sample code from the "Bounded parallelism" section of the Go blog post "Go Concurrency Patterns: Pipelines and cancellation".

I made the fan-out sample code reusable, so you can write parallel code without worrying about "fatal error: all goroutines are asleep - deadlock!", which I ran into quite often when writing parallel code and which is difficult to debug.

From the blog post:

Multiple functions can read from the same channel until that channel is closed; this is called fan-out. This provides a way to distribute work amongst a group of workers to parallelize CPU use and I/O.

Suppose you have a big list of data and want to process every item in parallel, collecting each result into another list for later use. Normally you don't start a new goroutine for every item, because you might not know how big the list is, and if it is too big, that would use too much memory. This package makes this kind of program easier to write.
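To make the fan-out idea concrete, here is a minimal sketch that uses only the standard library, not this package. The name sumSquares is made up for illustration. A fixed number of goroutines read from one shared channel, so the goroutine count stays the same however long the input list is.

```go
package main

import (
	"fmt"
	"sync"
)

// sumSquares is a hypothetical example: it fans nums out to a fixed number
// of workers that all read from the same channel, then adds up their output.
func sumSquares(workers int, nums []int) int {
	in := make(chan int)
	out := make(chan int)

	// Fixed pool: each worker reads from the shared channel until it is closed.
	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for n := range in {
				out <- n * n
			}
		}()
	}

	// Feed the inputs, then close the channel so the workers can exit.
	go func() {
		for _, n := range nums {
			in <- n
		}
		close(in)
	}()

	// Close the output channel once every worker has finished.
	go func() {
		wg.Wait()
		close(out)
	}()

	sum := 0
	for sq := range out {
		sum += sq
	}
	return sum
}

func main() {
	fmt.Println(sumSquares(3, []int{1, 2, 3, 4, 5})) // prints 55
}
```

Closing the input channel is what lets the workers exit, and closing the output channel only after wg.Wait() is what lets the final loop end. Forgetting either close is a common way to hit the deadlock error mentioned above.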

API

type Worker func(input interface{}) (result interface{}, err error)
func ParallelRun(workerNum int, w Worker, inputs []interface{}) (results []interface{}, err error)

The package has two public APIs:

  1. Worker is the custom code you want to run.
  2. ParallelRun runs the Worker over the inputs.
    • workerNum: how many goroutines to start to consume the inputs list. It can be larger or smaller than the list size. If it is larger, some of the goroutines will sit idle because there is no input left for them on the channel.
    • inputs: the inputs list. You first need to convert your own list to []interface{}.
    • results: the list of results returned by the Worker. You normally go through them and cast each one back to the real type the Worker returns.
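The conversion steps for inputs and results can be sketched as two small helpers. The names toInterfaces and toStrings are hypothetical and not part of the package, and the nil check is an assumption about how you may want to treat workers that return no value.

```go
package main

import "fmt"

// toInterfaces copies a []string into the []interface{} shape that the
// inputs parameter expects. Go does not convert slice types implicitly.
func toInterfaces(words []string) []interface{} {
	out := make([]interface{}, 0, len(words))
	for _, w := range words {
		out = append(out, w)
	}
	return out
}

// toStrings casts results back to strings. It skips nil entries and reports
// any other unexpected type as an error instead of panicking.
func toStrings(results []interface{}) ([]string, error) {
	out := make([]string, 0, len(results))
	for _, r := range results {
		if r == nil {
			continue
		}
		s, ok := r.(string)
		if !ok {
			return nil, fmt.Errorf("unexpected result type %T", r)
		}
		out = append(out, s)
	}
	return out, nil
}

func main() {
	inputs := toInterfaces([]string{"a", "b"})
	got, err := toStrings(append(inputs, nil))
	fmt.Println(got, err) // prints [a b] <nil>
}
```

Using the two-value form of the type assertion keeps a single unexpected value from crashing the whole program.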

A chart to describe how it works

When you call ParallelRun, it immediately starts workerNum goroutines, which work on the inputs list simultaneously. If all workers finish without error, it returns the results list. If any worker returns an error, all the other workers stop immediately and ParallelRun returns that first error.


                                   (goroutines)
ParallelRun method:            +------------------+
                             ++|     worker 1     |++
                             | +------------------+ |
                             | |     worker 2     | |
                             | +------------------+ |
                             | |     worker 3     | |
     Inputs List             | +------------------+ |    Output List (random order)
+------------------------+   | |     worker 4     | |    +----+----+----+----+----+
| 1  | 2  | 3  | 4  | 5  | +-+ +------------------+ +--> | o1 | o2 | o3 | o4 | o5 |
+----+----+----+----+----+   | |     worker 5     | |    +----+----+----+----+----+
                             | +------------------+ |
                             | |     worker 6     | |
                             | +------------------+ |
                             | |     worker 7     | |
                             | +------------------+ |
                             | |     worker 8     | |
                             | +------------------+ |
            workerNum: 9     ++|     worker 9     |++
                               +------------------+
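The behavior in the chart can be approximated with the sketch below. It is not the package's source code: parallelRunSketch, outcome, and doubleOrFail are hypothetical names, and the done channel is one common way to get the "stop on first error" behavior, following the pattern in the Go blog post.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

type Worker func(input interface{}) (result interface{}, err error)

// outcome pairs a worker's result with its error (hypothetical helper type).
type outcome struct {
	result interface{}
	err    error
}

// parallelRunSketch is a stand-in with the same signature as ParallelRun.
func parallelRunSketch(workerNum int, w Worker, inputs []interface{}) ([]interface{}, error) {
	done := make(chan struct{}) // closed on return to release all goroutines
	defer close(done)

	// Feed inputs until they run out or the run is abandoned.
	in := make(chan interface{})
	go func() {
		defer close(in)
		for _, input := range inputs {
			select {
			case in <- input:
			case <-done:
				return
			}
		}
	}()

	// Start a bounded number of workers that share the input channel.
	out := make(chan outcome)
	var wg sync.WaitGroup
	for i := 0; i < workerNum; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for input := range in {
				r, err := w(input)
				select {
				case out <- outcome{r, err}:
				case <-done:
					return
				}
			}
		}()
	}
	go func() {
		wg.Wait()
		close(out)
	}()

	results := make([]interface{}, 0, len(inputs))
	for o := range out {
		if o.err != nil {
			return nil, o.err // the first error ends the run
		}
		results = append(results, o.result)
	}
	return results, nil
}

// doubleOrFail is a sample Worker: it doubles ints and rejects negatives.
func doubleOrFail(input interface{}) (interface{}, error) {
	n := input.(int)
	if n < 0 {
		return nil, errors.New("negative input")
	}
	return n * 2, nil
}

func main() {
	results, err := parallelRunSketch(3, doubleOrFail, []interface{}{1, 2, 3, 4})
	fmt.Println(len(results), err)
	_, err = parallelRunSketch(3, doubleOrFail, []interface{}{1, -2, 3})
	fmt.Println(err)
}
```

In this sketch a worker that is already inside a call finishes that call before it notices the stop signal, so "immediately" means that no new input is picked up after the first error.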

Example: check whether my ideal domains are available

For example, I have a text file that contains Chinese words like this:

小米
翻译
老兵
电影
土豆
豆瓣
文章
圆通
做爱
迅雷
昆明
地图
... and thousands more

First I want to convert each word to Pinyin and append .com. Then I run the whois command on each resulting domain to see whether it is still available, so that if it is, I can register it quickly.

The slow, non-parallel program looks like this:

for _, word := range domainWords {
	if strings.TrimSpace(word) == "" {
		continue
	}

	py := pinyin.Convert(word)
	pydowncase := strings.ToLower(py)
	domain := pydowncase + ".com"
	outr, err := domainAvailable(word, domain)
	if err != nil {
		fmt.Println("Error: ", err)
		continue
	}

	if outr.available {
		fmt.Printf("[Ohh Yeah] %s %s\n", outr.word, outr.domain)
		continue
	}

	fmt.Printf("\t\t\t %s %s %s\n", outr.word, outr.domain, outr.summary)
}

type checkResult struct {
	word      string
	domain    string
	available bool
	summary   string
}

func domainAvailable(word string, domain string) (ch checkResult, err error) {
	var summary string
	var output []byte

	ch.word = word
	ch.domain = domain

	cmd := exec.Command("whois", domain)
	output, err = cmd.Output()
	if err != nil {
		fmt.Println(err)
		return
	}

	outputstring := string(output)
	if strings.Contains(outputstring, "No match for \"") {
		ch.available = true
		return
	}

	summary = firstLineOf(outputstring, "Registrant Name") + " => "
	summary = summary + firstLineOf(outputstring, "Expiration Date")
	ch.summary = summary
	return
}

This is ridiculously slow, because for each word the program runs the whois command and waits for it to finish before moving on to the next word. Running it over, for example, 30000 words would take days.

Change it to parallel with fanout

You need to install the package if you haven't:

go get github.com/sunfmin/fanout

And with one call, it all runs in parallel:

inputs := []interface{}{}

for _, word := range domainWords {
	inputs = append(inputs, word)
}

results, err2 := fanout.ParallelRun(60, func(input interface{}) (interface{}, error) {
	word := input.(string)
	if strings.TrimSpace(word) == "" {
		return nil, nil
	}

	py := pinyin.Convert(word)
	pydowncase := strings.ToLower(py)
	domain := pydowncase + ".com"
	outr, err := domainAvailable(word, domain)
	if err != nil {
		fmt.Println("Error: ", err)
		return nil, err
	}

	if outr.available {
		fmt.Printf("[Ohh Yeah] %s %s\n", outr.word, outr.domain)
		return outr, nil
	}
	fmt.Printf("\t\t\t %s %s %s\n", outr.word, outr.domain, outr.summary)
	return outr, nil
}, inputs)

fmt.Println("Finished ", len(results), ", Error:", err2)

The full runnable code is here: https://github.com/sunfmin/domaincheck/blob/master/main.go
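Because the results come back in no particular order, one way to restore the input order is to carry each item's position through the Worker and sort afterwards. The sketch below shows only that post-processing step. The indexed type and sortByPos are hypothetical names, and the sample results slice stands in for what ParallelRun might return.

```go
package main

import (
	"fmt"
	"sort"
)

// indexed is a hypothetical wrapper: the input carries its position so the
// Worker can copy it into the result it returns.
type indexed struct {
	pos   int
	value string
}

// sortByPos casts the results back to indexed, skipping nil or foreign
// values, and restores the original input order.
func sortByPos(results []interface{}) []indexed {
	out := make([]indexed, 0, len(results))
	for _, r := range results {
		if v, ok := r.(indexed); ok {
			out = append(out, v)
		}
	}
	sort.Slice(out, func(i, j int) bool { return out[i].pos < out[j].pos })
	return out
}

func main() {
	// Results as they might come back, in arbitrary order.
	results := []interface{}{indexed{2, "c"}, indexed{0, "a"}, nil, indexed{1, "b"}}
	for _, r := range sortByPos(results) {
		fmt.Println(r.pos, r.value)
	}
}
```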

Q&A

  1. Does fanout.ParallelRun block execution?

    Yes, it does. It waits until all the running goroutines finish, then collects all the results into the returned []interface{} value. You might want to unwrap and sort it for later use.

  2. How does it make the program run faster?

    Suppose a list contains 20 elements to process, and the func that processes one element takes exactly 1 second. Run non-parallel, the work takes about 20 seconds. With fanout.ParallelRun and workerNum set to 20, the total is only the longest single execution time, 1 second, to finish all 20 elements. That is a 20x improvement. In reality it won't be exactly 20x, because it depends on the number of CPU cores and on how I/O-intensive the program is, but it lets the program make much fuller use of CPU and I/O throughput.
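The arithmetic in the second answer can be written as a small helper. This is an idealized estimate that assumes every item takes the same time and that nothing else limits throughput. The name idealWallTime is made up for illustration.

```go
package main

import (
	"fmt"
	"time"
)

// idealWallTime estimates the best-case total time: the items are processed
// in rounds of at most `workers` items, and each round takes perItem.
func idealWallTime(items, workers int, perItem time.Duration) time.Duration {
	if items <= 0 || workers <= 0 {
		return 0
	}
	if workers > items {
		workers = items // extra workers have nothing to do
	}
	rounds := (items + workers - 1) / workers // ceiling division
	return time.Duration(rounds) * perItem
}

func main() {
	for _, w := range []int{1, 8, 20, 60} {
		fmt.Println(w, idealWallTime(20, w, time.Second))
	}
}
```

For 20 items at 1 second each, this gives 20s with 1 worker, 3s with 8 workers, and 1s with 20 or 60 workers, which shows why raising workerNum beyond the list size brings no further gain.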

Enjoy!
