package mq

import (
	"encoding/json"
	"fmt"
	"reflect"

	"mongo.games.com/goserver/core/broker"
	"mongo.games.com/goserver/core/broker/rabbitmq"
	"mongo.games.com/goserver/core/logger"
)

// HandlerFunc processes one received message. The argument is a pointer to a
// newly allocated value of the type registered for the queue, populated by
// json.Unmarshal from the message body.
type HandlerFunc func(data interface{}) (err error)

// HandlerData pairs the payload type registered for a queue (T) with the
// function that handles its messages (F).
type HandlerData struct {
	T reflect.Type
	F HandlerFunc
}

// MessageMgr maps message types to queue names for sending, and queue names
// to handlers for receiving.
//
// NOTE(review): the maps are not guarded by a lock; presumably all
// registration happens during start-up on one goroutine. Confirm against callers.
type MessageMgr struct {
	name    map[reflect.Type]string
	handler map[string]HandlerData
}

// RegisterMessageParam is the argument of RegisterMessage.
type RegisterMessageParam struct {
	Name string      // queue/message name
	Data interface{} // message value or pointer to it; only its type is used
}

// RegisterMessage registers a message: it associates the (pointer-dereferenced)
// type of param.Data with param.Name so that Write can resolve the name later.
// A nil param or a nil param.Data is ignored.
func (c *MessageMgr) RegisterMessage(param *RegisterMessageParam) {
	if param == nil {
		return
	}
	t := c.getType(param.Data)
	if t == nil {
		// Untyped nil carries no type information; nothing to register.
		return
	}
	c.name[t] = param.Name
}

// getType returns the type of data, dereferencing one level of pointer.
// It returns nil when data is an untyped nil. Working on reflect.Type rather
// than reflect.Value keeps typed nil pointers such as (*Msg)(nil) safe.
func (c *MessageMgr) getType(data interface{}) reflect.Type {
	t := reflect.TypeOf(data)
	if t != nil && t.Kind() == reflect.Ptr {
		t = t.Elem()
	}
	return t
}

// getName returns the name registered for the type of data, or "" when the
// type has not been registered.
func (c *MessageMgr) getName(data interface{}) string {
	// A missing key yields the zero value "", which is the "not found" result.
	return c.name[c.getType(data)]
}

// Write sends a message whose type must have been registered beforehand with
// RegisterMessage. Messages of unregistered types are sent under the name
// "_null_".
func (c *MessageMgr) Write(data interface{}) {
	name := c.getName(data)
	if name == "" {
		name = "_null_"
	}
	logger.Logger.Tracef("==> RabbitMQ(%v): %#v", name, data)
	Send(name, data)
}

// WriteMQData sends a RabbitMQ message under an explicitly given name,
// bypassing the type registry.
func (c *MessageMgr) WriteMQData(name string, data interface{}) {
	logger.Logger.Tracef("==> RabbitMQ(%v): %#v", name, data)
	Send(name, data)
}

// RegisterHandlerParam is the argument of RegisterHandler.
type RegisterHandlerParam struct {
	Name    string      // queue name to subscribe to
	Data    interface{} // message value or pointer to it; only its type is used
	Handler HandlerFunc // called with each decoded message
}

// RegisterHandler registers a message handler function and subscribes it to
// the queue param.Name.
//
// A nil param, nil param.Data or nil param.Handler is ignored. Registering the
// same name twice panics. Each received message body is JSON-decoded into a
// new value of the registered type and passed to the handler; the handler's
// error is returned to the subscriber.
func (c *MessageMgr) RegisterHandler(param *RegisterHandlerParam) {
	if param == nil {
		return
	}
	if _, ok := c.handler[param.Name]; ok {
		panic(fmt.Sprintf("RabbitMQ RegisterHandler repeatet name:%v", param.Name))
	}
	if param.Data == nil || param.Handler == nil {
		return
	}

	// Capture the name and handler entry now, so that later mutation of the
	// caller-owned param cannot change what the subscriber closure uses.
	name := param.Name
	entry := HandlerData{
		T: c.getType(param.Data),
		F: param.Handler,
	}
	c.handler[name] = entry

	RegisterSubscriber(name, func(e broker.Event) (err error) {
		msg := e.Message()
		if msg == nil {
			return nil
		}
		// Auto-ack is disabled below, so acknowledge explicitly. The message
		// is acknowledged even when decoding or the handler fails.
		defer e.Ack()

		payload := reflect.New(entry.T).Interface()
		if err = json.Unmarshal(msg.Body, payload); err != nil {
			logger.Logger.Errorf("RabbitMQ Unmarshal error: %v", err)
			return err
		}
		logger.Logger.Tracef("==> Receive RabbitMQ(%v): %#v", name, payload)
		return entry.F(payload)
	}, broker.Queue(name), broker.DisableAutoAck(), rabbitmq.DurableQueue())
}

// MessageMgrSingle is the package-wide message sender used by the
// package-level helper functions.
var MessageMgrSingle = &MessageMgr{
	name:    make(map[reflect.Type]string),
	handler: make(map[string]HandlerData),
}

// RegisterMessage registers a message on MessageMgrSingle.
//
//	param.Name: message name
//	param.Data: pointer to the message struct
func RegisterMessage(param *RegisterMessageParam) {
	MessageMgrSingle.RegisterMessage(param)
}

// Write sends a message through MessageMgrSingle.
//
// Original note: the default queue name rule is
// "<queue prefix>_<message struct name>".
// NOTE(review): that rule is not visible in this file, where unregistered
// types are sent as "_null_"; presumably it is applied elsewhere. Confirm.
func Write(data interface{}) {
	MessageMgrSingle.Write(data)
}

// WriteMQData sends a message under the given name through MessageMgrSingle.
func WriteMQData(name string, data interface{}) {
	MessageMgrSingle.WriteMQData(name, data)
}

// RegisterHandler registers a message handler function on MessageMgrSingle.
func RegisterHandler(param *RegisterHandlerParam) {
	MessageMgrSingle.RegisterHandler(param)
}