【go并发编程】

GO 并发编程

协程(进程 线程)

  1. 进程是“程序执行的一个实例” ,担当分配系统资源的实体。进程创建必须分配一个完整的独立地址空间。进程切换只发生在内核态。
  2. 线程:线程是进程的一个执行流,独立执行它自己的程序代码。
  3. 协程:协程不是进程或线程,其执行过程更类似于子例程,或者说不带返回值的函数调用。在语言级别可以创建并发协程,然后编写代码去进行管理。go将这一步承包下来,使协程并发运行成本更低。
func main() {
    http.HandleFunc("/next", handler)
    // func这个函数会是以协程的方式运行。这样就可以提供程序的并发处理能力
    go func() {
        for i := 0; ; i++ {
            nextID <- i
        }
    }()
    http.ListenAndServe("localhost:8080", nil)
}

goruntime

参考goruntime详解,操作系统对cpu有自己的scheduler方案,如任务A在执行完后,选择哪个任务来执行,使得某个因素(如进程总执行时间,或者磁盘寻道时间等)最小,达到最优的服务。
Go有自己的scheduler,语言级别实现了并发。

每一个Go程序都附带一个runtime,runtime负责与底层操作系统交互,也都会有scheduler对goruntines进行调度。在scheduler中有三个非常重要的概念:P,M,G。
详情后续再写。

# Goroutine scheduler
# The scheduler's job is to distribute ready-to-run goroutines over worker threads.
#
# The main concepts are:
# G - goroutine.
# M - worker thread, or machine.
# P - processor, a resource that is required to execute Go code.
#     M must have an associated P to execute Go code, however it can be
#     blocked or in a syscall w/o an associated P.
#
# Design doc at https://golang.org/s/go11sched.

runtime包与goroutime

尽管 Go 编译器产生的是本地可执行代码,这些代码仍旧运行在 Go 的 runtime(这部分的代码可以在 runtime 包中找到)当中。这个 runtime 类似 Java 和 .NET 语言所用到的虚拟机,它负责管理包括内存分配、垃圾回收(第 10.8 节)、栈处理、goroutine、channel、切片(slice)、map 和反射(reflection)等等。

  • Gosched:让当前线程让出 cpu 以让其它线程运行,它不会挂起当前线程,因此当前线程未来会继续执行
  • NumCPU:返回当前系统的 CPU 核数量
  • GOMAXPROCS:设置最大的可同时使用的 CPU 核数
  • Goexit:退出当前 goroutine(但是defer语句会照常执行)
  • NumGoroutine:返回正在执行和排队的任务总数
  • GOOS:目标操作系统

NumCPU

package main
import (
    "fmt"
    "runtime"
)
func main() {
    fmt.Println("cpus:", runtime.NumCPU())
    fmt.Println("goroot:", runtime.GOROOT())
    fmt.Println("archive:", runtime.GOOS)
    // 4
    // /usr/local/golang
    // linux
}

GOMAXPROCS

package main

import (
    "fmt"
    "runtime"
)

func init() {
    runtime.GOMAXPROCS(1)
}

func main() {
    // 任务逻辑...
}

Golang 默认所有任务都运行在一个 cpu 核里,如果要在 goroutine 中使用多核,可以使用 runtime.GOMAXPROCS 函数修改,当参数小于 1 时使用默认值。

Gosched

这个函数的作用是让当前 goroutine 让出 CPU,当一个 goroutine 发生阻塞,Go 会自动地把与该 goroutine 处于同一系统线程的其他 goroutine 转移到另一个系统线程上去,以使这些 goroutine 不阻塞。

package main

import (
    "fmt"
    "runtime"
)

func init() {
    runtime.GOMAXPROCS(1)  # 使用单核
}

func main() {
    exit := make(chan int)
    go func() {
        defer close(exit)
        go func() {
            fmt.Println("b")
        }()
    }()

    for i := 0; i < 4; i++ {
        fmt.Println("a:", i)

        if i == 1 {
            runtime.Gosched()  #切换任务
        }
    }
    <-exit
}
# 运行结果
# a: 0
# a: 1
# b: 
# a:2
# a: 3

channel

channel是Go语言在语言级别提供的goroutine间的通信方式。我们可以使用channel在两个或 多个goroutine之间传递消息。
channel 会某种情况下出现阻塞,通过控制channel的阻塞来管理协程的并发与流程控制。

channel类型

chan T          // 可以接收和发送类型为 T 的数据
chan<- float64  // 只可以用来发送 float64 类型的数据(可以关闭)
<-chan int      // 只可以用来接收 int 类型的数据(也不能关闭)
func counter(out chan<- int) {
    for x := 0; x < 100; x++ {
        out <- x
    }
    close(out)
}
func squarer(out chan<- int, in <-chan int) {
    for v := range in {
        out <- v * v
    }
    close(out)
}
func printer(in <-chan int) {
    for v := range in {
        fmt.Println(v)
    }
}
func main() {
    naturals := make(chan int)
    squares := make(chan int)
    go counter(naturals)
    go squarer(squares, naturals)
    printer(squares)
}

