187 lines
4.5 KiB
Go
187 lines
4.5 KiB
Go
package mq
|
||
|
||
import (
|
||
"encoding/json"
|
||
"fmt"
|
||
"reflect"
|
||
|
||
"mongo.games.com/goserver/core/broker"
|
||
"mongo.games.com/goserver/core/broker/rabbitmq"
|
||
"mongo.games.com/goserver/core/logger"
|
||
|
||
"mongo.games.com/game/mq/internal"
|
||
)
|
||
|
||
type HandlerFunc func(data interface{}) (err error)
|
||
|
||
type HandlerData struct {
|
||
T reflect.Type
|
||
F HandlerFunc
|
||
}
|
||
|
||
type MessageMgr struct {
|
||
name map[reflect.Type]string
|
||
handler map[string]HandlerData
|
||
}
|
||
|
||
type RegisterMessageParam struct {
|
||
Name string
|
||
Data interface{}
|
||
}
|
||
|
||
// RegisterMessage 注册消息
|
||
func (c *MessageMgr) RegisterMessage(param *RegisterMessageParam) {
|
||
if param == nil {
|
||
return
|
||
}
|
||
t := c.getType(param.Data)
|
||
c.name[t] = param.Name
|
||
}
|
||
|
||
func (c *MessageMgr) getType(data interface{}) reflect.Type {
|
||
return reflect.Indirect(reflect.ValueOf(data)).Type()
|
||
}
|
||
|
||
func (c *MessageMgr) getName(data interface{}) string {
|
||
t := c.getType(data)
|
||
if name, exist := c.name[t]; exist {
|
||
return name
|
||
}
|
||
return ""
|
||
}
|
||
|
||
type RabbitMQData struct {
|
||
MQName string // 是队列名称也是Routing key
|
||
Data interface{}
|
||
}
|
||
|
||
func (c *MessageMgr) Send(data interface{}, name ...string) error {
|
||
if msg, ok := data.(*RabbitMQData); ok {
|
||
return Send(msg.MQName, msg.Data)
|
||
}
|
||
if len(name) > 0 && name[0] != "" {
|
||
return Send(name[0], data)
|
||
}
|
||
key := c.getName(data)
|
||
if key == "" {
|
||
key = "_null_"
|
||
}
|
||
return Send(key, data)
|
||
}
|
||
|
||
type RegisterHandlerParam struct {
|
||
Name string
|
||
Data interface{}
|
||
Handler HandlerFunc
|
||
}
|
||
|
||
// RegisterHandler 注册消息处理函数
|
||
func (c *MessageMgr) RegisterHandler(param *RegisterHandlerParam) {
|
||
if param == nil {
|
||
return
|
||
}
|
||
if _, ok := c.handler[param.Name]; ok {
|
||
panic(fmt.Sprintf("RabbitMQ RegisterHandler repeatet name:%v", param.Name))
|
||
return
|
||
}
|
||
if param.Data == nil {
|
||
return
|
||
}
|
||
if param.Handler == nil {
|
||
return
|
||
}
|
||
c.handler[param.Name] = HandlerData{
|
||
T: c.getType(param.Data),
|
||
F: param.Handler,
|
||
}
|
||
|
||
internal.RegisterSubscriber(param.Name, func(e broker.Event) (err error) {
|
||
msg := e.Message()
|
||
if msg != nil {
|
||
defer func() {
|
||
e.Ack()
|
||
}()
|
||
|
||
log := reflect.New(c.handler[param.Name].T).Interface()
|
||
err = json.Unmarshal(msg.Body, log)
|
||
if err != nil {
|
||
logger.Logger.Errorf("RabbitMQ Unmarshal error: %v", err)
|
||
return
|
||
}
|
||
logger.Logger.Tracef("==> Receive RabbitMQ(%v): %#v", param.Name, log)
|
||
|
||
return c.handler[param.Name].F(log)
|
||
}
|
||
return nil
|
||
}, broker.Queue(param.Name), broker.DisableAutoAck(), rabbitmq.DurableQueue())
|
||
}
|
||
|
||
// MessageMgrSingle 消息发送器
|
||
var MessageMgrSingle = &MessageMgr{
|
||
name: make(map[reflect.Type]string),
|
||
handler: make(map[string]HandlerData),
|
||
}
|
||
|
||
// RegisterMessage 注册消息
|
||
// name: 消息名称
|
||
// data: 消息结构体指针
|
||
func RegisterMessage(param *RegisterMessageParam) {
|
||
MessageMgrSingle.RegisterMessage(param)
|
||
}
|
||
|
||
// Write 发送消息
|
||
// 默认队列名称规则:队列前缀_消息结构体名称
|
||
func Write(data interface{}, name ...string) error {
|
||
return MessageMgrSingle.Send(data, name...)
|
||
}
|
||
|
||
// RegisterHandler 注册消息处理函数
|
||
func RegisterHandler(param *RegisterHandlerParam) {
|
||
MessageMgrSingle.RegisterHandler(param)
|
||
}
|
||
|
||
var globalConsumer *internal.RabbitMQConsumer
|
||
|
||
// StartConsumer 启动消费者
|
||
func StartConsumer(url string, exchange string, durableExchange bool, opts ...broker.Option) {
|
||
StopConsumer()
|
||
globalConsumer = internal.NewRabbitMQConsumer(url, rabbitmq.Exchange{Name: exchange, Durable: durableExchange}, opts...)
|
||
if err := globalConsumer.Start(); err != nil {
|
||
panic(fmt.Sprintf("RabbitMQConsumer.Start() url:%v exchange:%v err:%v", url, exchange, err))
|
||
}
|
||
}
|
||
|
||
// StopConsumer 停止消费者
|
||
func StopConsumer() {
|
||
if globalConsumer != nil {
|
||
globalConsumer.Stop()
|
||
}
|
||
}
|
||
|
||
var globalPublisher *internal.RabbitMQPublisher
|
||
|
||
// StartPublisher 启动发布者
|
||
func StartPublisher(url string, exchange string, durableExchange bool, queueSize int, opts ...broker.Option) {
|
||
StopPublisher()
|
||
globalPublisher = internal.NewRabbitMQPublisher(url, rabbitmq.Exchange{Name: exchange, Durable: durableExchange}, queueSize, opts...)
|
||
if err := globalPublisher.Start(); err != nil {
|
||
panic(fmt.Sprintf("RabbitMQPublisher.Start() url:%v exchange:%v err:%v", url, exchange, err))
|
||
}
|
||
}
|
||
|
||
// StopPublisher 停止发布者
|
||
func StopPublisher() {
|
||
if globalPublisher != nil {
|
||
globalPublisher.Stop()
|
||
}
|
||
}
|
||
|
||
func Send(topic string, msg interface{}, opts ...broker.PublishOption) (err error) {
|
||
if globalPublisher != nil {
|
||
logger.Logger.Tracef("==> RabbitMQ(%v): %#v", topic, msg)
|
||
return globalPublisher.Send(topic, msg, opts...)
|
||
}
|
||
logger.Logger.Errorf("RabbitMQPublisher not start!")
|
||
return internal.ErrClosed
|
||
}
|