package mq import ( "encoding/json" "errors" "fmt" "math/rand" "os" "sync" "time" "mongo.games.com/game/common" "mongo.games.com/goserver/core/broker" "mongo.games.com/goserver/core/broker/rabbitmq" "mongo.games.com/goserver/core/logger" ) /* 在发布功能上加上异步发送消息和发送失败备份到本地文件的功能 */ const ( BACKUP_PATH = "backup" TIME_FORMAT = "20060102150405" ) var ERR_CLOSED = errors.New("publisher is closed") type item struct { topic string msg interface{} } type RabbitMQPublisher struct { b broker.Broker exchange rabbitmq.Exchange url string que chan *item closed bool waitor sync.WaitGroup } func NewRabbitMQPublisher(url string, exchange rabbitmq.Exchange, backlog int) *RabbitMQPublisher { if backlog <= 0 { backlog = 1 } mq := &RabbitMQPublisher{ url: url, exchange: exchange, que: make(chan *item, backlog), } rabbitmq.DefaultRabbitURL = mq.url rabbitmq.DefaultExchange = mq.exchange mq.b = rabbitmq.NewBroker() mq.b.Init() return mq } func (p *RabbitMQPublisher) Start() (err error) { if ok, _ := common.PathExists(BACKUP_PATH); !ok { err = os.MkdirAll(BACKUP_PATH, os.ModePerm) if err != nil { return } } err = p.b.Connect() if err != nil { return } go p.workerRoutine() return nil } func (p *RabbitMQPublisher) Stop() error { if p.closed { return ERR_CLOSED } p.closed = true close(p.que) for item := range p.que { p.publish(item.topic, item.msg) } //等待所有投递出去的任务全部完成 p.waitor.Wait() return p.b.Disconnect() } // Send 发布消息,异步 func (p *RabbitMQPublisher) Send(topic string, msg interface{}) (err error) { if p.closed { return ERR_CLOSED } i := &item{topic: topic, msg: msg} select { case p.que <- i: default: //会不会情况更糟糕 go p.concurrentPublish(topic, msg) } return nil } func (p *RabbitMQPublisher) concurrentPublish(topic string, msg interface{}) (err error) { p.waitor.Add(1) defer p.waitor.Done() return p.publish(topic, msg) } // 发布消息,同步 func (p *RabbitMQPublisher) publish(topic string, msg interface{}) (err error) { defer func() { if err != nil { p.backup(topic, msg, err) } recover() }() buf, err := json.Marshal(msg) if err != nil { return err } err = p.b.Publish(topic, &broker.Message{Body: buf}) if err != nil { logger.Logger.Error("RabbitMQPublisher.publish err:", err) return } return nil } func (p *RabbitMQPublisher) workerRoutine() { p.waitor.Add(1) defer p.waitor.Done() for { select { case item, ok := <-p.que: if ok { p.publish(item.topic, item.msg) } else { return } } } } func (p *RabbitMQPublisher) backup(topic string, msg interface{}, err error) { buf, err := json.Marshal(msg) if err != nil { return } tNow := time.Now() filePath := fmt.Sprintf("%s/%s_%s_%09d_%04d.dat", BACKUP_PATH, topic, tNow.Format(TIME_FORMAT), tNow.Nanosecond(), rand.Int31n(10000)) f, err := os.Create(filePath) if err != nil { logger.Logger.Errorf("RabbitMQPublisher.public(%s,%v) err:%v", topic, msg, err) return } defer f.Close() var reason string if err != nil { reason = err.Error() } f.WriteString("reason:" + reason + "\n") f.WriteString("data:" + string(buf) + "\n") }