这里使用了单向channel。很明显数据的流向是单向的。获取的地方不应该对channel赋值。这样把一个双向的channel转为一个单向的channel能够防止channel被滥用。降低了风险。

channel初始化

make(chan int, 100)
make(chan int)

非缓冲的Channel

ch1 := make(chan int, 1) //缓冲通道
ch2 := make(chan int, 0) //非缓冲通道
ch3 := make(chan int) //非缓冲通道

非缓冲通道特性:

  • 向此类通道发送元素值的操作会被阻塞,直到至少有一个针对该通道的接收操作开始进行为止。
  • 从此类通道接收元素值的操作会被阻塞,直到至少有一个针对该通道的发送操作开始进行为止。
  • 针对非缓冲通道的接收操作会在与之相应的发送操作完成之前完成。

对于第三条要特别注意,发送操作在向非缓冲通道发送元素值的时候,会等待能够接收该元素值的那个接收操作。并且确保该元素值被成功接收,它才会真正的完成执行。而缓冲通道中,刚好相反,由于元素值的传递是异步的,所以发送操作在成功向通道发送元素值之后就会立即结束(它不会关心是否有接收操作)

make(chan int) 和 make(chan int, 1)

package main
import "fmt"
func main() {
    var c = make(chan int)
    var a string

    go func() {
        a = "hello world"
        <-c
    }()

    c <- 0
    fmt.Println(a)
}

上面的例子会打印 “hello world”。如果改成 var c = make(chan int, 1) a 可能是 “hello world” 也可能是空,make(chan int) 是 unbuffered channel, send 之后 send 语句会阻塞执行,直到有人 receive 之后 send 解除阻塞,后面的语句接着执行。
所以执行 c <- 0 时会阻塞,直到 <-c, 这时 a 已赋值。

make(chan int, 1) 是 buffered channel, 容量为 1。在 buffer 未满时往里面 send 值并不会阻塞, 只有 buffer 满时再 send 才会阻塞,所以执行到 c <- 0 时并不会阻塞

send语句

c := make(chan int)
defer close(c)
go func() { c <- 3 + 4 }()
i := <-c
fmt.Println(i)

send被执行前(proceed)通讯(communication)一直被阻塞着。如前所言,无缓存的channel只有在receiver准备好后send才被执行。如果有缓存,并且缓存未满,则send会被执行。

往一个已经被close的channel中继续发送数据会导致run-time panic。

往nil channel中发送数据会一致被阻塞着。

receive语句

<-ch用来从channel ch中接收数据,这个表达式会一直被block,直到有数据可以接收。 从一个nil channel中接收数据会一直被block。从一个被close的channel中接收数据不会被阻塞,而是立即返回,接收完已发送的数据后会返回元素类型的零值(zero value)。
如前所述,你可以使用一个额外的返回参数来检查channel是否关闭。

x, ok := <-ch
x, ok = <-ch
var x, ok = <-ch

如果OK 是false,表明接收的x是产生的零值,这个channel被关闭了或者为空。

Range

func main() {
    go func() {
        time.Sleep(1 * time.Hour)
    }()
    c := make(chan int)
    go func() {
        for i := 0; i < 10; i = i + 1 {
            c <- i
        }
        close(c)
    }()
    for i := range c {
        fmt.Println(i)
    }
    fmt.Println("Finished")
}

range c产生的迭代值为Channel中发送的值,它会一直迭代知道channel被关闭。上面的例子中如果把close(c)注释掉,程序会一直阻塞在for …… range那一行。

select

  • 每个case语句里必须是一个IO操作
  • 如果有多个case都可以运行,Select会随机公平地选出一个执行(其他不会执行)。
  • 所有跟在case关键字右边的发送语句或接收语句中的通道表达式和元素表达式都会先被求值。无论它们所在的case是否有可能被选择都会这样。
func fibonacci(c, quit chan int) {
    x, y := 0, 1
    for {
        select {
        case c <- x:
            x, y = y, x+y
        case <-quit:
            fmt.Println("quit")
            return
        }
    }
}
func main() {
    c := make(chan int)
    quit := make(chan int)
    go func() {
        for i := 0; i < 10; i++ {
            fmt.Println(<-c)
        }
        quit <- 0
    }()
    fibonacci(c, quit)
}

我们不想等到通道被关闭后再退出循环,利用一个辅助通道模拟出操作超时。

package main

