FXJ Wiki

Back

  1. 并发支持:goroutine 和 channel 让并发编程变得简单
  2. 性能:编译型语言,接近 C 的性能
  3. 简洁性:语法简单,容易学习
  4. 工业应用:Docker、Kubernetes 等都用 Go 编写

1.2 Goroutine - 轻量级线程#

什么是 Goroutine?

Goroutine 是 Go 的轻量级线程,比操作系统线程更轻量(只需要几 KB 内存)。

基本用法

package main

import (
    "fmt"
    "time"
)

func sayHello() {
    fmt.Println("Hello from goroutine!")
}

func main() {
    // 启动一个 goroutine
    go sayHello()

    // 主线程继续执行
    fmt.Println("Hello from main!")

    // 等待 goroutine 执行完成
    time.Sleep(1 * time.Second)
}
go

输出(顺序可能不同):

Hello from main!
Hello from goroutine!
plaintext

关键点

  • go 关键字启动一个新的 goroutine
  • Goroutine 和主线程并发执行
  • 主线程退出时,所有 goroutine 也会退出

1.3 Channel - Goroutine 之间的通信#

什么是 Channel?

Channel 是 goroutine 之间传递数据的管道。

基本用法

func main() {
    // 创建一个 channel
    ch := make(chan string)

    // 启动一个 goroutine
    go func() {
        // 向 channel 发送数据
        ch <- "Hello from goroutine!"
    }()

    // 从 channel 接收数据(会阻塞,直到有数据)
    msg := <-ch
    fmt.Println(msg)
}
go

Channel 的阻塞特性

ch := make(chan int)

// 发送操作会阻塞,直到有人接收
ch <- 42  // 如果没有接收者,这里会永远阻塞

// 接收操作会阻塞,直到有数据
value := <-ch  // 如果没有发送者,这里会永远阻塞
go

带缓冲的 Channel

// 创建一个容量为 3 的 channel
ch := make(chan int, 3)

// 可以发送 3 个数据而不阻塞
ch <- 1
ch <- 2
ch <- 3

// 第 4 个会阻塞(如果没有接收者)
// ch <- 4  // 会阻塞

// 接收数据
fmt.Println(<-ch)  // 1
fmt.Println(<-ch)  // 2
fmt.Println(<-ch)  // 3
go

1.4 Mutex - 保护共享数据#

为什么需要 Mutex?

多个 goroutine 同时访问共享变量会导致竞态条件(race condition)。

错误示例(有竞态条件):

package main

import (
    "fmt"
    "time"
)

var counter = 0

func increment() {
    for i := 0; i < 1000; i++ {
        counter++  // 竞态条件!
    }
}

func main() {
    go increment()
    go increment()

    time.Sleep(1 * time.Second)
    fmt.Println(counter)  // 期望 2000,实际可能是 1500
}
go

正确示例(使用 Mutex):

package main

import (
    "fmt"
    "sync"
    "time"
)

var counter = 0
var mu sync.Mutex

func increment() {
    for i := 0; i < 1000; i++ {
        mu.Lock()
        counter++
        mu.Unlock()
    }
}

func main() {
    go increment()
    go increment()

    time.Sleep(1 * time.Second)
    fmt.Println(counter)  // 总是 2000
}
go

使用 defer 确保解锁

func safeIncrement() {
    mu.Lock()
    defer mu.Unlock()  // 函数返回时自动解锁

    counter++
    // 即使这里发生 panic,也会解锁
}
go

1.5 WaitGroup - 等待多个 Goroutine 完成#

问题:如何等待多个 goroutine 完成?

错误方法

func main() {
    for i := 0; i < 10; i++ {
        go doWork(i)
    }
    // 主线程立即退出,goroutine 可能还没执行完
}
go

正确方法(使用 WaitGroup):

package main

import (
    "fmt"
    "sync"
)

func doWork(id int, wg *sync.WaitGroup) {
    defer wg.Done()  // 完成时通知 WaitGroup

    fmt.Printf("Worker %d starting\\n", id)
    // 做一些工作...
    fmt.Printf("Worker %d done\\n", id)
}

