You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
rabbitmq/rabbitmq.go

202 lines
4.4 KiB

package rabbitmq
import (
"context"
"errors"
"fmt"
amqp "github.com/rabbitmq/amqp091-go"
"sync"
"time"
)
var Rabbitmq = &rabbitmq{}
type rabbitmq struct {
config Config
Conn *amqp.Connection
mu sync.RWMutex
ch *amqp.Channel
}
type Config struct {
Host string
Port int
User string
Password string
ReConnSecond int // 重连间隔秒数
ReConn bool // 是否断线重连
}
// InitRabbitmq @Title 初始化rabbitmq
func InitRabbitmq(config Config) error {
Rabbitmq.config = config
err := getConn()
if err != nil && Rabbitmq.config.ReConn {
go reConn()
}
return err
}
// @Title 获取连接
func getConn() (err error) {
Rabbitmq.Conn, err = amqp.Dial(fmt.Sprintf("amqp://%s:%s@%s:%d/", Rabbitmq.config.User, Rabbitmq.config.Password, Rabbitmq.config.Host, Rabbitmq.config.Port))
if err != nil {
return err
}
// 断线重连
if Rabbitmq.config.ReConn {
cc := make(chan *amqp.Error)
closeChan := Rabbitmq.Conn.NotifyClose(cc)
go func() {
select {
case <-closeChan:
if Rabbitmq.config.ReConn {
reConn()
}
}
}()
}
return
}
// @Title 自动重连
func reConn() {
if err := getConn(); err != nil {
// 延时重连
time.Sleep(time.Duration(Rabbitmq.config.ReConnSecond) * time.Second)
reConn()
}
}
// GetChannel @Title 获取连接
func (r *rabbitmq) GetChannel() (*amqp.Channel, error) {
if r.ch != nil && !r.ch.IsClosed() {
return r.ch, nil
}
r.mu.Lock()
defer r.mu.Unlock()
if r.ch != nil && !r.ch.IsClosed() {
return r.ch, nil
}
if r.Conn == nil || r.Conn.IsClosed() {
return nil, errors.New("rabbitmq not connect")
}
var err error
r.ch, err = r.Conn.Channel()
if err != nil {
return nil, err
}
return r.ch, nil
}
// Publish @Title 发送消息
// @Param exchangeName string true "指定发送的交换机"
// @Param key string true "路由key"
// @Param msg string true "发送消息内容"
// @Param delay int64 false "延时发送秒数"
func (r *rabbitmq) Publish(exchangeName, msg string, delay ...int64) error {
ch, err := r.GetChannel()
if err != nil {
return err
}
header := amqp.Table{}
if len(delay) > 0 && delay[0] > 0 {
header["x-delay"] = delay[0] * 1000
}
return ch.PublishWithContext(
context.Background(),
exchangeName, //交换
"", //队列名称
false, //强制为
false, //立即
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(msg),
DeliveryMode: amqp.Persistent,
Timestamp: time.Now(),
Headers: header,
})
}
// PublishWithContext @Title 发送消息
// @Param exchangeName string true "指定发送的交换机"
// @Param key string true "路由key"
// @Param msg string true "发送消息内容"
// @Param delay int64 false "延时发送秒数"
func (r *rabbitmq) PublishWithContext(ctx context.Context, exchangeName, key, msg string, delay ...int64) error {
ch, err := r.GetChannel()
if err != nil {
return err
}
header := amqp.Table{}
if len(delay) > 0 && delay[0] > 0 {
header["x-delay"] = delay[0] * 1000
}
return ch.PublishWithContext(
ctx,
exchangeName, //交换
key, //队列名称
false, //强制为
false, //立即
amqp.Publishing{
ContentType: "text/plain",
DeliveryMode: amqp.Persistent,
Timestamp: time.Now(),
Body: []byte(msg),
Headers: header,
})
}
// Consume @Title 消费队列
// @Param queueName string true "队列名称"
// @Param autoAck bool true "是否自动确认"
// @Param prefetchCount int 0 "预消费数量"
// @Param callback func(amqp.Delivery)) true "回调处理"
func (r *rabbitmq) Consume(queueName string, autoAck bool, prefetchCount int, callback func(amqp.Delivery)) error {
ch, err := r.GetChannel()
if err != nil {
return err
}
defer ch.Close()
if prefetchCount > 0 {
if err := ch.Qos(prefetchCount, 0, false); err != nil {
return err
}
}
//消费队列
msgs, err := ch.Consume(
queueName, //队列名称
"", //消费者
autoAck, //自动确认
false, //排他
false,
false,
nil,
)
if err != nil {
return err
}
cc := make(chan *amqp.Error)
closeChan := ch.NotifyClose(cc)
for {
select {
case d := <-msgs:
if d.Acknowledger == nil {
return errors.New("delivery not initialized")
}
callback(d)
case <-closeChan:
// 连接断开
return errors.New("rabbitmq channel close")
}
}
}
// Close @Title 关闭资源
func (r *rabbitmq) Close() {
Rabbitmq.config.ReConn = false
defer Rabbitmq.Conn.Close()
}