断续器
计时器 是当想在未来做一些事情 – tickers 是用于定期做一些事情。这里是一个例行程序,周期性执行直到停止。
代码使用与计时器的机制类似:发送值到通道。这里我们将使用通道上的一个范围内来迭代,每隔500ms发送一次。
代码可以像定时器一样停止,当代码停止后,它不会再其通道上接收值。
package main
import (
"time"
"fmt"
)
func main(){
ticker := time.NewTicker(time.Millisecond * 500)
go func() {
for t := range ticker.C{
fmt.Println("Tick at ",t)
}
}()
time.Sleep(time.Millisecond * 1600 )
ticker.Stop()
fmt.Println("Ticker stopped")
}
Go 工作池
在这个例子中,我们将实现如何使用 goroutine 和 channel 实现一个工作池。
这里是工作程序(worker),我们将运行几个并发实例。这些工作程序(worker)将在工作 chan 上接收工作,并将发送相应的结果。这里使用 延时1s的方式模拟工作的过程。
为了使用工作程序(worker)池,需要向他们发送任务并收集相关结果。这里实现的时候使用了两个通道。这启动了 3 个worker,最初被阻止,因为没有任务。
然后手机作业的所有结果。
package main
import (
"fmt"
"time"
)
//worker本体函数
func worker(id int,job <-chan int, result chan<- int){
for j:=range job{
fmt.Println("worker",id,"started job",j)
time.Sleep(time.Second)
fmt.Println("worker",id,"finished job",j)
result<- j*2
}
}
func main(){
jobs:= make(chan int,100)
results := make(chan int,100)
//创建3个worker
for w:=1 ; w<= 3;w++{
go worker(w,jobs,results)
}
//分配5个任务
for j:=1 ;j<= 5 ; j++{
jobs <- j
}
close(jobs)
//等待所有工作完成
for a :=1 ; a<=5 ; a++{
<- results
}
}
Go 速率限制
速率限制是控制资源利用和维持服务质量的重要机制。通过 goroutines,channel,ticker 都可以优雅的支持速率限制。
首先我们来看一下基本速率限制。假设想限制对传入请求的处理。我们需要在同一个通道上处理。
这个限制器通道将 2000ms 接收一个值。这是速率限制方案中的调节器。
通过在服务每个请求之前阻塞来自限制器信道的接收,我们限制自己每200ms接收一个请求。
我们可能希望在速率限制方案中允许端脉冲串请求,同时保持总体速率限制。可以通过缓冲的限制器通道来实现。这个 burstyLimiter通道将允许最多 3 个事件的突发。
填充通道以表示允许突发。
每2000ms,将尝试向 burstyLimiter添加一个新值,最大限制为 3 。现在模拟 5个更多的传入请求。这些传入请求的前三个未超过burstyLimiter 值。
package main
import (
"time"
"fmt"
)
func main(){
requests := make(chan int , 5)
for i:= 1 ; i<= 5 ; i++{
requests <- i
}
close(requests)
limiter := time.Tick(time.Millisecond * 2000)
for req := range requests{
<- limiter
fmt.Println("request",req,time.Now())
}
burstyLimiter := make(chan time.Time , 3)
for i:= 0 ; i<3;i++{
burstyLimiter <- time.Now()
}
go func() {
for t:= range time.Tick(time.Millisecond * 2000){
burstyLimiter <- t
}
}()
burstyRequests := make(chan int , 5)
for i:=1 ; i<= 5 ; i++{
burstyRequests <- i
}
close(burstyRequests)
for req := range burstyRequests{
<- burstyLimiter
fmt.Println("request",req,time.Now())
}
}
Go原子计数器
go语言中管理状态的主要机制是通过通道进行通信。在过去的文章中,我们已经看到了这一点,例如工作池。还有一些其他选项用于管理状态。这里我们将使用 sync/atomic 包来实现由多个 goroutine 访问的原子计数器。
使用一个无符号整数表示计数器(正数)
为了模拟并发更新,将启动 50个 goroutine , 每个增量计数器大学是 1ms。
为了原子地递增计数器,这里使用 AddUint64() 函数,在 ops 计数器的内存地址上使用 & 语法。
为了安全地使用计数器,同时它任然被其他 goroutine 更新。通过 LoadUint64提取一个当前值的副本到 opsFinal。如上所述,需要获取值的内存地址 &ops 给这个函数。
运行程序显示执行了大约 40000次操作。根据自己机器性能可以尝试其他更nice的操作。
package main
import (
"sync/atomic"
"time"
"fmt"
)
func main(){
var ops uint64 = 0
for i:= 0 ; i< 50 ; i++{
go func() {
for{
atomic.AddUint64(&ops,1)
time.Sleep(time.Millisecond * 1 )
}
}()
}
time.Sleep(time.Second * 10)
opsFinal := atomic.LoadUint64(&ops)
fmt.Println("ops",opsFinal)
}