From 84df6f12f35271ca39e678f30aeed87b10439462 Mon Sep 17 00:00:00 2001 From: kanade <3136520963@qq.com> Date: Tue, 27 Apr 2021 14:22:57 +0800 Subject: [PATCH] init --- README.md | 1 + go.mod | 5 ++++ go.sum | 2 ++ rabbitmq.go | 86 +++++++++++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 94 insertions(+) create mode 100644 README.md create mode 100644 go.mod create mode 100644 go.sum create mode 100644 rabbitmq.go diff --git a/README.md b/README.md new file mode 100644 index 0000000..5676a1f --- /dev/null +++ b/README.md @@ -0,0 +1 @@ +# 消息队列rabbitmq工具 \ No newline at end of file diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..eadf004 --- /dev/null +++ b/go.mod @@ -0,0 +1,5 @@ +module git.oa00.com/go/rabbitmq + +go 1.16 + +require github.com/streadway/amqp v1.0.0 diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..75f2157 --- /dev/null +++ b/go.sum @@ -0,0 +1,2 @@ +github.com/streadway/amqp v1.0.0 h1:kuuDrUJFZL1QYL9hUNuCxNObNzB0bV/ZG5jV3RWAQgo= +github.com/streadway/amqp v1.0.0/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw= diff --git a/rabbitmq.go b/rabbitmq.go new file mode 100644 index 0000000..442f6da --- /dev/null +++ b/rabbitmq.go @@ -0,0 +1,86 @@ +package rabbitmq + +import ( + "fmt" + "github.com/streadway/amqp" + "log" +) + +var Rabbitmq = &rabbitmq{} + +type rabbitmq struct { + conn *amqp.Connection +} + +// InitRabbitmq @Title 初始化rabbitmq +func InitRabbitmq(host string, port int, user, password string) { + var err error + Rabbitmq.conn, err = amqp.Dial(fmt.Sprintf("amqp://%s:%s@%s:%d/", user, password, host, port)) + if err != nil { + log.Fatal("rabbitmq链接失败,err:", err) + } + return +} + +// Publish @Title 发送消息 +// @Param exchangeName string true "指定发送的交换机" +// @Param msg string true "发送消息内容" +// @Param delay int64 false "延时发送秒数" +func (r *rabbitmq) Publish(exchangeName, msg string, delay ...int64) error { + ch, err := r.conn.Channel() + if err != nil { + return err + } + header := amqp.Table{} + if len(delay) > 0 && delay[0] > 0 { + header["x-delay"] = delay[0] * 1000 + } + return ch.Publish( + exchangeName, //交换 + "", //队列名称 + false, //强制为 + false, //立即 + amqp.Publishing{ + ContentType: "text/plain", + Body: []byte(msg), + Headers: header, + }) +} + +// Consume @Title 消费队列 +// @Param queueName string true "队列名称" +// @Param autoAck bool true "是否自动确认" +// @Param callback func(amqp.Delivery)) true "回调处理" +func (r *rabbitmq) Consume(queueName string, autoAck bool, callback func(amqp.Delivery)) error { + ch, err := r.conn.Channel() + if err != nil { + return err + } + //消费队列 + msgs, err := ch.Consume( + queueName, //队列名称 + "", //消费者 + autoAck, //自动确认 + false, //排他 + false, + false, + nil, + ) + if err != nil { + return err + } + + forever := make(chan bool) + go func() { + for d := range msgs { + callback(d) + } + }() + <-forever + return nil +} + +// Close @Title 关闭资源 +func (r *rabbitmq) Close() { + defer Rabbitmq.conn.Close() +}