156 lines
3.4 KiB
Go
156 lines
3.4 KiB
Go
package mq
|
|
|
|
import (
|
|
"fmt"
|
|
"math/rand"
|
|
"os"
|
|
"sync"
|
|
"time"
|
|
|
|
"mongo.games.com/goserver/core/broker"
|
|
"mongo.games.com/goserver/core/broker/rabbitmq"
|
|
"mongo.games.com/goserver/core/logger"
|
|
|
|
"mongo.games.com/game/common"
|
|
)
|
|
|
|
/*
|
|
为了使用方便,这里统一启动订阅
|
|
*/
|
|
|
|
var (
|
|
globalConsumer *RabbitMQConsumer
|
|
subscriberLock sync.RWMutex
|
|
subscriber = make(map[string][]*Subscriber)
|
|
)
|
|
|
|
type Subscriber struct {
|
|
broker.Subscriber
|
|
h broker.Handler
|
|
opts []broker.SubscribeOption
|
|
}
|
|
|
|
type RabbitMQConsumer struct {
|
|
broker.Broker
|
|
url string
|
|
exchange rabbitmq.Exchange
|
|
}
|
|
|
|
func NewRabbitMQConsumer(url string, exchange rabbitmq.Exchange) *RabbitMQConsumer {
|
|
mq := &RabbitMQConsumer{
|
|
url: url,
|
|
exchange: exchange,
|
|
}
|
|
options := []broker.Option{
|
|
broker.Addrs(url), rabbitmq.ExchangeName(exchange.Name),
|
|
}
|
|
if exchange.Durable {
|
|
options = append(options, rabbitmq.DurableExchange())
|
|
}
|
|
mq.Broker = rabbitmq.NewBroker(options...)
|
|
mq.Broker.Init()
|
|
return mq
|
|
}
|
|
|
|
func (c *RabbitMQConsumer) Start() error {
|
|
if ok, _ := common.PathExists(BackupPath); !ok {
|
|
os.MkdirAll(BackupPath, os.ModePerm)
|
|
}
|
|
|
|
if err := c.Connect(); err != nil {
|
|
return err
|
|
}
|
|
|
|
for topic, ss := range GetSubscribers() {
|
|
for _, s := range ss {
|
|
sub, err := c.Subscribe(topic, func(event broker.Event) error {
|
|
var err error
|
|
defer func() {
|
|
e := recover()
|
|
if e != nil {
|
|
logger.Logger.Errorf("RabbitMQConsumer.Subscriber(%s,%v) recover:%v", event.Topic(), event.Message(), e)
|
|
}
|
|
if err != nil {
|
|
c.backUp(event, err)
|
|
}
|
|
}()
|
|
err = s.h(event)
|
|
return err
|
|
}, s.opts...)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
s.Subscriber = sub
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (c *RabbitMQConsumer) Stop() error {
|
|
for _, ss := range GetSubscribers() {
|
|
for _, s := range ss {
|
|
s.Unsubscribe()
|
|
}
|
|
}
|
|
return c.Disconnect()
|
|
}
|
|
|
|
func (c *RabbitMQConsumer) backUp(e broker.Event, err error) {
|
|
tNow := time.Now()
|
|
filePath := fmt.Sprintf(FilePathFormat, BackupPath, e.Topic(), tNow.Format(TimeFormat), tNow.Nanosecond(), rand.Int31n(10000))
|
|
f, er := os.Create(filePath)
|
|
if er != nil {
|
|
logger.Logger.Errorf("RabbitMQPublisher.public(%s,%v) err:%v", e.Topic(), e.Message(), er)
|
|
return
|
|
}
|
|
defer f.Close()
|
|
var reason string
|
|
if err != nil {
|
|
reason = err.Error()
|
|
}
|
|
f.WriteString("reason:" + reason + "\n")
|
|
f.WriteString("data:" + string(e.Message().Body) + "\n")
|
|
}
|
|
|
|
// RegisterSubscriber 注册订阅处理方法
|
|
// 不同订阅是在各自的协程中执行的
|
|
func RegisterSubscriber(topic string, h broker.Handler, opts ...broker.SubscribeOption) {
|
|
s := Subscriber{
|
|
h: h,
|
|
opts: opts,
|
|
}
|
|
subscriberLock.Lock()
|
|
subscriber[topic] = append(subscriber[topic], &s)
|
|
subscriberLock.Unlock()
|
|
}
|
|
|
|
func GetSubscribers() map[string][]*Subscriber {
|
|
ret := make(map[string][]*Subscriber)
|
|
subscriberLock.RLock()
|
|
defer subscriberLock.RUnlock()
|
|
for topic, s := range subscriber {
|
|
temp := make([]*Subscriber, len(s))
|
|
copy(temp, s)
|
|
ret[topic] = temp
|
|
}
|
|
return ret
|
|
}
|
|
|
|
// StartConsumer 启动消费者
|
|
func StartConsumer(url string, exchange string, durableExchange bool) {
|
|
StopConsumer()
|
|
globalConsumer = NewRabbitMQConsumer(url, rabbitmq.Exchange{Name: exchange, Durable: durableExchange})
|
|
if err := globalConsumer.Start(); err != nil {
|
|
panic(fmt.Sprintf("RabbitMQConsumer.Start() err:%v", err))
|
|
}
|
|
}
|
|
|
|
// StopConsumer 停止消费者
|
|
func StopConsumer() {
|
|
if globalConsumer != nil {
|
|
globalConsumer.Stop()
|
|
}
|
|
}
|