锁
互斥锁
函数write中的这条defer语句保证了在该函数被执行结束之前互斥锁mutex一定会被解锁。
var mutex sync.Mutex
func write() {
mutex.Lock()
defer mutex.Unlock()
// 省略若干条语句
}
func repeatedlyLock() {
var mutex sync.Mutex
fmt.Println("Lock the lock. (G0)")
mutex.Lock()
fmt.Println("The lock is locked. (G0)")
for i := 1; i <= 3; i++ {
//开启3个协程,mutex已经锁定,所以程序会被阻塞。在unlock之后,随机启动一个。
go func(i int) {
//协程阻塞,只打印这一行。
fmt.Printf("Lock the lock. (G%d)\n", i)
mutex.Lock()
fmt.Printf("The lock is locked. (G%d)\n", i)
}(i)
}
time.Sleep(time.Second)
fmt.Println("Unlock the lock. (G0)")
mutex.Unlock()
fmt.Println("The lock is unlocked. (G0)")
time.Sleep(time.Second)
}
虽然互斥锁可以被直接的在多个Goroutine之间共享,但是我们还是强烈建议把对同一个互斥锁的成对的锁定和解锁操作放在同一个层次的代码块中。例如,在同一个函数或方法中对某个互斥锁的进行锁定和解锁。又例如,把互斥锁作为某一个结构体类型中的字段,以便在该类型的多个方法中使用它。
读写锁
- 多个写操作之间都是互斥的.
- 写操作与读操作之间也都是互斥的.
- 多个读操作之间却不存在互斥关系.
func (*RWMutex) Lock //写锁定
func (*RWMutex) Unlock //写解锁
func (*RWMutex) RLock //读锁定
func (*RWMutex) RUnlock //读解锁
对于同一个读写锁来说,施加在它之上的读锁定可以有多个。因此,只有我们对互斥锁进行相同数量的读解锁,才能够让某一个相应的写锁定获得进行的机会。*sync.RWMutex类型都没有相应的方法让我们获得已进行的读锁定的数量,所以这里是很容易出现问题的。还好我们可以使用defer语句来尽量避免此类问题的发生。
package main
import (
"sync"
"time"
)
var m *sync.RWMutex
func main() {
m = new(sync.RWMutex)
//可以多个同时读
go read(1)
go read(2)
time.Sleep(2 * time.Second)
}
func read(i int) {
println(i, "read start")
m.RLock()
println(i, "reading")
time.Sleep(1 * time.Second)
m.RUnlock()
println(i, "read end")
}
package main
import (
"sync"
"time"
)
var m *sync.RWMutex
func main() {
m = new(sync.RWMutex)
//写的时候啥都不能干
go write(1)
go read(2)
go write(3)
time.Sleep(4 * time.Second)
}
func read(i int) {
println(i, "read start")
m.RLock()
println(i, "reading")
time.Sleep(1 * time.Second)
m.RUnlock()
println(i, "read end")
}
//1 write end结束之后,2才能reading
//2 read end结束之后,3 才能writing
func write(i int) {
println(i, "write start")
m.Lock()
println(i, "writing")
time.Sleep(1 * time.Second)
m.Unlock()
println(i, "write end")
}
条件变量
- 等待通知: wait
阻塞当前线程,直到收到该条件变量发来的通知。 - 单发通知: signal
让该条件变量向至少一个正在等待它的通知的线程发送通知,表示共享数据的状态已经改变。 - 广播通知: broadcast
让条件变量给正在等待它的通知的所有线程都发送通知。
var productCount int
type Factory struct {
locker *sync.Mutex
cond *sync.Cond
goods []int
}
//
func (self *Factory) init() {
self.locker = &sync.Mutex{}
self.cond = sync.NewCond(self.locker)
}
//生产
func (self *Factory) product() {
self.cond.L.Lock()
defer self.cond.L.Unlock()
productCount++
self.goods = append(self.goods, productCount)
self.cond.Signal()
fmt.Println("发信号 ==")
}
//消费
func (self *Factory) consume() {
self.cond.L.Lock()
defer self.cond.L.Unlock()
for {
if len(self.goods) == 0 {
fmt.Println("睡了 =>")
self.cond.Wait()
fmt.Println("醒了 <=")
} else {
break
}
}
fmt.Println(self.goods[0])
self.goods = self.goods[1:]
}
//消费流水线
func (self *Factory) assemblyLine1() {
for {
self.consume()
time.Sleep(time.Millisecond * time.Duration(rand.Int63n(100)))
}
}
//生产流水线
func (self *Factory) assemblyLine0() {
for {
self.product()
time.Sleep(time.Millisecond * time.Duration(rand.Int63n(200)))
}
}
func main() {
runtime.GOMAXPROCS(1)
var factory = &Factory{}
factory.init()
go factory.assemblyLine0()
go factory.assemblyLine1()
select {}
}
原子操作
增或减
函数名称都以Add为前缀,并后跟针对的具体类型的名称。
被操作的类型只能是数值类型
- int32,int64,uint32,uint64,uintptr类型可以使用原子增或减操作
- 第一个参数值必须是一个指针类型的值,以便施加特殊的CPU指令
- 第二个参数值的类型和第一个被操作值的类型总是相同的。
func main(){
var i32 int32
fmt.Println("=====old i32 value=====")
fmt.Println(i32)
//第一个参数值必须是一个指针类型的值,因为该函数需要获得被操作值在内存中的存放位置,以便施加特殊的CPU指令
//结束时会返回原子操作后的新值
newI32 := atomic.AddInt32(&i32,3)
fmt.Println("=====new i32 value=====")
fmt.Println(i32)
fmt.Println(newI32)
var i64 int64
fmt.Println("=====old i64 value=====")
fmt.Println(i64)
newI64 := atomic.AddInt64(&i64,-3)
fmt.Println("=====new i64 value=====")
fmt.Println(i64)
fmt.Println(newI64)
//=====old i32 value=====
//0
//=====new i32 value=====
//3
//3
//=====old i64 value=====
//0
//=====new i64 value=====
//-3
//-3
}
比较并交换CAS
Compare And Swap 简称CAS,在sync/atomic包种,这类原子操作由名称以‘CompareAndSwap’为前缀的若干个函数代表。
func CompareAndSwapInt32(addr *int32, old, new int32) (swapped bool)
载入Load
存储Store
交换Swap
waitGroup
WaitGroup用于线程同步,WaitGroup等待一组线程集合完成,才会继续向下执行。 主线程(goroutine)调用Add来设置等待的线程(goroutine)数量。 然后每个线程(goroutine)运行,并在完成后调用Done。 同时,Wait用来阻塞,直到所有线程(goroutine)完成才会向下执行。
- Add()
- Done()
- Wait()
package main
import (
"fmt"
"sync"
)
func main() {
var wg sync.WaitGroup
var urls = []string{
"http://www.golang.org/",
"http://www.google.com/",
"http://www.somestupidname.com/",
}
for _, url := range urls {
// Increment the WaitGroup counter.
wg.Add(1)
go func(url string) {
// Launch a goroutine to fetch the URL.
defer wg.Done()
// Fetch the URL.
fmt.Println(url)
}(url)
}
// Wait for all goroutines to finish.
wg.Wait()
fmt.Println("Game Over")
}
临时对象池
Pool用于存储那些被分配了但是没有被使用,而未来可能会使用的值,以减小垃圾回收的压力。
sync.Pool有两个公开的方法。一个是Get
,另一个是Put
。前者的功能是从池中获取一个interface{}类型的值,而后者的作用则是把一个interface{}类型的值放置于池中。
// 一个[]byte的对象池,每个对象为一个[]byte
var bytePool = sync.Pool{
New: func() interface{} {
b := make([]byte, 1024)
return &b
},
}
func main() {
a := time.Now().Unix()
// 不使用对象池
for i := 0; i < 1000000000; i++{
obj := make([]byte,1024)
_ = obj
}
b := time.Now().Unix()
// 使用对象池
for i := 0; i < 1000000000; i++{
obj := bytePool.Get().(*[]byte)//TODO 这里的*[]byte,不太懂什么意思,猜测:返回的interface{}类型由*[]byte实现
_ = obj
bytePool.Put(obj)
}
c := time.Now().Unix()
fmt.Println("without pool ", b - a, "s")
fmt.Println("with pool ", c - b, "s")
}
// without pool 34 s
// with pool 24 s
对象池使用是较简单的,但原生的sync.Pool有个较大的问题:我们不能自由控制Pool中元素的数量,放进Pool中的对象每次GC发生时都会被清理掉。这使得sync.Pool做简单的对象池还可以,但做连接池就有点心有余而力不足了,比如:在高并发的情景下一旦Pool中的连接被GC清理掉,那每次连接DB都需要重新三次握手建立连接,这个代价就较大了。