local skynet = require "skynet"
local oo = require "Class"
local gameCmd = require "GameCmd"
local json = require "json"
local log = require "Log"
local dataType = require "DataType"
local sqlUrl = require "SqlUrl"
local errorInfo = require "ErrorInfo"
local redisKeyUrl = require "RedisKeyUrl"
local serverId = tonumber(skynet.getenv "serverId")
local serverConfigFields = require "ServerConfigFields"
local dbData = require "DBData"
local serverManage = require "ServerManage"
local manageCmd = require "ManageCmd"
local clusterServer = require "ClusterServer"
local mail = require "Mail"
local personal = require "Personal"
local playerCenter = require "PlayerCenter"

-- CenterServer: cluster "center" node built on top of ClusterServer.
-- It keeps a registry of cluster members (self.clusterInfo), answers cluster
-- messages (ClusterRecv) and HTTP management requests (HttpRecv), and pushes
-- configuration / cluster information to the registered servers.
local CenterServer = oo.class(clusterServer)

-- Initialise state, load platform and server configuration from the database
-- and register the periodic cluster-info sync timer.
function CenterServer:Init()
    self.isUpdateClusterInfo = false
    --self.lastSyncClusterInfoToGameTime = 0 -- time of the last cluster-info sync to game servers
    self:LoadPlatform()
    self:LoadServerConfig()
    -- NOTE(review): the original comment said "sync every 30 seconds" but the
    -- interval argument is 10; confirm the unit expected by timer:Add.
    skynet.server.timer:Add( dataType.Timer_SyncSyncClusterInfo , 10 , self["SyncClusterInfo"] , self )
end

-- Day-rollover hook (currently no work to do).
function CenterServer:OnNewDay()
end

-- 1-second timer hook (currently no work to do).
function CenterServer:On1SecTimer()
end

-- 5-second timer hook: logs cluster members whose last ping is older than the
-- configured timeout, and members flagged as timed out that are pinging again.
-- The status changes themselves are intentionally disabled (commented out).
function CenterServer:On5SecTimer()
    local now = os.time()
    for k, v in pairs( self.clusterInfo ) do
        if v.status == self.Status_Running and now - v.pingTime > self.curClusterConfig.ClusterSlavePingTimeOut then
            --self.clusterInfo[ k ].status = self.Status_PingTimeout
            --self:SyncClusterInfo()
            log.info(string.format("集群服务器 服务器ID %d Ping时间超过最大阀值 暂时下线" ,k))
        elseif v.status == self.Status_PingTimeout and now - v.pingTime <= self.curClusterConfig.ClusterSlavePingTimeOut then
            --self.clusterInfo[ k ].status = self.Status_Running
            --self:SyncClusterInfo()
            log.info(string.format("集群服务器 服务器ID %d 恢复在线" ,k))
        end
    end
end

-- Entry point for messages coming from other cluster servers.
-- Varargs: cmd (command id), c2sData (request table; serverId is optional).
-- Returns a reply table whose `code` is errorInfo.Suc unless a handler
-- changes it, or 2 when the command is not recognised.
function CenterServer:ClusterRecv(...)
    local cmd , c2sData = ...
    local s2sData = {}
    s2sData.code = errorInfo.Suc
    local clusterServerId = c2sData.serverId or -1

    if self.All_Reg == cmd then -- server registration
        self:RecvReg( c2sData )
    elseif self.All_Restart == cmd then -- server restart
        self:RecvRestart( c2sData )
    elseif self.All_UnReg == cmd then -- server un-registration
        self:RecvUnReg( c2sData , s2sData )
    elseif self.All_Ping == cmd then -- ping
        self:RecvPing( c2sData )
    elseif self.All_SyncServerInfoToCenter == cmd then
        self:RecvSyncServerInfoToCenter( c2sData )
    elseif self.All_ErrorInfo == cmd then
        self:RecvErrorInfo( c2sData )
    else
        -- %s + tostring: an unknown command may be nil or a non-number, and
        -- "%d" would raise here instead of reporting the bad command.
        log.info(string.format("集群服务器 %s 消息接口 %s 不存在", tostring(clusterServerId) , tostring(cmd)))
        s2sData.code = 2
    end

    log.info(string.format("集群服 %s 执行命令 %s 返回消息状态 %s " , tostring(clusterServerId) , tostring(cmd) , tostring(s2sData.code)))
    return s2sData
