14.1 并发、并行和协程

并发和并行的区别

并发(concurrency):看上去是同时执行。例如,在某时间段内,作业 A 和 B 都执行了,则 A,B 在此时间段是并发的。

并行(parallelism):实际上真的同执行。例如,在某时刻,A 和 B 都在各自执行其任务,则 A,B 在此时刻是并行的。

$ 并行 \sube 并发 $

在目前的 Go 中,协程不是并行的,而是时间片轮流执行。

数据同步

多线程常常面临一个问题:同时读写一个内存数据,结果会不可预料。怎么办?

经典的解决方法是加锁,但是这样势必让代码更复杂、更慢。Go 使用的是基于通道(channel)同步的协程(goroutines)的方式。

协程:通过调度器实现的对进程的多路复用,比如将一个线程划分给多个协程使用,或者把多个线程轮流给一个协程使用。这样大大提高了利用率,从而减少 CPU 时间的空转。

并发的实现方式有两种:

  • 有序的、确定的。通过额外的安排实现。
  • 无序的、不确定的。通过加锁实现。

当使用 go func () 协程运行一个子程序时,子程序拥有自己的栈,会自己维护这个栈并最后返回。

Go 的主程序 main () 也可以看作一个协程。

使用 runtime.Gosched () 可以临时出让 CPU,以避免子程序占用过高导致其他程序变慢。

GOMAXPROCS 环境变量

GOMAXPROCS 等同于(并发的)线程数量,在一台核心数多于 1 个的机器上,会尽可能有等同于核心数的线程在并行运行。

指定使用的核心数量

从命令行参数获取核心数:

var numCores = flag.Int("n", 2, "number of CPU cores to use")

指定并发线程数

flag.Parse()
runtime.GOMAXPROCS(*numCores)

14.2 信息通道

通道(channel)是协程间消息队列

使用方法如下:

package main

import (
	"fmt"
	"time"
)

func main() {

	// 创建一个字符串通道
	ch := make(chan string)

	// 创建两个协程
	go sendData(ch)
	go getData(ch)

	// 避免提前退出
	time.Sleep(1e9)
}

func sendData(ch chan string) {
	// 向通道写入数据
	
	ch <- "Beijing"
	ch <- "Tokyo"
	ch <- "London"
	ch <- "Tripoli"
	ch <- "Washington"
}

func getData(ch chan string) {
	var input string
	// time.Sleep(2e9)
	for {
		// 从通道读取数据
		input = <-ch
		fmt.Printf("%s \n", input)
	}
}

结果:

Beijing 
Tokyo 
London 
Tripoli 
Washington 

无缓冲同步阻塞通道

上面这种通道是无缓冲的,也叫同步的,在任何一个协程没有准备好的时候,值无法被保存。

例如,去掉第一个 time.Sleep (1e9),则不会有任何输出,因为发送协程还没准备好,程序就结束了。

例如,去掉第二个 time.Sleep (2e9) 的注释,同样不会有任何输出,因为通道不具有保存数据的能力。

并且这种通道还是阻塞的,第一个数据必须被接收了,才能发送第二个。不妨把 getData 改成:

func getData(ch chan string) {
	var input string
	for {
		// 从通道读取数据
		input = <-ch
		fmt.Printf("%s \n", input)
		time.Sleep(2e9)
	}
}

会发现只输出一个 Beijing. 原因是只发送了第一个数据,然后休眠的时候,程序就结束了。

14.3 计时器

time.Ticker 可以定期向管道发送时间(纳秒)

调用 Stop () 使计时器停止

Timer 只发送一次时间。

简单超时模式

timeout := make(chan bool, 1)
go func() {
        time.Sleep(1e9) // one second
        timeout <- true
}()
select {
    case <-ch:
        // a read from ch has occured
    case <-timeout:
        // the read from ch has timed out
        break
}

14.4 恢复

func server(workChan <-chan *Work) {
    for work := range workChan {
        go safelyDo(work)   // start the goroutine for that work
    }
}

func safelyDo(work *Work) {
    defer func() {
        if err := recover(); err != nil {
            log.Printf("Work failed with %s in %v", err, work)
        }
    }()
    do(work)
}

safelyDo 会在 do (work) 执行后或者 panic 后进入 defer 代码区。而 recover () 会阻止程序崩溃。因此上面的代码能够

  • 记录错误
  • 防止协程错误让整个程序崩溃

14.5 互斥锁和通道的对比

使用互斥锁的模型:

func Worker(pool *Pool) {
    for {
        // 加锁
        pool.Mu.Lock()
        // 处理
        // begin critical section:
        task := pool.Tasks[0]        // take the first task
        pool.Tasks = pool.Tasks[1:]  // update the pool of tasks
        // end critical section
        // 解锁
        pool.Mu.Unlock()
        process(task)
    }
}

