A Bite of GoLang (7)

叁叁肆2018-09-19 11:25


本文来自网易云社区


作者:盛国存


9.1、使用Channel等待任务结束

前面的例子中我们等待任务结束是通过sleep来处理,因为打印的数据较少,1 毫秒足够;但是这种方式等待任务结束显然不是很优雅。 对于任务结束首先我们需要确定的通知外面我们打印结束了,那我们又如何通知呢?在Go语言中我们不要通过共享内存来通信,而是要通过通信来共享内存。直接用Channel就可以,下面我们来改造上面的例子

package main

import (
    "fmt"
)

type worker struct {
    in chan int
    done chan bool
}

func work(in chan int, done chan bool, num int) {
    for ch := range in {

        fmt.Println("Work ID :", num)
        fmt.Println(ch)
        done<- true
    }
}

func createWork(num int) worker {

    ch := worker{
        in: make(chan int),
        done: make(chan  bool),
    }
    go work(ch.in, ch.done, num)
    return ch
}

func main() {

    var workers [10]worker
    for i := 0; i < 10; i ++ {
        workers[i] = createWork(i)
    }

    for i := 0; i < 10; i ++ {
        workers[i].in <- 'M' + i
        <-workers[i].done
    }
}

打印输出结果

Work ID : 0
77
Work ID : 1
78
Work ID : 2
79
Work ID : 3
80
Work ID : 4
81
Work ID : 5
82
Work ID : 6
83
Work ID : 7
84
Work ID : 8
85
Work ID : 9
86

虽然sleep部分的代码已经删除了,但是发现是顺序打印的,这显然不是我想要的结果。Go语言对等待多任务完成提供了一个库 WaitGroup,下面我们就用它继续重构上述的代码

package main

import (
    "fmt"
    "sync"
)

type worker struct {
    in chan int
    done func()
}

func work(worker worker, num int) {
    for ch := range worker.in {

        fmt.Println("Work ID :", num)
        fmt.Println(ch)
        worker.done()
    }
}

func createWork(num int, wg *sync.WaitGroup) worker {

    worker := worker{
        in: make(chan int),
        done: func() {
            wg.Done() // 每个任务做完了就调用Done
        },
    }
    go work(worker, num)
    return worker
}


func main() {

    var wg sync.WaitGroup
    var workers [10]worker
    for i := 0; i < 10; i ++ {
        workers[i] = createWork(i, &wg)
    }

    wg.Add(10) // Add 总共有多少个任务

    for i := 0; i < 10; i ++ {
        workers[i].in <- 'M' + i
    }

    wg.Wait() // 等待所有的任务做完
}

结果输出

Work ID : 4
81
Work ID : 5
82
Work ID : 1
78
Work ID : 2
79
Work ID : 6
Work ID : 3
80
Work ID : 0
Work ID : 9
86
83
77
Work ID : 7
84
Work ID : 8
85

Process finished with exit code 0

这样相应的结果才是我们想要的。

面试题实战

协程交替执行,使其能顺序输出1-20的自然数

这个问题就不做演示了,留给读者自行发挥。


9.2、用select进行调度

1、select使用

首先我们先来介绍一下select常规的应用场景,比如

var ch1, ch2 chan int

我们有两个channel,我们想从 ch1、ch2 里面收数据,

var ch1, ch2 chan int
data1 := <- ch1
data2 := <- ch2

谁快我就要谁,这就是我们的select

package main

import (
    "fmt"
)

func main() {

    var ch1, ch2 chan int
    select {

        case data := <- ch1:
            fmt.Println("CH1 的数据:", data)
        case data := <-ch2:
            fmt.Println("CH2 的数据:", data)
        default:
            fmt.Println("没收到 CH1、CH2 的数据")
    }
}

这就相当于做了一个非阻塞式的获取。下面我们就结合一个channel生成器来做一个例子演示

package main

import (
    "fmt"
    "time"
    "math/rand"
)

func genChan() chan int {

    out := make(chan int)
    go func() {

        i := 0
        for {
            time.Sleep(time.Millisecond * time.Duration(rand.Intn(1000)))
            out <- i
            i ++
        }
    }()
    return out
}

