1567 字
8 分钟
Go语言并发编程实战指南

Go语言并发编程实战指南
Go语言天生支持并发,其独特的goroutine和channel机制使得并发编程变得简单而高效。本文将通过实际例子深入讲解Go并发编程的核心概念和最佳实践。
1. Goroutine基础
什么是Goroutine
Goroutine是Go运行时管理的轻量级线程,相比传统线程具有以下优势:
- 轻量级:初始栈大小约为2KB,并可按需动态增长
- 低成本:创建和销毁成本极低
- 高效调度:由Go运行时统一调度
package main
import ( "fmt" "time")
func main() { // 启动goroutine go sayHello("World") go sayHello("Go")
// 主线程等待 time.Sleep(time.Second) fmt.Println("Main function ends")}
// sayHello prints three numbered greetings for name, pausing 100ms after
// each one.
func sayHello(name string) {
	const (
		rounds = 3
		pause  = 100 * time.Millisecond
	)
	for round := 0; round < rounds; round++ {
		fmt.Printf("Hello, %s! (%d)\n", name, round)
		time.Sleep(pause)
	}
}
Goroutine池模式
// WorkerPool runs a fixed number of workers that all pull jobs from one
// shared queue.
type WorkerPool struct {
	workerCount int
	jobQueue    chan Job  // buffered queue shared by every worker
	workers     []*Worker // filled in by Start; nil entries before that
	quit        chan bool // allocated by NewWorkerPool; not used by the methods below
}

// Job is a unit of work handed to the pool.
type Job struct {
	ID   int          // identifier used in the log output
	Task func() error // work to execute; must be non-nil
}

// Worker is one goroutine-backed consumer of the pool's job queue.
type Worker struct {
	id       int
	jobQueue chan Job  // the pool's shared queue
	quit     chan bool // closed by WorkerPool.Stop to end this worker
}

// NewWorkerPool returns a pool configured for workerCount workers and a job
// queue that buffers up to queueSize jobs. No goroutine is started until
// Start is called.
func NewWorkerPool(workerCount, queueSize int) *WorkerPool {
	return &WorkerPool{
		workerCount: workerCount,
		jobQueue:    make(chan Job, queueSize),
		workers:     make([]*Worker, workerCount),
		quit:        make(chan bool),
	}
}

// Start creates the workers and launches one goroutine for each of them.
func (wp *WorkerPool) Start() {
	for i := 0; i < wp.workerCount; i++ {
		worker := &Worker{
			id:       i,
			jobQueue: wp.jobQueue,
			quit:     make(chan bool),
		}
		wp.workers[i] = worker
		go worker.start()
	}
}

// Submit enqueues job for execution, blocking while the queue is full.
func (wp *WorkerPool) Submit(job Job) {
	wp.jobQueue <- job
}

// Stop signals every started worker to exit once it is idle or has finished
// its current job. It does not wait for the workers to return and must be
// called at most once, because it closes each worker's quit channel.
func (wp *WorkerPool) Stop() {
	for _, w := range wp.workers {
		if w != nil { // entries stay nil if Start was never called
			close(w.quit)
		}
	}
}

// start is the worker loop: it runs jobs from the queue and reports each
// outcome until the worker is told to quit or the queue is closed.
func (w *Worker) start() {
	for {
		select {
		case job, ok := <-w.jobQueue:
			if !ok {
				// A closed queue yields zero-value jobs whose Task is nil;
				// stop instead of calling it.
				fmt.Printf("Worker %d stopping\n", w.id)
				return
			}
			if err := job.Task(); err != nil {
				fmt.Printf("Worker %d: Job %d failed: %v\n", w.id, job.ID, err)
				continue
			}
			fmt.Printf("Worker %d: Job %d completed\n", w.id, job.ID)
		case <-w.quit:
			fmt.Printf("Worker %d stopping\n", w.id)
			return
		}
	}
}
2. Channel通信机制
基本用法
// Create an unbuffered channel.
ch1 := make(chan int)
// Create a buffered channel with capacity 10.
ch2 := make(chan string, 10)
// producer sends the integers 0 through 9 on ch and then closes it.
// The parameter is a send-only channel (chan<-), so producer cannot
// receive from it.
func producer(ch chan<- int) {
	defer close(ch)
	for n := 0; n < 10; n++ {
		ch <- n
	}
}
// consumer prints every value received from ch and returns once ch has
// been closed and drained. The parameter is a receive-only channel (<-chan).
func consumer(ch <-chan int) {
	for {
		value, open := <-ch
		if !open {
			return
		}
		fmt.Printf("Received: %d\n", value)
	}
}
Select多路复用
// selectExample starts two goroutines that each deliver one message after a
// delay (1s and 2s) and receives both with select. Each receive waits at
// most 3 seconds; on timeout it prints "Timeout" and returns early.
func selectExample() {
	ch1 := make(chan string)
	ch2 := make(chan string)

	// sendAfter delivers msg on ch once delay has elapsed.
	sendAfter := func(ch chan<- string, delay time.Duration, msg string) {
		time.Sleep(delay)
		ch <- msg
	}
	go sendAfter(ch1, 1*time.Second, "Channel 1")
	go sendAfter(ch2, 2*time.Second, "Channel 2")

	for received := 0; received < 2; received++ {
		select {
		case msg := <-ch1:
			fmt.Println("Received from ch1:", msg)
		case msg := <-ch2:
			fmt.Println("Received from ch2:", msg)
		case <-time.After(3 * time.Second):
			fmt.Println("Timeout")
			return
		}
	}
}
Channel模式
Fan-in模式:
// fanIn merges input1 and input2 into one channel. Values are forwarded in
// whatever order select picks them. The returned channel is closed once both
// inputs have been closed and drained; a nil input is treated as already
// exhausted.
func fanIn(input1, input2 <-chan string) <-chan string {
	output := make(chan string)
	go func() {
		defer close(output)
		// A nil channel is never ready in a select, so an exhausted input is
		// set to nil to disable its case. Testing the condition before each
		// select (not after) also covers two nil inputs on entry, which would
		// otherwise block this goroutine forever.
		for input1 != nil || input2 != nil {
			select {
			case s, ok := <-input1:
				if !ok {
					input1 = nil
					continue
				}
				output <- s
			case s, ok := <-input2:
				if !ok {
					input2 = nil
					continue
				}
				output <- s
			}
		}
	}()
	return output
}
Fan-out模式:
// fanOut starts `workers` goroutines that all receive from input. Each one
// doubles the values it receives and sends them on its own output channel,
// which it closes once input is closed and drained. The returned slice holds
// one receive-only channel per worker.
func fanOut(input <-chan int, workers int) []<-chan int {
	outputs := make([]<-chan int, workers)

	// double drains input, emitting twice each value on dst, then closes dst.
	double := func(dst chan<- int) {
		defer close(dst)
		for {
			value, ok := <-input
			if !ok {
				return
			}
			dst <- value * 2 // process the value
		}
	}

	for i := range outputs {
		dst := make(chan int)
		outputs[i] = dst
		go double(dst)
	}
	return outputs
}
3. 同步原语
Mutex互斥锁
// SafeCounter is an integer counter whose accesses are serialised by a
// mutex. Its methods use pointer receivers, so the mutex is not copied.
type SafeCounter struct {
	mu    sync.Mutex // guards value
	value int
}

// Increment adds one to the counter while holding the lock.
func (c *SafeCounter) Increment() {
	c.mu.Lock()
	c.value++
	c.mu.Unlock()
}

// Value returns the current count, read while holding the lock.
func (c *SafeCounter) Value() int {
	c.mu.Lock()
	current := c.value
	c.mu.Unlock()
	return current
}
RWMutex读写锁
// SafeMap is a string-to-int map guarded by a read/write lock: Get takes the
// read lock, Set takes the write lock.
type SafeMap struct {
	mu   sync.RWMutex // guards data
	data map[string]int
}

// NewSafeMap returns a SafeMap with its underlying map allocated.
func NewSafeMap() *SafeMap {
	sm := new(SafeMap)
	sm.data = make(map[string]int)
	return sm
}

// Get returns the value stored under key and whether the key was present.
func (sm *SafeMap) Get(key string) (int, bool) {
	sm.mu.RLock()
	stored, found := sm.data[key]
	sm.mu.RUnlock()
	return stored, found
}

// Set stores value under key, replacing any previous entry.
func (sm *SafeMap) Set(key string, value int) {
	sm.mu.Lock()
	sm.data[key] = value
	sm.mu.Unlock()
}
Once单次执行
// Singleton is the type of the single shared instance returned by
// GetInstance.
type Singleton struct {
	data string
}

var (
	instance *Singleton // assigned once, through once.Do
	once     sync.Once  // ensures initInstance runs a single time
)

// GetInstance returns the shared Singleton, creating it on the first call.
// Every call returns the same pointer.
func GetInstance() *Singleton {
	once.Do(initInstance)
	return instance
}

// initInstance builds the shared value; GetInstance runs it through once.Do.
func initInstance() {
	instance = &Singleton{data: "Singleton Instance"}
}
WaitGroup等待组
// processData squares every element of data concurrently, one goroutine per
// element, and prints each result as it arrives. It returns after all
// results have been printed.
func processData(data []int) {
	var wg sync.WaitGroup
	// Buffered to len(data) so no worker ever blocks on its send.
	results := make(chan int, len(data))

	// square computes v*v after a simulated delay and reports the result.
	square := func(v int) {
		defer wg.Done()
		time.Sleep(100 * time.Millisecond) // simulate slow work
		results <- v * v
	}

	wg.Add(len(data))
	for _, value := range data {
		go square(value)
	}

	// Wait in a separate goroutine and close the channel afterwards, so the
	// range loop below can start consuming immediately and still terminate.
	go func() {
		wg.Wait()
		close(results)
	}()

	// Collect the results.
	for squared := range results {
		fmt.Printf("Result: %d\n", squared)
	}
}
4. Context上下文管理
基本用法
// doWork simulates a task that takes duration to finish. It returns nil when
// the duration elapses first, or ctx.Err() when ctx is cancelled or times
// out first. A message describing the outcome is printed in both cases.
func doWork(ctx context.Context, duration time.Duration) error {
	finished := time.After(duration)

	select {
	case <-ctx.Done():
		err := ctx.Err()
		fmt.Println("Work cancelled:", err)
		return err
	case <-finished:
		fmt.Println("Work completed")
		return nil
	}
}
// main runs a 3-second job under a context that times out after 2 seconds
// and prints the error returned by doWork, if any.
func main() {
	// Timeout control: the context expires after two seconds.
	const limit = 2 * time.Second
	ctx, cancel := context.WithTimeout(context.Background(), limit)
	defer cancel()

	err := doWork(ctx, 3*time.Second)
	if err != nil {
		fmt.Println("Error:", err)
	}
}
传递值
// key is a dedicated type for context keys defined in this file.
type key string

// userIDKey is the context key under which the user ID is stored.
const userIDKey key = "userID"

// withUserID returns a copy of ctx that carries userID.
func withUserID(ctx context.Context, userID string) context.Context {
	return context.WithValue(ctx, userIDKey, userID)
}

// getUserID returns the user ID stored in ctx. The boolean is false when no
// string value is stored under userIDKey.
func getUserID(ctx context.Context) (string, bool) {
	id, found := ctx.Value(userIDKey).(string)
	if !found {
		return "", false
	}
	return id, true
}

// processRequest prints the user the request belongs to; it prints nothing
// when ctx carries no user ID.
func processRequest(ctx context.Context) {
	userID, ok := getUserID(ctx)
	if !ok {
		return
	}
	fmt.Printf("Processing request for user: %s\n", userID)
}
5. 并发模式实战
生产者-消费者模式
// Message is the payload passed from producers to consumers.
type Message struct {
	ID      int
	Content string
	Time    time.Time // creation time, set by the producer
}

// Producer periodically emits Messages on its output channel.
type Producer struct {
	output chan<- Message
	quit   chan bool // closed by Stop to end Start
}

// NewProducer returns a Producer that sends on output. Nothing is sent until
// Start is called.
func NewProducer(output chan<- Message) *Producer {
	return &Producer{
		output: output,
		quit:   make(chan bool),
	}
}

// Start emits one Message every 100ms, with IDs counting up from 1, until
// Stop is called. It blocks, so it is normally run in its own goroutine.
// Start never closes the output channel.
func (p *Producer) Start() {
	ticker := time.NewTicker(100 * time.Millisecond)
	defer ticker.Stop()

	for id := 1; ; id++ {
		// Wait for the next tick, or for a stop request.
		select {
		case <-ticker.C:
		case <-p.quit:
			return
		}

		msg := Message{
			ID:      id,
			Content: fmt.Sprintf("Message %d", id),
			Time:    time.Now(),
		}

		// The send is also raced against quit: with a slow or absent
		// consumer a bare send would block and the stop request would
		// never be seen.
		select {
		case p.output <- msg:
		case <-p.quit:
			return
		}
	}
}

// Stop makes Start return, even if it is blocked sending a message. It must
// be called at most once, because it closes the quit channel.
func (p *Producer) Stop() {
	close(p.quit)
}
type Consumer struct { input chan Message id int}
func NewConsumer(id int, input chan Message) *Consumer { return &Consumer{ input: input, id: id, }}
func (c *Consumer) Start() { for msg := range c.input { fmt.Printf("Consumer %d processing: %s\n", c.id, msg.Content) // 模拟处理时间 time.Sleep(50 * time.Millisecond) }}
管道模式
// pipeline wires three stages together with unbuffered channels: generate
// the integers 1..10, square each of them, keep only the even squares, and
// print the survivors. Each stage closes its output when its input ends.
func pipeline() {
	// Stage 1: generate the numbers from..to inclusive.
	generate := func(from, to int) <-chan int {
		numbers := make(chan int)
		go func() {
			defer close(numbers)
			for i := from; i <= to; i++ {
				numbers <- i
			}
		}()
		return numbers
	}

	// Stage 2: square every incoming number.
	square := func(in <-chan int) <-chan int {
		squares := make(chan int)
		go func() {
			defer close(squares)
			for n := range in {
				squares <- n * n
			}
		}()
		return squares
	}

	// Stage 3: forward only the even values.
	keepEven := func(in <-chan int) <-chan int {
		evens := make(chan int)
		go func() {
			defer close(evens)
			for s := range in {
				if s%2 != 0 {
					continue
				}
				evens <- s
			}
		}()
		return evens
	}

	// Final output.
	for e := range keepEven(square(generate(1, 10))) {
		fmt.Printf("Even square: %d\n", e)
	}
}
6. 性能优化技巧
Goroutine数量控制
func processWithLimit(tasks []Task, maxGoroutines int) { semaphore := make(chan struct{}, maxGoroutines) var wg sync.WaitGroup
for _, task := range tasks { wg.Add(1) go func(t Task) { defer wg.Done() semaphore <- struct{}{} // 获取信号量 defer func() { <-semaphore }() // 释放信号量
processTask(t) }(task) }
wg.Wait()}
对象池优化
// bufferPool recycles 1 KiB scratch buffers used by processData.
var bufferPool = sync.Pool{
	New: func() interface{} {
		return make([]byte, 1024)
	},
}

// processData stages data in a scratch buffer and returns the processed
// bytes (currently an unmodified copy of data). The returned slice is
// freshly allocated and owned by the caller; it never aliases a pooled
// buffer.
func processData(data []byte) []byte {
	buffer := bufferPool.Get().([]byte)
	// The argument is evaluated now, so the pooled buffer goes back to the
	// pool even if a larger one-off buffer replaces it below.
	defer bufferPool.Put(buffer)

	// Input larger than the pooled buffer gets a one-off scratch buffer;
	// slicing the pooled one past its capacity would panic.
	if len(data) > len(buffer) {
		buffer = make([]byte, len(data))
	}

	// Use the buffer to process the data.
	n := copy(buffer, data)

	// Copy the result out before the scratch buffer is released: a slice of
	// the pooled buffer would be overwritten by the next user of the pool.
	result := make([]byte, n)
	copy(result, buffer[:n])
	return result
}
缓存行填充优化(避免伪共享)
// CacheLinePadded holds one counter value followed by padding, so that each
// element occupies 64 bytes (8 for value plus 7*8 of padding).
type CacheLinePadded struct {
	value uint64
	_     [7]uint64 // pad up to the cache-line size (assumed to be 64 bytes)
}

// Counter is a set of independent, padded counters addressed by index.
type Counter struct {
	counters []CacheLinePadded
}

// NewCounter returns a Counter with numCounters slots, all starting at zero.
func NewCounter(numCounters int) *Counter {
	c := new(Counter)
	c.counters = make([]CacheLinePadded, numCounters)
	return c
}

// Increment atomically adds one to the counter at index. It panics if index
// is out of range.
func (c *Counter) Increment(index int) {
	slot := &c.counters[index]
	atomic.AddUint64(&slot.value, 1)
}
7. 错误处理和调试
Panic恢复
// safeGoroutine runs fn in a new goroutine. If fn panics, the panic is
// recovered inside that goroutine and the panic value and stack trace are
// printed.
func safeGoroutine(fn func()) {
	guarded := func() {
		// recover only works when called directly by a deferred function
		// running in the panicking goroutine.
		defer func() {
			r := recover()
			if r == nil {
				return
			}
			fmt.Printf("Recovered from panic: %v\n", r)
			fmt.Printf("Stack trace: %s\n", debug.Stack())
		}()
		fn()
	}
	go guarded()
}
竞态检测
# 编译时启用竞态检测go build -race main.go
# 运行时检测go run -race main.go
总结
Go并发编程的最佳实践:
- 优先使用Channel:通过通信共享内存,而不是通过共享内存通信
- 控制Goroutine数量:避免创建过多的Goroutine
- 正确使用Context:用于取消和超时控制
- 避免竞态条件:使用适当的同步原语
- 合理处理错误:在Goroutine中正确处理panic
掌握这些技巧,能够帮助您构建高性能、可维护的并发程序。
Go语言并发编程实战指南
https://fuwari.vercel.app/posts/go语言并发编程实战指南/