--- Center server of the cluster.
-- Tracks the servers that registered with it (`self.clusterInfo`), pushes the
-- platform / per-server-type configuration to them, and serves the HTTP
-- management endpoints dispatched by `CenterServer:HttpRecv`.
local skynet = require "skynet"
local oo = require "Class"
local gameCmd = require "GameCmd"
local json = require "json"
local log = require "Log"
local dataType = require "DataType"
local sqlUrl = require "SqlUrl"
local errorInfo = require "ErrorInfo"
local redisKeyUrl = require "RedisKeyUrl"
-- Id of the server this process runs as, read from the skynet environment.
local serverId = tonumber(skynet.getenv "serverId")
local serverConfigFields = require "ServerConfigFields"
local dbData = require "DBData"
local serverManage = require "ServerManage"
local clusterServer = require "ClusterServer"

local CenterServer = oo.class(clusterServer)

--- Initialise the center server: load platform info and server configs from
-- the "center" database and register the SaveOnlineData timer.
function CenterServer:Init()
    self.isUpdateClusterInfo = false
    self:LoadPlatform()
    self:LoadServerConfig()
    -- NOTE(review): 60 is presumably the timer interval in seconds; confirm
    -- against the timer module.
    skynet.server.timer:Add( dataType.Timer_SaveOnlineData , 60 , self["SaveOnlineData"] , self )
end

--- New-day hook. Intentionally empty.
function CenterServer:OnNewDay()
end

--- One-second timer hook. Intentionally empty.
function CenterServer:On1SecTimer()
end

--- Five-second timer hook.
-- Flags running servers whose last ping is older than
-- `curClusterConfig.ClusterSlavePingTimeOut` as ping-timed-out, restores
-- timed-out servers whose ping is fresh again, then refreshes the timers.
function CenterServer:On5SecTimer()
    local pingTimeOut = self.curClusterConfig.ClusterSlavePingTimeOut
    for k, v in pairs( self.clusterInfo ) do
        local elapsed = skynet.GetTime() - v.pingTime
        if v.status == self.Status_Running and elapsed > pingTimeOut then
            v.status = self.Status_PingTimeout
            log.info(string.format("集群服务器 服务器ID %d Ping时间超过最大阀值 暂时下线" ,k))
        elseif v.status == self.Status_PingTimeout and elapsed <= pingTimeOut then
            v.status = self.Status_Running
            log.info(string.format("集群服务器 服务器ID %d 恢复在线" ,k))
        end
    end
    skynet.server.timer:Refresh()
end

--- Entry point for messages received from other cluster servers.
-- @param ... the command id followed by the request payload
-- @return table reply whose `code` field carries the result status
function CenterServer:ClusterRecv(...)
    local cmd , c2sData = ...
    local s2sData = {}
    s2sData.code = errorInfo.Suc

    if self.All_Reg == cmd then
        -- server registration
        self:RecvReg( c2sData )
    elseif self.All_UnReg == cmd then
        -- server un-registration
        self:RecvUnReg( c2sData , s2sData )
    elseif self.All_Ping == cmd then
        -- ping / heartbeat
        self:RecvPing( c2sData )
    elseif self.All_SyncServerInfoToCenter == cmd then
        self:RecvSyncServerInfoToCenter( c2sData )
    elseif self.All_ErrorInfo == cmd then
        self:RecvErrorInfo( c2sData )
    else
        log.info(string.format("集群服务器 消息接口 %d 不存在", cmd))
        s2sData.code = 2
    end

    log.info(string.format("集群服 执行命令 %d 返回消息状态 %d " , cmd , s2sData.code))
    return s2sData
end

