Go语言并发目录遍历

在本节中,我们构建一个程序,根据命令行指定的输入,报告一个或多个目录的磁盘使用情况,类似于 UNIX du 命令。大多数的工作由下面的 walkDir 函数完成,它使用 dirents 辅助函数来枚举目录中的条目。
// wakjDir 递归地遍历以 dir 为根目录的整个文件树
// 并在 filesizes 上发送每个已找到的文件的大小
func walkDir(dir string, fileSizes chan<- int64) {
    for _, entry := range dirents(dir) {
        if entry.IsDir() {
            subdir := filepath.Join(dir, entry.Name())
            walkDir(subdir, fileSizes)
        } else {
            fileSizes <- entry.Size()
        }
    }
}

// dirents 返回 dir 目录中的条目
func dirents(dir string) []os.FileInfo {
    entries, err := ioutil.ReadDir(dir)
    if err != nil {
        fmt.Fprintf(os.Stderr, "du1: %v\n", err)
        return nil
    }
    return entries
}
ioutil.ReadDir 函数返回一个 os.FileInfo 类型的 slice,针对单个文件同样的信息可以通过调用 os.Stat 函数来返回。对每一个子目录,walkDir 递归调用它自己,对于每一个文件,walkDir 发送一条消息到 fileSizes 通道。消息是文件所占用的字节数。

如下所示,main 函数使用两个 goroutine。后台 goroutine 调用 walkDir 遍历命令行上指定的每一个目录,最后关闭 fileSizes 通道。主 goroutine 计算从通道中接收的文件的大小的和,最后输出总数。
// du1 计算目录中文件占用的磁盘空间大小
package main

import (
    "flag"
    "fmt"
    "io/ioutil"
    "os"
    "path/filepath"
)

func main() {
    // 确定初始目录
    flag.Parse()
    roots := flag.Args()
    if len(roots) == 0 {
        roots = []string{"."}
    }

    // 遍历文件树
    fileSizes := make(chan int64)
    go func() {
        for _, root := range roots {
            walkDir(root, fileSizes)
        }
        close(fileSizes)
    }()

    // 输出结果
    var nfiles, nbytes int64
    for size := range fileSizes {
        nfiles++
        nbytes += size
    }
    printDiskUsage(nfiles, nbytes)
}

func printDiskUsage(nfiles, nbytes int64) {
    fmt.Printf("%d files  %.1f GB\n", nfiles, float64(nbytes)/1e9)
}
在输出结果前,程序等待较长时间:

$ go build gopl>io/ch8/du1
$ ./du1 $HOME /usr/bin/etc
213201 files 62.7 GB

如果程序可以通知它的进度,将会更友好。但是仅把 printDiskUsage 调用移动到循环内部会使它输出数千行结果。

下面这个 du 的变种周期性地输出总数,只有在 -v 标识指定的时候才输出,因为不是所有的用户都想看进度消息。后台 goroutine 依然从根部开始迭代。

主 goroutine 现在使用一个计时器每 500ms 定期产生事件,使用一个 select 语句或者等待一个关于文件大小的消息,这时它更新总数,或者等待一个计时事件,这时它输出当前的总数。如果 -v 标识没有指定,tick 通道依然是 nil,它对应的情况在 select 中实际上被禁用。
var verbose = flag.Bool("v", false, "show verbose progress messages")

func main() {
    // ...启动后台 goroutine...
    // 确定初始目录
    flag.Parse()
    roots := flag.Args()
    if len(roots) == 0 {
        roots = []string{"."}
    }

    // 遍历文件树
    fileSizes := make(chan int64)
    go func() {
        for _, root := range roots {
            walkDir(root, fileSizes)
        }
        close(fileSizes)
    }()

    // 定期打印结果
    var tick <-chan time.Time
    if *verbose {
        tick = time.Tick(500 * time.Millisecond)
    }
    var nfiles, nbytes int64
loop:
    for {
        select {
        case size, ok := <-fileSizes:
            if !ok {
                break loop // fileSizes 关闭
            }
            nfiles++
            nbytes += size
        case <-tick:
            printDiskUsage(nfiles, nbytes)
        }
    }
    printDiskUsage(nfiles, nbytes) // 最终总数
}
因为这个程序没有使用 range 循环,所以第一个 select 情况必须显式判断 fileSizes 通道是否已经关闭,使用两个返回值的形式进行接收操作。如果通道已经关闭,程序退出循环。标签化的 break 语句将跳出 select 和 for 循环的逻辑;没有标签的 break 只能跳出 select 的逻辑,导致循环的下一次迭代。

程序提供给我们一个从容不迫的更新流:

$ go build gop1.io/ch8/du2
$ ./du2 -v $HOME/usr/bin/etc
28608 files 8.3 GB
54147 files 10.3 GB
93591 files 15.1 GB
127169 files 52.9 GB
175931 files 62.2 GB
213201 files 62.7 GB

但是它依然耗费太长的时间。这里没有理由不能并发调用 walkDir 从而充分利用磁盘系统的并行机制。第三个版本的 du 为每一个 walkDir 的调用创建一个新的 goroutine。它使用 sync.WaitGroup 来为当前存活的 walkDir 调用计数,一个关闭者 goroutine 在计数器减为 0 的时候关闭 fileSizes 通道。
func main() {
    // ...确定根目录...
    flag.Parse()

    // 确定初始目录
    roots := flag.Args()
    if len(roots) == 0 {
        roots = []string{"."}
    }

    // 并行遍历每一个文件树
    fileSizes := make(chan int64)
    var n sync.WaitGroup
    for _, root := range roots {
        n.Add(1)
        go walkDir(root, &n, fileSizes)
    }
    go func() {
        n.Wait()
        close(fileSizes)
    }()

    // 定期打印结果
    var tick <-chan time.Time
    if *verbose {
        tick = time.Tick(500 * time.Millisecond)
    }
    var nfiles, nbytes int64
loop:
    for {
        select {
        case size, ok := <-fileSizes:
            if !ok {
                break loop // fileSizes 关闭
            }
            nfiles++
            nbytes += size
        case <-tick:
            printDiskUsage(nfiles, nbytes)
        }
    }

    printDiskUsage(nfiles, nbytes) // 最终总数
}

func walkDir(dir string, n *sync.WaitGroup, fileSizes chan<- int64) {
    defer n.Done()
    for _, entry := range dirents(dir) {
        if entry.IsDir() {
            n.Add(1)
            subdir := filepath.Join(dir, entry.Name())
            go walkDir(subdir, n, fileSizes)
        } else {
            fileSizes <- entry.Size()
        }
    }
}
因为程序在高峰时创建数千个 goroutine,所以我们不得不修改 dirents 函数来使用计数信号量,以防止它同时打开太多的文件:
// sema是一个用于限制目录并发数的计数信号量
var sema = make(chan struct{}, 20)

// dirents返回directory目录中的条目
func dirents(dir string) []os.FileInfo {
    sema <- struct{}{}        // 获取令牌
    defer func() { <-sema }() // 释放令牌

    entries, err := ioutil.ReadDir(dir)
    if err != nil {
        fmt.Fprintf(os.Stderr, "du: %v\n", err)
        return nil
    }
    return entries
}
尽管系统与系统之间有很多的不同,但是这个版本的速度比前一个版本快几倍。