深入 Go 语言 sync.Map 深度学习笔记#

1. 设计哲学:为何需要 sync.Map#

Go 的内置 map 为了极致的单线程性能而放弃了并发安全。传统方案 map + sync.RWMutex 虽然通用,但在核心服务器场景——读多写少 (Read-Mostly)——表现并不理想。

a) 传统并发 map 方案的局限性#

传统方案的痛点:

  • 在极高并发下,读写锁的原子计数与锁竞争会成为瓶颈
  • 每次读仍需原子增减,累计开销显著
  • 读者之间也会产生缓存行竞争
  • 写锁会阻塞所有读操作,影响整体吞吐量

现实场景需求:

  • 服务配置缓存:启动后几乎只读
  • 全局注册表:大量查询,偶尔注册
  • 连接池管理:频繁获取连接,偶尔创建新连接

b) sync.Map 的核心理念#

设计目标:

  1. 为特定场景而生:服务缓存、配置、注册表等读多写少场景
  2. 读写分离:将读与写分到不同结构上,减少竞争
  3. 无锁读取:以空间换时间,让大多数读操作完全无锁
  4. 工程权衡:牺牲写路径复杂度换取读路径的极致性能

c) 空间换时间的哲学#

sync.Map 体现了经典的工程权衡:

  • 空间成本:维护两套数据结构(read + dirty)
  • 时间收益:读操作在最佳情况下完全无锁
  • 适用性限制:仅在读多写少场景下收益明显

2. 底层结构与核心机制#

sync.Map 的"魔法"源于内部精巧的"两级缓存"式结构,通过读写分离实现高性能并发访问。

a) 结构简图#

+------------------------------------------------+
|                    sync.Map                    |
+------------------------------------------------+
|                                                |
|  read (atomic.Pointer) --+                     | // 原子指针,指向只读数据
|                          |                     |
|  dirty (map[any]any)     |                     | // 读写数据,由互斥锁保护
|                          |                     |
|  mu (sync.Mutex)         |                     | // 保护 dirty map 的锁
|                          |                     |
|  misses (int)            |                     | // read map 未命中的计数器
|                          |                     |
+--------------------------|---------------------+
                           |
                           v
          +--------------------------------------+
          |            readOnly struct           |
          +--------------------------------------+
          | m (map[any]*entry)                   | // 不可变 map
          | amended (bool)                       | // 标记 m 是否比 dirty 旧
          +--------------------------------------+

b) 核心数据结构详解#

sync.Map 主结构:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
type Map struct {
    mu     Mutex        // 保护 dirty 的互斥锁
    read   atomic.Value // 存储 readOnly 结构的原子值
    dirty  map[any]*entry // 可修改的 map,包含所有键值对
    misses int          // read 未命中且 dirty 命中的次数
}

type readOnly struct {
    m       map[any]*entry // 只读 map
    amended bool           // dirty 是否包含 read 中没有的键
}

type entry struct {
    p unsafe.Pointer // 指向实际值,可以是 nil, expunged, 或指向值的指针
}

entry 的三种状态:

  • 有效值:p 指向实际的值
  • nil:键已被删除,但仍存在于 read 中
  • expunged:键已被删除且从 read 中清除

c) 读写分离机制#

read map(快路径):

  • atomic.Value 保护,支持无锁读取
  • 包含稳定的键值对,读操作优先访问
  • 是某个时刻 dirty 的快照,具有不可变性

dirty map(慢路径):

  • 由互斥锁保护的普通 map
  • 包含所有最新的键值对
  • 新增键首先出现在这里

3. 核心机制:读写操作与数据同步#

a) Load(读取)操作流程#

第一阶段:无锁快速路径

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
func (m *Map) Load(key any) (value any, ok bool) {
    read, _ := m.read.Load().(readOnly)
    e, ok := read.m[key]
    if !ok && read.amended {
        // read 未命中且 dirty 可能有新数据
        m.mu.Lock()
        // 双重检查,防止在获取锁期间 read 被更新
        read, _ = m.read.Load().(readOnly)
        e, ok = read.m[key]
        if !ok && read.amended {
            e, ok = m.dirty[key]
            m.missLocked()  // 增加 misses 计数
        }
        m.mu.Unlock()
    }
    if !ok {
        return nil, false
    }
    return e.load()
}

性能特点:

  • 最佳情况:read 命中,完全无锁,性能极高
  • 次优情况:read 未命中,需要加锁访问 dirty
  • 双重检查:防止在等待锁期间数据发生变化

b) Store(写入)操作流程#