--- Entry point for HTTP requests.
-- @param c2sData request payload
-- @param url name of the requested interface
-- @param addr not used by this function
-- @return table reply; `code` is `NoExistInterface` for an unknown `url`
function CenterServer:HttpRecv( c2sData , url , addr )
    local s2cData = {}
    s2cData.code = errorInfo.Suc
    log.info("消息接口",url)

    if "ServerManage" == url then
        self:ServerManage( c2sData , s2cData)
    elseif "QueryClusterInfo" == url then
        self:QueryClusterInfo( c2sData , s2cData)
    elseif "SetServerConfig" == url then
        self:SetServerConfig( c2sData , s2cData)
    elseif "metrics" == url then
        self:Metrics( c2sData , s2cData)
    elseif "DeleteServerReg" == url then
        self:DeleteServerReg( c2sData , s2cData )
    elseif "WebMsg" == url then
        self:WebMsg( c2sData , s2cData )
    else
        -- `url` is a string: formatting it with %d raised an error instead of
        -- reporting the unknown interface, so %s is used here.
        log.info(string.format("Http服务器 消息接口 %s 不存在", tostring(url)))
        s2cData.code = errorInfo.ErrorCode.NoExistInterface
    end

    log.info("Http服务器 消息接口 ",url , "返回信息",s2cData.code)
    return s2cData
end

--- Return every registered server entry as a list in `s2cData.clusterInfo`.
-- @param c2sData request payload (unused)
-- @param s2cData reply table to fill
function CenterServer:QueryClusterInfo( c2sData , s2cData )
    s2cData.clusterInfo = {}
    for k, v in pairs( self.clusterInfo ) do
        table.insert( s2cData.clusterInfo , v )
    end
end

--- Change one existing key of a server type's configuration, persist it and
-- push the configuration to all registered servers.
-- @param c2sData request with `serverId`, `key` and `value`
-- @param s2cData reply table; receives an error code on failure, otherwise
--        the fields returned by `SyncServerConfig`
function CenterServer:SetServerConfig( c2sData , s2cData )
    if nil == c2sData.serverId or nil == c2sData.key or nil == c2sData.value then
        s2cData.code = errorInfo.ErrorCode.ErrRequestParam
        log.info(string.format("不存在的请求参数 cmd %s value %s" , tostring(c2sData.key) , tostring(c2sData.value)))
        return
    end

    c2sData.serverName = clusterServer:GetServerTypeName( c2sData.serverId )
    local cfg = self.allServerConfig[ c2sData.serverName ]
    if not cfg then
        s2cData.code = errorInfo.ErrorCode.NoExistServer
        log.info(string.format("不存在该游戏服 %s value %s" , tostring(c2sData.serverId) , tostring(c2sData.serverName)))
        return
    end

    -- Only keys that already exist in the configuration may be changed.
    if nil == cfg[ c2sData.key ] then
        s2cData.code = errorInfo.ErrorCode.NoExistConfig
        log.info(string.format("不存在的请求参数 key %s value %s" , c2sData.key , c2sData.value))
        return
    end

    log.info(string.format("修改前参数 key %s value %s" , c2sData.key , cfg[ c2sData.key ] ))
    cfg[ c2sData.key ] = c2sData.value
    log.info(string.format("修改后参数 key %s value %s" , c2sData.key , cfg[ c2sData.key ] ))
    self.allServerConfig[ c2sData.serverName ] = cfg

    -- Persist to the database.
    -- NOTE(review): the SQL text is built with string.format from
    -- request-supplied data; confirm that the template / db layer escapes it.
    local sql = string.format(sqlUrl.updateServerConfig , json:encode(cfg) , c2sData.serverName)
    skynet.server.db:Query( "center" , sql )

    -- Assigning to the `s2cData` parameter would only rebind the local name
    -- and the caller would never see the sync result, so the fields are
    -- copied into the caller's reply table instead.
    local syncResult = self:SyncServerConfig()
    for field, value in pairs( syncResult ) do
        s2cData[ field ] = value
    end
end

--- Push platform info and all server configs to every registered server.
-- @return table with `code`, plus `successful` / `unsuccessful` lists of
--         `{ serverName, serverId }` depending on whether the call returned data
function CenterServer:SyncServerConfig()
    local c2sData = {
        platformInfo = self.platformInfo ,
        allServerConfig = self.allServerConfig ,
    }
    local s2cData = {
        code = errorInfo.Suc ,
        successful = {} ,
        unsuccessful = {} ,
    }

    for k, v in pairs(self.clusterInfo) do
        local callData = self:CallMsgToServer( v.serverId , self.Center2All_SyncServerConfig , c2sData )
        local target = callData and s2cData.successful or s2cData.unsuccessful
        table.insert( target , { serverName = v.serverName , serverId = v.serverId } )
    end
    return s2cData