func main() {
    var wg sync.WaitGroup

    for i := 0; i < 10; i++ {
        wg.Add(1)  // 增加计数
        go doWork(i, &wg)
    }

    wg.Wait()  // 等待所有 goroutine 完成
    fmt.Println("All workers done!")
}
go

第二部分:RPC(远程过程调用)#

2.1 什么是 RPC?#

RPC(Remote Procedure Call)让你可以像调用本地函数一样调用远程函数。

不使用 RPC(复杂):

// 客户端
conn, _ := net.Dial("tcp", "server:1234")
request := "GET /data"
conn.Write([]byte(request))
response := make([]byte, 1024)
n, _ := conn.Read(response)
// 手动解析响应...
go

使用 RPC(简单):

// 客户端
var reply string
err := client.Call("Server.GetData", args, &reply)
// reply 中已经包含了结果
go

2.2 Go RPC 基本用法#

服务端

package main

import (
    "log"
    "net"
    "net/rpc"
)

// 1. 定义服务
type MathService struct{}

// 2. 定义 RPC 方法
// 必须是导出的(首字母大写)
// 必须有两个参数:args 和 reply
// 必须返回 error
func (m *MathService) Add(args *Args, reply *int) error {
    *reply = args.A + args.B
    return nil
}

type Args struct {
    A, B int
}

func main() {
    // 3. 注册服务
    mathService := new(MathService)
    rpc.Register(mathService)

    // 4. 监听端口
    listener, err := net.Listen("tcp", ":1234")
    if err != nil {
        log.Fatal(err)
    }

    // 5. 接受连接
    for {
        conn, err := listener.Accept()
        if err != nil {
            continue
        }
        go rpc.ServeConn(conn)
    }
}
go

客户端

package main

import (
    "fmt"
    "log"
    "net/rpc"
)

type Args struct {
    A, B int
}

func main() {
    // 1. 连接服务器
    client, err := rpc.Dial("tcp", "localhost:1234")
    if err != nil {
        log.Fatal(err)
    }
    defer client.Close()

    // 2. 调用远程方法
    args := Args{A: 7, B: 8}
    var reply int
    err = client.Call("MathService.Add", args, &reply)
    if err != nil {
        log.Fatal(err)
    }

    fmt.Printf("7 + 8 = %d\\n", reply)  // 输出:7 + 8 = 15
}
go

2.3 RPC 的关键概念#

1. 序列化(Serialization)

RPC 需要把数据结构转换成字节流,通过网络传输。

// Go 的 RPC 使用 gob 编码
// 自动处理,不需要手动编码/解码
go

2. 同步 vs 异步

// 同步调用(阻塞)
err := client.Call("Service.Method", args, &reply)
// 等待响应返回

// 异步调用(非阻塞)
call := client.Go("Service.Method", args, &reply, nil)
// 继续执行其他操作
// ...
// 等待结果
<-call.Done
go

3. 错误处理

err := client.Call("Service.Method", args, &reply)
if err != nil {
    // 可能的错误:
    // - 网络错误(连接断开)
    // - 服务器错误(方法不存在)
    // - 超时
    log.Fatal(err)
}
go

2.4 Lab 中使用的 labrpc#

MIT 6.824 的 Lab 使用自己实现的 labrpc 包,而不是标准库的 net/rpc

为什么?

  • 可以模拟网络故障(丢包、延迟、分区)
  • 可以控制消息顺序
  • 方便测试

基本用法(与 net/rpc 类似):

// 服务端
type KVServer struct {
    data map[string]string
}

func (kv *KVServer) Get(args *GetArgs, reply *GetReply) error {
    reply.Value = kv.data[args.Key]
    return nil
}

// 客户端
args := GetArgs{Key: "foo"}
reply := GetReply{}
ok := client.Call("KVServer.Get", &args, &reply)
go

第三部分:网络基础#

3.1 TCP vs UDP#