end

-- Entry point for HTTP management requests.
-- c2sData: request table; url: interface name; addr: unused here.
-- Returns the reply table (`code` defaults to errorInfo.Suc).
function CenterServer:HttpRecv( c2sData , url , addr )
    local s2cData = {}
    s2cData.code = errorInfo.Suc
    log.info("消息接口",url)

    if "ServerManage" == url then
        self:ServerManage( c2sData , s2cData)
    elseif "QueryClusterInfo" == url then
        self:QueryClusterInfo( c2sData , s2cData)
    elseif "SetServerConfig" == url then
        self:SetServerConfig( c2sData , s2cData)
    elseif "metrics" == url then
        self:Metrics( c2sData , s2cData)
    elseif "DeleteServerReg" == url then
        self:DeleteServerReg( c2sData , s2cData )
    elseif "WebMsg" == url then
        self:WebMsg( c2sData , s2cData )
    elseif "MinIO" == url then
        -- accepted but intentionally does nothing
    elseif "AddGameMail" == url then
        self:AddGameMail( c2sData , s2cData )
    elseif "AddWhiteList" == url then
        self:AddWhiteList( c2sData , s2cData )
    elseif "AddBlackList" == url then
        self:AddBlackList( c2sData , s2cData )
    else
        -- url is a string: "%d" raised a format error here, so an unknown
        -- interface crashed instead of returning NoExistInterface.
        log.info(string.format("Http服务器 消息接口 %s 不存在", tostring(url)))
        s2cData.code = errorInfo.ErrorCode.NoExistInterface
    end

    log.info("Http服务器 消息接口 ",url , "返回信息",s2cData.code)
    return s2cData
end

-- Fill s2cData with the registered cluster members plus aggregated player
-- statistics over the game servers (ids in [gameServerMinID, gameServerMaxID]).
function CenterServer:QueryClusterInfo( c2sData , s2cData )
    s2cData.nowTime = skynet.server.common:GetStrTime( skynet.GetTime() )
    s2cData.brief = {}

    local playerCount = 0
    local onlinePlayerCount = 0
    local offlinePlayerCount = 0
    local loginCount = 0
    local pre5MinLoginCount = 0
    s2cData.brief.serverInfo = {}
    s2cData.clusterInfo = {}

    for _, v in pairs( self.clusterInfo ) do
        if v.serverId >= self.gameServerMinID and v.serverId <= self.gameServerMaxID then
            -- aggregate statistics for game servers only
            local info = v.serverInfo
            loginCount = loginCount + info.loginCount
            playerCount = playerCount + info.playerCount.playing
            playerCount = playerCount + info.playerCount.offline
            onlinePlayerCount = onlinePlayerCount + info.playerCount.playing
            offlinePlayerCount = offlinePlayerCount + info.playerCount.offline
            pre5MinLoginCount = pre5MinLoginCount + info.pre5MinLoginCount
            -- NOTE(review): "severId" looks like a typo for "serverId"; kept
            -- because consumers of this response may already rely on the key.
            table.insert( s2cData.brief.serverInfo , { severId = v.serverId , status = v.status} )
        end
        table.insert( s2cData.clusterInfo , v )
    end

    s2cData.brief.loginCount = loginCount -- total logins
    s2cData.brief.playerCount = playerCount -- playing + offline players
    s2cData.brief.onlinePlayerCount = onlinePlayerCount -- currently playing
    s2cData.brief.offlinePlayerCount = offlinePlayerCount -- currently offline
    s2cData.brief.pre5MinLoginCount = pre5MinLoginCount -- pre5MinLoginCount summed over game servers
end