第一阶段:尝试快速更新

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
func (m *Map) Store(key, value any) {
    read, _ := m.read.Load().(readOnly)
    if e, ok := read.m[key]; ok && e.tryStore(&value) {
        return  // 成功在 read 中更新现有键
    }
    
    m.mu.Lock()
    read, _ = m.read.Load().(readOnly)
    if e, ok := read.m[key]; ok {
        // 键存在于 read 中
        if e.unexpungeLocked() {
            m.dirty[key] = e  // 将删除的键重新加入 dirty
        }
        e.storeLocked(&value)
    } else if e, ok := m.dirty[key]; ok {
        // 键仅存在于 dirty 中
        e.storeLocked(&value)
    } else {
        // 新键
        if !read.amended {
            m.dirtyLocked()  // 首次写入新键时复制 read 到 dirty
            m.read.Store(readOnly{m: read.m, amended: true})
        }
        m.dirty[key] = newEntry(value)
    }
    m.mu.Unlock()
}

c) 数据同步机制(Promotion)#

触发条件:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
func (m *Map) missLocked() {
    m.misses++
    if m.misses < len(m.dirty) {
        return
    }
    // 将 dirty 提升为新的 read
    m.read.Store(readOnly{m: m.dirty})
    m.dirty = nil
    m.misses = 0
}

提升过程:

  1. misses 计数达到阈值(等于 dirty 的长度)
  2. 原子替换:将 dirty 设为新的 read
  3. 清理状态:清空 dirty,重置 misses
  4. 性能重置:后续读操作重新享受无锁优势

d) Delete(删除)操作#

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
func (m *Map) Delete(key any) {
    read, _ := m.read.Load().(readOnly)
    e, ok := read.m[key]
    if !ok && read.amended {
        m.mu.Lock()
        read, _ = m.read.Load().(readOnly)
        e, ok = read.m[key]
        if !ok && read.amended {
            delete(m.dirty, key)  // 从 dirty 中删除
        }
        m.mu.Unlock()
    }
    if ok {
        e.delete()  // 标记为已删除
    }
}

4. 开发者必知实践#

a) 使用场景判断#

适合使用 sync.Map:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// 配置缓存:启动后几乎只读
type ConfigCache struct {
    cache sync.Map
}

func (c *ConfigCache) Get(key string) (string, bool) {
    value, ok := c.cache.Load(key)
    if !ok {
        return "", false
    }
    return value.(string), true
}

// 服务注册表:大量查询,偶尔注册
type ServiceRegistry struct {
    services sync.Map
}

func (r *ServiceRegistry) LookupService(name string) *Service {
    if svc, ok := r.services.Load(name); ok {
        return svc.(*Service)
    }
    return nil
}

避免使用 sync.Map:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
// 错误:频繁写入的计数器
type BadCounter struct {
    counters sync.Map
}

func (c *BadCounter) Increment(key string) {
    // 每次都要写入,sync.Map 优势丧失
    if v, ok := c.counters.Load(key); ok {
        c.counters.Store(key, v.(int)+1)
    } else {
        c.counters.Store(key, 1)
    }
}

// 正确:使用传统方案
type GoodCounter struct {
    mu       sync.RWMutex
    counters map[string]int
}

b) 类型安全的封装模式#

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// 类型安全的 sync.Map 封装
type StringMap struct {
    m sync.Map
}

func (sm *StringMap) Load(key string) (string, bool) {
    value, ok := sm.m.Load(key)
    if !ok {
        return "", false
    }
    return value.(string), true
}

func (sm *StringMap) Store(key, value string) {
    sm.m.Store(key, value)
}

func (sm *StringMap) Delete(key string) {
    sm.m.Delete(key)
}

func (sm *StringMap) Range(f func(key, value string) bool) {
    sm.m.Range(func(key, value any) bool {
        return f(key.(string), value.(string))
    })
}

c) 遍历与一致性#

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
// 正确的遍历模式
func (m *StringMap) Snapshot() map[string]string {
    snapshot := make(map[string]string)
    m.Range(func(key, value string) bool {
        snapshot[key] = value
        return true  // 继续遍历
    })
    return snapshot
}

// 注意:Range 不保证强一致性
func (m *StringMap) UnsafeCount() int {
    count := 0
    m.Range(func(_, _ string) bool {
        count++  // 在遍历过程中其他 goroutine 可能修改数据
        return true
    })
    return count  // 返回的计数可能不准确
}

5. 性能分析与复杂度#

a) 时间复杂度#

Load 操作:

  • 最佳情况:O(1) - read 命中,无锁访问
  • 一般情况:O(1) - dirty 命中,需要加锁
  • worst case:O(1) - 两次查找都未命中

