分布式任务队列machinery的使用

Tags: golang 

目录

说明

分布式任务队列是大型系统中经常用的技术方案,是一种高效、可靠性高,能够承受海量并发的技术方案。

目前有个名为machinery的开源项目,是用go语言开发了一个分布式任务框架。

学习代码位于machinery study code

快速体验machinery

machinery需要外部的broker和result backend。

依赖的服务

broker目前支持:

amqp :    amqp://[username:password@]@host[:port]
redis :   redis://[password@]host[port][/db_num]
          redis+socket://[password@]/path/to/file.sock[:/db_num]

result backend目前支持:

amqp :    amqp://[username:password@]@host[:port]
redis :   redis://[password@]host[port][/db_num]
          redis+socket://[password@]/path/to/file.sock[:/db_num]
memcache: memcache://host1[:port1][,host2[:port2],...[,hostN[:portN]]]
mongodb:  mongodb://[username:password@]host1[:port1][,host2[:port2],...[,hostN[:portN]]][/[database][?options]]

这里使用rabbitmq和redis,均部署在本地: 在mac上部署redis在mac上部署rabbitmq

运行example

直接用go get获取项目代码:

go get github.com/RichardKnop/machinery/v1

在machinery项目目录下,有一个example目录,是一个使用示例。

启动worker:

go run example/machinery.go worker

启动send:

go run example/machinery.go send

支持的数据类型

machinery将任务编码成json后发送,task执行的结果也以json的方式保存到backend。当前只支持一下类型:

bool
int
int8
int16
int32
int64
uint
uint8
uint16
uint32
uint64
float32
float64
string

特别在写task函数的时候,需要注意task的返回值也必须是这些类型,除了最后一个error。

开发worker

worker是用来执行任务的,worker启动的时候注册它能够承担的任务。

首先要创建一个server:

var cnf = &config.Config{
	Broker:        "amqp://guest:guest@localhost:5672/",
	DefaultQueue:  "machinery_tasks",
	ResultBackend: "redis://127.0.0.1:6379",
	AMQP: &config.AMQPConfig{
		Exchange:     "machinery_exchange",
		ExchangeType: "direct",
		BindingKey:   "machinery_task",
	},
}

//init server
server, err := machinery.NewServer(cnf)
if err != nil {
	log.Fatal(err)
}

在server中注册要承担的task:

//regist task
server.RegisterTask("HelloWorld", HelloWorld)

每个task都是一个函数,task函数返回的最后一个参数必须是error,例如:

func HelloWorld(arg string) (string, error) {
	return "Hi, i'm worker@localhost", nil
}

最后创建worker,并启动:

//create worker
worker := server.NewWorker("worker@localhost", 10)
err = worker.Launch()
if err != nil {
	log.Fatal(err)
}

运行worker:

$go run worker.go
INFO: 2017/11/06 13:59:45 worker.go:31 Launching a worker with the following settings:
INFO: 2017/11/06 13:59:45 worker.go:32 - Broker: amqp://guest:guest@localhost:5672/
INFO: 2017/11/06 13:59:45 worker.go:33 - DefaultQueue: machinery_tasks
INFO: 2017/11/06 13:59:45 worker.go:34 - ResultBackend: redis://127.0.0.1:6379
INFO: 2017/11/06 13:59:45 worker.go:36 - AMQP: machinery_exchange
INFO: 2017/11/06 13:59:45 worker.go:37   - Exchange: machinery_exchange
INFO: 2017/11/06 13:59:45 worker.go:38   - ExchangeType: direct
INFO: 2017/11/06 13:59:45 worker.go:39   - BindingKey: machinery_task
INFO: 2017/11/06 13:59:45 worker.go:40   - PrefetchCount: 0
INFO: 2017/11/06 13:59:45 amqp.go:72 [*] Waiting for messages. To exit press CTRL+C

开发Sender

发送端用来向worker发送任务。

发送端也需要像worker一样创建server,在server注册task,需要与worker保持一致。

var cnf = &config.Config{
	Broker:        "amqp://guest:guest@localhost:5672/",
	DefaultQueue:  "machinery_tasks",
	ResultBackend: "redis://127.0.0.1:6379",
	AMQP: &config.AMQPConfig{
		Exchange:     "machinery_exchange",
		ExchangeType: "direct",
		BindingKey:   "machinery_task",
	},
}

//init server
server, err := machinery.NewServer(cnf)
if err != nil {
	log.Fatal(err)
}

//regist task
server.RegisterTask("HelloWorld", HelloWorld)

sender要发送的每个task用Signature描述:

type Signature struct {
  UUID           string
  Name           string
  RoutingKey     string
  ETA            *time.Time
  GroupUUID      string
  GroupTaskCount int
  Args           []Arg
  Headers        Headers
  Immutable      bool
  RetryCount     int
  RetryTimeout   int
  OnSuccess      []*Signature
  OnError        []*Signature
  ChordCallback  *Signature
}

发送task——HelloWorld:

//task signature
signature := &tasks.Signature{
	Name: "HelloWorld",
	Args: []tasks.Arg{
		{
			Type:  "string",
			Value: "task1",
		},
	},
}

asyncResult, err := server.SendTask(signature)
if err != nil {
	log.Fatal(err)
}

等待任务执行的结果:

res, err := asyncResult.Get(1)
if err != nil {
	log.Fatal(err)
}
fmt.Printf("%s\n", res)

编排Task

machinery支持task编排,即workflow,支持以下几种workflow:

Group:     Group中的多个task并行的执行
Chord:     Group中的任务都执行完成后,回调Chord中指定的task
Chain:     所有的task串行执行,一个执行完成后,执行下一个

具体情况,参考github: machinery

参考

  1. github: machinery
  2. 在mac上部署redis
  3. 在mac上部署rabbitmq
  4. machinery study code

推荐阅读

Copyright @2011-2019 All rights reserved. 转载请添加原文连接,合作请加微信lijiaocn或者发送邮件: [email protected],备注网站合作

友情链接:  系统软件  程序语言  运营经验  水库文集  网络课程  微信网文  发现知识星球