go map的线程安全使用

go map的线程安全使用

简单线程安全使用

在很多时候,我们会并发地使用map对象,尤其是在一定规模的项目中,map总会保存goroutine共享的数据。在Go官方blog的Go maps in action一文中,提供了一种简便的解决方案。

var counter = struct{
    sync.RWMutex
    m map[string]int
}{m: make(map[string]int)}

它使用嵌入struct为map增加一个读写锁。

读数据的时候很方便的加锁:

counter.RLock()
n := counter.m["some_key"]
counter.RUnlock()
fmt.Println("some_key:", n)

写数据的时候:

unter.Lock()
counter.m["some_key"]++
counter.Unlock()

demo

type MyMap struct {
    v map[string]string
    sync.RWMutex
}

func (this *MyMap)Put(key string,value string)  {
    this.Lock()
    defer this.Unlock()
    this.v[key]=value
}
func (this *MyMap)Get(key string) string {
    this.RLock()
    defer   this.RUnlock()
    return this.v[key]
}

sync.Map

主要方法

Store(key, value interface{})

说明: 存储一个设置的键值。

LoadOrStore(key, value interface{}) (actual interface{}, loaded bool)

说明: 返回键的现有值(如果存在),否则存储并返回给定的值,如果是读取则返回true,如果是存储返回false。

Load(key interface{}) (value interface{}, ok bool)

说明: 读取存储在map中的值,如果没有值,则返回nil。OK的结果表示是否在map中找到值。

Delete(key interface{})

说明: 删除键对应的值。

Range(f func(key, value interface{}) bool)

说明: 循环读取map中的值。

源码分析

数据结构

type Map struct {
    // 当涉及到dirty数据的操作的时候,需要使用这个锁
    mu Mutex
    // 一个只读的数据结构,因为只读,所以不会有读写冲突。
    // 所以从这个数据中读取总是安全的。
    // 实际上,实际也会更新这个数据的entries,如果entry是未删除的(unexpunged), 并不需要加锁。如果entry已经被删除了,需要加锁,以便更新dirty数据。
    read atomic.Value // readOnly
    // dirty数据包含当前的map包含的entries,它包含最新的entries(包括read中未删除的数据,虽有冗余,但是提升dirty字段为read的时候非常快,不用一个一个的复制,而是直接将这个数据结构作为read字段的一部分),有些数据还可能没有移动到read字段中。
    // 对于dirty的操作需要加锁,因为对它的操作可能会有读写竞争。
    // 当dirty为空的时候, 比如初始化或者刚提升完,下一次的写操作会复制read字段中未删除的数据到这个数据中。
    dirty map[interface{}]*entry
    // 当从Map中读取entry的时候,如果read中不包含这个entry,会尝试从dirty中读取,这个时候会将misses加一,
    // 当misses累积到 dirty的长度的时候, 就会将dirty提升为read,避免从dirty中miss太多次。因为操作dirty需要加锁。
    misses int
}

Load

加载方法,也就是提供一个键key,查找对应的值value,如果不存在,通过ok反映:

func (m *Map) Load(key interface{}) (value interface{}, ok bool) {
    // 1.首先从m.read中得到只读readOnly,从它的map中查找,不需要加锁
    read, _ := m.read.Load().(readOnly)
    e, ok := read.m[key]
    // 2. 如果没找到,并且m.dirty中有新数据,需要从m.dirty查找,这个时候需要加锁
    if !ok && read.amended {
        m.mu.Lock()
        // 双检查,避免加锁的时候m.dirty提升为m.read,这个时候m.read可能被替换了。
        read, _ = m.read.Load().(readOnly)
        e, ok = read.m[key]
        // 如果m.read中还是不存在,并且m.dirty中有新数据
        if !ok && read.amended {
            // 从m.dirty查找
            e, ok = m.dirty[key]
            // 不管m.dirty中存不存在,都将misses计数加一
            // missLocked()中满足条件后就会提升m.dirty
            m.missLocked()
        }
        m.mu.Unlock()
    }
    if !ok {
        return nil, false
    }
    return e.load()
}

这里有两个值的关注的地方。一个是首先从m.read中加载,不存在的情况下,并且m.dirty中有新数据,加锁,然后从m.dirty中加载。

二是这里使用了双检查的处理,因为在下面的两个语句中,这两行语句并不是一个原子操作。