import (
    "fmt"
    "time"
)
func main(){
    //初始化通道
    ch11 := make(chan int, 1000)
    sign := make(chan int, 1)

    //给ch11通道写入数据
    for i := 0; i < 1000; i++ {
        ch11 <- i
    }
    //关闭ch11通道
    close(ch11)

    //我们不想等到通道被关闭之后再推出循环,我们创建并初始化一个辅助的通道,利用它模拟出操作超时行为
    timeout := make(chan bool,1)
    go func(){
        time.Sleep(time.Millisecond) //休息1ms
        timeout <- false
    }()

    //单独起一个Goroutine执行select
    go func(){
        var e int
        ok := true

        for{
            select {
                case e,ok = <- ch11:
                    if !ok {
                        fmt.Println("End.")
                        break
                    }
                    fmt.Printf("ch11 -> %d\n",e)
                case ok = <- timeout:
                //向timeout通道发送元素false后,该case几乎马上就会被执行, ok = false
                    fmt.Println("Timeout.")
                    break
            }
            //终止for循环
            if !ok {
                sign <- 0
                break
            }
        }

    }()

    //惯用手法,读取sign通道数据,为了等待select的Goroutine执行。
    <- sign
}

上面实现了单个操作的超时,但是那个超时触发器开始计时有点早。

package main

import (
    "fmt"
    "time"
)
func main(){
    //初始化通道
    ch11 := make(chan int, 1000)
    sign := make(chan int, 1)

    //给ch11通道写入数据
    for i := 0; i < 1000; i++ {
        ch11 <- i
    }
    //关闭ch11通道
    //close(ch11),为了看效果先注释掉

    //单独起一个Goroutine执行select
    go func(){
        var e int
        ok := true

        for{
            select {
                case e,ok = <- ch11:
                    if !ok {
                        fmt.Println("End.")
                        break
                    }
                    fmt.Printf("ch11 -> %d\n",e)
                case ok = <- func() chan bool {
                    //经过大约1ms后,该接收语句会从timeout通道接收到一个新元素并赋值给ok,从而恰当地执行了针对单个操作的超时子流程,恰当地结束当前for循环
                    timeout := make(chan bool,1)
                    go func(){
                        time.Sleep(time.Millisecond)//休息1ms
                        timeout <- false
                    }()
                    return timeout
                }():
                    fmt.Println("Timeout.")
                    break
            }
            //终止for循环
            if !ok {
                sign <- 0
                break
            }
        }

    }()

    //惯用手法,读取sign通道数据,为了等待select的Goroutine执行。
    <- sign
}

timeout

我们可能就需要一个超时操作,用来处理超时的情况。 下面这个例子我们会在2秒后往channel c1中发送一个数据,但是select设置为1秒超时,因此我们会打印出timeout 1,而不是result 1。

import "time"
import "fmt"
func main() {
    c1 := make(chan string, 1)
    go func() {
        // time.Sleep(time.Millisecond) 1ms
        time.Sleep(time.Second * 2)
        c1 <- "result 1"
    }()
    select {
    case res := <-c1:
        fmt.Println(res)
    case <-time.After(time.Second * 1):
        fmt.Println("timeout 1")
    }
}

其实它利用的是time.After方法,它返回一个类型为<-chan Time的单向的channel,在指定的时间发送一个当前时间给返回的channel中。

Timer和Ticker

我们看一下关于时间的两个Channel。 timer是一个定时器,代表未来的一个单一事件,你可以告诉timer你要等待多长时间,它提供一个Channel,在将来的那个时间那个Channel提供了一个时间值。下面的例子中第二行会阻塞2秒钟左右的时间,直到时间到了才会继续执行。

timer1 := time.NewTimer(time.Second * 2)
<-timer1.C
fmt.Println("Timer 1 expired")

当然如果你只是想单纯的等待的话,可以使用time.Sleep来实现。你还可以使用timer.Stop来停止计时器。

    timer2 := time.NewTimer(time.Second)
    go func() {
        <-timer2.C
        fmt.Println("Timer 2 expired")
    }()
    stop2 := timer2.Stop()
    if stop2 {
        fmt.Println("Timer 2 stopped")
    }

ticker是一个定时触发的计时器,它会以一个间隔(interval)往Channel发送一个事件(当前时间),而Channel的接收者可以以固定的时间间隔从Channel中读取事件。下面的例子中ticker每500毫秒触发一次,你可以观察输出的时间。

ticker := time.NewTicker(time.Millisecond * 500)
go func() {
   for t := range ticker.C {
      fmt.Println("Tick at", t)
    }
}()

类似timer, ticker也可以通过Stop方法来停止。一旦它停止,接收者不再会从channel中接收数据了。

close

总结一下channel关闭后sender的receiver操作。 如果channel c已经被关闭,继续往它发送数据会导致panic: send on closed channel,但是从这个关闭的channel中不但可以读取出已发送的数据,还可以不断的读取零值。

c := make(chan int, 10)
c <- 1
c <- 2
close(c)
fmt.Println(<-c) //1
fmt.Println(<-c) //2
fmt.Println(<-c) //0
fmt.Println(<-c) //0

但是如果通过range读取,channel关闭后for循环会跳出:

c := make(chan int, 10)
c <- 1
c <- 2
close(c)
for i := range c {
   fmt.Println(i)
}
    原文作者:jincheng828
    原文地址: https://segmentfault.com/a/1190000016466500
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