kefu/server/event/event.php_bak

365 lines
13 KiB
Plaintext
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

<?php
namespace server\event;
use server\components\ApiException;
use server\components\Db;
use server\components\Redis;
use server\components\Base;
use server\model\UserModel;
use server\components\Utility;
use server\lib\Gateway;
use server\components\Command;
use server\modules\api\service\Service;
use server\components\Task;
class Websocket {
const PID_FILE = '/usr/local/swoole.pid';
const LOG_FILE = '/home/wwwlogs/swoole.log';
public $server;
private static $_defaultAvatar = 'https://chdnda.com/static/common/images/avatar.png';
/**
* 服务开关维护时设为0.
*/
private static $_serverSwitch = 1;
/**
* 参数配置
*/
private static $setting = [
'reactor_num' => 16,
'worker_num' => 48,
'max_request' => 20000,
'daemonize' => 0,
// 'package_max_length' => 6 * 1024 *1024,
'dispatch_mode' => 2, // 固定模式保证同一个连接发来的数据只会被同一个worker处理
'pid_file' => self::PID_FILE,
'log_file' => self::LOG_FILE,
'task_worker_num' => 4,
'heartbeat_check_interval' => 60, //心跳检测每60秒遍历所有连接
'heartbeat_idle_time' => 120, //强制关闭2分钟内没有向服务器发送任何数据的连接
// 'task_tmpdir' => '/dev/shm',
//'ssl_cert_file' => '/usr/local/nginx/conf/vhost/cs.chdnda.com.pem',
//'ssl_key_file' => '/usr/local/nginx/conf/vhost/cs.chdnda.com.key',
];
public function __construct($daemon) {
self::$setting['daemonize'] = $daemon;
$this->_run();
}
private function _run(){
//$this->server = new \swoole_websocket_server('0.0.0.0',12800);
$this->server = new \swoole_websocket_server('0.0.0.0',12800, SWOOLE_PROCESS, SWOOLE_SOCK_TCP);
$this->server->set(self::$setting);
$this->server->on('start', [$this,'onStart']);
// $this->server->on('ManagerStart', [$this,'onManagerStart']);
$this->server->on('open', [$this,'onOpen']);
$this->server->on('message',[$this,'onMessage']);
$this->server->on('task',[$this,'onTask']);
$this->server->on('finish',[$this,'onFinish']);
$this->server->on('close', [$this,'onClose']);
$this->server->on('WorkerStart', [$this,'onWorkerStart']);
$this->server->on('WorkerStop', [$this,'onWorkerStop']);
$this->server->start();
}
public function onStart($ws){
var_dump("服务启动");
swoole_set_process_name('swoole_server_customerService');
}
// public function onManagerStart($ws){
// swoole_set_process_name('swoole_manager_customerService');
// }
//
/**
* 进程启动后初始化操作,监听worker启动
*/
public function onWorkerStart($ws, $worker_id){
echo "进程启动---$worker_id".PHP_EOL;
swoole_set_process_name('swoole_process_customerService');
Gateway::setGateway($ws);
global $configFile;
$configFile = include (__DIR__ . '/../config/main.php');
// 定义应用目录
define('SERVER_PATH', __DIR__ . '/../');
Db::instance();
Redis::instance();
// 只有当worker_id为0时才添加定时器,避免重复添加
if ($worker_id == 0) {
if(is_file(self::PID_FILE) && (filemtime(self::PID_FILE) == time() || filemtime(self::PID_FILE) == time()-1) ){
echo "重启整个服务了,开始进行清理工作...".PHP_EOL;
Redis::instance()->del('kfList');
Redis::instance()->del('fd');
$keys = Redis::instance()->keys('user_set_*');
Redis::instance()->del($keys);
Db::instance()->update('ws_service_log')->cols(['end_time' => time()])->where("end_time=0")->query();
}
$userModel = new UserModel(Db::instance());
swoole_timer_tick(1000*30, function () use($userModel) {
$userModel->saveNowData(1);
});
swoole_timer_tick(1000*60*60, function () use($userModel) {
$userModel->saveNowData(2);
});
swoole_timer_tick(1000*24, function () use($ws) {
$allfd = Redis::instance()->hkeys('fd');
foreach ($allfd as $fd) {
if(!$ws->isEstablished($fd)){
Redis::instance()->hdel('fd',$fd);
}
}
});
}
}
/**
* 当客户端连接时触发
*
* @param int $request->fd 客户端连接id
*/
public function onOpen($ws,$request){
if(self::$_serverSwitch == 0){
echo '服务器维护中,请稍后再试..'.PHP_EOL;
}
echo date('Y-m-d H:i:s')." 新用户[$request->fd]:".$request->server['remote_addr']."加入".PHP_EOL;
}
public function onFinish($ws, $task_id, $data){
echo "Task [{$task_id}] is finished\n";
echo "Result: {$data}\n";
}
/**
* 当客户端发来消息时触发
* @param int client_id 连接id
* @param mixed $message 具体消息 {"path":"api\/chat","param":{"type":"kf","to_id":"KF8","to_name":"test","content":"这是交流内容"},"access_token":"abc"}
*/
public function onMessage($ws,$request){
//将用户发送上来的数据格式化成数组客户端上传的json数据
$param = json_decode($request->data,true);
$param['client_id'] = $request->fd;
$param['client_ip'] = $ws->getClientInfo($request->fd)['remote_ip'];
// 客服验证token
// echo "收到消息[][$request->fd] ".date('Y-m-d H:i:s').' => '.$request->data."\n\n";
if($param['path'] !== 'ping'){
$logData = $param;
$logData['time'] = date('Y-m-d H:i:s');
Base::writeLog($logData);
}
if (!empty($param['path']) && $param['path'] == 'ping') {
Gateway::sendToClient($request->fd, json_encode([
'type' => 'ping'
]));
return false;
}
try {
self::_checkAccessTokenIsValid($param);
$response = Utility::dealRequest($param);
} catch (ApiException $e) {
echo "onMessage:" . $e->getMessage() . "\n";
list($errorMsg, $msgType) = explode('|', $e->getMessage());
$errorCode = $e->getCode();
}
$sendData = [
'code' => empty($errorCode) ? 200 : $errorCode,
'message' => empty($errorMsg) ? 'ok' : $errorMsg,
'data' => empty($response) ? '' : $response
];
if (!empty($msgType)) {
$sendData['type'] = $msgType;
}
// 向客户端发送服务器反馈数据.
if ($sendData['code'] == 200) {
Base::sendMessage($sendData);
} else {
Gateway::sendToClient($param['client_id'], json_encode($sendData, JSON_UNESCAPED_UNICODE));
// access-token过期关闭客户端.
if (isset($msgType) && $msgType == 'invalid_token') {
Gateway::closeClient($request->fd);
return false;
}
}
}
/**
* data = [
* 'type' => 'xxx',
* 'params' => [],
* 'data' => []
* ]
*/
public function onTask($ws,$task_id,$from_id,$data){
$task = new Task();
$method = $data['method'];
$task->$method($data['data'],$data['params']);
return "Finished";
}
/**
* 当用户断开连接时触发
* @param int $clientId 连接id
*/
public function onClose($ws,$clientId){
(new Service($clientId))->actionClose();
echo date('Y-m-d H:i:s').": 连接关闭[$clientId]".PHP_EOL;
}
/**
* 进程退出时触发
*/
public function onWorkerStop($ws, $worker_id){
echo "进程退出---$worker_id\n";
}
/**
* 验证令牌.
*/
private static function _checkAccessTokenIsValid(& $param)
{
try {
if (empty($param['param']['type']) || !in_array($param['param']['type'],['kf','user'])) {
throw new ApiException(450, 'type错误');
}
list($module, $method) = explode('/',$param['path']);
if ($param['param']['type'] == 'kf') {
if ($method == 'init') {
if (empty($param['access_token'])) {
throw new ApiException(450, 'access_token不能为空.');
}
$result = Db::instance()->select('id,user_name,sex,user_avatar,access_token,expire_time,group_id,kf_type')
->from('ws_users')
->where('access_token=:access_token')
->bindValues(['access_token' => $param['access_token']])
->row();
if (empty($result)) {
throw new ApiException(450, 'access_token不存在.');
}
if ($result['access_token'] != $param['access_token']) {
throw new ApiException(450, 'access_token错误.');
}
if ($result['expire_time'] < time()) {
throw new ApiException(450, "access_token已过期.");
}
$userInfo = [
'uid' => "KF".$result['id'],
'name' => $result['user_name'],
'sex' => $result['sex'],
'avatar' => $result['user_avatar'],
'kf_type' => $result['kf_type'],
'group_id' => $result['group_id'],
];
unset($param['param']['from_id'],$param['param']['from_name'],$param['param']['from_avatar']);
}else{
$kf = json_decode(Redis::instance()->hget('fd',$param['client_id']),true);
if(empty($kf)){
$kf = Db::instance()->select('CONCAT("KF",id) as uid,user_name as name,sex,user_avatar as avatar,access_token,expire_time,group_id,kf_type')
->from('ws_users')
->where('access_token=:access_token')
->bindValues(['access_token' => $param['access_token']])
->row();
}
$userInfo = [
'uid' => $kf['uid'],
'name' => $kf['name'],
'sex' => $kf['sex'],
'avatar' => $kf['avatar'],
'kf_type' => $kf['kf_type'],
'group_id' => $kf['group_id'],
];
}
$param['user'] = $userInfo;
} else {
if ($method == 'userInit') {
if (empty($param['access_token'])) {
throw new ApiException(450, 'access_token不能为空.');
}
$avatar = empty($param['param']['avatar_url']) ? self::$_defaultAvatar : $param['param']['avatar_url'];
$userInfo = [
'uid' => $param['param']['user_id'],
'name' => $param['param']['user_name'],
'group_id' => $param['param']['group_id'],
'access_token' => $param['access_token'],
'avatar' => $avatar
];
unset($param['param']['user_id'],$param['param']['user_name'],$param['param']['avatar_url']);
// 验证用户,并获取用户信息
// $api = '/api/Customer/GetPlayerByToken';
// $res = Utility::callRemoteApi($api,$param['access_token']);
// if($res['State'] !== 1 || empty($res['Data'])){
// throw new ApiException(450, '获取用户信息失败');
// }
// $avatar = $res['Data']['Head'];
// if(empty($avatar) || is_numeric($avatar)) $avatar = self::$_defaultAvatar;
// $userInfo = [
// 'uid' => $res['Data']['SnId'],
// 'name' => $res['Data']['Name'],
// 'group_id' => $param['param']['group_id'],
// 'access_token' => $param['access_token'],
// 'avatar' => $avatar
// ];
}else{
$users = json_decode(Redis::instance()->hget('fd',$param['client_id']),true);
if (empty($users)) {
throw new ApiException(401, '请先初始化连接');
}
$userInfo = [
'uid' => $users['uid'],
'name' => $users['name'],
'avatar' => $users['avatar'],
'access_token' => $users['access_token'],
'group_id' => $users['group_id'],
'data' => $users['data'],
];
unset($param['param']['from_id'],$param['param']['from_name'],$param['param']['from_avatar']);
}
$param['user'] = $userInfo;
}
unset($param['access_token']);
} catch (ApiException $e) {
throw new ApiException($e->getCode(), $e->getMessage() . '|' . 'invalid_token...');
}
}
}
(new Command())->execute($argv);