package rabbitmq import ( "fmt" "github.com/streadway/amqp" ) var Rabbitmq = &rabbitmq{} type rabbitmq struct { Conn *amqp.Connection } // InitRabbitmq @Title 初始化rabbitmq func InitRabbitmq(host string, port int, user, password string) (err error) { Rabbitmq.Conn, err = amqp.Dial(fmt.Sprintf("amqp://%s:%s@%s:%d/", user, password, host, port)) return } // Publish @Title 发送消息 // @Param exchangeName string true "指定发送的交换机" // @Param msg string true "发送消息内容" // @Param delay int64 false "延时发送秒数" func (r *rabbitmq) Publish(exchangeName, msg string, delay ...int64) error { ch, err := r.Conn.Channel() if err != nil { return err } defer ch.Close() header := amqp.Table{} if len(delay) > 0 && delay[0] > 0 { header["x-delay"] = delay[0] * 1000 } return ch.Publish( exchangeName, //交换 "", //队列名称 false, //强制为 false, //立即 amqp.Publishing{ ContentType: "text/plain", Body: []byte(msg), Headers: header, }) } // Consume @Title 消费队列 // @Param queueName string true "队列名称" // @Param autoAck bool true "是否自动确认" // @Param prefetchCount int 0 "预消费数量" // @Param callback func(amqp.Delivery)) true "回调处理" func (r *rabbitmq) Consume(queueName string, autoAck bool, prefetchCount int, callback func(amqp.Delivery)) error { ch, err := r.Conn.Channel() if err != nil { return err } defer ch.Close() if prefetchCount > 0 { if err := ch.Qos(prefetchCount, 0, false); err != nil { return err } } //消费队列 msgs, err := ch.Consume( queueName, //队列名称 "", //消费者 autoAck, //自动确认 false, //排他 false, false, nil, ) if err != nil { return err } forever := make(chan bool) go func() { for d := range msgs { callback(d) } }() <-forever return nil } // Close @Title 关闭资源 func (r *rabbitmq) Close() { defer Rabbitmq.Conn.Close() }