Store 操作:

  • 现有键更新:O(1) - 可能在 read 中直接更新
  • 新键插入:平摊 O(1),但首次可能触发 O(N) 的复制

Delete 操作:

  • 时间复杂度:O(1) - 类似 Store 操作

b) 空间复杂度#

内存开销:

  • 基础开销:O(N) - N 为键值对数量
  • 额外开销:在数据迁移期间可能达到 O(2N)
  • 接口开销:每个值都需要装箱为 interface{}

c) 性能对比#

场景sync.Mapmap + RWMutex性能倍数
读密集(95%读)极优良好3-5x
读写均衡(50%读)一般良好0.5-0.8x
写密集(10%读)较差优秀0.3-0.5x

读操作性能对比:

1
2
3
// Benchmark 结果示例(Go 1.19)
BenchmarkSyncMapLoad-8               100000000    10.2 ns/op
BenchmarkRWMutexMapLoad-8            50000000     31.4 ns/op

d) 内存与 GC 影响#

优势:

  • read map 的不可变性减少了 GC 扫描压力
  • 读操作无锁,减少了内存屏障开销

劣势:

  • interface{} 装箱增加了 GC 对象数量
  • 数据迁移期间的内存双倍开销

6. 高级主题:性能优化与实现细节#

a) 内存管理优化#

lazy deletion 策略:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
// entry 状态转换
const (
    entryDeleted = unsafe.Pointer(nil)
    entryExpunged = unsafe.Pointer(&expunged)
)

// 延迟删除避免立即重新分配 map
func (e *entry) delete() (hadValue bool) {
    for {
        p := atomic.LoadPointer(&e.p)
        if p == entryDeleted || p == entryExpunged {
            return false
        }
        if atomic.CompareAndSwapPointer(&e.p, p, entryDeleted) {
            return true
        }
    }
}

read map 的不可变性:

  • 一旦创建,read map 永不修改
  • 利用了不可变数据结构的并发优势
  • 避免了读操作之间的竞争

b) 缓存局部性优化#

批量操作模式:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
type BatchMap struct {
    sync.Map
}

// 批量加载,减少锁竞争
func (bm *BatchMap) LoadBatch(keys []string) map[string]any {
    result := make(map[string]any)
    
    // 先尝试从 read 批量读取
    for _, key := range keys {
        if value, ok := bm.Load(key); ok {
            result[key] = value
        }
    }
    
    return result
}

c) 监控与诊断#

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// sync.Map 性能监控
type MonitoredSyncMap struct {
    sync.Map
    
    readHits   int64
    readMisses int64
    writes     int64
}

func (m *MonitoredSyncMap) Load(key any) (any, bool) {
    value, ok := m.Map.Load(key)
    if ok {
        atomic.AddInt64(&m.readHits, 1)
    } else {
        atomic.AddInt64(&m.readMisses, 1)
    }
    return value, ok
}

func (m *MonitoredSyncMap) Store(key, value any) {
    atomic.AddInt64(&m.writes, 1)
    m.Map.Store(key, value)
}

func (m *MonitoredSyncMap) Stats() (readHits, readMisses, writes int64) {
    return atomic.LoadInt64(&m.readHits),
           atomic.LoadInt64(&m.readMisses),
           atomic.LoadInt64(&m.writes)
}

7. 面试题深度解析#

a) 问题 1:sync.Map 的适用场景分析#

题目: 详细解释 sync.Map 在什么场景下优于传统的 map + RWMutex 方案,以及其设计原理。

标准答案:

适用场景分析:

  1. 读多写少场景:读写比例至少 80:20 以上
  2. 键集合相对稳定:新键插入不频繁
  3. 高并发读取:大量 goroutine 同时读取

设计原理:

  • 读写分离:read map 无锁读取,dirty map 处理写入
  • 数据提升:通过 misses 计数触发 dirty → read 的提升
  • 延迟删除:使用 entry 状态标记避免立即 map 重分配

性能分析:

读写比例sync.Mapmap + RWMutex推荐方案
90:10优秀良好sync.Map
70:30良好良好看具体需求
50:50一般优秀map + RWMutex

b) 问题 2:数据一致性与内存模型#

题目: 分析以下代码的并发安全性,解释 sync.Map 如何保证数据一致性:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
func main() {
    var m sync.Map
    
    // Goroutine 1: 写入
    go func() {
        m.Store("key1", "value1")
        m.Store("key2", "value2")
    }()
    
    // Goroutine 2: 读取
    go func() {
        if v1, ok1 := m.Load("key1"); ok1 {
            v2, ok2 := m.Load("key2")
            fmt.Printf("v1=%v, v2=%v, ok2=%v\n", v1, v2, ok2)
        }
    }()
}

