1567 字
8 分钟
Go语言并发编程实战指南

Go语言并发编程实战指南#

Go语言天生支持并发,其独特的goroutine和channel机制使得并发编程变得简单而高效。本文将通过实际例子深入讲解Go并发编程的核心概念和最佳实践。

1. Goroutine基础#

什么是Goroutine#

Goroutine是Go运行时管理的轻量级线程,相比传统线程具有以下优势:

  • 轻量级:初始栈大小只有2KB
  • 低成本:创建和销毁成本极低
  • 高效调度:由Go运行时统一调度
package main
import (
"fmt"
"time"
)
func main() {
// 启动goroutine
go sayHello("World")
go sayHello("Go")
// 主线程等待
time.Sleep(time.Second)
fmt.Println("Main function ends")
}
func sayHello(name string) {
for i := 0; i < 3; i++ {
fmt.Printf("Hello, %s! (%d)\n", name, i)
time.Sleep(100 * time.Millisecond)
}
}

Goroutine池模式#

type WorkerPool struct {
workerCount int
jobQueue chan Job
workers []*Worker
quit chan bool
}
type Job struct {
ID int
Task func() error
}
type Worker struct {
id int
jobQueue chan Job
quit chan bool
}
func NewWorkerPool(workerCount, queueSize int) *WorkerPool {
return &WorkerPool{
workerCount: workerCount,
jobQueue: make(chan Job, queueSize),
workers: make([]*Worker, workerCount),
quit: make(chan bool),
}
}
func (wp *WorkerPool) Start() {
for i := 0; i < wp.workerCount; i++ {
worker := &Worker{
id: i,
jobQueue: wp.jobQueue,
quit: make(chan bool),
}
wp.workers[i] = worker
go worker.start()
}
}
func (w *Worker) start() {
for {
select {
case job := <-w.jobQueue:
if err := job.Task(); err != nil {
fmt.Printf("Worker %d: Job %d failed: %v\n", w.id, job.ID, err)
} else {
fmt.Printf("Worker %d: Job %d completed\n", w.id, job.ID)
}
case <-w.quit:
fmt.Printf("Worker %d stopping\n", w.id)
return
}
}
}

2. Channel通信机制#

基本用法#

// 创建无缓冲channel
ch1 := make(chan int)
// 创建有缓冲channel
ch2 := make(chan string, 10)
// 只读和只写channel
func producer(ch chan<- int) {
for i := 0; i < 10; i++ {
ch <- i
}
close(ch)
}
func consumer(ch <-chan int) {
for value := range ch {
fmt.Printf("Received: %d\n", value)
}
}

Select多路复用#

func selectExample() {
ch1 := make(chan string)
ch2 := make(chan string)
go func() {
time.Sleep(1 * time.Second)
ch1 <- "Channel 1"
}()
go func() {
time.Sleep(2 * time.Second)
ch2 <- "Channel 2"
}()
for i := 0; i < 2; i++ {
select {
case msg1 := <-ch1:
fmt.Println("Received from ch1:", msg1)
case msg2 := <-ch2:
fmt.Println("Received from ch2:", msg2)
case <-time.After(3 * time.Second):
fmt.Println("Timeout")
return
}
}
}

Channel模式#

Fan-in模式:

func fanIn(input1, input2 <-chan string) <-chan string {
output := make(chan string)
go func() {
defer close(output)
for {
select {
case s, ok := <-input1:
if !ok {
input1 = nil
} else {
output <- s
}
case s, ok := <-input2:
if !ok {
input2 = nil
} else {
output <- s
}
}
if input1 == nil && input2 == nil {
break
}
}
}()
return output
}

Fan-out模式:

func fanOut(input <-chan int, workers int) []<-chan int {
outputs := make([]<-chan int, workers)
for i := 0; i < workers; i++ {
output := make(chan int)
outputs[i] = output
go func(ch chan<- int) {
defer close(ch)
for value := range input {
ch <- value * 2 // 处理数据
}
}(output)
}
return outputs
}

3. 同步原语#

Mutex互斥锁#

type SafeCounter struct {
mu sync.Mutex
value int
}
func (c *SafeCounter) Increment() {
c.mu.Lock()
defer c.mu.Unlock()
c.value++
}
func (c *SafeCounter) Value() int {
c.mu.Lock()
defer c.mu.Unlock()
return c.value
}

RWMutex读写锁#

type SafeMap struct {
mu sync.RWMutex
data map[string]int
}
func NewSafeMap() *SafeMap {
return &SafeMap{
data: make(map[string]int),
}
}
func (sm *SafeMap) Get(key string) (int, bool) {
sm.mu.RLock()
defer sm.mu.RUnlock()
value, ok := sm.data[key]
return value, ok
}
func (sm *SafeMap) Set(key string, value int) {
sm.mu.Lock()
defer sm.mu.Unlock()
sm.data[key] = value
}

Once单次执行#

type Singleton struct {
data string
}
var (
instance *Singleton
once sync.Once
)
func GetInstance() *Singleton {
once.Do(func() {
instance = &Singleton{
data: "Singleton Instance",
}
})
return instance
}

WaitGroup等待组#