end

--- Handle a registration request: (re)create the server's entry as running
-- and broadcast the cluster info and the server configuration.
-- @param c2sData request with `serverId` and `serverName`
function CenterServer:RecvReg( c2sData )
    local regServerId = c2sData.serverId
    self.clusterInfo[ regServerId ] = {
        serverId = regServerId ,
        serverName = c2sData.serverName ,
        status = self.Status_Running ,
        pingTime = skynet.GetTime() ,
        serverInfo = {} ,
    }

    self:SyncClusterInfo()
    self:SyncServerConfig()
    log.info(string.format("集群服务器 %s 向中心服注册成功", c2sData.serverName))
end

--- Handle an un-registration request: mark the server as stopped.
-- @param c2sData request with `serverId` and `serverName`
-- @param s2sData reply table; `code` is set when the server is unknown
function CenterServer:RecvUnReg( c2sData , s2sData )
    local unRegServerId = c2sData.serverId
    local entry = self.clusterInfo[ unRegServerId ]
    if entry then
        entry.status = self.Status_Stop
        log.info(string.format("集群服务器 %s 向中心服注销成功", c2sData.serverName))
        self:SyncClusterInfo()
    else
        -- NOTE(review): other handlers use errorInfo.ErrorCode.NoExistServer;
        -- confirm that clusterServer.Error is defined.
        s2sData.code = clusterServer.Error.NoExistServer
        log.info(string.format("集群服务器 %s 停止失败,不存在该服", c2sData.serverName))
    end
end

--- Handle a ping from a registered server and refresh its ping time.
-- When the gap since the previous ping exceeds the configured send interval
-- plus 5, an error record is stored through `RecvErrorInfo`.
-- @param clusterMsg request with `serverId`
function CenterServer:RecvPing( clusterMsg )
    local slaveServerId = clusterMsg.serverId
    local entry = self.clusterInfo[ slaveServerId ]
    if not entry then
        return
    end

    -- Gap between the two most recent pings.
    local t1 = skynet.GetTime() - entry.pingTime
    -- Expected gap plus a tolerance of 5.
    local t2 = self.curClusterConfig.ClusterSlaveSendPingTime + 5
    if t1 > t2 then
        local errorText = string.format("集群服务器 服务器 %s 正常心跳时间为 %d 目前心跳时间为 %d", slaveServerId ,self.curClusterConfig.ClusterSlaveSendPingTime , t1)
        local c2sData = {
            serverId = slaveServerId ,
            errorCode = errorInfo.ErrorCode.PingTimeOut ,
            errorText = errorText ,
        }
        self:RecvErrorInfo( c2sData )
        log.info(errorText)
    end

    entry.pingTime = skynet.GetTime()
end

--- Store the status info a server reports about itself and broadcast it.
-- Requests without payload, `serverId` or `serverInfo`, or from an
-- unregistered server, are ignored.
-- @param c2sData request with `serverId` and `serverInfo`
function CenterServer:RecvSyncServerInfoToCenter( c2sData )
    -- Validate the payload before reading any field from it.
    if not c2sData or not c2sData.serverId or not c2sData.serverInfo then
        return
    end

    local syncServerId = c2sData.serverId
    local entry = self.clusterInfo[ syncServerId ]
    if not entry then
        return
    end

    entry.serverInfo = c2sData.serverInfo
    self:SyncClusterInfo()
    log.info("接收集群服同步信息 ", syncServerId , skynet.server.common:TableToString( c2sData.serverInfo ))
end

--- Persist an error report sent by a cluster server.
-- Reports without `errorCode` or `errorText` are ignored.
-- @param c2sData request with `serverId`, `errorCode` and `errorText`
function CenterServer:RecvErrorInfo( c2sData )
    if not c2sData or not c2sData.errorCode or not c2sData.errorText then
        return
    end

    local errorServerId = c2sData.serverId
    local errorCode = c2sData.errorCode
    local errorText = c2sData.errorText
    -- NOTE(review): errorText is interpolated into the SQL text unescaped;
    -- confirm that the template / db layer protects against injection.
    local sql = string.format(sqlUrl.insertErrorInfoToCenter , errorServerId , errorCode , errorText)
    skynet.server.db:Query( "center" , sql )
    log.info("接收集群服的报错信息 ", errorServerId , errorCode , errorText )
