여러 스레드에 대한 대기열의 작업 예약

여러 스레드에 대한 대기열의 작업 예약

디렉터리 집합(5-300개 파일 사이)의 모든 파일을 처리해야 하는 함수가 있습니다. 사용할 병렬 스레드 수는 사용자가 지정합니다(보통 4개). 아이디어는 4개의 개별 스레드에서 함수를 실행하는 것입니다. 하나의 스레드가 반환되면 다음(5번째) 파일 처리를 시작해야 하며 모든 파일이 완료될 때까지 계속됩니다.

Windows에서는 여기서 도움을 WaitForMultipleObjects()받았습니다 bWaitAll=False. 배열에 채우고 채울 수 있는 구조체가 있습니다.

map<UINT, string>::iterator iter = m_FileList.begin();
string outputPath = GetOutputPath();
void ***threadArgs = (void***)malloc(sizeof(void**)*numThreads);
HANDLE *hdl = (HANDLE*)malloc(sizeof(HANDLE)*numThreads);
DWORD *thr = (DWORD*)malloc(sizeof(DWORD)*numThreads);

for (int t = 0; iter != m_FileList.end() && t < numThreads; t++, iter++)
{
    threadArgs[t] = prepThreadData(t, iter->second, opPath);
    printf("main: starting thread :%d %s outputPath: %s\n", t, iter->second.c_str(), threadArgs[t][2]);
    hdl[t] = CreateThread(NULL, 0, fileProc, (void*)threadArgs[t], 0, &thr[t]);
    if (hdl[t] == NULL)
    {
        err = GetLastError();
        printf("main: thread failed %x %x %s %s\n", err, iter->second.c_str(), threadArgs[t][2]);
    }
}

for (;iter != m_FileList.end(); iter++)
{
    int t = (int)WaitForMultipleObjects(numThreads, hdl, FALSE, INFINITE);
    if (t == WAIT_FAILED)
    {
        err = GetLastError();
        printf("main: thread failed %x %x\n", t, err);
    }
    if (t - WAIT_OBJECT_0 >= 0 && t - WAIT_OBJECT_0 < numThreads)
    {
        free(threadArgs[t][1]);
        free(threadArgs[t][2]);
        free(threadArgs[t]);
        threadArgs[t] = prepThreadData(t, iter->second, opPath);
        printf("main: starting thread :%d %s outputPath: %s\n", t, iter->second.c_str(), threadArgs[t][2]);
        hdl[t] = CreateThread(NULL, 0, fileProc, (void*)threadArgs[t], 0, &thr[t]);
        if (hdl[t] == NULL)
        {
            err = GetLastError();
            printf("main: thread failed %x %x %s %s\n", err, iter->second.c_str(), threadArgs[t][2]);
        }
    }
}
if (WAIT_FAILED == WaitForMultipleObjects(numThreads - 1, hdl, TRUE, INFINITE))     
{
    err = GetLastError();
    printf("main: thread failed %x %x\n", err);
}

내 문제는 이제 pthread를 사용하여 유사한 기능을 얻는 것입니다. 내가 생각할 수 있는 가장 좋은 방법은 세마포어를 사용하고 그 중 하나가 사용 가능해지면 threadArgs 배열을 사용하는 대신 새 스레드를 생성하는 것입니다. 각 스레드에 할당된 메모리를 생성하는 포인터를 사용하겠습니다. 또한 메모리 관리를 용이하게 하기 위해 threadArgs[t]에 할당된 메모리는 생성된 스레드가 소유합니다.

더 나은 해결책이 있습니까? 아니면 WaitForMutlipleObjects()pthread와 비슷한 것이 있습니까 ? 좀 더 구체적으로 CreateThread()로 바꾸면 pthread_create()무엇으로 바꿔야 할까요 WaitForMultipleObjects()?

답변1

작업 대기열을 원하는 것 같습니다. 처리해야 하는 파일 모음으로 해당 대기열을 채우고 스레드 간 경합을 방지하기 위해 필요한 잠금을 수행하는 함수를 사용하여 대기열에서 항목을 제거합니다. 그런 다음 원하는 스레드를 시작하십시오. 각 스레드는 대기열에서 항목을 가져와 처리한 후 다음 항목을 가져옵니다. 대기열이 비면 스레드는 추가 입력을 기다리는 것을 차단할 수 있으며, 추가 입력이 없을 경우 종료될 수 있습니다.

간단한 예는 다음과 같습니다.

#include <cstdio>
#include <mutex>
#include <queue>
#include <thread>

template<typename T>
class ThreadSafeQueue {
public:
    void enqueue(const T& element)
    {
        std::lock_guard<std::mutex> lock(m_mutex);

        m_queue.push(element);
    }

    bool dequeue(T& value)
    {
        std::lock_guard<std::mutex> lock(m_mutex);

        if (m_queue.empty()) {
            return false;
        }

        value = m_queue.front();
        m_queue.pop();

        return true;
    }

private:
    std::mutex m_mutex;
    std::queue<T> m_queue;
};

static void threadEntry(const int threadNumber, ThreadSafeQueue<std::string>* const queue)
{
    std::string filename;

    while (queue->dequeue(filename)) {
        printf("Thread %d processing file '%s'\n", threadNumber, filename.c_str());
    }
}

int main()
{
    ThreadSafeQueue<std::string> queue;

    // Populate queue
    for (int i = 0; i < 100000; ++i) {
        queue.enqueue("filename_" + std::to_string(i) + ".txt");
    }

    const size_t NUM_THREADS = 4;

    // Spin up some threads
    std::thread threads[NUM_THREADS];
    for (int i = 0; i < NUM_THREADS; ++i) {
        threads[i] = std::thread(threadEntry, i, &queue);
    }

    // Wait for threads to finish
    for (int i = 0; i < NUM_THREADS; ++i) {
        threads[i].join();
    }

    return 0;
}

엮다:

$ g++ example.cpp -pthread

프로그램은 ThreadSafeQueue여러 스레드가 동시에 액세스할 수 있도록 내부 잠금이 있는 대기열을 정의합니다.

main함수는 먼저 대기열을 채웁니다. 그런 다음 4개의 스레드를 시작합니다. 각 스레드는 대기열에서 값을 읽고 이를 "처리"합니다(여기에서는 메시지를 표준 출력으로 인쇄). 큐가 비어 있으면 스레드가 종료됩니다. 함수 main는 스레드가 반환되기 전에 종료될 때까지 기다립니다.

이 설계에서는 스레드가 시작되기 전에 모든 요소가 대기열에 채워지는 것으로 가정합니다. 일부 변경 사항을 적용하면 스레드가 실행되는 동안 새로운 작업 처리를 지원하도록 확장될 수 있습니다.

관련 정보