// Package rabbitmq wraps one shared AMQP connection and provides helpers for
// publishing messages to an exchange and consuming messages from a queue.
package rabbitmq

import (
	"errors"
	"fmt"
	"log"

	"github.com/streadway/amqp"
)

// Rabbitmq is the package-level client. InitRabbitmq stores the connection in
// it; Publish, Consume and Close operate on that connection.
var Rabbitmq = &rabbitmq{}

// errNotInitialized is returned by Publish and Consume when no connection has
// been established yet (InitRabbitmq was not called).
var errNotInitialized = errors.New("rabbitmq: connection is not initialized")

// rabbitmq holds the AMQP connection shared by every operation.
type rabbitmq struct {
	conn *amqp.Connection
}

// InitRabbitmq dials the broker and stores the connection in Rabbitmq.
//
// The user, password, host and port are interpolated verbatim into an
// "amqp://user:password@host:port/" URI, so credentials containing URI-reserved
// characters must already be escaped by the caller.
//
// If dialing fails the process is terminated via log.Fatal.
func InitRabbitmq(host string, port int, user, password string) {
	conn, err := amqp.Dial(fmt.Sprintf("amqp://%s:%s@%s:%d/", user, password, host, port))
	if err != nil {
		log.Fatal("rabbitmq链接失败,err:", err)
	}
	Rabbitmq.conn = conn
}

// Publish sends msg as a text/plain message to the exchange exchangeName with
// an empty routing key.
//
// The optional first delay value is a delay in seconds. When it is positive it
// is converted to milliseconds and sent in the "x-delay" header (presumably for
// RabbitMQ's delayed-message exchange plugin; confirm the exchange type).
//
// A channel is opened for the call and closed again before returning, so
// repeated calls do not accumulate channels on the shared connection.
func (r *rabbitmq) Publish(exchangeName, msg string, delay ...int64) error {
	if r == nil || r.conn == nil {
		return errNotInitialized
	}

	ch, err := r.conn.Channel()
	if err != nil {
		return err
	}
	defer func() {
		// Best-effort cleanup: the publish result is what the caller acts on.
		_ = ch.Close()
	}()

	headers := amqp.Table{}
	if len(delay) > 0 && delay[0] > 0 {
		headers["x-delay"] = delay[0] * 1000 // seconds -> milliseconds
	}

	return ch.Publish(
		exchangeName, // exchange
		"",           // routing key
		false,        // mandatory
		false,        // immediate
		amqp.Publishing{
			ContentType: "text/plain",
			Body:        []byte(msg),
			Headers:     headers,
		})
}

// Consume subscribes to queueName and invokes callback for every delivery, one
// at a time, in the calling goroutine.
//
// autoAck is passed through to the broker; when it is false the callback is
// responsible for acknowledging each delivery.
//
// Consume blocks while deliveries keep arriving. It returns an error if the
// connection is not initialized, if the channel or the subscription cannot be
// created, or once the delivery channel is closed (instead of blocking forever
// with nothing left to consume). The channel opened for the subscription is
// closed before returning.
func (r *rabbitmq) Consume(queueName string, autoAck bool, callback func(amqp.Delivery)) error {
	if r == nil || r.conn == nil {
		return errNotInitialized
	}

	ch, err := r.conn.Channel()
	if err != nil {
		return err
	}
	defer func() {
		// Best-effort cleanup: the channel may already be closed by the time
		// the delivery loop ends.
		_ = ch.Close()
	}()

	deliveries, err := ch.Consume(
		queueName, // queue
		"",        // consumer tag (empty: let the library generate one)
		autoAck,   // auto-ack
		false,     // exclusive
		false,     // no-local
		false,     // no-wait
		nil,       // args
	)
	if err != nil {
		return err
	}

	// The loop ends only when the library closes the delivery channel.
	for d := range deliveries {
		callback(d)
	}

	return fmt.Errorf("rabbitmq: consumer on queue %q stopped: delivery channel closed", queueName)
}

// Close closes the underlying connection. It is a no-op when no connection has
// been established.
func (r *rabbitmq) Close() {
	if r == nil || r.conn == nil {
		return
	}
	// The signature has no error result, so a close failure (for example an
	// already-closed connection) cannot be reported to the caller.
	_ = r.conn.Close()
}