package mq import ( "encoding/json" "fmt" "reflect" "mongo.games.com/goserver/core/broker" "mongo.games.com/goserver/core/broker/rabbitmq" "mongo.games.com/goserver/core/logger" "mongo.games.com/game/mq/internal" ) type HandlerFunc func(data interface{}) (err error) type HandlerData struct { T reflect.Type F HandlerFunc } type MessageMgr struct { name map[reflect.Type]string handler map[string]HandlerData } type RegisterMessageParam struct { Name string Data interface{} } // RegisterMessage 注册消息 func (c *MessageMgr) RegisterMessage(param *RegisterMessageParam) { if param == nil { return } t := c.getType(param.Data) c.name[t] = param.Name } func (c *MessageMgr) getType(data interface{}) reflect.Type { return reflect.Indirect(reflect.ValueOf(data)).Type() } func (c *MessageMgr) getName(data interface{}) string { t := c.getType(data) if name, exist := c.name[t]; exist { return name } return "" } type RabbitMQData struct { MQName string // 是队列名称也是Routing key Data interface{} } func (c *MessageMgr) Send(data interface{}, name ...string) error { if msg, ok := data.(*RabbitMQData); ok { return Send(msg.MQName, msg.Data) } if len(name) > 0 && name[0] != "" { return Send(name[0], data) } key := c.getName(data) if key == "" { key = "_null_" } return Send(key, data) } type RegisterHandlerParam struct { Name string Data interface{} Handler HandlerFunc } // RegisterHandler 注册消息处理函数 func (c *MessageMgr) RegisterHandler(param *RegisterHandlerParam) { if param == nil { return } if _, ok := c.handler[param.Name]; ok { panic(fmt.Sprintf("RabbitMQ RegisterHandler repeatet name:%v", param.Name)) return } if param.Data == nil { return } if param.Handler == nil { return } c.handler[param.Name] = HandlerData{ T: c.getType(param.Data), F: param.Handler, } internal.RegisterSubscriber(param.Name, func(e broker.Event) (err error) { msg := e.Message() if msg != nil { defer func() { e.Ack() }() log := reflect.New(c.handler[param.Name].T).Interface() err = json.Unmarshal(msg.Body, log) if err != nil { logger.Logger.Errorf("RabbitMQ Unmarshal error: %v", err) return } logger.Logger.Tracef("MQ Receive[%v]: %#v", param.Name, log) return c.handler[param.Name].F(log) } return nil }, broker.Queue(param.Name), broker.DisableAutoAck(), rabbitmq.DurableQueue()) } // MessageMgrSingle 消息发送器 var MessageMgrSingle = &MessageMgr{ name: make(map[reflect.Type]string), handler: make(map[string]HandlerData), } // RegisterMessage 注册消息 // name: 消息名称 // data: 消息结构体指针 func RegisterMessage(param *RegisterMessageParam) { MessageMgrSingle.RegisterMessage(param) } // Write 发送消息 // 默认队列名称规则:队列前缀_消息结构体名称 func Write(data interface{}, name ...string) error { return MessageMgrSingle.Send(data, name...) } // RegisterHandler 注册消息处理函数 func RegisterHandler(param *RegisterHandlerParam) { MessageMgrSingle.RegisterHandler(param) } var globalConsumer *internal.RabbitMQConsumer // StartConsumer 启动消费者 func StartConsumer(url string, exchange string, durableExchange bool, opts ...broker.Option) { StopConsumer() globalConsumer = internal.NewRabbitMQConsumer(url, rabbitmq.Exchange{Name: exchange, Durable: durableExchange}, opts...) if err := globalConsumer.Start(); err != nil { panic(fmt.Sprintf("RabbitMQConsumer.Start() url:%v exchange:%v err:%v", url, exchange, err)) } } // StopConsumer 停止消费者 func StopConsumer() { if globalConsumer != nil { globalConsumer.Stop() } } var globalPublisher *internal.RabbitMQPublisher // StartPublisher 启动发布者 func StartPublisher(url string, exchange string, durableExchange bool, queueSize int, opts ...broker.Option) { StopPublisher() globalPublisher = internal.NewRabbitMQPublisher(url, rabbitmq.Exchange{Name: exchange, Durable: durableExchange}, queueSize, opts...) if err := globalPublisher.Start(); err != nil { panic(fmt.Sprintf("RabbitMQPublisher.Start() url:%v exchange:%v err:%v", url, exchange, err)) } } // StopPublisher 停止发布者 func StopPublisher() { if globalPublisher != nil { globalPublisher.Stop() } } func Send(topic string, msg interface{}, opts ...broker.PublishOption) (err error) { if globalPublisher != nil { logger.Logger.Tracef("MQ Send[%v]: %#v", topic, msg) return globalPublisher.Send(topic, msg, opts...) } logger.Logger.Errorf("RabbitMQPublisher not start!") return internal.ErrClosed }