代码编织梦想

Context包与并发控制

学习目标

知识点掌握程度应用场景
context原理深入理解实现机制并发控制和请求链路追踪
超时控制掌握超时设置和处理API请求超时、任务限时控制
取消信号传播理解取消机制和传播链优雅退出、资源释放
context最佳实践掌握使用规范和技巧工程实践中的常见场景

1. Context原理

1.1 Context基本结构和实现

让我们先看一个完整的Context使用示例:

package main

import (
    "context"
    "fmt"
    "log"
    "time"
)

// 请求追踪信息
type RequestInfo struct {
    TraceID    string
    SessionID  string
    StartTime  time.Time
}

// 服务接口
type Service interface {
    HandleRequest(ctx context.Context, req string) (string, error)
}

// 业务服务实现
type BusinessService struct {
    name string
}

func NewBusinessService(name string) *BusinessService {
    return &BusinessService{name: name}
}

// 处理请求
func (s *BusinessService) HandleRequest(ctx context.Context, req string) (string, error) {
    // 获取请求追踪信息
    info, ok := ctx.Value("request-info").(*RequestInfo)
    if !ok {
        return "", fmt.Errorf("request info not found in context")
    }

    log.Printf("[%s] Processing request: %s, TraceID: %s, Session: %s\n",
        s.name, req, info.TraceID, info.SessionID)

    // 模拟处理过程
    select {
    case <-time.After(2 * time.Second):
        return fmt.Sprintf("Result for %s", req), nil
    case <-ctx.Done():
        return "", ctx.Err()
    }
}

// 请求中间件
func requestMiddleware(next Service) Service {
    return &middlewareService{next: next}
}

type middlewareService struct {
    next Service
}

func (m *middlewareService) HandleRequest(ctx context.Context, req string) (string, error) {
    // 开始时间
    startTime := time.Now()

    // 添加请求信息到context
    info := &RequestInfo{
        TraceID:   fmt.Sprintf("trace-%d", time.Now().UnixNano()),
        SessionID: fmt.Sprintf("session-%d", time.Now().Unix()),
        StartTime: startTime,
    }
    ctx = context.WithValue(ctx, "request-info", info)

    // 调用下一个处理器
    result, err := m.next.HandleRequest(ctx, req)

    // 记录处理时间
    duration := time.Since(startTime)
    log.Printf("Request completed in %v, TraceID: %s\n", duration, info.TraceID)

    return result, err
}

func main() {
    // 创建服务
    service := requestMiddleware(NewBusinessService("UserService"))

    // 创建基础context
    ctx := context.Background()

    // 添加超时控制
    ctx, cancel := context.WithTimeout(ctx, 3*time.Second)
    defer cancel()

    // 处理请求
    result, err := service.HandleRequest(ctx, "get user profile")
    if err != nil {
        log.Printf("Request failed: %v\n", err)
        return
    }

    log.Printf("Request succeeded: %s\n", result)

    // 模拟超时场景
    ctx, cancel = context.WithTimeout(context.Background(), 1*time.Second)
    defer cancel()

    result, err = service.HandleRequest(ctx, "get user settings")
    if err != nil {
        log.Printf("Request failed: %v\n", err)
        return
    }
}

2. 超时控制

让我们实现一个带有超时控制的HTTP服务:

package main

import (
    "context"
    "encoding/json"
    "fmt"
    "log"
    "net/http"
    "time"
)

// 响应结构
type Response struct {
    Data  interface{} `json:"data,omitempty"`
    Error string      `json:"error,omitempty"`
}

// 服务配置
type ServiceConfig struct {
    Timeout        time.Duration
    MaxConcurrent  int
    RetryAttempts  int
    RetryDelay     time.Duration
}

// HTTP客户端包装器
type HTTPClient struct {
    client  *http.Client
    config  ServiceConfig
    limiter chan struct{} // 并发限制器
}

// 创建新的HTTP客户端
func NewHTTPClient(config ServiceConfig) *HTTPClient {
    return &HTTPClient{
        client: &http.Client{
            Timeout: config.Timeout,
        },
        config: config,
        limiter: make(chan struct{}, config.MaxConcurrent),
    }
}