end

--- Load every platform row from the database into `self.platformInfo`,
-- keyed by platform name.
function CenterServer:LoadPlatform()
    local sql = sqlUrl.selectPlatform
    local queryData = skynet.server.db:Query("center" , sql )
    for k, v in pairs(queryData) do
        self.platformInfo[ v.Name ] = { status = v.Status }
    end
end

--- Load the configuration of every server type listed in ServerConfigFields.
-- Stored configs are passed through `dbData:CheckNewFields` and written back;
-- missing ones are built with `dbData:Traversal` and inserted.
-- Sets `self.allServerConfig` and `self.curClusterConfig`.
function CenterServer:LoadServerConfig()
    local allServerConfig = {}
    local sql = sqlUrl.selectServerConfig
    local queryData = skynet.server.db:Query("center" , sql )

    for serverName, v1 in pairs(serverConfigFields) do
        -- Look for the stored row of this server type.
        local isExist = false
        local jsonConfig = nil
        for k, v2 in pairs(queryData) do
            if v2.ServerName == serverName then
                isExist = true
                jsonConfig = v2.Config
                break
            end
        end

        if isExist then
            -- Check the stored config against the declared fields.
            local config = json:decode(jsonConfig)
            dbData:CheckNewFields( config , serverConfigFields[ serverName ])
            allServerConfig[ serverName ] = config
            sql = string.format(sqlUrl.updateServerConfig , json:encode(config) , serverName)
            skynet.server.db:Query("center" , sql )
        else
            -- No stored row yet: build a fresh config and insert it.
            allServerConfig[ serverName ] = {}
            dbData:Traversal( allServerConfig[ serverName ] , v1 )
            sql = string.format(sqlUrl.insertServerConfig , serverName , json:encode(allServerConfig[ serverName ]))
            local insertData = skynet.server.db:Query("center" , sql )
            if 1 ~= insertData.affected_rows then
                log.info("插入服务器配置失败" , serverName)
            end
        end
    end

    self.allServerConfig = allServerConfig
    self.curClusterConfig = self.allServerConfig[ "ClusterServer" ]
end

--- Execute a management command locally or forward it to cluster servers.
-- `c2sData.serverId` selects the target: 0 runs it through the local
-- serverManage module, -1 forwards it to every registered server inside the
-- game-server id range, any other value forwards it to that single server.
-- @param c2sData request with `cmd` and `serverId`
-- @param s2cData reply table; `data` collects the replies, `code` the error
function CenterServer:ServerManage( c2sData , s2cData)
    local cmd = c2sData.cmd
    local curServerId = c2sData.serverId

    -- Only the center server may execute these commands.
    if not skynet.server.clusterServer:IsCenterServer( serverId ) then
        s2cData.code = errorInfo.ErrorCode.NoCenterServer
        return
    end

    s2cData.data = {}
    if not ( c2sData.cmd and curServerId ) then
        log.info("错误的请求命令",cmd)
        s2cData.code = errorInfo.ErrorCode.ErrRequestParam
        return
    end

    if 0 == curServerId then
        -- Handled by the local multi-purpose server.
        skynet.server.serverManage:DoManageCmd( c2sData , s2cData)
    elseif -1 == curServerId then
        -- Broadcast to every game server.
        for k, v in pairs( self.clusterInfo ) do
            if v.serverId >= self.gameServerMinID and v.serverId <= self.gameServerMaxID then
                local callData = self:CallMsgToServer( v.serverId , self.Center2All_ServerManageCmd , c2sData )
                if not callData then
                    s2cData.code = errorInfo.ErrorCode.NoExistServer
                else
                    table.insert( s2cData.data , callData.data )
                end
            end
        end
    else
        -- Forward to the single requested server.
        local callData = self:CallMsgToServer( curServerId , self.Center2All_ServerManageCmd , c2sData )
        if not callData then
            s2cData.code = errorInfo.ErrorCode.NoExistServer
            -- Report the requested target; the file-level `serverId` is this
            -- process' own id and was logged here by mistake.
            log.info(string.format("集群服serverId %s 不在线", tostring(curServerId)))
        else
            table.insert( s2cData.data , callData.data )
        end
    end
