1、Go语言并发之扇入和扇出
编程中经常遇到扇入和扇出两个概念,所谓的扇入是指将多路通道聚合到一条通道中处理,Go 语言最简单的扇入
就是使用 select 聚合多条通道服务;所谓的扇出是指将一条通道发散到多条通道中处理,在Go语言里面具体实
现就是使用go关键字启动多个 goroutine 并发处理。
中国有句经典的哲学名句叫分久必合,合久必分,软件的设计和开发也遵循同样的哲学思想,扇入就是合,扇出
就是分。当生产者的速度很慢时,需要使用扇入技术聚合多个生产者满足消费者,比如很耗时的加密/解密服务;
当消费者的速度很慢时,需要使用扇出技术,比如Web服务器并发请求处理。扇入和扇出是Go并发编程中常用的
技术。
1.1 单链工作模式
如果我们没有使用扇入扇出的流程,就是传统的单链工作模式。
package main

import "fmt"

// prefixStage returns a channel that carries every string received from in
// with prefix prepended. The returned channel is closed once in is drained.
func prefixStage(in <-chan string, prefix string) <-chan string {
	ch := make(chan string)
	go func() {
		defer close(ch)
		for item := range in {
			ch <- prefix + item
		}
	}()
	return ch
}

// A is the head of the chain: it emits "节点A-1" through "节点A-n" in order
// and then closes its channel.
func A(n int) <-chan string {
	ch := make(chan string)
	go func() {
		defer close(ch)
		for seq := 1; seq <= n; seq++ {
			ch <- fmt.Sprint("节点A-", seq)
		}
	}()
	return ch
}

// B prepends "节点B" to every value it receives from in.
func B(in <-chan string) <-chan string { return prefixStage(in, "节点B") }

// C prepends "节点C" to every value it receives from in.
func C(in <-chan string) <-chan string { return prefixStage(in, "节点C") }

// main wires A -> B -> C into a single chain (no fan-in/fan-out) and prints
// every result; with one goroutine per stage the order is preserved.
func main() {
	for goods := range C(B(A(9))) {
		fmt.Println(goods)
	}
}
# 程序输出
节点C节点B节点A-1
节点C节点B节点A-2
节点C节点B节点A-3
节点C节点B节点A-4
节点C节点B节点A-5
节点C节点B节点A-6
节点C节点B节点A-7
节点C节点B节点A-8
节点C节点B节点A-9
1.2 扇入扇出模式
如果使用扇入扇出模式的话,我们再增加一个汇聚(merge)函数就可以了。
package main

import (
	"fmt"
	"sync"
)

// A is the source stage: it sends "节点A-1" … "节点A-n" and then closes its
// output channel.
func A(n int) <-chan string {
	out := make(chan string)
	go func() {
		defer close(out)
		for i := 1; i <= n; i++ {
			out <- fmt.Sprint("节点A-", i)
		}
	}()
	return out
}

// B prefixes every value read from in with "节点B" and closes its output
// channel once in has been drained.
func B(in <-chan string) <-chan string {
	out := make(chan string)
	go func() {
		defer close(out)
		for c := range in {
			out <- "节点B" + c
		}
	}()
	return out
}

// C prefixes every value read from in with "节点C" and closes its output
// channel once in has been drained.
func C(in <-chan string) <-chan string {
	out := make(chan string)
	go func() {
		defer close(out)
		for c := range in {
			out <- "节点C" + c
		}
	}()
	return out
}

