update mongox

添加函数多次调用抑制器
This commit is contained in:
sk 2025-02-06 13:36:14 +08:00
parent 84ef19185d
commit 7e899c06fa
6 changed files with 111 additions and 37 deletions

View File

@ -3,6 +3,8 @@ package mongox
import ( import (
"fmt" "fmt"
"github.com/fsnotify/fsnotify"
"mongo.games.com/goserver/core" "mongo.games.com/goserver/core"
"mongo.games.com/goserver/core/viperx" "mongo.games.com/goserver/core/viperx"
) )
@ -24,12 +26,21 @@ func (c *Configuration) Init() error {
vp := viperx.GetViper(c.Path) vp := viperx.GetViper(c.Path)
cfg := &Config{} f := func() {
if err := vp.Unmarshal(cfg); err != nil { cfg := &Config{}
panic(fmt.Sprintf("mongox init error: %v", err)) if err := vp.Unmarshal(cfg); err != nil {
panic(fmt.Sprintf("mongox init error: %v", err))
}
Restart(cfg)
} }
f()
Init(cfg) vp.OnConfigChange(func(in fsnotify.Event) {
if in.Name != c.Path {
return
}
f()
})
return nil return nil
} }

View File

@ -2,8 +2,7 @@ package mongox
import ( import (
"errors" "errors"
"sync"
"mongo.games.com/goserver/core/logger"
"mongo.games.com/goserver/core/mongox/internal" "mongo.games.com/goserver/core/mongox/internal"
) )
@ -16,30 +15,42 @@ type Collection = internal.Collection
type Database = internal.Database type Database = internal.Database
var _manager *internal.Manager var _manager *internal.Manager
var rwm = sync.RWMutex{} // _manager 读写锁,配置更新时加锁
// GetConfig 获取配置 // GetConfig 获取配置
func GetConfig() *Config { func GetConfig() *Config {
rwm.RLock()
defer rwm.RUnlock()
if _manager == nil { if _manager == nil {
return nil return nil
} }
return _manager.GetConfig() return _manager.GetConfig()
} }
// Init 初始化
func Init(conf *Config) {
_manager = internal.NewManager(conf)
}
// Restart 重启 // Restart 重启
func Restart() { func Restart(conf ...*Config) {
rwm.Lock()
defer rwm.Unlock()
if _manager == nil { if _manager == nil {
logger.Logger.Error(NotInitError) if len(conf) == 0 || conf[0] == nil {
return
}
_manager = internal.NewManager(conf[0])
return return
} }
_manager.Restart(_manager.GetConfig()) if len(conf) == 0 || conf[0] == nil {
_manager.Restart(_manager.GetConfig())
return
}
_manager.Restart(conf[0])
} }
// Close 关闭 // Close 关闭
func Close() { func Close() {
rwm.Lock()
defer rwm.Unlock()
internal.Close(_manager) internal.Close(_manager)
} }

View File