-- Change one existing key of a server type's configuration, persist it and
-- push the configuration to every registered server.
-- c2sData must carry serverId, key and value. On success s2cData receives the
-- fields returned by SyncServerConfig (code, successful, unsuccessful).
function CenterServer:SetServerConfig( c2sData , s2cData )
    if nil == c2sData.serverId or nil == c2sData.key or nil == c2sData.value then
        s2cData.code = errorInfo.ErrorCode.ErrRequestParam
        log.info(string.format("不存在的请求参数 cmd %s value %s" , tostring(c2sData.key) , tostring(c2sData.value)))
        return
    end

    c2sData.serverName = clusterServer:GetServerTypeName( c2sData.serverId )
    local cfg = self.allServerConfig[ c2sData.serverName ]
    if not cfg then
        s2cData.code = errorInfo.ErrorCode.NoExistServer
        log.info(string.format("不存在该游戏服 %s value %s" , tostring(c2sData.serverId) , tostring(c2sData.serverName)))
        return
    end

    -- only keys that already exist in the configuration may be changed
    if nil == cfg[ c2sData.key ] then
        s2cData.code = errorInfo.ErrorCode.NoExistConfig
        log.info(string.format("不存在的请求参数 key %s value %s" , tostring(c2sData.key) , tostring(c2sData.value)))
        return
    end

    log.info(string.format("修改前参数 key %s value %s" , tostring(c2sData.key) , tostring(cfg[ c2sData.key ]) ))
    cfg[ c2sData.key ] = c2sData.value
    log.info(string.format("修改后参数 key %s value %s" , tostring(c2sData.key) , tostring(cfg[ c2sData.key ]) ))

    -- persist to the database
    -- NOTE(review): the statement is assembled with string.format from
    -- request-derived data; confirm escaping / parameterisation in the db layer.
    local sql = string.format(sqlUrl.updateServerConfig , json:encode(cfg) , c2sData.serverName)
    skynet.server.db:Query( "center" , sql )

    -- Copy the sync result into the caller's table. Re-assigning the
    -- `s2cData` parameter (as the code did before) only rebinds the local
    -- name, so the result never reached the HTTP response.
    local syncResult = self:SyncServerConfig()
    for field, value in pairs( syncResult ) do
        s2cData[ field ] = value
    end
end

-- Push platform info and all server configuration to every registered server.
-- Returns { code, successful = {...}, unsuccessful = {...} } where each list
-- entry is { serverName, serverId }; a falsy CallMsgToServer result counts as
-- unsuccessful.
function CenterServer:SyncServerConfig()
    local c2sData = {}
    local s2cData = {}
    s2cData.code = errorInfo.Suc
    s2cData.successful = {}
    s2cData.unsuccessful = {}
    c2sData.platformInfo = self.platformInfo
    c2sData.allServerConfig = self.allServerConfig

    for _, v in pairs(self.clusterInfo) do
        local callData = self:CallMsgToServer( v.serverId , self.Center2All_SyncServerConfig , c2sData )
        local entry = { serverName = v.serverName , serverId = v.serverId}
        if callData then
            table.insert( s2cData.successful , entry )
        else
            table.insert( s2cData.unsuccessful , entry )
        end
    end
    return s2cData
end

-- Handle a registration message: (re)create the registry entry for the
-- sender, then sync configuration and cluster info.
function CenterServer:RecvReg( c2sData )
    local regServerId = c2sData.serverId
    self.clusterInfo[ regServerId ] = {
        serverId = regServerId ,
        serverName = c2sData.serverName ,
        status = self.Status_Running ,
        pingTime = os.time() ,
        serverInfo = c2sData.serverInfo ,
    }
    self:SyncServerConfig()
    self:SyncClusterInfo()
    log.info(string.format("集群服务器 %s 向中心服注册成功", c2sData.serverName))
end

-- Handle a restart message: mark a known server as restarting and sync.
function CenterServer:RecvRestart( c2sData )
    local entry = self.clusterInfo[ c2sData.serverId ]
    if entry then
        entry.status = self.Status_Restart
        self:SyncClusterInfo()
        log.info(string.format("集群服务器 %s 向中心服重启成功", c2sData.serverName))
    end
