goserver_sync/core/task/task.go

273 lines
5.9 KiB
Go

package task
import (
"fmt"
"runtime"
"sync/atomic"
"time"
"mongo.games.com/goserver/core"
"mongo.games.com/goserver/core/basic"
"mongo.games.com/goserver/core/container"
"mongo.games.com/goserver/core/container/recycler"
"mongo.games.com/goserver/core/logger"
"mongo.games.com/goserver/core/profile"
)
type Callable interface {
Call(*basic.Object) interface{}
}
type CompleteNotify interface {
Done(interface{}, Task)
}
type Task interface {
AddRefCnt(cnt int32) int32
GetRefCnt() int32
Get() interface{}
GetWithTimeout(timeout time.Duration) interface{}
GetEnv(k interface{}) interface{}
PutEnv(k, v interface{}) bool
SetAlertTime(alertt time.Duration)
GetCostTime() time.Duration
GetRunTime() time.Duration
Start()
StartByExecutor(name string) bool
StartByFixExecutor(name string) bool
BroadcastToAllExecutor() bool
StartByGroupExecutor(gname string, name string) bool
StartByGroupFixExecutor(name, gname string) bool
//inner
clone(name string) Task
run(o *basic.Object) (e error)
done(n CompleteNotify)
sendRsp()
setAfterQueCnt(n int)
setBeforeQueCnt(n int)
getS() *basic.Object
getC() Callable
getN() CompleteNotify
}
type CallableWrapper func(o *basic.Object) interface{}
func (cw CallableWrapper) Call(o *basic.Object) interface{} {
return cw(o)
}
type CompleteNotifyWrapper func(interface{}, Task)
func (cnw CompleteNotifyWrapper) Done(i interface{}, t Task) {
cnw(i, t)
}
type baseTask struct {
imp Task
s *basic.Object
c Callable
n CompleteNotify
r chan interface{}
v interface{}
env *container.SynchronizedMap
tCreate time.Time
tStart time.Time
alertTime time.Duration
name string
refTaskCnt int32
beforeQueCnt int //入队列前,等待中的任务数量
afterQueCnt int //出队列后,等待中的任务数量
}
func New(s *basic.Object, c Callable, n CompleteNotify, name ...string) Task {
return newBaseTask(s, c, n, name...)
}
func newBaseTask(s *basic.Object, c Callable, n CompleteNotify, name ...string) *baseTask {
t := &baseTask{
s: s,
c: c,
n: n,
r: make(chan interface{}, 1),
tCreate: time.Now(),
}
if len(name) != 0 {
t.name = name[0]
}
if s == nil {
t.s = core.CoreObject()
}
t.imp = t
return t
}
func (t *baseTask) clone(name string) Task {
fullname := t.name
if name != "" {
fullname += "-" + name
}
return New(t.s, t.c, t.n, fullname)
}
func (t *baseTask) setAfterQueCnt(n int) {
t.afterQueCnt = n
}
func (t *baseTask) setBeforeQueCnt(n int) {
t.beforeQueCnt = n
}
func (t *baseTask) getS() *basic.Object {
return t.s
}
func (t *baseTask) getC() Callable {
return t.c
}
func (t *baseTask) getN() CompleteNotify {
return t.n
}
func (t *baseTask) AddRefCnt(cnt int32) int32 {
return atomic.AddInt32(&t.refTaskCnt, cnt)
}
func (t *baseTask) GetRefCnt() int32 {
return atomic.LoadInt32(&t.refTaskCnt)
}
func (t *baseTask) Get() interface{} {
if t.n != nil {
panic("Task result by CompleteNotify return")
}
return <-t.r
}
func (t *baseTask) GetWithTimeout(timeout time.Duration) interface{} {
if timeout == 0 {
return t.Get()
} else {
timer := recycler.GetTimer(timeout)
defer recycler.GiveTimer(timer)
select {
case r, ok := <-t.r:
if ok {
return r
} else {
return nil
}
case <-timer.C:
return nil
}
}
return nil
}
func (t *baseTask) GetEnv(k interface{}) interface{} {
if t.env == nil {
return nil
}
return t.env.Get(k)
}
func (t *baseTask) PutEnv(k, v interface{}) bool {
if t.env == nil {
t.env = container.NewSynchronizedMap()
}
if t.env != nil {
t.env.Set(k, v)
}
return true
}
func (t *baseTask) run(o *basic.Object) (e error) {
watch := profile.TimeStatisticMgr.WatchStart(fmt.Sprintf("/task/%v/run", t.name), profile.TIME_ELEMENT_TASK)
defer func() {
if watch != nil {
watch.Stop()
}
if err := recover(); err != nil {
var buf [4096]byte
n := runtime.Stack(buf[:], false)
logger.Logger.Error("Task::run stack--->", string(buf[:n]))
}
}()
t.tStart = time.Now()
wait := t.tStart.Sub(t.tCreate)
t.v = t.c.Call(o)
dura := t.GetRunTime()
if t.r != nil {
t.r <- t.v
}
t.imp.sendRsp()
if t.alertTime != 0 && t.name != "" {
cost := t.GetCostTime()
if cost > t.alertTime {
logger.Logger.Warn("task [", t.name, "] since createTime(",
cost, ") since startTime(", dura, "), in quene wait(", wait, ")", " beforeQueCnt(", t.beforeQueCnt, ") afterQueCnt(", t.afterQueCnt, ")")
}
}
return nil
}
func (t *baseTask) done(n CompleteNotify) {
if n != nil {
n.Done(t.v, t)
}
}
func (t *baseTask) sendRsp() {
if t.n != nil {
SendTaskRes(t.s, t, t.n)
}
}
// Start 启动独立的一个协程,相当于 go 关键字
func (t *baseTask) Start() {
go t.imp.run(nil)
}
func (t *baseTask) SetAlertTime(alertt time.Duration) {
t.alertTime = alertt
}
func (t *baseTask) GetCostTime() time.Duration {
return time.Now().Sub(t.tCreate)
}
func (t *baseTask) GetRunTime() time.Duration {
return time.Now().Sub(t.tStart)
}
// StartByExecutor 根据名称的哈希值选择一个协程,在协程中执行(框架启动时默认会创建几个协程)
func (t *baseTask) StartByExecutor(name string) bool {
return sendTaskReqToExecutor(t, name, "")
}
// StartByFixExecutor 根据名称创建一个协程,如果协程已经存在,相同名称的任务会在同一个协程中执行
func (t *baseTask) StartByFixExecutor(name string) bool {
return sendTaskReqToFixExecutor(t, name, "")
}
func (t *baseTask) BroadcastToAllExecutor() bool {
return sendTaskReqToAllExecutor(t)
}
// StartByGroupExecutor 在 StartByExecutor 前根据gname分组
func (t *baseTask) StartByGroupExecutor(gname string, name string) bool {
return sendTaskReqToExecutor(t, name, gname)
}
// StartByGroupFixExecutor 在 StartByFixExecutor 前根据gname分组
func (t *baseTask) StartByGroupFixExecutor(name, gname string) bool {
return sendTaskReqToFixExecutor(t, name, gname)
}