Условная переменная
Условная переменная — примитив синхронизации, обеспечивающий блокирование одного или нескольких потоков до момента поступления сигнала от другого потока о выполнении некоторого условия или до истечения максимального промежутка времени ожидания. Условные переменные используются вместе с ассоциированным мьютексом и являются элементом некоторых видов мониторов.
Обзор
Концептуально, условная переменная — это очередь потоков, ассоциированных с разделяемым объектом данных, которые ожидают выполнения некоторого условия, накладываемого на состояние данных. Таким образом, каждая условная переменная связана с утверждением . Когда поток находится в состоянии ожидания на условной переменной, он не считается владеющим данными и другой поток может изменить разделяемый объект и просигнализировать ожидающим потокам в случае выполнения утверждения .
Примеры использования
Приведенный пример иллюстрирует применение условных переменных для синхронизации потоков производителя и потребителя. Поток-производитель, постепенно увеличивая значение общей переменной, сигнализирует потоку, ожидающему на условной переменной о выполнении утверждения о превышении максимального значения. Ожидающий поток-потребитель, проверяя значение общей переменной, блокируется в случае невыполнения условия превышения максимума. При получении сигнала об истинности утверждения поток «потребляет» разделяемый ресурс, уменьшая значение общей переменной так, чтобы оно не стало меньше допустимого минимума.
POSIX threads
В библиотеке POSIX Threads для языка C за использование условных переменных отвечают функции и структуры данных с префиксом pthread_cond.
#include <stdlib.h>
#include <stdio.h>
#include <unistd.h>
#include <pthread.h>
#define STORAGE_MIN 10
#define STORAGE_MAX 20
/* Разделяемый ресурс */
int storage = STORAGE_MIN;
pthread_mutex_t mutex;
pthread_cond_t condition;
/* Функция потока потребителя */
void *consumer(void *args)
{
puts("[CONSUMER] thread started");
int toConsume = 0;
while(1)
{
pthread_mutex_lock(&mutex);
/* Если значение общей переменной меньше максимального,
* то поток входит в состояние ожидания сигнала о достижении
* максимума */
while (storage < STORAGE_MAX)
{
pthread_cond_wait(&condition, &mutex);
}
toConsume = storage-STORAGE_MIN;
printf("[CONSUMER] storage is maximum, consuming %d\n", \
toConsume);
/* "Потребление" допустимого объема из значения общей
* переменной */
storage -= toConsume;
printf("[CONSUMER] storage = %d\n", storage);
pthread_mutex_unlock(&mutex);
}
return NULL;
}
/* Функция потока производителя */
void *producer(void *args)
{
puts("[PRODUCER] thread started");
while (1)
{
usleep(200000);
pthread_mutex_lock(&mutex);
/* Производитель постоянно увеличивает значение общей переменной */
++storage;
printf("[PRODUCER] storage = %d\n", storage);
/* Если значение общей переменной достигло или превысило
* максимум, поток потребитель уведомляется об этом */
if (storage >= STORAGE_MAX)
{
puts("[PRODUCER] storage maximum");
pthread_cond_signal(&condition);
}
pthread_mutex_unlock(&mutex);
}
return NULL;
}
int main(int argc, char *argv[])
{
int res = 0;
pthread_t thProducer, thConsumer;
pthread_mutex_init(&mutex, NULL);
pthread_cond_init(&condition, NULL);
res = pthread_create(&thProducer, NULL, producer, NULL);
if (res != 0)
{
perror("pthread_create");
exit(EXIT_FAILURE);
}
res = pthread_create(&thConsumer, NULL, consumer, NULL);
if (res != 0)
{
perror("pthread_create");
exit(EXIT_FAILURE);
}
pthread_join(thProducer, NULL);
pthread_join(thConsumer, NULL);
return EXIT_SUCCESS;
}
C++
Стандарт C++11 добавил в язык поддержку многопоточности. Работа с условными переменными обеспечивается средствами, объявленными в заголовочном файле condition_variable
#include <cstdlib>
#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <chrono>
#define STORAGE_MIN 10
#define STORAGE_MAX 20
int storage = STORAGE_MIN;
std::mutex globalMutex;
std::condition_variable condition;
/* Функция потока потребителя */
void consumer()
{
std::cout << "[CONSUMER] thread started" << std::endl;
int toConsume = 0;
while(true)
{
std::unique_lock<std::mutex> lock(globalMutex);
/* Если значение общей переменной меньше максимального,
* то поток входит в состояние ожидания сигнала о достижении
* максимума */
if (storage < STORAGE_MAX)
{
condition.wait(lock , []{return storage >= STORAGE_MAX;} ); // Атомарно _отпускает мьютекс_ и сразу же блокирует поток
toConsume = storage-STORAGE_MIN;
std::cout << "[CONSUMER] storage is maximum, consuming "
<< toConsume << std::endl;
}
/* "Потребление" допустимого объема из значения общей
* переменной */
storage -= toConsume;
std::cout << "[CONSUMER] storage = " << storage << std::endl;
}
}
/* Функция потока производителя */
void producer()
{
std::cout << "[PRODUCER] thread started" << std::endl;
while (true)
{
std::this_thread::sleep_for(std::chrono::milliseconds(200));
std::unique_lock<std::mutex> lock(globalMutex);
++storage;
std::cout << "[PRODUCER] storage = " << storage << std::endl;
/* Если значение общей переменной достигло или превысило
* максимум, поток потребитель уведомляется об этом */
if (storage >= STORAGE_MAX)
{
std::cout << "[PRODUCER] storage maximum" << std::endl;
condition.notify_one();
}
}
}
int main(int argc, char *argv[])
{
std::thread thProducer(producer);
std::thread thConsumer(consumer);
thProducer.join();
thConsumer.join();
return 0;
}
Qt 4
cw.h
#ifndef CW_H
#define CW_H
#include <QThread>
#include <QMutex>
#include <QWaitCondition>
#include <QDebug>
#define STORAGE_MIN 10
#define STORAGE_MAX 20
extern int storage;
extern QMutex qmt;
extern QWaitCondition condition;
class Producer : public QThread
{
Q_OBJECT
private:
void run()
{
qDebug() << "[PRODUCER] thread started";
while (1)
{
QThread::msleep(200);
qmt.lock();
++storage;
qDebug() << "[PRODUCER] storage = " << storage;
/* Если значение общей переменной достигло или превысило
* максимум, поток потребитель уведомляется об этом */
if (storage >= STORAGE_MAX)
{
qDebug() << "[PRODUCER] storage maximum";
condition.wakeOne();
}
qmt.unlock();
}
}
};
class Consumer : public QThread
{
Q_OBJECT
private:
void run()
{
qDebug() << "[CONSUMER] thread started";
int toConsume = 0;
while(1)
{
qmt.lock();
/* Если значение общей переменной меньше максимального,
* то поток входит в состояние ожидания сигнала о достижении
* максимума */
if (storage < STORAGE_MAX)
{
condition.wait(&qmt);
toConsume = storage-STORAGE_MIN;
qDebug() << "[CONSUMER] storage is maximum, consuming "
<< toConsume;
}
/* "Потребление" допустимого объема из значения общей
* переменной */
storage -= toConsume;
qDebug() << "[CONSUMER] storage = " << storage;
qmt.unlock();
}
}
};
#endif /* CW_H */
main.cpp
#include <QCoreApplication>
#include "cw.h"
int storage = STORAGE_MIN;
QMutex qmt;
QWaitCondition condition;
int main(int argc, char *argv[])
{
QCoreApplication app(argc, argv);
Producer prod;
Consumer cons;
prod.start();
cons.start();
return app.exec();
}
Python
В языке Python условные переменные реализованы в виде экземпляров класса Condition
модуля threading
. В следующем примере одна и та же условная переменная используется в потоках производителя и потребителя с применением синтаксиса менеджера контекста[1]
# Поток-потребитель
with cond_var: # в контексте условия cond_var
while not an_item_is_available(): # пока элемент недоступен
cond_var.wait() # ждать
get_an_item() # получить элемент
# Поток-производитель
with cond_var: # в контексте условия cond_var
make_an_item_available() # произвести элемент
cond_var.notify() # известить потребителей
Ada '95
В языке Ада нет необходимости в использовании условных переменных. Для организации мониторов с блокированием задач возможно использовать защищенные типы данных.
with Ada.Text_IO;
procedure Main is
task Producer; -- объявление задачи производителя
task Consumer; -- объявление задачи потребителя
type Storage_T is range 10 .. 20; -- тип диапазон для общего ресурса
-- монитор (защищенный объект), разделяемый производителем и потребителем
protected type Storage is
entry Put; -- операция "произвести" единицу ресурса
entry Get; -- операция "потребить" допустимое количество ресурса
entry Value(val : out Storage_T); -- аксессор значения переменной
private
-- скрытая переменная с минимальным начальным значением из диапазона типа
StorageData : Storage_T := Storage_T'First;
end Storage;
-- реализация монитора Storage
protected body Storage is
entry Put when StorageData < Storage_T'Last is
begin
StorageData := StorageData + 1;
if StorageData >= Storage_T'Last then
Ada.Text_IO.Put_Line("[PRODUCER] storage maximum");
end if;
end;
entry Get when StorageData >= Storage_T'Last is
To_Consume : Storage_T;
begin
To_Consume := StorageData - Storage_T'First;
StorageData := StorageData - To_Consume;
Ada.Text_IO.Put_Line("[CONSUMER] consuming");
end Get;
entry Value(val : out Storage_T) when true is
begin
val := StorageData;
end;
end Storage;
-- экземпляр монитора Storage
Storage1 : Storage;
-- реализация задачи производителя
task body Producer is
v : Storage_T;
begin
Ada.Text_IO.Put_Line("[PRODUCER] Task started");
loop
delay 0.2;
Storage1.Put;
Storage1.Value(v);
Ada.Text_IO.Put("[PRODUCER] ");
Ada.Text_IO.Put_Line(v'Img);
end loop;
end Producer;
-- реализация задачи потребителя
task body Consumer is
begin
Ada.Text_IO.Put_Line("[CONSUMER] Task started");
loop
Storage1.Get;
end loop;
end Consumer;
begin
null;
end Main;