TCP(传输控制协议)

  • 可靠:保证数据按顺序到达,不丢失
  • 面向连接:需要先建立连接
  • 慢:需要确认、重传

UDP(用户数据报协议)

  • 不可靠:可能丢包、乱序
  • 无连接:直接发送
  • 快:没有确认、重传

Lab 中使用 TCP(通过 Unix domain socket):

// 创建 TCP 监听器
listener, _ := net.Listen("tcp", ":1234")

// 接受连接
conn, _ := listener.Accept()

// 读写数据
conn.Write([]byte("hello"))
buf := make([]byte, 1024)
n, _ := conn.Read(buf)
go

3.2 网络分区(Network Partition)#

什么是网络分区?

网络分区是指网络被分成多个部分,部分之间无法通信。

正常情况:
[Server A] <---> [Server B] <---> [Server C]

网络分区:
[Server A] <---> [Server B]     [Server C]
                  (断开)
plaintext

为什么重要?

分布式系统必须处理网络分区,否则会出现数据不一致。

Lab 中的模拟

// labrpc 可以模拟网络分区
network.Enable(serverName, false)  // 断开连接
// ...
network.Enable(serverName, true)   // 恢复连接
go

3.3 超时(Timeout)#

为什么需要超时?

网络请求可能永远不返回(服务器崩溃、网络断开)。

Go 中的超时

// 方法 1:使用 context
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

// 方法 2:使用 select
select {
case result := <-ch:
    // 收到结果
case <-time.After(5 * time.Second):
    // 超时
}
go

Lab 中的超时

// Master 检测 Worker 超时
if time.Since(task.startTime) > 10*time.Second {
    // Worker 可能崩溃了,重新分配任务
    task.state = Idle
}
go

第四部分:操作系统基础#

4.1 进程(Process)#

什么是进程?

进程是操作系统中运行的程序实例。

进程的特点

  • 独立的内存空间
  • 独立的文件描述符
  • 独立的进程 ID(PID)

Go 中获取进程信息

import "os"

// 获取当前进程 ID
pid := os.Getpid()

// 获取父进程 ID
ppid := os.Getppid()

// 获取用户 ID
uid := os.Getuid()
go

Lab 中的应用

// 使用 PID 作为 Worker ID
workerID := os.Getpid()
go

4.2 文件系统#

文件操作

import (
    "io/ioutil"
    "os"
)

// 读取文件
content, err := ioutil.ReadFile("input.txt")

// 写入文件
err = ioutil.WriteFile("output.txt", []byte("hello"), 0644)

// 创建临时文件
tempFile, err := ioutil.TempFile("", "prefix-*")

// 重命名文件(原子操作)
err = os.Rename("old.txt", "new.txt")
go

原子性

os.Rename 是原子操作,要么成功,要么失败,不会出现中间状态。

// 原子写入文件的正确方法
tempFile, _ := ioutil.TempFile("", "mr-tmp-*")
tempFile.Write(data)
tempFile.Close()
os.Rename(tempFile.Name(), "final.txt")  // 原子操作
go

4.3 信号(Signal)#

什么是信号?

信号是操作系统发送给进程的通知。

常见信号

  • SIGINT:Ctrl+C(中断)
  • SIGTERM:终止进程
  • SIGKILL:强制杀死进程(无法捕获)

Go 中处理信号

import (
    "os"
    "os/signal"
    "syscall"
)

func main() {
    // 创建信号 channel
    sigCh := make(chan os.Signal, 1)

    // 注册要接收的信号
    signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)

    // 等待信号
    sig := <-sigCh
    fmt.Printf("Received signal: %v\\n", sig)

    // 清理资源...
    os.Exit(0)
}
go

第五部分:并发模式#

5.1 生产者-消费者模式#

场景:一个或多个生产者生成数据,一个或多个消费者处理数据。

