game_sync/mq/export.go

187 lines
4.5 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package mq
import (
"encoding/json"
"fmt"
"reflect"
"mongo.games.com/goserver/core/broker"
"mongo.games.com/goserver/core/broker/rabbitmq"
"mongo.games.com/goserver/core/logger"
"mongo.games.com/game/mq/internal"
)
type HandlerFunc func(data interface{}) (err error)
type HandlerData struct {
T reflect.Type
F HandlerFunc
}
type MessageMgr struct {
name map[reflect.Type]string
handler map[string]HandlerData
}
type RegisterMessageParam struct {
Name string
Data interface{}
}
// RegisterMessage 注册消息
func (c *MessageMgr) RegisterMessage(param *RegisterMessageParam) {
if param == nil {
return
}
t := c.getType(param.Data)
c.name[t] = param.Name
}
func (c *MessageMgr) getType(data interface{}) reflect.Type {
return reflect.Indirect(reflect.ValueOf(data)).Type()
}
func (c *MessageMgr) getName(data interface{}) string {
t := c.getType(data)
if name, exist := c.name[t]; exist {
return name
}
return ""
}
type RabbitMQData struct {
MQName string // 是队列名称也是Routing key
Data interface{}
}
func (c *MessageMgr) Send(data interface{}, name ...string) error {
if msg, ok := data.(*RabbitMQData); ok {
return Send(msg.MQName, msg.Data)
}
if len(name) > 0 && name[0] != "" {
return Send(name[0], data)
}
key := c.getName(data)
if key == "" {
key = "_null_"
}
return Send(key, data)
}
type RegisterHandlerParam struct {
Name string
Data interface{}
Handler HandlerFunc
}
// RegisterHandler 注册消息处理函数
func (c *MessageMgr) RegisterHandler(param *RegisterHandlerParam) {
if param == nil {
return
}
if _, ok := c.handler[param.Name]; ok {
panic(fmt.Sprintf("RabbitMQ RegisterHandler repeatet name:%v", param.Name))
return
}
if param.Data == nil {
return
}
if param.Handler == nil {
return
}
c.handler[param.Name] = HandlerData{
T: c.getType(param.Data),
F: param.Handler,
}
internal.RegisterSubscriber(param.Name, func(e broker.Event) (err error) {
msg := e.Message()
if msg != nil {
defer func() {
e.Ack()
}()
log := reflect.New(c.handler[param.Name].T).Interface()
err = json.Unmarshal(msg.Body, log)
if err != nil {
logger.Logger.Errorf("RabbitMQ Unmarshal error: %v", err)
return
}
logger.Logger.Tracef("MQ Receive[%v]: %#v", param.Name, log)
return c.handler[param.Name].F(log)
}
return nil
}, broker.Queue(param.Name), broker.DisableAutoAck(), rabbitmq.DurableQueue())
}
// MessageMgrSingle 消息发送器
var MessageMgrSingle = &MessageMgr{
name: make(map[reflect.Type]string),
handler: make(map[string]HandlerData),
}
// RegisterMessage 注册消息
// name: 消息名称
// data: 消息结构体指针
func RegisterMessage(param *RegisterMessageParam) {
MessageMgrSingle.RegisterMessage(param)
}
// Write 发送消息
// 默认队列名称规则队列前缀_消息结构体名称
func Write(data interface{}, name ...string) error {
return MessageMgrSingle.Send(data, name...)
}
// RegisterHandler 注册消息处理函数
func RegisterHandler(param *RegisterHandlerParam) {
MessageMgrSingle.RegisterHandler(param)
}
var globalConsumer *internal.RabbitMQConsumer
// StartConsumer 启动消费者
func StartConsumer(url string, exchange string, durableExchange bool, opts ...broker.Option) {
StopConsumer()
globalConsumer = internal.NewRabbitMQConsumer(url, rabbitmq.Exchange{Name: exchange, Durable: durableExchange}, opts...)
if err := globalConsumer.Start(); err != nil {
panic(fmt.Sprintf("RabbitMQConsumer.Start() url:%v exchange:%v err:%v", url, exchange, err))
}
}
// StopConsumer 停止消费者
func StopConsumer() {
if globalConsumer != nil {
globalConsumer.Stop()
}
}
var globalPublisher *internal.RabbitMQPublisher
// StartPublisher 启动发布者
func StartPublisher(url string, exchange string, durableExchange bool, queueSize int, opts ...broker.Option) {
StopPublisher()
globalPublisher = internal.NewRabbitMQPublisher(url, rabbitmq.Exchange{Name: exchange, Durable: durableExchange}, queueSize, opts...)
if err := globalPublisher.Start(); err != nil {
panic(fmt.Sprintf("RabbitMQPublisher.Start() url:%v exchange:%v err:%v", url, exchange, err))
}
}
// StopPublisher 停止发布者
func StopPublisher() {
if globalPublisher != nil {
globalPublisher.Stop()
}
}
func Send(topic string, msg interface{}, opts ...broker.PublishOption) (err error) {
if globalPublisher != nil {
logger.Logger.Tracef("MQ Send[%v]: %#v", topic, msg)
return globalPublisher.Send(topic, msg, opts...)
}
logger.Logger.Errorf("RabbitMQPublisher not start!")
return internal.ErrClosed
}