// 发送HTTP请求
func (c *HTTPClient) DoRequest(ctx context.Context, method, url string) (*Response, error) {
    var lastErr error
    
    for attempt := 0; attempt <= c.config.RetryAttempts; attempt++ {
        select {
        case <-ctx.Done():
            return nil, ctx.Err()
        case c.limiter <- struct{}{}: // 获取并发许可
        }
        
        // 确保释放并发许可
        defer func() {
            <-c.limiter
        }()
        
        // 创建请求
        req, err := http.NewRequestWithContext(ctx, method, url, nil)
        if err != nil {
            return nil, fmt.Errorf("create request failed: %w", err)
        }
        
        // 设置请求超时
        reqCtx, cancel := context.WithTimeout(ctx, c.config.Timeout)
        defer cancel()
        
        // 执行请求
        resp, err := c.client.Do(req.WithContext(reqCtx))
        if err != nil {
            lastErr = err
            log.Printf("Request failed (attempt %d): %v\n", attempt+1, err)
            
            // 如果不是最后一次尝试,等待后重试
            if attempt < c.config.RetryAttempts {
                select {
                case <-ctx.Done():
                    return nil, ctx.Err()
                case <-time.After(c.config.RetryDelay):
                    continue
                }
            }
            continue
        }
        defer resp.Body.Close()
        
        // 解析响应
        var result Response
        if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
            return nil, fmt.Errorf("decode response failed: %w", err)
        }
        
        return &result, nil
    }
    
    return nil, fmt.Errorf("all retry attempts failed, last error: %v", lastErr)
}

// 处理HTTP请求的处理器
func handleRequest(w http.ResponseWriter, r *http.Request) {
    // 创建context
    ctx := r.Context()
    
    // 模拟长时间处理
    select {
    case <-time.After(2 * time.Second):
        response := Response{
            Data: "Request processed successfully",
        }
        json.NewEncoder(w).Encode(response)
    case <-ctx.Done():
        response := Response{
            Error: "Request timeout",
        }
        w.WriteHeader(http.StatusGatewayTimeout)
        json.NewEncoder(w).Encode(response)
    }
}

func main() {
    // 配置HTTP客户端
    config := ServiceConfig{
        Timeout:       5 * time.Second,
        MaxConcurrent: 10,
        RetryAttempts: 3,
        RetryDelay:    time.Second,
    }
    
    client := NewHTTPClient(config)
    
    // 创建HTTP服务器
    http.HandleFunc("/api", handleRequest)
    
    // 启动服务器
    go func() {
        log.Println("Server starting on :8080")
        if err := http.ListenAndServe(":8080", nil); err != nil {
            log.Fatal(err)
        }
    }()
    
    // 等待服务器启动
    time.Sleep(time.Second)
    
    // 测试请求
    ctx := context.Background()
    
    // 测试正常请求
    resp, err := client.DoRequest(ctx, "GET", "http://localhost:8080/api")
    if err != nil {
        log.Printf("Request failed: %v\n", err)
    } else {
        log.Printf("Response: %+v\n", resp)
    }
    
    // 测试超时请求
    ctx, cancel := context.WithTimeout(ctx, time.Second)
    defer cancel()
    
    resp, err = client.DoRequest(ctx, "GET", "http://localhost:8080/api")
    if err != nil {
        log.Printf("Request failed (expected): %v\n", err)
    } else {
        log.Printf("Response: %+v\n", resp)
    }
    
    // 保持主程序运行
    select {}
}
package main

import (
    "context"
    "fmt"
    "math/rand"
    "sync"
    "time"
)

// 请求处理器
type RequestHandler struct {
    requests  chan Request
    responses chan Response
    done      chan struct{}
    wg        sync.WaitGroup
}

// 请求结构
type Request struct {
    ID      int
    Timeout time.Duration
    Data    string
}

// 响应结构
type Response struct {
    RequestID int
    Result    string
    Error     error
}

