Tag: Producer-Consumer

  • Windows Multithreading: A Simple Producer-Consumer Solution

    Example Problem: A Simple Producer-Consumer Solution

    In this example, we’ll implement a simple producer-consumer pattern where multiple producer threads add items to a buffer, and multiple consumer threads remove items. We’ll use a semaphore to count the items in the buffer and a mutex to protect access to it.

    Solution Overview

    1. Semaphore: Used to track the number of items in the buffer.
    2. Mutex: Ensures mutual exclusion when accessing the shared buffer.

    Explanation

    • CreateSemaphore initializes a semaphore with an initial count of 0 and a maximum count equal to MAX_BUFFER_SIZE.
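    • WaitForSingleObject, called on the semaphore, blocks the consumer until the count is greater than zero (an item is available) and then decrements the count. Called on the mutex, it blocks until the mutex is free and then takes ownership of it.
    • ReleaseSemaphore increases the count of the semaphore, signaling that an item has been produced.
    • CreateMutex creates the mutex that protects the shared buffer; a thread acquires it with WaitForSingleObject and gives it up with ReleaseMutex.
    • The producer in this version never waits for free space. If the buffer is already full, it skips that item and tries again on its next pass.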
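
    To make the counting rules above concrete, here is a minimal sketch that models them in portable C++ rather than calling the Windows API. ModelSemaphore and its member names are made up for this illustration; wait, try_wait, and release are meant to correspond roughly to WaitForSingleObject with INFINITE, WaitForSingleObject with a timeout of 0, and ReleaseSemaphore.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>

// Hypothetical portable model of a Win32 semaphore object (not a Windows API type).
class ModelSemaphore {
public:
    ModelSemaphore(long initial, long maximum) : count_(initial), max_(maximum) {
        // Same constraints CreateSemaphore places on its two counts.
        assert(maximum > 0 && initial >= 0 && initial <= maximum);
    }

    // Roughly WaitForSingleObject(h, INFINITE): block until count > 0, then decrement.
    void wait() {
        std::unique_lock<std::mutex> lock(m_);
        cv_.wait(lock, [this] { return count_ > 0; });
        --count_;
    }

    // Roughly WaitForSingleObject(h, 0): decrement if possible, never block.
    bool try_wait() {
        std::lock_guard<std::mutex> lock(m_);
        if (count_ == 0) return false;
        --count_;
        return true;
    }

    // Roughly ReleaseSemaphore(h, n, NULL): fails and leaves the count
    // unchanged if the new count would exceed the maximum.
    bool release(long n = 1) {
        {
            std::lock_guard<std::mutex> lock(m_);
            if (n <= 0 || count_ + n > max_) return false;
            count_ += n;
        }
        cv_.notify_all();
        return true;
    }

    long count() {
        std::lock_guard<std::mutex> lock(m_);
        return count_;
    }

private:
    std::mutex m_;
    std::condition_variable cv_;
    long count_;
    long max_;
};
```

    With an initial count of 0 and a maximum of 2, try_wait fails at first, two release calls succeed, and a third release is rejected. That mirrors ReleaseSemaphore returning FALSE when the count would exceed the maximum given to CreateSemaphore.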

    Adapting for Other Synchronization Problems

    The general approach and primitives shown here can be adapted to solve classical synchronization problems like Readers-Writers, Dining Philosophers, and Barbershop, as outlined in The Little Book of Semaphores. These problems combine the same three building blocks (waiting, signaling, and mutual exclusion), which the Windows API provides through its semaphore and mutex objects.
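
    As one illustration of such an adaptation, here is a rough sketch of the Readers-Writers "lightswitch" idea: the first reader in locks writers out, and the last reader out lets them back in. It is written in portable C++ as a stand-in for the Windows objects, and BinarySemaphore, ReadersWriters, and their members are names invented for this sketch. The room is guarded by a semaphore with a maximum count of 1 rather than a mutex, because the reader that releases it may not be the one that acquired it, and a Win32 mutex can only be released by the thread that owns it.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>

// Hypothetical stand-in for a semaphore created with initial and maximum count 1.
class BinarySemaphore {
public:
    void wait() {                        // block until free, then take it
        std::unique_lock<std::mutex> lock(m_);
        cv_.wait(lock, [this] { return free_; });
        free_ = false;
    }
    bool try_wait() {                    // take it only if free, never block
        std::lock_guard<std::mutex> lock(m_);
        if (!free_) return false;
        free_ = false;
        return true;
    }
    void release() {                     // any thread may release
        {
            std::lock_guard<std::mutex> lock(m_);
            assert(!free_);              // releasing past the maximum is a bug
            free_ = true;
        }
        cv_.notify_one();
    }
private:
    std::mutex m_;
    std::condition_variable cv_;
    bool free_ = true;
};

class ReadersWriters {
public:
    void reader_enter() {
        std::lock_guard<std::mutex> lock(counter_mutex_);
        if (++readers_ == 1) room_empty_.wait();    // first reader locks writers out
    }
    void reader_exit() {
        std::lock_guard<std::mutex> lock(counter_mutex_);
        if (--readers_ == 0) room_empty_.release(); // last reader lets writers in
    }
    void writer_enter() { room_empty_.wait(); }
    bool writer_try_enter() { return room_empty_.try_wait(); }
    void writer_exit() { room_empty_.release(); }
private:
    std::mutex counter_mutex_;   // protects readers_
    int readers_ = 0;
    BinarySemaphore room_empty_; // held while any reader or a writer is inside
};
```

    This simple form favors readers: a steady stream of readers can keep a writer waiting indefinitely.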

    C++ Implementation with Windows API

    #include <iostream>
    #include <windows.h>
    #include <thread>
    #include <vector>
    
    const int MAX_BUFFER_SIZE = 10; // Maximum number of items the buffer can hold
    int buffer = 0; // Number of items currently in the buffer (a counter stands in for real storage)
    
    // Semaphore to count items in the buffer
    HANDLE itemsSemaphore;
    // Mutex to protect access to the buffer
    HANDLE bufferMutex;
    
    void producer(int id) {
        while (true) {
            // Produce an item (simulated by increasing the buffer count)
            Sleep(1000); // Simulate production time
    
            // Lock the buffer. The producer does not wait for free space:
            // if the buffer is already full, this item is skipped.
            WaitForSingleObject(bufferMutex, INFINITE);
            if (buffer < MAX_BUFFER_SIZE) {
                buffer++;
                std::cout << "Producer " << id << " produced an item. Buffer: " << buffer << std::endl;
                ReleaseSemaphore(itemsSemaphore, 1, NULL); // Signal that a new item is available
            }
            ReleaseMutex(bufferMutex);
    
            Sleep(1000); // Simulate idle time
        }
    }
    
    void consumer(int id) {
        while (true) {
            // Wait for an item to be available
            WaitForSingleObject(itemsSemaphore, INFINITE);
    
            // Consume an item (decrease buffer count)
            WaitForSingleObject(bufferMutex, INFINITE);
            if (buffer > 0) {
                buffer--;
                std::cout << "Consumer " << id << " consumed an item. Buffer: " << buffer << std::endl;
            }
            ReleaseMutex(bufferMutex);
    
            Sleep(1500); // Simulate consumption time
        }
    }
    
    int main() {
        // Initialize the semaphore with 0 items initially available
        itemsSemaphore = CreateSemaphore(NULL, 0, MAX_BUFFER_SIZE, NULL);
        bufferMutex = CreateMutex(NULL, FALSE, NULL);
    
        if (itemsSemaphore == NULL || bufferMutex == NULL) {
            std::cerr << "Failed to create semaphore or mutex." << std::endl;
            return 1;
        }
    
        // Create producer and consumer threads
        std::vector<std::thread> producers, consumers;
        for (int i = 0; i < 3; ++i) {
            producers.emplace_back(producer, i + 1);
        }
        for (int i = 0; i < 2; ++i) {
            consumers.emplace_back(consumer, i + 1);
        }
    
        // Join threads. The producer and consumer loops never exit,
        // so these calls block for the lifetime of the program.
        for (auto& p : producers) {
            p.join();
        }
        for (auto& c : consumers) {
            c.join();
        }
    
        // Cleanup (not reached while the threads run forever; shown for completeness)
        CloseHandle(itemsSemaphore);
        CloseHandle(bufferMutex);
    
        return 0;
    }
    
  • Windows Multithreading: Producer-Consumer with a Bounded Buffer

    Producer-Consumer with a Bounded Buffer

    A variation of the producer-consumer problem involves multiple producers and consumers sharing a finite-sized buffer. The challenge is to prevent producers from adding when the buffer is full and to prevent consumers from removing when the buffer is empty.

    Explanation

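    • The emptySlots semaphore counts the free slots in the buffer. It starts at BUFFER_SIZE, and producers wait on it before adding an item.
    • The fullSlots semaphore counts the items in the buffer. It starts at 0, and consumers wait on it before removing an item.
    • The mutex ensures mutual exclusion while a thread pushes to or pops from the queue.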
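
    The roles of the three objects can be sketched in isolation. The version below is a portable C++ model, not the Windows API code: Sem and BoundedBuffer are hypothetical names, and the try_ variants (which give up instead of blocking, like a wait with a timeout of 0) are added here only so the behavior can be checked from a single thread.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <queue>

// Hypothetical minimal counting semaphore standing in for a Win32 semaphore handle.
class Sem {
public:
    explicit Sem(int initial) : count_(initial) {}
    void wait() {                        // block until count > 0, then decrement
        std::unique_lock<std::mutex> lock(m_);
        cv_.wait(lock, [this] { return count_ > 0; });
        --count_;
    }
    bool try_wait() {                    // decrement if possible, never block
        std::lock_guard<std::mutex> lock(m_);
        if (count_ == 0) return false;
        --count_;
        return true;
    }
    void release() {                     // increment and wake one waiter
        { std::lock_guard<std::mutex> lock(m_); ++count_; }
        cv_.notify_one();
    }
private:
    std::mutex m_;
    std::condition_variable cv_;
    int count_;
};

class BoundedBuffer {
public:
    explicit BoundedBuffer(int capacity)
        : capacity_(capacity), empty_slots_(capacity), full_slots_(0) {}

    void push(int item) { empty_slots_.wait(); store(item); }  // blocks while full
    bool try_push(int item) {                                  // false when full
        if (!empty_slots_.try_wait()) return false;
        store(item);
        return true;
    }
    int pop() { full_slots_.wait(); return take(); }           // blocks while empty
    bool try_pop(int& item) {                                  // false when empty
        if (!full_slots_.try_wait()) return false;
        item = take();
        return true;
    }

private:
    void store(int item) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            items_.push(item);
            assert(items_.size() <= static_cast<std::size_t>(capacity_));
        }
        full_slots_.release();   // one more item for consumers
    }
    int take() {
        int item;
        {
            std::lock_guard<std::mutex> lock(mutex_);
            assert(!items_.empty());
            item = items_.front();
            items_.pop();
        }
        empty_slots_.release();  // one more free slot for producers
        return item;
    }

    int capacity_;
    Sem empty_slots_;            // counts free slots, starts at capacity
    Sem full_slots_;             // counts stored items, starts at 0
    std::mutex mutex_;           // protects items_
    std::queue<int> items_;
};
```

    Each side waits on its own semaphore before taking the mutex and signals the other side's semaphore after releasing it. That ordering keeps a blocked producer or consumer from holding the mutex while it waits.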

    These implementations demonstrate the flexibility and power of semaphores, mutexes, and synchronization patterns in solving classical concurrency problems.

    Implementation

    #include <iostream>
    #include <windows.h>
    #include <thread>
    #include <vector>
    #include <queue>
    
    const int BUFFER_SIZE = 5;
    std::queue<int> buffer;
    
    // Semaphores counting the free and the filled slots
    HANDLE emptySlots;
    HANDLE fullSlots;
    // Mutex protecting the queue
    HANDLE mutex;
    
    void producer(int id) {
        int item = 0; // Per-producer counter, so item numbers repeat across producers
        while (true) {
            Sleep(1000); // Simulate production time
            WaitForSingleObject(emptySlots, INFINITE);
            WaitForSingleObject(mutex, INFINITE);
    
            buffer.push(++item);
            std::cout << "Producer " << id << " produced item " << item << ". Buffer size: " << buffer.size() << "\n";
    
            ReleaseMutex(mutex);
            ReleaseSemaphore(fullSlots, 1, NULL);
        }
    }
    
    void consumer(int id) {
        while (true) {
            WaitForSingleObject(fullSlots, INFINITE);
            WaitForSingleObject(mutex, INFINITE);
    
            int item = buffer.front();
            buffer.pop();
            std::cout << "Consumer " << id << " consumed item " << item << ". Buffer size: " << buffer.size() << "\n";
    
            ReleaseMutex(mutex);
            ReleaseSemaphore(emptySlots, 1, NULL);
            Sleep(1500); // Simulate consumption time
        }
    }
    
    int main() {
        emptySlots = CreateSemaphore(NULL, BUFFER_SIZE, BUFFER_SIZE, NULL);
        fullSlots = CreateSemaphore(NULL, 0, BUFFER_SIZE, NULL);
        mutex = CreateMutex(NULL, FALSE, NULL);
    
        if (emptySlots == NULL || fullSlots == NULL || mutex == NULL) {
            std::cerr << "Failed to create semaphores or mutex.\n";
            return 1;
        }
    
        std::vector<std::thread> producers, consumers;
        for (int i = 0; i < 3; ++i) {
            producers.emplace_back(producer, i + 1);
        }
        for (int i = 0; i < 2; ++i) {
            consumers.emplace_back(consumer, i + 1);
        }
    
        for (auto& p : producers) {
            p.join();
        }
        for (auto& c : consumers) {
            c.join();
        }
    
        CloseHandle(emptySlots);
        CloseHandle(fullSlots);
        CloseHandle(mutex);
    
        return 0;
    }
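
    Both listings above loop forever, so join never returns and the CloseHandle calls are never reached. One common way to end such a program cleanly is a sentinel ("poison pill") value: once the producers are done, the main thread queues one stop marker per consumer. The sketch below shows only that shutdown idea, under some loud assumptions: it uses a plain unbounded queue with a standard C++ mutex and condition variable instead of the Windows semaphores, and kStop, Channel, and run_until_done are hypothetical names.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

constexpr int kStop = -1;  // hypothetical sentinel; real items are >= 0

// Unbounded queue, kept minimal so the focus stays on shutdown.
struct Channel {
    std::mutex m;
    std::condition_variable cv;
    std::queue<int> q;

    void put(int v) {
        { std::lock_guard<std::mutex> lock(m); q.push(v); }
        cv.notify_one();
    }
    int get() {  // blocks until an item is available
        std::unique_lock<std::mutex> lock(m);
        cv.wait(lock, [this] { return !q.empty(); });
        int v = q.front();
        q.pop();
        return v;
    }
};

// Runs the threads to completion and returns how many real items were consumed.
int run_until_done(int producers, int consumers, int items_per_producer) {
    assert(producers >= 0 && consumers > 0 && items_per_producer >= 0);
    Channel ch;
    std::mutex total_mutex;
    int total = 0;

    std::vector<std::thread> prod, cons;
    for (int c = 0; c < consumers; ++c) {
        cons.emplace_back([&ch, &total_mutex, &total] {
            int seen = 0;
            while (ch.get() != kStop) ++seen;  // leave the loop on the sentinel
            std::lock_guard<std::mutex> lock(total_mutex);
            total += seen;
        });
    }
    for (int p = 0; p < producers; ++p) {
        prod.emplace_back([&ch, items_per_producer] {
            for (int i = 0; i < items_per_producer; ++i) ch.put(i);
        });
    }

    for (auto& t : prod) t.join();                      // all real items are queued
    for (int c = 0; c < consumers; ++c) ch.put(kStop);  // one sentinel per consumer
    for (auto& t : cons) t.join();                      // now returns
    return total;
}
```

    Because the sentinels are queued only after every producer has been joined, they sit behind all real items, and each consumer stops after taking exactly one of them. In the Windows version the same idea would apply to the std::queue<int> buffer, with cleanup then running after the joins.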