go_教程_03

channels

channels: routines之前可以传递数据的管道

  • 默认的channel只能有对应的接受者(<- chan)
  • 才能发送数据到这个channel(chan <-)
  • 发送数据的channel (<- chan)
  • 接收数据的channel (chan <-)
    fmt.Println("-----channels------")
    messages := make(chan string)
    go func() {
        fmt.Println("i am routines")
        messages <- "ping"
    }()
    msg := <-messages
    fmt.Println(msg)

buffering channels

  • 可以配置一个接收一定量参数列表的channel
  • 可配置接收一定数量的参数
    fmt.Println("-----buffering channels------")
    messages1 := make(chan string, 2)
    messages1 <- "buffered"
    messages1 <- "channel"
    fmt.Println(<-messages1)
    fmt.Println(<-messages1)

channel synchronization 阻塞等待(同步)

  • 同步不同的goroutines的操作
  • 例如一个goroutine阻塞,等待另外一个goroutine结束
// 使用channel通知,别的goroutine工作做完了
func worker(done chan bool) {
    fmt.Println("working....")
    time.Sleep(time.Second)
    fmt.Println("done")
    done <- true
}
    done := make(chan bool, 1)
    go worker(done) //启动一个异步的goroutine,传一个channel进去
    <-done          // 主线程阻塞直到收到这个channel里面的信号,继续往下执行
    // 去掉<- done, 程序会在worker没开始就退出

channel directions 数据流方向

//把msg写入pings这个channel
func ping(pings chan<- string, msg string) {
    pings <- msg
}

//把ping这个channel的消息拿出来,写入pong这个channel
func pong(pings <-chan string, pongs chan<- string) {
    msg := <-pings
    pongs <- msg
}
    fmt.Println("------channel directions-------")
    //channel directions
    pings := make(chan string, 1)
    pongs := make(chan string, 1)
    ping(pings, "pass message")
    pong(pings, pongs)
    fmt.Println(<-pongs)

select

  • 多个channel操作的等待
  • 结合goroutine和channels功能强大
c1 := make(chan string)
    c2 := make(chan string)

    go func() {
        time.Sleep(1 * time.Second)
        c1 <- "one"
    }()

    go func() {
        time.Sleep(2 * time.Second)
        c2 <- "two"
    }()

    //同事等待两个并发的goroutines到来
    for i := 0; i < 2; i++ {
        select {
        case msg1 := <-c1:
            fmt.Println("received", msg1)
        case msg2 := <-c2:
            fmt.Println("received", msg2)
        }
    }
    //另一种实现方式
    //msg1 := ""
    //msg2 := ""
    // for {
    //   select {
    //   case msg1 = <-c1:
    //       fmt.Println("received", msg1)
    //   case msg2 = <-c2:
    //       fmt.Println("received",msg2)
    //   }
    //   if msg1 == "one" && msg2 == "two" {
    //       break
    //   }
    // }

Timeouts 超时

  • 调用外部资源,或者有执行时间限制的时候,timeout很有用
  • 使用channel和select实现很方便
  • select和timeout的模式,通过channel交流的goroutine实现
  • go的很多重要的功能都是基于channel和select
c3 := make(chan string, 1)
    go func() {
        time.Sleep(2 * time.Second) // 模拟2秒
        c3 <- "result 1"
    }()
    select {
    case res := <-c3: //获取channel的值
        fmt.Println(res)
    case <-time.After(1 * time.Second): //获取channel的值超时
        fmt.Println("timeout 1")
    }

    c4 := make(chan string, 1)
    go func() {
        time.Sleep(2 * time.Second)
        c4 <- "result 2"
    }()
    select {
    case res := <-c4: //获取channel的值
        fmt.Println(res)
    case <-time.After(3 * time.Second): //获取channel的值超时
        fmt.Println("timeout 2")
    }

Non-Blocking Channel Operations

  • 非阻塞接收,使用select的default实现:
  • 如果messages2接收到消息,执行case
  • 如果messages2没有接收到消息,执行default
    messages2 := make(chan string)
    signals := make(chan bool)
    select {
        case msg := <-messages2:
            fmt.Println("received message2", msg)
        default:
            fmt.Println("no messages2 received")
        }
    
  • 非阻塞发送,使用select的default实现:
  • msg2是无法发送到messages2,因为这个channel没有buffer,
  • 也没有接收者,所以default执行
    msg2 := "hi"
    select {
    case messages2 <- msg2:
        fmt.Println("sent message", msg2)
    default:
        fmt.Println("no message sent")
    }
  • 对多个channel实现,非阻塞的接收:
  • 使用多个case实现