// 创建新的请求处理器
func NewRequestHandler() *RequestHandler {
    return &RequestHandler{
        requests:  make(chan Request, 100),
        responses: make(chan Response, 100),
        done:      make(chan struct{}),
    }
}

// 启动处理器
func (h *RequestHandler) Start(workers int) {
    for i := 0; i < workers; i++ {
        h.wg.Add(1)
        go h.worker(i)
    }
}

// 工作协程
func (h *RequestHandler) worker(id int) {
    defer h.wg.Done()
    
    for {
        select {
        case req, ok := <-h.requests:
            if !ok {
                fmt.Printf("Worker %d: request channel closed\n", id)
                return
            }
            
            // 创建context用于超时控制
            ctx, cancel := context.WithTimeout(context.Background(), req.Timeout)
            
            // 处理请求
            response := h.processRequest(ctx, req)
            
            // 发送响应
            select {
            case h.responses <- response:
                fmt.Printf("Worker %d: sent response for request %d\n", 
                    id, req.ID)
            case <-h.done:
                cancel()
                return
            }
            
            cancel() // 清理context
            
        case <-h.done:
            fmt.Printf("Worker %d: received stop signal\n", id)
            return
        }
    }
}

// 处理单个请求
func (h *RequestHandler) processRequest(ctx context.Context, req Request) Response {
    // 模拟处理时间
    processTime := time.Duration(rand.Intn(int(req.Timeout))) + req.Timeout/2
    
    select {
    case <-time.After(processTime):
        return Response{
            RequestID: req.ID,
            Result:   fmt.Sprintf("Processed: %s", req.Data),
        }
    case <-ctx.Done():
        return Response{
            RequestID: req.ID,
            Error:    ctx.Err(),
        }
    }
}

// 提交请求
func (h *RequestHandler) SubmitRequest(req Request) error {
    select {
    case h.requests <- req:
        return nil
    case <-h.done:
        return fmt.Errorf("handler is stopped")
    }
}

// 获取响应
func (h *RequestHandler) GetResponse() (Response, error) {
    select {
    case resp := <-h.responses:
        return resp, nil
    case <-h.done:
        return Response{}, fmt.Errorf("handler is stopped")
    }
}

// 停止处理器
func (h *RequestHandler) Stop() {
    close(h.done)
    h.wg.Wait()
    close(h.requests)
    close(h.responses)
}

func main() {
    // 创建请求处理器
    handler := NewRequestHandler()
    handler.Start(3)
    
    // 发送一些测试请求
    requests := []Request{
        {ID: 1, Timeout: time.Second, Data: "Fast request"},
        {ID: 2, Timeout: time.Second * 2, Data: "Normal request"},
        {ID: 3, Timeout: time.Millisecond * 500, Data: "Quick request"},
        {ID: 4, Timeout: time.Second * 3, Data: "Slow request"},
    }
    
    // 提交请求
    for _, req := range requests {
        if err := handler.SubmitRequest(req); err != nil {
            fmt.Printf("Failed to submit request %d: %v\n", req.ID, err)
            continue
        }
        fmt.Printf("Submitted request %d\n", req.ID)
    }
    
    // 收集响应
    var wg sync.WaitGroup
    wg.Add(1)
    
    go func() {
        defer wg.Done()
        for i := 0; i < len(requests); i++ {
            resp, err := handler.GetResponse()
            if err != nil {
                fmt.Printf("Failed to get response: %v\n", err)
                continue
            }
            
            if resp.Error != nil {
                fmt.Printf("Request %d failed: %v\n", resp.RequestID, resp.Error)
            } else {
                fmt.Printf("Request %d succeeded: %s\n", resp.RequestID, resp.Result)
            }
        }
    }()
    
    // 等待所有响应处理完成
    wg.Wait()
    
    // 停止处理器
    handler.Stop()
    fmt.Println("Main: processing completed")
}

3. 取消信号传播

让我们继续完成取消信号传播的示例代码:

package main

import (
    "context"
    "fmt"
    "log"
    "sync"
    "time"
)

// 任务定义
type Task struct {
    ID      string
    Process func(ctx context.Context) (interface{}, error)
}

