今天我们开始学习Go中最有意思的事情,也就是并发编程。在学习之前我们先回顾下以往在Linux使用和学习中经常碰到的几个技术点。
-
文件描述符 Unix及衍生系统Everything is a file(descriptor)一切皆文件的特征,使研发人员可以将一切操作都抽象为文件操作,也就是open、close、read、write..
-
I/O多路复用 而文件操作实际上就是对I/O的操作,而多I/O操作就涉及并发和阻塞的处理。
-
管道(Pipeline) I/O操作也是对输出输出流进行的操作。管道是一种使用消息传递进行进程间通信的机制,管道可以将多个进程的标准输入和输出进行连接起来,将上一个进程的输出作为下一个进程的输入。
在任何语言中并发都是个值得讨论的问题,相对于其他语言中的进程、线程、协程等复杂的操作,使的我们使用起来很是困难,尤其在并发过程中的内存共享、数据通信问题;而Go在涉及之初就考虑了这些问题,语言层面就加大了对并发编程的支持。
Go鼓励一种新的开发思维:不通过共享内存进行通信;相反,通过通信共享内存。
Do not communicate by sharing memory; instead, share memory by communicating.
Goroutine
我们不能将 goroutine 直接视为其他语言中的 coroutine 协程,其主要原因是 goroutine 的实现机制和 coroutine 的实现上是有一些差异的,例如最常见的当一个线程进行I/O等待时,一般语言的实现上就会直接阻塞,而Go不会,goroutine 被多路复用到多个 OS 线程上,因此如果一个线程阻塞,例如在等待 I/O 时,其他线程会继续运行,设计上隐藏了线程创建和管理的许多复杂性。
Go的并发模型起源于Hoare的通信顺序进程CSP(Communicating sequential processes) 模型,也和Unix的管道模型类似。Goroutine 和 Channel 分别对应了CSP中的实体和传递媒介。
Goroutine -> Channel -> Goroutine
创建一个 goroutine 也十分简单,只需要在方法调用前增加一个 go 关键词就可以完成。调用完成后,goroutine 静默退出,类似于Unix shell 中的 &
符号。
func hello() {
print("world")
}
func main() {
go hello()
print("hello,")
time.Sleep(1 * time.Second)
}
或者在函数结构内直接调用
func main() {
go func() {
print("hello,world")
}()
time.Sleep(1 * time.Second)
}
Channel
Go 中 channel 的分配使用make关键词,收发操作遵循**先进先出(FIFO)**的设计
ci := make(chan int) // 整数的无缓冲Channel
cj := make(chan int, 0) // 整数的无缓冲Channel
cs := make(chan *os.File, 100) // 指向指针的缓冲Channel文件
同步 channel 不需要缓冲区,发送方会直接将数据交给接收方,没有缓冲区时,发送方会直接阻塞直到被接收方获取。
c := make(chan int)
// 在 goroutine 中开始排序;完成后,在通道上发出信号。
go func() {
// list.Sort()
c <- 1 // 发送信号;值无所谓。
}()
// doSomethingForAWhile()
<-c // 等待排序完成;丢弃发送的值。
如果设置了缓冲区,发送方会阻塞,直到值被复制到缓冲区内,当缓冲区已满,就需要等到下一次接收方获取后进行设置。我们可以用 channel 来限制并发处理的数量。
var sem = make(chan int, MaxOutstanding)
当我们在使用循环的时候要注意变量的共享范围,例如以下做法会使req
的值在goroutine
中共享,这不是我们想要的结果从而导致BUG的产生。
func Serve(queue chan *Request) {
for req := range queue {
sem <- 1
go func() {
process(req) // Buggy; 请参阅下面的说明。
<-sem
}()
}
}
我们可以将req值作为参数传递到goroutine中来得到预期结果。
func Serve(queue chan *Request) {
for req := range queue {
sem <- 1
go func(req *Request) {
process(req)
<-sem
}(req)
}
}
也可以每次创建一个同名的新变量来隐藏循环的变量从而得到预期结果,看起来有点奇怪但在Go中是合理的办法。
func Serve(queue chan *Request) {
for req := range queue {
req := req // 为 goroutine 创建新的 req 实例。
sem <- 1
go func() {
process(req)
<-sem
}()
}
}
Go 中的 channel 可以像其他任何东西一样进行分配和值传递,我们将上述的Request
类型丰富后就可以得到一个限速、并行、非阻塞 RPC 系统的框架,而且看不到互斥锁。
type Request struct {
args []int
f func([]int) int
resultChan chan int
}
// Client
func sum(a []int) (s int){ ... }
request := &Request{[]int{3, 4, 5}, sum, make(chan int)}
// Send request
clientRequests <- request
// Wait for response.
fmt.Printf("answer: %d\n", <-request.resultChan)
// Server
func handle(queue chan *Request) {
for req := range queue {
req.resultChan <- req.f(req.args)
}
}
根据 Go channels on steroids 中 channel 类型划分,共分为了三种内部 channel 类型。
- 同步 Channel - 不需要缓冲区。
- 异步 Channel - 基于环形缓冲区的传统生产者-消费者队列。
- 异步 Channel (chan struct{} 类型) - 本质是信号,不需要缓冲区,且零元素不占用内存空间。
Select
Context
Lock
Timer
源码分析-errors
参考
- https://go.dev/doc/
- 《Go语言设计与实现》 - 左书祺(@Draven)