funcmain() { end := make(chanbool) for i := 0; i < 10; i++ { go test(end) // <-end // 若此处有end,在通道没有缓存的情况下,会每0.5s一个"goroutine" } for i := 0; i < 10; i++ { <-end // 写在此处,则不妨碍0s时10个goroutine一同启动 } }
工厂模式
传统的
在main函数里新建一个channel
另外声明一个函数f,将1中建立的channel作为参数
函数f中向channel中发送数据
一定情景下可以变成
函数f自己新建channel
函数f向新建的channel中发送数据
函数f返回这个装有数据的channel
优点
整体感更强,能够封装一些连带操作(比如启动一个后台监视程序等等)并将这些连带操作对外屏蔽
有时(通道带关闭时)可以用出其他语言generator的感觉
一定程度上可以使用只读/只写通道来保护数据
缺点:
写了非常多的代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
funcpump() <-chanint { // 返回一个只读的chan ch := make(chanint) gofunc() { for i := 0; i < 10; i++ { ch <- i } close(ch) }() return ch }
funcmain() { for x := range pump() { fmt.Println(x) } }
funcgenerater()chanint { c := make(chanint, 10) deferclose(c)
for i := 0; i < 10; i++ { c <- i } return c }
functest(upstream chanint, next chanint) { deferfunc() { if err := recover(); err != nil { fmt.Println("error in goroutine", err) } }() i := <-upstream res := 2520 / i next <- res }
funcmain() { res := make(chanint, 10) up := generater()
for i := 0; i < 10; i++ { go test(up, res) }
for i := 0; i < 9; i++ { // 有一个出错,因此通道里只有9个值 fmt.Println(<-res) } }
funcsource() <-chanint { numbers := make(chanint, 20) func() { for i := 0; i < 20; i++ { numbers <- i } close(numbers) }() return numbers }
funcfilter(in <-chanint, out chan<- int) { for { time.Sleep(time.Millisecond * 100) if i, ok := <-in; ok { if i%2 == 0 { out <- i } } else { break } } close(out) }
funcmain() { numbers := source() out := make(chanint)
go filter(numbers, out) for x := range out { fmt.Println(x) } }
// 上游生成大于2的正整数 funcgenerate()chanint { ch := make(chanint) gofunc() { for i := 2; ; i++ { ch <- i } }() return ch }
// filter中对每个上游的数字,判断是否为素数 // 不需要再次遍历是之前所有数字的倍数 // 只需要不是最新产生的素数的倍数即可 funcfilter(in chanint, prime int)chanint { out := make(chanint) gofunc() { for { time.Sleep(time.Millisecond * 100) if i := <-in; i%prime != 0 { out <- i } } }() return out }
funcsieve()chanint { out := make(chanint) gofunc() { ch := generate() for { prime := <-ch // 第一个素数是2 ch = filter(ch, prime) out <- prime // 收集素数到out通道 } }() return out }
funcmain() { primes := sieve() for { fmt.Println(<-primes) } }
funcmain() { request, quit := startServer(func(a, b int)int { return a + b }) const N = 100 var reqs [N]Request for i := 0; i < N; i++ { // 取出并包装一个请求 req := &reqs[i] req.a = i req.b = i + N req.replyc = make(chanint)
// 放入请求通道 request <- req }
// checks: for i := N - 1; i >= 0; i-- { // doesn’t matter what order fmt.Println(<-reqs[i].replyc) }
funcSerialProcessData(in <- chan *Data, out <- chan *Data) { for data := range in { tmpA := PreprocessData(data) tmpB := ProcessStepA(tmpA) tmpC := ProcessStepB(tmpB) out <- PostProcessData(tmpC) } }
// 在多个线程中分别启动处理 go PreprocessData(in, preOut) go ProcessStepA(preOut, stepAOut) go ProcessStepB(stepAOut, stepBOut) go ProcessStepC(stepBOut, stepCOut) go PostProcessData(stepCOut, out) }