// 工作池
type WorkerPool struct {
    workers    int
    tasks      chan Task
    results    chan interface{}
    errors     chan error
    done       chan struct{}
    wg         sync.WaitGroup
}

// 创建新的工作池
func NewWorkerPool(workers int) *WorkerPool {
    return &WorkerPool{
        workers: workers,
        tasks:   make(chan Task, workers*2),
        results: make(chan interface{}, workers*2),
        errors:  make(chan error, workers*2),
        done:    make(chan struct{}),
    }
}

// 启动工作池
func (p *WorkerPool) Start(ctx context.Context) {
    // 启动workers
    for i := 0; i < p.workers; i++ {
        p.wg.Add(1)
        go p.worker(ctx, i)
    }

    // 等待所有worker完成
    go func() {
        p.wg.Wait()
        close(p.done)
        close(p.results)
        close(p.errors)
    }()
}

// worker处理任务
func (p *WorkerPool) worker(ctx context.Context, id int) {
    defer p.wg.Done()
    
    log.Printf("Worker %d started\n", id)
    
    for {
        select {
        case <-ctx.Done():
            log.Printf("Worker %d stopped: %v\n", id, ctx.Err())
            return
        case task, ok := <-p.tasks:
            if !ok {
                log.Printf("Worker %d: task channel closed\n", id)
                return
            }

            log.Printf("Worker %d processing task %s\n", id, task.ID)
            
            // 创建任务专用的context
            taskCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
            
            // 执行任务
            result, err := task.Process(taskCtx)
            cancel() // 释放任务context资源
            
            if err != nil {
                select {
                case p.errors <- fmt.Errorf("task %s failed: %w", task.ID, err):
                case <-ctx.Done():
                    return
                }
            } else {
                select {
                case p.results <- result:
                case <-ctx.Done():
                    return
                }
            }
        }
    }
}

// 提交任务
func (p *WorkerPool) Submit(task Task) error {
    select {
    case p.tasks <- task:
        return nil
    case <-p.done:
        return fmt.Errorf("worker pool is closed")
    }
}

// 关闭工作池
func (p *WorkerPool) Close() {
    close(p.tasks)
}

// 获取结果通道
func (p *WorkerPool) Results() <-chan interface{} {
    return p.results
}

// 获取错误通道
func (p *WorkerPool) Errors() <-chan error {
    return p.errors
}

func main() {
    // 创建根context
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    // 创建工作池
    pool := NewWorkerPool(3)
    pool.Start(ctx)

    // 创建模拟任务
    tasks := []Task{
        {
            ID: "task-1",
            Process: func(ctx context.Context) (interface{}, error) {
                select {
                case <-time.After(2 * time.Second):
                    return "Task 1 completed", nil
                case <-ctx.Done():
                    return nil, ctx.Err()
                }
            },
        },
        {
            ID: "task-2",
            Process: func(ctx context.Context) (interface{}, error) {
                select {
                case <-time.After(3 * time.Second):
                    return "Task 2 completed", nil
                case <-ctx.Done():
                    return nil, ctx.Err()
                }
            },
        },
        {
            ID: "task-3",
            Process: func(ctx context.Context) (interface{}, error) {
                select {
                case <-time.After(1 * time.Second):
                    return nil, fmt.Errorf("task 3 failed")
                case <-ctx.Done():
                    return nil, ctx.Err()
                }
            },
        },
    }

    // 提交任务
    for _, task := range tasks {
        if err := pool.Submit(task); err != nil {
            log.Printf("Failed to submit task %s: %v\n", task.ID, err)
        }
    }

    // 等待3秒后取消所有任务
    go func() {
        time.Sleep(3 * time.Second)
        log.Println("Cancelling all tasks...")
        cancel()
    }()

    // 收集结果和错误
    completed := 0
    expected := len(tasks)

    for completed < expected {
        select {
        case result, ok := <-pool.Results():
            if !ok {
                continue
            }
            log.Printf("Got result: %v\n", result)
            completed++
        case err, ok := <-pool.Errors():
            if !ok {
                continue
            }
            log.Printf("Got error: %v\n", err)
            completed++
        case <-ctx.Done():
            log.Printf("Main: context cancelled: %v\n", ctx.Err())
            completed = expected // 强制退出循环
        }
    }

    // 关闭工作池
    pool.Close()
    
    // 等待工作池完全关闭
    <-pool.done
    log.Println("All workers stopped")
}

