更新版本

master
杨赟 2 years ago
parent b1fd72ef4b
commit a50222d8ef

@ -1,6 +1,7 @@
package rabbitmq
import (
"context"
"errors"
"fmt"
amqp "github.com/rabbitmq/amqp091-go"
@ -86,6 +87,7 @@ func (r *rabbitmq) GetChannel() (*amqp.Channel, error) {
// Publish @Title 发送消息
// @Param exchangeName string true "指定发送的交换机"
// @Param key string true "路由key"
// @Param msg string true "发送消息内容"
// @Param delay int64 false "延时发送秒数"
func (r *rabbitmq) Publish(exchangeName, msg string, delay ...int64) error {
@ -97,7 +99,8 @@ func (r *rabbitmq) Publish(exchangeName, msg string, delay ...int64) error {
if len(delay) > 0 && delay[0] > 0 {
header["x-delay"] = delay[0] * 1000
}
return ch.Publish(
return ch.PublishWithContext(
context.Background(),
exchangeName, //交换
"", //队列名称
false, //强制为
@ -109,6 +112,33 @@ func (r *rabbitmq) Publish(exchangeName, msg string, delay ...int64) error {
})
}
// PublishWithContext @Title 发送消息
// @Param exchangeName string true "指定发送的交换机"
// @Param key string true "路由key"
// @Param msg string true "发送消息内容"
// @Param delay int64 false "延时发送秒数"
func (r *rabbitmq) PublishWithContext(ctx context.Context, exchangeName, key, msg string, delay ...int64) error {
ch, err := r.GetChannel()
if err != nil {
return err
}
header := amqp.Table{}
if len(delay) > 0 && delay[0] > 0 {
header["x-delay"] = delay[0] * 1000
}
return ch.PublishWithContext(
ctx,
exchangeName, //交换
key, //队列名称
false, //强制为
false, //立即
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(msg),
Headers: header,
})
}
// Consume @Title 消费队列
// @Param queueName string true "队列名称"
// @Param autoAck bool true "是否自动确认"

Loading…
Cancel
Save