game_sync/mq/internal/consumer.go

144 lines
3.2 KiB
Go

package internal
import (
"fmt"
"math/rand"
"os"
"sync"
"time"
"mongo.games.com/goserver/core/broker"
"mongo.games.com/goserver/core/broker/rabbitmq"
"mongo.games.com/goserver/core/logger"
)
/*
For convenience, all subscriptions are started together in one place.
*/
var (
// subscriberLock guards every read and write of the subscriber map.
subscriberLock sync.RWMutex
// subscriber maps a topic to the handlers registered for it through
// RegisterSubscriber.
subscriber = make(map[string][]*Subscriber)
)
// Subscriber pairs a registered handler with the options it should be
// subscribed with. The embedded broker.Subscriber is nil until
// RabbitMQConsumer.Start assigns the subscription returned by the broker.
type Subscriber struct {
broker.Subscriber
// h is the handler invoked for each event delivered on the topic.
h broker.Handler
// opts are passed to the broker's Subscribe call for this handler.
opts []broker.SubscribeOption
}
// RabbitMQConsumer wraps a broker.Broker and starts/stops the subscriptions
// registered through RegisterSubscriber.
type RabbitMQConsumer struct {
broker.Broker
// url is the broker address given at construction; also used in log output.
url string
// exchange is the exchange configuration given at construction.
exchange rabbitmq.Exchange
}
// NewRabbitMQConsumer builds a consumer for the given broker url and exchange.
//
// The broker is created with the address and exchange name, marked durable
// when exchange.Durable is set, followed by any caller-supplied opts (which
// therefore take precedence by being applied last). The broker is initialised
// before returning; because the signature has no error result, an Init
// failure is logged rather than returned and the consumer is still handed
// back to the caller.
func NewRabbitMQConsumer(url string, exchange rabbitmq.Exchange, opts ...broker.Option) *RabbitMQConsumer {
	options := make([]broker.Option, 0, len(opts)+3)
	options = append(options, broker.Addrs(url), rabbitmq.ExchangeName(exchange.Name))
	if exchange.Durable {
		options = append(options, rabbitmq.DurableExchange())
	}
	// Appending an empty variadic slice is a no-op, so no length check is needed.
	options = append(options, opts...)

	mq := &RabbitMQConsumer{
		url:      url,
		exchange: exchange,
	}
	mq.Broker = rabbitmq.NewBroker(options...)
	if err := mq.Broker.Init(); err != nil {
		logger.Logger.Errorf("NewRabbitMQConsumer() url:%s exchange:%s init err:%v", url, exchange.Name, err)
	}
	return mq
}
// Start connects to the broker and subscribes every handler registered
// through RegisterSubscriber.
//
// The backup directory (BackupPath) is created first when PathExists does not
// report it as present. Failing to create it is logged but does not abort the
// start, because backups are best-effort. Each handler is wrapped so that a
// handler error, or a panic recovered from the handler, causes the event to be
// written to a backup file via backUp.
//
// The first connection or subscription error is returned unchanged;
// subscriptions made before that failure are left in place.
func (c *RabbitMQConsumer) Start() error {
	if ok, _ := PathExists(BackupPath); !ok {
		if err := os.MkdirAll(BackupPath, os.ModePerm); err != nil {
			logger.Logger.Errorf("RabbitMQConsumer.Start() create backup path:%v err:%v", BackupPath, err)
		}
	}

	if err := c.Connect(); err != nil {
		return err
	}
	logger.Logger.Infof("RabbitMQConsumer.Start() url:%s exchange:%s", c.url, c.exchange.Name)

	for topic, ss := range GetSubscribers() {
		for _, s := range ss {
			// Bind the handler for this iteration. Before Go 1.22 the range
			// variable is shared by all iterations, so a closure capturing s
			// directly would dispatch every topic to the last subscriber.
			h := s.h
			sub, err := c.Subscribe(topic, func(event broker.Event) error {
				var err error
				defer func() {
					if e := recover(); e != nil {
						logger.Logger.Errorf("RabbitMQConsumer.Subscriber() topic:%v message:%v recover:%v", event.Topic(), event.Message(), e)
						// A panic leaves err unset; record it so the event is
						// still backed up. The value returned to the broker is
						// unaffected (nil), as it was before.
						err = fmt.Errorf("panic: %v", e)
					}
					if err != nil {
						c.backUp(event, err)
					}
				}()
				err = h(event)
				return err
			}, s.opts...)
			if err != nil {
				return err
			}
			s.Subscriber = sub
		}
	}
	return nil
}
// Stop unsubscribes every registered subscriber that holds a live
// subscription and then disconnects from the broker.
//
// Subscribers whose embedded broker.Subscriber is still nil (Start was never
// called, failed partway, or the subscriber was registered afterwards) are
// skipped, since calling Unsubscribe through a nil interface would panic.
// Unsubscribe failures are logged and do not prevent the disconnect, whose
// error is the return value.
func (c *RabbitMQConsumer) Stop() error {
	logger.Logger.Infof("RabbitMQConsumer.Stop() url:%s exchange:%s", c.url, c.exchange.Name)
	for topic, ss := range GetSubscribers() {
		for _, s := range ss {
			if s.Subscriber == nil {
				continue
			}
			if err := s.Unsubscribe(); err != nil {
				logger.Logger.Errorf("RabbitMQConsumer.Stop() topic:%v unsubscribe err:%v", topic, err)
			}
		}
	}
	return c.Disconnect()
}
// backUp writes an event that could not be processed to a new file so it can
// be inspected or replayed later.
//
// The file name is built from FilePathFormat using BackupPath, the event
// topic, the current time, its nanosecond component and a random number below
// 10000. The file contains a "reason:" line holding err's message (empty when
// err is nil) followed by a "data:" line holding the raw message body.
// Failures to create, write or close the file are logged; nothing is returned
// because backing up is best-effort.
func (c *RabbitMQConsumer) backUp(e broker.Event, err error) {
	tNow := time.Now()
	filePath := fmt.Sprintf(FilePathFormat, BackupPath, e.Topic(), tNow.Format(TimeFormat), tNow.Nanosecond(), rand.Int31n(10000))
	f, er := os.Create(filePath)
	if er != nil {
		logger.Logger.Errorf("RabbitMQConsumer.backUp() topic:%v message:%v err:%v", e.Topic(), e.Message(), er)
		return
	}
	defer func() {
		// A failed close can mean the data never reached disk, so report it.
		if cerr := f.Close(); cerr != nil {
			logger.Logger.Errorf("RabbitMQConsumer.backUp() topic:%v file:%v close err:%v", e.Topic(), filePath, cerr)
		}
	}()

	var reason string
	if err != nil {
		reason = err.Error()
	}
	if _, werr := fmt.Fprintf(f, "reason:%s\ndata:%s\n", reason, e.Message().Body); werr != nil {
		logger.Logger.Errorf("RabbitMQConsumer.backUp() topic:%v file:%v write err:%v", e.Topic(), filePath, werr)
	}
}
// RegisterSubscriber records a handler, together with its subscribe options,
// under the given topic. Registered handlers are subscribed later, when a
// RabbitMQConsumer is started.
// Per the original note, different subscriptions run in their own goroutines.
func RegisterSubscriber(topic string, h broker.Handler, opts ...broker.SubscribeOption) {
	entry := &Subscriber{h: h, opts: opts}

	subscriberLock.Lock()
	registered := subscriber[topic]
	subscriber[topic] = append(registered, entry)
	subscriberLock.Unlock()

	logger.Logger.Infof("RegisterSubscriber topic:%v", topic)
}
// GetSubscribers returns a snapshot of the registered subscribers keyed by
// topic. The map and each per-topic slice are fresh copies taken under the
// read lock, so callers may iterate them without holding the lock; the
// *Subscriber values themselves are shared with the registry.
func GetSubscribers() map[string][]*Subscriber {
	snapshot := make(map[string][]*Subscriber)

	subscriberLock.RLock()
	defer subscriberLock.RUnlock()

	for topic := range subscriber {
		registered := subscriber[topic]
		cloned := make([]*Subscriber, len(registered))
		copy(cloned, registered)
		snapshot[topic] = cloned
	}
	return snapshot
}