代码编织梦想

超时控制与取消机制

一、知识要点概述

知识模块重要程度掌握要求
超时策略⭐⭐⭐⭐⭐深入理解context包的超时机制,掌握不同场景下的超时控制方法
取消传播⭐⭐⭐⭐⭐熟练运用context的取消传播机制,实现优雅的任务取消
资源清理⭐⭐⭐⭐掌握goroutine退出时的资源清理方法,避免资源泄露
级联取消⭐⭐⭐⭐理解并实现多层级的取消控制,确保子任务能正确响应取消信号

二、详细内容

1. 超时策略

超时控制是高并发系统中非常重要的一环,它能够防止系统资源被长时间占用,提高系统的可用性。

package main

import (
    "context"
    "fmt"
    "time"
)

// 模拟一个耗时操作
func longRunningOperation(ctx context.Context) error {
    // 创建一个channel用于接收操作结果
    done := make(chan bool)
    
    go func() {
        // 模拟耗时操作
        time.Sleep(2 * time.Second)
        done <- true
    }()
    
    // 使用select实现超时控制
    select {
    case <-done:
        return nil
    case <-ctx.Done():
        return ctx.Err()
    }
}

// 使用context.WithTimeout实现超时控制
func withTimeoutExample() {
    // 创建一个1秒超时的context
    ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
    defer cancel()
    
    if err := longRunningOperation(ctx); err != nil {
        fmt.Printf("操作超时: %v\n", err)
        return
    }
    fmt.Println("操作成功完成")
}

// 使用time.After实现超时控制
func withTimeAfterExample() {
    done := make(chan bool)
    
    go func() {
        time.Sleep(2 * time.Second)
        done <- true
    }()
    
    select {
    case <-done:
        fmt.Println("操作成功完成")
    case <-time.After(1 * time.Second):
        fmt.Println("操作超时")
    }
}

// 带有重试机制的超时控制
func withRetryTimeout(maxRetries int, timeout time.Duration) error {
    for i := 0; i < maxRetries; i++ {
        ctx, cancel := context.WithTimeout(context.Background(), timeout)
        err := longRunningOperation(ctx)
        cancel()
        
        if err == nil {
            return nil
        }
        
        fmt.Printf("第%d次重试失败: %v\n", i+1, err)
        time.Sleep(time.Second) // 重试间隔
    }
    return fmt.Errorf("达到最大重试次数")
}

func main() {
    fmt.Println("=== 使用context.WithTimeout示例 ===")
    withTimeoutExample()
    
    fmt.Println("\n=== 使用time.After示例 ===")
    withTimeAfterExample()
    
    fmt.Println("\n=== 带重试机制的超时控制示例 ===")
    err := withRetryTimeout(3, time.Second)
    if err != nil {
        fmt.Printf("最终结果: %v\n", err)
    }
}
package main

import (
    "context"
    "fmt"
    "sync"
    "time"
)

// 模拟数据处理任务
type Task struct {
    ID int
    ProcessTime time.Duration
}

// 工作协程
func worker(ctx context.Context, id int, tasks <-chan Task, wg *sync.WaitGroup) {
    defer wg.Done()
    
    for {
        select {
        case task, ok := <-tasks:
            if !ok {
                fmt.Printf("Worker %d: 通道已关闭,退出\n", id)
                return
            }
            
            // 检查是否已取消
            select {
            case <-ctx.Done():
                fmt.Printf("Worker %d: 收到取消信号,停止处理任务\n", id)
                return
            default:
                // 继续处理任务
                fmt.Printf("Worker %d: 开始处理任务 %d\n", id, task.ID)
                time.Sleep(task.ProcessTime)
                fmt.Printf("Worker %d: 完成任务 %d\n", id, task.ID)
            }
            
        case <-ctx.Done():
            fmt.Printf("Worker %d: 收到取消信号,退出\n", id)
            return
        }
    }
}

// 任务分发器
func dispatcher(ctx context.Context, numWorkers int) {
    var wg sync.WaitGroup
    tasks := make(chan Task, numWorkers)
    
    // 启动工作协程
    for i := 0; i < numWorkers; i++ {
        wg.Add(1)
        go worker(ctx, i, tasks, &wg)
    }
    
    // 模拟任务生成
    taskID := 0
    for {
        select {
        case <-ctx.Done():
            close(tasks)
            wg.Wait()
            fmt.Println("调度器: 所有工作协程已退出")
            return
        default:
            // 生成新任务
            taskID++
            task := Task{
                ID: taskID,
                ProcessTime: time.Duration(taskID*100) * time.Millisecond,
            }
            tasks <- task
            time.Sleep(200 * time.Millisecond) // 控制任务生成速率
        }
    }
}

