goserver_sync/core/netlib/session.go

312 lines
6.4 KiB
Go

// session
package netlib
import (
"fmt"
"io"
"sync"
"sync/atomic"
"time"
"mongo.games.com/goserver/core/container"
"mongo.games.com/goserver/core/logger"
"mongo.games.com/goserver/core/utils"
)
type SessionCloseListener interface {
onClose(s ISession)
}
type SessionCutPacketListener interface {
onCutPacket(w io.Writer) error
}
type ISession interface {
SetAttribute(key, value interface{}) bool
RemoveAttribute(key interface{})
GetAttribute(key interface{}) interface{}
GetSessionConfig() *SessionConfig
LocalAddr() string
RemoteAddr() string
IsIdle() bool
Close()
Send(packetid int, data interface{}, sync ...bool) bool
SendEx(packetid int, logicNo uint32, data interface{}, sync bool) bool
FireConnectEvent() bool
FireDisconnectEvent() bool
FirePacketReceived(packetid int, logicNo uint32, packet interface{}) bool
FirePacketSent(packetid int, logicNo uint32, data []byte) bool
FireSessionIdle() bool
}
type packet struct {
packetid int
logicno uint32
data interface{}
next *packet
}
type Session struct {
Id int
GroupId int
Sid int64
Auth bool
impl ISession
sendBuffer chan *packet
recvBuffer chan *action
sc *SessionConfig
attributes *container.SynchronizedMap
scl SessionCloseListener
scpl SessionCutPacketListener
createTime time.Time
lastSndTime time.Time
lastRcvTime time.Time
waitor *utils.Waitor
rcvbuf *RWBuffer
sndbuf *RWBuffer
closed int32
sendedBytes int64
recvedBytes int64
sendedPack int64
recvedPack int64
quit bool
shutSend bool
shutRecv bool
isConned bool
PendingRcv bool
PendingSnd bool
//rpc
mutex sync.Mutex // protects following
seq uint64
pending map[uint64]*Call
//rpc
}
func (s *Session) init() {
s.sendBuffer = make(chan *packet, s.sc.MaxPend)
s.recvBuffer = make(chan *action, s.sc.MaxDone)
s.attributes = container.NewSynchronizedMap()
if s.sc.IsInnerLink {
s.pending = make(map[uint64]*Call)
}
}
func (s *Session) SetAttribute(key, value interface{}) bool {
return s.attributes.Set(key, value)
}
func (s *Session) RemoveAttribute(key interface{}) {
s.attributes.Delete(key)
}
func (s *Session) GetAttribute(key interface{}) interface{} {
return s.attributes.Get(key)
}
func (s *Session) GetSessionConfig() *SessionConfig {
return s.sc
}
func (s *Session) LocalAddr() string {
if s.impl != nil {
return s.impl.LocalAddr()
}
return ""
}
func (s *Session) RemoteAddr() string {
if s.impl != nil {
return s.impl.RemoteAddr()
}
return ""
}
func (s *Session) IsConned() bool {
return s.isConned
}
func (s *Session) IsIdle() bool {
return s.lastRcvTime.Add(s.sc.IdleTimeout).Before(time.Now())
}
func (s *Session) Close() {
if !atomic.CompareAndSwapInt32(&s.closed, 0, 1) {
return
}
if s.quit {
return
}
s.quit = true
if s.sc.IsInnerLink {
// Terminate pending calls.
s.mutex.Lock()
for _, call := range s.pending {
call.Error = ErrShutdown
call.done()
}
s.mutex.Unlock()
}
go s.reapRoutine()
}
func (s *Session) Send(packetid int, data interface{}, sync ...bool) bool {
if s.quit || s.shutSend {
return false
}
p := AllocPacket()
p.packetid = packetid
p.logicno = 0
p.data = data
if len(sync) > 0 && sync[0] {
select {
case s.sendBuffer <- p:
case <-time.After(s.sc.WriteTimeout):
logger.Logger.Warn(s.Id, " send buffer full(", len(s.sendBuffer), "),data be droped(asyn), IsInnerLink ",
s.sc.IsInnerLink)
logger.Logger.Warn("Send session(sync) config desc:", *s.sc)
if s.sc.IsInnerLink == false {
s.Close()
}
return false
}
} else {
select {
case s.sendBuffer <- p:
default:
logger.Logger.Warn(s.Id, " send buffer full(", len(s.sendBuffer), "),data be droped(sync), IsInnerLink ",
s.sc.IsInnerLink)
logger.Logger.Warn("Send session(async) config desc:", *s.sc)
if s.sc.IsInnerLink == false {
s.Close()
}
return false
}
}
return true
}
func (s *Session) SendEx(packetid int, logicNo uint32, data interface{}, sync bool) bool {
if s.quit || s.shutSend {
return false
}
p := AllocPacket()
p.packetid = packetid
p.logicno = logicNo
p.data = data
if sync {
select {
case s.sendBuffer <- p:
case <-time.After(time.Duration(s.sc.WriteTimeout)):
logger.Logger.Warn(s.Id, " send buffer full(", len(s.sendBuffer), "),data be droped(asyn), IsInnerLink ",
s.sc.IsInnerLink)
logger.Logger.Warn("Send session(sync) config desc:", *s.sc)
if s.sc.IsInnerLink == false {
s.Close()
}
return false
}
} else {
select {
case s.sendBuffer <- p:
default:
logger.Logger.Warn(s.Id, " send buffer full(", len(s.sendBuffer), "),data be droped(sync), IsInnerLink ",
s.sc.IsInnerLink)
logger.Logger.Warn("Send session(async) config desc:", *s.sc)
if s.sc.IsInnerLink == false {
s.Close()
}
return false
}
}
return true
}
func (s *Session) FireConnectEvent() bool {
s.isConned = true
if s.sc.sfc != nil {
if !s.sc.sfc.OnSessionOpened(s) {
return false
}
}
if s.sc.shc != nil {
s.sc.shc.OnSessionOpened(s)
}
return true
}
func (s *Session) FireDisconnectEvent() bool {
s.isConned = false
if s.sc.sfc != nil {
if !s.sc.sfc.OnSessionClosed(s) {
return false
}
}
if s.sc.shc != nil {
s.sc.shc.OnSessionClosed(s)
}
return true
}
func (s *Session) FirePacketReceived(packetid int, logicNo uint32, packet interface{}) bool {
if s.sc.sfc != nil {
if !s.sc.sfc.OnPacketReceived(s, packetid, logicNo, packet) {
return false
}
}
if s.sc.shc != nil {
s.sc.shc.OnPacketReceived(s, packetid, logicNo, packet)
}
return true
}
func (s *Session) FirePacketSent(packetid int, logicNo uint32, data []byte) bool {
if s.sc.sfc != nil {
if !s.sc.sfc.OnPacketSent(s, packetid, logicNo, data) {
return false
}
}
if s.sc.shc != nil {
s.sc.shc.OnPacketSent(s, packetid, logicNo, data)
}
return true
}
func (s *Session) FireSessionIdle() bool {
if s.sc.sfc != nil {
if !s.sc.sfc.OnSessionIdle(s) {
return false
}
}
if s.sc.shc != nil {
s.sc.shc.OnSessionIdle(s)
}
return true
}
func (s *Session) reapRoutine() {
defer func() {
if err := recover(); err != nil {
logger.Logger.Warn(s.Id, " reapRoutine panic : ", err)
}
}()
if !s.shutSend {
//close send goroutiue(throw a poison)
s.sendBuffer <- SendRoutinePoison
}
/*
if !s.shutRecv {
//close recv goroutiue
s.shutRead()
}
*/
s.waitor.Wait(fmt.Sprintf("Session.reapRoutine(%v_%v)", s.sc.Name, s.Id))
s.scl.onClose(s)
}
func (s *Session) destroy() {
s.FireDisconnectEvent()
}