end

-- Handle an un-registration message: mark a known server as stopped and sync;
-- for an unknown server set s2sData.code to clusterServer.Error.NoExistServer.
function CenterServer:RecvUnReg( c2sData , s2sData )
    local entry = self.clusterInfo[ c2sData.serverId ]
    if entry then
        entry.status = self.Status_Stop
        self:SyncClusterInfo()
        log.info(string.format("集群服务器 %s 向中心服注销成功", c2sData.serverName))
    else
        s2sData.code = clusterServer.Error.NoExistServer
        log.info(string.format("集群服务器 %s 停止失败,不存在该服", c2sData.serverName))
    end
end

-- Handle a ping from a registered server: record the ping time / interval and
-- store an error record when the interval exceeds the configured send period
-- by more than 5 seconds.
function CenterServer:RecvPing( c2sData )
    local slaveServerId = c2sData.serverId
    local curTime = os.time()
    local entry = self.clusterInfo[ slaveServerId ]
    if not entry then
        return
    end

    local t1 = curTime - entry.pingTime -- seconds since the previous ping
    local t2 = self.curClusterConfig.ClusterSlaveSendPingTime + 5 -- expected interval plus 5 seconds of slack
    if t1 > t2 then
        local errorText = string.format("集群服务器 服务器 %s 正常心跳时间为 %d 目前心跳时间为 %d", slaveServerId ,self.curClusterConfig.ClusterSlaveSendPingTime , t1)
        -- separate table so the incoming request is not shadowed or modified
        local errData = {}
        errData.serverId = slaveServerId
        errData.errorCode = errorInfo.ErrorCode.PingTimeOut
        errData.errorText = errorText
        self:RecvErrorInfo( errData )
        log.info(errorText)
    end

    entry.pingTime = curTime
    entry.pingFormatTime = skynet.server.common:GetStrTime(curTime)
    entry.pingInterval = t1
end

-- Handle a server-info sync message: replace the stored serverInfo of a
-- registered server. Requests without serverId / serverInfo, or from an
-- unregistered server, are ignored.
function CenterServer:RecvSyncServerInfoToCenter( c2sData )
    -- validate the request before touching any of its fields
    if not c2sData or not c2sData.serverInfo then
        return
    end
    local syncServerId = c2sData.serverId
    if not syncServerId or not self.clusterInfo[ syncServerId ] then
        return
    end
    self.clusterInfo[ syncServerId ].serverInfo = c2sData.serverInfo
end

-- Handle an error report: store it in the "center" database and log it.
-- Requests without errorCode or errorText are ignored.
function CenterServer:RecvErrorInfo( c2sData )
    if not c2sData or not c2sData.errorCode or not c2sData.errorText then
        return
    end

    local errorServerId = c2sData.serverId
    local errorAccount = c2sData.account -- may be nil (e.g. reports created by RecvPing)
    local errorCode = c2sData.errorCode
    local errorText = c2sData.errorText
    -- NOTE(review): values are interpolated directly into the SQL text;
    -- confirm escaping and how the format string treats a nil account.
    local sql = string.format(sqlUrl.insertErrorInfoToCenter , errorServerId , errorAccount , errorCode , errorText)
    skynet.server.db:Query( "center" , sql )
    log.info("接收集群服的报错信息 ", errorServerId , errorCode , errorText )
end

-- Load every platform row from the database into self.platformInfo, keyed by
-- platform name, keeping only the status.
function CenterServer:LoadPlatform()
    local queryData = skynet.server.db:Query("center" , sqlUrl.selectPlatform )
    for _, row in pairs(queryData) do
        self.platformInfo[ row.Name ] = { status = row.Status }
    end
end