select {
    case msg := <-messages2:
        fmt.Println("received message", msg)
    case sig := <-signals:
        fmt.Println("received signal", sig)
    default:
        fmt.Println("no activity")
    }

Closing Channels

Closing Channels: 没有值可以发送到这个channel

  • 体现出接收者把工作做完
  • 当主线程没有工作输入到jobs这个channel,就将它关闭
  • 其中goroutine是worker线程
    jobs := make(chan int, 5)
    done1 := make(chan bool)
  • work goroutine
  • 当jobs这channel已经关闭并且所有值都被接收后,more=false
  • 当more=false, 把信息输入到done1这个channel通知主线程
    go func() {
        for {
            j, more := <-jobs
            if more {
                fmt.Println("received job", j)
            } else {
                fmt.Println("received all jobs")
                done1 <- true
                return
            }
        }

    }()
  • 主线程发送3个job到jobs这个channel
  • 发送完成就关闭这个channel
for j := 1; j <= 3; j++ {
        jobs <- j
        fmt.Println("sent job", j)
    }
    close(jobs)
    fmt.Println("send all jobs")
  • 主线程阻塞直到done1这个channel接收到信息,叫醒主线程
    <-done1

Range over Channels

  • 使用for loop遍历channel里面的值
  • for loop 循环完2次会结束,因为我们前面关闭了channel
  • 说明:关闭一个为不空的channel是可以的,
  • 并且关闭后,值还是可以被接收的
queue := make(chan string, 2)
    queue <- "one"
    queue <- "two"
    close(queue)

    for elem := range queue {
        fmt.Println(elem)
    }

Timers

  • 指定在未来的一个时间点执行一次
  • timer1创建的时候往timer1.C这个channel发送一个值
  • 当从timer1.C接收到值的时候这个channel就过期
  • 这个接收到值的时间就是创建timer1的时候设置的时间
    timer1 := time.NewTimer(2 * time.Second)

    <-timer1.C // timer.C 是一个channel,接收值
    fmt.Println("timer 1 expired")

    timer2 := time.NewTimer(time.Second)
    go func() {
        <-timer2.C //异步接收值
        fmt.Println("timer 2 expired")
    }()
    stop2 := timer2.Stop() //手动停止这个timer
    if stop2 {
        fmt.Println("timer 2 stopped")
    }

Tickers

  • 你想在一个时间间隔内重复做一件事
  • 创建一个ticker的时候就会往ticker.C发送值,
  • 遍历这个ticker.C的channel的值,就可以在间隔的时间做操作
  • 每个遍历出值刚刚好ticker设置好的间隔时间
fmt.Println("---Tickers---")
    //创建一个500ms的ticker,
    ticker := time.NewTicker(500 * time.Millisecond)
    go func() {
        // 遍历这个channel的range
        for t := range ticker.C {
            fmt.Println("ticker at ", t)
        }
    }()
    //1600ms之后手动停止这个ticker
    time.Sleep(1600 * time.Millisecond)
    ticker.Stop()
    fmt.Println("ticker stopped")

worker pools

  • 使用goroutine和channel实现worker pool
  • <-chan是发送的channel
  • chan<-是接收的channel
func worker2(id int, jobs <-chan int, results chan<- int) {
    for j := range jobs {
        fmt.Println("worker", id, "started  job", j)
        time.Sleep(time.Second)
        fmt.Println("worker", id, "finished job", j)
        results <- j * 2
    }
}

    fmt.Println("---worker pools---")
    /**
    - 使用goroutine和channel实现worker pool
    */
    //jobs1的channel用来发送工作
    //result的channel用来收集结果
    jobs1 := make(chan int, 100)
    result := make(chan int, 100)
    //启动3个worker
    for w := 1; w <= 3; w++ {
        go worker2(w, jobs1, result)
    }

    //发送5个工作到jobs1这个channel,发送完后关闭
    for j := 1; j <= 5; j++ {
        jobs1 <- j
    }
    close(jobs1)

    //收集所有工作的结果
    for a := 1; a <= 5; a++ {
        <-result
    }

