Example Problem: A Simple Producer-Consumer Solution
In this example, we’ll implement a simple producer-consumer pattern where multiple producer threads add items to a buffer, and multiple consumer threads remove items. A semaphore counts the items currently in the buffer, and a mutex protects the buffer itself.
Solution Overview
- Semaphore: Used to track the number of items in the buffer.
- Mutex: Ensures mutual exclusion when accessing the shared buffer.
Explanation
- CreateSemaphore initializes a semaphore with an initial count of 0 and a maximum count equal to MAX_BUFFER_SIZE.
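- WaitForSingleObject blocks the calling thread until the object is signaled. On the semaphore, that means the count is greater than zero (an item is available for a consumer); the wait then decrements the count. On the mutex, it means no other thread holds it. Note that in this implementation the producer does not wait for free space: it checks buffer < MAX_BUFFER_SIZE while holding the mutex and skips production when the buffer is full.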
- ReleaseSemaphore increases the count of the semaphore, signaling that an item has been produced.
- CreateMutex creates the mutex; WaitForSingleObject acquires it and ReleaseMutex releases it, so only one thread updates the shared buffer at a time.
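To make the counting rules above concrete, here is a minimal sketch that models them in portable standard C++. It is not Windows API code: ModelSemaphore and its member names are hypothetical, and each method only mirrors the documented behavior of the call named in its comment.

```cpp
#include <condition_variable>
#include <mutex>

// Hypothetical portable model of a Win32-style counting semaphore.
class ModelSemaphore {
public:
    // Mirrors CreateSemaphore(NULL, initial, maximum, NULL).
    ModelSemaphore(long initial, long maximum) : count_(initial), max_(maximum) {}

    // Mirrors WaitForSingleObject(h, INFINITE): block until count > 0, then decrement.
    void wait() {
        std::unique_lock<std::mutex> lock(m_);
        cv_.wait(lock, [this] { return count_ > 0; });
        --count_;
    }

    // Mirrors WaitForSingleObject(h, 0): decrement only if count > 0, never block.
    bool try_wait() {
        std::lock_guard<std::mutex> lock(m_);
        if (count_ == 0) return false;
        --count_;
        return true;
    }

    // Mirrors ReleaseSemaphore(h, n, NULL): add n to the count, or fail and
    // leave the count unchanged if that would exceed the maximum.
    bool release(long n = 1) {
        std::lock_guard<std::mutex> lock(m_);
        if (n <= 0 || count_ + n > max_) return false;
        count_ += n;
        cv_.notify_all();
        return true;
    }

    // Current count, for inspection in examples.
    long count() const {
        std::lock_guard<std::mutex> lock(m_);
        return count_;
    }

private:
    mutable std::mutex m_;
    std::condition_variable cv_;
    long count_;
    long max_;
};
```

With ModelSemaphore s(0, 2), try_wait() fails at first because the count is zero, two release() calls succeed, and a third fails because it would push the count past the maximum. That last case corresponds to ReleaseSemaphore returning FALSE.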
Adapting for Other Synchronization Problems
The general approach and primitives shown here can be adapted to solve classical synchronization problems like Readers-Writers, Dining Philosophers, and Barbershop, as outlined in The Little Book of Semaphores. These problems involve similar patterns of waiting, signaling, and mutual exclusion, which the Windows API provides through these basic synchronization objects.
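As one illustration of such an adaptation, the sketch below outlines the "lightswitch" approach to Readers-Writers: the first reader in locks writers out, and the last reader out lets them back in. It uses portable standard C++ rather than the Windows API, and BinarySemaphore, ReadersWriterLock, and their member names are hypothetical. The room-empty lock is a semaphore rather than a mutex because the thread that releases it may differ from the one that acquired it. On Windows this matters: ReleaseMutex fails unless the calling thread owns the mutex, so a semaphore with a maximum count of 1 would be the natural choice.

```cpp
#include <condition_variable>
#include <mutex>

// Hypothetical binary semaphore; any thread may release it.
class BinarySemaphore {
public:
    void acquire() {
        std::unique_lock<std::mutex> lock(m_);
        cv_.wait(lock, [this] { return free_; });
        free_ = false;
    }
    bool try_acquire() {
        std::lock_guard<std::mutex> lock(m_);
        if (!free_) return false;
        free_ = false;
        return true;
    }
    void release() {
        { std::lock_guard<std::mutex> lock(m_); free_ = true; }
        cv_.notify_one();
    }
private:
    std::mutex m_;
    std::condition_variable cv_;
    bool free_ = true;
};

// Hypothetical readers-writer lock built on the lightswitch pattern.
class ReadersWriterLock {
public:
    void reader_enter() {
        std::lock_guard<std::mutex> lock(readers_mutex_);
        if (++readers_ == 1) room_empty_.acquire();  // first reader locks writers out
    }
    void reader_exit() {
        std::lock_guard<std::mutex> lock(readers_mutex_);
        if (--readers_ == 0) room_empty_.release();  // last reader lets writers in
    }
    void writer_enter() { room_empty_.acquire(); }
    bool writer_try_enter() { return room_empty_.try_acquire(); }
    void writer_exit() { room_empty_.release(); }
private:
    std::mutex readers_mutex_;    // protects readers_
    int readers_ = 0;             // number of readers currently inside
    BinarySemaphore room_empty_;  // held while any reader or a writer is inside
};
```

This simple form can starve writers if readers keep arriving, so treat it as a starting point rather than a finished design.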
C++ Implementation with Windows API
#include <iostream>
#include <windows.h>
#include <thread>
#include <vector>
const int MAX_BUFFER_SIZE = 10; // Maximum number of items the buffer can hold
int buffer = 0; // Shared buffer (could be expanded to a list for more complex use cases)
// Semaphore to count items in the buffer
HANDLE itemsSemaphore;
// Mutex to protect access to the buffer
HANDLE bufferMutex;
void producer(int id) {
    while (true) {
        Sleep(1000); // Simulate production time
        // Lock the buffer (this does not wait for free space)
        WaitForSingleObject(bufferMutex, INFINITE);
        if (buffer < MAX_BUFFER_SIZE) {
            buffer++; // Produce an item (simulated by increasing the buffer count)
            std::cout << "Producer " << id << " produced an item. Buffer: " << buffer << std::endl;
            ReleaseSemaphore(itemsSemaphore, 1, NULL); // Signal that a new item is available
        }
        // If the buffer was full, nothing is produced on this pass
        ReleaseMutex(bufferMutex);
        Sleep(1000); // Simulate idle time
    }
}
void consumer(int id) {
    while (true) {
        // Wait for an item to be available
        WaitForSingleObject(itemsSemaphore, INFINITE);
        // Consume an item (decrease buffer count)
        WaitForSingleObject(bufferMutex, INFINITE);
        if (buffer > 0) {
            buffer--;
            std::cout << "Consumer " << id << " consumed an item. Buffer: " << buffer << std::endl;
        }
        ReleaseMutex(bufferMutex);
        Sleep(1500); // Simulate consumption time
    }
}
int main() {
    // Initialize the semaphore with 0 items initially available
    itemsSemaphore = CreateSemaphore(NULL, 0, MAX_BUFFER_SIZE, NULL);
    bufferMutex = CreateMutex(NULL, FALSE, NULL);
    if (itemsSemaphore == NULL || bufferMutex == NULL) {
        std::cerr << "Failed to create semaphore or mutex." << std::endl;
        return 1;
    }
    // Create producer and consumer threads
    std::vector<std::thread> producers, consumers;
    for (int i = 0; i < 3; ++i) {
        producers.emplace_back(producer, i + 1);
    }
    for (int i = 0; i < 2; ++i) {
        consumers.emplace_back(consumer, i + 1);
    }
    // Join threads (in this example, threads run indefinitely)
    for (auto& p : producers) {
        p.join();
    }
    for (auto& c : consumers) {
        c.join();
    }
    // Cleanup
    CloseHandle(itemsSemaphore);
    CloseHandle(bufferMutex);
    return 0;
}
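In the listing above, a producer that finds the buffer full simply skips that pass. A common variation adds a second semaphore that counts free slots, so the producer blocks until there is space. The sketch below shows that idea in portable standard C++ rather than the Windows API. Semaphore and BoundedBuffer are hypothetical names; in a Win32 version, the free-slot semaphore would presumably be created with an initial count of MAX_BUFFER_SIZE.

```cpp
#include <condition_variable>
#include <mutex>
#include <queue>

// Minimal counting semaphore (hypothetical helper standing in for a Win32 semaphore handle).
class Semaphore {
public:
    explicit Semaphore(int initial) : count_(initial) {}
    void acquire() {  // analogous to WaitForSingleObject(h, INFINITE)
        std::unique_lock<std::mutex> lock(m_);
        cv_.wait(lock, [this] { return count_ > 0; });
        --count_;
    }
    void release() {  // analogous to ReleaseSemaphore(h, 1, NULL)
        { std::lock_guard<std::mutex> lock(m_); ++count_; }
        cv_.notify_one();
    }
private:
    std::mutex m_;
    std::condition_variable cv_;
    int count_;
};

// Bounded FIFO buffer guarded by two semaphores and a mutex.
class BoundedBuffer {
public:
    explicit BoundedBuffer(int capacity) : spaces_(capacity), items_(0) {}

    void put(int value) {
        spaces_.acquire();  // blocks while the buffer is full
        { std::lock_guard<std::mutex> lock(m_); q_.push(value); }
        items_.release();   // one more item is available
    }

    int get() {
        items_.acquire();   // blocks while the buffer is empty
        int value;
        { std::lock_guard<std::mutex> lock(m_); value = q_.front(); q_.pop(); }
        spaces_.release();  // one more slot is free
        return value;
    }

private:
    Semaphore spaces_;  // free slots, starts at capacity
    Semaphore items_;   // filled slots, starts at 0
    std::mutex m_;      // protects q_
    std::queue<int> q_;
};
```

Producer threads would call put() and consumer threads get(). Each side waits on its semaphore before taking the mutex, so no thread holds the mutex while blocked on a full or empty buffer.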