From a50222d8ef7d47b8d701debdb62428c7e527c904 Mon Sep 17 00:00:00 2001 From: kanade Date: Mon, 17 Oct 2022 10:37:04 +0800 Subject: [PATCH] =?UTF-8?q?=E6=9B=B4=E6=96=B0=E7=89=88=E6=9C=AC?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- rabbitmq.go | 32 +++++++++++++++++++++++++++++++- 1 file changed, 31 insertions(+), 1 deletion(-) diff --git a/rabbitmq.go b/rabbitmq.go index 5bf4239..c90b9e0 100644 --- a/rabbitmq.go +++ b/rabbitmq.go @@ -1,6 +1,7 @@ package rabbitmq import ( + "context" "errors" "fmt" amqp "github.com/rabbitmq/amqp091-go" @@ -86,6 +87,7 @@ func (r *rabbitmq) GetChannel() (*amqp.Channel, error) { // Publish @Title 发送消息 // @Param exchangeName string true "指定发送的交换机" +// @Param key string true "路由key" // @Param msg string true "发送消息内容" // @Param delay int64 false "延时发送秒数" func (r *rabbitmq) Publish(exchangeName, msg string, delay ...int64) error { @@ -97,7 +99,8 @@ func (r *rabbitmq) Publish(exchangeName, msg string, delay ...int64) error { if len(delay) > 0 && delay[0] > 0 { header["x-delay"] = delay[0] * 1000 } - return ch.Publish( + return ch.PublishWithContext( + context.Background(), exchangeName, //交换 "", //队列名称 false, //强制为 @@ -109,6 +112,33 @@ func (r *rabbitmq) Publish(exchangeName, msg string, delay ...int64) error { }) } +// PublishWithContext @Title 发送消息 +// @Param exchangeName string true "指定发送的交换机" +// @Param key string true "路由key" +// @Param msg string true "发送消息内容" +// @Param delay int64 false "延时发送秒数" +func (r *rabbitmq) PublishWithContext(ctx context.Context, exchangeName, key, msg string, delay ...int64) error { + ch, err := r.GetChannel() + if err != nil { + return err + } + header := amqp.Table{} + if len(delay) > 0 && delay[0] > 0 { + header["x-delay"] = delay[0] * 1000 + } + return ch.PublishWithContext( + ctx, + exchangeName, //交换 + key, //队列名称 + false, //强制为 + false, //立即 + amqp.Publishing{ + ContentType: "text/plain", + Body: []byte(msg), + Headers: header, + }) +} + // Consume @Title 消费队列 // @Param queueName string true "队列名称" // @Param autoAck bool true "是否自动确认"