基于 Go 协程实现图片马赛克应用(下):并发重构


声明:本教程代码整理自《Go Web Programming》一书并发编程部分。

接下来,我们引入协程来提升图片马赛克应用的性能。

在之前同步版本的应用中,对于示例图片的马赛克处理,需要几秒时间才能完成,我们可以通过并发编程来提升这个性能。

重构方案

并发重构的整体方案如下:

  1. 将原始上传图片等分为 4 份;
  2. 同时对这 4 个等份进行马赛克处理;
  3. 将上一步处理结果合并为最终的马赛克图片。

-w1260

代码重构

接下来我们按照上述方案对代码进行重构。

嵌入图片数据库代码迁移

由于并发版本也会调用嵌入图片数据库函数,所以我们在 mosaic 项目根目录下新建一个 common 子目录存放公共代码,然后将 sync 子目录下的 tilesdb.go 文件移动到 common 目录下,将包名调整为 common,并且将可能被外部调用的函数名首字母调整成大写,比如将 cloneTilesDB 重命名为 CloneTilesDB,以及 resizenearest 重命名为 ResizeNearest,此外,记得将 sync/handler.go 中调用嵌入图片数据库函数的地方进行重构。

重构图片马赛克处理器代码

接下来就是本次代码并发重构的主要战场了 —— 针对 sync.Mosaic 处理器的调整,之前是通过一个函数实现的,在并发版本中,需要将其拆分为两个函数 —— cutcombine,分别用于切分原始图片和合并并发处理后的马赛克图片。

mosaic 项目根目录下创建一个 concurrent 子目录存放并发重构代码,然后在该子目录下新建一个 handler.go 文件,重构 Masaic 函数实现代码如下:

package concurrent

import (
    "bytes"
    "encoding/base64"
    "fmt"
    "html/template"
    "image"
    "image/draw"
    "image/jpeg"
    "mosaic/common"
    "net/http"
    "os"
    "strconv"
    "sync"
    "time"
)

// 并发版本 Mosaic 处理器实现
func Mosaic(w http.ResponseWriter, r *http.Request) {
    t0 := time.Now()
    r.ParseMultipartForm(10485760)
    file, _, _ := r.FormFile("image")
    defer file.Close()
    tileSize, _ := strconv.Atoi(r.FormValue("tile_size"))
    original, _, _ := image.Decode(file)
    bounds := original.Bounds()
    db := common.CloneTilesDB()
    // 调用 cut 方法切分原始图片为 4 等份(切分后的子图片还会各自进行马赛克处理,这些都会通过协程异步执行)
    c1 := cut(original, &db, tileSize, bounds.Min.X, bounds.Min.Y, bounds.Max.X/2, bounds.Max.Y/2)
    c2 := cut(original, &db, tileSize, bounds.Max.X/2, bounds.Min.Y, bounds.Max.X, bounds.Max.Y/2)
    c3 := cut(original, &db, tileSize, bounds.Min.X, bounds.Max.Y/2, bounds.Max.X/2, bounds.Max.Y)
    c4 := cut(original, &db, tileSize, bounds.Max.X/2, bounds.Max.Y/2, bounds.Max.X, bounds.Max.Y)
    // 将上述各自进行马赛克处理的子图片合并为最终的马赛克图片(通过协程异步执行)
    c := combine(bounds, c1, c2, c3, c4)
    buf1 := new(bytes.Buffer)
    jpeg.Encode(buf1, original, nil)
    originalStr := base64.StdEncoding.EncodeToString(buf1.Bytes())

    t1 := time.Now()
    // 由于切分后图片的马赛克处理后合并都是基于协程异步调用的,
    // 所以这里从 c 通道读取最终马赛克图片会阻塞,知道该通道写入值
    images := map[string]string{
        "original": originalStr,
        "mosaic":   <-c,
        "duration": fmt.Sprintf("%v ", t1.Sub(t0)),
    }
    t, _ := template.ParseFiles("views/results.html")
    t.Execute(w, images)
}