@ -23,6 +23,9 @@ const (
// GetClient 获取数据库连接 // GetClient 获取数据库连接
// 默认获取的是 Global, log 的数据库连接 // 默认获取的是 Global, log 的数据库连接
func GetClient() (*mongo.Client, error) { func GetClient() (*mongo.Client, error) {
rwm.RLock()
defer rwm.RUnlock()
if _manager == nil { if _manager == nil {
return nil, NotInitError return nil, NotInitError
} }
@ -39,6 +42,9 @@ func GetClient() (*mongo.Client, error) {
// platform: 平台id // platform: 平台id
// database: 数据库名称 // database: 数据库名称
func GetDatabase(platform string, database DatabaseType) (*Database, error) { func GetDatabase(platform string, database DatabaseType) (*Database, error) {
rwm.RLock()
defer rwm.RUnlock()
if _manager == nil { if _manager == nil {
return nil, NotInitError return nil, NotInitError
} }
@ -57,6 +63,9 @@ func GetLogDatabase(platform string) (*Database, error) {
// GetGlobalDatabase 获取全局库 // GetGlobalDatabase 获取全局库
// database: 数据库名称 // database: 数据库名称
func GetGlobalDatabase(database DatabaseType) (*Database, error) { func GetGlobalDatabase(database DatabaseType) (*Database, error) {
rwm.RLock()
defer rwm.RUnlock()
if _manager == nil { if _manager == nil {
return nil, NotInitError return nil, NotInitError
} }
@ -80,6 +89,9 @@ func GetGlobalMonitorDatabase() (*Database, error) {
// database: 数据库名称 // database: 数据库名称
// collection: 集合名称 // collection: 集合名称
func GetGlobalCollection(database DatabaseType, collection string) (*Collection, error) { func GetGlobalCollection(database DatabaseType, collection string) (*Collection, error) {
rwm.RLock()
defer rwm.RUnlock()
if _manager == nil { if _manager == nil {
return nil, NotInitError return nil, NotInitError
} }
@ -104,6 +116,9 @@ func GetGlobalMonitorCollection(collection string) (*Collection, error) {
// database: 数据库名称 // database: 数据库名称
// collection: 集合名称 // collection: 集合名称
func GetCollection(platform string, database DatabaseType, collection string) (*Collection, error) { func GetCollection(platform string, database DatabaseType, collection string) (*Collection, error) {
rwm.RLock()
defer rwm.RUnlock()
if _manager == nil { if _manager == nil {
return nil, NotInitError return nil, NotInitError
} }

View File

@ -8,6 +8,8 @@ import (
"go.mongodb.org/mongo-driver/mongo" "go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options" "go.mongodb.org/mongo-driver/mongo/options"
"golang.org/x/sync/singleflight"
"mongo.games.com/goserver/core/logger" "mongo.games.com/goserver/core/logger"
) )
@ -31,6 +33,7 @@ type Collection struct {
} }
type Database struct { type Database struct {
singleflight.Group
*DatabaseConfig *DatabaseConfig
Client *mongo.Client Client *mongo.Client
Database *mongo.Database Database *mongo.Database
@ -87,22 +90,28 @@ func (d *Database) GetCollection(name string) (*Collection, error) {
} }
d.Collection.Store(name, v) d.Collection.Store(name, v)
} }
c, _ := v.(*Collection) return v.(*Collection), nil
return c, nil
} }
type Manager struct { type Manager struct {
*singleflight.Group
conf *Config conf *Config
global *sync.Map // 内部库名称:Database global *sync.Map // 内部库名称:Database
platforms *sync.Map // 平台id:内部库名称:Database platforms *sync.Map // 平台id:内部库名称:Database
} }
func (m *Manager) GetCollection(key, database, collection string) (*Collection, error) { func (m *Manager) GetCollection(key, database, collection string) (*Collection, error) {
d, err := m.GetDatabase(key, database) d, err, _ := m.Do(key+database, func() (interface{}, error) {
return m.GetDatabase(key, database)
})
if err != nil { if err != nil {
return nil, err return nil, err
} }
return d.GetCollection(collection)
c, err, _ := d.(*Database).Do(collection, func() (interface{}, error) {
return d.(*Database).GetCollection(collection)
})
return c.(*Collection), err
} }
func (m *Manager) GetDatabase(key, database string) (*Database, error) { func (m *Manager) GetDatabase(key, database string) (*Database, error) {
@ -120,8 +129,7 @@ func (m *Manager) GetDatabase(key, database string) (*Database, error) {
v = db v = db
m.global.Store(database, v) m.global.Store(database, v)
} }
d, _ := v.(*Database) return v.(*Database), nil
return d, nil
default: default:
var mp *sync.Map var mp *sync.Map
@ -144,28 +152,32 @@ func (m *Manager) GetDatabase(key, database string) (*Database, error) {
v = db v = db
mp.Store(database, v) mp.Store(database, v)
} }
d, _ := v.(*Database) return v.(*Database), nil
return d, nil
} }
} }
func (m *Manager) Restart(conf *Config) { func (m *Manager) Restart(conf *Config) {
logger.Logger.Infof("mongo manager restart...") logger.Logger.Infof("mongo manager restart...")
old := *m
time.AfterFunc(time.Minute, func() {
Close(&old)
})
old := *m
m.Group = &singleflight.Group{}
m.conf = conf m.conf = conf
m.global = &sync.Map{} m.global = &sync.Map{}
m.platforms = &sync.Map{} m.platforms = &sync.Map{}
time.AfterFunc(time.Minute, func() {
Close(&old)
})
} }
func Close(m *Manager) { func Close(m *Manager) {
logger.Logger.Infof("mongo manager close") logger.Logger.Infof("mongo manager close")
m.global.Range(func(key, value any) bool { m.global.Range(func(key, value any) bool {
if v, ok := value.(*Database); ok { if v, ok := value.(*Database); ok {
v.Client.Disconnect(nil) if err := v.Client.Disconnect(nil); err != nil {
logger.Logger.Warnf("mongo manager close error: %v", err)
}
} }
return true return true
}) })
@ -174,7 +186,9 @@ func Close(m *Manager) {
if v, ok := value.(*sync.Map); ok { if v, ok := value.(*sync.Map); ok {
v.Range(func(key, value any) bool { v.Range(func(key, value any) bool {
if v, ok := value.(*Database); ok { if v, ok := value.(*Database); ok {
v.Client.Disconnect(nil) if err := v.Client.Disconnect(nil); err != nil {
logger.Logger.Warnf("mongo manager close error: %v", err)
}
} }
return true return true
}) })
@ -189,6 +203,7 @@ func (m *Manager) GetConfig() *Config {
func NewManager(conf *Config) *Manager { func NewManager(conf *Config) *Manager {
return &Manager{ return &Manager{
Group: &singleflight.Group{},
conf: conf, conf: conf,
global: &sync.Map{}, global: &sync.Map{},
platforms: &sync.Map{}, platforms: &sync.Map{},

View File

@ -5,8 +5,10 @@ import (
"sync" "sync"
"time" "time"
"golang.org/x/sync/singleflight"
"gorm.io/driver/mysql" "gorm.io/driver/mysql"
"gorm.io/gorm" "gorm.io/gorm"
"mongo.games.com/goserver/core/logger" "mongo.games.com/goserver/core/logger"
) )
@ -107,6 +109,7 @@ func (d *Database) Connect() error {
} }
type Manager struct { type Manager struct {
singleflight.Group
conf *Config conf *Config
platforms sync.Map // 平台id:Database platforms sync.Map // 平台id:Database
tables []interface{} tables []interface{}
@ -123,12 +126,18 @@ func (m *Manager) SetAutoMigrateTables(tables []interface{}) {
func (m *Manager) GetDatabase(key string) (*Database, error) { func (m *Manager) GetDatabase(key string) (*Database, error) {
v, ok := m.platforms.Load(key) // 平台id v, ok := m.platforms.Load(key) // 平台id
if !ok { if !ok {
db := &Database{ db, err, _ := m.Do(key, func() (interface{}, error) {
Manager: m, db := &Database{
Config: m.conf, Manager: m,
DatabaseConfig: m.conf.Platforms[key], Config: m.conf,
} DatabaseConfig: m.conf.Platforms[key],
if err := db.Connect(); err != nil { }
if err := db.Connect(); err != nil {
return nil, err
}
return db, nil
})
if err != nil {
return nil, err return nil, err
} }
v = db v = db
@ -139,7 +148,20 @@ func (m *Manager) GetDatabase(key string) (*Database, error) {
} }
func (m *Manager) Close() { func (m *Manager) Close() {
m.platforms.Range(func(key, value interface{}) bool {
d, _ := value.(*Database)
if d != nil && d.DB != nil {
sqlDB, err := d.DB.DB()
if err != nil {
logger.Logger.Errorf("mysql get DB error: %v", err)
return true
}
if err = sqlDB.Close(); err != nil {
logger.Logger.Errorf("mysql close error: %v", err)
}
}
return true
})
} }
func NewManager(conf *Config) *Manager { func NewManager(conf *Config) *Manager {

4
go.mod
View File

@ -4,6 +4,7 @@ go 1.22.5
require ( require (
github.com/cihub/seelog v0.0.0-20170130134532-f561c5e57575 github.com/cihub/seelog v0.0.0-20170130134532-f561c5e57575
github.com/fsnotify/fsnotify v1.7.0
github.com/globalsign/mgo v0.0.0-20181015135952-eeefdecb41b8 github.com/globalsign/mgo v0.0.0-20181015135952-eeefdecb41b8
github.com/google/uuid v1.6.0 github.com/google/uuid v1.6.0
github.com/gorilla/websocket v1.5.3 github.com/gorilla/websocket v1.5.3
@ -17,6 +18,7 @@ require (
github.com/xtaci/kcp-go v5.4.20+incompatible github.com/xtaci/kcp-go v5.4.20+incompatible
go.etcd.io/etcd/client/v3 v3.5.16 go.etcd.io/etcd/client/v3 v3.5.16
go.mongodb.org/mongo-driver v1.17.1 go.mongodb.org/mongo-driver v1.17.1
golang.org/x/sync v0.8.0
golang.org/x/time v0.7.0 golang.org/x/time v0.7.0
google.golang.org/protobuf v1.35.1 google.golang.org/protobuf v1.35.1
gorm.io/driver/mysql v1.5.7 gorm.io/driver/mysql v1.5.7
@ -26,7 +28,6 @@ require (
require ( require (
github.com/coreos/go-semver v0.3.0 // indirect github.com/coreos/go-semver v0.3.0 // indirect
github.com/coreos/go-systemd/v22 v22.3.2 // indirect github.com/coreos/go-systemd/v22 v22.3.2 // indirect
github.com/fsnotify/fsnotify v1.7.0 // indirect
github.com/go-sql-driver/mysql v1.7.0 // indirect github.com/go-sql-driver/mysql v1.7.0 // indirect
github.com/gogo/protobuf v1.3.2 // indirect github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/protobuf v1.5.4 // indirect github.com/golang/protobuf v1.5.4 // indirect
@ -67,7 +68,6 @@ require (
golang.org/x/crypto v0.26.0 // indirect golang.org/x/crypto v0.26.0 // indirect
golang.org/x/exp v0.0.0-20230905200255-921286631fa9 // indirect golang.org/x/exp v0.0.0-20230905200255-921286631fa9 // indirect
golang.org/x/net v0.23.0 // indirect golang.org/x/net v0.23.0 // indirect
golang.org/x/sync v0.8.0 // indirect
golang.org/x/sys v0.24.0 // indirect golang.org/x/sys v0.24.0 // indirect
golang.org/x/text v0.17.0 // indirect golang.org/x/text v0.17.0 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20240311132316-a219d84964c2 // indirect google.golang.org/genproto/googleapis/api v0.0.0-20240311132316-a219d84964c2 // indirect