package rabbitmq import ( "context" "errors" "fmt" amqp "github.com/rabbitmq/amqp091-go" "sync" "time" ) var Rabbitmq = &rabbitmq{} type rabbitmq struct { config Config Conn *amqp.Connection mu sync.RWMutex ch *amqp.Channel } type Config struct { Host string Port int User string Password string ReConnSecond int // 重连间隔秒数 ReConn bool // 是否断线重连 } // InitRabbitmq @Title 初始化rabbitmq func InitRabbitmq(config Config) error { Rabbitmq.config = config err := getConn() if err != nil && Rabbitmq.config.ReConn { go reConn() } return err } // @Title 获取连接 func getConn() (err error) { Rabbitmq.Conn, err = amqp.Dial(fmt.Sprintf("amqp://%s:%s@%s:%d/", Rabbitmq.config.User, Rabbitmq.config.Password, Rabbitmq.config.Host, Rabbitmq.config.Port)) if err != nil { return err } // 断线重连 if Rabbitmq.config.ReConn { cc := make(chan *amqp.Error) closeChan := Rabbitmq.Conn.NotifyClose(cc) go func() { select { case <-closeChan: if Rabbitmq.config.ReConn { reConn() } } }() } return } // @Title 自动重连 func reConn() { if err := getConn(); err != nil { // 延时重连 time.Sleep(time.Duration(Rabbitmq.config.ReConnSecond) * time.Second) reConn() } } // GetChannel @Title 获取连接 func (r *rabbitmq) GetChannel() (*amqp.Channel, error) { if r.ch != nil && !r.ch.IsClosed() { return r.ch, nil } r.mu.Lock() defer r.mu.Unlock() if r.ch != nil && !r.ch.IsClosed() { return r.ch, nil } if r.Conn == nil || r.Conn.IsClosed() { return nil, errors.New("rabbitmq not connect") } var err error r.ch, err = r.Conn.Channel() if err != nil { return nil, err } return r.ch, nil } // Publish @Title 发送消息 // @Param exchangeName string true "指定发送的交换机" // @Param key string true "路由key" // @Param msg string true "发送消息内容" // @Param delay int64 false "延时发送秒数" func (r *rabbitmq) Publish(exchangeName, msg string, delay ...int64) error { ch, err := r.GetChannel() if err != nil { return err } header := amqp.Table{} if len(delay) > 0 && delay[0] > 0 { header["x-delay"] = delay[0] * 1000 } return ch.PublishWithContext( context.Background(), exchangeName, //交换 "", //队列名称 false, //强制为 false, //立即 amqp.Publishing{ ContentType: "text/plain", Body: []byte(msg), DeliveryMode: amqp.Persistent, Timestamp: time.Now(), Headers: header, }) } // PublishWithContext @Title 发送消息 // @Param exchangeName string true "指定发送的交换机" // @Param key string true "路由key" // @Param msg string true "发送消息内容" // @Param delay int64 false "延时发送秒数" func (r *rabbitmq) PublishWithContext(ctx context.Context, exchangeName, key, msg string, delay ...int64) error { ch, err := r.GetChannel() if err != nil { return err } header := amqp.Table{} if len(delay) > 0 && delay[0] > 0 { header["x-delay"] = delay[0] * 1000 } return ch.PublishWithContext( ctx, exchangeName, //交换 key, //队列名称 false, //强制为 false, //立即 amqp.Publishing{ ContentType: "text/plain", DeliveryMode: amqp.Persistent, Timestamp: time.Now(), Body: []byte(msg), Headers: header, }) } // Consume @Title 消费队列 // @Param queueName string true "队列名称" // @Param autoAck bool true "是否自动确认" // @Param prefetchCount int 0 "预消费数量" // @Param callback func(amqp.Delivery)) true "回调处理" func (r *rabbitmq) Consume(queueName string, autoAck bool, prefetchCount int, callback func(amqp.Delivery)) error { ch, err := r.GetChannel() if err != nil { return err } defer ch.Close() if prefetchCount > 0 { if err := ch.Qos(prefetchCount, 0, false); err != nil { return err } } //消费队列 msgs, err := ch.Consume( queueName, //队列名称 "", //消费者 autoAck, //自动确认 false, //排他 false, false, nil, ) if err != nil { return err } cc := make(chan *amqp.Error) closeChan := ch.NotifyClose(cc) for { select { case d := <-msgs: if d.Acknowledger == nil { return errors.New("delivery not initialized") } callback(d) case <-closeChan: // 连接断开 return errors.New("rabbitmq channel close") } } } // Close @Title 关闭资源 func (r *rabbitmq) Close() { Rabbitmq.config.ReConn = false defer Rabbitmq.Conn.Close() }