IOCP 서버 코어 개발
IOCP 기반 서버 코어 개발하기
이 문서는 IOCP를 기반으로 한 서버 라이브러리 개발 과정을 담았습니다.
IO Completion Port
IO Completion Port란, Windows OS에서 입출력 완료 통지를 알려주는 방식을 의미합니다. 흔히 소켓 모델의 하나로서 이야기되고는 하지만, 엄밀히 말하면 이는 그저 Overlapped IO에서 입출력 결과를 통지하는 하나의 방법론일 뿐입니다.
소켓 통신 또한 네트워크를 통한 IO이기에, 이에 대한 입출력 완료 통지를 IOCP를 통해 받도록 할 수 있습니다.
Overlapped IO와 비동기
Overlapped IO란, IO가 Overlapped (중첩) 된다는 뜻으로서, 하나의 스레드가 한번에 하나의 IO만을 처리하는 것이 아닌, 여러 개의 IO를 다루도록 하는 기술입니다.
기존 동기식 IO를 사용한다면 IO하나를 요청한 후 완료까지 대기해야 하기에 불가능하지만, 비동기 IO를 사용한다면, 입출력 요청만 보낸 이후 나중에 그 입출력 결과를 받는 형식으로 여러 IO 요청이 가능해집니다.
Overlapped 모델 중 Event 방식의 한계
Overlapped 모델은 크게 세 종류가 존재하며, 이 중 이벤트 신호를 이용한 Event 방식이 존재합니다. 하지만 이는 스레드당 처리할 수 있는 소켓의 개수가 이벤트 대기의 제약 때문에 64개 (정확히는 대기 이벤트가 64개) 이하로 제한됩니다. 이는 기존 Select 모델의 소켓 통신이나 WSAEventSelect 모델의 한계점과도 유사합니다.
Overlapped 모델 중 APC 방식의 한계
APC는 Overlapped 모델에서 완료 통지를 받는 방법 중 하나입니다. APC 큐는 스레드마다 자동적으로 생성되는 APC 큐를 사용하는 방식으로서, 이곳을 통해 입출력 완료 결과를 받고, 지정된 콜백을 해당 결과로 호출하여 동작시키는 방식입니다.
하지만 이는 스레드가 alertable wait 상태로 진입되어야 APC 큐에 대한 콜백 처리가 가능하다는 단점이 존재하며, IO를 요청한 스레드가 해당 결과를 처리해야 한다는 단점 또한 존재합니다.
IOCP 네트워크 라이브러리의 구조 설계하기
개발할 라이브러리는 IOCP를 기반으로 하며, 링버퍼와 직렬화 버퍼 또한 사용하도록 합니다.
세션은 네트워크 라이브러리에서 모든 관리 권한과 책임을 지며, 컨텐츠 단과의 세션 커플링은 세션ID를 통해서만 이루어지도록 합니다.
가상함수를 통해 IOCP 서버를 상속받은 컨텐츠 서버에서 이벤트를 처리할 수 있도록, 다음 함수들을 가상함수로 선언합니다
/**
* \brief 메시지가 수신되었을 때 Call 되는 함수
* \param sessionID 세션 ID
* \param message 수신한 메시지
*/
virtual VOID OnRecvMessage(DWORD64 sessionID, SerializedBuffer *message) = 0;
/**
* \brief 접속을 허용하는지 여부를 결정하는 함수
* \param addressIP 접속을 요청하는 IP
* \param addressPort 접속을 요청하는 PORT
* \param addressString 접속을 요청하는 클라이언트 주소 문자열
* \return 접속 허용여부
*/
virtual BOOL OnSessionConnectionRequest(DWORD addressIP, USHORT addressPort, PCWSTR addressString) = 0;
/**
* \brief 세션이 연결되었을 때 Call 되는 함수
* \param sessionID 세션 ID
* \param addressIP 클라이언트 IP
* \param addressPort 클라이언트 PORT
* \param addressString 접속한 클라이언트 주소 문자열
*/
virtual VOID OnSessionConnected(DWORD64 sessionID, DWORD addressIP, USHORT addressPort, PCWSTR addressString) = 0;
/**
* \brief 세션이 연결이 끊겼을 때 Call 되는 함수
* \param sessionID 세션 ID
*/
virtual VOID OnSessionDisconnected(DWORD64 sessionID) = 0;
/**
* \brief 세션이 timeout 되었을 때 Call 되는 함수
* \param sessionID 세션 ID
*/
virtual VOID OnSessionTimeout(DWORD64 sessionID) = 0;
/**
* \brief 서버 오류가 발생했을 때 Call 되는 함수
* \param exception IOCPServerException
*/
virtual VOID OnException(IOCPServerException exception) = 0;
IOCP와 Session 구조체
IOCP를 통해 비동기로 완료통지를 받는 구조를 안정적으로 구현하기 위해서는, Session 구조체에 여러 안전 장치들이 들어가게 됩니다. 우선, Session 구조체를 살펴보면 다음과 같습니다.
struct Session
{
Session() : sessionID(0), socket(INVALID_SOCKET), socketAddressIP(0), socketAddressPort(0), socketAddressString{0},
TimeoutTime(0), ioCount(0x80000000), ioFlag(0)
{
}
DWORD64 sessionID;
SOCKET socket;
DWORD socketAddressIP;
USHORT socketAddressPort;
WCHAR socketAddressString[SESSION_ADDRESS_WCHAR_LENGTH];
DWORD TimeoutTime;
OVERLAPPED_EXPAND RecvOverlapped;
RingBuffer RecvRingBuffer;
OVERLAPPED_EXPAND SendOverlapped;
RingBuffer SendRingBuffer;
alignas(64) DWORD ioCount;
alignas(64) DWORD ioFlag;
};
이 중 눈여겨볼만한 항목은 ioCount와, ioFlag 두가지입니다.
ioCount : 현재 세션이 유효한지 + IOCP 작업이 몇 개 들어갔는지를 의미합니다
IOCP를 통해 비동기적 완료통지가 오는 구조에서는, WorkerThread가 해당 세션에서 오류가 발생했거나, recv 크기로 0이 왔다고 해서 무작정 세션에 대한 정리를 진행하면 비정상적인 상황이 야기될 수 있습니다. (세션 참조 불가, 재사용된 엉뚱한 세션 참조 등)
따라서, 현재 IOCP를 통해 Recv와 Send가 몇 번 요청되어있는 상태인지를 count로 관리하여서, 요청이 성공적으로 만들어졌을 때에는 count를 증가시키고, 요청이 완료되면 (IOCP 완료통지 시) count를 감소시키는 형태로 구조를 작성했습니다.
세션의 정리는 ioCount를 감소시킨 후, 만일 이 값이 0이라면 진행하도록 하며, ioCount는 다양한 스레드에서 접근하는 변수이므로 Interlocked 함수를 이용해 연산이 이루어져야 합니다.
또한, 세션이 만일 풀로서 관리되거나 배열로 관리되는 경우, 현재 사용중인 유효한 세션인지를 구분하기 위해 ioCount의 맨 앞 비트를 사용합니다.
또한 이는 RemoveSession 도중 해당 세션에 대한 참조나 연산 호출을 막기 위해 세션에 대한 배타적 사용을 보장하도록 합니다.
ioFlag : 현재 송신 작업이 진행중인지를 의미합니다
어떠한 세션에 대한 수신은 한 번씩만 진행되고, 중첩되지 않음이 보장됩니다. 이는 WSARecv 후 다시 WSARecv를 호출하는 구조로 설계되어있기에 보장되는 항목입니다. 다만, WSASend는 조금 더 살펴보아야 할 필요가 있습니다.
게임 서버가 클라이언트로 Send하는 경우는 해당 유저로부터, 혹은 다른 어떤 유저로부터의 액션 (네트워크 패킷) 에 대한 처리 결과로 발생할 수 있습니다. (공격을 했다던지, 주변에서 이동했다던지 등). 이와 별개로, 서버의 로직 처리에 의해 네트워크 IO와 별개로 Send할 데이터가 생기는 경우 또한 존재합니다. (특정 시간이 지나고 행동 결과가 반영되는 경우, 물리 연산 처리 결과 등)
따라서, WSASend는 별개의 관리가 없다면 이곳저곳에서 수시로 호출되고 중첩되어 처리되게 됩니다.
이러한 송신 중첩을 막아 구조적 이점과 안전성 이점을 얻고, 불필요한 WSASend 커널 함수 호출 횟수를 줄이기 위해서, Send는 중첩되지 않고 한번만 호출되는 상태를 유지하기 위해 flag를 두게끔 합니다.
스레드 동기화와 SRW Lock
IOCP를 통해 멀티스레드 기반으로 개발하기 위해서는, 공유자원에 대한 접근에 유의해야 합니다. 동일한 자원에 대한 안정성이 보장되기 위해서는, 동기화 객체를 통한 접근 제어가 이루어져야 합니다.
이러한 접근 제어를 위한 동기화 방법에는 spinlock, Critical Section, Mutex 등 다양한 방법이 존재하지만, SRWLock은 읽기 모드 (공유 모드) 와 쓰기 모드 (독점 모드)를 구분하여, 조금 더 효율적인 동기화가 가능하게끔 합니다.
따라서 단순한 읽기만이 필요할 때는 Shared 모드로 동기화를 걸고, 쓰기가 필요할 때만 Exclusive 모드로 동기화를 걸면 경합 상태가 덜 발생하게 됩니다.
메모리풀을 이용한 성능 개선 및 구조 개선
게임 서버는 동작하는 도중 new 와 free가 일어나지 않도록 하여 안정성을 높이며, 성능 상 이점을 가져오기 위해 메모리풀을 사용합니다. 서버 초기 로드 시에 충분한 만큼의 오브젝트를 미리 준비하거나, new 후 반환되는 오브젝트를 free 하지 않고 freelist로 관리하는 식으로 진행됩니다.
따라서, 개발하는 IOCP 서버 코어에는 오브젝트풀을 세션과 패킷, 패킷과 세션 ID가 결합된 형태인 메시지의 할당과 해제에 사용하게끔 설계되었습니다.
메시지큐 스왑을 통한 성능 개선
IOCP를 통해 들어온 수신 메시지에 따라 알맞은 행위를 처리하기 위해 이를 처리할 스레드에게 큐를 이용해서 메시지를 전달하게 됩니다. 하나의 큐에 인큐하는 생산자, 즉 IOCP Worker Thread가 여럿인 상황이기에, 이에 대해 적절한 상호 배제가 이루어져야 합니다.
생산자가 여럿이고 만일 소비자인 Logic Thread는 Single일 경우, 이를 Dequeue할 때 부하가 생기게 될 것입니다. 이를 해결하기 위해 큐를 thread-safe한 큐를 사용하거나, lockfree queue를 사용할 수 있지만, 이들 모두 atomic, interlock 을 이용하기에 아예 동기화를 신경쓰지 않는 상황보다 성능이 낮을 수밖에 없게 됩니다.
따라서 이는 메시지 큐를 두 개로 분리한다면 해결이 가능합니다. IOCP Worker Thread는 Message Queue에 동기화 객체를 통해 Enqueue하는 것은 동일하지만, 메시지를 처리하는 스레드는 Worker Thread가 Enqueue중인 큐가 아닌, Working Queue를 독점적으로 사용하며 내부에 들어있는 메시지를 동기화 객체 없이 빠르게 처리할 수 있습니다. 이후, Working Queue가 비었다면 WorkerThread의 Message Queue와 Swap하는 식으로 순차적인 메시지 처리가 가능하게 됩니다.
더미를 통한 스트레스 테스트
개발한 프로그램을 검증하는 방법은 무척 중요합니다. 이는 모든 프로그램에 해당되는 당연한 말이지만, 특히 ‘멀티스레드’ 프로그램을 검증하기 위해서는 다양한 방법들 중 더미를 통한 테스트가 예상하기 힘든 멀티스레드만의 경합 상태나 동기화 문제를 검출할 수 있도록 해줍니다.
따라서 만든 네트워크 라이브러리를 에코 더미를 통해 검증되도록 한다면, 더욱 신뢰성 높은 네트워크 라이브러리가 될 수 있습니다. 에코 더미 또한 그저 단순한 에코 부하가 아닌, 여러 개의 세션을 가지고 서버로부터 핑퐁하지 않고도 여러 패킷을 보내보는 식으로 다양한 동작을 하게끔 한다면, 서버의 이상을 잡아낼 확률이 높아집니다.
코드 모음
GetQueuedCompletionStatus : WorkerThread
UINT WINAPI IOCPServer::WorkerThread(PVOID param)
{
DWORD byteTransferred = 0;
ULONG_PTR completionKey = 0;
OVERLAPPED *overlapped = nullptr;
OVERLAPPED_EXPAND *overlappedExpand = nullptr;
DWORD64 sessionID = 0;
Session *session = nullptr;
INT32 gqcsResult = 0;
while (true)
{
// GetQueuedCompletionStatus를 호출하기 전 인자를 초기화합니다
byteTransferred = 0;
completionKey = 0;
overlapped = nullptr;
overlappedExpand = nullptr;
session = nullptr;
// IOCP 완료 통지를 대기합니다
gqcsResult = GetQueuedCompletionStatus(handleIOCP, &byteTransferred, &completionKey, &overlapped, INFINITE);
if (overlapped == nullptr)
{
// 만일 completionKey가 0xffffffff라면 IOCP 종료를 의미합니다
if (completionKey == 0xffffffff)
{
ULONG_PTR finishKey = 0xffffffff;
DWORD finishTransferred = 0;
PostQueuedCompletionStatus(handleIOCP, finishTransferred, finishKey, nullptr);
return 0;
}
if (completionKey == 0)
{
int errorCode = WSAGetLastError();
EXCEPTION(EXCEPTION_IOCP, errorCode);
} else
{
EXCEPTION(EXCEPTION_IOCP);
}
continue;
}
// 만일 overlapped가 0xffffffff라면 세션 종료를 의미합니다
if (overlapped == (LPOVERLAPPED)0xffffffff)
{
OnSessionDisconnected(completionKey);
continue;
}
overlappedExpand = (OVERLAPPED_EXPAND *)overlapped;
// IOCP 완료 통지를 받은 세션을 찾습니다
sessionID = completionKey;
session = FindSession(sessionID);
if (session == nullptr)
{
EXCEPTION(EXCEPTION_SESSION_NOT_FOUND);
continue;
}
// GetQueuedCompletionStatus 함수의 결과가 True이고, 바이트 전송량이 0이 아니라면 성공적으로 IOCP 완료 통지를 받았습니다
if (gqcsResult != 0 && byteTransferred != 0)
{
// IOCP 완료 통지를 받은 작업의 종류에 따라 처리합니다
switch (overlappedExpand->type)
{
case OVERLAPPED_EXPAND::TYPE_RECV:
{
RecvProc(session, byteTransferred);
}
break;
case OVERLAPPED_EXPAND::TYPE_SEND:
{
SendProc(session, byteTransferred);
}
break;
}
}
// 세션의 IO Count를 감소시키고, 만일 IO Count가 0이라면 세션을 종료합니다
if (InterlockedDecrement(&session->ioCount) == 0)
{
RemoveSession(session);
}
}
return -1;
}
GetQueuedCompletionStatus 이후 Recv 처리부
/**
* IOCP를 통해 RECV가 완료됨 메시지를 받으면 호출되는 함수
*/
VOID IOCPServer::RecvProc(Session *session, DWORD byteTransferred)
{
// 네트워크로부터 세션에 byteTransferred만큼 데이터가 수신되었으니,
// 세션의 RecvRingBuffer의 WriteBuffer를 byteTransferred만큼 이동시킵니다
session->RecvRingBuffer.MoveWriteBuffer(byteTransferred);
// 타임아웃 처리를 위해 세션의 TimeoutTime을 현재 시간으로 갱신합니다
session->TimeoutTime = timeGetTime() + serverSettings.sessionTimeout;
// 받은 네트워크 데이터에서 메시지를 가능한 만큼 뽑아내어 처리합니다
while (true)
{
// 패킷 풀에서 패킷을 할당합니다
AcquireSRWLockExclusive(&packetPoolSRW);
SerializedBuffer *serializedBuffer = packetPool->Alloc();
ReleaseSRWLockExclusive(&packetPoolSRW);
// 패킷을 초기화합니다
serializedBuffer->Clear(false);
// 세션의 RecvRingBuffer에서 완성된 메시지를 읽어오기를 시도합니다
NetworkHeader networkHeader;
bool packetCompleted = GetPacketCompleted(session, serializedBuffer, &networkHeader);
if (packetCompleted)
{
InterlockedIncrement(&this->recvMessagePerSecondCounter);
// 메시지 풀에서 메시지를 할당합니다
NetworkMessage *newMessage = nullptr;
AcquireSRWLockExclusive(&messagePoolSRW);
newMessage = messagePool->Alloc();
ReleaseSRWLockExclusive(&messagePoolSRW);
newMessage->sessionID = session->sessionID;
newMessage->packet = serializedBuffer;
// 메시지 큐에 메시지를 넣습니다
messageQueue.Enqueue(newMessage);
} else
{
// 패킷 풀에 패킷을 반환합니다
AcquireSRWLockExclusive(&packetPoolSRW);
packetPool->Free(serializedBuffer);
ReleaseSRWLockExclusive(&packetPoolSRW);
}
// 세션의 RecvRingBuffer에서 완성된 메시지를 읽어오는데 실패했다면 루프를 탈출합니다
if (!packetCompleted) break;
}
// 읽기가 완료되었으니, 다시 WSARecv 호출을 통해 클라이언트의 데이터를 받아옵니다
RecvPost(session);
}
컨텐츠 파트에서 네트워크 파트로 메시지 전송 요청
VOID IOCPServer::SendPacket(DWORD64 sessionID, SerializedBuffer *serializedBuffer)
{
// 세션을 얻어옵니다
Session *session = AcquireSession(sessionID);
if (session == nullptr)
{
return;
}
// 메시지의 앞 부분 헤더를 만들어 채웁니다
serializedBuffer->BuildNetworkHeader();
InterlockedIncrement(&sendMessagePerSecondCounter);
// 세션의 송신 큐를 잠그고 패킷을 삽입합니다
session->SendRingBuffer.LockSRWExclusive();
INT32 enqueuedSize = 0;
BOOL enqueueResult = session->SendRingBuffer.Enqueue((PCHAR)serializedBuffer->GetBufferRead(), serializedBuffer->GetBufferSizeUsed(), &enqueuedSize);
session->SendRingBuffer.UnlockSRWExclusive();
if (!enqueueResult || enqueuedSize != serializedBuffer->GetBufferSizeUsed())
{
EXCEPTION(EXCEPTION_BUFFER_ERROR);
}
// 세션의 WSASend를 시도합니다
SendPost(session);
// 세션을 반환합니다
ReturnSession(session);
}
AcceptThread
/**
* \brief Accept만을 담당하는 스레드
* \param params nullptr
* \return 0 : successful -1 : non-successful
*/
UINT WINAPI IOCPServer::AcceptThread(PVOID param)
{
DWORD64 sessionID = 0;
SOCKET clientSocket;
SOCKADDR_IN clientAddress;
while (true)
{
clientSocket = INVALID_SOCKET;
// 클라이언트의 접속을 대기합니다
// 만일 GetAccept 함수가 false를 반환하면 스레드를 종료합니다
if (!GetAccept(&clientSocket, &clientAddress)) return 0;
// 접속한 소켓의 주소를 가공합니다
WCHAR clientAddressString[SESSION_ADDRESS_WCHAR_LENGTH];
ZeroMemory(clientAddressString, SESSION_ADDRESS_WCHAR_LENGTH * sizeof(WCHAR));
DWORD clientAddressLength = 0;
WSAAddressToStringW(reinterpret_cast<LPSOCKADDR>(&clientAddress), sizeof(clientAddress), 0, clientAddressString, &clientAddressLength);
DWORD clientAddressIP;
WSANtohl(clientSocket, clientAddress.sin_addr.S_un.S_addr, &clientAddressIP);
USHORT clientAddressPort;
WSANtohs(clientSocket, clientAddress.sin_port, &clientAddressPort);
// 접속을 요청하는 소켓의 주소를 알려주어 허용 여부를 판단합니다
if (!OnSessionConnectionRequest(clientAddressIP, clientAddressPort, clientAddressString))
{
closesocket(clientSocket);
continue;
}
// 접속한 클라이언트의 세션을 생성합니다
Session *session = CreateSession(clientSocket, ++sessionNextID, clientAddress);
if (session == nullptr)
{
EXCEPTION(EXCEPTION_SESSION_CREATE);
closesocket(clientSocket);
continue;
}
// 접속한 클라이언트의 세션 생성을 OnSessionConnected로 알립니다
OnSessionConnected(session->sessionID, session->socketAddressIP, session->socketAddressPort, session->socketAddressString);
// 새로운 세션에 WSARecv를 요청합니다
RecvPost(session);
// 세션의 IO Count를 감소시키고, 만일 IO Count가 0이라면 세션을 종료합니다
if (InterlockedDecrement(&session->ioCount) == 0)
{
RemoveSession(session);
}
// 세션 생성 통계를 증가시킵니다
InterlockedIncrement(&sessionAccepted);
InterlockedIncrement(&acceptPerSecondCounter);
}
return -1;
}
'연구한 이야기 > 깊게 공부해보기' 카테고리의 다른 글
동기화객체 성능테스트 (0) | 2023.06.19 |
---|---|
패킷 직렬화버퍼 (0) | 2023.05.02 |
메모리풀과 프리리스트 (0) | 2023.05.02 |
TCP와 링버퍼 (0) | 2023.05.02 |
더 빠른 길찾기 (0) | 2023.05.02 |