3.1 取消信号传播流程图

在这里插入图片描述

4. Context最佳实践

4.1 Context使用规范

  1. 函数调用链传递
// 推荐
func HandleRequest(ctx context.Context, req *Request) error

// 不推荐
func HandleRequest(timeout time.Duration, req *Request) error
  1. Context应作为第一个参数
// 推荐
func ProcessTask(ctx context.Context, task *Task) error

// 不推荐
func ProcessTask(task *Task, ctx context.Context) error
  1. 不要储存Context在结构体中
// 不推荐
type Service struct {
    ctx context.Context
}

// 推荐
type Service struct {
    // 其他字段
}

func (s *Service) DoWork(ctx context.Context) error

4.2 Context使用注意事项

  1. 不要将nil传递给context参数
// 推荐
ctx := context.Background()
ProcessTask(ctx, task)

// 不推荐
ProcessTask(nil, task)
  1. context.Value应该只用于请求作用域数据
// 推荐
ctx = context.WithValue(ctx, "request-id", requestID)

// 不推荐 - 配置信息应该通过其他方式传递
ctx = context.WithValue(ctx, "db-config", dbConfig)
  1. 正确处理取消信号
select {
case <-ctx.Done():
    return ctx.Err()
default:
    // 继续处理
}

4.3 实践建议

  1. 超时控制
  • 设置合理的超时时间
  • 在不同层级使用不同的超时时间
  • 确保资源正确释放
  1. 错误处理
  • 区分超时和取消错误
  • 传递有意义的错误信息
  • 实现优雅降级
  1. 性能优化
  • 避免创建过多的context
  • 合理使用context.Value
  • 及时取消不需要的操作
  1. 日志追踪
  • 记录关键操作的耗时
  • 追踪请求的完整链路
  • 记录取消原因

总结

关键点回顾

  1. Context原理
  • 继承关系
  • 值传递机制
  • 生命周期管理
  1. 超时控制
  • 设置超时时间
  • 处理超时信号
  • 资源清理
  1. 取消信号传播
  • 信号传递机制
  • 取消处理流程
  • 资源释放
  1. 最佳实践
  • 使用规范
  • 注意事项
  • 优化建议

实践建议

  1. 代码规范
  • 遵循命名约定
  • 合理组织代码结构
  • 添加必要的注释
  1. 错误处理
  • 使用有意义的错误信息
  • 实现错误恢复机制
  • 记录错误日志
  1. 性能优化
  • 减少不必要的context创建
  • 避免context.Value滥用
  • 及时释放资源

怎么样今天的内容还满意吗?再次感谢观众老爷的观看,关注GZH:凡人的AI工具箱,回复666,送您价值199的AI大礼包。最后,祝您早日实现财务自由,还请给个赞,谢谢!

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/weixin_40780178/article/details/144000001

【go语言成长之路】如何编写go代码-爱代码爱编程

文章目录 如何编写Go代码一、介绍二、代码组织三、第一个程序四、从模块导入包五、从远程模块导入包六、测试 如何编写Go代码 一、介绍 ​ 本文档演示了模块内简单 Go 包的开发,并介绍了 g

go-爱代码爱编程

go-zero 拦截器 有时我们需要在处理请求的过程中添加一些额外的逻辑,比如身份验证、日志记录、请求限流、性能监控等,这些都可以通过拦截器实现。go zero可以设置多个拦截器 一、 服务端拦截器 服务端拦截器用于处

go语言链接redis数据库-爱代码爱编程

1.使用go get命令安装go-redis/v8库: 我这里使用的vscode工具安装: go get github.com/go-redis/redis/v8 2.创建Redis客户端实例 使用以下Go代码

go 结构体方法-爱代码爱编程

