game_sync/mq/consumer.go

141 lines
2.8 KiB
Go

package mq
import (
"fmt"
"math/rand"
"os"
"sync"
"time"
"mongo.games.com/goserver/core/broker"
"mongo.games.com/goserver/core/broker/rabbitmq"
"mongo.games.com/goserver/core/logger"
"mongo.games.com/game/common"
)
/*
为了使用方便,这里统一启动订阅
*/
var subscriberLock sync.RWMutex
var subscriber = make(map[string][]*Subscriber)
type Subscriber struct {
broker.Subscriber
h broker.Handler
opts []broker.SubscribeOption
}
// RegisterSubscriber 注册订阅处理方法
// 不同订阅是在各自的协程中执行的
func RegisterSubscriber(topic string, h broker.Handler, opts ...broker.SubscribeOption) {
s := Subscriber{
h: h,
opts: opts,
}
subscriberLock.Lock()
subscriber[topic] = append(subscriber[topic], &s)
subscriberLock.Unlock()
}
func UnregisterSubscriber(topic string) {
subscriberLock.Lock()
delete(subscriber, topic)
subscriberLock.Unlock()
}
func GetSubscriber(topic string) []*Subscriber {
subscriberLock.RLock()
defer subscriberLock.RUnlock()
if s, ok := subscriber[topic]; ok {
return s
}
return nil
}
func GetSubscribers() map[string][]*Subscriber {
ret := make(map[string][]*Subscriber)
subscriberLock.RLock()
defer subscriberLock.RUnlock()
for topic, s := range subscriber {
temp := make([]*Subscriber, len(s))
copy(temp, s)
ret[topic] = temp
}
return ret
}
type RabbitMQConsumer struct {
broker.Broker
url string
exchange rabbitmq.Exchange
}
func NewRabbitMQConsumer(url string, exchange rabbitmq.Exchange) *RabbitMQConsumer {
mq := &RabbitMQConsumer{
url: url,
exchange: exchange,
}
rabbitmq.DefaultRabbitURL = mq.url
rabbitmq.DefaultExchange = mq.exchange
mq.Broker = rabbitmq.NewBroker()
mq.Broker.Init()
return mq
}
func (c *RabbitMQConsumer) Start() error {
if err := c.Connect(); err != nil {
return err
}
sss := GetSubscribers()
for topic, ss := range sss {
for _, s := range ss {
sub, err := c.Subscribe(topic, s.h, s.opts...)
if err != nil {
return err
}
s.Subscriber = sub
}
}
return nil
}
func (c *RabbitMQConsumer) Stop() error {
sss := GetSubscribers()
for _, ss := range sss {
for _, s := range ss {
s.Unsubscribe()
}
}
return c.Disconnect()
}
func BackUp(e broker.Event, err error) {
tNow := time.Now()
filePath := fmt.Sprintf("%s/%s_%s_%09d_%04d.dat", BACKUP_PATH, e.Topic(), tNow.Format(TIME_FORMAT), tNow.Nanosecond(), rand.Int31n(10000))
f, err := os.Create(filePath)
if err != nil {
logger.Logger.Errorf("RabbitMQPublisher.public(%s,%v) err:%v", e.Topic(), e.Message(), err)
return
}
defer f.Close()
var reason string
if err != nil {
reason = err.Error()
}
f.WriteString("reason:" + reason + "\n")
f.WriteString("data:" + string(e.Message().Body) + "\n")
}
func init() {
if ok, _ := common.PathExists(BACKUP_PATH); !ok {
os.MkdirAll(BACKUP_PATH, os.ModePerm)
}
}