X Tutup
/*
 * Copyright (C) 2012 Yee Young Han (http://blog.naver.com/websearch)
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
 */

#include "SipTcp.h"
#include "Log.h"
#include "MonitorCallBack.h"
#include "ServerService.h"
#include "ServerUtility.h"
#include "MemoryDebug.h"

// Number of monitoring threads currently running. Guarded by gclsCountMutex.
static int giMonitorThreadCount = 0;
CSipMutex gclsCountMutex;

/**
 * @ingroup ServerPlatform
 * @brief Per-connection state handed to a server monitoring thread.
 */
class CMonitorSocket
{
public:
	Socket hSocket;                     // connected TCP socket of the monitoring client
	std::string m_strIp;                // client IP address (used in log messages)
	int m_iPort;                        // client port number (used in log messages)
	IMonitorCallBack * m_pclsCallBack;  // callback that builds the response for a command
};

/**
 * @ingroup ServerPlatform
 * @brief Receives exactly iLen bytes from the socket by calling TcpRecv() repeatedly.
 *        A TCP stream may deliver fewer bytes than requested in a single read,
 *        so one read is not enough to guarantee a complete header or body.
 * @param hSocket  connected TCP socket
 * @param pszBuf   buffer that receives the data; must hold at least iLen bytes
 * @param iLen     number of bytes to receive
 * @param iSecond  timeout in seconds passed to each TcpRecv() call
 * @returns iLen on success; otherwise the non-positive value returned by the failing TcpRecv() call.
 */
static int MonitorRecvAll( Socket hSocket, char * pszBuf, int iLen, int iSecond )
{
	int iTotal = 0;

	while( iTotal < iLen )
	{
		int n = TcpRecv( hSocket, pszBuf + iTotal, iLen - iTotal, iSecond );
		if( n <= 0 ) return n;

		iTotal += n;
	}

	return iTotal;
}

/**
 * @ingroup ServerPlatform
 * @brief Processes one command received from a monitoring client and sends the
 *        response as a 4-byte network-byte-order length followed by the body.
 * @param pclsArg    monitoring thread state (socket, peer address, callback)
 * @param pszPacket  NUL-terminated command text received from the client
 * @returns true on success; false if the callback rejects the command or a send fails.
 */
static bool MonitorCommand( CMonitorSocket * pclsArg, const char * pszPacket )
{
	CMonitorString strBuf;
	int iPacketLen, n;

	if( pclsArg->m_pclsCallBack->RecvRequest( pszPacket, strBuf ) == false )
	{
		CLog::Print( LOG_DEBUG, "MonitorThread(%s:%d) command(%s) is not defined", pclsArg->m_strIp.c_str(), pclsArg->m_iPort, pszPacket );
		return false;
	}

	// Send the length header first, in network byte order.
	iPacketLen = htonl( strBuf.GetLength() );

	n = TcpSend( pclsArg->hSocket, (char *)&iPacketLen, sizeof(iPacketLen) );
	if( n != sizeof(iPacketLen) )
	{
		CLog::Print( LOG_DEBUG, "MonitorThread(%s:%d) send header error(%d)", pclsArg->m_strIp.c_str(), pclsArg->m_iPort, GetError() );
		return false;
	}

	// Then send the response body.
	n = TcpSend( pclsArg->hSocket, strBuf.GetString(), strBuf.GetLength() );
	if( n != strBuf.GetLength() )
	{
		CLog::Print( LOG_DEBUG, "MonitorThread(%s:%d) send body error(%d)", pclsArg->m_strIp.c_str(), pclsArg->m_iPort, GetError() );
		return false;
	}

	return true;
}

/**
 * @ingroup ServerPlatform
 * @brief Thread function that serves one monitoring client: it repeatedly reads a
 *        length-prefixed command and answers it through MonitorCommand().
 *        The loop ends when gbStop becomes true, on any socket error, on an invalid
 *        command length, on a failed command, or after 600 consecutive one-second
 *        poll timeouts without a command.
 * @param lpParameter  pointer to a CMonitorSocket allocated by StartMonitorThread();
 *                     this thread closes its socket and deletes it before returning.
 * @returns 0 always.
 */
