From 7de9e8f939802c101fb63ed2f1c2989ebd0566e9 Mon Sep 17 00:00:00 2001 From: sk <123456@qq.com> Date: Fri, 24 May 2024 14:42:25 +0800 Subject: [PATCH] rabbitmq --- dbproxy/main.go | 1 + mq/consumer.go | 19 +++++++++++++------ mq/publisher.go | 42 ++++++++++++++++++++++++------------------ 3 files changed, 38 insertions(+), 24 deletions(-) diff --git a/dbproxy/main.go b/dbproxy/main.go index 48af0ea..eacedcf 100644 --- a/dbproxy/main.go +++ b/dbproxy/main.go @@ -12,6 +12,7 @@ import ( _ "mongo.games.com/game" "mongo.games.com/game/common" + _ "mongo.games.com/game/dbproxy/mq" "mongo.games.com/game/dbproxy/svc" "mongo.games.com/game/model" "mongo.games.com/game/mq" diff --git a/mq/consumer.go b/mq/consumer.go index 4110f6d..eda1518 100644 --- a/mq/consumer.go +++ b/mq/consumer.go @@ -36,7 +36,7 @@ type RabbitMQConsumer struct { exchange rabbitmq.Exchange } -func NewRabbitMQConsumer(url string, exchange rabbitmq.Exchange) *RabbitMQConsumer { +func NewRabbitMQConsumer(url string, exchange rabbitmq.Exchange, opts ...broker.Option) *RabbitMQConsumer { mq := &RabbitMQConsumer{ url: url, exchange: exchange, @@ -47,6 +47,9 @@ func NewRabbitMQConsumer(url string, exchange rabbitmq.Exchange) *RabbitMQConsum if exchange.Durable { options = append(options, rabbitmq.DurableExchange()) } + if len(opts) > 0 { + options = append(options, opts...) + } mq.Broker = rabbitmq.NewBroker(options...) mq.Broker.Init() return mq @@ -61,6 +64,8 @@ func (c *RabbitMQConsumer) Start() error { return err } + logger.Logger.Infof("RabbitMQConsumer.Start() url:%s exchange:%s", c.url, c.exchange.Name) + for topic, ss := range GetSubscribers() { for _, s := range ss { sub, err := c.Subscribe(topic, func(event broker.Event) error { @@ -68,7 +73,7 @@ func (c *RabbitMQConsumer) Start() error { defer func() { e := recover() if e != nil { - logger.Logger.Errorf("RabbitMQConsumer.Subscriber(%s,%v) recover:%v", event.Topic(), event.Message(), e) + logger.Logger.Errorf("RabbitMQConsumer.Subscriber() topic:%v message:%v recover:%v", event.Topic(), event.Message(), e) } if err != nil { c.backUp(event, err) @@ -89,6 +94,7 @@ func (c *RabbitMQConsumer) Start() error { } func (c *RabbitMQConsumer) Stop() error { + logger.Logger.Infof("RabbitMQConsumer.Stop() url:%s exchange:%s", c.url, c.exchange.Name) for _, ss := range GetSubscribers() { for _, s := range ss { s.Unsubscribe() @@ -102,7 +108,7 @@ func (c *RabbitMQConsumer) backUp(e broker.Event, err error) { filePath := fmt.Sprintf(FilePathFormat, BackupPath, e.Topic(), tNow.Format(TimeFormat), tNow.Nanosecond(), rand.Int31n(10000)) f, er := os.Create(filePath) if er != nil { - logger.Logger.Errorf("RabbitMQPublisher.public(%s,%v) err:%v", e.Topic(), e.Message(), er) + logger.Logger.Errorf("RabbitMQPublisher.public() topic:%v message:%v err:%v", e.Topic(), e.Message(), er) return } defer f.Close() @@ -124,6 +130,7 @@ func RegisterSubscriber(topic string, h broker.Handler, opts ...broker.Subscribe subscriberLock.Lock() subscriber[topic] = append(subscriber[topic], &s) subscriberLock.Unlock() + logger.Logger.Infof("RegisterSubscriber topic:%v", topic) } func GetSubscribers() map[string][]*Subscriber { @@ -139,11 +146,11 @@ func GetSubscribers() map[string][]*Subscriber { } // StartConsumer 启动消费者 -func StartConsumer(url string, exchange string, durableExchange bool) { +func StartConsumer(url string, exchange string, durableExchange bool, opts ...broker.Option) { StopConsumer() - globalConsumer = NewRabbitMQConsumer(url, rabbitmq.Exchange{Name: exchange, Durable: durableExchange}) + globalConsumer = NewRabbitMQConsumer(url, rabbitmq.Exchange{Name: exchange, Durable: durableExchange}, opts...) if err := globalConsumer.Start(); err != nil { - panic(fmt.Sprintf("RabbitMQConsumer.Start() err:%v", err)) + panic(fmt.Sprintf("RabbitMQConsumer.Start() url:%v exchange:%v err:%v", url, exchange, err)) } } diff --git a/mq/publisher.go b/mq/publisher.go index 3f27579..36b1e38 100644 --- a/mq/publisher.go +++ b/mq/publisher.go @@ -33,6 +33,7 @@ var globalPublisher *RabbitMQPublisher type item struct { topic string msg interface{} + opts []broker.PublishOption } type RabbitMQPublisher struct { @@ -44,7 +45,7 @@ type RabbitMQPublisher struct { wg sync.WaitGroup } -func NewRabbitMQPublisher(url string, exchange rabbitmq.Exchange, queueSize int) *RabbitMQPublisher { +func NewRabbitMQPublisher(url string, exchange rabbitmq.Exchange, queueSize int, opts ...broker.Option) *RabbitMQPublisher { if queueSize <= 0 { queueSize = 1 } @@ -60,17 +61,20 @@ func NewRabbitMQPublisher(url string, exchange rabbitmq.Exchange, queueSize int) if exchange.Durable { options = append(options, rabbitmq.DurableExchange()) } + if len(opts) > 0 { + options = append(options, opts...) + } mq.Broker = rabbitmq.NewBroker(options...) mq.Broker.Init() return mq } // 发布消息,同步 -func (p *RabbitMQPublisher) publish(topic string, msg interface{}) (err error) { +func (p *RabbitMQPublisher) publish(topic string, msg interface{}, opts ...broker.PublishOption) (err error) { defer func() { e := recover() if e != nil { - logger.Logger.Errorf("RabbitMQPublisher.public(%s,%v) recover:%v", topic, msg, e) + logger.Logger.Errorf("RabbitMQPublisher.public() topic:%v message:%v recover:%v", topic, msg, e) } if err != nil || e != nil { p.backup(topic, msg, err) @@ -90,9 +94,9 @@ func (p *RabbitMQPublisher) publish(topic string, msg interface{}) (err error) { } } - err = p.Publish(topic, &broker.Message{Body: buf}) + err = p.Publish(topic, &broker.Message{Body: buf}, opts...) if err != nil { - logger.Logger.Error("RabbitMQPublisher.publish(%s,%v) err:%v", topic, msg, err) + logger.Logger.Error("RabbitMQPublisher.publish() topic:%v message:%v err:%v", topic, msg, err) return } return nil @@ -106,7 +110,7 @@ func (p *RabbitMQPublisher) publishRoutine() { select { case item, ok := <-p.que: if ok { - p.publish(item.topic, item.msg) + p.publish(item.topic, item.msg, item.opts...) } else { return } @@ -124,17 +128,19 @@ func (p *RabbitMQPublisher) Start() (err error) { } } - err = p.Connect() - if err != nil { - return + if err = p.Connect(); err != nil { + return err } + logger.Logger.Infof("RabbitMQPublisher.Start() url:%s exchange:%s", p.url, p.exchange.Name) + go p.publishRoutine() return nil } func (p *RabbitMQPublisher) Stop() error { + logger.Logger.Infof("RabbitMQPublisher.Stop() url:%s exchange:%s", p.url, p.exchange.Name) select { case <-p.closed: return ErrClosed @@ -144,7 +150,7 @@ func (p *RabbitMQPublisher) Stop() error { close(p.closed) close(p.que) for item := range p.que { - p.publish(item.topic, item.msg) + p.publish(item.topic, item.msg, item.opts...) } //等待所有投递出去的任务全部完成 @@ -154,14 +160,14 @@ func (p *RabbitMQPublisher) Stop() error { } // Send 发布消息,异步 -func (p *RabbitMQPublisher) Send(topic string, msg interface{}) (err error) { +func (p *RabbitMQPublisher) Send(topic string, msg interface{}, opts ...broker.PublishOption) (err error) { select { case <-p.closed: return ErrClosed default: } - i := &item{topic: topic, msg: msg} + i := &item{topic: topic, msg: msg, opts: opts} select { case p.que <- i: default: @@ -184,7 +190,7 @@ func (p *RabbitMQPublisher) backup(topic string, msg interface{}, err error) { filePath := fmt.Sprintf(FilePathFormat, BackupPath, topic, tNow.Format(TimeFormat), tNow.Nanosecond(), rand.Int31n(10000)) f, er := os.Create(filePath) if er != nil { - logger.Logger.Errorf("RabbitMQPublisher.public(%s,%v) err:%v", topic, msg, er) + logger.Logger.Errorf("RabbitMQPublisher.public() topic:%v message:%v err:%v", topic, msg, er) return } defer f.Close() @@ -197,11 +203,11 @@ func (p *RabbitMQPublisher) backup(topic string, msg interface{}, err error) { } // StartPublisher 启动发布者 -func StartPublisher(url string, exchange string, durableExchange bool, queueSize int) { +func StartPublisher(url string, exchange string, durableExchange bool, queueSize int, opts ...broker.Option) { StopPublisher() - globalPublisher = NewRabbitMQPublisher(url, rabbitmq.Exchange{Name: exchange, Durable: durableExchange}, queueSize) + globalPublisher = NewRabbitMQPublisher(url, rabbitmq.Exchange{Name: exchange, Durable: durableExchange}, queueSize, opts...) if err := globalPublisher.Start(); err != nil { - panic(fmt.Sprintf("RabbitMQPublisher.Start() err:%v", err)) + panic(fmt.Sprintf("RabbitMQPublisher.Start() url:%v exchange:%v err:%v", url, exchange, err)) } } @@ -212,9 +218,9 @@ func StopPublisher() { } } -func Send(topic string, msg interface{}) (err error) { +func Send(topic string, msg interface{}, opts ...broker.PublishOption) (err error) { if globalPublisher != nil { - return globalPublisher.Send(topic, msg) + return globalPublisher.Send(topic, msg, opts...) } return ErrClosed }