// 按照指定坐标切分原始图片
func cut(original image.Image, db *common.DB, tileSize, x1, y1, x2, y2 int) <-chan image.Image {
    c := make(chan image.Image)
    sp := image.Point{0, 0}
    // 由于对于每块切分后的图片会单独进行马赛克处理,所以为了提升处理性能,这里引入协程异步执行
    go func() {
        newimage := image.NewNRGBA(image.Rect(x1, y1, x2, y2))
        for y := y1; y < y2; y = y + tileSize {
            for x := x1; x < x2; x = x + tileSize {
                r, g, b, _ := original.At(x, y).RGBA()
                color := [3]float64{float64(r), float64(g), float64(b)}
                nearest := db.Nearest(color)
                file, err := os.Open(nearest)
                if err == nil {
                    img, _, err := image.Decode(file)
                    if err == nil {
                        t := common.Resize(img, tileSize)
                        tile := t.SubImage(t.Bounds())
                        tileBounds := image.Rect(x, y, x+tileSize, y+tileSize)
                        draw.Draw(newimage, tileBounds, tile, sp, draw.Src)
                    } else {
                        fmt.Println("error in decoding nearest", err, nearest)
                    }
                } else {
                    fmt.Println("error opening file when creating mosaic:", nearest)
                }
                file.Close()
            }
        }
        c <- newimage.SubImage(newimage.Rect)
    }()
    
    return c
}

// 合并经过马赛克处理的子图片并对最终马赛克图片进行 base64 编码
func combine(r image.Rectangle, c1, c2, c3, c4 <-chan image.Image) <-chan string {
    c := make(chan string)
    // 由于传入的每个子图片都是异步进行马赛克处理的,调用的时候可能尚未处理完成,所以这里也通过协程异步处理合并操作
    go func() {
        var wg sync.WaitGroup
        newimage := image.NewNRGBA(r)
        // 定义子图片嵌入最终目标图片的匿名函数,将其赋值给 copy
        copy := func(dst draw.Image, r image.Rectangle, src image.Image, sp image.Point) {
            draw.Draw(dst, r, src, sp, draw.Src)
            wg.Done()
        }
        wg.Add(4)
        var s1, s2, s3, s4 image.Image
        var ok1, ok2, ok3, ok4 bool
        for {
            // 通过选择通道读取每个子图片的异步马赛克处理结果,然后再通过协程异步执行子图片嵌入最终图片的 copy 函数
            select {
            case s1, ok1 = <-c1:
                go copy(newimage, s1.Bounds(), s1, image.Point{r.Min.X, r.Min.Y})
            case s2, ok2 = <-c2:
                go copy(newimage, s2.Bounds(), s2, image.Point{r.Max.X / 2, r.Min.Y})
            case s3, ok3 = <-c3:
                go copy(newimage, s3.Bounds(), s3, image.Point{r.Min.X, r.Max.Y / 2})
            case s4, ok4 = <-c4:
                go copy(newimage, s4.Bounds(), s4, image.Point{r.Max.X / 2, r.Max.Y / 2})
            }
            if ok1 && ok2 && ok3 && ok4 {
                break
            }
        }
        // 等待所有子图片都嵌入到目标图片(将此节点视为合并操作完成)
        wg.Wait()
        buf2 := new(bytes.Buffer)
        // 将合并后的最终马赛克图片进行 base64 编码并写入 c 通道返回
        jpeg.Encode(buf2, newimage, nil)
        c <- base64.StdEncoding.EncodeToString(buf2.Bytes())
    }()
    return c
}

cutcombine 函数中具体的实现代码都是通过协程启动的,以便尽可能利用 CPU 并发处理多个子任务,即使是在 combine 中,也可以做到先处理完成的图片区域优先绘制到马赛克图片对应位置(通过 select 选择语句),不过最终对马赛克图片进行 JPEG 编码并转化为 base64 字符串还是需要等到所有子协程处理完毕,这一逻辑通过 WaitGroup 机制实现。

这里实际上使用的是 Fade-out 和 Fade-in 设计模式。

针对共享资源消除竞态条件

当然,我们的并发重构之旅到此并没有结束,并发编程并不仅仅是给需要异步执行的代码加一个 go 声明那么简单,在进行并发编程时,还需要关注一些容易触发竞态条件的共享资源,因为这里很有可能会导致一些潜在的 bug,而且不易发觉,这也是从 PHP 之类不支持并发编程语言转型过来的程序员在开始进行并发编程时容易踩的坑。