func main() {

    var ch1, ch2 = genChan(), genChan()
    for {
        select {
            case data := <- ch1:
                fmt.Println("CH1 的数据:", data)
            case data := <-ch2:
                fmt.Println("CH2 的数据:", data)
        }
    }
}

输出结果(部分)

CH1 的数据: 0
CH2 的数据: 0
CH1 的数据: 1
CH2 的数据: 1
CH1 的数据: 2
CH2 的数据: 2
CH1 的数据: 3
CH2 的数据: 3
CH1 的数据: 4
CH2 的数据: 4
CH1 的数据: 5
CH2 的数据: 5
CH2 的数据: 6
CH1 的数据: 6
CH1 的数据: 7
CH1 的数据: 8
CH2 的数据: 7
CH1 的数据: 9
CH2 的数据: 8
CH1 的数据: 10
CH2 的数据: 9
CH1 的数据: 11
CH1 的数据: 12
CH1 的数据: 13
CH2 的数据: 10
CH2 的数据: 11
CH1 的数据: 14
CH2 的数据: 12
CH2 的数据: 13
CH1 的数据: 15

Process finished with exit code 130 (interrupted by signal 2: SIGINT)

这就是select的一个应用场景,从输出结果可以看到,CH1、CH2的输出结果不一样,谁先出数据就先选择谁;两个同时出就随机的选择一个。

2、定时器的使用

比如上面的这段代码我想要在10秒之后程序就终止,我该如何处理呢?我们这里需要介绍一下Go语言的 time.After

// After waits for the duration to elapse and then sends the current time
// on the returned channel.
// It is equivalent to NewTimer(d).C.
// The underlying Timer is not recovered by the garbage collector
// until the timer fires. If efficiency is a concern, use NewTimer
// instead and call Timer.Stop if the timer is no longer needed.
func After(d Duration) <-chan Time {
    return NewTimer(d).C
}

从源码来看,他的返回值类型是一个 <-chan Time ,那就方便很多了

package main

import (
    "fmt"
    "time"
)

func genChan() chan int {

    out := make(chan int)
    go func() {

        i := 0
        for {
            time.Sleep(time.Second)
            out <- i
            i ++
        }
    }()
    return out
}

func main() {

    var ch1, ch2 = genChan(), genChan()
    tm := time.After(10 * time.Second) // 加上10秒的定时
    for {
        select {
            case data := <- ch1:
                fmt.Println("CH1 的数据:", data)
            case data := <-ch2:
                fmt.Println("CH2 的数据:", data)
            case <-tm:
                return // 收到指令程序直接return
        }
    }
}

运行到10秒,代码自动退出。


9.3、传统同步机制

Go语言除了CSP模型外,还是有传统同步机制的,比如互斥量 Mutex ,现在我们就用它举个例子: 用互斥量实现 atomic

package main

import (
    "sync"
    "time"
    "fmt"
)

type atomicInt struct {
    value int
    lock sync.Mutex
}

func increment(a *atomicInt) {
    a.lock.Lock()
    defer a.lock.Unlock()
    a.value ++
}

func get(a *atomicInt) int {
    a.lock.Lock()
    defer a.lock.Unlock()
    return a.value
}

func main() {

    var a atomicInt
    increment(&a)
    go func() {
        increment(&a)
    }()
    time.Sleep(time.Second)
    fmt.Println(get(&a))
}

结果输出

2

Process finished with exit code 0

代码写完,可以用上面介绍的race来检查一下,是否有冲突,是否安全;当然这里还是不建议自己来造这些轮子的,直接使用系统的就可以了。系统提供了 atomic.AddInt32() 等等这些原子操作。


相关阅读:A Bite of GoLang (1)

A Bite of GoLang (2)

A Bite of GoLang (3)

A Bite of GoLang (4)

A Bite of GoLang (5)

A Bite of GoLang (6)

A Bite of GoLang (7)

A Bite of GoLang (8)


网易云免费体验馆0成本体验20+款云产品!

更多网易研发、产品、运营经验分享请访问网易云社区