This commit is contained in:
sk 2024-05-24 14:42:25 +08:00
parent 6f0906c7c0
commit 7de9e8f939
3 changed files with 38 additions and 24 deletions

View File

@ -12,6 +12,7 @@ import (
_ "mongo.games.com/game" _ "mongo.games.com/game"
"mongo.games.com/game/common" "mongo.games.com/game/common"
_ "mongo.games.com/game/dbproxy/mq"
"mongo.games.com/game/dbproxy/svc" "mongo.games.com/game/dbproxy/svc"
"mongo.games.com/game/model" "mongo.games.com/game/model"
"mongo.games.com/game/mq" "mongo.games.com/game/mq"

View File

@ -36,7 +36,7 @@ type RabbitMQConsumer struct {
exchange rabbitmq.Exchange exchange rabbitmq.Exchange
} }
func NewRabbitMQConsumer(url string, exchange rabbitmq.Exchange) *RabbitMQConsumer { func NewRabbitMQConsumer(url string, exchange rabbitmq.Exchange, opts ...broker.Option) *RabbitMQConsumer {
mq := &RabbitMQConsumer{ mq := &RabbitMQConsumer{
url: url, url: url,
exchange: exchange, exchange: exchange,
@ -47,6 +47,9 @@ func NewRabbitMQConsumer(url string, exchange rabbitmq.Exchange) *RabbitMQConsum
if exchange.Durable { if exchange.Durable {
options = append(options, rabbitmq.DurableExchange()) options = append(options, rabbitmq.DurableExchange())
} }
if len(opts) > 0 {
options = append(options, opts...)
}
mq.Broker = rabbitmq.NewBroker(options...) mq.Broker = rabbitmq.NewBroker(options...)
mq.Broker.Init() mq.Broker.Init()
return mq return mq
@ -61,6 +64,8 @@ func (c *RabbitMQConsumer) Start() error {
return err return err
} }
logger.Logger.Infof("RabbitMQConsumer.Start() url:%s exchange:%s", c.url, c.exchange.Name)
for topic, ss := range GetSubscribers() { for topic, ss := range GetSubscribers() {
for _, s := range ss { for _, s := range ss {
sub, err := c.Subscribe(topic, func(event broker.Event) error { sub, err := c.Subscribe(topic, func(event broker.Event) error {
@ -68,7 +73,7 @@ func (c *RabbitMQConsumer) Start() error {
defer func() { defer func() {
e := recover() e := recover()
if e != nil { if e != nil {
logger.Logger.Errorf("RabbitMQConsumer.Subscriber(%s,%v) recover:%v", event.Topic(), event.Message(), e) logger.Logger.Errorf("RabbitMQConsumer.Subscriber() topic:%v message:%v recover:%v", event.Topic(), event.Message(), e)
} }
if err != nil { if err != nil {
c.backUp(event, err) c.backUp(event, err)
@ -89,6 +94,7 @@ func (c *RabbitMQConsumer) Start() error {
} }
func (c *RabbitMQConsumer) Stop() error { func (c *RabbitMQConsumer) Stop() error {
logger.Logger.Infof("RabbitMQConsumer.Stop() url:%s exchange:%s", c.url, c.exchange.Name)
for _, ss := range GetSubscribers() { for _, ss := range GetSubscribers() {
for _, s := range ss { for _, s := range ss {
s.Unsubscribe() s.Unsubscribe()
@ -102,7 +108,7 @@ func (c *RabbitMQConsumer) backUp(e broker.Event, err error) {
filePath := fmt.Sprintf(FilePathFormat, BackupPath, e.Topic(), tNow.Format(TimeFormat), tNow.Nanosecond(), rand.Int31n(10000)) filePath := fmt.Sprintf(FilePathFormat, BackupPath, e.Topic(), tNow.Format(TimeFormat), tNow.Nanosecond(), rand.Int31n(10000))
f, er := os.Create(filePath) f, er := os.Create(filePath)
if er != nil { if er != nil {
logger.Logger.Errorf("RabbitMQPublisher.public(%s,%v) err:%v", e.Topic(), e.Message(), er) logger.Logger.Errorf("RabbitMQPublisher.public() topic:%v message:%v err:%v", e.Topic(), e.Message(), er)
return return
} }
defer f.Close() defer f.Close()
@ -124,6 +130,7 @@ func RegisterSubscriber(topic string, h broker.Handler, opts ...broker.Subscribe
subscriberLock.Lock() subscriberLock.Lock()
subscriber[topic] = append(subscriber[topic], &s) subscriber[topic] = append(subscriber[topic], &s)
subscriberLock.Unlock() subscriberLock.Unlock()
logger.Logger.Infof("RegisterSubscriber topic:%v", topic)
} }
func GetSubscribers() map[string][]*Subscriber { func GetSubscribers() map[string][]*Subscriber {
@ -139,11 +146,11 @@ func GetSubscribers() map[string][]*Subscriber {
} }
// StartConsumer 启动消费者 // StartConsumer 启动消费者
func StartConsumer(url string, exchange string, durableExchange bool) { func StartConsumer(url string, exchange string, durableExchange bool, opts ...broker.Option) {
StopConsumer() StopConsumer()
globalConsumer = NewRabbitMQConsumer(url, rabbitmq.Exchange{Name: exchange, Durable: durableExchange}) globalConsumer = NewRabbitMQConsumer(url, rabbitmq.Exchange{Name: exchange, Durable: durableExchange}, opts...)
if err := globalConsumer.Start(); err != nil { if err := globalConsumer.Start(); err != nil {
panic(fmt.Sprintf("RabbitMQConsumer.Start() err:%v", err)) panic(fmt.Sprintf("RabbitMQConsumer.Start() url:%v exchange:%v err:%v", url, exchange, err))
} }
} }

View File

@ -33,6 +33,7 @@ var globalPublisher *RabbitMQPublisher
type item struct { type item struct {
topic string topic string
msg interface{} msg interface{}
opts []broker.PublishOption
} }
type RabbitMQPublisher struct { type RabbitMQPublisher struct {
@ -44,7 +45,7 @@ type RabbitMQPublisher struct {
wg sync.WaitGroup wg sync.WaitGroup
} }
func NewRabbitMQPublisher(url string, exchange rabbitmq.Exchange, queueSize int) *RabbitMQPublisher { func NewRabbitMQPublisher(url string, exchange rabbitmq.Exchange, queueSize int, opts ...broker.Option) *RabbitMQPublisher {
if queueSize <= 0 { if queueSize <= 0 {
queueSize = 1 queueSize = 1
} }
@ -60,17 +61,20 @@ func NewRabbitMQPublisher(url string, exchange rabbitmq.Exchange, queueSize int)
if exchange.Durable { if exchange.Durable {
options = append(options, rabbitmq.DurableExchange()) options = append(options, rabbitmq.DurableExchange())
} }
if len(opts) > 0 {
options = append(options, opts...)
}
mq.Broker = rabbitmq.NewBroker(options...) mq.Broker = rabbitmq.NewBroker(options...)
mq.Broker.Init() mq.Broker.Init()
return mq return mq
} }
// 发布消息,同步 // 发布消息,同步
func (p *RabbitMQPublisher) publish(topic string, msg interface{}) (err error) { func (p *RabbitMQPublisher) publish(topic string, msg interface{}, opts ...broker.PublishOption) (err error) {
defer func() { defer func() {
e := recover() e := recover()
if e != nil { if e != nil {
logger.Logger.Errorf("RabbitMQPublisher.public(%s,%v) recover:%v", topic, msg, e) logger.Logger.Errorf("RabbitMQPublisher.public() topic:%v message:%v recover:%v", topic, msg, e)
} }
if err != nil || e != nil { if err != nil || e != nil {
p.backup(topic, msg, err) p.backup(topic, msg, err)
@ -90,9 +94,9 @@ func (p *RabbitMQPublisher) publish(topic string, msg interface{}) (err error) {
} }
} }
err = p.Publish(topic, &broker.Message{Body: buf}) err = p.Publish(topic, &broker.Message{Body: buf}, opts...)
if err != nil { if err != nil {
logger.Logger.Error("RabbitMQPublisher.publish(%s,%v) err:%v", topic, msg, err) logger.Logger.Error("RabbitMQPublisher.publish() topic:%v message:%v err:%v", topic, msg, err)
return return
} }
return nil return nil
@ -106,7 +110,7 @@ func (p *RabbitMQPublisher) publishRoutine() {
select { select {
case item, ok := <-p.que: case item, ok := <-p.que:
if ok { if ok {
p.publish(item.topic, item.msg) p.publish(item.topic, item.msg, item.opts...)
} else { } else {
return return
} }
@ -124,17 +128,19 @@ func (p *RabbitMQPublisher) Start() (err error) {
} }
} }
err = p.Connect() if err = p.Connect(); err != nil {
if err != nil { return err
return
} }
logger.Logger.Infof("RabbitMQPublisher.Start() url:%s exchange:%s", p.url, p.exchange.Name)
go p.publishRoutine() go p.publishRoutine()
return nil return nil
} }
func (p *RabbitMQPublisher) Stop() error { func (p *RabbitMQPublisher) Stop() error {
logger.Logger.Infof("RabbitMQPublisher.Stop() url:%s exchange:%s", p.url, p.exchange.Name)
select { select {
case <-p.closed: case <-p.closed:
return ErrClosed return ErrClosed
@ -144,7 +150,7 @@ func (p *RabbitMQPublisher) Stop() error {
close(p.closed) close(p.closed)
close(p.que) close(p.que)
for item := range p.que { for item := range p.que {
p.publish(item.topic, item.msg) p.publish(item.topic, item.msg, item.opts...)
} }
//等待所有投递出去的任务全部完成 //等待所有投递出去的任务全部完成
@ -154,14 +160,14 @@ func (p *RabbitMQPublisher) Stop() error {
} }
// Send 发布消息,异步 // Send 发布消息,异步
func (p *RabbitMQPublisher) Send(topic string, msg interface{}) (err error) { func (p *RabbitMQPublisher) Send(topic string, msg interface{}, opts ...broker.PublishOption) (err error) {
select { select {
case <-p.closed: case <-p.closed:
return ErrClosed return ErrClosed
default: default:
} }
i := &item{topic: topic, msg: msg} i := &item{topic: topic, msg: msg, opts: opts}
select { select {
case p.que <- i: case p.que <- i:
default: default:
@ -184,7 +190,7 @@ func (p *RabbitMQPublisher) backup(topic string, msg interface{}, err error) {
filePath := fmt.Sprintf(FilePathFormat, BackupPath, topic, tNow.Format(TimeFormat), tNow.Nanosecond(), rand.Int31n(10000)) filePath := fmt.Sprintf(FilePathFormat, BackupPath, topic, tNow.Format(TimeFormat), tNow.Nanosecond(), rand.Int31n(10000))
f, er := os.Create(filePath) f, er := os.Create(filePath)
if er != nil { if er != nil {
logger.Logger.Errorf("RabbitMQPublisher.public(%s,%v) err:%v", topic, msg, er) logger.Logger.Errorf("RabbitMQPublisher.public() topic:%v message:%v err:%v", topic, msg, er)
return return
} }
defer f.Close() defer f.Close()
@ -197,11 +203,11 @@ func (p *RabbitMQPublisher) backup(topic string, msg interface{}, err error) {
} }
// StartPublisher 启动发布者 // StartPublisher 启动发布者
func StartPublisher(url string, exchange string, durableExchange bool, queueSize int) { func StartPublisher(url string, exchange string, durableExchange bool, queueSize int, opts ...broker.Option) {
StopPublisher() StopPublisher()
globalPublisher = NewRabbitMQPublisher(url, rabbitmq.Exchange{Name: exchange, Durable: durableExchange}, queueSize) globalPublisher = NewRabbitMQPublisher(url, rabbitmq.Exchange{Name: exchange, Durable: durableExchange}, queueSize, opts...)
if err := globalPublisher.Start(); err != nil { if err := globalPublisher.Start(); err != nil {
panic(fmt.Sprintf("RabbitMQPublisher.Start() err:%v", err)) panic(fmt.Sprintf("RabbitMQPublisher.Start() url:%v exchange:%v err:%v", url, exchange, err))
} }
} }
@ -212,9 +218,9 @@ func StopPublisher() {
} }
} }
func Send(topic string, msg interface{}) (err error) { func Send(topic string, msg interface{}, opts ...broker.PublishOption) (err error) {
if globalPublisher != nil { if globalPublisher != nil {
return globalPublisher.Send(topic, msg) return globalPublisher.Send(topic, msg, opts...)
} }
return ErrClosed return ErrClosed
} }