-- Load the configuration of every server type listed in ServerConfigFields.
-- Stored rows are decoded, passed through dbData:CheckNewFields and written
-- back; server types without a row get a configuration built by
-- dbData:Traversal and inserted. Sets self.allServerConfig and
-- self.curClusterConfig (the "ClusterServer" entry).
function CenterServer:LoadServerConfig()
    local allServerConfig = {} -- configuration of every server type
    local queryData = skynet.server.db:Query("center" , sqlUrl.selectServerConfig )

    for serverName, fields in pairs(serverConfigFields) do
        -- look for the stored row of this server type
        local isExist = false
        local jsonConfig = nil
        for _, row in pairs(queryData) do
            if row.ServerName == serverName then
                isExist = true
                jsonConfig = row.Config
                break
            end
        end

        local sql
        if isExist then
            -- existing row: check for new fields, then write back
            local config = json:decode(jsonConfig)
            dbData:CheckNewFields( config , fields )
            allServerConfig[ serverName ] = config
            sql = string.format(sqlUrl.updateServerConfig , json:encode(config) , serverName)
            skynet.server.db:Query("center" , sql )
        else
            -- no row yet: insert a freshly built configuration
            local config = {}
            allServerConfig[ serverName ] = config
            dbData:Traversal( config , fields )
            sql = string.format(sqlUrl.insertServerConfig , serverName , json:encode(config))
            local insertData = skynet.server.db:Query("center" , sql )
            if 1 ~= insertData.affected_rows then
                log.info("插入服务器配置失败" , serverName)
            end
        end
    end

    self.allServerConfig = allServerConfig
    self.curClusterConfig = self.allServerConfig[ "ClusterServer" ]
end

-- Execute a management command.
-- c2sData.serverId selects the target: 0 (default) runs locally, -1 runs
-- locally and on the pay server plus every route and game server, any other
-- value is forwarded to that single server. Only allowed when this process is
-- the center server.
function CenterServer:ServerManage( c2sData , s2cData)
    local cmd = c2sData.cmd
    local curServerId = c2sData.serverId or 0

    -- commands may only be executed by the center server
    if not skynet.server.clusterServer:IsCenterServer( serverId ) then
        s2cData.code = errorInfo.ErrorCode.NoCenterServer
        return
    end

    s2cData.data = {}
    if not cmd then
        log.info("错误的请求命令",cmd)
        s2cData.code = errorInfo.ErrorCode.ErrRequestParam
        return
    end

    if 0 == curServerId then
        -- run locally
        skynet.server.serverManage:DoManageCmd( c2sData , s2cData)
    elseif -1 == curServerId then
        -- run locally, then broadcast to pay / route / game servers
        skynet.server.serverManage:DoManageCmd( c2sData , s2cData)
        for _, v in pairs( self.clusterInfo ) do
            local isPay = v.serverId == self.payServerID
            local isRoute = v.serverId >= self.routeServerMinID and v.serverId <= self.routeServerMaxID
            local isGame = v.serverId >= self.gameServerMinID and v.serverId <= self.gameServerMaxID
            if isPay or isRoute or isGame then
                local callData = self:CallMsgToServer( v.serverId , self.Center2All_ServerManageCmd , c2sData )
                if not callData then
                    s2cData.code = errorInfo.ErrorCode.NoExistServer
                else
                    table.insert( s2cData.data , callData.data )
                end
            end
        end
    else
        -- forward to the requested server
        local callData = self:CallMsgToServer( curServerId , self.Center2All_ServerManageCmd , c2sData )
        if not callData then
            s2cData.code = errorInfo.ErrorCode.NoExistServer
            -- report the requested target, not this process's own serverId
            log.info(string.format("集群服serverId %s 不在线", tostring(curServerId)))
        else
            s2cData.code = callData.code
            s2cData.data = callData.data
        end
    end
end

