HomeServer/lualib-src/Server-main/AllServer/CenterServer/CenterServer.lua

422 lines
16 KiB
Lua
Raw Permalink Normal View History

2024-11-20 15:41:09 +08:00
local skynet = require "skynet"
local oo = require "Class"
local gameCmd = require "GameCmd"
local json =require "json"
local log = require "Log"
local dataType = require "DataType"
local sqlUrl = require "SqlUrl"
local errorInfo = require "ErrorInfo"
local redisKeyUrl = require "RedisKeyUrl"
local serverId = tonumber(skynet.getenv "serverId")
local serverConfigFields = require "ServerConfigFields"
local dbData = require "DBData"
local serverManage = require "ServerManage"
local clusterServer = require "ClusterServer"
local CenterServer = oo.class(clusterServer)
--初始化
function CenterServer:Init()
self.isUpdateClusterInfo = false
self:LoadPlatform()
self:LoadServerConfig()
skynet.server.timer:Add( dataType.Timer_SaveOnlineData , 60 , self["SaveOnlineData"] , self )
end
--跨天
function CenterServer:OnNewDay()
end
--1秒Timer
function CenterServer:On1SecTimer()
end
--5秒Timer
function CenterServer:On5SecTimer()
for k, v in pairs( self.clusterInfo ) do
if v.status == self.Status_Running and skynet.GetTime() - v.pingTime > self.curClusterConfig.ClusterSlavePingTimeOut then
self.clusterInfo[ k ].status = self.Status_PingTimeout
log.info(string.format("集群服务器 服务器ID %d Ping时间超过最大阀值 暂时下线" ,k))
elseif v.status == self.Status_PingTimeout and skynet.GetTime() - v.pingTime <= self.curClusterConfig.ClusterSlavePingTimeOut then
self.clusterInfo[ k ].status = self.Status_Running
log.info(string.format("集群服务器 服务器ID %d 恢复在线" ,k))
end
end
skynet.server.timer:Refresh()
end
--接收集群数据
function CenterServer:ClusterRecv(...)
local cmd , c2sData = ...
local s2sData = {}
s2sData.code = errorInfo.Suc
if self.All_Reg == cmd then --服务器注册
self:RecvReg( c2sData )
elseif self.All_UnReg == cmd then --服务器注销
self:RecvUnReg( c2sData , s2sData )
elseif self.All_Ping == cmd then --PING消息
self:RecvPing( c2sData )
elseif self.All_SyncServerInfoToCenter == cmd then
self:RecvSyncServerInfoToCenter( c2sData )
elseif self.All_ErrorInfo == cmd then
self:RecvErrorInfo( c2sData )
else
log.info(string.format("集群服务器 消息接口 %d 不存在", cmd))
s2sData.code = 2
end
log.info(string.format("集群服 执行命令 %d 返回消息状态 %d " , cmd , s2sData.code))
return s2sData
end
--接收HTTP数据
function CenterServer:HttpRecv( c2sData , url , addr )
local s2cData = {}
s2cData.code = errorInfo.Suc
log.info("消息接口",url)
if "ServerManage" == url then
self:ServerManage( c2sData , s2cData)
elseif "QueryClusterInfo" == url then
self:QueryClusterInfo( c2sData , s2cData)
elseif "SetServerConfig" == url then
self:SetServerConfig( c2sData , s2cData)
elseif "metrics" == url then
self:Metrics( c2sData , s2cData)
elseif "DeleteServerReg" == url then
self:DeleteServerReg( c2sData , s2cData )
elseif "WebMsg" == url then
self:WebMsg( c2sData , s2cData )
else
log.info(string.format("Http服务器 消息接口 %d 不存在", url))
s2cData.code = errorInfo.ErrorCode.NoExistInterface
end
log.info("Http服务器 消息接口 ",url , "返回信息",s2cData.code)
return s2cData
end
--获取注册的服务器信息
function CenterServer:QueryClusterInfo( c2sData , s2cData )
s2cData.clusterInfo = {}
for k, v in pairs( self.clusterInfo ) do
table.insert( s2cData.clusterInfo , v )
end
--s2cData.clusterInfo = self.clusterInfo
end
--设置服务器配置信息
function CenterServer:SetServerConfig( c2sData , s2cData )
if nil == c2sData.serverId or nil == c2sData.key or nil == c2sData.value then
s2cData.code = errorInfo.ErrorCode.ErrRequestParam
log.info(string.format("不存在的请求参数 cmd %s value %s" , c2sData.key , c2sData.value))
return
end
c2sData.serverName = clusterServer:GetServerTypeName( c2sData.serverId )
if not self.allServerConfig[ c2sData.serverName ] then
s2cData.code = errorInfo.ErrorCode.NoExistServer
log.info(string.format("不存在该游戏服 %s value %s" , c2sData.serverId , c2sData.serverName))
return
end
local cfg = self.allServerConfig[ c2sData.serverName ]
if nil ~= cfg[ c2sData.key ] then
log.info(string.format("修改前参数 key %s value %s" , c2sData.key , cfg[ c2sData.key ] ))
cfg[ c2sData.key ] = c2sData.value
log.info(string.format("修改后参数 key %s value %s" , c2sData.key , cfg[ c2sData.key ] ))
else
s2cData.code = errorInfo.ErrorCode.NoExistConfig
log.info(string.format("不存在的请求参数 key %s value %s" , c2sData.key , c2sData.value))
return
end
self.allServerConfig[ c2sData.serverName ] = cfg
--保存到数据库
local sql = string.format(sqlUrl.updateServerConfig , json:encode(cfg) , c2sData.serverName)
skynet.server.db:Query( "center" , sql )
s2cData = self:SyncServerConfig()
end
--同步服务器信息
function CenterServer:SyncServerConfig()
local c2sData = {}
local s2cData = {}
s2cData.code = errorInfo.Suc
s2cData.successful = {}
s2cData.unsuccessful = {}
c2sData.platformInfo = self.platformInfo
c2sData.allServerConfig = self.allServerConfig
for k, v in pairs(self.clusterInfo) do
local callData = self:CallMsgToServer( v.serverId , self.Center2All_SyncServerConfig , c2sData )
if callData then
table.insert( s2cData.successful , { serverName = v.serverName , serverId = v.serverId})
else
table.insert( s2cData.unsuccessful , { serverName = v.serverName , serverId = v.serverId})
end
end
return s2cData
end
--接收注册
function CenterServer:RecvReg( c2sData )
local regServerId = c2sData.serverId
self.clusterInfo[ regServerId ] = {}
self.clusterInfo[ regServerId ].serverId = regServerId
self.clusterInfo[ regServerId ].serverName = c2sData.serverName
self.clusterInfo[ regServerId ].status = self.Status_Running
self.clusterInfo[ regServerId ].pingTime = skynet.GetTime()
self.clusterInfo[ regServerId ].serverInfo = {}
self:SyncClusterInfo()
self:SyncServerConfig()
log.info(string.format("集群服务器 %s 向中心服注册成功", c2sData.serverName))
end
--接收注销
function CenterServer:RecvUnReg( c2sData , s2sData )
local unRegServerId = c2sData.serverId
if self.clusterInfo[ unRegServerId ] then
self.clusterInfo[ unRegServerId ].status = self.Status_Stop
log.info(string.format("集群服务器 %s 向中心服注销成功", c2sData.serverName))
self:SyncClusterInfo()
else
s2sData.code = clusterServer.Error.NoExistServer
log.info(string.format("集群服务器 %s 停止失败,不存在该服", c2sData.serverName))
end
end
--接收Ping
function CenterServer:RecvPing( clusterMsg )
local slaveServerId = clusterMsg.serverId
if self.clusterInfo[ slaveServerId ] then
local t1 = skynet.GetTime() - self.clusterInfo[ slaveServerId ].pingTime --两次PING的时间差值
local t2 = self.curClusterConfig.ClusterSlaveSendPingTime + 5 --正常PING的时间差加5秒
if t1 > t2 then
local errorText = string.format("集群服务器 服务器 %s 正常心跳时间为 %d 目前心跳时间为 %d", slaveServerId ,self.curClusterConfig.ClusterSlaveSendPingTime , t1)
local c2sData = {}
c2sData.serverId = slaveServerId
c2sData.errorCode = errorInfo.ErrorCode.PingTimeOut
c2sData.errorText = errorText
self:RecvErrorInfo( c2sData )
log.info(errorText)
end
self.clusterInfo[ slaveServerId ].pingTime = skynet.GetTime()
end
end
--接收服务器同步消息
function CenterServer:RecvSyncServerInfoToCenter( c2sData )
local syncServerId = c2sData.serverId
if not self.clusterInfo[ syncServerId ] or not syncServerId or not c2sData or not c2sData.serverInfo then
return
end
self.clusterInfo[ syncServerId ].serverInfo = c2sData.serverInfo
self:SyncClusterInfo()
log.info("接收集群服同步信息 ", syncServerId , skynet.server.common:TableToString( c2sData.serverInfo ))
end
--接收服务器错误消息
function CenterServer:RecvErrorInfo( c2sData )
if not c2sData or not c2sData.errorCode or not c2sData.errorText then
return
end
local errorServerId = c2sData.serverId
local errorCode = c2sData.errorCode
local errorText = c2sData.errorText
local sql = string.format(sqlUrl.insertErrorInfoToCenter , errorServerId , errorCode , errorText)
skynet.server.db:Query( "center" , sql )
log.info("接收集群服的报错信息 ", errorServerId , errorCode , errorText )
end
--获取所有平台信息
function CenterServer:LoadPlatform()
local sql = sqlUrl.selectPlatform
local queryData = skynet.server.db:Query("center" , sql )
for k, v in pairs(queryData) do
self.platformInfo[ v.Name ] = {}
self.platformInfo[ v.Name ].status = v.Status
end
end
--获取所有服务器配置
function CenterServer:LoadServerConfig()
local allServerConfig = {}
--所有服的配置处理
local sql = sqlUrl.selectServerConfig
local queryData = skynet.server.db:Query("center" , sql )
for serverName, v1 in pairs(serverConfigFields) do
local isExist = false
local jsonConfig = nil
for k, v2 in pairs(queryData) do
if v2.ServerName == serverName then
isExist = true
jsonConfig = v2.Config
break
end
end
if isExist then
--检查下有不有更新数据
local config = json:decode(jsonConfig)
dbData:CheckNewFields( config , serverConfigFields[ serverName ])
allServerConfig[ serverName ] = config
sql = string.format(sqlUrl.updateServerConfig , json:encode(config) , serverName)
skynet.server.db:Query("center" , sql )
else
--插入最新配置
allServerConfig[ serverName ] = {}
dbData:Traversal( allServerConfig[ serverName ] , v1 )
sql = string.format(sqlUrl.insertServerConfig , serverName , json:encode(allServerConfig[ serverName ]))
local insertData = skynet.server.db:Query("center" , sql )
if 1 ~= insertData.affected_rows then
log.info("插入服务器配置失败" , serverName)
end
end
end
self.allServerConfig = allServerConfig
self.curClusterConfig = self.allServerConfig[ "ClusterServer" ]
end
--管理服务器命令
function CenterServer:ServerManage( c2sData , s2cData)
local cmd = c2sData.cmd
local curServerId = c2sData.serverId
--只有主服能执行的命令
if not skynet.server.clusterServer:IsCenterServer( serverId ) then
s2cData.code = errorInfo.ErrorCode.NoCenterServer
return
end
s2cData.data = {}
if c2sData.cmd and curServerId then --多功能服
if 0 == curServerId then
skynet.server.serverManage:DoManageCmd( c2sData , s2cData)
elseif -1 == curServerId then --全游戏服遍历
for k, v in pairs( self.clusterInfo ) do
if v.serverId >= self.gameServerMinID and v.serverId <= self.gameServerMaxID then
local callData = self:CallMsgToServer( v.serverId , self.Center2All_ServerManageCmd , c2sData )
if not callData then
s2cData.code = errorInfo.ErrorCode.NoExistServer
else
table.insert( s2cData.data , callData.data )
end
end
end
else
--指定服接收命令
local callData = self:CallMsgToServer( curServerId , self.Center2All_ServerManageCmd , c2sData )
if not callData then
s2cData.code = errorInfo.ErrorCode.NoExistServer
log.info(string.format("集群服serverId %d 不在线", serverId))
else
table.insert( s2cData.data , callData.data )
return
end
end
else
log.info("错误的请求命令",cmd)
s2cData.code = errorInfo.ErrorCode.ErrRequestParam
end
end
--获取指标
function CenterServer:Metrics( c2sData , s2cData)
s2cData.data = {}
local pre5MinLoginCount = 0
local loginCount = 0
local onlineCount = 0
local offlineCount = 0
for k, v in pairs( self.clusterInfo ) do
if v.serverId >= self.gameServerMinID and v.serverId <= self.gameServerMaxID then
pre5MinLoginCount = pre5MinLoginCount + v.serverInfo.pre5MinLoginCount
loginCount = loginCount + v.serverInfo.loginCount
onlineCount = onlineCount + v.serverInfo.playerCount.playing
offlineCount = offlineCount +v.serverInfo.playerCount.offline
end
end
s2cData.data.pre5MinLoginCount = pre5MinLoginCount
s2cData.data.loginCount = loginCount
s2cData.data.onlineCount = onlineCount
s2cData.data.offlineCount = offlineCount
local listData = {}
table.insert(listData , "#TYPE gsserver_http_requeststotal gauge")
table.insert(listData , "#HELP gsserver_http_requeststotal Total number of landings")
table.insert(listData , string.format("gsserver_http_requeststotal{ label=\"FiveMinLoginCount\" ,method=\"Get\" } %d",pre5MinLoginCount))
table.insert(listData , string.format("gsserver_http_requeststotal{ label=\"LoginCount\" ,method=\"Get\" } %d",loginCount))
local data = ""
for k, v in pairs(listData) do
data = data .. v
data = data .. "\n"
end
s2cData.data = data
end
--删除服务器注册信息
function CenterServer:DeleteServerReg( c2sData , s2cData)
local deleteServerId = c2sData.serverId
if self.clusterInfo[ deleteServerId ] then
self.clusterInfo[ deleteServerId ] = nil
self:SyncClusterInfo()
end
end
--后台发来的消息
function CenterServer:WebMsg( c2sData , s2cData )
log.info("后台发来的消息 ",skynet.server.common:TableToString(c2sData))
for k, v in pairs( self.clusterInfo ) do
if self.Status_Running == v.status and k >= self.gameServerMinID and k <= self.gameServerMaxID then
self:SendMsgToServer( k , self.Center2All_WebMsg , c2sData )
end
end
end
--获取所有服务器信息
function CenterServer:GetAllServerInfo()
local allServerInfo = {}
for k, v in pairs(self.clusterInfo) do
allServerInfo[ k ] = self:CallMsgToServer( k , self.C2M_QueryAllServerInfo )
end
log.info("查询所有服务器信息 ", skynet.server.common:TableToString(allServerInfo))
return allServerInfo
end
--同步集群服务器信息
function CenterServer:SyncClusterInfo()
local c2sData = {}
c2sData.clusterInfo = self.clusterInfo
local info = nil
for k, v in pairs( self.clusterInfo ) do
info = self.clusterInfo[ k ]
if self.Status_Running == info.status then
self:SendMsgToServer( k , self.Center2All_SyncClusterInfo , c2sData)
end
end
end
--保存在线数据
function CenterServer:SaveOnlineData()
local data = {}
for k, v in pairs( self.clusterInfo ) do
table.insert( data , v )
end
local sql = string.format(sqlUrl.insertClusterInfoToCenter , json:encode( data ))
skynet.server.db:Query( "center" , sql )
log.info("保存在线数据成功",sql)
end
skynet.server.centerServer = CenterServer
return CenterServer