14.1 并发、并行和协程
并发和并行的区别
并发(concurrency):看上去是同时执行。例如,在某时间段内,作业 A 和 B 都执行了,则 A,B 在此时间段是并发的。
并行(parallelism):实际上真的同时执行。例如,在某时刻,A 和 B 都在各自执行其任务,则 A,B 在此时刻是并行的。
$ 并行 \sube 并发 $
在目前的 Go 中,协程不是并行的,而是时间片轮流执行。
数据同步
多线程常常面临一个问题:同时读写一个内存数据,结果会不可预料。怎么办?
经典的解决方法是加锁,但是这样势必让代码更复杂、更慢。Go 使用的是基于通道(channel)同步的协程(goroutines)的方式。
协程:通过调度器实现的对进程的多路复用,比如将一个线程划分给多个协程使用,或者把多个线程轮流给一个协程使用。这样大大提高了利用率,从而减少 CPU 时间的空转。
并发的实现方式有两种:
- 有序的、确定的。通过额外的安排实现。
- 无序的、不确定的。通过加锁实现。
当使用 go func ()
协程运行一个子程序时,子程序拥有自己的栈,会自己维护这个栈并最后返回。
Go 的主程序 main ()
也可以看作一个协程。
使用 runtime.Gosched ()
可以临时出让 CPU,以避免子程序占用过高导致其他程序变慢。
GOMAXPROCS 环境变量
GOMAXPROCS 等同于(并发的)线程数量,在一台核心数多于 1 个的机器上,会尽可能有等同于核心数的线程在并行运行。
指定使用的核心数量
从命令行参数获取核心数:
var numCores = flag.Int("n", 2, "number of CPU cores to use")
指定并发线程数
flag.Parse()
runtime.GOMAXPROCS(*numCores)
14.2 信息通道
通道(channel)是协程间的消息队列。
使用方法如下:
package main
import (
"fmt"
"time"
)
func main() {
// 创建一个字符串通道
ch := make(chan string)
// 创建两个协程
go sendData(ch)
go getData(ch)
// 避免提前退出
time.Sleep(1e9)
}
func sendData(ch chan string) {
// 向通道写入数据
ch <- "Beijing"
ch <- "Tokyo"
ch <- "London"
ch <- "Tripoli"
ch <- "Washington"
}
func getData(ch chan string) {
var input string
// time.Sleep(2e9)
for {
// 从通道读取数据
input = <-ch
fmt.Printf("%s \n", input)
}
}
结果:
Beijing
Tokyo
London
Tripoli
Washington
无缓冲同步阻塞通道
上面这种通道是无缓冲的,也叫同步的,在任何一个协程没有准备好的时候,值无法被保存。
例如,去掉第一个 time.Sleep (1e9)
,则不会有任何输出,因为发送协程还没准备好,程序就结束了。
例如,去掉第二个 time.Sleep (2e9)
的注释,同样不会有任何输出,因为通道不具有保存数据的能力。
并且这种通道还是阻塞的,第一个数据必须被接收了,才能发送第二个。不妨把 getData
改成:
func getData(ch chan string) {
var input string
for {
// 从通道读取数据
input = <-ch
fmt.Printf("%s \n", input)
time.Sleep(2e9)
}
}
会发现只输出一个 Beijing
. 原因是只发送了第一个数据,然后休眠的时候,程序就结束了。
14.3 计时器
time.Ticker
可以定期向管道发送时间(纳秒)
调用 Stop ()
使计时器停止
而 Timer
只发送一次时间。
简单超时模式
timeout := make(chan bool, 1)
go func() {
time.Sleep(1e9) // one second
timeout <- true
}()
select {
case <-ch:
// a read from ch has occured
case <-timeout:
// the read from ch has timed out
break
}
14.4 恢复
func server(workChan <-chan *Work) {
for work := range workChan {
go safelyDo(work) // start the goroutine for that work
}
}
func safelyDo(work *Work) {
defer func() {
if err := recover(); err != nil {
log.Printf("Work failed with %s in %v", err, work)
}
}()
do(work)
}
safelyDo 会在 do (work)
执行后或者 panic
后进入 defer 代码区。而 recover ()
会阻止程序崩溃。因此上面的代码能够
- 记录错误
- 防止协程错误让整个程序崩溃
14.5 互斥锁和通道的对比
使用互斥锁的模型:
func Worker(pool *Pool) {
for {
// 加锁
pool.Mu.Lock()
// 处理
// begin critical section:
task := pool.Tasks[0] // take the first task
pool.Tasks = pool.Tasks[1:] // update the pool of tasks
// end critical section
// 解锁
pool.Mu.Unlock()
process(task)
}
}
使用通道的模型:
func Worker(in, out chan *Task) {
for {
// 从待办通道取出
task := <-in
process(task)
// 放入完成通道
out <- task
}
}
在 Go 中,通道依旧是用锁实现的,只不过锁对用户不可见了。
14.6 延迟求值
延迟生成器能够调用时才返回值:
generateInteger() => 0
generateInteger() => 1
generateInteger() => 2
....
实现:
package main
import (
"fmt"
)
var resume chan int
func integers() chan int {
yield := make(chan int)
count := 0
go func() {
for {
yield <- count
count++
}
}()
return yield
}
func generateInteger() int {
return <-resume
}
func main() {
resume = integers()
fmt.Println(generateInteger()) //=> 0
fmt.Println(generateInteger()) //=> 1
fmt.Println(generateInteger()) //=> 2
}
通用模式:
package main
import (
"fmt"
)
type Any interface{}
type EvalFunc func(Any) (Any, Any)
func main() {
evenFunc := func(state Any) (Any, Any) {
os := state.(int)
ns := os + 2
return os, ns
}
even := BuildLazyIntEvaluator(evenFunc, 0)
for i := 0; i < 10; i++ {
fmt.Printf("%vth even: %v\n", i, even())
}
}
func BuildLazyEvaluator(evalFunc EvalFunc, initState Any) func() Any {
retValChan := make(chan Any)
loopFunc := func() {
var actState Any = initState
var retVal Any
for {
retVal, actState = evalFunc(actState)
retValChan <- retVal
}
}
retFunc := func() Any {
return <- retValChan
}
go loopFunc()
return retFunc
}
func BuildLazyIntEvaluator(evalFunc EvalFunc, initState Any) func() int {
ef := BuildLazyEvaluator(evalFunc, initState)
return func() int {
return ef().(int)
}
}
14.7 期值模式
期值就是在使用一个值之前先在其它核心进行计算,类似 Promise
(Javascript) 和 Task
(C#)。例如如果方阵 $A,B$ 可逆,有:$(AB)^{-1} = B^{-1} A^{-1}$:
func InverseProduct(a Matrix, b Matrix) {
a_inv := Inverse(a)
b_inv := Inverse(b)
return Product(a_inv, b_inv)
}
我们希望 a_inv
,b_inv
可以并行计算,从而提高运算性能:
func InverseProduct(a Matrix, b Matrix) {
a_inv_future := InverseFuture(a) // start as a goroutine
b_inv_future := InverseFuture(b) // start as a goroutine
a_inv := <-a_inv_future
b_inv := <-b_inv_future
return Product(a_inv, b_inv)
}
func InverseFuture(a Matrix) chan Matrix {
future := make(chan Matrix)
go func() {
future <- Inverse(a)
}()
return future
}
14.8 复用
下面的例子以 C/S 模式为例
package main
import "fmt"
type Request struct {
a, b int
replyc chan int // reply channel inside the Request
}
type binOp func(a, b int) int
func run(op binOp, req *Request) {
req.replyc <- op(req.a, req.b)
}
func server(op binOp, service chan *Request, quit chan bool) {
for {
select {
case req := <-service:
go run(op, req)
case <-quit:
return
}
}
}
func startServer(op binOp) (service chan *Request, quit chan bool) {
service = make(chan *Request)
quit = make(chan bool)
go server(op, service, quit)
return service, quit
}
func main() {
adder, quit := startServer(func(a, b int) int { return a + b })
const N = 100
var reqs [N]Request
for i := 0; i < N; i++ {
req := &reqs[i]
req.a = i
req.b = i + N
req.replyc = make(chan int)
adder <- req
}
// checks:
for i := N - 1; i >= 0; i-- { // doesn't matter what order
if <-reqs[i].replyc != N+2*i {
fmt.Println("fail at", i)
} else {
fmt.Println("Request ", i, " is ok!")
}
}
quit <- true
fmt.Println("done")
}
server 提供了一个二元运算服务(加法)。在第一个 for 循环中,外部请求送入了 Request 管道,然后进入 server 中被专用协程处理。
另外有一个 quit 通道,可用于 main 函数发送中止信号。
14.9 请求数控制
package main
const MAXREQS = 50
var sem = make(chan int, MAXREQS)
type Request struct {
a, b int
replyc chan int
}
func process(r *Request) {
// do something
}
func handle(r *Request) {
sem <- 1 // doesn't matter what we put in it
process(r)
<-sem // one empty place in the buffer: the next request can start
}
func server(service chan *Request) {
for {
request := <-service
go handle(request)
}
}
func main() {
service := make(chan *Request)
go server(service)
}
这里通过一个 sem 标志量,每开始处理一个请求,送入 sem 一个值,每完成一个请求,送出 sem 一个值。
14.10 链式协程
package main
import (
"flag"
"fmt"
)
var ngoroutine = flag.Int("n", 100000, "how many goroutines")
func f(left, right chan int) { left <- 1 + <-right }
func main() {
flag.Parse()
leftmost := make(chan int)
var left, right chan int = nil, leftmost
for i := 0; i < *ngoroutine; i++ {
left, right = right, make(chan int)
go f(left, right)
}
right <- 0 // bang!
x := <-leftmost // wait for completion
fmt.Println(x) // 100000, ongeveer 1,5 s
}
14.11 多核运算
func DoAll(){
sem := make(chan int, NCPU) // Buffering optional but sensible
for i := 0; i < NCPU; i++ {
go DoPart(sem)
}
// Drain the channel sem, waiting for NCPU tasks to complete
for i := 0; i < NCPU; i++ {
<-sem // wait for one task to complete
}
// All done.
}
func DoPart(sem chan int) {
// do the part of the computation
sem <-1 // signal that this piece is done
}
func main() {
runtime.GOMAXPROCS(NCPU) // runtime.GOMAXPROCS = NCPU
DoAll()
}
14.12 流水线
无流水:
func SerialProcessData(in <-chan *Data, out chan<- *Data) {
for data := range in {
tmpA := PreprocessData(data)
tmpB := ProcessStepA(tmpA)
tmpC := ProcessStepB(tmpB)
out <- PostProcessData(tmpC)
}
}
有流水:
func ParallelProcessData (in <-chan *Data, out chan<- *Data) {
// make channels:
preOut := make(chan *Data, 100)
stepAOut := make(chan *Data, 100)
stepBOut := make(chan *Data, 100)
stepCOut := make(chan *Data, 100)
// start parallel computations:
go PreprocessData(in, preOut)
go ProcessStepA(preOut,StepAOut)
go ProcessStepB(StepAOut,StepBOut)
go ProcessStepC(StepBOut,StepCOut)
go PostProcessData(StepCOut,out)
}
14.13 漏桶
客户端不停地发送数据:
func client() {
for {
var b *Buffer
// Grab a buffer if available; allocate if not
select {
case b = <-freeList:
// Got one; nothing more to do
default:
// None free, so allocate a new one
b = new(Buffer)
}
loadInto(b) // Read next message from the network
serverChan <- b // Send to server
}
}
服务器:
func server() {
for {
b := <-serverChan // Wait for work.
process(b)
// Reuse buffer if there's room.
select {
case freeList <- b:
// Reuse buffer if free slot on freeList; nothing more to do
default:
// Free list full, just carry on: the buffer is 'dropped'
}
}
}
当 freeList
非空时,会一直读取数据。否则漏桶会溢出。
14.14 使用通道并发访问对象
package main
import (
"fmt"
"strconv"
)
type Person struct {
Name string
salary float64
chF chan func()
}
func NewPerson(name string, salary float64) *Person {
p := &Person{name, salary, make(chan func())}
go p.backend()
return p
}
func (p *Person) backend() {
for f := range p.chF {
f()
}
}
// Set salary.
func (p *Person) SetSalary(sal float64) {
p.chF <- func() { p.salary = sal }
}
// Retrieve salary.
func (p *Person) Salary() float64 {
fChan := make(chan float64)
p.chF <- func() { fChan <- p.salary }
return <-fChan
}
func (p *Person) String() string {
return "Person - name is: " + p.Name + " - salary is: " + strconv.FormatFloat(p.Salary(), 'f', 2, 64)
}
func main() {
bs := NewPerson("Smith Bill", 2500.5)
fmt.Println(bs)
bs.SetSalary(4000.25)
fmt.Println("Salary changed:")
fmt.Println(bs)
}
差不多是个发布订阅模式的属性值修改。通过通道保证并发访问时数据的一致性。