在 Go 语言中,结构体方法是指附加到结构体类型上的函数。这些方法可以通过结构体的实例来调用。方法的接收者(receiver)指定了该方法属于哪个结构体类型。接收者可以是一个值类型或指针类型。 定义结构体方法 下面是如何为一个结构体定义方法的示例: package main import (     "fmt" ) type Recta

第二章:编写第一个 go 程序 1.hello world 程序 -爱代码爱编程

1. 编写代码 1.设置 Go 环境变量 使用 go env -w 命令可以永久设置 Go 环境变量。GO111MODULE=on 是一个常用的设置,用于确保在所有项目中启用模块化支持。 $ go env -w GO11

【设计模式】创建型模式之单例模式(饿汉式 懒汉式 golang实现)-爱代码爱编程

定义 一个类只允许创建一个对象或实例,而且自行实例化并向整个系统提供该实例,这个类就是一个单例类,它提供全局访问的方法。这种设计模式叫单例设计模式,简称单例模式。 单例模式的要点: 某个类只能有一个实例必须自行创建该实

75、go语言并发利器:context包深度解析与实战技巧-爱代码爱编程

Go语言开发:context包:学习context包,实现跨Goroutine的上下文传递 本文将带你了解Go语言中的context包,学习如何使用它来实现跨Goroutine的上下文传递。我们将从基础概念入手,通过实际案

40分钟学 go 语言高并发:超时控制与取消机制-爱代码爱编程

超时控制与取消机制 一、知识要点概述 知识模块重要程度掌握要求超时策略⭐⭐⭐⭐⭐深入理解context包的超时机制,掌握不同场景下的超时控制方法取消传播⭐⭐⭐⭐⭐熟练运用context的取消传播机制,实现优雅的任务取消资

40分钟学 go 语言高并发:sync包详解(上)_go异步实现高并发请求 sync-爱代码爱编程

sync包详解(上) 学习目标 知识点掌握程度应用场景Mutex实现原理深入理解底层实现机制并发访问临界资源时的互斥控制RWMutex使用场景掌握读写锁的特性和应用读多写少的并发场景优化锁竞争优化了解常见的锁优化策略高并

40分钟学 go 语言高并发:goroutine基础与原理-爱代码爱编程

Day 03 - goroutine基础与原理 1. goroutine创建和调度 1.1 goroutine基本特性 特性说明轻量级初始栈大小仅2KB,可动态增长调度方式协作式调度,由Go运行时管理创建成本创建成本很

40分钟学 go 语言高并发:错误处理最佳实践-爱代码爱编程

错误处理最佳实践 一、课程概述 学习要点重要程度掌握目标error设计★★★★★掌握合理的错误类型设计和错误码管理错误包装★★★★☆理解和运用errors包提供的错误包装功能panic处理★★★★★掌握panic/rec

40分钟学 go 语言高并发:并发下载器开发实战教程-爱代码爱编程

并发下载器开发实战教程 一、系统设计概述 1.1 功能需求表 功能模块描述技术要点分片下载将大文件分成多个小块并发下载goroutine池、分片算法断点续传支持下载中断后继续下载文件指针定位、临时文件管理进度显示实时显

40分钟学 go 语言高并发:开发环境搭建与工程化实践_go开发教程-爱代码爱编程

Day 01 - Go开发环境搭建与工程化实践 1. Go环境变量配置 1.1 重要的环境变量表格 环境变量说明示例值GOROOTGo语言安装根目录Windows: C:\goLinux/Mac: /usr/local

40分钟学 go 语言高并发:pipeline模式(二)-爱代码爱编程

Pipeline模式(二) 一、实战应用示例 1.1 日志处理Pipeline 让我们实现一个处理日志文件的Pipeline示例: package main import ( "bufio" "co

40分钟学 go 语言高并发:select多路复用-爱代码爱编程

Select多路复用 学习目标 知识点掌握程度应用场景select实现原理深入理解底层机制channel通信和多路选择超时处理掌握超时控制方法避免阻塞和资源浪费优先级控制理解优先级实现处理多个channel的顺序性能考虑