如何在 Go 中实现文件读取与正则匹配的并发处理

本文讲解如何正确设计 go 并发程序:将文件逐行读取(单 goroutine)与多 worker 并行处理解耦,通过 channel 传递任务与结果,避免死锁,并安全关闭 channel。核心在于分离“生产者”“消费者”和“结果收集者”三类 goroutine。

在 Go 中实现“并发处理文件内容”时,一个常见误区是试图让文件读取业务处理都在同一 goroutine 中同步推进,或错误地等待所有 worker 完成后再尝试从结果 channel 读取——这极易导致死锁(如原代码中 wg.Wait() 阻塞主线程,而 results 通道未被关闭,range results 永远无法退出)。

正确的并发模型应严格遵循 生产者-消费者模式,并明确各组件职责:

  • 生产者(Producer):单独 goroutine 负责读取文件(如 bufio.Scanner),将每行文本发送至 jobs channel,读取完毕后立即关闭 jobs
  • 消费者(Workers):多个 goroutine 从 jobs 接收任务,执行 CPU 密集型操作(如正则匹配),将结果(如 1 或结构体)发往 results channel;
  • 收集者(Collector):单独 goroutine 监听 wg.Wait(),待所有 worker 退出 for range jobs 后,关闭 results channel;主 goroutine 则通过 for v := range results 安全遍历所有结果。

以下是修正后的完整可运行示例(已适配标准库,无需外部依赖):

package main

import (
    "bufio"
    "fmt"
    "regexp"
    "strings"
    "sync"
)

func telephoneNumbersInFile(path string) int {
    file := strings.NewReader(path)
    telephone := regexp.MustCompile(`\(\d+\)\s\d+-\d+`)

    jobs := make(chan string, 10)   // 建议设置缓冲区,避免生产者阻塞
    results := make(chan int, 10)  // 同样建议缓冲,提升吞吐

    var wg sync.WaitGroup

    // 启动 3 个 worker
    for w := 1; w <= 3; w++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for line := range jobs {
                if telephone.MatchString(line) {
                    results <- 1
                }
            }
        }()
    }

    // 【生产者】:在 goroutine 中读取文件并关闭 jobs
    go func() {
        scanner := bufio.NewScanner(file)
        for scanner.Scan() {
            jobs <- scanner.Text()
        }
        close(jobs) // 关键!通知 workers 无新任务
    }()

    // 【收集者】:等待所有 worker 结束后关闭 results
    go func() {
        wg.Wait()
        close(results) // 关键!使 range results 可正常退出
    }()

    // 【主流程】:安全收集结果
    counts := 0
    for v := range results {
        counts += v
    }
    return counts
}

func main() {
    const input = "Foo\n(555) 123-3456\nBar\nBaz\n(800) 999-0000"
    n := telephoneNumbersInFile(input)
    fmt.Println("Found", n, "telephone numbers") // 输出: Found 2 telephone numbers
}

关键要点说明:

  • 不要在主线程中 wg.Wait() 后再读 results:因为 results 仍为 open 状态,range 会永久阻塞。必须用额外 goroutine 在 wg.Wait() 后 close(results)。
  • jobs 和 results 均建议设缓冲(如 make(chan T, 10)):防止生产者因消费者暂时繁忙而阻塞,尤其在 I/O 与 CPU 处理速度不匹配时。
  • Worker 内部无需 wg.Done() 之外的同步逻辑:for range jobs 会在 jobs 关闭后自动退出,自然触发 defer wg.Done()。
  • 进阶优化方向
    • 将 jobs 改为 chan []string,实现批量读取(如每次读 100 行),减少 channel 通信频次;
    • 用 context.Context 控制超时或取消;
    • 若需返回具体匹配内容而非仅计数,可定义 type MatchResult struct { Line string; Number string } 并发送该结构体。

此模式清晰分离关注点,规避共享状态与显式锁,是 Go 并发编程的经典实践。