标准答案:

并发安全性保证:

  1. 原子操作:read map 通过 atomic.Value 保证读取的原子性
  2. 互斥锁:dirty map 通过 mutex 保证写入的原子性
  3. 内存屏障:原子操作提供了必要的内存屏障

可能的输出分析:

  • v1=value1, v2=, ok2=false - key1 已写入,key2 尚未写入
  • v1=value1, v2=value2, ok2=true - 两个键都已写入
  • 不可能出现 key1 未找到但 key2 找到的情况(违反程序顺序)

一致性保证机制:

  • sync.Map 保证单个操作的原子性
  • 但不保证跨操作的一致性(如两次 Load 之间数据可能变化)

c) 问题 3:性能陷阱识别#

题目: 识别并修复以下代码的性能问题:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
type Counter struct {
    counts sync.Map
}

func (c *Counter) Increment(key string) {
    if v, ok := c.counts.Load(key); ok {
        c.counts.Store(key, v.(int)+1)
    } else {
        c.counts.Store(key, 1)
    }
}

func (c *Counter) Get(key string) int {
    if v, ok := c.counts.Load(key); ok {
        return v.(int)
    }
    return 0
}

标准答案:

问题识别:

  1. 频繁写入:计数器场景是写密集的,不适合 sync.Map
  2. read-modify-write:每次递增都涉及读取和写入
  3. 装箱开销:int 值需要装箱为 interface{}

修复方案:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
// 方案1:使用 map + RWMutex
type Counter struct {
    mu     sync.RWMutex
    counts map[string]int
}

func (c *Counter) Increment(key string) {
    c.mu.Lock()
    defer c.mu.Unlock()
    if c.counts == nil {
        c.counts = make(map[string]int)
    }
    c.counts[key]++
}

func (c *Counter) Get(key string) int {
    c.mu.RLock()
    defer c.mu.RUnlock()
    return c.counts[key]
}

// 方案2:使用 atomic 操作
type AtomicCounter struct {
    counters sync.Map
}

func (c *AtomicCounter) Increment(key string) {
    actual, _ := c.counters.LoadOrStore(key, new(int64))
    counter := actual.(*int64)
    atomic.AddInt64(counter, 1)
}

func (c *AtomicCounter) Get(key string) int64 {
    if actual, ok := c.counters.Load(key); ok {
        return atomic.LoadInt64(actual.(*int64))
    }
    return 0
}

d) 问题 4:实现原理深度分析#

题目: 解释 sync.Map 中 misses 计数器的作用机制,以及为什么选择 len(dirty) 作为提升阈值。

标准答案:

misses 计数器机制:

  1. 计数规则:只有当 read 未命中且 dirty 命中时才递增
  2. 阈值触发:当 misses >= len(dirty) 时触发数据提升
  3. 性能平衡:平衡了数据新鲜度与读取性能

阈值选择的原理:

1
2
3
4
5
6
7
8
9
// 当 misses 达到 dirty 长度时
if m.misses < len(m.dirty) {
    return  // 继续累积 misses
}

// 数据提升:dirty → read
m.read.Store(readOnly{m: m.dirty})
m.dirty = nil
m.misses = 0

设计考虑:

  • 过早提升:频繁的数据复制,影响写性能
  • 过晚提升:read 数据过旧,大量请求走慢路径
  • len(dirty) 阈值:意味着平均每个 dirty 中的键被 miss 一次后触发提升

数学模型:

  • 假设 dirty 有 N 个键,read 有 M 个键
  • 当 N-M 个新键被访问 N 次后(平均每个新键被访问一次)
  • 触发提升,使得后续访问这些键时走快路径

8. 最佳实践总结#

a) 使用决策流程#

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
// 决策树:何时使用 sync.Map
func ShouldUseSyncMap(readPercent float64, keyStability bool, concurrency int) bool {
    // 读比例必须足够高
    if readPercent < 0.8 {
        return false
    }
    
    // 键集合应该相对稳定
    if !keyStability {
        return false
    }
    
    // 并发度必须足够高才能体现优势
    if concurrency < 10 {
        return false
    }
    
    return true
}

b) 类型安全封装模式#

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
// 推荐:创建类型安全的包装器
type SafeStringMap struct {
    m sync.Map
}

func NewSafeStringMap() *SafeStringMap {
    return &SafeStringMap{}
}

func (sm *SafeStringMap) Store(key, value string) {
    sm.m.Store(key, value)
}

func (sm *SafeStringMap) Load(key string) (string, bool) {
    if value, ok := sm.m.Load(key); ok {
        return value.(string), true
    }
    return "", false
}

