5/14/2008

On the I/O Completion Port Mechanism

Microsoft provides a great OS facility called the I/O Completion Port (iocp) to help developers build high-performance server applications. In this article I will try to explain how it works and how to use it.

The main motivation for iocp is to improve i/o performance. The basic ideas behind its design are:
- async i/o results are posted to a kernel queue
- worker threads fetch and process one result at a time
- keep all processors busy, but avoid context switching as much as possible

Before going further, let's clarify some concepts behind iocp:
1. async i/o - (multiple) i/o requests are issued to the device driver asynchronously, so the issuing thread does not wait for them to complete
2. consumer/producer model - most async models are typical consumer/producer models: client threads produce i/o request packets and the device driver consumes them; the device driver produces i/o result packets and worker threads consume them
3. queue - a queue is a great facility for thread cooperation. Two threads that work asynchronously can use a queue to implement a consumer/producer model

So what is iocp, and what can it provide?
1. you can regard iocp as a queue kernel object, with some auxiliary data structures and routines, designed for processing async i/o results
2. if you associate an iocp with an i/o handle (file/socket/named pipe/mailslot), the results of all subsequent async i/o requests on that handle will be queued to this iocp
3. these queued async i/o results can be fetched from the iocp (in so-called "worker threads")
4. you can also post a completion status (a faked i/o result) to the iocp directly. This can be used to implement an application-level consumer/producer model
5. iocp can limit the number of active worker threads associated with it, to reduce context switching overhead

    ┌--> kernel device driver --┐
    |                           |
(async i/o request)  (i/o completion status)
    |                           |
    |                           V
client thread ----------------> iocp [queue embedded]
  (direct completion status)    |
                       (completion status)
                                |
                                V
                         worker threads

        [iocp based application architecture]

How does the whole thing work?
1. you issue async i/o to the device driver
2. the device driver completes the i/o request and posts the result to the queue in the iocp
3. when an i/o result arrives, the iocp checks how many of its associated worker threads are currently active (running rather than blocked). If that number is below the maximum specified when the iocp was created, and some worker thread is waiting on the queue, one of the waiters (in LIFO order) is woken up to continue execution
4. a worker thread queries the iocp when it finishes its current task (or right after it is created). If the queue is empty, or the number of active worker threads has reached the maximum, it waits on the queue until the iocp wakes it up as described in step 3
5. a thread is said to be "associated" with an iocp when it calls the GetQueuedCompletionStatus() API on it
6. the association between an iocp and a thread ends when:
- the thread terminates
- the iocp is closed
- the thread associates itself with another iocp (calls GetQueuedCompletionStatus() on a different port)
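Steps 3 and 4 amount to a small bookkeeping algorithm. The sketch below is a simplified, portable model of it, not Windows code. The class `IocpSchedulerModel` and its methods are hypothetical names. It tracks only counters and thread ids, and it ignores details such as timeouts.

It also models one rule the steps leave implicit. A worker that blocks on something other than the port (for example, console output) stops counting as active, so the iocp may release another waiter. When the blocked worker resumes, the active count can briefly exceed the maximum.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical bookkeeping model of the iocp wake-up rules. It only tracks
// counters and thread ids; the real decisions are made inside the kernel.
class IocpSchedulerModel {
public:
    explicit IocpSchedulerModel(std::size_t maxActive) : m_maxActive(maxActive) {
        assert(maxActive > 0);
    }

    // A packet is queued; returns the id of the waiting worker released to run it,
    // or -1 if it stays queued (no waiter, or the concurrency limit is reached).
    int PacketQueued() {
        ++m_queued;
        return ReleaseWaiterIfPossible();
    }

    // A worker calls GetQueuedCompletionStatus(). If it was active (it just finished
    // a packet), it first stops counting as active. Returns true if it gets a queued
    // packet immediately, false if it has to wait on the queue.
    bool WorkerCallsGet(int threadId, bool wasActive) {
        if (wasActive) {
            assert(m_active > 0);
            --m_active;
        }
        if (m_queued > 0 && m_active < m_maxActive) {
            --m_queued;
            ++m_active;
            return true;
        }
        m_waiting.push_back(threadId);
        return false;
    }

    // An active worker blocks on something other than the port (e.g. console i/o):
    // it stops counting as active, so a waiter may be released for a queued packet.
    int WorkerBlocked() {
        assert(m_active > 0);
        --m_active;
        return ReleaseWaiterIfPossible();
    }

    // The blocked worker resumes and counts again, even if that exceeds the limit.
    void WorkerUnblocked() { ++m_active; }

    std::size_t Active() const { return m_active; }
    std::size_t Queued() const { return m_queued; }

private:
    int ReleaseWaiterIfPossible() {
        if (m_queued == 0 || m_waiting.empty() || m_active >= m_maxActive) {
            return -1;
        }
        int id = m_waiting.back();  // LIFO: the most recent waiter is released first
        m_waiting.pop_back();
        --m_queued;
        ++m_active;
        return id;
    }

    std::size_t m_maxActive;
    std::size_t m_active = 0;
    std::size_t m_queued = 0;
    std::vector<int> m_waiting;  // waiting workers, most recent at the back
};
```

For example, take a maximum of 2 with workers 1, 2 and 3 waiting in that order. The first two queued packets release workers 3 and then 2. A third packet stays queued until one of them finishes or blocks. LIFO release favours the thread that ran most recently, which is likely to still have warm caches and resident stack pages.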

How do we make use of this great mechanism?
1. create an iocp
2. associate it with i/o device handles
3. create worker threads for this iocp
4. issue async i/o requests on those i/o device handles
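The steps can be sketched portably as follows. This is a model, not the Win32 API. `PortModel`, `RunSkeleton` and the key constants are hypothetical names; real code would use CreateIoCompletionPort(), GetQueuedCompletionStatus() and PostQueuedCompletionStatus().

Step 2 is skipped because there is no device to associate. Instead, work is posted directly as faked completion packets, which is what the sum example later in this article does through QueueWorkToIocpServer(). The stop key plays the role of ckStopCommand. The model keeps the FIFO packet order but has no concurrency limit and no LIFO wake-up.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <cstdint>
#include <deque>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical stand-in for an iocp: a FIFO queue of (completion key, payload) packets.
struct Packet {
    std::uintptr_t key;
    long long payload;
};

class PortModel {
public:
    // Like posting a faked completion status: enqueue and wake one waiting worker.
    void Post(Packet p) {
        {
            std::lock_guard<std::mutex> lock(m_mutex);
            m_queue.push_back(p);
        }
        m_cv.notify_one();
    }

    // Like GetQueuedCompletionStatus(..., INFINITE): block until a packet is available.
    Packet Get() {
        std::unique_lock<std::mutex> lock(m_mutex);
        m_cv.wait(lock, [this] { return !m_queue.empty(); });
        Packet p = m_queue.front();
        m_queue.pop_front();
        return p;
    }

private:
    std::mutex m_mutex;
    std::condition_variable m_cv;
    std::deque<Packet> m_queue;
};

const std::uintptr_t kWorkKey = 1;
const std::uintptr_t kStopKey = 2;  // plays the role of ckStopCommand

// Steps 1, 3 and 4 against the model, plus shutdown; returns the sum of all payloads.
long long RunSkeleton(unsigned workerCount, int itemCount) {
    assert(workerCount > 0);
    PortModel port;                                   // step 1: create the "port"
    std::atomic<long long> total(0);
    std::vector<std::thread> workers;
    for (unsigned i = 0; i < workerCount; ++i) {      // step 3: start worker threads
        workers.emplace_back([&port, &total] {
            for (;;) {
                Packet p = port.Get();
                if (p.key == kStopKey) break;         // graceful exit
                total += p.payload;                   // "process" one completion
            }
        });
    }
    for (int i = 1; i <= itemCount; ++i) {            // step 4: issue work
        port.Post(Packet{kWorkKey, i});
    }
    for (unsigned i = 0; i < workerCount; ++i) {      // one stop packet per worker
        port.Post(Packet{kStopKey, 0});
    }
    for (std::thread& t : workers) t.join();
    return total.load();
}
```

One stop packet per worker is needed because each worker exits after consuming exactly one. Because packets are dequeued in FIFO order, every work packet is handed out before any stop packet. For example, RunSkeleton(4, 1000) returns 500500.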

Let's look at a concrete example that uses iocp as a work queue to keep every core busy.
Problem & Requirement
1. You want your computer to compute sums (each piece of work is small, but there is a huge amount of it)
2. You want every core of your processor to do some of the work

Solution
1. Build worker threads that can compute partial sums
2. Worker threads get work items from the iocp; the client posts computing requests to the iocp
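It helps to know the expected answer before running the code. In main.cpp, WORK_COUNT is 100 and item i covers [i * 10000, (i + 1) * 10000 - 1]. Together the items cover 0..999,999 exactly once, so the total must be n(n - 1)/2 with n = 1,000,000, which is 499,999,500,000.

A single item can already overflow 32 bits: the last range sums to 9,949,995,000. That is why the partial sum is kept in an __int64. The helpers below are hypothetical and only recompute these numbers.

```cpp
#include <cassert>

// Sum of the integers lo..hi inclusive: the work one WORK_ITEM does.
long long RangeSum(long long lo, long long hi) {
    long long sum = 0;
    for (long long k = lo; k <= hi; ++k) sum += k;
    return sum;
}

// Recomputes the total the way main.cpp splits it: item i covers
// [i * width, (i + 1) * width - 1].
long long PartitionedSum(int workCount, int width) {
    long long total = 0;
    for (int i = 0; i < workCount; ++i) {
        long long lo = static_cast<long long>(i) * width;
        total += RangeSum(lo, lo + width - 1);
    }
    return total;
}

// Closed form for 0 + 1 + ... + (n - 1).
long long ExpectedSum(long long n) { return n * (n - 1) / 2; }
```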

main.cpp - client of the Iocp Server
 1 #include <windows.h>
 2 #include <stdio.h>
 3 #include "iocpcomm.h"
 4 #include "iocpserver.h"
 5
 6 volatile LONGLONG g_sum = 0;
 7 volatile LONG g_curWorkCount = 0;
 8
 9 const int WORK_COUNT = 100;
10
11 struct WORK_ITEM : OVERLAPPED
12 {
13     int m_nMin;
14     int m_nMax;
15 };
16
17 // called by a worker thread on the Iocp for each dequeued work item
18 DWORD WINAPI WorkItemProcessor(LPVOID param)
19 {
20     OVERLAPPED_ENTRY* ole = (OVERLAPPED_ENTRY *)param;
21     WORK_ITEM* wi = (WORK_ITEM*)ole->lpOverlapped;
22     
23     // compute the partial sum (64-bit: one range can exceed the 32-bit range)
24     __int64 mySum = 0;
25     for (int i = wi->m_nMin; i <= wi->m_nMax; ++i)
26     {
27         mySum += i;
28     }
29
30     // add the partial result to the global result
31     InterlockedExchangeAdd64(&g_sum, mySum);
32
33     InterlockedDecrement(&g_curWorkCount);
34
35     return 0;
36 }
37
38 int main(int argc, char** argv)
39 {
40     //g_showDetailInfo = TRUE;
41
42     // start up the iocp server; give up if it cannot be created
43     if (!StartupIocpServer(WorkItemProcessor)) return 1;
44
45     // create & post some work items
46     g_curWorkCount = WORK_COUNT;
47     WORK_ITEM arraWI[WORK_COUNT];
48     ZeroMemory(arraWI, sizeof(WORK_ITEM) * WORK_COUNT);
49
50     for (int i = 0; i < WORK_COUNT; ++i)
51     {
52         arraWI[i].m_nMax = (i + 1) * 10000 - 1;
53         arraWI[i].m_nMin = i * 10000;
54         QueueWorkToIocpServer(0, 0, arraWI + i);
55     }
56
57     // poll the work progress
58     bool isWorkDone = false;
59     while (!isWorkDone)
60     {
61         Sleep(1000);
62         isWorkDone = (g_curWorkCount == 0);
63     }
64
65     // check the final result
66     printf("the final results are [%I64d]\n", g_sum);
67
68     StopIocpServer();
69
70     // check thread load statistics
71     for (std::map<int, int>::const_iterator iter = g_mapThreadLoad.begin(); iter != g_mapThreadLoad.end(); ++iter)
72     {
73         printf("thread [%d] processed %d work items\n", iter->first, iter->second);
74     }
75 }


IocpServer.cpp - iocp server implementation
 1 DWORD WINAPI WorkerThreadProc(LPVOID param)
 2 {
 3     LPTHREAD_START_ROUTINE taskProcessor = (LPTHREAD_START_ROUTINE)param;
 4
 5     OVERLAPPED_ENTRY olEntry = {0};
 6     while (true)
 7     {
 8         // get the next completion packet from the Iocp (blocks while the queue is empty)
 9         if (GetQueuedCompletionStatus(
10                 g_hIocp,
11                 &(olEntry.dwNumberOfBytesTransferred),
12                 &(olEntry.lpCompletionKey),
13                 &(olEntry.lpOverlapped),
14                 INFINITE))
15         {
16             if (olEntry.lpCompletionKey == ckStopCommand)
17             {
18                 if (g_showDetailInfo) printf("worker thread [%lu] ends gracefully.\n", GetCurrentThreadId());
19                 break;
20             }
21             else
22             {
23                 if (g_showDetailInfo) printf("worker thread [%lu] will process one request.\n", GetCurrentThreadId());
24                 g_mapThreadLoad[GetCurrentThreadId()]++;  // not thread-safe: guard with a lock in real code
25                 taskProcessor((LPVOID)&olEntry);
26             }
27         }
28         else
29         {
30             // report the error and exit this thread
31             if (g_showDetailInfo) printf("Error when querying completion status on Iocp: [%lu]\n", GetLastError());
32             break;
33         }
34     }
35
36     return 0;
37 }
38
39 BOOL StartupIocpServer(LPTHREAD_START_ROUTINE taskProcessor)
40 {
41     // get the number of processors in the system
42     SYSTEM_INFO sysInfo;
43     GetSystemInfo(&sysInfo);
44     DWORD coreCount = sysInfo.dwNumberOfProcessors;
45
46     // create the Iocp, allowing at most coreCount concurrently active worker threads
47     g_hIocp = CreateNewCompletionPort(coreCount);
48     if (g_hIocp == NULL)
49     {
50         printf("Failed to create Iocp due to: [%lu]\n", GetLastError());
51         return FALSE;
52     }
53
54     // start 2 x coreCount worker threads; the extras can run when an active one blocks
55     DWORD dwThreadId = 0;
56     dwThreadCount = coreCount * 2;
57     g_arrThreads = new HANDLE[dwThreadCount];
58     for (DWORD i = 0; i < dwThreadCount; ++i)
59     {
60         HANDLE hThread = CreateThread(NULL, 0, WorkerThreadProc, (LPVOID)taskProcessor, 0, &dwThreadId);
61         if (hThread == NULL)
62         {
63             printf("Failed to create worker thread due to: [%lu]\n", GetLastError());
64             
65             // TODO: the i threads created so far should be stopped and closed here
66             delete[] g_arrThreads;
67             g_arrThreads = NULL;
68             return FALSE;
69         }
70         else
71         {
72             g_arrThreads[i] = hThread;
73             printf("Thread [%lu] created successfully\n", dwThreadId);
74         }
75     }
76
77     return TRUE;
78 }
full source code package: http://code4cs.googlecode.com/files/Iocp4W.zip

Notes:
1. the win32 API CreateIoCompletionPort() is overloaded in its semantics: it both creates a new port and associates a device handle with an existing one. I have divided it into two APIs in my code (see IocpComm.h): CreateNewIoCompletionPort() and AssociateIoCompletionPortWithDevice()
2. the OVERLAPPED (or derived struct) objects created at line 47 of main.cpp are retrieved and accessed by the worker threads at line 13 of IocpServer.cpp. So you must ensure these data structures are not destroyed before the worker threads have finished with them.
3. the g_showDetailInfo variable at line 40 of main.cpp controls the output information. If it is turned on, it introduces i/o operations (console output) in the worker threads, which greatly affects how the worker threads are scheduled. Try turning it on and off to see the different results, and think about what happens behind the scenes.
4. essentially, iocp is a system queue with some thread scheduling improvements.
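Note 2 is the usual ownership problem with OVERLAPPED-derived structs: the pointer that comes back from the queue is only valid while the object lives. main.cpp stays safe because arraWI lives on main's stack and main polls until every item has been processed.

Another common pattern is to allocate each item on the heap and let the worker that dequeues it free it. The portable sketch below shows that pattern. `FakeOverlapped` is a hypothetical stand-in for OVERLAPPED, and the iocp is reduced to a std::deque of base pointers.

```cpp
#include <cassert>
#include <deque>
#include <memory>

// Hypothetical stand-in for OVERLAPPED (the real struct comes from <windows.h>).
struct FakeOverlapped {
    unsigned long internal = 0;
};

// Like WORK_ITEM in main.cpp: per-request data derives from the OVERLAPPED type, so
// the base pointer handed back by the queue can be cast back to the full item.
struct WorkItem : FakeOverlapped {
    int first = 0;
    int last = 0;
};

// Producer: allocates the item on the heap; ownership travels with the queued pointer.
void QueueItem(std::deque<FakeOverlapped*>& queue, int first, int last) {
    std::unique_ptr<WorkItem> item(new WorkItem());
    item->first = first;
    item->last = last;
    queue.push_back(item.get());  // if this throws, the unique_ptr still frees the item
    item.release();               // from here on the consumer owns it
}

// Consumer: takes ownership back, so the item is freed right after processing and
// nothing on the producer's stack has to outlive the worker.
long long ProcessOne(std::deque<FakeOverlapped*>& queue) {
    assert(!queue.empty());
    std::unique_ptr<WorkItem> item(static_cast<WorkItem*>(queue.front()));
    queue.pop_front();
    long long sum = 0;
    for (int i = item->first; i <= item->last; ++i) sum += i;
    return sum;
}
```

The cost is one allocation per request. In exchange, the producer can return immediately instead of waiting for all items to finish.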

