
I had been using Qiniu's storage service, which can generate image thumbnails, blur images, and convert video to WebP. Now I need to move storage to S3, so this image and video processing has to be written by myself. This article sorts out the general idea.

Analyze requirements

First, look at how Qiniu's interface handles this. For example, to capture a frame at the first second of a video, thumbnail that frame, and finally store the result under a new key, the command can be written as vframe/jpg/offset/1 | imagemogr2/thumbnail/400x | saveas/xxx. As you can see, the three operations are separated by the | symbol, similar to a Unix pipe.
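Such a cmd string can be split on the pipe symbol into individual operations before dispatching each one. A minimal sketch (the cmd syntax is Qiniu's; the splitCmd helper is hypothetical, not part of any SDK):

```go
package main

import (
	"fmt"
	"strings"
)

// splitCmd breaks a Qiniu-style cmd into its piped operations,
// trimming the whitespace around each "|".
func splitCmd(cmd string) []string {
	parts := strings.Split(cmd, "|")
	ops := make([]string, 0, len(parts))
	for _, p := range parts {
		ops = append(ops, strings.TrimSpace(p))
	}
	return ops
}

func main() {
	ops := splitCmd("vframe/jpg/offset/1 | imagemogr2/thumbnail/400x | saveas/xxx")
	fmt.Println(len(ops)) // 3
	fmt.Println(ops[1])   // imagemogr2/thumbnail/400x
}
```

Each resulting operation string can then be matched against the known operation names.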

The above operation counts as one cmd. One API request can process multiple cmds at the same time, separated by semicolons. After processing, the results are returned in a callback, e.g.:

{
  "id": "xxxxx",
  "pipeline": "xxx",
  "code": 0,
  "desc": "The fop was completed successfully",
  "reqid": "xtsaafnxubr5j10u",
  "inputbucket": "xxx",
  "inputkey": "xxxxx",
  "items": [
    {
      "cmd": "vframe/jpg/offset/1 | imagemogr2/thumbnail/400x | saveas/zmftzs1wcml2yxrlom1vbwvudc9jb3zlci9zbmfwl3zpzgvvl2m5yzdjzjq5ltu3ngqtngzzlzz11zdzz1",
      "code": 0,
      "desc": "The fop was completed successfully",
      "hash": "fhdn6v8ei4vw4xjgalsfxutvmeiv",
      "key": "xx",
      "returnold": 0
    },
    {
      "cmd": "vframe/jpg/offset/1 | imagemogr2/thumbnail/400x | imagemogr2/blur/45x8 | saveas/zmftzs1wcml2yxrlom1vbwvudc9jb3zlci9zbmfwl3=zpzgvvl2m5yzdjzzzzz1zqz1zkwz1zkwz1zkwz1zkjzzqzzrzqz1",
      "code": 0,
      "desc": "The fop was completed successfully",
      "hash": "fgnirzrcsa7tzx1xvsb_4d5tiak3",
      "key": "xxx",
      "returnold": 0
    }
  ]
}

Breaking down requirements

This program needs roughly these parts:

1. An HTTP interface that accepts tasks, throws each task into a queue, and returns a job id. Workers process tasks asynchronously; the number of workers and the number of parallel executions per worker are configurable, and workers have a retry mechanism.

2. Parse the work to be done from the job payload, split out each cmd, preferably execute the cmds in parallel, and record the result of each cmd.

3. Each cmd contains multiple operations connected by a pipe: the output of the previous operation is the input of the next.

You can think about 1 separately from 2 and 3. 1 is fairly independent; I wrote a worker model before, referring to the article Handling 1 Million Requests per Minute with Go, which covers it in detail. It uses a Go channel as the queue; I added beanstalk as a queue provider. Another improvement: the article only makes the number of workers configurable, so I added a parameter that sets how many goroutines each worker can run in parallel. The rest of this post mainly talks about solutions to 3 and 2.

pipe

You can refer to this library pipe, the usage is as follows:

p := pipe.Line(
    pipe.ReadFile("test.png"),
    resize(300, 300),
    blur(0.5),
)

output, err := pipe.CombinedOutput(p)
if err != nil {
    fmt.Printf("%v\n", err)
}

buf := bytes.NewBuffer(output)
img, _ := imaging.Decode(buf)

imaging.Save(img, "test_a.png")

It's quite convenient. Create a cmd struct, use regular expressions to match the parameters of each operation, collect them into an []op slice, and finally execute. The struct and methods are as follows:

type cmd struct {
    cmd    string
    saveas string
    ops    []op
    err    error
}

type op interface {
    getPipe() pipe.Pipe
}

type resizeOp struct {
    width, height int
}

func (c resizeOp) getPipe() pipe.Pipe {
    return resize(c.width, c.height)
}

// usage
cmdStr := `file/test.png | thumbnail/x300 | blur/20x8`
cmd := cmd{cmdStr, "test_b.png", nil, nil}

cmd.parse()
cmd.doOps()
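The parse step mentioned above (regex-matching each operation into an op) could look roughly like this. This is a hypothetical sketch: the types here are minimal stand-ins so the snippet compiles on its own, and only the thumbnail operation is handled; a real parser would cover blur, vframe, and the rest:

```go
package main

import (
	"fmt"
	"regexp"
	"strconv"
	"strings"
)

// Minimal stand-ins for the cmd/op/resizeOp types; the real ops
// would build pipe.Pipe stages.
type op interface{}

type resizeOp struct{ width, height int }

type cmd struct {
	cmd string
	ops []op
}

// thumbnail/<w>x<h>; either side may be empty, meaning "keep aspect ratio".
var thumbRe = regexp.MustCompile(`^thumbnail/(\d*)x(\d*)$`)

// parse splits the cmd string on "|" and regex-matches each operation.
func (c *cmd) parse() {
	for _, part := range strings.Split(c.cmd, "|") {
		part = strings.TrimSpace(part)
		if m := thumbRe.FindStringSubmatch(part); m != nil {
			w, _ := strconv.Atoi(m[1]) // empty group -> 0, i.e. unconstrained
			h, _ := strconv.Atoi(m[2])
			c.ops = append(c.ops, resizeOp{width: w, height: h})
		}
	}
}

func main() {
	c := cmd{cmd: `file/test.png | thumbnail/x300 | blur/20x8`}
	c.parse()
	fmt.Println(len(c.ops)) // 1
	fmt.Println(c.ops[0])   // {0 300}
}
```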
sync.WaitGroup

With single-cmd processing solved, the next problem is running multiple cmds in parallel. There is not much to think about: sync.WaitGroup solves it perfectly. Step by step, let's first look at how it is used:

func main() {
    cmds := []string{}
    for i := 0; i < 10000; i++ {
        cmds = append(cmds, fmt.Sprintf("cmd-%d", i))
    }

    results := handleCmds(cmds)

    fmt.Println(len(results)) // 10000
}

func doCmd(cmd string) string {
    return fmt.Sprintf("cmd=%s", cmd)
}

func handleCmds(cmds []string) (results []string) {
    fmt.Println(len(cmds)) // 10000
    var count uint64

    group := sync.WaitGroup{}
    lock := sync.Mutex{}
    for _, item := range cmds {
        // count up by one
        group.Add(1)
        go func(cmd string) {
            result := doCmd(cmd)
            atomic.AddUint64(&count, 1)

            lock.Lock()
            results = append(results, result)
            lock.Unlock()
            // count down by one
            group.Done()
        }(item)
    }

    // blocks until the counter drops back to zero
    group.Wait()

    fmt.Printf("count=%d\n", count) // 10000
    return
}

group is essentially a counter: while the count > 0, group.Wait() blocks until it reaches 0. One more thing to note here: the results = append(results, result) operation is not thread-safe. Since results is shared, a lock is needed to ensure synchronization; otherwise the final len(results) would not be 10000.
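An alternative to the mutex is to collect results over a channel, so that only one goroutine ever touches the slice. A sketch of the same handleCmds under that design (doCmd is the same stub as above):

```go
package main

import "fmt"

func doCmd(cmd string) string { return fmt.Sprintf("cmd=%s", cmd) }

// handleCmds fans out one goroutine per cmd; each sends its result
// on a channel, and a single receiver appends to the slice, so no
// lock (and no WaitGroup) is needed.
func handleCmds(cmds []string) []string {
	out := make(chan string)
	for _, item := range cmds {
		go func(cmd string) { out <- doCmd(cmd) }(item)
	}

	results := make([]string, 0, len(cmds))
	for range cmds {
		results = append(results, <-out) // only this goroutine mutates results
	}
	return results
}

func main() {
	cmds := make([]string, 0, 10000)
	for i := 0; i < 10000; i++ {
		cmds = append(cmds, fmt.Sprintf("cmd-%d", i))
	}
	fmt.Println(len(handleCmds(cmds))) // 10000
}
```

Receiving exactly len(cmds) values doubles as the synchronization point, which is why no WaitGroup is needed; the trade-off is that results arrive in completion order, not input order.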

We build a benchCmd to hold the cmds, as follows:

type benchCmd struct {
    cmds      []cmd
    waitGroup sync.WaitGroup
    errs      []error
    lock      sync.Mutex
}

func (b *benchCmd) doCmds() {
    for _, item := range b.cmds {
        b.waitGroup.Add(1)

        go func(cmd cmd) {
            cmd.parse()
            err := cmd.doOps()

            b.lock.Lock()
            b.errs = append(b.errs, err)
            b.lock.Unlock()

            b.waitGroup.Done()
        }(item)
    }

    b.waitGroup.Wait()
}

The final call looks like this:

var cmds []cmd
cmdA := cmd{`file/test.png | thumbnail/x300 | blur/20x8`, "test_a.png", nil, nil}
cmdB := cmd{`file/test.png | thumbnail/500x1000 | blur/20x108`, "test_b.png", nil, nil}
cmdC := cmd{`file/test.png | thumbnail/300x300`, "test_c.png", nil, nil}

cmds = append(cmds, cmdA)
cmds = append(cmds, cmdB)
cmds = append(cmds, cmdC)

bench := benchCmd{
    cmds:      cmds,
    waitGroup: sync.WaitGroup{},
    lock:      sync.Mutex{},
}

bench.doCmds()

fmt.Println(bench.errs)

This is just a preliminary experiment, and the thinking is not comprehensive; it only imitates the API. Qiniu probably does not do it this way: for lower coupling, each cmd may have its own cluster, in which case this in-process pipe would no longer work. The current limitation is that all operations of a cmd must run in the same process.