func main() {
    // 创建可取消的context
    ctx, cancel := context.WithCancel(context.Background())
    
    // 启动调度器
    go dispatcher(ctx, 3)
    
    // 运行一段时间后取消
    time.Sleep(2 * time.Second)
    fmt.Println("主程序: 发送取消信号")
    cancel()
    
    // 等待一段时间确保清理完成
    time.Sleep(time.Second)
    fmt.Println("主程序: 退出")
}
package main

import (
    "context"
    "fmt"
    "sync"
    "time"
)

// 模拟数据库连接
type DBConnection struct {
    id     int
    closed bool
    mu     sync.Mutex
}

func (db *DBConnection) Query(ctx context.Context) error {
    select {
    case <-ctx.Done():
        return ctx.Err()
    case <-time.After(500 * time.Millisecond):
        return nil
    }
}

func (db *DBConnection) Close() error {
    db.mu.Lock()
    defer db.mu.Unlock()
    
    if !db.closed {
        fmt.Printf("关闭数据库连接 %d\n", db.id)
        db.closed = true
    }
    return nil
}

// 资源管理器
type ResourceManager struct {
    connections []*DBConnection
    mu          sync.Mutex
}

func NewResourceManager() *ResourceManager {
    return &ResourceManager{
        connections: make([]*DBConnection, 0),
    }
}

func (rm *ResourceManager) AcquireConnection() *DBConnection {
    rm.mu.Lock()
    defer rm.mu.Unlock()
    
    conn := &DBConnection{
        id: len(rm.connections),
    }
    rm.connections = append(rm.connections, conn)
    fmt.Printf("创建新的数据库连接 %d\n", conn.id)
    return conn
}

func (rm *ResourceManager) ReleaseAll() {
    rm.mu.Lock()
    defer rm.mu.Unlock()
    
    for _, conn := range rm.connections {
        conn.Close()
    }
    rm.connections = nil
}

// 工作任务
func doWork(ctx context.Context, rm *ResourceManager) error {
    // 获取数据库连接
    conn := rm.AcquireConnection()
    
    // 确保在函数退出时释放资源
    defer func() {
        if err := conn.Close(); err != nil {
            fmt.Printf("关闭连接失败: %v\n", err)
        }
    }()
    
    // 执行查询
    return conn.Query(ctx)
}

func main() {
    rm := NewResourceManager()
    ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
    defer cancel()
    
    var wg sync.WaitGroup
    
    // 启动多个工作协程
    for i := 0; i < 3; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            
            for j := 0; j < 2; j++ {
                select {
                case <-ctx.Done():
                    fmt.Printf("工作协程 %d: 收到取消信号\n", id)
                    return
                default:
                    if err := doWork(ctx, rm); err != nil {
                        fmt.Printf("工作协程 %d: 执行失败 - %v\n", id, err)
                        return
                    }
                    fmt.Printf("工作协程 %d: 完成一次查询\n", id)
                }
            }
        }(i)
    }
    
    // 等待所有工作完成
    wg.Wait()
    
    // 清理所有资源
    fmt.Println("清理所有资源")
    rm.ReleaseAll()
}
package main

import (
    "context"
    "fmt"
    "sync"
    "time"
)

// Pipeline 代表一个处理管道
type Pipeline struct {
    name     string
    process  func(context.Context) error
    children []*Pipeline
    wg       sync.WaitGroup
}

// NewPipeline 创建新的处理管道
func NewPipeline(name string, process func(context.Context) error) *Pipeline {
    return &Pipeline{
        name:     name,
        process:  process,
        children: make([]*Pipeline, 0),
    }
}

// AddChild 添加子管道
func (p *Pipeline) AddChild(child *Pipeline) {
    p.children = append(p.children, child)
}

// Execute 执行当前管道及其所有子管道
func (p *Pipeline) Execute(ctx context.Context) error {
    // 创建一个子context,用于控制子管道
    childCtx, cancel := context.WithCancel(ctx)
    defer cancel() // 确保所有子管道都会被取消

    // 启动所有子管道
    errChan := make(chan error, len(p.children))
    for _, child := range p.children {
        p.wg.Add(1)
        go func(pipeline *Pipeline) {
            defer p.wg.Done()
            if err := pipeline.Execute(childCtx); err != nil {
                errChan <- err
                cancel() // 如果有错误发生,取消所有其他子管道
            }
        }(child)
    }

    // 执行当前管道的处理函数
    processDone := make(chan error, 1)
    go func() {
        processDone <- p.process(childCtx)
    }()

    // 等待处理完成或者context取消
    select {
    case err := <-processDone:
        if err != nil {
            cancel() // 如果处理出错,取消所有子管道
            fmt.Printf("[%s] 处理错误: %v\n", p.name, err)
            return err
        }
    case err := <-errChan:
        fmt.Printf("[%s] 子管道错误: %v\n", p.name, err)
        return err
    case <-ctx.Done():
        fmt.Printf("[%s] 被取消\n", p.name)
        return ctx.Err()
    }

    // 等待所有子管道完成
    waitCh := make(chan struct{})
    go func() {
        p.wg.Wait()
        close(waitCh)
    }()

    select {
    case <-waitCh:
        fmt.Printf("[%s] 及其所有子管道已完成\n", p.name)
        return nil
    case <-ctx.Done():
        fmt.Printf("[%s] 等待子管道完成时被取消\n", p.name)
        return ctx.Err()
    }
}