func main() {
    jobs := make(chan int, 100)

    // 生产者
    go func() {
        for i := 0; i < 10; i++ {
            jobs <- i
        }
        close(jobs)  // 关闭 channel
    }()

    // 消费者
    for job := range jobs {  // 遍历 channel,直到关闭
        fmt.Printf("Processing job %d\\n", job)
    }
}
go

5.2 Worker Pool 模式#

场景:固定数量的 worker 处理任务队列。

func worker(id int, jobs <-chan int, results chan<- int) {
    for job := range jobs {
        fmt.Printf("Worker %d processing job %d\\n", id, job)
        results <- job * 2
    }
}

func main() {
    jobs := make(chan int, 100)
    results := make(chan int, 100)

    // 启动 3 个 worker
    for w := 1; w <= 3; w++ {
        go worker(w, jobs, results)
    }

    // 发送任务
    for j := 1; j <= 9; j++ {
        jobs <- j
    }
    close(jobs)

    // 收集结果
    for a := 1; a <= 9; a++ {
        <-results
    }
}
go

Lab 中的应用

  • Master 是任务队列
  • Worker 是 worker pool

5.3 请求-响应模式#

场景:客户端发送请求,等待服务器响应。

type Request struct {
    args  Args
    reply chan Reply  // 用 channel 接收响应
}

func server(requests chan Request) {
    for req := range requests {
        // 处理请求
        result := process(req.args)

        // 发送响应
        req.reply <- Reply{Value: result}
    }
}

func client(requests chan Request) {
    replyCh := make(chan Reply)

    // 发送请求
    requests <- Request{
        args:  Args{A: 1, B: 2},
        reply: replyCh,
    }

    // 等待响应
    reply := <-replyCh
    fmt.Println(reply.Value)
}
go

Lab 中的应用

  • Worker 向 Master 发送 RPC 请求
  • Master 处理请求,返回响应

第六部分:常见陷阱#

6.1 Goroutine 泄漏#

问题:启动的 goroutine 永远不退出,导致内存泄漏。

错误示例

func leak() {
    ch := make(chan int)

    go func() {
        value := <-ch  // 永远阻塞,goroutine 泄漏
        fmt.Println(value)
    }()

    // 函数返回,但 goroutine 还在运行
}
go

正确示例

func noLeak() {
    ch := make(chan int)
    done := make(chan bool)

    go func() {
        select {
        case value := <-ch:
            fmt.Println(value)
        case <-done:
            return  // 退出 goroutine
        }
    }()

    // 通知 goroutine 退出
    close(done)
}
go

6.2 死锁#

问题:多个 goroutine 互相等待,导致程序卡死。

错误示例

func deadlock() {
    var mu1, mu2 sync.Mutex

    // Goroutine 1
    go func() {
        mu1.Lock()
        time.Sleep(1 * time.Second)
        mu2.Lock()  // 等待 mu2
        mu2.Unlock()
        mu1.Unlock()
    }()

    // Goroutine 2
    go func() {
        mu2.Lock()
        time.Sleep(1 * time.Second)
        mu1.Lock()  // 等待 mu1
        mu1.Unlock()
        mu2.Unlock()
    }()

    time.Sleep(5 * time.Second)
}
go

解决方法

  1. 总是按相同顺序获取锁
  2. 使用超时
  3. 避免在持有锁时调用外部函数

6.3 Channel 关闭后发送#

问题:向已关闭的 channel 发送数据会 panic。

ch := make(chan int)
close(ch)
ch <- 1  // panic: send on closed channel
go

正确做法

  • 只有发送者关闭 channel
  • 接收者不关闭 channel
  • 关闭前确保没有其他 goroutine 会发送

总结#

这些基础知识是完成 MIT 6.824 Lab 的必备技能:

  1. Go 并发:goroutine、channel、mutex、waitgroup
  2. RPC:远程过程调用,客户端-服务器通信
  3. 网络:TCP、网络分区、超时
  4. 操作系统:进程、文件系统、原子操作
  5. 并发模式:生产者-消费者、worker pool、请求-响应

掌握这些概念后,你就可以开始实现 MapReduce、Raft 和 KVRaft 了。