1, 'worker_num' => 2, 'max_request' => 1000, 'daemonize' => 0, // 'package_max_length' => 6 * 1024 *1024, 'dispatch_mode' => 2, // 固定模式,保证同一个连接发来的数据只会被同一个worker处理 'pid_file' => self::PID_FILE, 'log_file' => self::LOG_FILE, 'task_worker_num' => 100, 'heartbeat_check_interval' => 60, //心跳检测:每60秒遍历所有连接 // 'heartbeat_check_interval' => 600000000000000, //心跳检测:每60秒遍历所有连接 'heartbeat_idle_time' => 1800, //强制关闭30分钟内没有向服务器发送任何数据的连接 // 'heartbeat_idle_time' => 1200000000000000, //强制关闭10分钟内没有向服务器发送任何数据的连接 // 'task_tmpdir' => '/dev/shm', // 'ssl_cert_file' => '/usr/local/nginx/conf/cert/2122339_cs.waicf.com.pem', // 'ssl_key_file' => '/usr/local/nginx/conf/cert/2122339_cs.waicf.com.key', ]; public function __construct($daemon) { self::$setting['daemonize'] = $daemon; $this->_run(); } private function _run(){ $this->server = new \swoole_websocket_server('0.0.0.0',12800); // $this->server = new \swoole_websocket_server('0.0.0.0',12800, SWOOLE_PROCESS, SWOOLE_SOCK_TCP | SWOOLE_SSL); $this->server->set(self::$setting); $this->server->on('start', [$this,'onStart']); // $this->server->on('ManagerStart', [$this,'onManagerStart']); $this->server->on('open', [$this,'onOpen']); $this->server->on('message',[$this,'onMessage']); $this->server->on('task',[$this,'onTask']); $this->server->on('finish',[$this,'onFinish']); $this->server->on('close', [$this,'onClose']); $this->server->on('WorkerStart', [$this,'onWorkerStart']); $this->server->on('WorkerStop', [$this,'onWorkerStop']); // $this->server->on('WorkerError', [$this,'onWorkerError']); // $this->server->on('WorkerExit', [$this,'onWorkerExit']); $this->server->start(); } public function onStart($ws){ var_dump("服务启动"); swoole_set_process_name('swoole_server_customerService'); } // public function onManagerStart($ws){ // swoole_set_process_name('swoole_manager_customerService'); // } // // public function onWorkerError($ws, $worker_id, $worker_pid, $exit_code, $signal){ // echo "WorkerError".PHP_EOL; // } // // public function onWorkerExit($ws,$worker_id){ // echo "WorkerExit".PHP_EOL; // } /** * 进程启动后初始化操作,监听worker启动 */ public function onWorkerStart($ws, $worker_id){ echo "进程启动---$worker_id".PHP_EOL; swoole_set_process_name('swoole_process_customerService'); Gateway::setGateway($ws); global $configFile; $configFile = include (__DIR__ . '/../config/main.php'); // 定义应用目录 define('SERVER_PATH', __DIR__ . '/../'); Db::instance(); Redis::instance(); // 只有当worker_id为0时才添加定时器,避免重复添加 if ($worker_id == 0) { if(is_file(self::PID_FILE) && (filemtime(self::PID_FILE) == time() || filemtime(self::PID_FILE) == time()-1) ){ echo "重启整个服务了,开始进行清理工作...".PHP_EOL; Redis::instance()->del('kfList'); Redis::instance()->del('client_real_ip'); Redis::instance()->del('fd'); $keys = Redis::instance()->keys('user_set_*'); Redis::instance()->del($keys); Db::instance()->update('ws_service_log')->cols(['end_time' => time()])->where("end_time=0")->query(); } $userModel = new UserModel(Db::instance()); // swoole_timer_tick(1000, function () use($userModel) { swoole_timer_tick(1000 * 30, function () use($userModel) { $userModel->saveNowData(1); }); // swoole_timer_tick(1000, function () use($userModel) { swoole_timer_tick(1000 * 60 * 60, function () use($userModel) { $userModel->saveNowData(2); }); swoole_timer_tick(1000 * 24, function () use($ws) { $allfd = Redis::instance()->hkeys('fd'); foreach ($allfd as $fd) { if(!$ws->isEstablished($fd)){ Redis::instance()->hdel('fd',$fd); } } }); } } /** * 当客户端连接时触发 * * @param int $request->fd 客户端连接id */ public function onOpen($ws,$request){ if(self::$_serverSwitch == 0){ echo '服务器维护中,请稍后再试..'.PHP_EOL; } Redis::instance()->hset('client_real_ip', $request->fd, $request->header['x-real-ip']); echo date('Y-m-d H:i:s')." 新用户[$request->fd]:".$request->header['x-real-ip']."加入".PHP_EOL; } public function onFinish($ws, $task_id, $data){ echo "Task [{$task_id}] is finished\n"; echo "Result: {$data}\n"; } /** * 当客户端发来消息时触发 * @param int client_id 连接id * @param mixed $message 具体消息 {"path":"api\/chat","param":{"type":"kf","to_id":"KF8","to_name":"test","content":"这是交流内容"},"access_token":"abc"} */ public function onMessage($ws,$request){ //将用户发送上来的数据格式化成数组(客户端上传的json数据) $param = json_decode($request->data, true); $param['client_id'] = $request->fd; $param['client_ip'] = Redis::instance()->hget('client_real_ip', $request->fd); global $configFile; if($configFile['open_chatmsg']){ if($param['path'] !== 'ping'){ $logData = $param; $logData['time'] = date('Y-m-d H:i:s'); Base::writeLog($logData); } }else{ echo "收到消息[][$request->fd] ".date('Y-m-d H:i:s').' => '.$request->data."\n\n"; } if (!empty($param['path']) && $param['path'] == 'ping') { Gateway::sendToClient($request->fd, json_encode([ 'type' => 'ping' ])); return false; } try { self::_checkAccessTokenIsValid($param); $response = Utility::dealRequest($param); } catch (ApiException $e) { echo "onMessage:" . $e->getMessage().PHP_EOL; list($errorMsg, $msgType) = explode('|', $e->getMessage()); $errorCode = $e->getCode(); } $sendData = [ 'code' => empty($errorCode) ? 200 : $errorCode, 'message' => empty($errorMsg) ? 'ok' : $errorMsg, 'data' => empty($response) ? '' : $response ]; if (!empty($msgType)) { $sendData['type'] = $msgType; } // 向客户端发送服务器反馈数据. if ($sendData['code'] == 200) { Base::sendMessage($sendData); } else { Gateway::sendToClient($param['client_id'], json_encode($sendData, JSON_UNESCAPED_UNICODE)); // access-token过期,关闭客户端. if (isset($msgType) && $msgType == 'invalid_token') { Gateway::closeClient($request->fd); return false; } } } /** * data = [ * 'type' => 'xxx', * 'params' => [], * 'data' => [] * ] */ public function onTask($ws,$task_id,$from_id,$data){ $task = new Task(); $method = $data['method']; $task->$method($data['data'],$data['params']); return "Finished"; } /** * 当用户断开连接时触发 * @param int $clientId 连接id */ public function onClose($ws, $clientId){ (new Service($clientId))->actionClose(); echo date('Y-m-d H:i:s').":连接关闭[$clientId]".PHP_EOL; } /** * 进程退出时触发 */ public function onWorkerStop($ws, $worker_id){ echo "进程退出---$worker_id\n"; } /** * 验证令牌. */ private static function _checkAccessTokenIsValid(& $param) { try { if (empty($param['param']['type']) || !in_array($param['param']['type'],['kf','user'])) { throw new ApiException(450, 'type错误'); } list($module, $method) = explode('/',$param['path']); if ($param['param']['type'] == 'kf') { if ($method == 'init') { if (empty($param['access_token'])) { throw new ApiException(450, 'access_token不能为空.'); } $result = Db::instance()->select('id,user_name,sex,user_avatar,access_token,expire_time,group_id,kf_type') ->from('ws_users') ->where('access_token=:access_token') ->bindValues(['access_token' => $param['access_token']]) ->row(); if (empty($result)) { throw new ApiException(450, 'access_token不存在.'); } if ($result['access_token'] != $param['access_token']) { throw new ApiException(450, 'access_token错误.'); } if ($result['expire_time'] < time()) { throw new ApiException(450, "access_token已过期."); } $userInfo = [ 'uid' => "KF".$result['id'], 'name' => $result['user_name'], 'sex' => $result['sex'], 'avatar' => $result['user_avatar'], 'kf_type' => $result['kf_type'], 'group_id' => $result['group_id'], ]; unset($param['param']['from_id'],$param['param']['from_name'],$param['param']['from_avatar']); }else{ $kf = json_decode(Redis::instance()->hget('fd',$param['client_id']),true); if(empty($kf)){ $kf = Db::instance()->select('CONCAT("KF",id) as uid,user_name as name,sex,user_avatar as avatar,access_token,expire_time,group_id,kf_type') ->from('ws_users') ->where('access_token=:access_token') ->bindValues(['access_token' => $param['access_token']]) ->row(); } $userInfo = [ 'uid' => $kf['uid'], 'name' => $kf['name'], 'sex' => $kf['sex'], 'avatar' => $kf['avatar'], 'kf_type' => $kf['kf_type'], 'group_id' => $kf['group_id'], ]; } $param['user'] = $userInfo; } else { if ($method == 'userInit') { if (empty($param['access_token'])) { throw new ApiException(450, 'access_token不能为空.'); } $avatar = empty($param['param']['avatar_url']) ? self::$_defaultAvatar : $param['param']['avatar_url']; $userInfo = [ 'uid' => $param['param']['user_id'], 'name' => $param['param']['user_name'], 'group_id' => $param['param']['group_id'], 'access_token' => $param['access_token'], 'avatar' => $avatar ]; unset($param['param']['user_id'],$param['param']['user_name'],$param['param']['avatar_url']); // // 验证用户,并获取用户信息 // $api = '/api/Customer/GetPlayerByToken'; // $res = Utility::callRemoteApi($api,$param['access_token']); // if($res['State'] !== 1 || empty($res['Data'])){ // throw new ApiException(450, '获取用户信息失败'); // } // $avatar = $res['Data']['Head']; // if(empty($avatar) || is_numeric($avatar)) $avatar = self::$_defaultAvatar; // $userInfo = [ // 'uid' => $res['Data']['SnId'], // 'name' => $res['Data']['Name'], // 'group_id' => $param['param']['group_id'], // 'access_token' => $param['access_token'], // 'avatar' => $avatar // ]; }else{ $users = json_decode(Redis::instance()->hget('fd',$param['client_id']),true); if (empty($users)) { throw new ApiException(401, '请先初始化连接'); } $userInfo = [ 'uid' => $users['uid'], 'name' => $users['name'], 'avatar' => $users['avatar'], 'access_token' => $users['access_token'], 'group_id' => $users['group_id'], 'data' => $users['data'], ]; unset($param['param']['from_id'],$param['param']['from_name'],$param['param']['from_avatar']); } $param['user'] = $userInfo; } unset($param['access_token']); } catch (ApiException $e) { throw new ApiException($e->getCode(), $e->getMessage() . '|' . 'invalid_token...'); } } } (new Command())->execute($argv);