package internal import ( "fmt" "math/rand" "os" "sync" "time" "mongo.games.com/goserver/core/broker" "mongo.games.com/goserver/core/broker/rabbitmq" "mongo.games.com/goserver/core/logger" ) /* 为了使用方便,这里统一启动订阅 */ var ( subscriberLock sync.RWMutex subscriber = make(map[string][]*Subscriber) ) type Subscriber struct { broker.Subscriber h broker.Handler opts []broker.SubscribeOption } type RabbitMQConsumer struct { broker.Broker url string exchange rabbitmq.Exchange } func NewRabbitMQConsumer(url string, exchange rabbitmq.Exchange, opts ...broker.Option) *RabbitMQConsumer { mq := &RabbitMQConsumer{ url: url, exchange: exchange, } options := []broker.Option{ broker.Addrs(url), rabbitmq.ExchangeName(exchange.Name), } if exchange.Durable { options = append(options, rabbitmq.DurableExchange()) } if len(opts) > 0 { options = append(options, opts...) } mq.Broker = rabbitmq.NewBroker(options...) mq.Broker.Init() return mq } func (c *RabbitMQConsumer) Start() error { if ok, _ := PathExists(BackupPath); !ok { os.MkdirAll(BackupPath, os.ModePerm) } if err := c.Connect(); err != nil { return err } logger.Logger.Infof("RabbitMQConsumer.Start() url:%s exchange:%s", c.url, c.exchange.Name) for topic, ss := range GetSubscribers() { for _, s := range ss { sub, err := c.Subscribe(topic, func(event broker.Event) error { var err error defer func() { e := recover() if e != nil { logger.Logger.Errorf("RabbitMQConsumer.Subscriber() topic:%v message:%v recover:%v", event.Topic(), event.Message(), e) } if err != nil { c.backUp(event, err) } }() err = s.h(event) return err }, s.opts...) if err != nil { return err } s.Subscriber = sub } } return nil } func (c *RabbitMQConsumer) Stop() error { logger.Logger.Infof("RabbitMQConsumer.Stop() url:%s exchange:%s", c.url, c.exchange.Name) for _, ss := range GetSubscribers() { for _, s := range ss { s.Unsubscribe() } } return c.Disconnect() } func (c *RabbitMQConsumer) backUp(e broker.Event, err error) { tNow := time.Now() filePath := fmt.Sprintf(FilePathFormat, BackupPath, e.Topic(), tNow.Format(TimeFormat), tNow.Nanosecond(), rand.Int31n(10000)) f, er := os.Create(filePath) if er != nil { logger.Logger.Errorf("RabbitMQPublisher.public() topic:%v message:%v err:%v", e.Topic(), e.Message(), er) return } defer f.Close() var reason string if err != nil { reason = err.Error() } f.WriteString("reason:" + reason + "\n") f.WriteString("data:" + string(e.Message().Body) + "\n") } // RegisterSubscriber 注册订阅处理方法 // 不同订阅是在各自的协程中执行的 func RegisterSubscriber(topic string, h broker.Handler, opts ...broker.SubscribeOption) { s := Subscriber{ h: h, opts: opts, } subscriberLock.Lock() subscriber[topic] = append(subscriber[topic], &s) subscriberLock.Unlock() logger.Logger.Infof("RegisterSubscriber topic:%v", topic) } func GetSubscribers() map[string][]*Subscriber { ret := make(map[string][]*Subscriber) subscriberLock.RLock() defer subscriberLock.RUnlock() for topic, s := range subscriber { temp := make([]*Subscriber, len(s)) copy(temp, s) ret[topic] = temp } return ret }