game_sync/mq/publisher.go

227 lines
4.8 KiB
Go

package mq
import (
"encoding/json"
"errors"
"fmt"
"math/rand"
"os"
"sync"
"time"
"mongo.games.com/goserver/core/broker"
"mongo.games.com/goserver/core/broker/rabbitmq"
"mongo.games.com/goserver/core/logger"
"mongo.games.com/game/common"
)
/*
Adds asynchronous message sending on top of the publish functionality, and
backs up messages that failed to send to local files.
*/
// Settings for backing up messages that could not be published.
const (
// BackupPath is the directory backup files are created in.
BackupPath = "backup"
// TimeFormat is the time layout used for the timestamp part of a backup file name.
TimeFormat = "20060102150405"
// FilePathFormat is the fmt layout of a backup file path. backup fills it with
// the directory, topic, formatted timestamp, nanosecond value and a random number.
FilePathFormat = "%s/%s_%s_%09d_%04d.dat"
)
var (
	// ErrClosed is returned when a publisher has already been stopped, or when
	// the package-level Send is called while no publisher has been started.
	ErrClosed = errors.New("publisher is closed")

	// globalPublisher is the package-level publisher managed by StartPublisher
	// and StopPublisher and used by the package-level Send.
	globalPublisher *RabbitMQPublisher
)
// item is one queued publish request: the arguments of a Send call, held until
// they are passed to publish.
type item struct {
// topic is the topic the message is published to.
topic string
// msg is the payload exactly as it was handed to Send.
msg interface{}
// opts are the publish options forwarded to the broker.
opts []broker.PublishOption
}
// RabbitMQPublisher wraps a broker.Broker and adds an asynchronous send queue
// plus local-file backup of messages that fail to publish.
type RabbitMQPublisher struct {
// Embedded broker; its methods (Connect, Disconnect, Publish, ...) are promoted.
broker.Broker
// exchange is the exchange configuration passed to NewRabbitMQPublisher.
exchange rabbitmq.Exchange
// url is the broker address passed to NewRabbitMQPublisher.
url string
// que buffers the items handed to Send until publishRoutine publishes them.
que chan *item
// closed is closed by Stop to signal that the publisher has been shut down.
closed chan struct{}
// wg tracks the publish routine and the overflow goroutines started by Send.
wg sync.WaitGroup
}
// NewRabbitMQPublisher builds a publisher for the given RabbitMQ url and exchange.
//
// queueSize is the capacity of the asynchronous send queue; values <= 0 are
// treated as 1. The broker is configured with the address, the exchange name,
// the durable-exchange option when exchange.Durable is set, and finally any
// extra opts. The broker is created and Init is called on it here; Connect is
// not called until Start.
func NewRabbitMQPublisher(url string, exchange rabbitmq.Exchange, queueSize int, opts ...broker.Option) *RabbitMQPublisher {
	if queueSize < 1 {
		queueSize = 1
	}

	brokerOpts := []broker.Option{broker.Addrs(url), rabbitmq.ExchangeName(exchange.Name)}
	if exchange.Durable {
		brokerOpts = append(brokerOpts, rabbitmq.DurableExchange())
	}
	// Appending an empty variadic slice is a no-op, so no length check is needed.
	brokerOpts = append(brokerOpts, opts...)

	pub := &RabbitMQPublisher{
		Broker:   rabbitmq.NewBroker(brokerOpts...),
		exchange: exchange,
		url:      url,
		que:      make(chan *item, queueSize),
		closed:   make(chan struct{}),
	}
	pub.Broker.Init()
	return pub
}
// publish sends msg to topic synchronously.
//
// []byte and string payloads are sent as-is; any other value is JSON-encoded
// first. If encoding fails, the broker returns an error, or the call panics,
// the message is handed to backup and a non-nil error is returned.
func (p *RabbitMQPublisher) publish(topic string, msg interface{}, opts ...broker.PublishOption) (err error) {
	defer func() {
		if e := recover(); e != nil {
			logger.Logger.Errorf("RabbitMQPublisher.publish() topic:%v message:%v recover:%v", topic, msg, e)
			// Turn the panic into an error so the caller does not get a nil
			// result and the backup file records why the message was not sent.
			err = fmt.Errorf("panic: %v", e)
		}
		if err != nil {
			p.backup(topic, msg, err)
		}
	}()

	var buf []byte
	switch d := msg.(type) {
	case []byte:
		buf = d
	case string:
		buf = []byte(d)
	default:
		if buf, err = json.Marshal(msg); err != nil {
			return err
		}
	}

	if err = p.Publish(topic, &broker.Message{Body: buf}, opts...); err != nil {
		logger.Logger.Errorf("RabbitMQPublisher.publish() topic:%v message:%v err:%v", topic, msg, err)
		return err
	}
	return nil
}
// publishRoutine publishes queued items in the background until the queue is
// closed or the publisher is stopped.
//
// The caller must call p.wg.Add(1) before starting this routine; the routine
// calls p.wg.Done() when it exits.
func (p *RabbitMQPublisher) publishRoutine() {
	defer p.wg.Done()
	for {
		select {
		case it, ok := <-p.que:
			if !ok {
				return
			}
			// The result is not needed here: publish logs and backs up failures itself.
			p.publish(it.topic, it.msg, it.opts...)
		case <-p.closed:
			return
		}
	}
}