func processData(data []int) {
var wg sync.WaitGroup
results := make(chan int, len(data))
for _, value := range data {
wg.Add(1)
go func(v int) {
defer wg.Done()
// 模拟耗时处理
time.Sleep(100 * time.Millisecond)
results <- v * v
}(value)
}
// 在另一个goroutine中等待并关闭结果通道
go func() {
wg.Wait()
close(results)
}()
// 收集结果
for result := range results {
fmt.Printf("Result: %d\n", result)
}
}

4. Context上下文管理#

基本用法#

func doWork(ctx context.Context, duration time.Duration) error {
select {
case <-time.After(duration):
fmt.Println("Work completed")
return nil
case <-ctx.Done():
fmt.Println("Work cancelled:", ctx.Err())
return ctx.Err()
}
}
func main() {
// 超时控制
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
if err := doWork(ctx, 3*time.Second); err != nil {
fmt.Println("Error:", err)
}
}

传递值#

type key string
const userIDKey key = "userID"
func withUserID(ctx context.Context, userID string) context.Context {
return context.WithValue(ctx, userIDKey, userID)
}
func getUserID(ctx context.Context) (string, bool) {
userID, ok := ctx.Value(userIDKey).(string)
return userID, ok
}
func processRequest(ctx context.Context) {
if userID, ok := getUserID(ctx); ok {
fmt.Printf("Processing request for user: %s\n", userID)
}
}

5. 并发模式实战#

生产者-消费者模式#

type Message struct {
ID int
Content string
Time time.Time
}
type Producer struct {
output chan<- Message
quit chan bool
}
func NewProducer(output chan<- Message) *Producer {
return &Producer{
output: output,
quit: make(chan bool),
}
}
func (p *Producer) Start() {
ticker := time.NewTicker(100 * time.Millisecond)
defer ticker.Stop()
id := 1
for {
select {
case <-ticker.C:
msg := Message{
ID: id,
Content: fmt.Sprintf("Message %d", id),
Time: time.Now(),
}
p.output <- msg
id++
case <-p.quit:
return
}
}
}
type Consumer struct {
input chan Message
id int
}
func NewConsumer(id int, input chan Message) *Consumer {
return &Consumer{
input: input,
id: id,
}
}
func (c *Consumer) Start() {
for msg := range c.input {
fmt.Printf("Consumer %d processing: %s\n", c.id, msg.Content)
// 模拟处理时间
time.Sleep(50 * time.Millisecond)
}
}

管道模式#

func pipeline() {
// 第一阶段:生成数据
numbers := make(chan int)
go func() {
defer close(numbers)
for i := 1; i <= 10; i++ {
numbers <- i
}
}()
// 第二阶段:平方计算
squares := make(chan int)
go func() {
defer close(squares)
for n := range numbers {
squares <- n * n
}
}()
// 第三阶段:过滤偶数
evens := make(chan int)
go func() {
defer close(evens)
for s := range squares {
if s%2 == 0 {
evens <- s
}
}
}()
// 最终输出
for e := range evens {
fmt.Printf("Even square: %d\n", e)
}
}

6. 性能优化技巧#

Goroutine数量控制#

func processWithLimit(tasks []Task, maxGoroutines int) {
semaphore := make(chan struct{}, maxGoroutines)
var wg sync.WaitGroup
for _, task := range tasks {
wg.Add(1)
go func(t Task) {
defer wg.Done()
semaphore <- struct{}{} // 获取信号量
defer func() { <-semaphore }() // 释放信号量
processTask(t)
}(task)
}
wg.Wait()
}

对象池优化#

var bufferPool = sync.Pool{
New: func() interface{} {
return make([]byte, 1024)
},
}
func processData(data []byte) []byte {
buffer := bufferPool.Get().([]byte)
defer bufferPool.Put(buffer)
// 使用buffer处理数据
copy(buffer, data)
return buffer[:len(data)]
}

内存对齐优化#

type CacheLinePadded struct {
value uint64
_ [7]uint64 // 填充到缓存行大小
}
type Counter struct {
counters []CacheLinePadded
}
func NewCounter(numCounters int) *Counter {
return &Counter{
counters: make([]CacheLinePadded, numCounters),
}
}
func (c *Counter) Increment(index int) {
atomic.AddUint64(&c.counters[index].value, 1)
}

7. 错误处理和调试#

Panic恢复#

func safeGoroutine(fn func()) {
go func() {
defer func() {
if r := recover(); r != nil {
fmt.Printf("Recovered from panic: %v\n", r)
fmt.Printf("Stack trace: %s\n", debug.Stack())
}
}()
fn()
}()
}

竞态检测#

Terminal window
# 编译时启用竞态检测
go build -race main.go
# 运行时检测
go run -race main.go

总结#

Go并发编程的最佳实践:

  1. 优先使用Channel:通过通信共享内存,而不是通过共享内存通信
  2. 控制Goroutine数量:避免创建过多的Goroutine
  3. 正确使用Context:用于取消和超时控制
  4. 避免竞态条件:使用适当的同步原语
  5. 合理处理错误:在Goroutine中正确处理panic

掌握这些技巧,能够帮助您构建高性能、可维护的并发程序。

Go语言并发编程实战指南
https://fuwari.vercel.app/posts/go语言并发编程实战指南/
作者
MuHan
发布于
2025-09-17
许可协议
CC BY-NC-SA 4.0