package mq import ( "fmt" "math/rand" "os" "sync" "time" "mongo.games.com/goserver/core/broker" "mongo.games.com/goserver/core/broker/rabbitmq" "mongo.games.com/goserver/core/logger" "mongo.games.com/game/common" ) /* 为了使用方便,这里统一启动订阅 */ var subscriberLock sync.RWMutex var subscriber = make(map[string][]*Subscriber) type Subscriber struct { broker.Subscriber h broker.Handler opts []broker.SubscribeOption } // RegisterSubscriber 注册订阅处理方法 // 不同订阅是在各自的协程中执行的 func RegisterSubscriber(topic string, h broker.Handler, opts ...broker.SubscribeOption) { s := Subscriber{ h: h, opts: opts, } subscriberLock.Lock() subscriber[topic] = append(subscriber[topic], &s) subscriberLock.Unlock() } func UnregisterSubscriber(topic string) { subscriberLock.Lock() delete(subscriber, topic) subscriberLock.Unlock() } func GetSubscriber(topic string) []*Subscriber { subscriberLock.RLock() defer subscriberLock.RUnlock() if s, ok := subscriber[topic]; ok { return s } return nil } func GetSubscribers() map[string][]*Subscriber { ret := make(map[string][]*Subscriber) subscriberLock.RLock() defer subscriberLock.RUnlock() for topic, s := range subscriber { temp := make([]*Subscriber, len(s)) copy(temp, s) ret[topic] = temp } return ret } type RabbitMQConsumer struct { broker.Broker url string exchange rabbitmq.Exchange } func NewRabbitMQConsumer(url string, exchange rabbitmq.Exchange) *RabbitMQConsumer { mq := &RabbitMQConsumer{ url: url, exchange: exchange, } rabbitmq.DefaultRabbitURL = mq.url rabbitmq.DefaultExchange = mq.exchange mq.Broker = rabbitmq.NewBroker() mq.Broker.Init() return mq } func (c *RabbitMQConsumer) Start() error { if err := c.Connect(); err != nil { return err } sss := GetSubscribers() for topic, ss := range sss { for _, s := range ss { sub, err := c.Subscribe(topic, s.h, s.opts...) if err != nil { return err } s.Subscriber = sub } } return nil } func (c *RabbitMQConsumer) Stop() error { sss := GetSubscribers() for _, ss := range sss { for _, s := range ss { s.Unsubscribe() } } return c.Disconnect() } func BackUp(e broker.Event, err error) { tNow := time.Now() filePath := fmt.Sprintf("%s/%s_%s_%09d_%04d.dat", BACKUP_PATH, e.Topic(), tNow.Format(TIME_FORMAT), tNow.Nanosecond(), rand.Int31n(10000)) f, err := os.Create(filePath) if err != nil { logger.Logger.Errorf("RabbitMQPublisher.public(%s,%v) err:%v", e.Topic(), e.Message(), err) return } defer f.Close() var reason string if err != nil { reason = err.Error() } f.WriteString("reason:" + reason + "\n") f.WriteString("data:" + string(e.Message().Body) + "\n") } func init() { if ok, _ := common.PathExists(BACKUP_PATH); !ok { os.MkdirAll(BACKUP_PATH, os.ModePerm) } }