Compare commits

..

No commits in common. 'master' and 'v1.0.5' have entirely different histories.

@ -2,4 +2,4 @@ module git.oa00.com/go/rabbitmq
go 1.16 go 1.16
require github.com/rabbitmq/amqp091-go v1.5.0 require github.com/rabbitmq/amqp091-go v1.4.0

@ -3,8 +3,8 @@ github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORN
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/rabbitmq/amqp091-go v1.5.0 h1:VouyHPBu1CrKyJVfteGknGOGCzmOz0zcv/tONLkb7rg= github.com/rabbitmq/amqp091-go v1.4.0 h1:T2G+J9W9OY4p64Di23J6yH7tOkMocgnESvYeBjuG9cY=
github.com/rabbitmq/amqp091-go v1.5.0/go.mod h1:JsV0ofX5f1nwOGafb8L5rBItt9GyhfQfcJj+oyz0dGg= github.com/rabbitmq/amqp091-go v1.4.0/go.mod h1:JsV0ofX5f1nwOGafb8L5rBItt9GyhfQfcJj+oyz0dGg=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=

@ -1,7 +1,6 @@
package rabbitmq package rabbitmq
import ( import (
"context"
"errors" "errors"
"fmt" "fmt"
amqp "github.com/rabbitmq/amqp091-go" amqp "github.com/rabbitmq/amqp091-go"
@ -28,13 +27,9 @@ type Config struct {
} }
// InitRabbitmq @Title 初始化rabbitmq // InitRabbitmq @Title 初始化rabbitmq
func InitRabbitmq(config Config) error { func InitRabbitmq(config Config) (err error) {
Rabbitmq.config = config Rabbitmq.config = config
err := getConn() return getConn()
if err != nil && Rabbitmq.config.ReConn {
go reConn()
}
return err
} }
// @Title 获取连接 // @Title 获取连接
@ -91,7 +86,6 @@ func (r *rabbitmq) GetChannel() (*amqp.Channel, error) {
// Publish @Title 发送消息 // Publish @Title 发送消息
// @Param exchangeName string true "指定发送的交换机" // @Param exchangeName string true "指定发送的交换机"
// @Param key string true "路由key"
// @Param msg string true "发送消息内容" // @Param msg string true "发送消息内容"
// @Param delay int64 false "延时发送秒数" // @Param delay int64 false "延时发送秒数"
func (r *rabbitmq) Publish(exchangeName, msg string, delay ...int64) error { func (r *rabbitmq) Publish(exchangeName, msg string, delay ...int64) error {
@ -103,47 +97,15 @@ func (r *rabbitmq) Publish(exchangeName, msg string, delay ...int64) error {
if len(delay) > 0 && delay[0] > 0 { if len(delay) > 0 && delay[0] > 0 {
header["x-delay"] = delay[0] * 1000 header["x-delay"] = delay[0] * 1000
} }
return ch.PublishWithContext( return ch.Publish(
context.Background(),
exchangeName, //交换 exchangeName, //交换
"", //队列名称 "", //队列名称
false, //强制为 false, //强制为
false, //立即 false, //立即
amqp.Publishing{ amqp.Publishing{
ContentType: "text/plain", ContentType: "text/plain",
Body: []byte(msg), Body: []byte(msg),
DeliveryMode: amqp.Persistent, Headers: header,
Timestamp: time.Now(),
Headers: header,
})
}
// PublishWithContext @Title 发送消息
// @Param exchangeName string true "指定发送的交换机"
// @Param key string true "路由key"
// @Param msg string true "发送消息内容"
// @Param delay int64 false "延时发送秒数"
func (r *rabbitmq) PublishWithContext(ctx context.Context, exchangeName, key, msg string, delay ...int64) error {
ch, err := r.GetChannel()
if err != nil {
return err
}
header := amqp.Table{}
if len(delay) > 0 && delay[0] > 0 {
header["x-delay"] = delay[0] * 1000
}
return ch.PublishWithContext(
ctx,
exchangeName, //交换
key, //队列名称
false, //强制为
false, //立即
amqp.Publishing{
ContentType: "text/plain",
DeliveryMode: amqp.Persistent,
Timestamp: time.Now(),
Body: []byte(msg),
Headers: header,
}) })
} }
@ -183,9 +145,6 @@ func (r *rabbitmq) Consume(queueName string, autoAck bool, prefetchCount int, ca
for { for {
select { select {
case d := <-msgs: case d := <-msgs:
if d.Acknowledger == nil {
return errors.New("delivery not initialized")
}
callback(d) callback(d)
case <-closeChan: case <-closeChan:
// 连接断开 // 连接断开

Loading…
Cancel
Save