使用通道的模型:

    func Worker(in, out chan *Task) {
        for {
            // 从待办通道取出
            task := <-in
            process(task)
            // 放入完成通道
            out <- task
        }
    }

在 Go 中,通道依旧是用锁实现的,只不过锁对用户不可见了。

14.6 延迟求值

延迟生成器能够调用时才返回值:

    generateInteger() => 0
    generateInteger() => 1
    generateInteger() => 2
    ....

实现:

package main

import (
    "fmt"
)

var resume chan int

func integers() chan int {
    yield := make(chan int)
    count := 0
    go func() {
        for {
            yield <- count
            count++
        }
    }()
    return yield
}

func generateInteger() int {
    return <-resume
}

func main() {
    resume = integers()
    fmt.Println(generateInteger())  //=> 0
    fmt.Println(generateInteger())  //=> 1
    fmt.Println(generateInteger())  //=> 2    
}

通用模式:

package main

import (
    "fmt"
)

type Any interface{}
type EvalFunc func(Any) (Any, Any)

func main() {
    evenFunc := func(state Any) (Any, Any) {
        os := state.(int)
        ns := os + 2
        return os, ns
    }
    
    even := BuildLazyIntEvaluator(evenFunc, 0)
    
    for i := 0; i < 10; i++ {
        fmt.Printf("%vth even: %v\n", i, even())
    }
}

func BuildLazyEvaluator(evalFunc EvalFunc, initState Any) func() Any {
    retValChan := make(chan Any)
    loopFunc := func() {
        var actState Any = initState
        var retVal Any
        for {
            retVal, actState = evalFunc(actState)
            retValChan <- retVal
        }
    }
    retFunc := func() Any {
        return <- retValChan
    }
    go loopFunc()
    return retFunc
}

func BuildLazyIntEvaluator(evalFunc EvalFunc, initState Any) func() int {
    ef := BuildLazyEvaluator(evalFunc, initState)
    return func() int {
        return ef().(int)
    }
}

14.7 期值模式