友情提示:如果你不知道什么是竞态条件,请移步 Go 入门教程并发编程锁机制篇进行了解。

在这个示例中,共享资源就是用户上传的原始图片。

当我们使用 Nearest 方法查询与指定区格平均颜色最接近的嵌入图片时,查询成功则将对应嵌入图片从数据库删除,以避免在最终的马赛克图片中出现相同的嵌入图片,在之前版本的同步代码中,这个逻辑没有任何问题,但是在重构后通过并发处理时,有可能出现不同协程同时调用 Nearest 方法的情况,如果这个时候找到的是同一个嵌入图片,就出现了竞态条件。

要消除竞态条件,可以使用互斥锁这种通用的技术手段,简称 mutex,互斥锁要求同时只能由一个协程访问指定资源,为此,我们需要修改 Nearest 方法使用互斥锁技术:

type DB struct {
    mutex *sync.Mutex
    store map[string][3]float64
}
    
func (db *DB) Nearest(target [3]float64) string {
    var filename string
    db.mutex.Lock()
    smallest := 1000000.0
    for k, v := range db.store {
        dist := distance(target, v)
        if dist < smallest {
            filename, smallest = k, dist
        }
    }
    delete(db.store, filename)
    db.mutex.Unlock()
    return filename
}
    
...
    
func CloneTilesDB() DB {
    store := make(map[string][3]float64)
    for k, v := range TILESDB {
        store[k] = v
    }
    db := DB{
        store: store,
        mutex: &sync.Mutex{},
    }
    return db
} 

我们让 DB 对象持有互斥锁,每次处理图片马赛克克隆数据库时对其进行初始化,然后将 Nearest 方法改为归属于 DB 类,在调用 Nearest 方法时,执行查询之前调用 db.mutex.Lock() 锁定资源,只能由当前协程执行后续代码,其他协程运行到这里会阻塞,直到当前协程找到嵌入图片并将其从数据库移除,再调用 db.mutex.Unlock() 释放互斥锁,其他协程才能抢占互斥锁资源进行后续处理。

这也是为什么我们将 cut 方法中之前的 Nearest 方法调用改为调用 db.Nearest 的原因。

演示新版图片马赛克功能

至此,我们就完成了所有代码重构工作,在 Web 应用入口 main 函数中,将 /mosaic 路由对应的处理器由之前的 sync.Mosaic 调整为 concurrent.MosaicTILESDB 初始化代码这一行相应包也要做调整):

package main

import (
    "fmt"
    "mosaic/common"
    "mosaic/concurrent"
    "mosaic/sync"
    "net/http"
)

func main() {
    mux := http.NewServeMux()
    files := http.FileServer(http.Dir("public"))
    mux.Handle("/static/", http.StripPrefix("/static/", files))
    mux.HandleFunc("/", sync.Upload)
    // 同步进行马赛克处理
    // mux.HandleFunc("/mosaic", sync.Mosaic)
    // 基于协程异步进行马赛克处理
    mux.HandleFunc("/mosaic", concurrent.Mosaic)
    server := &http.Server{
        Addr:    "127.0.0.1:8080",
        Handler: mux,
    }
    // 初始化嵌入图片数据库(以便在处理图片马赛克时克隆)
    common.TILESDB = common.TilesDB()
    fmt.Println("图片马赛克应用服务器已启动")
    server.ListenAndServe()
}

其他地方都不需要再做任何调整,启动图片马赛克 Web 服务:

-w696

启动成功后,在浏览器访问 http://127.0.0.1:8080,在图片上传页面选择和上篇教程同一张图片进行马赛克处理:

-w966

可以看到处理时间大幅缩短(比之前快了 75 倍左右),显然,并发重构后的代码性能更好,尤其是针对这种 CPU 密集型运算。

图片马赛克应用完整代码已提交到 Github 代码仓库:https://github.com/nonfu/mosaic


Vote Vote Cancel Collect Collect Cancel

<< 上一篇: 基于 Go 协程实现图片马赛克应用(上):同步版本

>> 下一篇: 在 Go 语言中基于中间件避免 CSRF 攻击