You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
202 lines
4.4 KiB
202 lines
4.4 KiB
package rabbitmq
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
amqp "github.com/rabbitmq/amqp091-go"
|
|
"sync"
|
|
"time"
|
|
)
|
|
|
|
// Rabbitmq is the package-level client instance. InitRabbitmq stores the
// configuration on it and getConn stores the dialed connection on it.
var Rabbitmq = &rabbitmq{}
|
|
|
|
type rabbitmq struct {
|
|
config Config
|
|
Conn *amqp.Connection
|
|
mu sync.RWMutex
|
|
ch *amqp.Channel
|
|
}
|
|
|
|
// Config describes how to reach the RabbitMQ broker and whether the client
// should reconnect after the connection is lost.
type Config struct {
	// Host is the broker host placed in the amqp:// URL.
	Host string
	// Port is the broker port placed in the amqp:// URL.
	Port int
	// User is the user name placed in the amqp:// URL.
	User string
	// Password is the password placed in the amqp:// URL.
	Password string
	// ReConnSecond is the number of seconds to wait between reconnect attempts.
	ReConnSecond int
	// ReConn enables reconnecting when the connection drops.
	ReConn bool
}
|
|
|
|
// InitRabbitmq @Title 初始化rabbitmq
|
|
func InitRabbitmq(config Config) error {
|
|
Rabbitmq.config = config
|
|
err := getConn()
|
|
if err != nil && Rabbitmq.config.ReConn {
|
|
go reConn()
|
|
}
|
|
return err
|
|
}
|
|
|
|
// @Title 获取连接
|
|
func getConn() (err error) {
|
|
Rabbitmq.Conn, err = amqp.Dial(fmt.Sprintf("amqp://%s:%s@%s:%d/", Rabbitmq.config.User, Rabbitmq.config.Password, Rabbitmq.config.Host, Rabbitmq.config.Port))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
// 断线重连
|
|
if Rabbitmq.config.ReConn {
|
|
cc := make(chan *amqp.Error)
|
|
closeChan := Rabbitmq.Conn.NotifyClose(cc)
|
|
go func() {
|
|
select {
|
|
case <-closeChan:
|
|
if Rabbitmq.config.ReConn {
|
|
reConn()
|
|
}
|
|
}
|
|
}()
|
|
}
|
|
return
|
|
}
|
|
|
|
// @Title 自动重连
|
|
func reConn() {
|
|
if err := getConn(); err != nil {
|
|
// 延时重连
|
|
time.Sleep(time.Duration(Rabbitmq.config.ReConnSecond) * time.Second)
|
|
reConn()
|
|
}
|
|
}
|
|
|
|
// GetChannel @Title 获取连接
|
|
func (r *rabbitmq) GetChannel() (*amqp.Channel, error) {
|
|
if r.ch != nil && !r.ch.IsClosed() {
|
|
return r.ch, nil
|
|
}
|
|
r.mu.Lock()
|
|
defer r.mu.Unlock()
|
|
if r.ch != nil && !r.ch.IsClosed() {
|
|
return r.ch, nil
|
|
}
|
|
if r.Conn == nil || r.Conn.IsClosed() {
|
|
return nil, errors.New("rabbitmq not connect")
|
|
}
|
|
var err error
|
|
r.ch, err = r.Conn.Channel()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return r.ch, nil
|
|
}
|
|
|
|
// Publish @Title 发送消息
|
|
// @Param exchangeName string true "指定发送的交换机"
|
|
// @Param key string true "路由key"
|
|
// @Param msg string true "发送消息内容"
|
|
// @Param delay int64 false "延时发送秒数"
|
|
func (r *rabbitmq) Publish(exchangeName, msg string, delay ...int64) error {
|
|
ch, err := r.GetChannel()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
header := amqp.Table{}
|
|
if len(delay) > 0 && delay[0] > 0 {
|
|
header["x-delay"] = delay[0] * 1000
|
|
}
|
|
return ch.PublishWithContext(
|
|
context.Background(),
|
|
exchangeName, //交换
|
|
"", //队列名称
|
|
false, //强制为
|
|
false, //立即
|
|
amqp.Publishing{
|
|
ContentType: "text/plain",
|
|
Body: []byte(msg),
|
|
DeliveryMode: amqp.Persistent,
|
|
Timestamp: time.Now(),
|
|
Headers: header,
|
|
})
|
|
}
|
|
|
|
// PublishWithContext @Title 发送消息
|
|
// @Param exchangeName string true "指定发送的交换机"
|
|
// @Param key string true "路由key"
|
|
// @Param msg string true "发送消息内容"
|
|
// @Param delay int64 false "延时发送秒数"
|
|
func (r *rabbitmq) PublishWithContext(ctx context.Context, exchangeName, key, msg string, delay ...int64) error {
|
|
ch, err := r.GetChannel()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
header := amqp.Table{}
|
|
if len(delay) > 0 && delay[0] > 0 {
|
|
header["x-delay"] = delay[0] * 1000
|
|
}
|
|
return ch.PublishWithContext(
|
|
ctx,
|
|
exchangeName, //交换
|
|
key, //队列名称
|
|
false, //强制为
|
|
false, //立即
|
|
amqp.Publishing{
|
|
ContentType: "text/plain",
|
|
DeliveryMode: amqp.Persistent,
|
|
Timestamp: time.Now(),
|
|
Body: []byte(msg),
|
|
Headers: header,
|
|
})
|
|
}
|
|
|
|
// Consume @Title 消费队列
|
|
// @Param queueName string true "队列名称"
|
|
// @Param autoAck bool true "是否自动确认"
|
|
// @Param prefetchCount int 0 "预消费数量"
|
|
// @Param callback func(amqp.Delivery)) true "回调处理"
|
|
func (r *rabbitmq) Consume(queueName string, autoAck bool, prefetchCount int, callback func(amqp.Delivery)) error {
|
|
ch, err := r.GetChannel()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer ch.Close()
|
|
if prefetchCount > 0 {
|
|
if err := ch.Qos(prefetchCount, 0, false); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
//消费队列
|
|
msgs, err := ch.Consume(
|
|
queueName, //队列名称
|
|
"", //消费者
|
|
autoAck, //自动确认
|
|
false, //排他
|
|
false,
|
|
false,
|
|
nil,
|
|
)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
cc := make(chan *amqp.Error)
|
|
closeChan := ch.NotifyClose(cc)
|
|
|
|
for {
|
|
select {
|
|
case d := <-msgs:
|
|
if d.Acknowledger == nil {
|
|
return errors.New("delivery not initialized")
|
|
}
|
|
callback(d)
|
|
case <-closeChan:
|
|
// 连接断开
|
|
return errors.New("rabbitmq channel close")
|
|
}
|
|
}
|
|
}
|
|
|
|
// Close @Title 关闭资源
|
|
func (r *rabbitmq) Close() {
|
|
Rabbitmq.config.ReConn = false
|
|
defer Rabbitmq.Conn.Close()
|
|
}
|