end

--- Build the metrics text returned by the "metrics" HTTP interface.
-- Sums the login counters reported by every registered server inside the
-- game-server id range and writes the result, one entry per line, into
-- `s2cData.data` as a string.
-- @param c2sData request payload (unused)
-- @param s2cData reply table
function CenterServer:Metrics( c2sData , s2cData)
    local pre5MinLoginCount = 0
    local loginCount = 0

    for k, v in pairs( self.clusterInfo ) do
        if v.serverId >= self.gameServerMinID and v.serverId <= self.gameServerMaxID then
            -- RecvReg initialises serverInfo to an empty table, so a server
            -- that has not synced yet has no counters; treat them as 0.
            local info = v.serverInfo or {}
            pre5MinLoginCount = pre5MinLoginCount + ( info.pre5MinLoginCount or 0 )
            loginCount = loginCount + ( info.loginCount or 0 )
        end
    end

    local listData = {
        "#TYPE gsserver_http_requeststotal gauge" ,
        "#HELP gsserver_http_requeststotal Total number of landings" ,
        string.format("gsserver_http_requeststotal{ label=\"FiveMinLoginCount\" ,method=\"Get\" } %d",pre5MinLoginCount) ,
        string.format("gsserver_http_requeststotal{ label=\"LoginCount\" ,method=\"Get\" } %d",loginCount) ,
    }
    -- Every entry, including the last one, is terminated by a newline.
    s2cData.data = table.concat( listData , "\n" ) .. "\n"
end

--- Remove a server's registration entry and broadcast the new cluster info.
-- @param c2sData request with `serverId`
-- @param s2cData reply table (left untouched)
function CenterServer:DeleteServerReg( c2sData , s2cData)
    local deleteServerId = c2sData.serverId
    if self.clusterInfo[ deleteServerId ] then
        self.clusterInfo[ deleteServerId ] = nil
        self:SyncClusterInfo()
    end
end

--- Forward a message from the web back office to every running server whose
-- id lies inside the game-server id range.
-- @param c2sData message payload
-- @param s2cData reply table (left untouched)
function CenterServer:WebMsg( c2sData , s2cData )
    log.info("后台发来的消息 ",skynet.server.common:TableToString(c2sData))
    for k, v in pairs( self.clusterInfo ) do
        local isGameServer = k >= self.gameServerMinID and k <= self.gameServerMaxID
        if self.Status_Running == v.status and isGameServer then
            self:SendMsgToServer( k , self.Center2All_WebMsg , c2sData )
        end
    end
end

--- Query every registered server for its server info.
-- @return table mapping server id to that server's reply
function CenterServer:GetAllServerInfo()
    local allServerInfo = {}
    for k, v in pairs(self.clusterInfo) do
        allServerInfo[ k ] = self:CallMsgToServer( k , self.C2M_QueryAllServerInfo )
    end
    log.info("查询所有服务器信息 ", skynet.server.common:TableToString(allServerInfo))
    return allServerInfo
end

--- Send the full cluster info table to every server marked as running.
function CenterServer:SyncClusterInfo()
    local c2sData = { clusterInfo = self.clusterInfo }
    for k, info in pairs( self.clusterInfo ) do
        if self.Status_Running == info.status then
            self:SendMsgToServer( k , self.Center2All_SyncClusterInfo , c2sData)
        end
    end
end

--- Timer callback registered in Init: store a JSON snapshot of every
-- registered server entry in the "center" database.
function CenterServer:SaveOnlineData()
    local data = {}
    for k, v in pairs( self.clusterInfo ) do
        table.insert( data , v )
    end
    local sql = string.format(sqlUrl.insertClusterInfoToCenter , json:encode( data ))
    skynet.server.db:Query( "center" , sql )
    log.info("保存在线数据成功",sql)
end

skynet.server.centerServer = CenterServer
return CenterServer