go channel详解

协程,通道

  • 我们在普通程序中要执行代码如下代码
func main() {
    foo()
    bar()
}
func foo() {
    for i := 0; i < 45; i++ {
        fmt.Println("Foo:", i)
    }
}
func bar() {
    for i := 0; i < 45; i++ {
        fmt.Println("Bar:", i)
    }
}
得到结果
Foo: 41
Foo: 42
Foo: 43
Foo: 44
Bar: 0
Bar: 1
Bar: 2
Bar: 3
这个是按顺序执行的
  • 错误使用协程
package main
import "fmt"
func main() {
    go foo()
    go bar()
}
func foo() {
    for i := 0; i < 45; i++ {
        fmt.Println("Foo:", i)
    }
}
func bar() {
    for i := 0; i < 45; i++ {
        fmt.Println("Bar:", i)
    }
}

由于没有调度,主协程率先执行完毕,代码执行已经关闭,所以协程根本没有执行。

  • 正确使用协程
package main
import (
    "fmt"
    "sync"
)
var wg sync.WaitGroup
func main() {
    wg.Add(2)
    go foo()
    go bar()
    wg.Wait()
}
func foo() {
    for i := 0; i < 45; i++ {
        fmt.Println("Foo:", i)
    }
    wg.Done()
}
func bar() {
    for i := 0; i < 45; i++ {
        fmt.Println("Bar:", i)
    }
    wg.Done()
}

用sync.WaitGroup,可以阻塞主线程,wg.Add(2)加入两个子线程,wg.Wait()一直等待,每当一个执行完成调用函数wg.Done()将一个线程去掉,直到全部执行完子线程。‘’

  • 协程:正如官方所言,goroutine 是一个轻量级的执行单元,相比线程开销更小,完全由 Go 语言负责调度,是 Go 支持并发的核
    心。开启一个 goroutine 非常简单:

  • 协程的运行

package main
   import (
    "fmt"
    "time"
)
func main() {
    go fmt.Println("goroutine message")
    time.Sleep(1) //1
    fmt.Println("main function message")
}
  • 协程的好处,上面的例子我们没有看到协程究竟能有什么好处
package main
import (
    "fmt"
    "sync"
    "time"
)
var wg sync.WaitGroup

func main() {
    wg.Add(3)
    go foo()
    go bar()
    go dar()
    wg.Wait()
}
func foo() {
    for i := 0; i < 45; i++ {
        fmt.Println("Foo:", i)
        time.Sleep(3 * time.Millisecond)
    }
    wg.Done()
}
func bar() {
    for i := 0; i < 45; i++ {
        fmt.Println("Bar:", i)
        time.Sleep(20 * time.Millisecond)
    }
    wg.Done()
}
func dar() {
    for i := 0; i < 45; i++ {
        fmt.Println("dar:", i)
    }
    wg.Done()
}

从上面的代码执行效果看到,协程能够使我们,遇到阻塞,不会阻塞到那,另一个协程还会继续执行。

  • 在前面我们只是用了系统中的单核,go其实是一门异步多线程语言
package main
import (
    "fmt"
    "runtime"
    "sync"
    "time"
)
var wg sync.WaitGroup

func init() {
    runtime.GOMAXPROCS(runtime.NumCPU())
}
func main() {
    wg.Add(2)
    go foo()
    go bar()
    wg.Wait()
}
func foo() {
    for i := 0; i < 45; i++ {
        fmt.Println("Foo:", i)
        time.Sleep(3 * time.Millisecond)
    }
    wg.Done()
}
func bar() {
    for i := 0; i < 45; i++ {
        fmt.Println("Bar:", i)
        time.Sleep(20 * time.Millisecond)
    }
    wg.Done()
}

当某个协程阻塞时,cpu会把其他协程加载进来执行,这样确保cpu不会空闲着。 runtime.GOMAXPROCS(runtime.NumCPU())表示在系统的所有核上并发运行。

  • 竞争当用同一个变量时,该变量会使该变量变化。