func (sm *SafeStringMap) LoadOrStore(key, value string) (string, bool) {
    actual, loaded := sm.m.LoadOrStore(key, value)
    return actual.(string), loaded
}

func (sm *SafeStringMap) Delete(key string) {
    sm.m.Delete(key)
}

func (sm *SafeStringMap) Range(f func(key, value string) bool) {
    sm.m.Range(func(key, value any) bool {
        return f(key.(string), value.(string))
    })
}

c) 性能监控与调优#

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
// 避免:不恰当的使用模式
func BadPattern() {
    var m sync.Map
    
    // 错误1:频繁写入
    for i := 0; i < 1000000; i++ {
        m.Store(fmt.Sprintf("key%d", i), i)
    }
    
    // 错误2:单 goroutine 使用
    for i := 0; i < 1000000; i++ {
        m.Load(fmt.Sprintf("key%d", rand.Intn(1000000)))
    }
}

// 推荐:合适的使用模式
func GoodPattern() {
    var m sync.Map
    
    // 一次性初始化数据
    for i := 0; i < 1000; i++ {
        m.Store(fmt.Sprintf("config%d", i), fmt.Sprintf("value%d", i))
    }
    
    // 高并发读取
    var wg sync.WaitGroup
    for i := 0; i < 100; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for j := 0; j < 10000; j++ {
                key := fmt.Sprintf("config%d", rand.Intn(1000))
                if value, ok := m.Load(key); ok {
                    _ = value.(string)
                }
            }
        }()
    }
    wg.Wait()
}

d) 错误处理与资源管理#

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
// 完整的 sync.Map 使用示例
type ServiceRegistry struct {
    services sync.Map
    logger   *log.Logger
}

func NewServiceRegistry(logger *log.Logger) *ServiceRegistry {
    return &ServiceRegistry{
        logger: logger,
    }
}

func (r *ServiceRegistry) RegisterService(name string, svc *Service) error {
    if name == "" {
        return fmt.Errorf("service name cannot be empty")
    }
    if svc == nil {
        return fmt.Errorf("service cannot be nil")
    }
    
    // 使用 LoadOrStore 避免覆盖现有服务
    if actual, loaded := r.services.LoadOrStore(name, svc); loaded {
        r.logger.Printf("Service %s already registered: %v", name, actual)
        return fmt.Errorf("service %s already exists", name)
    }
    
    r.logger.Printf("Service %s registered successfully", name)
    return nil
}

func (r *ServiceRegistry) GetService(name string) (*Service, error) {
    if value, ok := r.services.Load(name); ok {
        return value.(*Service), nil
    }
    return nil, fmt.Errorf("service %s not found", name)
}

func (r *ServiceRegistry) GetAllServices() map[string]*Service {
    result := make(map[string]*Service)
    r.services.Range(func(key, value any) bool {
        result[key.(string)] = value.(*Service)
        return true
    })
    return result
}

e) 性能基准测试#

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
// 对比测试模板
func BenchmarkSyncMapVsRWMutexMap(b *testing.B) {
    // 测试数据准备
    keys := make([]string, 1000)
    for i := range keys {
        keys[i] = fmt.Sprintf("key%d", i)
    }
    
    b.Run("SyncMap-Read90", func(b *testing.B) {
        var m sync.Map
        // 预填充数据
        for _, key := range keys {
            m.Store(key, "value")
        }
        
        b.ResetTimer()
        b.RunParallel(func(pb *testing.PB) {
            for pb.Next() {
                // 90% 读操作
                if rand.Intn(10) < 9 {
                    key := keys[rand.Intn(len(keys))]
                    m.Load(key)
                } else {
                    // 10% 写操作
                    key := keys[rand.Intn(len(keys))]
                    m.Store(key, "new_value")
                }
            }
        })
    })
    
    b.Run("RWMutexMap-Read90", func(b *testing.B) {
        var mu sync.RWMutex
        m := make(map[string]string)
        
        // 预填充数据
        for _, key := range keys {
            m[key] = "value"
        }
        
        b.ResetTimer()
        b.RunParallel(func(pb *testing.PB) {
            for pb.Next() {
                if rand.Intn(10) < 9 {
                    // 90% 读操作
                    key := keys[rand.Intn(len(keys))]
                    mu.RLock()
                    _ = m[key]
                    mu.RUnlock()
                } else {
                    // 10% 写操作
                    key := keys[rand.Intn(len(keys))]
                    mu.Lock()
                    m[key] = "new_value"
                    mu.Unlock()
                }
            }
        })
    })
}