首先需要澄清一个事实:redis
服务端是单线程处理客户端请求,也就是说客户端请求在服务端是串行化执行的,因此对服务端来说,并不存在并发问题。但业务方却存在并发操作redis
中的同一个key
的情况。所以如何让A
客户端知道B
客户端正在操作它想操作的 key
,就成了必须要讨论的问题。
那么开始总结下方案吧:
1. SETNX key value //key存在就不做任何操作,返回0;不存在操作成功返回1
这种方式通过对需要操作的key
加锁来保证并发操作的串行化。这里我们以Golang
代码为例来举例说明该操作。先看多个协程写同一个key
的情况。代码如下:
package main
import (
"fmt"
"github.com/garyburd/redigo/redis"
"runtime"
"sync"
"time"
)
var w sync.WaitGroup
func newRdsPool(server, auth string) *redis.Pool {
return &redis.Pool{
MaxIdle: 100,
MaxActive: 30,
IdleTimeout: 60 * time.Second,
Dial: func() (redis.Conn, error) {
c, err := redis.Dial("tcp", server)
if err != nil {
return nil, err
}
if auth == "" {
return c, err
}
if _, err := c.Do("AUTH", auth); err != nil {
c.Close()
return nil, err
}
return c, err
},
TestOnBorrow: func(c redis.Conn, t time.Time) error {
_, err := c.Do("PING")
return err
},
}
}
func g1(r redis.Conn) {
for i := 0; i < 2; i++ {
if _, err := redis.String(r.Do("set", "hello", "1")); err != nil {
fmt.Println(err)
}
time.Sleep(10 * time.Millisecond)
}
w.Done()
}
func g2(r redis.Conn) {
for i := 0; i < 2; i++ {
if _, err := redis.String(r.Do("set", "hello", "2")); err != nil {
fmt.Println(err)
}
time.Sleep(10 * time.Millisecond)
}
w.Done()
}
func main() {
w.Add(2)
runtime.GOMAXPROCS(runtime.NumCPU())
var rc1 redis.Conn = newRdsPool(`127.0.0.1:6379`, ``).Get()
var rc2 redis.Conn = newRdsPool(`127.0.0.1:6379`, ``).Get()
defer rc1.Close()
defer rc2.Close()
go g1(rc1)
go g2(rc2)
w.Wait()
}
执行上面的代码之后,hello
的值在1和2之间徘徊。希望出现的是如果协程1在操作时候,协程2就放弃操作,也即让操作串行化。这样就需要有一个锁来保证不能同时让两个协程进去临界区。setnx = set if not exists
不存在返回1,存在返回0。通过这个机制可以判断当前的lock
是否已经被设置了。lock
必须给一个过期时间,因为很有可能goroutine1
在do work
的时候出现panic
,这样就导致goroutine2
一直在尝试获取锁。
package main
import (
"fmt"
"github.com/garyburd/redigo/redis"
"runtime"
"sync"
"time"
)
var w sync.WaitGroup
func newRdsPool(server, auth string) *redis.Pool {
return &redis.Pool{
MaxIdle: 100,
MaxActive: 30,
IdleTimeout: 60 * time.Second,
Dial: func() (redis.Conn, error) {
c, err := redis.Dial("tcp", server)
if err != nil {
return nil, err
}
if auth == "" {
return c, err
}
if _, err := c.Do("AUTH", auth); err != nil {
c.Close()
return nil, err
}
return c, err
},
TestOnBorrow: func(c redis.Conn, t time.Time) error {
_, err := c.Do("PING")
return err
},
}
}
func g1(r redis.Conn) {
var lock int64
var lock_timeout int64 = 2
var lock_time int64
var now int64
for lock != 1 {
now = time.Now().Unix()
lock_time = now + lock_timeout
lock, err1 := redis.Int64(r.Do("setnx", "foo", lock_time))
lockValue1, err2 := redis.Int64(r.Do("get", "foo"))
if lock == 1 && err1 == nil {
break
} else {
if now > lockValue1 && err2 == nil {
lockValue2, err3 := redis.Int64(r.Do("getset", "foo", lock_time))
if err3 == nil && now > lockValue2 {
break
} else {
fmt.Println(`g1 not get lock`)
time.Sleep(1000 * time.Millisecond)
}
} else {
fmt.Println(`g1 not get lock`)
time.Sleep(1000 * time.Millisecond)
}
}
}
for i := 0; i < 5; i++ {
if _, err := redis.String(r.Do("set", "hello", "1")); err != nil {
fmt.Println(err)
}
fmt.Println(`g1 now work... `)
time.Sleep(1 * time.Second)
}
if time.Now().Unix() < lock_time {
if _, err4 := redis.Int64(r.Do("del", "foo")); err4 != nil {
fmt.Println(err4)
}
}
w.Done()
}
func g2(r redis.Conn) {
var lock int64
var lock_timeout int64 = 2
var lock_time int64
var now int64
for lock != 1 {
now = time.Now().Unix()
lock_time = now + lock_timeout
lock, err1 := redis.Int64(r.Do("setnx", "foo", lock_time))
lockValue1, err2 := redis.Int64(r.Do("get", "foo"))
if lock == 1 && err1 == nil {
break
} else {
if now > lockValue1 && err2 == nil {
lockValue2, err3 := redis.Int64(r.Do("getset", "foo", lock_time))
if err3 == nil && now > lockValue2 {
break
} else {
fmt.Println(`g2 not get lock`)
time.Sleep(1000 * time.Millisecond)
}
} else {
fmt.Println(`g2 not get lock`)
time.Sleep(1000 * time.Millisecond)
}
}
}
for i := 0; i < 5; i++ {
if _, err := redis.String(r.Do("set", "hello", "2")); err != nil {
fmt.Println(err)
}
fmt.Println(`g2 now work... `)
time.Sleep(1 * time.Second)
}
if time.Now().Unix() < lock_time {
if _, err4 := redis.Int64(r.Do("del", "foo")); err4 != nil {
fmt.Println(err4)
}
}
w.Done()
}
func main() {
w.Add(2)
runtime.GOMAXPROCS(runtime.NumCPU())
var rc1 redis.Conn = newRdsPool(`127.0.0.1:6379`, ``).Get()
var rc2 redis.Conn = newRdsPool(`127.0.0.1:6379`, ``).Get()
defer rc1.Close()
defer rc2.Close()
go g1(rc1)
go g2(rc2)
w.Wait()
}
上面的代码给出了两个goroutine
通过锁达到串行化操作同一个key
的效果。好了,setnx
的解法就到此为止了。
2. MULTI、DISCARD、 EXEC // redis事务
利用MULTI 、DISCARD、 EXEC
可以完成上面类似的操作,为了证明两个goroutine
的执行是串行化的,在goroutine1
的最后删除了hello这个key,goroutine2
最后的输出来没有获取到任何东西nil
。这也证明了MULTI
和EXEC
之间的逻辑是一个执行体,不受外界的干扰,但这里要说明下MULTI
和EXEC
只能保证它们之间的执行是一个整体不受干扰,但如果在A
执行事务之前就有人把hello
的值修改了,这样A
事务就会在修改后的结果上继续操作。这种情况是否正常需要就具体业务来分析。针对这种情况这里不做过多分析了。
package main
import (
"fmt"
"github.com/garyburd/redigo/redis"
"runtime"
"sync"
"time"
)
var w sync.WaitGroup
func newRdsPool(server, auth string) *redis.Pool {
return &redis.Pool{
MaxIdle: 100,
MaxActive: 30,
IdleTimeout: 60 * time.Second,
Dial: func() (redis.Conn, error) {
c, err := redis.Dial("tcp", server)
if err != nil {
return nil, err
}
if auth == "" {
return c, err
}
if _, err := c.Do("AUTH", auth); err != nil {
c.Close()
return nil, err
}
return c, err
},
TestOnBorrow: func(c redis.Conn, t time.Time) error {
_, err := c.Do("PING")
return err
},
}
}
func g1(r redis.Conn) {
r.Send("MULTI")
fmt.Println(`g1 MULTI begin ...`)
for i := 0; i < 5; i++ {
if _, err := redis.String(r.Do("set", "hello", "10")); err != nil {
fmt.Println(err)
}
fmt.Println(`g1 now work... `)
}
time.Sleep(1 * time.Second)
r.Send("DEL", "hello")
g, _ := r.Do("EXEC")
fmt.Println(`g1 MULTI end ...`, g)
w.Done()
}
func g2(r redis.Conn) {
r.Send("MULTI")
fmt.Println(`g2 MULTI begin ...`)
for i := 0; i < 5; i++ {
if _, err := redis.String(r.Do("get", "hello")); err != nil {
fmt.Println(err)
}
fmt.Println(`g2 now work... `)
}
time.Sleep(1 * time.Second)
g, _ := r.Do("EXEC")
fmt.Println(`g2 MULTI end ...`, g)
w.Done()
}
func main() {
w.Add(2)
runtime.GOMAXPROCS(runtime.NumCPU())
var rc1 redis.Conn = newRdsPool(`127.0.0.1:6379`, ``).Get()
var rc2 redis.Conn = newRdsPool(`127.0.0.1:6379`, ``).Get()
defer rc1.Close()
defer rc2.Close()
go g1(rc1)
go g2(rc2)
w.Wait()
}
如果你有更多优雅的方式解决这类问题,请在评论区拍砖. end~