期值就是在使用一个值之前先在其它核心进行计算,类似 Promise (Javascript) 和 Task (C#)。例如如果方阵 $A,B$ 可逆,有:$(AB)^{-1} = B^{-1} A^{-1}$:

func InverseProduct(a Matrix, b Matrix) {
    a_inv := Inverse(a)
    b_inv := Inverse(b)
    return Product(a_inv, b_inv)
}

我们希望 a_invb_inv 可以并行计算,从而提高运算性能:

func InverseProduct(a Matrix, b Matrix) {
    a_inv_future := InverseFuture(a)   // start as a goroutine
    b_inv_future := InverseFuture(b)   // start as a goroutine
    a_inv := <-a_inv_future
    b_inv := <-b_inv_future
    return Product(a_inv, b_inv)
}

func InverseFuture(a Matrix) chan Matrix {
    future := make(chan Matrix)
    go func() {
        future <- Inverse(a)
    }()
    return future
}

14.8 复用

下面的例子以 C/S 模式为例

package main

import "fmt"

type Request struct {
	a, b   int
	replyc chan int // reply channel inside the Request
}

type binOp func(a, b int) int

func run(op binOp, req *Request) {
	req.replyc <- op(req.a, req.b)
}

func server(op binOp, service chan *Request, quit chan bool) {
	for {
		select {
		case req := <-service:
			go run(op, req)
		case <-quit:
			return
		}
	}
}

func startServer(op binOp) (service chan *Request, quit chan bool) {
	service = make(chan *Request)
	quit = make(chan bool)
	go server(op, service, quit)
	return service, quit
}

func main() {
	adder, quit := startServer(func(a, b int) int { return a + b })
	const N = 100
	var reqs [N]Request
	for i := 0; i < N; i++ {
		req := &reqs[i]
		req.a = i
		req.b = i + N
		req.replyc = make(chan int)
		adder <- req
	}
	// checks:
	for i := N - 1; i >= 0; i-- { // doesn't matter what order
		if <-reqs[i].replyc != N+2*i {
			fmt.Println("fail at", i)
		} else {
			fmt.Println("Request ", i, " is ok!")
		}
	}
	quit <- true
	fmt.Println("done")
}

server 提供了一个二元运算服务(加法)。在第一个 for 循环中,外部请求送入了 Request 管道,然后进入 server 中被专用协程处理。

另外有一个 quit 通道,可用于 main 函数发送中止信号。

14.9 请求数控制

package main

const MAXREQS = 50

var sem = make(chan int, MAXREQS)

type Request struct {
	a, b   int
	replyc chan int
}

func process(r *Request) {
	// do something
}

func handle(r *Request) {
	sem <- 1 // doesn't matter what we put in it
	process(r)
	<-sem // one empty place in the buffer: the next request can start
}

func server(service chan *Request) {
	for {
		request := <-service
		go handle(request)
	}
}

func main() {
	service := make(chan *Request)
	go server(service)
}

这里通过一个 sem 标志量,每开始处理一个请求,送入 sem 一个值,每完成一个请求,送出 sem 一个值。

14.10 链式协程

package main

import (
	"flag"
	"fmt"
)

var ngoroutine = flag.Int("n", 100000, "how many goroutines")

func f(left, right chan int) { left <- 1 + <-right }

func main() {
	flag.Parse()
	leftmost := make(chan int)
	var left, right chan int = nil, leftmost
	for i := 0; i < *ngoroutine; i++ {
		left, right = right, make(chan int)
		go f(left, right)
	}
	right <- 0      // bang!
	x := <-leftmost // wait for completion
	fmt.Println(x)  // 100000, ongeveer 1,5 s
}

14.11 多核运算

func DoAll(){
    sem := make(chan int, NCPU) // Buffering optional but sensible
    for i := 0; i < NCPU; i++ {
        go DoPart(sem)
    }
    // Drain the channel sem, waiting for NCPU tasks to complete
    for i := 0; i < NCPU; i++ {
        <-sem // wait for one task to complete
    }
    // All done.
}

func DoPart(sem chan int) {
    // do the part of the computation
    sem <-1 // signal that this piece is done
}

func main() {
    runtime.GOMAXPROCS(NCPU) // runtime.GOMAXPROCS = NCPU
    DoAll()
}
   

14.12 流水线

无流水:

func SerialProcessData(in <-chan *Data, out chan<- *Data) {
    for data := range in {
        tmpA := PreprocessData(data)
        tmpB := ProcessStepA(tmpA)
        tmpC := ProcessStepB(tmpB)
        out <- PostProcessData(tmpC)
    }
}

有流水:

func ParallelProcessData (in <-chan *Data, out chan<- *Data) {
    // make channels:
    preOut := make(chan *Data, 100)
    stepAOut := make(chan *Data, 100)
    stepBOut := make(chan *Data, 100)
    stepCOut := make(chan *Data, 100)
    // start parallel computations:
    go PreprocessData(in, preOut)
    go ProcessStepA(preOut,StepAOut)
    go ProcessStepB(StepAOut,StepBOut)
    go ProcessStepC(StepBOut,StepCOut)
    go PostProcessData(StepCOut,out)
}  

14.13 漏桶

客户端不停地发送数据:

 func client() {
    for {
        var b *Buffer
        // Grab a buffer if available; allocate if not 
        select {
            case b = <-freeList:
                // Got one; nothing more to do
            default:
                // None free, so allocate a new one
                b = new(Buffer)
        }
        loadInto(b)         // Read next message from the network
        serverChan <- b     // Send to server
        
    }
 }
 

服务器:

func server() {
    for {
        b := <-serverChan       // Wait for work.
        process(b)
        // Reuse buffer if there's room.
        select {
            case freeList <- b:
                // Reuse buffer if free slot on freeList; nothing more to do
            default:
                // Free list full, just carry on: the buffer is 'dropped'
        }
    }
}

freeList 非空时,会一直读取数据。否则漏桶会溢出。

14.14 使用通道并发访问对象

package main

import (
	"fmt"
	"strconv"
)

type Person struct {
	Name   string
	salary float64
	chF    chan func()
}

func NewPerson(name string, salary float64) *Person {
	p := &Person{name, salary, make(chan func())}
	go p.backend()
	return p
}

func (p *Person) backend() {
	for f := range p.chF {
		f()
	}
}

// Set salary.
func (p *Person) SetSalary(sal float64) {
	p.chF <- func() { p.salary = sal }
}

// Retrieve salary.
func (p *Person) Salary() float64 {
	fChan := make(chan float64)
	p.chF <- func() { fChan <- p.salary }
	return <-fChan
}

func (p *Person) String() string {
	return "Person - name is: " + p.Name + " - salary is: " + strconv.FormatFloat(p.Salary(), 'f', 2, 64)
}

func main() {
	bs := NewPerson("Smith Bill", 2500.5)
	fmt.Println(bs)
	bs.SetSalary(4000.25)
	fmt.Println("Salary changed:")
	fmt.Println(bs)
}

差不多是个发布订阅模式的属性值修改。通过通道保证并发访问时数据的一致性。

参考

无锁队列的实现 | 酷 壳 - CoolShell