-- Build the metrics text returned by the "metrics" interface: the 5-minute
-- and total login counts summed over all game servers, one sample per line.
-- s2cData.data receives the text (every line is terminated by "\n").
function CenterServer:Metrics( c2sData , s2cData)
    local pre5MinLoginCount = 0
    local loginCount = 0
    for _, v in pairs( self.clusterInfo ) do
        if v.serverId >= self.gameServerMinID and v.serverId <= self.gameServerMaxID then
            pre5MinLoginCount = pre5MinLoginCount + v.serverInfo.pre5MinLoginCount
            loginCount = loginCount + v.serverInfo.loginCount
        end
    end

    -- The previous code also filled a table in s2cData.data and then
    -- overwrote it with this text; only the text was ever returned.
    local listData = {
        "#TYPE gsserver_http_requeststotal gauge" ,
        "#HELP gsserver_http_requeststotal Total number of landings" ,
        string.format("gsserver_http_requeststotal{ label=\"FiveMinLoginCount\" ,method=\"Get\" } %d",pre5MinLoginCount) ,
        string.format("gsserver_http_requeststotal{ label=\"LoginCount\" ,method=\"Get\" } %d",loginCount) ,
    }
    s2cData.data = table.concat( listData , "\n" ) .. "\n"
end

-- Remove a server from the registry (no-op when it is not registered).
function CenterServer:DeleteServerReg( c2sData , s2cData)
    local deleteServerId = c2sData.serverId
    if self.clusterInfo[ deleteServerId ] then
        self.clusterInfo[ deleteServerId ] = nil
    end
end

-- Forward a message from the web back office to the rank server and to every
-- game server in the registry.
function CenterServer:WebMsg( c2sData , s2cData )
    log.info("后台发来的消息 ",skynet.server.common:TableToString(c2sData))
    for k, _ in pairs( self.clusterInfo ) do
        --if self.Status_Running == v.status and k >= self.gameServerMinID and k <= self.gameServerMaxID then
        if k == self.rankServerID or ( k >= self.gameServerMinID and k <= self.gameServerMaxID ) then
            self:SendMsgToServer( k , self.Center2All_WebMsg , c2sData )
        end
    end
end

-- Send an in-game mail to one account. Requires account, mailType, mailTitle,
-- mailText and award; otherwise s2cData.code is set to ErrRequestParam.
function CenterServer:AddGameMail( c2sData , s2cData )
    local account = c2sData.account
    local mailType = c2sData.mailType
    local mailTitle = c2sData.mailTitle
    local mailText = c2sData.mailText
    local award = c2sData.award

    if nil == account or nil == mailType or nil == mailTitle or nil == mailText or nil == award then
        s2cData.code = errorInfo.ErrorCode.ErrRequestParam
        return
    end
    skynet.server.mail:SendAsyncMailForAccount( account , mailType , mailTitle , mailText , award )
end

-- Send `c2sData` with its cmd set to `cmd` as a management command to every
-- registered route server (ids in [routeServerMinID, routeServerMaxID]).
local function NoticeRouteServers( self , c2sData , cmd )
    c2sData.cmd = cmd
    for k, _ in pairs( self.clusterInfo ) do
        if k >= self.routeServerMinID and k <= self.routeServerMaxID then
            self:SendMsgToServer( k , self.Center2All_ServerManageCmd , c2sData)
        end
    end
end

-- Add (opType 1) or remove (opType 2) an account in the route whitelist set
-- in redis, then tell every route server to reload the whitelist.
-- Missing account/opType or any other opType yields ErrRequestParam.
function CenterServer:AddWhiteList( c2sData , s2cData )
    local account = c2sData.account
    local opType = c2sData.opType
    if nil == account or nil == opType then
        s2cData.code = errorInfo.ErrorCode.ErrRequestParam
        return
    end

    local redisKey = redisKeyUrl.RouteWhiteList
    if 1 == opType then
        -- add to the whitelist
        skynet.server.redis:sadd( redisKey , account )
        NoticeRouteServers( self , c2sData , manageCmd.RouteSyncWhiteList )
    elseif 2 == opType then
        -- remove from the whitelist
        skynet.server.redis:srem( redisKey , account )
        NoticeRouteServers( self , c2sData , manageCmd.RouteSyncWhiteList )
    else
        s2cData.code = errorInfo.ErrorCode.ErrRequestParam
    end
end

