這里整理 使用reflect操作channel 一下,把它分享給大家。
Go語言實現(xiàn)了基于CSP(Communicating Sequential Processes)理論的并發(fā)方案。方案包含兩個重要元素,一個是Goroutine,它是Go應(yīng)用并發(fā)設(shè)計的基本構(gòu)建與執(zhí)行單元;另一個就是channel,它在并發(fā)模型中扮演著重要的角色。channel既可以用來實現(xiàn)Goroutine間的通信,還可以實現(xiàn)Goroutine間的同步。
我們先來簡要回顧一下有關(guān)channel的常規(guī)語法。
我們可以通過make(chan T, n)創(chuàng)建元素類型為T、容量為n的channel類型實例,比如:
ch1 := make(chan int) // 創(chuàng)建一個無緩沖的channel實例ch1
ch2 := make(chan int, 5) // 創(chuàng)建一個帶緩沖的channel實例ch2
Go提供了“<-”操作符用于對channel類型變量進(jìn)行發(fā)送與接收操作,下面是一些對上述channel ch1和ch2進(jìn)行收發(fā)操作的代碼示例:
ch1 <- 13 // 將整型字面值13發(fā)送到無緩沖channel類型變量ch1中
n := <- ch1 // 從無緩沖channel類型變量ch1中接收一個整型值存儲到整型變量n中
ch2 <- 17 // 將整型字面值17發(fā)送到帶緩沖channel類型變量ch2中
m := <- ch2 // 從帶緩沖channel類型變量ch2中接收一個整型值存儲到整型變量m中
Go不僅提供了單獨(dú)操作channel的語法,還提供了可以同時對多個channel進(jìn)行操作的select-case語法,比如下面代碼:
select {
case x := <-ch1: // 從channel ch1接收數(shù)據(jù)
... ...
case y, ok := <-ch2: // 從channel ch2接收數(shù)據(jù),并根據(jù)ok值判斷ch2是否已經(jīng)關(guān)閉
... ...
case ch3 <- z: // 將z值發(fā)送到channel ch3中:
... ...
default: // 當(dāng)上面case中的channel通信均無法實施時,執(zhí)行該默認(rèn)分支
}
我們看到:select語法中的case數(shù)量必須是固定的,我們只能把事先要交給select“監(jiān)聽”的channel準(zhǔn)備好,在select語句中平鋪開才可以。這就是select語句常規(guī)語法的限制,即select語法不支持動態(tài)的case集合。如果我們要監(jiān)聽的channel個數(shù)是不確定的,且在運(yùn)行時會動態(tài)變化,那么select語法將無法滿足我們的要求。
那怎么突破這一限制呢?鳥窩老師告訴我們用reflect包[2]。
很多朋友可能和我一樣,因為沒有使用過reflect包操作channel,就會以為reflect操作channel的能力是Go新版本才提供的,但實則不然。reflect包中用于操作channel的函數(shù)Select以及其切片參數(shù)的元素類型SelectCase早在Go 1.1版本就加入到Go語言中了,有下圖為證:
那么如何使用這一“古老”的機(jī)制呢?我們一起來看一些例子。
首先我們來看第一種情況,也是最好理解的一種情況,即從一個動態(tài)的channel集合進(jìn)行receive operations的select,下面是示例代碼:
// github.com/bigwhite/experiments/tree/master/reflect-operate-channel/select-recv/main.go
package main
import (
'fmt'
'math/rand'
'reflect'
'sync'
'time'
)
func main() {
var wg sync.WaitGroup
wg.Add(2)
var rchs []chan int
for i := 0; i < 10; i++ {
rchs = append(rchs, make(chan int))
}
// 創(chuàng)建SelectCase
var cases = createRecvCases(rchs)
// 消費(fèi)者goroutine
go func() {
defer wg.Done()
for {
chosen, recv, ok := reflect.Select(cases)
if ok {
fmt.Printf('recv from channel [%d], val=%v\n', chosen, recv)
continue
}
// one of the channels is closed, exit the goroutine
fmt.Printf('channel [%d] closed, select goroutine exit\n', chosen)
return
}
}()
// 生產(chǎn)者goroutine
go func() {
defer wg.Done()
var n int
s := rand.NewSource(time.Now().Unix())
r := rand.New(s)
for i := 0; i < 10; i++ {
n = r.Intn(10)
rchs[n] <- n
}
close(rchs[n])
}()
wg.Wait()
}
func createRecvCases(rchs []chan int) []reflect.SelectCase {
var cases []reflect.SelectCase
// 創(chuàng)建recv case
for _, ch := range rchs {
cases = append(cases, reflect.SelectCase{
Dir: reflect.SelectRecv,
Chan: reflect.ValueOf(ch),
})
}
return cases
}
在這個例子中,我們通過createRecvCases這個函數(shù)創(chuàng)建一個元素類型為reflect.SelectCase的切片,之后使用reflect.Select可以監(jiān)聽這個切片集合,就像常規(guī)select語法那樣,從有數(shù)據(jù)的recv Channel集合中隨機(jī)選出一個返回。
reflect.SelectCase有三個字段:
// $GOROOT/src/reflect/value.go
type SelectCase struct {
Dir SelectDir // direction of case
Chan Value // channel to use (for send or receive)
Send Value // value to send (for send)
}
其中Dir字段的值是一個“枚舉”,枚舉值如下:
// $GOROOT/src/reflect/value.go
const (
_ SelectDir = iota
SelectSend // case Chan <- Send
SelectRecv // case <-Chan:
SelectDefault // default
)
從常量名我們也可以看出,Dir用于標(biāo)識case的類型,SelectRecv表示這是一個從channel做receive操作的case,SelectSend表示這是一個向channel做send操作的case;SelectDefault則表示這是一個default case。
構(gòu)建好SelectCase的切片后,我們就可以將其傳給reflect.Select了。Select函數(shù)的語義與select關(guān)鍵字語義是一致的,它會監(jiān)聽傳入的所有SelectCase,以上面示例為例,如果所有channel都沒有數(shù)據(jù),那么reflect.Select會阻塞,直到某個channel有數(shù)據(jù)或關(guān)閉。
Select函數(shù)有三個返回值:
// $GOROOT/src/reflect/value.go
func Select(cases []SelectCase) (chosen int, recv Value, recvOK bool)
對于上面示例而言,如果監(jiān)聽的某個case有數(shù)據(jù)了,那么Select的返回值chosen中存儲了該channel在cases切片中的下標(biāo),recv中存儲了從channel收到的值,recvOK等價于comma, ok模式的ok,當(dāng)正常接收到由send channel操作發(fā)送的值時,recvOK為true,如果channel被close了,recvOK為false。
上面的示例啟動了兩個goroutine,一個goroutine充當(dāng)消費(fèi)者,由reflect.Select監(jiān)聽一組channel,當(dāng)某個channel關(guān)閉時,該goroutine退出;另外一個goroutine則是隨機(jī)的向這些channel中發(fā)送數(shù)據(jù),發(fā)送10次后,關(guān)閉其中某個channel通知消費(fèi)者退出。
我們運(yùn)行一下該示例程序,得到如下結(jié)果:
$go run main.go
recv from channel [1], val=1
recv from channel [4], val=4
recv from channel [5], val=5
recv from channel [8], val=8
recv from channel [1], val=1
recv from channel [1], val=1
recv from channel [8], val=8
recv from channel [3], val=3
recv from channel [5], val=5
recv from channel [9], val=9
channel [9] closed, select goroutine exit
我們?nèi)粘>幋a時經(jīng)常會在select語句中加上default分支,以防止select完全阻塞,下面我們就來改造一下示例,讓其增加對default分支的支持:
// github.com/bigwhite/experiments/tree/master/reflect-operate-channel/select-recv-with-default/main.go
package main
import (
'fmt'
'math/rand'
'reflect'
'sync'
'time'
)
func main() {
var wg sync.WaitGroup
wg.Add(2)
var rchs []chan int
for i := 0; i < 10; i++ {
rchs = append(rchs, make(chan int))
}
// 創(chuàng)建SelectCase
var cases = createRecvCases(rchs, true)
// 消費(fèi)者goroutine
go func() {
defer wg.Done()
for {
chosen, recv, ok := reflect.Select(cases)
if cases[chosen].Dir == reflect.SelectDefault {
fmt.Println('choose the default')
continue
}
if ok {
fmt.Printf('recv from channel [%d], val=%v\n', chosen, recv)
continue
}
// one of the channels is closed, exit the goroutine
fmt.Printf('channel [%d] closed, select goroutine exit\n', chosen)
return
}
}()
// 生產(chǎn)者goroutine
go func() {
defer wg.Done()
var n int
s := rand.NewSource(time.Now().Unix())
r := rand.New(s)
for i := 0; i < 10; i++ {
n = r.Intn(10)
rchs[n] <- n
}
close(rchs[n])
}()
wg.Wait()
}
func createRecvCases(rchs []chan int, withDefault bool) []reflect.SelectCase {
var cases []reflect.SelectCase
// 創(chuàng)建recv case
for _, ch := range rchs {
cases = append(cases, reflect.SelectCase{
Dir: reflect.SelectRecv,
Chan: reflect.ValueOf(ch),
})
}
if withDefault {
cases = append(cases, reflect.SelectCase{
Dir: reflect.SelectDefault,
Chan: reflect.Value{},
Send: reflect.Value{},
})
}
return cases
}
在這個示例中,我們的createRecvCases函數(shù)增加了一個withDefault布爾型參數(shù),當(dāng)withDefault為true時,返回的cases切片中將包含一個default case。我們看到,創(chuàng)建defaultCase時,Chan和Send兩個字段需要傳入空的reflect.Value。
在消費(fèi)者goroutine中,我們通過選出的case的Dir字段是否為reflect.SelectDefault來判定是否default case被選出,其余的處理邏輯不變,我們運(yùn)行一下這個示例:
$go run main.go
recv from channel [8], val=8
recv from channel [8], val=8
choose the default
choose the default
choose the default
choose the default
choose the default
recv from channel [1], val=1
choose the default
choose the default
choose the default
recv from channel [3], val=3
recv from channel [6], val=6
choose the default
choose the default
recv from channel [0], val=0
choose the default
choose the default
choose the default
recv from channel [5], val=5
recv from channel [2], val=2
choose the default
choose the default
choose the default
recv from channel [2], val=2
choose the default
choose the default
recv from channel [2], val=2
choose the default
choose the default
channel [2] closed, select goroutine exit
我們看到,default case被選擇的幾率還是蠻大的。
最后,我們再來看看如何使用reflect包向channel中發(fā)送數(shù)據(jù),看下面示例代碼:
// github.com/bigwhite/experiments/tree/master/reflect-operate-channel/select-send/main.go
package main
import (
'fmt'
'reflect'
'sync'
)
func main() {
var wg sync.WaitGroup
wg.Add(2)
ch0, ch1, ch2 := make(chan int), make(chan int), make(chan int)
var schs = []chan int{ch0, ch1, ch2}
// 創(chuàng)建SelectCase
var cases = createCases(schs)
// 生產(chǎn)者goroutine
go func() {
defer wg.Done()
for range cases {
chosen, _, _ := reflect.Select(cases)
fmt.Printf('send to channel [%d], val=%v\n', chosen, cases[chosen].Send)
cases[chosen].Chan = reflect.Value{}
}
fmt.Println('select goroutine exit')
return
}()
// 消費(fèi)者goroutine
go func() {
defer wg.Done()
for range schs {
var v int
select {
case v = <-ch0:
fmt.Printf('recv %d from ch0\n', v)
case v = <-ch1:
fmt.Printf('recv %d from ch1\n', v)
case v = <-ch2:
fmt.Printf('recv %d from ch2\n', v)
}
}
}()
wg.Wait()
}
func createCases(schs []chan int) []reflect.SelectCase {
var cases []reflect.SelectCase
// 創(chuàng)建send case
for i, ch := range schs {
n := i + 100
cases = append(cases, reflect.SelectCase{
Dir: reflect.SelectSend,
Chan: reflect.ValueOf(ch),
Send: reflect.ValueOf(n),
})
}
return cases
}
在這個示例中,我們針對三個channel:ch0,ch1和ch2創(chuàng)建了寫操作的SelectCase,每個SelectCase的Send字段都被賦予了要發(fā)送給該channel的值,這里使用了“100+下標(biāo)號”。
生產(chǎn)者goroutine中有一個“與眾不同”的地方,那就是每次某個寫操作觸發(fā)后,我都將該SelectCase中的Chan重置為一個空Value,以防止下次該channel被重新選出:
cases[chosen].Chan = reflect.Value{}
運(yùn)行一下該示例,我們得到:
$go run main.go
recv 101 from ch1
send to channel [1], val=101
send to channel [0], val=100
recv 100 from ch0
recv 102 from ch2
send to channel [2], val=102
select goroutine exit
通過上面的幾個例子我們看到,reflect.Select有著與select等價的語義,且還支持動態(tài)增刪和修改case,功能不可為不強(qiáng)大,現(xiàn)在還剩一點(diǎn)要care,那就是它的執(zhí)行性能如何呢?我們接著往下看。
我們用benchmark test來對比一下常規(guī)select與reflect.Select在執(zhí)行性能上的差別,下面是benchmark代碼:
// github.com/bigwhite/experiments/tree/master/reflect-operate-channel/select-benchmark/benchmark_test.go
package main
import (
'reflect'
'testing'
)
func createCases(rchs []chan int) []reflect.SelectCase {
var cases []reflect.SelectCase
// 創(chuàng)建recv case
for _, ch := range rchs {
cases = append(cases, reflect.SelectCase{
Dir: reflect.SelectRecv,
Chan: reflect.ValueOf(ch),
})
}
return cases
}
func BenchmarkSelect(b *testing.B) {
var c1 = make(chan int)
var c2 = make(chan int)
var c3 = make(chan int)
go func() {
for {
c1 <- 1
}
}()
go func() {
for {
c2 <- 2
}
}()
go func() {
for {
c3 <- 3
}
}()
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
select {
case <-c1:
case <-c2:
case <-c3:
}
}
}
func BenchmarkReflectSelect(b *testing.B) {
var c1 = make(chan int)
var c2 = make(chan int)
var c3 = make(chan int)
go func() {
for {
c1 <- 1
}
}()
go func() {
for {
c2 <- 2
}
}()
go func() {
for {
c3 <- 3
}
}()
chs := createCases([]chan int{c1, c2, c3})
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
_, _, _ = reflect.Select(chs)
}
}
運(yùn)行一下該benchmark:
$go test -bench .
goos: darwin
goarch: amd64
pkg: github.com/bigwhite/experiments/reflect-operate-channel/select-benchmark
... ...
BenchmarkSelect-8 2765396 427.8 ns/op 0 B/op 0 allocs/op
BenchmarkReflectSelect-8 1839706 806.0 ns/op 112 B/op 6 allocs/op
PASS
ok github.com/bigwhite/experiments/reflect-operate-channel/select-benchmark 3.779s
我們看到:reflect.Select的執(zhí)行效率相對于select還是要差的,并且在其執(zhí)行過程中還要做額外的內(nèi)存分配。
本文介紹了reflect.Select與SelectCase的結(jié)構(gòu)以及如何使用它們在不同場景下操作channel。但大多數(shù)情況下,我們是不需要使用reflect.Select,常規(guī)select語法足以滿足我們的要求。并且reflect.Select有對cases數(shù)量的約束,最大支持65536個cases,雖然這個約束對于大多數(shù)場合而言足夠用了。
本文涉及的示例源碼可以在這里[3]下載。
《Go并發(fā)編程實戰(zhàn)課》: http://gk.link/a/11OCq
[2]reflect包: https://tonybai.com/2021/04/19/variable-operation-using-reflection-in-go
[3]這里: https://github.com/bigwhite/experiments/tree/master/reflect-operate-channel
[4]“Gopher部落”知識星球: https://wx.zsxq.com/dweb2/index/group/51284458844544
聯(lián)系客服