This commit is contained in:
sk 2024-05-22 15:05:34 +08:00
parent 7c2d2fb4f6
commit 12eb52dbbb
6 changed files with 284 additions and 243 deletions

View File

@ -5,11 +5,15 @@ import (
"crypto/tls"
)
//================================
// Options
//================================
type Options struct {
Addrs []string
Secure bool
// Handler executed when error happens in broker mesage
// Handler executed when error happens in broker message
// processing
ErrorHandler Handler
@ -20,12 +24,60 @@ type Options struct {
Context context.Context
}
type Option func(*Options)
// Addrs sets the host addresses to be used by the broker
func Addrs(addrs ...string) Option {
return func(o *Options) {
o.Addrs = addrs
}
}
// ErrorHandler will catch all broker errors that cant be handled
// in normal way, for example Codec errors
func ErrorHandler(h Handler) Option {
return func(o *Options) {
o.ErrorHandler = h
}
}
// Secure communication with the broker
func Secure(b bool) Option {
return func(o *Options) {
o.Secure = b
}
}
// TLSConfig Specify TLS Config
func TLSConfig(t *tls.Config) Option {
return func(o *Options) {
o.TLSConfig = t
}
}
//================================
// PublishOptions
//================================
type PublishOptions struct {
// Other options for implementations of the interface
// can be stored in a context
Context context.Context
}
type PublishOption func(*PublishOptions)
// PublishContext set context
func PublishContext(ctx context.Context) PublishOption {
return func(o *PublishOptions) {
o.Context = ctx
}
}
//================================
// SubscribeOptions
//================================
type SubscribeOptions struct {
// AutoAck defaults to true. When a handler returns
// with a nil error the message is acked.
@ -40,19 +92,6 @@ type SubscribeOptions struct {
Context context.Context
}
type Option func(*Options)
type PublishOption func(*PublishOptions)
// PublishContext set context
func PublishContext(ctx context.Context) PublishOption {
return func(o *PublishOptions) {
o.Context = ctx
}
}
type SubscribeOption func(*SubscribeOptions)
func NewSubscribeOptions(opts ...SubscribeOption) SubscribeOptions {
opt := SubscribeOptions{
AutoAck: true,
@ -65,12 +104,7 @@ func NewSubscribeOptions(opts ...SubscribeOption) SubscribeOptions {
return opt
}
// Addrs sets the host addresses to be used by the broker
func Addrs(addrs ...string) Option {
return func(o *Options) {
o.Addrs = addrs
}
}
type SubscribeOption func(*SubscribeOptions)
// DisableAutoAck will disable auto acking of messages
// after they have been handled.
@ -80,14 +114,6 @@ func DisableAutoAck() SubscribeOption {
}
}
// ErrorHandler will catch all broker errors that cant be handled
// in normal way, for example Codec errors
func ErrorHandler(h Handler) Option {
return func(o *Options) {
o.ErrorHandler = h
}
}
// Queue sets the name of the queue to share messages on
func Queue(name string) SubscribeOption {
return func(o *SubscribeOptions) {
@ -95,20 +121,6 @@ func Queue(name string) SubscribeOption {
}
}
// Secure communication with the broker
func Secure(b bool) Option {
return func(o *Options) {
o.Secure = b
}
}
// Specify TLS Config
func TLSConfig(t *tls.Config) Option {
return func(o *Options) {
o.TLSConfig = t
}
}
// SubscribeContext set context
func SubscribeContext(ctx context.Context) SubscribeOption {
return func(o *SubscribeOptions) {

View File

@ -1,9 +1,5 @@
package rabbitmq
//
// All credit to Mondo
//
import (
"errors"
@ -11,12 +7,17 @@ import (
"github.com/streadway/amqp"
)
/*
RabbitMQ 通道封装
*/
type rabbitMQChannel struct {
uuid string
connection *amqp.Connection
channel *amqp.Channel
}
// newRabbitChannel 新建通道
func newRabbitChannel(conn *amqp.Connection, prefetchCount int, prefetchGlobal bool) (*rabbitMQChannel, error) {
id, err := uuid.NewRandom()
if err != nil {

View File

@ -1,9 +1,5 @@
package rabbitmq
//
// All credit to Mondo
//
import (
"crypto/tls"
"regexp"
@ -14,6 +10,11 @@ import (
"github.com/streadway/amqp"
)
/*
RabbitMQ 连接封装
带自动重连
*/
var (
DefaultExchange = Exchange{
Name: "idealeak",
@ -39,22 +40,6 @@ var (
dialConfig = amqp.DialConfig
)
type rabbitMQConn struct {
Connection *amqp.Connection
Channel *rabbitMQChannel
ExchangeChannel *rabbitMQChannel
exchange Exchange
url string
prefetchCount int
prefetchGlobal bool
sync.Mutex
connected bool
close chan bool // 关闭信号
waitConnection chan struct{} // 建立连接中
}
// Exchange is the rabbitmq exchange
type Exchange struct {
// Name of the exchange
@ -63,6 +48,22 @@ type Exchange struct {
Durable bool
}
type rabbitMQConn struct {
Connection *amqp.Connection
Channel *rabbitMQChannel
ExchangeChannel *rabbitMQChannel
exchange Exchange
url string
prefetchCount int // 每次分发给消费者的最大消息数量
prefetchGlobal bool // 是否对整个 channel 生效
sync.Mutex
connected bool
close chan bool // 关闭信号
WaitConnection chan struct{} // 建立连接中
}
func newRabbitMQConn(ex Exchange, urls []string, prefetchCount int, prefetchGlobal bool) *rabbitMQConn {
var url string
@ -78,107 +79,11 @@ func newRabbitMQConn(ex Exchange, urls []string, prefetchCount int, prefetchGlob
prefetchCount: prefetchCount,
prefetchGlobal: prefetchGlobal,
close: make(chan bool),
waitConnection: make(chan struct{}),
WaitConnection: make(chan struct{}),
}
// its bad case of nil == waitConnection, so close it at start
close(ret.waitConnection)
return ret
}
func (r *rabbitMQConn) connect(secure bool, config *amqp.Config) error {
// try connect
if err := r.tryConnect(secure, config); err != nil {
return err
}
// connected
r.Lock()
r.connected = true
r.Unlock()
// create reconnect loop
go r.reconnect(secure, config)
return nil
}
func (r *rabbitMQConn) reconnect(secure bool, config *amqp.Config) {
// skip first connect
var connect bool
for {
if connect {
// try reconnect
if err := r.tryConnect(secure, config); err != nil {
time.Sleep(1 * time.Second)
continue
}
// connected
r.Lock()
r.connected = true
r.Unlock()
//unblock resubscribe cycle - close channel
//at this point channel is created and unclosed - close it without any additional checks
close(r.waitConnection)
}
connect = true
notifyClose := make(chan *amqp.Error)
r.Connection.NotifyClose(notifyClose)
// block until closed
select {
case <-notifyClose:
// block all resubscribe attempt - they are useless because there is no connection to rabbitmq
// create channel 'waitConnection' (at this point channel is nil or closed, create it without unnecessary checks)
r.Lock()
r.connected = false
r.waitConnection = make(chan struct{})
r.Unlock()
case <-r.close:
return
}
}
}
func (r *rabbitMQConn) Connect(secure bool, config *amqp.Config) error {
r.Lock()
// already connected
if r.connected {
r.Unlock()
return nil
}
// check it was closed
select {
case <-r.close:
r.close = make(chan bool)
default:
// no op
// new conn
}
r.Unlock()
return r.connect(secure, config)
}
func (r *rabbitMQConn) Close() error {
r.Lock()
defer r.Unlock()
select {
case <-r.close:
return nil
default:
close(r.close)
r.connected = false
}
return r.Connection.Close()
}
func (r *rabbitMQConn) tryConnect(secure bool, config *amqp.Config) error {
var err error
@ -218,6 +123,98 @@ func (r *rabbitMQConn) tryConnect(secure bool, config *amqp.Config) error {
return err
}
func (r *rabbitMQConn) connect(secure bool, config *amqp.Config) error {
// try connect
if err := r.tryConnect(secure, config); err != nil {
return err
}
// connected
r.Lock()
r.connected = true
r.Unlock()
close(r.WaitConnection)
return nil
}
func (r *rabbitMQConn) reconnect(secure bool, config *amqp.Config) {
// skip first connect
var connect bool
for {
if connect {
// try reconnect
select {
case <-r.close:
return
default:
if err := r.connect(secure, config); err != nil {
time.Sleep(time.Second)
continue
}
}
}
connect = true
notifyClose := make(chan *amqp.Error)
r.Connection.NotifyClose(notifyClose)
// block until closed
select {
case <-notifyClose:
// block all resubscribe attempt - they are useless because there is no connection to rabbitmq
// create channel 'WaitConnection' (at this point channel is nil or closed, create it without unnecessary checks)
r.Lock()
r.connected = false
r.WaitConnection = make(chan struct{})
r.Unlock()
case <-r.close:
return
}
}
}
func (r *rabbitMQConn) Connect(secure bool, config *amqp.Config) error {
r.Lock()
if r.connected {
r.Unlock()
return nil
}
r.Unlock()
// check it was closed
select {
case <-r.close:
r.close = make(chan bool)
default:
// no op
// new conn
}
err := r.connect(secure, config)
if err == nil {
go r.reconnect(secure, config)
}
return err
}
func (r *rabbitMQConn) Close() error {
r.Lock()
defer r.Unlock()
select {
case <-r.close:
return nil
default:
close(r.close)
r.connected = false
}
return r.Connection.Close()
}
func (r *rabbitMQConn) Consume(queue, key string, headers amqp.Table, qArgs amqp.Table, autoAck, durableQueue bool) (*rabbitMQChannel, <-chan amqp.Delivery, error) {
consumerChannel, err := newRabbitChannel(r.Connection, r.prefetchCount, r.prefetchGlobal)
if err != nil {

View File

@ -6,28 +6,33 @@ import (
"mongo.games.com/goserver/core/broker"
)
/*
context参数
*/
type durableQueueKey struct{}
type headersKey struct{}
type queueArgumentsKey struct{}
type prefetchCountKey struct{}
type prefetchGlobalKey struct{}
type exchangeKey struct{}
type durableExchange struct{}
type requeueOnErrorKey struct{}
type deliveryMode struct{}
type priorityKey struct{}
type externalAuth struct{}
type durableExchange struct{}
type ackSuccessKey struct{}
type subscribeContextKey struct{}
//============================
// broker.SubscribeOption
//============================
// DurableQueue creates a durable queue when subscribing.
func DurableQueue() broker.SubscribeOption {
return setSubscribeOption(durableQueueKey{}, true)
}
// DurableExchange is an option to set the Exchange to be durable
func DurableExchange() broker.Option {
return setBrokerOption(durableExchange{}, true)
}
// Headers adds headers used by the headers exchange
func Headers(h map[string]interface{}) broker.SubscribeOption {
return setSubscribeOption(headersKey{}, h)
@ -43,6 +48,25 @@ func RequeueOnError() broker.SubscribeOption {
return setSubscribeOption(requeueOnErrorKey{}, true)
}
// SubscribeContext set the context for broker.SubscribeOption
func SubscribeContext(ctx context.Context) broker.SubscribeOption {
return setSubscribeOption(subscribeContextKey{}, ctx)
}
// AckOnSuccess will automatically acknowledge messages when no error is returned
func AckOnSuccess() broker.SubscribeOption {
return setSubscribeOption(ackSuccessKey{}, true)
}
//============================
// broker.Option
//============================
// DurableExchange is an option to set the Exchange to be durable
func DurableExchange() broker.Option {
return setBrokerOption(durableExchange{}, true)
}
// ExchangeName is an option to set the ExchangeName
func ExchangeName(e string) broker.Option {
return setBrokerOption(exchangeKey{}, e)
@ -53,11 +77,19 @@ func PrefetchCount(c int) broker.Option {
return setBrokerOption(prefetchCountKey{}, c)
}
func ExternalAuth() broker.Option {
return setBrokerOption(externalAuth{}, ExternalAuthentication{})
}
// PrefetchGlobal creates a durable queue when subscribing.
func PrefetchGlobal() broker.Option {
return setBrokerOption(prefetchGlobalKey{}, true)
}
//============================
// broker.PublishOption
//============================
// DeliveryMode sets a delivery mode for publishing
func DeliveryMode(value uint8) broker.PublishOption {
return setPublishOption(deliveryMode{}, value)
@ -67,21 +99,3 @@ func DeliveryMode(value uint8) broker.PublishOption {
func Priority(value uint8) broker.PublishOption {
return setPublishOption(priorityKey{}, value)
}
func ExternalAuth() broker.Option {
return setBrokerOption(externalAuth{}, ExternalAuthentication{})
}
type subscribeContextKey struct{}
// SubscribeContext set the context for broker.SubscribeOption
func SubscribeContext(ctx context.Context) broker.SubscribeOption {
return setSubscribeOption(subscribeContextKey{}, ctx)
}
type ackSuccessKey struct{}
// AckOnSuccess will automatically acknowledge messages when no error is returned
func AckOnSuccess() broker.SubscribeOption {
return setSubscribeOption(ackSuccessKey{}, true)
}

View File

@ -11,33 +11,11 @@ import (
"mongo.games.com/goserver/core/broker"
)
type rbroker struct {
conn *rabbitMQConn
addrs []string
opts broker.Options
prefetchCount int
prefetchGlobal bool
mtx sync.Mutex
wg sync.WaitGroup
}
type subscriber struct {
mtx sync.Mutex
mayRun bool
opts broker.SubscribeOptions
topic string
ch *rabbitMQChannel
durableQueue bool
queueArgs map[string]interface{}
r *rbroker
fn func(msg amqp.Delivery)
headers map[string]interface{}
}
// publication Event
type publication struct {
d amqp.Delivery
m *broker.Message
t string
t string // RouteKey
err error
}
@ -57,6 +35,22 @@ func (p *publication) Message() *broker.Message {
return p.m
}
type subscriber struct {
mtx sync.Mutex
mayRun bool
// config
opts broker.SubscribeOptions
topic string
durableQueue bool
queueArgs map[string]interface{}
headers map[string]interface{}
ch *rabbitMQChannel
r *myBroker
fn func(msg amqp.Delivery)
}
func (s *subscriber) Options() broker.SubscribeOptions {
return s.opts
}
@ -96,7 +90,7 @@ func (s *subscriber) resubscribe() {
//yep, its shutdown case
return
//wait until we reconect to rabbit
case <-s.r.conn.waitConnection:
case <-s.r.conn.WaitConnection:
}
// it may crash (panic) in case of Consume without connection, so recheck it
@ -138,7 +132,34 @@ func (s *subscriber) resubscribe() {
}
}
func (r *rbroker) Publish(topic string, msg *broker.Message, opts ...broker.PublishOption) error {
type myBroker struct {
conn *rabbitMQConn
addrs []string
opts broker.Options
prefetchCount int
prefetchGlobal bool
mtx sync.Mutex
wg sync.WaitGroup
}
func NewBroker(opts ...broker.Option) broker.Broker {
options := broker.Options{
Context: context.Background(),
}
for _, o := range opts {
o(&options)
}
return &myBroker{
addrs: options.Addrs,
opts: options,
}
}
// Publish 发送消息
// 可设置消息的持久化、优先级、过期时间等属性
func (r *myBroker) Publish(topic string, msg *broker.Message, opts ...broker.PublishOption) error {
m := amqp.Publishing{
Body: msg.Body,
Headers: amqp.Table{},
@ -170,7 +191,10 @@ func (r *rbroker) Publish(topic string, msg *broker.Message, opts ...broker.Publ
return r.conn.Publish(r.conn.exchange.Name, topic, m)
}
func (r *rbroker) Subscribe(topic string, handler broker.Handler, opts ...broker.SubscribeOption) (broker.Subscriber, error) {
// Subscribe 创建消费者
// 默认开启自动确认消息, 使用broker.DisableAutoAck()关闭
// 关闭自动确认消息后使用 AckOnSuccess() 可以根据handler的错误信息自动确认或者拒绝消息
func (r *myBroker) Subscribe(topic string, handler broker.Handler, opts ...broker.SubscribeOption) (broker.Subscriber, error) {
var ackSuccess bool
if r.conn == nil {
@ -211,7 +235,7 @@ func (r *rbroker) Subscribe(topic string, handler broker.Handler, opts ...broker
headers = h
}
if bval, ok := ctx.Value(ackSuccessKey{}).(bool); ok && bval {
if su, ok := ctx.Value(ackSuccessKey{}).(bool); ok && su {
opt.AutoAck = false
ackSuccess = true
}
@ -227,6 +251,7 @@ func (r *rbroker) Subscribe(topic string, handler broker.Handler, opts ...broker
}
p := &publication{d: msg, m: m, t: msg.RoutingKey}
p.err = handler(p)
// AutoAck为false时根据有没有错误自动确认或者重新入队
if p.err == nil && ackSuccess && !opt.AutoAck {
msg.Ack(false)
} else if p.err != nil && !opt.AutoAck {
@ -234,30 +259,30 @@ func (r *rbroker) Subscribe(topic string, handler broker.Handler, opts ...broker
}
}
sret := &subscriber{topic: topic, opts: opt, mayRun: true, r: r,
s := &subscriber{topic: topic, opts: opt, mayRun: true, r: r,
durableQueue: durableQueue, fn: fn, headers: headers, queueArgs: qArgs}
go sret.resubscribe()
go s.resubscribe()
return sret, nil
return s, nil
}
func (r *rbroker) Options() broker.Options {
func (r *myBroker) Options() broker.Options {
return r.opts
}
func (r *rbroker) String() string {
func (r *myBroker) String() string {
return "rabbitmq"
}
func (r *rbroker) Address() string {
func (r *myBroker) Address() string {
if len(r.addrs) > 0 {
return r.addrs[0]
}
return ""
}
func (r *rbroker) Init(opts ...broker.Option) error {
func (r *myBroker) Init(opts ...broker.Option) error {
for _, o := range opts {
o(&r.opts)
}
@ -265,7 +290,8 @@ func (r *rbroker) Init(opts ...broker.Option) error {
return nil
}
func (r *rbroker) Connect() error {
// Connect 建立连接
func (r *myBroker) Connect() error {
if r.conn == nil {
r.conn = newRabbitMQConn(r.getExchange(), r.opts.Addrs, r.getPrefetchCount(), r.getPrefetchGlobal())
}
@ -281,7 +307,9 @@ func (r *rbroker) Connect() error {
return r.conn.Connect(r.opts.Secure, &conf)
}
func (r *rbroker) Disconnect() error {
// Disconnect 停止消费者后断开连接
// 等待消费者停止消费
func (r *myBroker) Disconnect() error {
if r.conn == nil {
return errors.New("connection is nil")
}
@ -290,22 +318,7 @@ func (r *rbroker) Disconnect() error {
return ret
}
func NewBroker(opts ...broker.Option) broker.Broker {
options := broker.Options{
Context: context.Background(),
}
for _, o := range opts {
o(&options)
}
return &rbroker{
addrs: options.Addrs,
opts: options,
}
}
func (r *rbroker) getExchange() Exchange {
func (r *myBroker) getExchange() Exchange {
ex := DefaultExchange
@ -320,14 +333,14 @@ func (r *rbroker) getExchange() Exchange {
return ex
}
func (r *rbroker) getPrefetchCount() int {
func (r *myBroker) getPrefetchCount() int {
if e, ok := r.opts.Context.Value(prefetchCountKey{}).(int); ok {
return e
}
return DefaultPrefetchCount
}
func (r *rbroker) getPrefetchGlobal() bool {
func (r *myBroker) getPrefetchGlobal() bool {
if e, ok := r.opts.Context.Value(prefetchGlobalKey{}).(bool); ok {
return e
}

View File

@ -48,7 +48,11 @@ func GetHandler(packetId int) Handler {
func Register(mainId int, msgType interface{}, h func(session *Session, packetId int, data interface{}) error) {
f := func() interface{} {
return reflect.New(reflect.TypeOf(msgType)).Interface()
tp := reflect.TypeOf(msgType)
if tp.Kind() == reflect.Ptr {
tp = tp.Elem()
}
return reflect.New(tp).Interface()
}
RegisterFactory(mainId, PacketFactoryWrapper(f))
RegisterHandler(mainId, HandlerWrapper(h))