-- Add (opType 1) or remove (opType 2) an account in the route blacklist set
-- in redis, then tell every route server to reload the blacklist. When adding,
-- the account is also forced offline on the game server recorded in its login
-- info, if any. Missing account/opType or any other opType yields
-- ErrRequestParam.
function CenterServer:AddBlackList( c2sData , s2cData )
    local account = c2sData.account
    local opType = c2sData.opType
    if nil == account or nil == opType then
        s2cData.code = errorInfo.ErrorCode.ErrRequestParam
        return
    end

    local redisKey = redisKeyUrl.RouteBlackList
    if 1 == opType then
        -- add to the blacklist and always notify the route servers
        skynet.server.redis:sadd( redisKey , account )
        NoticeRouteServers( self , c2sData , manageCmd.RouteSyncBlackList )

        -- Force the player offline only when a game server is recorded.
        -- The `or 0` fallback used to make the old `if gameServerId` test
        -- always true, so a message was sent to server 0 and a missing user
        -- record raised on `.UserID`.
        local loginKey = string.format( redisKeyUrl.RouteServerLoginInfoHash , account )
        local gameServerId = tonumber( skynet.server.redis:hget( loginKey , "GameServerID" ) or 0 )
        if gameServerId and 0 ~= gameServerId then
            local userDBInfo = skynet.server.redisCenter:GetRedisDBInfo( account )
            if userDBInfo then
                c2sData.cmd = manageCmd.ForceUserOffline
                c2sData.userId = userDBInfo.UserID
                self:SendMsgToServer( gameServerId , self.Center2All_ServerManageCmd , c2sData )
            end
        end
    elseif 2 == opType then
        -- remove from the blacklist
        skynet.server.redis:srem( redisKey , account )
        NoticeRouteServers( self , c2sData , manageCmd.RouteSyncBlackList )
    else
        s2cData.code = errorInfo.ErrorCode.ErrRequestParam
    end
end

-- Query every registered server with C2M_QueryAllServerInfo and return the
-- replies keyed by registry key.
function CenterServer:GetAllServerInfo()
    local allServerInfo = {}
    for k, _ in pairs(self.clusterInfo) do
        allServerInfo[ k ] = self:CallMsgToServer( k , self.C2M_QueryAllServerInfo )
    end
    log.info("查询所有服务器信息 ", skynet.server.common:TableToString(allServerInfo))
    return allServerInfo
end

-- Send the whole registry to every registered server that is not a game
-- server and log how long that took. Registered as a timer callback in Init.
function CenterServer:SyncClusterInfo()
    local nowTime = skynet.GetTime()
    local c2sData = {}
    c2sData.clusterInfo = self.clusterInfo

    -- non-game servers (the original comment said "every 30 seconds"; the
    -- actual period is whatever the timer registered in Init uses)
    for k, _ in pairs( self.clusterInfo ) do
        if not self:IsGameServer( k ) then
            self:SendMsgToServer( k , self.Center2All_SyncClusterInfo , c2sData )
        end
    end
    log.info("同步集群服务器信息 非游戏服耗时" , skynet.GetTime() - nowTime)

    -- game servers, once every 5 minutes (disabled)
    --[[
    if nowTime >= self.lastSyncClusterInfoToGameTime + 300 then
        for k, v in pairs( self.clusterInfo ) do
            if self:IsGameServer( k ) then
                self:SendMsgToServer( k , self.Center2All_SyncClusterInfo , c2sData )
            end
        end
        self.lastSyncClusterInfoToGameTime = nowTime
        log.info("同步集群服务器信息 游戏服耗时" , skynet.GetTime() - nowTime)
    end
    ]]
    --log.info("同步集群服务器信息成功")
end

-- Store a JSON snapshot of all registry entries in the "center" database.
function CenterServer:SaveOnlineData()
    local data = {}
    for _, v in pairs( self.clusterInfo ) do
        table.insert( data , v )
    end
    local sql = string.format(sqlUrl.insertClusterInfoToCenter , json:encode( data ))
    skynet.server.db:Query( "center" , sql )
    log.info("保存在线数据成功",sql)
end

skynet.server.centerServer = CenterServer
return CenterServer