if !ok && read.amended {
        m.mu.Lock()

虽然第一句执行的时候条件满足,但是在加锁之前,m.dirty可能被提升为m.read,所以加锁后还得再检查m.read,后续的方法中都使用了这个方法。

双检查的技术Java程序员非常熟悉了,单例模式的实现之一就是利用双检查的技术。

可以看到,如果我们查询的键值正好存在于m.read中,无须加锁,直接返回,理论上性能优异。即使不存在于m.read中,经过miss几次之后,m.dirty会被提升为m.read,又会从m.read中查找。所以对于更新/增加较少,加载存在的key很多的case,性能基本和无锁的map类似。

下面看看m.dirty是如何被提升的。 missLocked方法中可能会将m.dirty提升。

func (m *Map) missLocked() {
    m.misses++
    if m.misses < len(m.dirty) {
        return
    }
    m.read.Store(readOnly{m: m.dirty})
    m.dirty = nil
    m.misses = 0
}

上面的最后三行代码就是提升m.dirty的,很简单的将m.dirty作为readOnly的m字段,原子更新m.read。提升后m.dirty、m.misses重置, 并且m.read.amended为false。

Store

这个方法是更新或者新增一个entry。

func (m *Map) Store(key, value interface{}) {
    // 如果m.read存在这个键,并且这个entry没有被标记删除,尝试直接存储。
    // 因为m.dirty也指向这个entry,所以m.dirty也保持最新的entry。
    read, _ := m.read.Load().(readOnly)
    if e, ok := read.m[key]; ok && e.tryStore(&value) {
        return
    }
    // 如果`m.read`不存在或者已经被标记删除
    m.mu.Lock()
    read, _ = m.read.Load().(readOnly)
    if e, ok := read.m[key]; ok {
        if e.unexpungeLocked() { //标记成未被删除
            m.dirty[key] = e //m.dirty中不存在这个键,所以加入m.dirty
        }
        e.storeLocked(&value) //更新
    } else if e, ok := m.dirty[key]; ok { // m.dirty存在这个键,更新
        e.storeLocked(&value)
    } else { //新键值
        if !read.amended { //m.dirty中没有新的数据,往m.dirty中增加第一个新键
            m.dirtyLocked() //从m.read中复制未删除的数据
            m.read.Store(readOnly{m: read.m, amended: true})
        }
        m.dirty[key] = newEntry(value) //将这个entry加入到m.dirty中
    }
    m.mu.Unlock()
}
func (m *Map) dirtyLocked() {
    if m.dirty != nil {
        return
    }
    read, _ := m.read.Load().(readOnly)
    m.dirty = make(map[interface{}]*entry, len(read.m))
    for k, e := range read.m {
        if !e.tryExpungeLocked() {
            m.dirty[k] = e
        }
    }
}
func (e *entry) tryExpungeLocked() (isExpunged bool) {
    p := atomic.LoadPointer(&e.p)
    for p == nil {
        // 将已经删除标记为nil的数据标记为expunged
        if atomic.CompareAndSwapPointer(&e.p, nil, expunged) {
            return true
        }
        p = atomic.LoadPointer(&e.p)
    }
    return p == expunged
}

你可以看到,以上操作都是先从操作m.read开始的,不满足条件再加锁,然后操作m.dirty。

Store可能会在某种情况下(初始化或者m.dirty刚被提升后)从m.read中复制数据,如果这个时候m.read中数据量非常大,可能会影响性能。

Delete

删除一个键值。

func (m *Map) Delete(key interface{}) {
    read, _ := m.read.Load().(readOnly)
    e, ok := read.m[key]
    if !ok && read.amended {
        m.mu.Lock()
        read, _ = m.read.Load().(readOnly)
        e, ok = read.m[key]
        if !ok && read.amended {
            delete(m.dirty, key)
        }
        m.mu.Unlock()
    }
    if ok {
        e.delete()
    }
}

同样,删除操作还是从m.read中开始, 如果这个entry不存在于m.read中,并且m.dirty中有新数据,则加锁尝试从m.dirty中删除。

注意,还是要双检查的。 从m.dirty中直接删除即可,就当它没存在过,但是如果是从m.read中删除,并不会直接删除,而是打标记:

func (e *entry) delete() (hadValue bool) {
    for {
        p := atomic.LoadPointer(&e.p)
        // 已标记为删除
        if p == nil || p == expunged {
            return false
        }
        // 原子操作,e.p标记为nil
        if atomic.CompareAndSwapPointer(&e.p, p, nil) {
            return true
        }
    }
}

Range

因为for … range map是内建的语言特性,所以没有办法使用for range遍历sync.Map, 但是可以使用它的Range方法,通过回调的方式遍历。

func (m *Map) Range(f func(key, value interface{}) bool) {
    read, _ := m.read.Load().(readOnly)
    // 如果m.dirty中有新数据,则提升m.dirty,然后在遍历
    if read.amended {
        //提升m.dirty
        m.mu.Lock()
        read, _ = m.read.Load().(readOnly) //双检查
        if read.amended {
            read = readOnly{m: m.dirty}
            m.read.Store(read)
            m.dirty = nil
            m.misses = 0
        }
        m.mu.Unlock()
    }
    // 遍历, for range是安全的
    for k, e := range read.m {
        v, ok := e.load()
        if !ok {
            continue
        }
        if !f(k, v) {
            break
        }
    }
}

Range方法调用前可能会做一个m.dirty的提升,不过提升m.dirty不是一个耗时的操作。

    原文作者:吃猫的鱼0
    原文地址: https://www.jianshu.com/p/1d45e31343d8
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