rate limiting

    fmt.Println("---rate limiting---")
    requests := make(chan int, 5)
    for i := 1; i <= 5; i++ {
        requests <- i
    }
    close(requests)
    //limiter channel 每200ms接收一个值
    limiter := time.Tick(200 * time.Millisecond)

    for req := range requests {
        //阻塞,通过获取limiter channel的值,再执行req
        //即使每200ms才会执行一次req
        <-limiter
        fmt.Println("requst :", req, time.Now())
    }
    burstyLimiter := make(chan time.Time, 3)

    //Fill up the channel to represent allowed bursting
    for i := 0; i < 3; i++ {
        burstyLimiter <- time.Now()
    }

    // 异步填充这个channel
    go func() {
        //200ms内,允许添加3个值到burstyLimiter channel
        for t := range time.Tick(200 * time.Millisecond) {
            fmt.Println("t : ", t)
            burstyLimiter <- t
        }
    }()

    //请求过来,放入burstyRequest channel
    burstyRequest := make(chan int, 9)
    for i := 1; i <= 9; i++ {
        burstyRequest <- i
    }
    close(burstyRequest)

    // 9个请求的前3个请求,可以在200毫秒内处理完
    for req := range burstyRequest {
        <-burstyLimiter
        fmt.Println("request", req, time.Now())
    }

    /**

    Running our program we see the first batch of requests handled once every ~200 milliseconds as desired.
    For the second batch of requests we serve the first 3 immediately because of the burstable rate limiting, then serve the remaining 2 with ~200ms delays each.
    */

atomic counters

//被多个goroutines使用的原子化的计数器,使用sync/atomic package
    var ops uint64
    //开启50个goroutines每个线程都不断累加这个变量
    for i := 0; i < 50; i++ {
        go func() {
            //for { //改为每个线程都加1,结果刚刚好50说明,这个变量是并发安全的
            //原子化增加方法,传入变量内存地址
            atomic.AddUint64(&ops, 1)
            fmt.Println("tmp ops: ", atomic.LoadUint64(&ops))
            //等待1ms
            time.Sleep(time.Millisecond)
            //}
        }()
    }
    time.Sleep(time.Second)
    //为了安全地使用这个变量,当这个变量被过个goroutines使用时,
    // 我们从当前值复制一个副本
    opsFinal := atomic.LoadUint64(&ops)
    fmt.Println("ops: ", opsFinal)

Mutexes

/**
    在被多个goroutines调用时,
    上面使用了atomic operations去管理简单的计数器状态,
    对于更加复杂的情况,我们使用mutex去安全获取数据
    */

    var state = make(map[int]int)

    var mutex = &sync.Mutex{}

    var readOps uint64
    var writeOps uint64

    for r := 0; r < 100; r++ {
        go func() {
            total := 0
            for {
                key := rand.Intn(5)
                mutex.Lock()
                total += state[key]
                mutex.Unlock()
                atomic.AddUint64(&readOps, 1)
                time.Sleep(time.Millisecond)
            }
        }()

        for w := 0; w < 10; w++ {
            go func() {
                key := rand.Intn(5)
                val := rand.Intn(100)
                mutex.Lock()
                state[key] = val
                mutex.Unlock()
                atomic.AddUint64(&writeOps, 1)
                time.Sleep(time.Millisecond)
            }()
        }
    }

    time.Sleep(time.Second)

    readOpsFinal := atomic.LoadUint64(&readOps)
    fmt.Println("readOps: ", readOpsFinal)
    writeOpsFinal := atomic.LoadUint64(&writeOps)
    fmt.Println("writeOps: ", writeOpsFinal)

    mutex.Lock()
    fmt.Println("state: ", state)
    mutex.Unlock()

Stateful Goroutines

  • 上面使用显式锁和互斥(mutexes)来实现同步代码块在不同的goroutines之间
  • 另外一种方式是使用goroutines和channel内置的同步功能,去实现。
  • channel-based方式的原理:共享内存只属于一个goroutine所有
 // TODO:

sort 排序

  • sort方法对于内置类型有效
  • in-place sorting, 所以返回同一个slice而不是一个新的对象
strs := []string{"c","a","b"}
    sort.Strings(strs)
    fmt.Println("strings: ",strs)

    ints := []int{7,2,4}
    sort.Ints(ints)
    fmt.Println("Ints: ",ints)

    //- 判断是否已经排序
    s := sort.IntsAreSorted(ints)
    fmt.Println("sorted:",s)

sort by function 重写sort接口自定义查询

   // todo

panic 异常捕捉

    panic("a problem") //这函数会主断程序
    _, err := os.Create("./tmp/file")
    if err != nil {
        panic(err)
    }

defer 延迟执行

func createFile(p string) *os.File {
    fmt.Println("creating")
    f, err := os.Create(p)
    if err != nil {
        panic(err)
    }
    return f
}
func writeFile(f *os.File) {
    fmt.Println("writing")
    fmt.Fprintf(f,"data")
}
func closeFile(f *os.File){
    fmt.Println("closing")
    f.Close()
}
func main(){
    f := createFile("/tmp/defer.txt")
    defer closeFile(f) //这个方法会在writeFile方法执行后,关闭main主程序之前执行
    writeFile(f)

    原文作者:Kate_Blog
    原文地址: https://www.jianshu.com/p/769c36377e69
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