// Start creates the backup directory when it does not exist, connects the
// broker and launches the background publish routine.
//
// It returns the error from creating the directory or from Connect, if any.
func (p *RabbitMQPublisher) Start() (err error) {
	if ok, _ := common.PathExists(BackupPath); !ok {
		if err = os.MkdirAll(BackupPath, os.ModePerm); err != nil {
			return err
		}
	}
	if err = p.Connect(); err != nil {
		return err
	}
	logger.Logger.Infof("RabbitMQPublisher.Start() url:%s exchange:%s", p.url, p.exchange.Name)

	// Register the routine with the WaitGroup before starting it. Doing the Add
	// inside the goroutine races with wg.Wait in Stop, which could return
	// before the routine has been counted.
	p.wg.Add(1)
	go p.publishRoutine()
	return nil
}
// Stop shuts the publisher down: it marks the publisher as closed, publishes
// whatever is still queued, waits for in-flight publishes and disconnects the
// broker.
//
// It returns ErrClosed if the publisher was already stopped, otherwise the
// result of Disconnect. The closed check and the close below are not atomic,
// so Stop must not be called concurrently with itself.
func (p *RabbitMQPublisher) Stop() error {
	logger.Logger.Infof("RabbitMQPublisher.Stop() url:%s exchange:%s", p.url, p.exchange.Name)
	select {
	case <-p.closed:
		return ErrClosed
	default:
	}
	close(p.closed)

	// p.que is deliberately left open: Send writes to it from other goroutines
	// and a send on a closed channel panics. New Sends are rejected through
	// p.closed instead, and the remaining items are drained without blocking.
drain:
	for {
		select {
		case it := <-p.que:
			p.publish(it.topic, it.msg, it.opts...)
		default:
			break drain
		}
	}

	// Wait for every publish that has already been dispatched to finish.
	p.wg.Wait()
	return p.Disconnect()
}
// Send publishes msg to topic asynchronously.
//
// The message is placed on the internal queue for the background routine. When
// the queue is full it is published from a new goroutine instead, so Send does
// not block. Send returns ErrClosed once the publisher has been stopped and nil
// otherwise; publish failures are logged and backed up by publish rather than
// returned here.
func (p *RabbitMQPublisher) Send(topic string, msg interface{}, opts ...broker.PublishOption) (err error) {
	select {
	case <-p.closed:
		return ErrClosed
	default:
	}

	select {
	case p.que <- &item{topic: topic, msg: msg, opts: opts}:
	default:
		// Queue is full, so publish out of band.
		// (Original author's open question: could this make things even worse?)
		//
		// Count the goroutine before it starts so that wg.Wait in Stop cannot
		// miss it.
		p.wg.Add(1)
		go func() {
			defer p.wg.Done()
			// Forward opts so this path publishes with the same options as
			// the queued path.
			p.publish(topic, msg, opts...)
		}()
	}
	return nil
}
// backup writes a message that could not be published to a file under
// BackupPath.
//
// The file path is built with FilePathFormat from the topic, the current time
// and a random number. The file holds a "reason:" line with err's text (empty
// when err is nil) followed by a "data:" line with msg encoded as JSON.
// Nothing is returned; every failure is logged.
func (p *RabbitMQPublisher) backup(topic string, msg interface{}, err error) {
	buf, er := json.Marshal(msg)
	if er != nil {
		// Without this log a message that cannot be encoded would vanish silently.
		logger.Logger.Errorf("RabbitMQPublisher.backup() topic:%v message:%v marshal err:%v", topic, msg, er)
		return
	}

	now := time.Now()
	filePath := fmt.Sprintf(FilePathFormat, BackupPath, topic, now.Format(TimeFormat), now.Nanosecond(), rand.Int31n(10000))
	f, er := os.Create(filePath)
	if er != nil {
		logger.Logger.Errorf("RabbitMQPublisher.backup() topic:%v message:%v err:%v", topic, msg, er)
		return
	}
	defer func() {
		// A failed Close can mean the data never reached the disk.
		if cerr := f.Close(); cerr != nil {
			logger.Logger.Errorf("RabbitMQPublisher.backup() topic:%v file:%v close err:%v", topic, filePath, cerr)
		}
	}()

	var reason string
	if err != nil {
		reason = err.Error()
	}
	if _, er = f.WriteString("reason:" + reason + "\n" + "data:" + string(buf) + "\n"); er != nil {
		logger.Logger.Errorf("RabbitMQPublisher.backup() topic:%v file:%v write err:%v", topic, filePath, er)
	}
}
// StartPublisher replaces the package-level publisher: it stops the current
// one (if any), creates a new publisher from the given settings and starts it.
// It panics if the new publisher cannot be started.
func StartPublisher(url string, exchange string, durableExchange bool, queueSize int, opts ...broker.Option) {
	StopPublisher()

	ex := rabbitmq.Exchange{Name: exchange, Durable: durableExchange}
	globalPublisher = NewRabbitMQPublisher(url, ex, queueSize, opts...)

	err := globalPublisher.Start()
	if err == nil {
		return
	}
	panic(fmt.Sprintf("RabbitMQPublisher.Start() url:%v exchange:%v err:%v", url, exchange, err))
}
// StopPublisher stops the package-level publisher, if one has been created.
//
// A failure to stop is logged instead of being dropped. ErrClosed only means
// the publisher was already stopped, so it is not reported.
func StopPublisher() {
	if globalPublisher == nil {
		return
	}
	if err := globalPublisher.Stop(); err != nil && err != ErrClosed {
		logger.Logger.Errorf("RabbitMQPublisher.Stop() err:%v", err)
	}
}
// Send publishes msg to topic asynchronously through the package-level
// publisher. It returns ErrClosed when no publisher has been created.
func Send(topic string, msg interface{}, opts ...broker.PublishOption) (err error) {
	if globalPublisher == nil {
		return ErrClosed
	}
	return globalPublisher.Send(topic, msg, opts...)
}