娃娃机使用消息队列处理消息

This commit is contained in:
by 2024-08-15 14:37:08 +08:00
parent 154a037e08
commit afe8d3f0b1
2 changed files with 201 additions and 143 deletions

View File

@ -3,47 +3,69 @@ package action
import ( import (
"bytes" "bytes"
"fmt" "fmt"
"net"
"time"
"mongo.games.com/goserver/core/basic"
"mongo.games.com/goserver/core/logger"
"mongo.games.com/goserver/core/netlib"
"mongo.games.com/goserver/core/task"
"mongo.games.com/goserver/core/timer"
"mongo.games.com/game/machine/machinedoll" "mongo.games.com/game/machine/machinedoll"
"mongo.games.com/game/protocol/machine" "mongo.games.com/game/protocol/machine"
"mongo.games.com/goserver/core/logger"
"mongo.games.com/goserver/core/netlib"
"mongo.games.com/goserver/core/timer"
"sync"
"time"
) )
type DoneFunc func(c net.Conn) type ConnMessageQueue struct {
queue chan []interface{}
conn *machinedoll.Conn
waitGroup *sync.WaitGroup
}
func Process(conn *machinedoll.Conn, sec time.Duration, f1, f2 []DoneFunc, isSync bool) { var connMessageQueues = make(map[*machinedoll.Conn]*ConnMessageQueue)
var ch chan struct{}
if isSync { func Process(conn *machinedoll.Conn, f1 []func(), f2 []func()) {
ch = make(chan struct{}, 1) // 获取或创建该 conn 对应的消息队列
queue, ok := connMessageQueues[conn]
if !ok {
queue = &ConnMessageQueue{
queue: make(chan []interface{}, 100),
conn: conn,
waitGroup: new(sync.WaitGroup),
} }
task.New(nil, task.CallableWrapper(func(o *basic.Object) interface{} { connMessageQueues[conn] = queue
for _, v := range f1 { go processConnMessageQueue(queue)
v(conn)
} }
// 将消息添加到队列中
queue.queue <- []interface{}{f1, f2}
}
func processConnMessageQueue(queue *ConnMessageQueue) {
for msg := range queue.queue {
f1 := msg[0].([]func())
f2 := msg[1].([]func())
queue.waitGroup.Add(len(f1))
for _, f := range f1 {
go func(f func()) {
defer queue.waitGroup.Done()
f()
}(f)
}
queue.waitGroup.Wait()
if len(f2) > 0 { if len(f2) > 0 {
queue.waitGroup.Add(1)
go func() {
defer queue.waitGroup.Done()
timer.AfterTimer(func(h timer.TimerHandle, ud interface{}) bool { timer.AfterTimer(func(h timer.TimerHandle, ud interface{}) bool {
Process(conn, 0, f2, nil, isSync) for _, f := range f2 {
if isSync { f()
ch <- struct{}{}
} }
return true return true
}, nil, sec) }, nil, 200*time.Millisecond)
} else { }()
if isSync {
ch <- struct{}{}
} }
}
return nil queue.waitGroup.Wait()
}), nil).StartByFixExecutor(fmt.Sprintf("Machine%v", conn.Addr))
if isSync {
<-ch
} }
} }
@ -63,21 +85,48 @@ func SMDollMachinePerateHandler(session *netlib.Session, packetId int, data inte
switch msg.Perate { switch msg.Perate {
case 1: case 1:
//向前移动 //向前移动
Process(conn, 200*time.Millisecond, []DoneFunc{machinedoll.Backward}, []DoneFunc{machinedoll.BackwardStop}, false) f1 := []func(){
func() { machinedoll.Backward(conn) },
}
f2 := []func(){
func() { machinedoll.BackwardStop(conn) },
}
Process(conn, f1, f2)
case 2: case 2:
//向后移动 //向后移动
Process(conn, 200*time.Millisecond, []DoneFunc{machinedoll.Forward}, []DoneFunc{machinedoll.ForwardStop}, false) f1 := []func(){
func() { machinedoll.Forward(conn) },
}
f2 := []func(){
func() { machinedoll.ForwardStop(conn) },
}
Process(conn, f1, f2)
//Process(conn, 200*time.Millisecond, []DoneFunc{machinedoll.Forward}, []DoneFunc{machinedoll.ForwardStop}, false)
case 3: case 3:
//向左移动 //向左移动
Process(conn, 200*time.Millisecond, []DoneFunc{machinedoll.Left}, []DoneFunc{machinedoll.LeftStop}, false) f1 := []func(){
func() { machinedoll.Left(conn) },
}
f2 := []func(){
func() { machinedoll.LeftStop(conn) },
}
Process(conn, f1, f2)
//Process(conn, 200*time.Millisecond, []DoneFunc{machinedoll.Left}, []DoneFunc{machinedoll.LeftStop}, false)
case 4: case 4:
//向右移动 //向右移动
Process(conn, 200*time.Millisecond, []DoneFunc{machinedoll.Right}, []DoneFunc{machinedoll.RightStop}, false) f1 := []func(){
func() { machinedoll.Right(conn) },
}
f2 := []func(){
func() { machinedoll.RightStop(conn) },
}
Process(conn, f1, f2)
//Process(conn, 200*time.Millisecond, []DoneFunc{machinedoll.Right}, []DoneFunc{machinedoll.RightStop}, false)
case 5: case 5:
//投币 //投币
//Process(conn, 0*time.Millisecond, []DoneFunc{machinedoll.Coin}, []DoneFunc{}, false) //Process(conn, 0*time.Millisecond, []DoneFunc{machinedoll.Coin}, []DoneFunc{}, false)
machinedoll.Coin(conn) machinedoll.Coin(conn)
go CoinResult(session, conn, msg.Snid, msg.GetId()) go DollMachineGrabResult(session, conn, msg.Snid, msg.GetId())
} }
return nil return nil
} }
@ -95,24 +144,30 @@ func SMDollMachineGrabHandler(session *netlib.Session, packetId int, data interf
return nil return nil
} }
send := func(net.Conn) {
session.Send(int(machine.DollMachinePacketID_PACKET_MSDollMachineoPerateResult), &machine.MSDollMachineoPerateResult{
Snid: msg.Snid,
Id: msg.GetId(),
Result: 1,
TypeId: 2,
})
}
switch msg.GetTypeId() { switch msg.GetTypeId() {
case 1: case 1:
//弱抓 //弱抓
Process(conn, 0, []DoneFunc{machinedoll.WeakGrab}, []DoneFunc{send}, false) f1 := []func(){
func() { machinedoll.WeakGrab(conn) },
}
f2 := []func(){}
Process(conn, f1, f2)
//Process(conn, 0, []DoneFunc{machinedoll.WeakGrab}, []DoneFunc{send}, false)
case 2: case 2:
//强力抓 //强力抓
Process(conn, 0, []DoneFunc{machinedoll.Grab}, []DoneFunc{send}, false) f1 := []func(){
func() { machinedoll.Grab(conn) },
}
f2 := []func(){}
Process(conn, f1, f2)
//Process(conn, 0, []DoneFunc{machinedoll.Grab}, []DoneFunc{send}, false)
}
//go DollMachineGrabResult(session, conn, msg.Snid, msg.GetId())
err := conn.SetDeadline(time.Now().Add(15 * time.Second))
if err != nil {
fmt.Println("Error setting deadline:", err)
return err
} }
go DollMachineGrabResult(session, conn, msg.Snid, msg.GetId())
return nil return nil
} }
@ -122,7 +177,6 @@ func DollMachineGrabResult(session *netlib.Session, conn *machinedoll.Conn, snid
// 读取数据 // 读取数据
fmt.Println("监听抓取结果返回!") fmt.Println("监听抓取结果返回!")
buf := make([]byte, 1024) buf := make([]byte, 1024)
conn.SetDeadline(time.Now().Add(10 * time.Second))
n, err := conn.Read(buf) n, err := conn.Read(buf)
if err != nil { if err != nil {
fmt.Println("Failed to read response from client:", err) fmt.Println("Failed to read response from client:", err)
@ -153,7 +207,8 @@ func DollMachineGrabResult(session *netlib.Session, conn *machinedoll.Conn, snid
Result: 0, Result: 0,
TypeId: 2, TypeId: 2,
}) })
fmt.Println("没有抓到礼品!!!!!!!!") fmt.Println("没有抓到礼品snid = ", snid)
conn.SetDeadline(time.Time{})
return return
} }
if bytes.Contains(part, instruction1) { if bytes.Contains(part, instruction1) {
@ -170,35 +225,38 @@ func DollMachineGrabResult(session *netlib.Session, conn *machinedoll.Conn, snid
Result: 1, Result: 1,
TypeId: 2, TypeId: 2,
}) })
fmt.Println("抓到礼品了!!!!!!!!") fmt.Println("抓到礼品了snid = ", snid)
conn.SetDeadline(time.Time{})
return return
} }
} //上分成功
} coinData := []byte{0xAA, 0x04, 0x02, 0x03, 0x01}
} if bytes.Contains(part, coinData) {
}
func CoinResult(session *netlib.Session, conn *machinedoll.Conn, snid, id int32) {
// 读取服务端的响应
buf := make([]byte, 1024)
n, err := conn.Read(buf)
if err != nil {
fmt.Println("Failed to read response from server:", err)
return
}
if buf[4] == 1 {
fmt.Println("上分成功n = ", n)
}
if buf[4] == 0 {
fmt.Println("上分失败!!!")
}
//返回消息 //返回消息
fmt.Println("上分成功!")
session.Send(int(machine.DollMachinePacketID_PACKET_MSDollMachineoPerateResult), &machine.MSDollMachineoPerateResult{ session.Send(int(machine.DollMachinePacketID_PACKET_MSDollMachineoPerateResult), &machine.MSDollMachineoPerateResult{
Snid: snid, Snid: snid,
Id: id, Id: id,
Result: int32(buf[4]), Result: 1,
TypeId: 1, TypeId: 1,
}) })
} }
//上分失败
coinData = []byte{0xAA, 0x04, 0x02, 0x03, 0x00}
if bytes.Contains(part, coinData) {
//返回消息
fmt.Println("上分失败!")
session.Send(int(machine.DollMachinePacketID_PACKET_MSDollMachineoPerateResult), &machine.MSDollMachineoPerateResult{
Snid: snid,
Id: id,
Result: 0,
TypeId: 1,
})
}
}
}
}
}
// 与游戏服务器连接成功,向游戏服务器推送所有娃娃机连接 // 与游戏服务器连接成功,向游戏服务器推送所有娃娃机连接
func SMGameLinkSucceedHandler(session *netlib.Session, packetId int, data interface{}) error { func SMGameLinkSucceedHandler(session *netlib.Session, packetId int, data interface{}) error {

View File

@ -160,7 +160,7 @@ func Coin(conn net.Conn) {
fmt.Println("Failed to send command to server:", err) fmt.Println("Failed to send command to server:", err)
return return
} }
// 读取服务端的响应 /* // 读取服务端的响应
buf := make([]byte, 1024) buf := make([]byte, 1024)
n, err := conn.Read(buf) n, err := conn.Read(buf)
if err != nil { if err != nil {
@ -172,7 +172,7 @@ func Coin(conn net.Conn) {
} }
if buf[4] == 0 { if buf[4] == 0 {
fmt.Println("上分失败!!!") fmt.Println("上分失败!!!")
} }*/
} }
// 剩余局数清零 // 剩余局数清零
@ -184,14 +184,14 @@ func ClearRemainingGames(conn net.Conn) {
fmt.Println("Failed to send command to server:", err) fmt.Println("Failed to send command to server:", err)
return return
} }
// 读取服务端的响应 /* // 读取服务端的响应
buf := make([]byte, 1024) buf := make([]byte, 1024)
n, err := conn.Read(buf) n, err := conn.Read(buf)
if err != nil { if err != nil {
fmt.Println("Failed to read response from server:", err) fmt.Println("Failed to read response from server:", err)
return return
} }
fmt.Println("n", n) fmt.Println("n", n)*/
} }
// 计算校验码 // 计算校验码
@ -202,7 +202,7 @@ func CalculateChecksum(data []byte) []byte {
value ^= datum value ^= datum
} }
} }
fmt.Println("校验码 value = ", value) //fmt.Println("校验码 value = ", value)
data = append(data, value, 0xdd) data = append(data, value, 0xdd)
return data return data
@ -221,13 +221,13 @@ func OpenMusic(conn net.Conn) {
return return
} }
// 读取服务端的响应 // 读取服务端的响应
buf := make([]byte, 1024) /* buf := make([]byte, 1024)
n, err := conn.Read(buf) n, err := conn.Read(buf)
if err != nil { if err != nil {
fmt.Println("Failed to read response from server:", err) fmt.Println("Failed to read response from server:", err)
return return
} }
fmt.Println("n", n) fmt.Println("n", n)*/
} }
// 关闭音乐 // 关闭音乐
@ -241,14 +241,14 @@ func CloseMusic(conn net.Conn) {
fmt.Println("Failed to send command to server:", err) fmt.Println("Failed to send command to server:", err)
return return
} }
// 读取服务端的响应 /* // 读取服务端的响应
buf := make([]byte, 1024) buf := make([]byte, 1024)
n, err := conn.Read(buf) n, err := conn.Read(buf)
if err != nil { if err != nil {
fmt.Println("Failed to read response from server:", err) fmt.Println("Failed to read response from server:", err)
return return
} }
fmt.Println("n", n) fmt.Println("n", n)*/
} }
// 恢复出厂设置 // 恢复出厂设置
@ -260,14 +260,14 @@ func RestoreFactorySettings(conn net.Conn) {
fmt.Println("Failed to send command to server:", err) fmt.Println("Failed to send command to server:", err)
return return
} }
// 读取服务端的响应 /* // 读取服务端的响应
buf := make([]byte, 1024) buf := make([]byte, 1024)
n, err := conn.Read(buf) n, err := conn.Read(buf)
if err != nil { if err != nil {
fmt.Println("Failed to read response from server:", err) fmt.Println("Failed to read response from server:", err)
return return
} }
fmt.Println("n", n) fmt.Println("n", n)*/
} }
// 重启主板 // 重启主板
@ -279,14 +279,14 @@ func Reboot(conn net.Conn) {
fmt.Println("Failed to send command to server:", err) fmt.Println("Failed to send command to server:", err)
return return
} }
// 读取服务端的响应 /* // 读取服务端的响应
buf := make([]byte, 1024) buf := make([]byte, 1024)
n, err := conn.Read(buf) n, err := conn.Read(buf)
if err != nil { if err != nil {
fmt.Println("Failed to read response from server:", err) fmt.Println("Failed to read response from server:", err)
return return
} }
fmt.Println("n", n) fmt.Println("n", n)*/
} }
// 暂停服务 // 暂停服务
@ -320,7 +320,7 @@ func queryBaseParam(conn net.Conn) {
fmt.Println("Failed to send command to server:", err) fmt.Println("Failed to send command to server:", err)
return return
} }
// 读取服务端的响应 /* // 读取服务端的响应
buf := make([]byte, 1024) buf := make([]byte, 1024)
n, err := conn.Read(buf) n, err := conn.Read(buf)
if err != nil { if err != nil {
@ -328,7 +328,7 @@ func queryBaseParam(conn net.Conn) {
return return
} }
fmt.Println("n", n) fmt.Println("n", n)*/
} }
// 设置出奖模式 // 设置出奖模式
@ -343,14 +343,14 @@ func SetPower(conn net.Conn) {
fmt.Println("Failed to send command to server:", err) fmt.Println("Failed to send command to server:", err)
return return
} }
// 读取服务端的响应 /* // 读取服务端的响应
buf := make([]byte, 1024) buf := make([]byte, 1024)
n, err := conn.Read(buf) n, err := conn.Read(buf)
if err != nil { if err != nil {
fmt.Println("Failed to read response from server:", err) fmt.Println("Failed to read response from server:", err)
return return
} }
fmt.Println("n", n) fmt.Println("n", n)*/
} }
var data = []byte{ var data = []byte{