Compare commits

..

6 Commits

@ -2,4 +2,4 @@ module git.oa00.com/go/rabbitmq
go 1.16
require github.com/rabbitmq/amqp091-go v1.4.0
require github.com/rabbitmq/amqp091-go v1.5.0

@ -3,8 +3,8 @@ github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORN
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/rabbitmq/amqp091-go v1.4.0 h1:T2G+J9W9OY4p64Di23J6yH7tOkMocgnESvYeBjuG9cY=
github.com/rabbitmq/amqp091-go v1.4.0/go.mod h1:JsV0ofX5f1nwOGafb8L5rBItt9GyhfQfcJj+oyz0dGg=
github.com/rabbitmq/amqp091-go v1.5.0 h1:VouyHPBu1CrKyJVfteGknGOGCzmOz0zcv/tONLkb7rg=
github.com/rabbitmq/amqp091-go v1.5.0/go.mod h1:JsV0ofX5f1nwOGafb8L5rBItt9GyhfQfcJj+oyz0dGg=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=

@ -1,6 +1,7 @@
package rabbitmq
import (
"context"
"errors"
"fmt"
amqp "github.com/rabbitmq/amqp091-go"
@ -27,9 +28,13 @@ type Config struct {
}
// InitRabbitmq @Title 初始化rabbitmq
func InitRabbitmq(config Config) (err error) {
func InitRabbitmq(config Config) error {
Rabbitmq.config = config
return getConn()
err := getConn()
if err != nil && Rabbitmq.config.ReConn {
go reConn()
}
return err
}
// @Title 获取连接
@ -86,6 +91,7 @@ func (r *rabbitmq) GetChannel() (*amqp.Channel, error) {
// Publish @Title 发送消息
// @Param exchangeName string true "指定发送的交换机"
// @Param key string true "路由key"
// @Param msg string true "发送消息内容"
// @Param delay int64 false "延时发送秒数"
func (r *rabbitmq) Publish(exchangeName, msg string, delay ...int64) error {
@ -97,7 +103,8 @@ func (r *rabbitmq) Publish(exchangeName, msg string, delay ...int64) error {
if len(delay) > 0 && delay[0] > 0 {
header["x-delay"] = delay[0] * 1000
}
return ch.Publish(
return ch.PublishWithContext(
context.Background(),
exchangeName, //交换
"", //队列名称
false, //强制为
@ -105,6 +112,37 @@ func (r *rabbitmq) Publish(exchangeName, msg string, delay ...int64) error {
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(msg),
DeliveryMode: amqp.Persistent,
Timestamp: time.Now(),
Headers: header,
})
}
// PublishWithContext @Title 发送消息
// @Param exchangeName string true "指定发送的交换机"
// @Param key string true "路由key"
// @Param msg string true "发送消息内容"
// @Param delay int64 false "延时发送秒数"
func (r *rabbitmq) PublishWithContext(ctx context.Context, exchangeName, key, msg string, delay ...int64) error {
ch, err := r.GetChannel()
if err != nil {
return err
}
header := amqp.Table{}
if len(delay) > 0 && delay[0] > 0 {
header["x-delay"] = delay[0] * 1000
}
return ch.PublishWithContext(
ctx,
exchangeName, //交换
key, //队列名称
false, //强制为
false, //立即
amqp.Publishing{
ContentType: "text/plain",
DeliveryMode: amqp.Persistent,
Timestamp: time.Now(),
Body: []byte(msg),
Headers: header,
})
}
@ -145,6 +183,9 @@ func (r *rabbitmq) Consume(queueName string, autoAck bool, prefetchCount int, ca
for {
select {
case d := <-msgs:
if d.Acknowledger == nil {
return errors.New("delivery not initialized")
}
callback(d)
case <-closeChan:
// 连接断开

Loading…
Cancel
Save