package main
import (
    "fmt"
    "math/rand"
    "sync"
    "time"
)
var wg sync.WaitGroup
var counter int
func main() {
    wg.Add(2)
    go incrementor("Foo:")
    go incrementor("Bar:")
    wg.Wait()
    fmt.Println("Final Counter:", counter)
}
func incrementor(s string) {
    rand.Seed(time.Now().UnixNano())
    for i := 0; i < 20; i++ {
        x := counter
        x++
        time.Sleep(time.Duration(rand.Intn(3)) * time.Millisecond)
        counter = x
        fmt.Println(s, i, "Counter:", counter)
    }
    wg.Done()
}
  • 加锁
package main
import (
   "fmt"
   "math/rand"
   "sync"
   "time"
)
var wg sync.WaitGroup
var counter int
var mutex sync.Mutex
func main() {
   wg.Add(2)
   go incrementor("Foo:")
   go incrementor("Bar:")
   wg.Wait()
   fmt.Println("Final Counter:", counter)
}
func incrementor(s string) {
   for i := 0; i < 20; i++ {
       time.Sleep(time.Duration(rand.Intn(20)) * time.Millisecond)
       mutex.Lock()
       counter++
       fmt.Println(s, i, "Counter:", counter)
       mutex.Unlock()
   }
   wg.Done()
}

  • 原子操作
package main

import (
   "fmt"
   "math/rand"
   "sync"
   "sync/atomic"
   "time"
)
var wg sync.WaitGroup
var counter int64
func main() {
   wg.Add(2)
   go incrementor("Foo:")
   go incrementor("Bar:")
   wg.Wait()
   fmt.Println("Final Counter:", counter)
}
func incrementor(s string) {
   for i := 0; i < 20; i++ {
       time.Sleep(time.Duration(rand.Intn(3)) * time.Millisecond)
       atomic.AddInt64(&counter, 1)
       fmt.Println(s, i, "Counter:", atomic.LoadInt64(&counter)) // access without race
   }
   wg.Done()
}

// go run -race main.go
// vs
// go run main.go

  • 为什么要需要channel
    time.Sleep(1) 这是为了让新开启的 goroutine 有机会得到执行,开启一个 goroutine 之后,后续的代码会继续执行,在上面的例子中后续代码执行完毕程序就终止了,而开启的 goroutine 可能还没开始执行。
    如果尝试去掉 #1 处的代码,程序也可能会正常运行,这是因为恰巧开启的 goroutine 只是简单的执行了一次输出,如果 goroutine 中耗时稍长就会导致只能看到主一句 main function message 。
    换句话话说,这里的 time.sleep 提供的是一种调度机制,这也是 Go 中 channel 存在的目的:负责消息传递和调度。

  • 什么是channel
    Channel 是 Go 中为 goroutine 提供的一种通信机制,channel 是有类型的,而且是有方向的,可以把 channel 类比成 unix
    中的 pipe。channel 是用来传递消息的。

package main
import (
   "fmt"
 )
func main() {
 i := make(chan int)//int 类型
 s := make(chan string)//字符串类型
 r := make(<-chan bool)//只读
 w := make(chan<- []int)//只写
   c := make(chan int)
   go func() {
       fmt.Println("goroutine message")
       c <- 1 //1
   }()
   <-c //2
   fmt.Println("main function message")
}

声明了一个 int 类型的 channel,在 goroutine 中在代码 #1 处向 channel 发送了数据 1 ,在 main 中 #2 处等待数据的接收,如果 c 中没有数据,代码的执行将发生阻塞,直到 c 中数据接收完毕。这是 channel 最简单的用法之一:同步 ,这种类型的 channel 没有设置容量,称之为 unbuffered channel。
发送与等待互相阻塞,

    原文作者:seven_son
    原文地址: https://www.jianshu.com/p/961a512fa456
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