// 模拟处理函数,带有随机延时
func createProcessor(name string, duration time.Duration) func(context.Context) error {
    return func(ctx context.Context) error {
        fmt.Printf("[%s] 开始处理\n", name)
        select {
        case <-time.After(duration):
            fmt.Printf("[%s] 处理完成\n", name)
            return nil
        case <-ctx.Done():
            fmt.Printf("[%s] 处理被取消\n", name)
            return ctx.Err()
        }
    }
}

func main() {
    // 创建根管道和子管道
    root := NewPipeline("ROOT", createProcessor("ROOT", 1*time.Second))
    
    // 第一层子管道
    stage1A := NewPipeline("1A", createProcessor("1A", 2*time.Second))
    stage1B := NewPipeline("1B", createProcessor("1B", 1500*time.Millisecond))
    root.AddChild(stage1A)
    root.AddChild(stage1B)
    
    // 第二层子管道
    stage2A := NewPipeline("2A", createProcessor("2A", 1*time.Second))
    stage2B := NewPipeline("2B", createProcessor("2B", 800*time.Millisecond))
    stage1A.AddChild(stage2A)
    stage1B.AddChild(stage2B)
    
    // 第三层子管道
    stage3A := NewPipeline("3A", createProcessor("3A", 500*time.Millisecond))
    stage2A.AddChild(stage3A)

    // 创建根context,设置3秒超时
    ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
    defer cancel()

    // 启动处理管道
    fmt.Println("开始执行管道...")
    startTime := time.Now()
    
    if err := root.Execute(ctx); err != nil {
        fmt.Printf("管道执行失败: %v\n", err)
    }
    
    fmt.Printf("总执行时间: %v\n", time.Since(startTime))
}

继续补充级联取消示例代码和详细的知识讲解。

让我们详细解析一下上面的代码示例和相关知识点:

2. 超时控制详解

超时控制主要有以下几种实现方式:

  1. 使用context.WithTimeout

    • 优点:可以传播取消信号,适合多层级的调用链
    • 用法:创建带超时的context,传递给下层函数
  2. 使用time.After

    • 优点:实现简单,适合单个操作的超时控制
    • 缺点:不能传播取消信号
  3. 带重试机制的超时控制

    • 结合重试策略,提高操作的成功率
    • 可以设置最大重试次数和重试间隔

3. 取消传播机制

取消传播的核心概念:

  1. Context链

    • 父context取消时,所有子context都会被取消
    • 子context取消不会影响父context
  2. 取消信号的传递

    • 通过Done()通道接收取消信号
    • 使用select监听多个通道
  3. 错误处理

    • context.Canceled:手动调用cancel
    • context.DeadlineExceeded:超时导致的取消

4. 资源清理

资源清理的关键点:

  1. defer的使用

    • 确保资源在函数退出时被释放
    • 处理panic情况下的资源清理
  2. 并发安全

    • 使用互斥锁保护共享资源
    • 避免重复释放
  3. 清理顺序

    • 按照依赖关系进行清理
    • 避免死锁和资源泄露

5. 级联取消

级联取消的实现要点:

  1. 树形结构

    • 构建父子关系的处理链
    • 自上而下传播取消信号
  2. 并发控制

    • 使用WaitGroup等待子任务完成
    • 处理取消时的优雅退出

让我们用Mermaid图来展示这些概念:
在这里插入图片描述

6. 最佳实践建议

  1. 超时控制

    • 为所有网络操作设置合理的超时时间
    • 在必要时实现重试机制
    • 避免使用过长的超时时间
  2. 取消传播

    • 合理使用context传递取消信号
    • 及时检查取消状态
    • 正确处理取消后的清理工作
  3. 资源管理

    • 使用defer确保资源释放
    • 实现优雅的关闭流程
    • 处理好并发安全问题
  4. 错误处理

    • 区分不同类型的错误
    • 提供有意义的错误信息
    • 实现合适的重试策略

7. 性能优化技巧

  1. 避免资源泄露

    • 使用工具检测泄露
    • 实现完整的清理流程
    • 定期进行资源回收
  2. 控制并发数量

    • 使用协程池
    • 实现背压机制
    • 监控系统资源使用
  3. 超时优化

    • 设置合适的超时阈值
    • 实现自适应超时
    • 监控超时情况