THREAD_API MonitorThread( LPVOID lpParameter )
{
	CMonitorSocket * pclsArg = ( CMonitorSocket * )lpParameter;
	char szPacket[1024];
	int iPacketLen = 0, n, iNoCommandSecond = 0;
	pollfd sttPoll[1];

	sttPoll[0].fd = pclsArg->hSocket;
	sttPoll[0].events = POLLIN;
	sttPoll[0].revents = 0;

	gclsCountMutex.acquire();
	++giMonitorThreadCount;
	gclsCountMutex.release();

	CLog::Print( LOG_INFO, "MonitorThread(%s:%d) is started", pclsArg->m_strIp.c_str(), pclsArg->m_iPort );

	while( gbStop == false )
	{
		// Wake up every second so that gbStop is noticed promptly.
		n = poll( sttPoll, 1, 1000 );
		if( n < 0 )
		{
			CLog::Print( LOG_ERROR, "MonitorThread(%s:%d) poll error(%d)", pclsArg->m_strIp.c_str(), pclsArg->m_iPort, GetError() );
			break;
		}
		else if( n == 0 )
		{
			// Each timeout is one idle second; drop the client after 600 of them in a row.
			++iNoCommandSecond;
			if( iNoCommandSecond == 600 )
			{
				CLog::Print( LOG_DEBUG, "MonitorThread(%s:%d) no command received in 600 second", pclsArg->m_strIp.c_str(), pclsArg->m_iPort );
				break;
			}

			continue;
		}

		iNoCommandSecond = 0;

		// Receive the command length. All 4 bytes are required: a short read would
		// otherwise leave iPacketLen partially uninitialized.
		n = MonitorRecvAll( pclsArg->hSocket, (char *)&iPacketLen, (int)sizeof(iPacketLen), 5 );
		if( n != (int)sizeof(iPacketLen) )
		{
			CLog::Print( LOG_DEBUG, "MonitorThread(%s:%d) recv header(%d)", pclsArg->m_strIp.c_str(), pclsArg->m_iPort, n );
			break;
		}

		iPacketLen = ntohl( iPacketLen );
		// The length must be positive and leave room for the terminating NUL.
		if( iPacketLen <= 0 || iPacketLen >= (int)sizeof(szPacket) )
		{
			CLog::Print( LOG_DEBUG, "MonitorThread(%s:%d) recv command length(%d)", pclsArg->m_strIp.c_str(), pclsArg->m_iPort, iPacketLen );
			break;
		}

		// Receive the command body. The whole announced length is required so that
		// a truncated command is never executed.
		n = MonitorRecvAll( pclsArg->hSocket, szPacket, iPacketLen, 5 );
		if( n != iPacketLen )
		{
			CLog::Print( LOG_DEBUG, "MonitorThread(%s:%d) recv command body(%d)", pclsArg->m_strIp.c_str(), pclsArg->m_iPort, n );
			break;
		}

		szPacket[n] = '\0';

		if( MonitorCommand( pclsArg, szPacket ) == false )
		{
			break;
		}
	}

	closesocket( pclsArg->hSocket );

	CLog::Print( LOG_INFO, "MonitorThread(%s:%d) is terminated", pclsArg->m_strIp.c_str(), pclsArg->m_iPort );

	delete pclsArg;

	gclsCountMutex.acquire();
	--giMonitorThreadCount;
	gclsCountMutex.release();

	return 0;
}

/**
 * @ingroup ServerPlatform
 * @brief Starts a monitoring thread for a connected client.
 * @param hSocket       connected TCP socket
 * @param pszIp         client IP address; NULL is treated as an empty string
 * @param iPort         client port number
 * @param pclsCallBack  monitoring callback interface
 * @returns true if the thread was started; otherwise false. On failure the
 *          argument object is released here, but hSocket is not closed by this
 *          function.
 */
bool StartMonitorThread( Socket hSocket, const char * pszIp, int iPort, IMonitorCallBack * pclsCallBack )
{
	CMonitorSocket * pclsArg = new CMonitorSocket();
	if( pclsArg == NULL ) return false;

	pclsArg->hSocket = hSocket;
	// Constructing std::string from a NULL pointer is undefined behavior.
	pclsArg->m_strIp = ( pszIp != NULL ) ? pszIp : "";
	pclsArg->m_iPort = iPort;
	pclsArg->m_pclsCallBack = pclsCallBack;

	if( StartThread( "MonitorThread", MonitorThread, pclsArg ) == false )
	{
		// The thread never ran, so nobody else will free the argument object.
		delete pclsArg;
		return false;
	}

	return true;
}

/**
 * @ingroup ServerPlatform
 * @brief Checks whether any monitoring thread is running.
 * @returns true if at least one monitoring thread is running; otherwise false.
 */
bool IsMonitorThreadRun()
{
	// Read the counter under the same mutex the monitoring threads use to update it.
	gclsCountMutex.acquire();
	bool bRun = ( giMonitorThreadCount > 0 );
	gclsCountMutex.release();

	return bRun;
}
X Tutup