package internal import ( "encoding/json" "errors" "fmt" "math/rand" "os" "sync" "time" "mongo.games.com/goserver/core/broker" "mongo.games.com/goserver/core/broker/rabbitmq" "mongo.games.com/goserver/core/logger" ) /* 在发布功能上加上异步发送消息和发送失败备份到本地文件的功能 */ var ErrClosed = errors.New("publisher is closed") type item struct { topic string msg interface{} opts []broker.PublishOption } type RabbitMQPublisher struct { broker.Broker exchange rabbitmq.Exchange url string que chan *item closed chan struct{} wg sync.WaitGroup } func NewRabbitMQPublisher(url string, exchange rabbitmq.Exchange, queueSize int, opts ...broker.Option) *RabbitMQPublisher { if queueSize <= 0 { queueSize = 1 } mq := &RabbitMQPublisher{ url: url, exchange: exchange, que: make(chan *item, queueSize), closed: make(chan struct{}), } options := []broker.Option{ broker.Addrs(url), rabbitmq.ExchangeName(exchange.Name), } if exchange.Durable { options = append(options, rabbitmq.DurableExchange()) } if len(opts) > 0 { options = append(options, opts...) } mq.Broker = rabbitmq.NewBroker(options...) mq.Broker.Init() return mq } // 发布消息,同步 func (p *RabbitMQPublisher) publish(topic string, msg interface{}, opts ...broker.PublishOption) (err error) { defer func() { e := recover() if e != nil { logger.Logger.Errorf("RabbitMQPublisher.public() topic:%v message:%v recover:%v", topic, msg, e) } if err != nil || e != nil { p.backup(topic, msg, err) } }() var buf []byte switch d := msg.(type) { case []byte: buf = d case string: buf = []byte(d) default: buf, err = json.Marshal(msg) if err != nil { return err } } err = p.Publish(topic, &broker.Message{Body: buf}, opts...) if err != nil { logger.Logger.Errorf("RabbitMQPublisher.publish() topic:%v message:%v err:%v", topic, msg, err) return } return nil } func (p *RabbitMQPublisher) publishRoutine() { p.wg.Add(1) defer p.wg.Done() for { select { case item, ok := <-p.que: if ok { p.publish(item.topic, item.msg, item.opts...) } else { return } case <-p.closed: return } } } func (p *RabbitMQPublisher) Start() (err error) { if ok, _ := PathExists(BackupPath); !ok { err = os.MkdirAll(BackupPath, os.ModePerm) if err != nil { return } } if err = p.Connect(); err != nil { return err } logger.Logger.Infof("RabbitMQPublisher.Start() url:%s exchange:%s", p.url, p.exchange.Name) go p.publishRoutine() return nil } func (p *RabbitMQPublisher) Stop() error { logger.Logger.Infof("RabbitMQPublisher.Stop() url:%s exchange:%s", p.url, p.exchange.Name) select { case <-p.closed: return ErrClosed default: } close(p.closed) close(p.que) for item := range p.que { p.publish(item.topic, item.msg, item.opts...) } //等待所有投递出去的任务全部完成 p.wg.Wait() return p.Disconnect() } // Send 发布消息,异步 func (p *RabbitMQPublisher) Send(topic string, msg interface{}, opts ...broker.PublishOption) (err error) { select { case <-p.closed: return ErrClosed default: } i := &item{topic: topic, msg: msg, opts: opts} select { case p.que <- i: default: //会不会情况更糟糕 go func() { p.wg.Add(1) defer p.wg.Done() p.publish(topic, msg) }() } return nil } func (p *RabbitMQPublisher) backup(topic string, msg interface{}, err error) { buf, er := json.Marshal(msg) if er != nil { return } tNow := time.Now() filePath := fmt.Sprintf(FilePathFormat, BackupPath, topic, tNow.Format(TimeFormat), tNow.Nanosecond(), rand.Int31n(10000)) f, er := os.Create(filePath) if er != nil { logger.Logger.Errorf("RabbitMQPublisher.public() topic:%v message:%v err:%v", topic, msg, er) return } defer f.Close() var reason string if err != nil { reason = err.Error() } f.WriteString("reason:" + reason + "\n") f.WriteString("data:" + string(buf) + "\n") }