8. 调试和监控

  1. 日志记录

    • 记录关键操作的耗时
    • 记录取消和超时事件
    • 记录资源使用情况
  2. 监控指标

    • 超时次数和比率
    • 取消操作的统计
    • 资源使用情况
  3. 调试工具

    • 使用race detector
    • 使用pprof分析
    • 使用trace工具

9. 总结

通过本节课的学习,我们掌握了:

  1. 如何实现可靠的超时控制机制
  2. 如何正确使用context进行取消传播
  3. 如何安全地进行资源清理
  4. 如何实现级联取消功能

怎么样今天的内容还满意吗?再次感谢观众老爷的观看,关注GZH:凡人的AI工具箱,回复666,送您价值199的AI大礼包。最后,祝您早日实现财务自由,还请给个赞,谢谢!

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/weixin_40780178/article/details/144073518

golang 学习笔记3:go 并发与网络_go socket高并发-爱代码爱编程

目录 13,Go 错误处理1,error 接口2,panic 异常3,defer 函数 14,Go 并发1,goroutine2,channel3,无缓冲 channel 的惯用法4,有缓冲 chan

go语言并发编程入门:goroutine、channel、context、并发安全、gmp调度模型_goroutine context-爱代码爱编程

GO语言并发编程入门:Goroutine、Channel、Context、并发安全、GMP调度模型 1.GO并发介绍 并发:多线程程序在一个核的cpu上运行。 并行:多线程程序在多个核的cpu上运行。 由上可知并发不是并

【go语言成长之路】如何编写go代码-爱代码爱编程

文章目录 如何编写Go代码一、介绍二、代码组织三、第一个程序四、从模块导入包五、从远程模块导入包六、测试 如何编写Go代码 一、介绍 ​ 本文档演示了模块内简单 Go 包的开发,并介绍了 g

go-爱代码爱编程

go-zero 拦截器 有时我们需要在处理请求的过程中添加一些额外的逻辑,比如身份验证、日志记录、请求限流、性能监控等,这些都可以通过拦截器实现。go zero可以设置多个拦截器 一、 服务端拦截器 服务端拦截器用于处

go语言链接redis数据库-爱代码爱编程

1.使用go get命令安装go-redis/v8库: 我这里使用的vscode工具安装: go get github.com/go-redis/redis/v8 2.创建Redis客户端实例 使用以下Go代码

go 结构体方法-爱代码爱编程

在 Go 语言中,结构体方法是指附加到结构体类型上的函数。这些方法可以通过结构体的实例来调用。方法的接收者(receiver)指定了该方法属于哪个结构体类型。接收者可以是一个值类型或指针类型。 定义结构体方法 下面是如何为一个结构体定义方法的示例: package main import (     "fmt" ) type Recta

第二章:编写第一个 go 程序 1.hello world 程序 -爱代码爱编程

1. 编写代码 1.设置 Go 环境变量 使用 go env -w 命令可以永久设置 Go 环境变量。GO111MODULE=on 是一个常用的设置,用于确保在所有项目中启用模块化支持。 $ go env -w GO11

【设计模式】创建型模式之单例模式(饿汉式 懒汉式 golang实现)-爱代码爱编程

定义 一个类只允许创建一个对象或实例,而且自行实例化并向整个系统提供该实例,这个类就是一个单例类,它提供全局访问的方法。这种设计模式叫单例设计模式,简称单例模式。 单例模式的要点: 某个类只能有一个实例必须自行创建该实

40分钟学 go 语言高并发:context包与并发控制-爱代码爱编程

Context包与并发控制 学习目标 知识点掌握程度应用场景context原理深入理解实现机制并发控制和请求链路追踪超时控制掌握超时设置和处理API请求超时、任务限时控制取消信号传播理解取消机制和传播链优雅退出、资源释放

75、go语言并发利器:context包深度解析与实战技巧-爱代码爱编程

Go语言开发:context包:学习context包,实现跨Goroutine的上下文传递 本文将带你了解Go语言中的context包,学习如何使用它来实现跨Goroutine的上下文传递。我们将从基础概念入手,通过实际案

40分钟学 go 语言高并发:pipeline模式(二)-爱代码爱编程

Pipeline模式(二) 一、实战应用示例 1.1 日志处理Pipeline 让我们实现一个处理日志文件的Pipeline示例: package main import ( "bufio" "co

40分钟学 go 语言高并发:【实战】并发安全的配置管理器-爱代码爱编程

【实战】并发安全的配置管理器 一、课程概述 学习要点重要程度掌握目标配置热更新★★★★★理解配置热更新原理,实现动态加载配置并发读写控制★★★★★掌握并发安全的读写控制机制观察者模式★★★★☆理解并实现配置变更通知机制版