diff --git a/core/broker/options.go b/core/broker/options.go index c54ace5..f8fd9ac 100644 --- a/core/broker/options.go +++ b/core/broker/options.go @@ -5,11 +5,15 @@ import ( "crypto/tls" ) +//================================ +// Options +//================================ + type Options struct { Addrs []string Secure bool - // Handler executed when error happens in broker mesage + // Handler executed when error happens in broker message // processing ErrorHandler Handler @@ -20,12 +24,60 @@ type Options struct { Context context.Context } +type Option func(*Options) + +// Addrs sets the host addresses to be used by the broker +func Addrs(addrs ...string) Option { + return func(o *Options) { + o.Addrs = addrs + } +} + +// ErrorHandler will catch all broker errors that cant be handled +// in normal way, for example Codec errors +func ErrorHandler(h Handler) Option { + return func(o *Options) { + o.ErrorHandler = h + } +} + +// Secure communication with the broker +func Secure(b bool) Option { + return func(o *Options) { + o.Secure = b + } +} + +// TLSConfig Specify TLS Config +func TLSConfig(t *tls.Config) Option { + return func(o *Options) { + o.TLSConfig = t + } +} + +//================================ +// PublishOptions +//================================ + type PublishOptions struct { // Other options for implementations of the interface // can be stored in a context Context context.Context } +type PublishOption func(*PublishOptions) + +// PublishContext set context +func PublishContext(ctx context.Context) PublishOption { + return func(o *PublishOptions) { + o.Context = ctx + } +} + +//================================ +// SubscribeOptions +//================================ + type SubscribeOptions struct { // AutoAck defaults to true. When a handler returns // with a nil error the message is acked. @@ -40,19 +92,6 @@ type SubscribeOptions struct { Context context.Context } -type Option func(*Options) - -type PublishOption func(*PublishOptions) - -// PublishContext set context -func PublishContext(ctx context.Context) PublishOption { - return func(o *PublishOptions) { - o.Context = ctx - } -} - -type SubscribeOption func(*SubscribeOptions) - func NewSubscribeOptions(opts ...SubscribeOption) SubscribeOptions { opt := SubscribeOptions{ AutoAck: true, @@ -65,12 +104,7 @@ func NewSubscribeOptions(opts ...SubscribeOption) SubscribeOptions { return opt } -// Addrs sets the host addresses to be used by the broker -func Addrs(addrs ...string) Option { - return func(o *Options) { - o.Addrs = addrs - } -} +type SubscribeOption func(*SubscribeOptions) // DisableAutoAck will disable auto acking of messages // after they have been handled. @@ -80,14 +114,6 @@ func DisableAutoAck() SubscribeOption { } } -// ErrorHandler will catch all broker errors that cant be handled -// in normal way, for example Codec errors -func ErrorHandler(h Handler) Option { - return func(o *Options) { - o.ErrorHandler = h - } -} - // Queue sets the name of the queue to share messages on func Queue(name string) SubscribeOption { return func(o *SubscribeOptions) { @@ -95,20 +121,6 @@ func Queue(name string) SubscribeOption { } } -// Secure communication with the broker -func Secure(b bool) Option { - return func(o *Options) { - o.Secure = b - } -} - -// Specify TLS Config -func TLSConfig(t *tls.Config) Option { - return func(o *Options) { - o.TLSConfig = t - } -} - // SubscribeContext set context func SubscribeContext(ctx context.Context) SubscribeOption { return func(o *SubscribeOptions) { diff --git a/core/broker/rabbitmq/channel.go b/core/broker/rabbitmq/channel.go index c02f5e3..99f7b6b 100644 --- a/core/broker/rabbitmq/channel.go +++ b/core/broker/rabbitmq/channel.go @@ -1,9 +1,5 @@ package rabbitmq -// -// All credit to Mondo -// - import ( "errors" @@ -11,12 +7,17 @@ import ( "github.com/streadway/amqp" ) +/* + RabbitMQ 通道封装 +*/ + type rabbitMQChannel struct { uuid string connection *amqp.Connection channel *amqp.Channel } +// newRabbitChannel 新建通道 func newRabbitChannel(conn *amqp.Connection, prefetchCount int, prefetchGlobal bool) (*rabbitMQChannel, error) { id, err := uuid.NewRandom() if err != nil { diff --git a/core/broker/rabbitmq/connection.go b/core/broker/rabbitmq/connection.go index 7d06e63..dc24cad 100644 --- a/core/broker/rabbitmq/connection.go +++ b/core/broker/rabbitmq/connection.go @@ -1,9 +1,5 @@ package rabbitmq -// -// All credit to Mondo -// - import ( "crypto/tls" "regexp" @@ -14,6 +10,11 @@ import ( "github.com/streadway/amqp" ) +/* + RabbitMQ 连接封装 + 带自动重连 +*/ + var ( DefaultExchange = Exchange{ Name: "idealeak", @@ -39,22 +40,6 @@ var ( dialConfig = amqp.DialConfig ) -type rabbitMQConn struct { - Connection *amqp.Connection - Channel *rabbitMQChannel - ExchangeChannel *rabbitMQChannel - exchange Exchange - url string - prefetchCount int - prefetchGlobal bool - - sync.Mutex - connected bool - - close chan bool // 关闭信号 - waitConnection chan struct{} // 建立连接中 -} - // Exchange is the rabbitmq exchange type Exchange struct { // Name of the exchange @@ -63,6 +48,22 @@ type Exchange struct { Durable bool } +type rabbitMQConn struct { + Connection *amqp.Connection + Channel *rabbitMQChannel + ExchangeChannel *rabbitMQChannel + exchange Exchange + url string + prefetchCount int // 每次分发给消费者的最大消息数量 + prefetchGlobal bool // 是否对整个 channel 生效 + + sync.Mutex + connected bool + + close chan bool // 关闭信号 + WaitConnection chan struct{} // 建立连接中 +} + func newRabbitMQConn(ex Exchange, urls []string, prefetchCount int, prefetchGlobal bool) *rabbitMQConn { var url string @@ -78,107 +79,11 @@ func newRabbitMQConn(ex Exchange, urls []string, prefetchCount int, prefetchGlob prefetchCount: prefetchCount, prefetchGlobal: prefetchGlobal, close: make(chan bool), - waitConnection: make(chan struct{}), + WaitConnection: make(chan struct{}), } - // its bad case of nil == waitConnection, so close it at start - close(ret.waitConnection) return ret } -func (r *rabbitMQConn) connect(secure bool, config *amqp.Config) error { - // try connect - if err := r.tryConnect(secure, config); err != nil { - return err - } - - // connected - r.Lock() - r.connected = true - r.Unlock() - - // create reconnect loop - go r.reconnect(secure, config) - return nil -} - -func (r *rabbitMQConn) reconnect(secure bool, config *amqp.Config) { - // skip first connect - var connect bool - - for { - if connect { - // try reconnect - if err := r.tryConnect(secure, config); err != nil { - time.Sleep(1 * time.Second) - continue - } - - // connected - r.Lock() - r.connected = true - r.Unlock() - //unblock resubscribe cycle - close channel - //at this point channel is created and unclosed - close it without any additional checks - close(r.waitConnection) - } - - connect = true - notifyClose := make(chan *amqp.Error) - r.Connection.NotifyClose(notifyClose) - - // block until closed - select { - case <-notifyClose: - // block all resubscribe attempt - they are useless because there is no connection to rabbitmq - // create channel 'waitConnection' (at this point channel is nil or closed, create it without unnecessary checks) - r.Lock() - r.connected = false - r.waitConnection = make(chan struct{}) - r.Unlock() - case <-r.close: - return - } - } -} - -func (r *rabbitMQConn) Connect(secure bool, config *amqp.Config) error { - r.Lock() - - // already connected - if r.connected { - r.Unlock() - return nil - } - - // check it was closed - select { - case <-r.close: - r.close = make(chan bool) - default: - // no op - // new conn - } - - r.Unlock() - - return r.connect(secure, config) -} - -func (r *rabbitMQConn) Close() error { - r.Lock() - defer r.Unlock() - - select { - case <-r.close: - return nil - default: - close(r.close) - r.connected = false - } - - return r.Connection.Close() -} - func (r *rabbitMQConn) tryConnect(secure bool, config *amqp.Config) error { var err error @@ -218,6 +123,98 @@ func (r *rabbitMQConn) tryConnect(secure bool, config *amqp.Config) error { return err } +func (r *rabbitMQConn) connect(secure bool, config *amqp.Config) error { + // try connect + if err := r.tryConnect(secure, config); err != nil { + return err + } + + // connected + r.Lock() + r.connected = true + r.Unlock() + + close(r.WaitConnection) + return nil +} + +func (r *rabbitMQConn) reconnect(secure bool, config *amqp.Config) { + // skip first connect + var connect bool + + for { + if connect { + // try reconnect + select { + case <-r.close: + return + default: + if err := r.connect(secure, config); err != nil { + time.Sleep(time.Second) + continue + } + } + } + + connect = true + notifyClose := make(chan *amqp.Error) + r.Connection.NotifyClose(notifyClose) + + // block until closed + select { + case <-notifyClose: + // block all resubscribe attempt - they are useless because there is no connection to rabbitmq + // create channel 'WaitConnection' (at this point channel is nil or closed, create it without unnecessary checks) + r.Lock() + r.connected = false + r.WaitConnection = make(chan struct{}) + r.Unlock() + case <-r.close: + return + } + } +} + +func (r *rabbitMQConn) Connect(secure bool, config *amqp.Config) error { + r.Lock() + if r.connected { + r.Unlock() + return nil + } + r.Unlock() + + // check it was closed + select { + case <-r.close: + r.close = make(chan bool) + default: + // no op + // new conn + } + + err := r.connect(secure, config) + if err == nil { + go r.reconnect(secure, config) + } + + return err +} + +func (r *rabbitMQConn) Close() error { + r.Lock() + defer r.Unlock() + + select { + case <-r.close: + return nil + default: + close(r.close) + r.connected = false + } + + return r.Connection.Close() +} + func (r *rabbitMQConn) Consume(queue, key string, headers amqp.Table, qArgs amqp.Table, autoAck, durableQueue bool) (*rabbitMQChannel, <-chan amqp.Delivery, error) { consumerChannel, err := newRabbitChannel(r.Connection, r.prefetchCount, r.prefetchGlobal) if err != nil { diff --git a/core/broker/rabbitmq/options.go b/core/broker/rabbitmq/options.go index 380294c..31aa247 100644 --- a/core/broker/rabbitmq/options.go +++ b/core/broker/rabbitmq/options.go @@ -6,28 +6,33 @@ import ( "mongo.games.com/goserver/core/broker" ) +/* + context参数 +*/ + type durableQueueKey struct{} type headersKey struct{} type queueArgumentsKey struct{} type prefetchCountKey struct{} type prefetchGlobalKey struct{} type exchangeKey struct{} +type durableExchange struct{} type requeueOnErrorKey struct{} type deliveryMode struct{} type priorityKey struct{} type externalAuth struct{} -type durableExchange struct{} +type ackSuccessKey struct{} +type subscribeContextKey struct{} + +//============================ +// broker.SubscribeOption +//============================ // DurableQueue creates a durable queue when subscribing. func DurableQueue() broker.SubscribeOption { return setSubscribeOption(durableQueueKey{}, true) } -// DurableExchange is an option to set the Exchange to be durable -func DurableExchange() broker.Option { - return setBrokerOption(durableExchange{}, true) -} - // Headers adds headers used by the headers exchange func Headers(h map[string]interface{}) broker.SubscribeOption { return setSubscribeOption(headersKey{}, h) @@ -43,6 +48,25 @@ func RequeueOnError() broker.SubscribeOption { return setSubscribeOption(requeueOnErrorKey{}, true) } +// SubscribeContext set the context for broker.SubscribeOption +func SubscribeContext(ctx context.Context) broker.SubscribeOption { + return setSubscribeOption(subscribeContextKey{}, ctx) +} + +// AckOnSuccess will automatically acknowledge messages when no error is returned +func AckOnSuccess() broker.SubscribeOption { + return setSubscribeOption(ackSuccessKey{}, true) +} + +//============================ +// broker.Option +//============================ + +// DurableExchange is an option to set the Exchange to be durable +func DurableExchange() broker.Option { + return setBrokerOption(durableExchange{}, true) +} + // ExchangeName is an option to set the ExchangeName func ExchangeName(e string) broker.Option { return setBrokerOption(exchangeKey{}, e) @@ -53,11 +77,19 @@ func PrefetchCount(c int) broker.Option { return setBrokerOption(prefetchCountKey{}, c) } +func ExternalAuth() broker.Option { + return setBrokerOption(externalAuth{}, ExternalAuthentication{}) +} + // PrefetchGlobal creates a durable queue when subscribing. func PrefetchGlobal() broker.Option { return setBrokerOption(prefetchGlobalKey{}, true) } +//============================ +// broker.PublishOption +//============================ + // DeliveryMode sets a delivery mode for publishing func DeliveryMode(value uint8) broker.PublishOption { return setPublishOption(deliveryMode{}, value) @@ -67,21 +99,3 @@ func DeliveryMode(value uint8) broker.PublishOption { func Priority(value uint8) broker.PublishOption { return setPublishOption(priorityKey{}, value) } - -func ExternalAuth() broker.Option { - return setBrokerOption(externalAuth{}, ExternalAuthentication{}) -} - -type subscribeContextKey struct{} - -// SubscribeContext set the context for broker.SubscribeOption -func SubscribeContext(ctx context.Context) broker.SubscribeOption { - return setSubscribeOption(subscribeContextKey{}, ctx) -} - -type ackSuccessKey struct{} - -// AckOnSuccess will automatically acknowledge messages when no error is returned -func AckOnSuccess() broker.SubscribeOption { - return setSubscribeOption(ackSuccessKey{}, true) -} diff --git a/core/broker/rabbitmq/rabbitmq.go b/core/broker/rabbitmq/rabbitmq.go index b79b7cf..141038d 100644 --- a/core/broker/rabbitmq/rabbitmq.go +++ b/core/broker/rabbitmq/rabbitmq.go @@ -11,33 +11,11 @@ import ( "mongo.games.com/goserver/core/broker" ) -type rbroker struct { - conn *rabbitMQConn - addrs []string - opts broker.Options - prefetchCount int - prefetchGlobal bool - mtx sync.Mutex - wg sync.WaitGroup -} - -type subscriber struct { - mtx sync.Mutex - mayRun bool - opts broker.SubscribeOptions - topic string - ch *rabbitMQChannel - durableQueue bool - queueArgs map[string]interface{} - r *rbroker - fn func(msg amqp.Delivery) - headers map[string]interface{} -} - +// publication Event type publication struct { d amqp.Delivery m *broker.Message - t string + t string // RouteKey err error } @@ -57,6 +35,22 @@ func (p *publication) Message() *broker.Message { return p.m } +type subscriber struct { + mtx sync.Mutex + mayRun bool + + // config + opts broker.SubscribeOptions + topic string + durableQueue bool + queueArgs map[string]interface{} + headers map[string]interface{} + + ch *rabbitMQChannel + r *myBroker + fn func(msg amqp.Delivery) +} + func (s *subscriber) Options() broker.SubscribeOptions { return s.opts } @@ -96,7 +90,7 @@ func (s *subscriber) resubscribe() { //yep, its shutdown case return //wait until we reconect to rabbit - case <-s.r.conn.waitConnection: + case <-s.r.conn.WaitConnection: } // it may crash (panic) in case of Consume without connection, so recheck it @@ -138,7 +132,34 @@ func (s *subscriber) resubscribe() { } } -func (r *rbroker) Publish(topic string, msg *broker.Message, opts ...broker.PublishOption) error { +type myBroker struct { + conn *rabbitMQConn + addrs []string + opts broker.Options + prefetchCount int + prefetchGlobal bool + mtx sync.Mutex + wg sync.WaitGroup +} + +func NewBroker(opts ...broker.Option) broker.Broker { + options := broker.Options{ + Context: context.Background(), + } + + for _, o := range opts { + o(&options) + } + + return &myBroker{ + addrs: options.Addrs, + opts: options, + } +} + +// Publish 发送消息 +// 可设置消息的持久化、优先级、过期时间等属性 +func (r *myBroker) Publish(topic string, msg *broker.Message, opts ...broker.PublishOption) error { m := amqp.Publishing{ Body: msg.Body, Headers: amqp.Table{}, @@ -170,7 +191,10 @@ func (r *rbroker) Publish(topic string, msg *broker.Message, opts ...broker.Publ return r.conn.Publish(r.conn.exchange.Name, topic, m) } -func (r *rbroker) Subscribe(topic string, handler broker.Handler, opts ...broker.SubscribeOption) (broker.Subscriber, error) { +// Subscribe 创建消费者 +// 默认开启自动确认消息, 使用broker.DisableAutoAck()关闭 +// 关闭自动确认消息后使用 AckOnSuccess() 可以根据handler的错误信息自动确认或者拒绝消息 +func (r *myBroker) Subscribe(topic string, handler broker.Handler, opts ...broker.SubscribeOption) (broker.Subscriber, error) { var ackSuccess bool if r.conn == nil { @@ -211,7 +235,7 @@ func (r *rbroker) Subscribe(topic string, handler broker.Handler, opts ...broker headers = h } - if bval, ok := ctx.Value(ackSuccessKey{}).(bool); ok && bval { + if su, ok := ctx.Value(ackSuccessKey{}).(bool); ok && su { opt.AutoAck = false ackSuccess = true } @@ -227,6 +251,7 @@ func (r *rbroker) Subscribe(topic string, handler broker.Handler, opts ...broker } p := &publication{d: msg, m: m, t: msg.RoutingKey} p.err = handler(p) + // AutoAck为false时根据有没有错误自动确认或者重新入队 if p.err == nil && ackSuccess && !opt.AutoAck { msg.Ack(false) } else if p.err != nil && !opt.AutoAck { @@ -234,30 +259,30 @@ func (r *rbroker) Subscribe(topic string, handler broker.Handler, opts ...broker } } - sret := &subscriber{topic: topic, opts: opt, mayRun: true, r: r, + s := &subscriber{topic: topic, opts: opt, mayRun: true, r: r, durableQueue: durableQueue, fn: fn, headers: headers, queueArgs: qArgs} - go sret.resubscribe() + go s.resubscribe() - return sret, nil + return s, nil } -func (r *rbroker) Options() broker.Options { +func (r *myBroker) Options() broker.Options { return r.opts } -func (r *rbroker) String() string { +func (r *myBroker) String() string { return "rabbitmq" } -func (r *rbroker) Address() string { +func (r *myBroker) Address() string { if len(r.addrs) > 0 { return r.addrs[0] } return "" } -func (r *rbroker) Init(opts ...broker.Option) error { +func (r *myBroker) Init(opts ...broker.Option) error { for _, o := range opts { o(&r.opts) } @@ -265,7 +290,8 @@ func (r *rbroker) Init(opts ...broker.Option) error { return nil } -func (r *rbroker) Connect() error { +// Connect 建立连接 +func (r *myBroker) Connect() error { if r.conn == nil { r.conn = newRabbitMQConn(r.getExchange(), r.opts.Addrs, r.getPrefetchCount(), r.getPrefetchGlobal()) } @@ -281,7 +307,9 @@ func (r *rbroker) Connect() error { return r.conn.Connect(r.opts.Secure, &conf) } -func (r *rbroker) Disconnect() error { +// Disconnect 停止消费者后断开连接 +// 等待消费者停止消费 +func (r *myBroker) Disconnect() error { if r.conn == nil { return errors.New("connection is nil") } @@ -290,22 +318,7 @@ func (r *rbroker) Disconnect() error { return ret } -func NewBroker(opts ...broker.Option) broker.Broker { - options := broker.Options{ - Context: context.Background(), - } - - for _, o := range opts { - o(&options) - } - - return &rbroker{ - addrs: options.Addrs, - opts: options, - } -} - -func (r *rbroker) getExchange() Exchange { +func (r *myBroker) getExchange() Exchange { ex := DefaultExchange @@ -320,14 +333,14 @@ func (r *rbroker) getExchange() Exchange { return ex } -func (r *rbroker) getPrefetchCount() int { +func (r *myBroker) getPrefetchCount() int { if e, ok := r.opts.Context.Value(prefetchCountKey{}).(int); ok { return e } return DefaultPrefetchCount } -func (r *rbroker) getPrefetchGlobal() bool { +func (r *myBroker) getPrefetchGlobal() bool { if e, ok := r.opts.Context.Value(prefetchGlobalKey{}).(bool); ok { return e } diff --git a/core/netlib/packethandler.go b/core/netlib/packethandler.go index 4ca1c9f..ec5ba58 100644 --- a/core/netlib/packethandler.go +++ b/core/netlib/packethandler.go @@ -48,7 +48,11 @@ func GetHandler(packetId int) Handler { func Register(mainId int, msgType interface{}, h func(session *Session, packetId int, data interface{}) error) { f := func() interface{} { - return reflect.New(reflect.TypeOf(msgType)).Interface() + tp := reflect.TypeOf(msgType) + if tp.Kind() == reflect.Ptr { + tp = tp.Elem() + } + return reflect.New(tp).Interface() } RegisterFactory(mainId, PacketFactoryWrapper(f)) RegisterHandler(mainId, HandlerWrapper(h))