|
|
|
@ -1,6 +1,7 @@
|
|
|
|
|
package rabbitmq
|
|
|
|
|
|
|
|
|
|
import (
|
|
|
|
|
"context"
|
|
|
|
|
"errors"
|
|
|
|
|
"fmt"
|
|
|
|
|
amqp "github.com/rabbitmq/amqp091-go"
|
|
|
|
@ -27,9 +28,13 @@ type Config struct {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// InitRabbitmq @Title 初始化rabbitmq
|
|
|
|
|
func InitRabbitmq(config Config) (err error) {
|
|
|
|
|
func InitRabbitmq(config Config) error {
|
|
|
|
|
Rabbitmq.config = config
|
|
|
|
|
return getConn()
|
|
|
|
|
err := getConn()
|
|
|
|
|
if err != nil && Rabbitmq.config.ReConn {
|
|
|
|
|
go reConn()
|
|
|
|
|
}
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// @Title 获取连接
|
|
|
|
@ -86,6 +91,7 @@ func (r *rabbitmq) GetChannel() (*amqp.Channel, error) {
|
|
|
|
|
|
|
|
|
|
// Publish @Title 发送消息
|
|
|
|
|
// @Param exchangeName string true "指定发送的交换机"
|
|
|
|
|
// @Param key string true "路由key"
|
|
|
|
|
// @Param msg string true "发送消息内容"
|
|
|
|
|
// @Param delay int64 false "延时发送秒数"
|
|
|
|
|
func (r *rabbitmq) Publish(exchangeName, msg string, delay ...int64) error {
|
|
|
|
@ -97,15 +103,47 @@ func (r *rabbitmq) Publish(exchangeName, msg string, delay ...int64) error {
|
|
|
|
|
if len(delay) > 0 && delay[0] > 0 {
|
|
|
|
|
header["x-delay"] = delay[0] * 1000
|
|
|
|
|
}
|
|
|
|
|
return ch.Publish(
|
|
|
|
|
return ch.PublishWithContext(
|
|
|
|
|
context.Background(),
|
|
|
|
|
exchangeName, //交换
|
|
|
|
|
"", //队列名称
|
|
|
|
|
false, //强制为
|
|
|
|
|
false, //立即
|
|
|
|
|
amqp.Publishing{
|
|
|
|
|
ContentType: "text/plain",
|
|
|
|
|
Body: []byte(msg),
|
|
|
|
|
Headers: header,
|
|
|
|
|
ContentType: "text/plain",
|
|
|
|
|
Body: []byte(msg),
|
|
|
|
|
DeliveryMode: amqp.Persistent,
|
|
|
|
|
Timestamp: time.Now(),
|
|
|
|
|
Headers: header,
|
|
|
|
|
})
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// PublishWithContext @Title 发送消息
|
|
|
|
|
// @Param exchangeName string true "指定发送的交换机"
|
|
|
|
|
// @Param key string true "路由key"
|
|
|
|
|
// @Param msg string true "发送消息内容"
|
|
|
|
|
// @Param delay int64 false "延时发送秒数"
|
|
|
|
|
func (r *rabbitmq) PublishWithContext(ctx context.Context, exchangeName, key, msg string, delay ...int64) error {
|
|
|
|
|
ch, err := r.GetChannel()
|
|
|
|
|
if err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
header := amqp.Table{}
|
|
|
|
|
if len(delay) > 0 && delay[0] > 0 {
|
|
|
|
|
header["x-delay"] = delay[0] * 1000
|
|
|
|
|
}
|
|
|
|
|
return ch.PublishWithContext(
|
|
|
|
|
ctx,
|
|
|
|
|
exchangeName, //交换
|
|
|
|
|
key, //队列名称
|
|
|
|
|
false, //强制为
|
|
|
|
|
false, //立即
|
|
|
|
|
amqp.Publishing{
|
|
|
|
|
ContentType: "text/plain",
|
|
|
|
|
DeliveryMode: amqp.Persistent,
|
|
|
|
|
Timestamp: time.Now(),
|
|
|
|
|
Body: []byte(msg),
|
|
|
|
|
Headers: header,
|
|
|
|
|
})
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -145,6 +183,9 @@ func (r *rabbitmq) Consume(queueName string, autoAck bool, prefetchCount int, ca
|
|
|
|
|
for {
|
|
|
|
|
select {
|
|
|
|
|
case d := <-msgs:
|
|
|
|
|
if d.Acknowledger == nil {
|
|
|
|
|
return errors.New("delivery not initialized")
|
|
|
|
|
}
|
|
|
|
|
callback(d)
|
|
|
|
|
case <-closeChan:
|
|
|
|
|
// 连接断开
|
|
|
|
|