// merge is the fan-in step: it forwards every value from all ins onto one
// channel and closes that channel after every input has been drained.
func merge(ins ...<-chan string) <-chan string {
	var wg sync.WaitGroup
	out := make(chan string)
	// dispose copies one input channel into out.
	dispose := func(in <-chan string) {
		defer wg.Done()
		for c := range in {
			out <- c
		}
	}
	// One WaitGroup slot per input channel.
	wg.Add(len(ins))
	// Start one forwarding goroutine per input.
	for _, cs := range ins {
		go dispose(cs)
	}
	// Close out only after all inputs are fully forwarded. Waiting in a
	// separate goroutine lets merge return immediately so the caller can
	// start reading (otherwise the unbuffered sends would never complete).
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

func main() {
	componentA := A(9)
	// Fan-out: three B stages read from the same channel; each value from A
	// is received by exactly one of them.
	componentB1 := B(componentA)
	componentB2 := B(componentA)
	componentB3 := B(componentA)
	// Fan-in: merge the three B outputs into a single channel.
	mergeComponent := merge(componentB1, componentB2, componentB3)
	// C is an ordinary downstream stage on the merged stream. The output
	// order is not deterministic because the B stages run concurrently.
	componentC := C(mergeComponent)
	for goods := range componentC {
		fmt.Println(goods)
	}
}
# 程序输出
节点C节点B节点A-2
节点C节点B节点A-3
节点C节点B节点A-1
节点C节点B节点A-5
节点C节点B节点A-7
节点C节点B节点A-4
节点C节点B节点A-6
节点C节点B节点A-8
节点C节点B节点A-9
1.3 扇入扇出法寻找素数
寻找素数是最能体现扇入扇出法并发性能的经典例子,相信每个学 Go 的人都听说过。我们先来看看不使用扇入扇出法的版本。
package main

import (
	"fmt"
	"math/rand"
	"time"
)

// repeatFn is a generator: it calls fn over and over and sends each result on
// the returned channel until done is closed, then closes the channel.
var repeatFn = func(done <-chan interface{}, fn func() interface{}) <-chan interface{} {
	valueStream := make(chan interface{})
	go func() {
		defer close(valueStream)
		for {
			select {
			case <-done:
				return
			case valueStream <- fn():
			}
		}
	}()
	return valueStream
}

// toInt converts a stream of interface{} values to an int channel. Every
// value is type-asserted with v.(int), so a non-int value panics. The output
// is closed when valueStream is closed or done is closed.
var toInt = func(done <-chan interface{}, valueStream <-chan interface{}) <-chan int {
	intStream := make(chan int)
	go func() {
		defer close(intStream)
		for v := range valueStream {
			select {
			case <-done:
				return
			case intStream <- v.(int):
			}
		}
	}()
	return intStream
}

// primeChanArr reads integers from intStream, subtracts 1 from each, and
// forwards the decremented value when it is prime. Primality is checked by
// trial division over every divisor from n-1 down to 2, which keeps the
// stage CPU-bound; the timing comparison in this article relies on that.
// The output is closed when intStream is exhausted or done is closed.
var primeChanArr = func(done <-chan interface{}, intStream <-chan int) <-chan interface{} {
	primeStream := make(chan interface{})
	go func() {
		defer close(primeStream)
		// intStream keeps producing values until it is closed.
		for integer := range intStream {
			integer -= 1
			// Values below 2 are not prime. Without this check the loop below
			// never runs for them, so -1, 0 and 1 would be reported as prime.
			prime := integer > 1
			for divisor := integer - 1; divisor > 1; divisor-- {
				if integer%divisor == 0 {
					prime = false
					break
				}
			}
			if prime {
				select {
				case <-done:
					return
				case primeStream <- integer:
				}
			}
		}
	}()
	return primeStream
}

// take forwards at most num values from valueStream and then closes the
// returned channel. It stops early if done is closed or valueStream closes.
var take = func(done <-chan interface{}, valueStream <-chan interface{}, num int) <-chan interface{} {
	takeStream := make(chan interface{})
	go func() {
		defer close(takeStream)
		for i := 0; i < num; i++ {
			// Receive inside the select so a blocked read can still be
			// interrupted by done, and a closed source is detected instead of
			// being forwarded as zero values.
			select {
			case <-done:
				return
			case v, ok := <-valueStream:
				if !ok {
					return
				}
				select {
				case <-done:
					return
				case takeStream <- v:
				}
			}
		}
	}()
	return takeStream
}

func main() {
	// randNum returns a random int in [0, 50000000).
	randNum := func() interface{} {
		return rand.Intn(50000000)
	}
	// Closing done on return tells every pipeline stage to stop.
	done := make(chan interface{})
	defer close(done)
	start := time.Now()
	// Endless stream of random ints.
	randIntStream := toInt(done, repeatFn(done, randNum))
	fmt.Printf("Primes:\n")
	// A single primeChanArr stage does all the work; take the first 10 primes.
	for prime := range take(done, primeChanArr(done, randIntStream), 10) {
		fmt.Printf("%d\n", prime)
	}
	fmt.Printf("消耗时间:%v", time.Since(start))
}
# 程序输出
Primes:
24941317
36122539
6410693
10128161
25511527
2107939
14004383
7190363
45931967
2393161
消耗时间:22.3454984s
然后我们来使用扇入扇出法。
package main

import (
	"fmt"
	"math/rand"
	"runtime"
	"sync"
	"time"
)

// repeatFn is a generator: it calls fn over and over and sends each result on
// the returned channel until done is closed, then closes the channel.
var repeatFn = func(done <-chan interface{}, fn func() interface{}) <-chan interface{} {
	valueStream := make(chan interface{})
	go func() {
		defer close(valueStream)
		for {
			select {
			case <-done:
				return
			case valueStream <- fn():
			}
		}
	}()
	return valueStream
}

// toInt converts a stream of interface{} values to an int channel. Every
// value is type-asserted with v.(int), so a non-int value panics. The output
// is closed when valueStream is closed or done is closed.
var toInt = func(done <-chan interface{}, valueStream <-chan interface{}) <-chan int {
	intStream := make(chan int)
	go func() {
		defer close(intStream)
		for v := range valueStream {
			select {
			case <-done:
				return
			case intStream <- v.(int):
			}
		}
	}()
	return intStream
}

// primeChanArr reads integers from intStream, subtracts 1 from each, and
// forwards the decremented value when it is prime. Primality is checked by
// trial division over every divisor from n-1 down to 2, which keeps the
// stage CPU-bound so that running several copies in parallel pays off.
// The output is closed when intStream is exhausted or done is closed.
var primeChanArr = func(done <-chan interface{}, intStream <-chan int) <-chan interface{} {
	primeStream := make(chan interface{})
	go func() {
		defer close(primeStream)
		for integer := range intStream {
			integer -= 1
			// Values below 2 are not prime. Without this check the loop below
			// never runs for them, so -1, 0 and 1 would be reported as prime.
			prime := integer > 1
			for divisor := integer - 1; divisor > 1; divisor-- {
				if integer%divisor == 0 {
					prime = false
					break
				}
			}
			if prime {
				select {
				case <-done:
					return
				case primeStream <- integer:
				}
			}
		}
	}()
	return primeStream
}

// take forwards at most num values from valueStream and then closes the
// returned channel. It stops early if done is closed or valueStream closes.
var take = func(done <-chan interface{}, valueStream <-chan interface{}, num int) <-chan interface{} {
	takeStream := make(chan interface{})
	go func() {
		defer close(takeStream)
		for i := 0; i < num; i++ {
			// Receive inside the select so a blocked read can still be
			// interrupted by done, and a closed source is detected instead of
			// being forwarded as zero values.
			select {
			case <-done:
				return
			case v, ok := <-valueStream:
				if !ok {
					return
				}
				select {
				case <-done:
					return
				case takeStream <- v:
				}
			}
		}
	}()
	return takeStream
}

// fanIn multiplexes several channels into one. Each input is drained by its
// own goroutine; the returned channel is closed after every forwarder has
// returned, either because its input closed or because done was closed.
var fanIn = func(done <-chan interface{}, channels ...<-chan interface{}) <-chan interface{} {
	var wg sync.WaitGroup
	multiplexedStream := make(chan interface{})
	multiplex := func(c <-chan interface{}) {
		defer wg.Done()
		for i := range c {
			select {
			case <-done:
				return
			case multiplexedStream <- i:
			}
		}
	}
	wg.Add(len(channels))
	for _, c := range channels {
		go multiplex(c)
	}
	// Close the output only once every forwarder has finished.
	go func() {
		wg.Wait()
		close(multiplexedStream)
	}()
	return multiplexedStream
}

func main() {
	// randNum returns a random int in [0, 50000000).
	randNum := func() interface{} {
		return rand.Intn(50000000)
	}
	// Closing done on return tells every pipeline stage to stop.
	done := make(chan interface{})
	defer close(done)
	start := time.Now()
	randIntStream := toInt(done, repeatFn(done, randNum))
	cpuNumber := runtime.NumCPU()
	fmt.Printf("find cpu is : %d\n", cpuNumber)
	// One prime-finder stage per CPU.
	chanArray := make([]<-chan interface{}, cpuNumber)
	fmt.Printf("Primes:\n")
	// Fan-out: every stage reads from the same random-int stream.
	for i := 0; i < cpuNumber; i++ {
		chanArray[i] = primeChanArr(done, randIntStream)
	}
	// Fan-in: merge the stages' results and keep the first 10 primes.
	for prime := range take(done, fanIn(done, chanArray...), 10) {
		fmt.Println(prime)
	}
	fmt.Printf("消耗时间:%v", time.Since(start))
}
# 程序输出
find cpu is : 8
Primes:
6410693
24941317
10128161
36122539
25511527
2107939
14004383
7190363
2393161
45931967
消耗时间:4.3099368s
1.4 扇入扇出法模拟任务处理
package main

import (
	"context"
	"log"
	"sync"
	"time"
)

// Task holds a task's number and how long processing it takes.
type Task struct {
	Number int
	Cost   time.Duration
}

// taskChannelGerenator sends every task in taskList, in order, on the
// returned channel and closes it afterwards. It stops early if ctx is
// canceled. (The misspelled name is kept so existing references still work.)
func taskChannelGerenator(ctx context.Context, taskList []Task) <-chan Task {
	taskCh := make(chan Task)
	go func() {
		defer close(taskCh)
		for _, task := range taskList {
			select {
			case <-ctx.Done():
				return
			case taskCh <- task:
			}
		}
	}()
	return taskCh
}

// doTask is one fan-out worker. It takes tasks from taskCh, simulates the
// work by waiting task.Cost, and sends the finished task's number on the
// returned channel. Both the wait and the send observe ctx, so cancellation
// makes the worker return and close its channel instead of blocking forever
// on a send nobody will receive.
func doTask(ctx context.Context, taskCh <-chan Task) <-chan int {
	doneTaskCh := make(chan int)
	go func() {
		defer close(doneTaskCh)
		for task := range taskCh {
			// Don't start new work once the context is canceled.
			if ctx.Err() != nil {
				return
			}
			log.Printf("do task number: %d\n", task.Number)
			// Simulated processing time; abort the wait on cancellation.
			timer := time.NewTimer(task.Cost)
			select {
			case <-ctx.Done():
				timer.Stop()
				return
			case <-timer.C:
			}
			// Report the finished task without risking a blocked send.
			select {
			case <-ctx.Done():
				return
			case doneTaskCh <- task.Number:
			}
		}
	}()
	return doneTaskCh
}

// merge is the fan-in step: it multiplexes every channel in taskChs onto a
// single channel, which is closed once all inputs are drained or every
// forwarder has stopped because ctx was canceled.
func merge(ctx context.Context, taskChs []<-chan int) <-chan int {
	var wg sync.WaitGroup
	mergedTaskCh := make(chan int)
	mergeTask := func(taskCh <-chan int) {
		defer wg.Done()
		for t := range taskCh {
			select {
			case <-ctx.Done():
				return
			case mergedTaskCh <- t:
			}
		}
	}
	wg.Add(len(taskChs))
	for _, taskCh := range taskChs {
		go mergeTask(taskCh)
	}
	// Close the merged channel only after every forwarder has finished.
	go func() {
		wg.Wait()
		close(mergedTaskCh)
	}()
	return mergedTaskCh
}

func main() {
	start := time.Now()
	// The context lets every stage stop early; cancel runs when main returns.
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	// Each task and its processing cost.
	taskList := []Task{
		{Number: 1, Cost: 1 * time.Second},
		{Number: 2, Cost: 7 * time.Second},
		{Number: 3, Cost: 2 * time.Second},
		{Number: 4, Cost: 3 * time.Second},
		{Number: 5, Cost: 5 * time.Second},
		{Number: 6, Cost: 3 * time.Second},
	}
	// Fan-out: numWorkers doTask goroutines all read from the same task
	// channel, so each task is handled by exactly one worker.
	taskCh := taskChannelGerenator(ctx, taskList)
	numWorkers := 4
	workers := make([]<-chan int, numWorkers)
	for i := 0; i < numWorkers; i++ {
		workers[i] = doTask(ctx, taskCh)
	}
	count := 0
	// Fan-in: merge collects the finished task numbers from every worker.
	for d := range merge(ctx, workers) {
		count++
		log.Printf("done task number: %d\n", d)
	}
	log.Printf("Finished. Done %d tasks. Total time: %fs", count, time.Since(start).Seconds())
}
# 程序输出
2023/06/10 17:51:04 do task number: 1
2023/06/10 17:51:04 do task number: 4
2023/06/10 17:51:04 do task number: 3
2023/06/10 17:51:04 do task number: 2
2023/06/10 17:51:05 do task number: 5
2023/06/10 17:51:05 done task number: 1
2023/06/10 17:51:06 do task number: 6
2023/06/10 17:51:06 done task number: 3
2023/06/10 17:51:07 done task number: 4
2023/06/10 17:51:09 done task number: 6
2023/06/10 17:51:10 done task number: 5
2023/06/10 17:51:11 done task number: 2
2023/06/10 17:51:11 Finished. Done 6 tasks. Total time: 7.009781s
1.5 扇入扇出法模拟流水线工作
假如我们有一条流水线,共分三个步骤,分别是 job1、job2 和 job3,其中 job2 每处理一个数据需要耗时 2 秒。
package main

import (
	"fmt"
	"time"
)

// job1 produces count values, each equal to 1, pausing one second before
// each one. Its output channel has a buffer of 2 and is closed at the end.
func job1(count int) <-chan int {
	results := make(chan int, 2)
	go func() {
		defer close(results)
		for remaining := count; remaining > 0; remaining-- {
			time.Sleep(time.Second)
			fmt.Println("job1 finish:", 1)
			results <- 1
		}
	}()
	return results
}

// job2 is the slow stage: for every input it waits two seconds and then
// emits the input plus one. Its output channel has a buffer of 2.
func job2(inCh <-chan int) <-chan int {
	results := make(chan int, 2)
	go func() {
		defer close(results)
		for v := range inCh {
			time.Sleep(2 * time.Second) // simulated 2-second cost
			next := v + 1
			fmt.Println("job2 finish:", next)
			results <- next
		}
	}()
	return results
}

// job3 emits every input plus one without any artificial delay. Its output
// channel has a buffer of 2.
func job3(inCh <-chan int) <-chan int {
	results := make(chan int, 2)
	go func() {
		defer close(results)
		for v := range inCh {
			next := v + 1
			fmt.Println("job3 finish:", next)
			results <- next
		}
	}()
	return results
}

// main runs the three jobs as a plain linear pipeline and reports how long
// the whole run takes.
func main() {
	began := time.Now()
	for v := range job3(job2(job1(10))) {
		fmt.Println(v)
	}
	fmt.Println("all finish")
	fmt.Println("duration:", time.Since(began).String())
}
# 程序输出
job1 finish: 1
job1 finish: 1
job2 finish: 2
job3 finish: 3
3
job1 finish: 1
job1 finish: 1
job2 finish: 2
job3 finish: 3
3
job1 finish: 1
job1 finish: 1
job2 finish: 2
job3 finish: 3
3
job1 finish: 1
job2 finish: 2
job3 finish: 3
3
job1 finish: 1
job2 finish: 2
job3 finish: 3
3
job1 finish: 1
job2 finish: 2
job3 finish: 3
3
job1 finish: 1
job2 finish: 2
job3 finish: 3
3
job2 finish: 2
job3 finish: 3
3
job2 finish: 2
job3 finish: 3
3
job2 finish: 2
job3 finish: 3
3
all finish
duration: 21.0077945s
总共耗时约 21 秒,主要是因为 job2 太慢:它每处理一个数据需要 2 秒,而且只有一个 job2 在串行处理全部 10 个数据(10 × 2s,再加上 job1 产出第一个数据的 1s)。现在我们的主要任务就是解决这个问题。
这里只用了一个 job2 来处理 job1 的结果,如果我们能多开启几个 goroutine job2 并行处理会不会提升性能呢?
现在我们改进下代码,解决 job2 耗时的问题。需要注意 channel 的关闭方式:每个 job2 仍然在自己内部关闭自己的输出 channel;而合并后的输出 channel 必须等所有 job2 的输出都转发完毕后才能关闭,所以由 merge 函数用 sync.WaitGroup 等待全部转发 goroutine 结束后再统一关闭。
package main

import (
	"fmt"
	"sync"
	"time"
)

// job1 produces count values, each equal to 1, pausing one second before
// each one. Its output channel has a buffer of 2 and is closed at the end.
func job1(count int) <-chan int {
	results := make(chan int, 2)
	go func() {
		defer close(results)
		for remaining := count; remaining > 0; remaining-- {
			time.Sleep(time.Second)
			fmt.Println("job1 finish:", 1)
			results <- 1
		}
	}()
	return results
}

// job2 is the slow stage: for every input it waits two seconds and then
// emits the input plus one. Each job2 call closes its own output channel.
func job2(inCh <-chan int) <-chan int {
	results := make(chan int, 2)
	go func() {
		defer close(results)
		for v := range inCh {
			time.Sleep(2 * time.Second) // simulated 2-second cost
			next := v + 1
			fmt.Println("job2 finish:", next)
			results <- next
		}
	}()
	return results
}

// job3 emits every input plus one without any artificial delay.
func job3(inCh <-chan int) <-chan int {
	results := make(chan int, 2)
	go func() {
		defer close(results)
		for v := range inCh {
			next := v + 1
			fmt.Println("job3 finish:", next)
			results <- next
		}
	}()
	return results
}

// merge is the fan-in step: it copies every value from all inCh channels into
// one channel (buffer 2) and closes it once every input has been drained.
func merge(inCh ...<-chan int) <-chan int {
	merged := make(chan int, 2)
	var pending sync.WaitGroup
	pending.Add(len(inCh))
	forward := func(src <-chan int) {
		defer pending.Done()
		for v := range src {
			merged <- v
		}
	}
	for _, src := range inCh {
		go forward(src)
	}
	// Wait must run in its own goroutine. Waiting here would block merge
	// before the caller ever reads from merged, and the forwarders would stall
	// once the buffer filled up, which is a deadlock.
	go func() {
		pending.Wait()
		close(merged)
	}()
	return merged
}

// main fans job1's output out to three job2 workers, fans their results back
// in with merge, and feeds the merged stream into job3.
func main() {
	began := time.Now()
	source := job1(10)
	// Fan-out: three job2 goroutines read from the same source channel.
	workers := []<-chan int{job2(source), job2(source), job2(source)}
	// Fan-in: combine the worker outputs into a single stream for job3.
	for v := range job3(merge(workers...)) {
		fmt.Println(v)
	}
	fmt.Println("all finish")
	fmt.Println("duration:", time.Since(began).String())
}
# 程序输出
job1 finish: 1
job1 finish: 1
job2 finish: 2
job3 finish: 3
3
job1 finish: 1
job2 finish: 2
job3 finish: 3
3
job1 finish: 1
job2 finish: 2
job3 finish: 3
3
job1 finish: 1
job2 finish: 2
job3 finish: 3
3
job1 finish: 1
job1 finish: 1
job2 finish: 2
job3 finish: 3
3
job2 finish: 2
job3 finish: 3
job1 finish: 1
3
job2 finish: 2
job1 finish: 1
job3 finish: 3
3
job2 finish: 2
job3 finish: 3
3
job1 finish: 1
job2 finish: 2
job3 finish: 3
3
job2 finish: 2
job3 finish: 3
3
all finish
duration: 12.0213193s
可以看到,总耗时由原来的约 21s 减少到约 12s,缩短了约 43%。此时瓶颈已经转移到 job1:它每秒只产出一个数据,10 个数据至少需要 10s,再加上最后一个数据在 job2 中的 2s,正好约 12s。因此单纯调大缓冲 chan(示例中容量只有 2)并不能明显缩短总耗时;要继续提速,需要让 job1 也并行产出数据。
FAN-OUT 模式:多个 goroutine 从同一个通道读取数据,直到该通道关闭。OUT 是一种张开的模式,所以又被称为扇出,可以用来分发任务。
FAN-IN 模式:把多个通道的数据汇聚到一个通道中,直到这些通道都关闭(本文的 merge 为每个输入通道启动一个转发 goroutine,并在它们全部结束后关闭输出通道)。IN 是一种收敛的模式,所以又被称为扇入,用来收集处理的结果。
是不是很像扇子的形状:先展开(扇出),再合并(扇入)。
总结:在类似流水线这类的逻辑中,我们可以使用 FAN-IN 和 FAN-OUT 模式来提升程序性能。