FXJ Wiki

Back

练习 1:并发安全的计数器#

目标:理解锁的基本用法。

任务:实现一个并发安全的计数器,支持 Increment()Get() 操作。

package main

import (
    "fmt"
    "sync"
)

type Counter struct {
    mu    sync.Mutex
    count int
}

func (c *Counter) Increment() {
    // TODO: 加锁,count++,解锁
}

func (c *Counter) Get() int {
    // TODO: 加锁,返回 count,解锁
    return 0
}

func main() {
    c := &Counter{}
    var wg sync.WaitGroup

    // 启动 100 个 goroutine,每个都调用 Increment()
    for i := 0; i < 100; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            c.Increment()
        }()
    }

    wg.Wait()
    fmt.Println("最终计数:", c.Get())  // 应该是 100
}
go

参考答案

func (c *Counter) Increment() {
    c.mu.Lock()
    defer c.mu.Unlock()
    c.count++
}

func (c *Counter) Get() int {
    c.mu.Lock()
    defer c.mu.Unlock()
    return c.count
}
go

验证:用 go run -race main.go 运行,如果有数据竞争会报错。


练习 2:带超时的任务队列#

目标:理解 Lab 1 里 Master 的任务超时回收逻辑。

任务:实现一个任务队列,任务被取出后如果 10 秒内没有完成,自动放回队列。

package main

import (
    "fmt"
    "sync"
    "time"
)

type Task struct {
    ID        int
    Status    string    // "idle", "running", "done"
    StartTime time.Time
}

type TaskQueue struct {
    mu    sync.Mutex
    tasks []Task
}

// GetTask 取出一个空闲任务
func (q *TaskQueue) GetTask() (Task, bool) {
    q.mu.Lock()
    defer q.mu.Unlock()
    for i := range q.tasks {
        if q.tasks[i].Status == "idle" {
            q.tasks[i].Status = "running"
            q.tasks[i].StartTime = time.Now()
            return q.tasks[i], true
        }
    }
    return Task{}, false
}

// CompleteTask 标记任务完成
func (q *TaskQueue) CompleteTask(id int) {
    q.mu.Lock()
    defer q.mu.Unlock()
    for i := range q.tasks {
        if q.tasks[i].ID == id {
            q.tasks[i].Status = "done"
            return
        }
    }
}

// RecoverTimeouts 把超时的任务放回队列
func (q *TaskQueue) RecoverTimeouts() {
    q.mu.Lock()
    defer q.mu.Unlock()
    for i := range q.tasks {
        if q.tasks[i].Status == "running" &&
            time.Since(q.tasks[i].StartTime) > 10*time.Second {
            fmt.Printf("任务 %d 超时,重新放回队列\n", q.tasks[i].ID)
            q.tasks[i].Status = "idle"
        }
    }
}

func main() {
    q := &TaskQueue{
        tasks: []Task{
            {ID: 1, Status: "idle"},
            {ID: 2, Status: "idle"},
        },
    }

    // 启动超时检测 goroutine
    go func() {
        for {
            time.Sleep(time.Second)
            q.RecoverTimeouts()
        }
    }()

    // 取出任务但不完成(模拟 worker 崩溃)
    task, _ := q.GetTask()
    fmt.Printf("取出任务 %d\n", task.ID)

    // 等待超时
    time.Sleep(12 * time.Second)
    // 任务应该被放回队列了
}
go

练习 3:原子文件写入#

目标:理解 Lab 1 里 Worker 写输出文件的正确方式。

任务:实现一个函数,把数据原子地写入文件(先写临时文件,再 rename)。

package main

import (
    "fmt"
    "os"
)

func atomicWrite(filename string, data []byte) error {
    // 1. 创建临时文件
    tmpFile, err := os.CreateTemp("", "tmp-*")
    if err != nil {
        return err
    }
    tmpName := tmpFile.Name()

    // 2. 写入数据
    _, err = tmpFile.Write(data)
    tmpFile.Close()
    if err != nil {
        os.Remove(tmpName)
        return err
    }

    // 3. 原子重命名
    return os.Rename(tmpName, filename)
}

func main() {
    err := atomicWrite("output.txt", []byte("hello world\n"))
    if err != nil {
        fmt.Println("写入失败:", err)
        return
    }
    fmt.Println("写入成功")

    // 验证
    data, _ := os.ReadFile("output.txt")
    fmt.Println("文件内容:", string(data))
}
go

练习 4:带去重的请求处理器#

目标:理解 Lab 3 里 KV 服务器的去重逻辑。

任务:实现一个请求处理器,对于相同 ClientID + SeqNum 的请求,只执行一次。

package main

import (
    "fmt"
    "sync"
)

type Request struct {
    ClientID int64
    SeqNum   int
    Key      string
    Value    string
}

type Server struct {
    mu      sync.Mutex
    data    map[string]string
    lastSeq map[int64]int  // clientID -> 最后处理的 seqNum
}

func (s *Server) Put(req Request) string {
    s.mu.Lock()
    defer s.mu.Unlock()

    // 检查是否是重复请求
    if lastSeq, ok := s.lastSeq[req.ClientID]; ok && req.SeqNum <= lastSeq {
        fmt.Printf("重复请求 client=%d seq=%d,跳过\n", req.ClientID, req.SeqNum)
        return "OK"
    }

    // 执行操作
    s.data[req.Key] = req.Value
    s.lastSeq[req.ClientID] = req.SeqNum
    fmt.Printf("执行 Put(%s, %s) client=%d seq=%d\n", req.Key, req.Value, req.ClientID, req.SeqNum)
    return "OK"
}

func main() {
    s := &Server{
        data:    make(map[string]string),
        lastSeq: make(map[int64]int),
    }

    // 正常请求
    s.Put(Request{ClientID: 1, SeqNum: 1, Key: "x", Value: "hello"})

    // 重复请求(模拟网络重传)
    s.Put(Request{ClientID: 1, SeqNum: 1, Key: "x", Value: "hello"})

    // 新请求
    s.Put(Request{ClientID: 1, SeqNum: 2, Key: "x", Value: "world"})

    fmt.Println("最终值:", s.data["x"])  // 应该是 "world"
}
go

做完这四个练习之后#

你已经掌握了 Lab 里最核心的四个模式:

  1. 用锁保护共享状态
  2. 超时检测和任务回收
  3. 原子文件写入
  4. 请求去重

现在可以开始看 Lab 代码了。