From 6e3b9b69e0f82a46ab743235cd489ed27dae4e98 Mon Sep 17 00:00:00 2001 From: sk <123456@qq.com> Date: Thu, 23 May 2024 10:14:05 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0etcd=E6=A8=A1=E5=9D=97?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- core/etcd/client.go | 283 +++++++++++++++++++++++++++++++++++++++ core/etcd/client_test.go | 26 ++++ core/etcd/config.go | 40 ++++++ core/etcd/export.go | 17 +++ core/loader.go | 2 + core/logger/log.go | 9 +- 6 files changed, 375 insertions(+), 2 deletions(-) create mode 100644 core/etcd/client.go create mode 100644 core/etcd/client_test.go create mode 100644 core/etcd/config.go create mode 100644 core/etcd/export.go diff --git a/core/etcd/client.go b/core/etcd/client.go new file mode 100644 index 0000000..e83bf65 --- /dev/null +++ b/core/etcd/client.go @@ -0,0 +1,283 @@ +package etcd + +import ( + "context" + "fmt" + "sync" + "time" + + "go.etcd.io/etcd/client/v3" + + "mongo.games.com/goserver/core/logger" +) + +/* + etcd常用操作和数据监听 +*/ + +var globalClient = new(Client) + +type ( + InitFunc func(ctx context.Context, res *clientv3.GetResponse) + WatchFunc func(ctx context.Context, res *clientv3.WatchResponse) + funcPair struct { + initFunc InitFunc + watchFunc WatchFunc + } +) + +type Client struct { + cli *clientv3.Client + functions map[string][]funcPair + closed chan struct{} + wg sync.WaitGroup +} + +func (c *Client) Ctx() context.Context { + if c.cli != nil { + return c.cli.Ctx() + } + return context.TODO() +} + +func (c *Client) Open(etcdUrl []string, userName, passWord string, dialTimeout time.Duration) error { + var err error + + if len(etcdUrl) == 0 { + etcdUrl = []string{"localhost:2379"} + } + if dialTimeout == 0 { + dialTimeout = time.Minute + } + + c.cli, err = clientv3.New(clientv3.Config{ + Endpoints: etcdUrl, + Username: userName, + Password: passWord, + DialTimeout: dialTimeout, + DialKeepAliveTime: 5 * time.Second, + DialKeepAliveTimeout: 30 * time.Second, + }) + + if err != nil { + logger.Logger.Errorf("EtcdClient.Open(%v) err:%v", etcdUrl, err) + return err + } + + logger.Logger.Infof("EtcdClient.Open(%v) success", etcdUrl) + + if c.functions == nil { + c.functions = make(map[string][]funcPair) + } + + c.closed = make(chan struct{}) + return err +} + +func (c *Client) Close() error { + logger.Logger.Infof("EtcdClient.Close()") + select { + case <-c.closed: + return nil + default: + close(c.closed) + } + + if c.cli != nil { + c.cli.Close() + } + + c.wg.Wait() + + return nil +} + +// PutValue 添加键值对 +func (c *Client) PutValue(key, value string) (*clientv3.PutResponse, error) { + resp, err := c.cli.Put(context.TODO(), key, value) + if err != nil { + logger.Logger.Warnf("EtcdClient.PutValue(%v,%v) error:%v", key, value, err) + } + return resp, err +} + +// GetValue 查询 +func (c *Client) GetValue(key string) (*clientv3.GetResponse, error) { + resp, err := c.cli.Get(context.TODO(), key) + if err != nil { + logger.Logger.Warnf("EtcdClient.GetValue(%v) error:%v", key, err) + } + return resp, err +} + +// DelValue 返回删除了几条数据 +func (c *Client) DelValue(key string) (*clientv3.DeleteResponse, error) { + res, err := c.cli.Delete(context.TODO(), key) + if err != nil { + logger.Logger.Warnf("EtcdClient.DelValue(%v) error:%v", key, err) + } + return res, err +} + +// DelValueWithPrefix 按照前缀删除 +func (c *Client) DelValueWithPrefix(prefix string) (*clientv3.DeleteResponse, error) { + res, err := c.cli.Delete(context.TODO(), prefix, clientv3.WithPrefix()) + if err != nil { + logger.Logger.Warnf("EtcdClient.DelValueWithPrefix(%v) error:%v", prefix, err) + } + return res, err +} + +// GetValueWithPrefix 获取前缀 +func (c *Client) GetValueWithPrefix(prefix string) (*clientv3.GetResponse, error) { + resp, err := c.cli.Get(context.TODO(), prefix, clientv3.WithPrefix()) + if err != nil { + logger.Logger.Warnf("EtcdClient.GetValueWIthPrefix(%v) error:%v", prefix, err) + } + return resp, err +} + +// WatchWithPrefix 监测前缀创建事件 +func (c *Client) WatchWithPrefix(prefix string, revision int64) clientv3.WatchChan { + if c.cli != nil { + opts := []clientv3.OpOption{clientv3.WithPrefix(), clientv3.WithCreatedNotify()} + if revision > 0 { + opts = append(opts, clientv3.WithRev(revision)) + } + return c.cli.Watch(clientv3.WithRequireLeader(context.Background()), prefix, opts...) + } + return nil +} + +// AddFunc 添加监听函数 +func (c *Client) AddFunc(key string, initFunc InitFunc, watchFunc WatchFunc) { + logger.Logger.Infof("EtcdClient.AddFunc(%v)", key) + fs := funcPair{ + initFunc: initFunc, + watchFunc: watchFunc, + } + + if c.functions == nil { + c.functions = make(map[string][]funcPair) + } + + a, ok := c.functions[key] + if !ok { + a = []funcPair{} + } + a = append(a, fs) + c.functions[key] = a +} + +// Start 重新监听 +func (c *Client) Start() { + select { + case <-c.closed: + return + default: + } + + logger.Logger.Infof("EtcdClient.Start") + for k, v := range c.functions { + for _, vv := range v { + c.initAndWatch(k, vv.initFunc, vv.watchFunc) + } + } +} + +func (c *Client) Restart() { + go func() { + c.Close() + cli := new(Client) + err := cli.Open(Config.Url, Config.UserName, Config.Password, time.Duration(Config.DialTimeout)*time.Second) + if err != nil { + logger.Logger.Errorf("EtcdClient.Restart error:%v", err) + return + } + c.cli = cli.cli + c.wg = sync.WaitGroup{} + c.closed = make(chan struct{}) + c.Start() + }() +} + +// initAndWatch 开始监听 +func (c *Client) initAndWatch(key string, initFunc InitFunc, watchFunc WatchFunc) { + res, err := c.GetValueWithPrefix(key) + if err != nil { + panic(fmt.Sprintf("initAndWatch WithPrefix(%v) err:%v", key, err)) + } + + logger.Logger.Infof("etcd initFunc WithPrefix(%v) start", key) + initFunc(c.Ctx(), res) + + ctx, _ := context.WithCancel(c.cli.Ctx()) + vision := int64(-1) + if res.Header != nil { + vision = res.Header.Revision + } + c.goWatch(ctx, vision+1, key, watchFunc) +} + +// goWatch 异步监听 +func (c *Client) goWatch(ctx context.Context, revision int64, prefix string, f WatchFunc) { + c.wg.Add(1) + go func() { + defer func() { + c.wg.Done() + }() + defer func() { + if err := recover(); err != nil { + logger.Logger.Errorf("etcd watch WithPrefix(%v) panic:%v", prefix, err) + } + logger.Logger.Warnf("etcd watch WithPrefix(%v) quit!!!", prefix) + }() + x, _ := context.WithCancel(ctx) + var times int64 + for { + times++ + logger.Logger.Warnf("etcd watch WithPrefix(%v) base revision %v start[%v]!!!", prefix, revision, times) + rch := c.WatchWithPrefix(prefix, revision) + for { + skip := false + select { + case <-c.closed: + return + case _, ok := <-ctx.Done(): + if !ok { + return + } + case resp, ok := <-rch: + if !ok { + logger.Logger.Warnf("etcd watch WithPrefix(%v) be closed", prefix) + skip = true + break + } + if resp.Header.Revision > revision { + revision = resp.Header.Revision + } + if resp.Canceled { + logger.Logger.Warnf("etcd watch WithPrefix(%v) be closed, reason:%v", prefix, resp.Err()) + skip = true + break + } + if err := resp.Err(); err != nil { + logger.Logger.Warnf("etcd watch WithPrefix(%v) err:%v", prefix, resp.Err()) + continue + } + if len(resp.Events) == 0 { + continue + } + + logger.Logger.Tracef("@goWatch %v changed, header:%#v", prefix, resp.Header) + f(x, &resp) + } + + if skip { + break + } + } + time.Sleep(time.Second) + } + }() +} diff --git a/core/etcd/client_test.go b/core/etcd/client_test.go new file mode 100644 index 0000000..7e0f53d --- /dev/null +++ b/core/etcd/client_test.go @@ -0,0 +1,26 @@ +package etcd + +import ( + "context" + "go.etcd.io/etcd/client/v3" + "testing" +) + +func TestClient_AddFunc(t *testing.T) { + cli := new(Client) + + cli.AddFunc("abc", func(ctx context.Context, res *clientv3.GetResponse) { + t.Log("initFunc") + }, func(ctx context.Context, res *clientv3.WatchResponse) { + t.Log("watchFunc") + }) + + err := cli.Open([]string{"localhost:2379"}, "", "", 0) + if err != nil { + t.Log(err) + return + } + cli.Start() + cli.Close() + t.Log("close...") +} diff --git a/core/etcd/config.go b/core/etcd/config.go new file mode 100644 index 0000000..55ba462 --- /dev/null +++ b/core/etcd/config.go @@ -0,0 +1,40 @@ +package etcd + +import ( + "fmt" + "time" + + "mongo.games.com/goserver/core" +) + +var Config = Configuration{} + +type Configuration struct { + Url []string + UserName string + Password string + DialTimeout int // second +} + +func (c *Configuration) Name() string { + return "etcd" +} + +func (c *Configuration) Init() error { + err := globalClient.Open(c.Url, c.UserName, c.Password, time.Duration(c.DialTimeout)*time.Second) + if err != nil { + panic(fmt.Sprintf("etcd init error:%v", err)) + } + return nil +} + +func (c *Configuration) Close() error { + if globalClient != nil { + globalClient.Close() + } + return nil +} + +func init() { + core.RegistePackage(&Config) +} diff --git a/core/etcd/export.go b/core/etcd/export.go new file mode 100644 index 0000000..f23c0b9 --- /dev/null +++ b/core/etcd/export.go @@ -0,0 +1,17 @@ +package etcd + +func GlobalClient() *Client { + return globalClient +} + +func AddFunc(key string, initFunc InitFunc, watchFunc WatchFunc) { + globalClient.AddFunc(key, initFunc, watchFunc) +} + +func Start() { + globalClient.Start() +} + +func Restart() { + globalClient.Restart() +} diff --git a/core/loader.go b/core/loader.go index 1c233fe..0bab5b7 100644 --- a/core/loader.go +++ b/core/loader.go @@ -9,6 +9,8 @@ import ( "mongo.games.com/goserver/core/logger" ) +// Package 功能包 +// 只做初始化,不要依赖其它功能包 type Package interface { Name() string Init() error diff --git a/core/logger/log.go b/core/logger/log.go index b7b3a20..2f2d954 100644 --- a/core/logger/log.go +++ b/core/logger/log.go @@ -8,11 +8,16 @@ import ( var ( Logger seelog.LoggerInterface + err error ) func init() { - Logger, _ = seelog.LoggerFromConfigAsFile("logger.xml") - seelog.ReplaceLogger(Logger) + Logger, err = seelog.LoggerFromConfigAsFile("logger.xml") + if err == nil { + seelog.ReplaceLogger(Logger) + } else { + Logger = seelog.Default + } } func Reload(fileName string) error {