Compare commits

..

11 Commits

@ -2,4 +2,4 @@ module git.oa00.com/go/rabbitmq
go 1.16
require github.com/streadway/amqp v1.0.0
require github.com/rabbitmq/amqp091-go v1.5.0

@ -1,2 +1,40 @@
github.com/streadway/amqp v1.0.0 h1:kuuDrUJFZL1QYL9hUNuCxNObNzB0bV/ZG5jV3RWAQgo=
github.com/streadway/amqp v1.0.0/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/rabbitmq/amqp091-go v1.5.0 h1:VouyHPBu1CrKyJVfteGknGOGCzmOz0zcv/tONLkb7rg=
github.com/rabbitmq/amqp091-go v1.5.0/go.mod h1:JsV0ofX5f1nwOGafb8L5rBItt9GyhfQfcJj+oyz0dGg=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
go.uber.org/goleak v1.1.12 h1:gZAh5/EyT/HQwlpkCy6wTpqfH9H8Lz8zbm3dZh+OyzA=
go.uber.org/goleak v1.1.12/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.1.5/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

@ -1,33 +1,101 @@
package rabbitmq
import (
"context"
"errors"
"fmt"
"github.com/streadway/amqp"
"log"
amqp "github.com/rabbitmq/amqp091-go"
"sync"
"time"
)
var Rabbitmq = &rabbitmq{}
type rabbitmq struct {
conn *amqp.Connection
config Config
Conn *amqp.Connection
mu sync.RWMutex
ch *amqp.Channel
}
type Config struct {
Host string
Port int
User string
Password string
ReConnSecond int // 重连间隔秒数
ReConn bool // 是否断线重连
}
// InitRabbitmq @Title 初始化rabbitmq
func InitRabbitmq(host string, port int, user, password string) {
var err error
Rabbitmq.conn, err = amqp.Dial(fmt.Sprintf("amqp://%s:%s@%s:%d/", user, password, host, port))
func InitRabbitmq(config Config) error {
Rabbitmq.config = config
err := getConn()
if err != nil && Rabbitmq.config.ReConn {
go reConn()
}
return err
}
// @Title 获取连接
func getConn() (err error) {
Rabbitmq.Conn, err = amqp.Dial(fmt.Sprintf("amqp://%s:%s@%s:%d/", Rabbitmq.config.User, Rabbitmq.config.Password, Rabbitmq.config.Host, Rabbitmq.config.Port))
if err != nil {
log.Fatal("rabbitmq链接失败err:", err)
return err
}
// 断线重连
if Rabbitmq.config.ReConn {
cc := make(chan *amqp.Error)
closeChan := Rabbitmq.Conn.NotifyClose(cc)
go func() {
select {
case <-closeChan:
if Rabbitmq.config.ReConn {
reConn()
}
}
}()
}
return
}
// @Title 自动重连
func reConn() {
if err := getConn(); err != nil {
// 延时重连
time.Sleep(time.Duration(Rabbitmq.config.ReConnSecond) * time.Second)
reConn()
}
}
// GetChannel @Title 获取连接
func (r *rabbitmq) GetChannel() (*amqp.Channel, error) {
if r.ch != nil && !r.ch.IsClosed() {
return r.ch, nil
}
r.mu.Lock()
defer r.mu.Unlock()
if r.ch != nil && !r.ch.IsClosed() {
return r.ch, nil
}
if r.Conn == nil || r.Conn.IsClosed() {
return nil, errors.New("rabbitmq not connect")
}
var err error
r.ch, err = r.Conn.Channel()
if err != nil {
return nil, err
}
return r.ch, nil
}
// Publish @Title 发送消息
// @Param exchangeName string true "指定发送的交换机"
// @Param key string true "路由key"
// @Param msg string true "发送消息内容"
// @Param delay int64 false "延时发送秒数"
func (r *rabbitmq) Publish(exchangeName, msg string, delay ...int64) error {
ch, err := r.conn.Channel()
ch, err := r.GetChannel()
if err != nil {
return err
}
@ -35,27 +103,66 @@ func (r *rabbitmq) Publish(exchangeName, msg string, delay ...int64) error {
if len(delay) > 0 && delay[0] > 0 {
header["x-delay"] = delay[0] * 1000
}
return ch.Publish(
return ch.PublishWithContext(
context.Background(),
exchangeName, //交换
"", //队列名称
false, //强制为
false, //立即
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(msg),
Headers: header,
ContentType: "text/plain",
Body: []byte(msg),
DeliveryMode: amqp.Persistent,
Timestamp: time.Now(),
Headers: header,
})
}
// PublishWithContext @Title 发送消息
// @Param exchangeName string true "指定发送的交换机"
// @Param key string true "路由key"
// @Param msg string true "发送消息内容"
// @Param delay int64 false "延时发送秒数"
func (r *rabbitmq) PublishWithContext(ctx context.Context, exchangeName, key, msg string, delay ...int64) error {
ch, err := r.GetChannel()
if err != nil {
return err
}
header := amqp.Table{}
if len(delay) > 0 && delay[0] > 0 {
header["x-delay"] = delay[0] * 1000
}
return ch.PublishWithContext(
ctx,
exchangeName, //交换
key, //队列名称
false, //强制为
false, //立即
amqp.Publishing{
ContentType: "text/plain",
DeliveryMode: amqp.Persistent,
Timestamp: time.Now(),
Body: []byte(msg),
Headers: header,
})
}
// Consume @Title 消费队列
// @Param queueName string true "队列名称"
// @Param autoAck bool true "是否自动确认"
// @Param prefetchCount int 0 "预消费数量"
// @Param callback func(amqp.Delivery)) true "回调处理"
func (r *rabbitmq) Consume(queueName string, autoAck bool, callback func(amqp.Delivery)) error {
ch, err := r.conn.Channel()
func (r *rabbitmq) Consume(queueName string, autoAck bool, prefetchCount int, callback func(amqp.Delivery)) error {
ch, err := r.GetChannel()
if err != nil {
return err
}
defer ch.Close()
if prefetchCount > 0 {
if err := ch.Qos(prefetchCount, 0, false); err != nil {
return err
}
}
//消费队列
msgs, err := ch.Consume(
queueName, //队列名称
@ -70,17 +177,25 @@ func (r *rabbitmq) Consume(queueName string, autoAck bool, callback func(amqp.De
return err
}
forever := make(chan bool)
go func() {
for d := range msgs {
cc := make(chan *amqp.Error)
closeChan := ch.NotifyClose(cc)
for {
select {
case d := <-msgs:
if d.Acknowledger == nil {
return errors.New("delivery not initialized")
}
callback(d)
case <-closeChan:
// 连接断开
return errors.New("rabbitmq channel close")
}
}()
<-forever
return nil
}
}
// Close @Title 关闭资源
func (r *rabbitmq) Close() {
defer Rabbitmq.conn.Close()
Rabbitmq.config.ReConn = false
defer Rabbitmq.Conn.Close()
}

Loading…
Cancel
Save