вторник, июля 03, 2007

Барьер

Когда вы пишите код, предназначенный для параллельного исполнения (многопоточный код), часто возникают ситуации, когда необходимо синхронизировать в определенной точке исполнение кода в параллельных потоках.
Например, вот такой код:


static void Main(string[] args)
{
// число параллельных потоков
int threadCount = 5;
//запускаем потоки
for (int i = 0; i < threadCount; ++i)
{
Thread t = new Thread(new ThreadStart(ThreadRun));
t.Name = "Thread " + i.ToString();
t.Start();
Console.WriteLine("{0} started", t.Name);
}
Console.WriteLine("All off {0} threads have been finished", threadCount);
Console.WriteLine("End of Main()");
Console.ReadLine();
}

// код для выполнения в потоке
static void ThreadRun()
{
for (int i = 0; i < 10; ++i)
Thread.Sleep(1); // do something
Console.WriteLine("{0} has finished.", Thread.CurrentThread.Name);
}


Выдаст нам, что-то в этом роде:


Thread 0 started
Thread 0 has finished.
Thread 1 started
Thread 2 started
Thread 3 started
Thread 4 started
All of 5 threads have been finished
End of Main()
Thread 1 has finished.
Thread 2 has finished.
Thread 3 has finished.
Thread 4 has finished.


Естественно, нам хотелось, чтоб сообщение “All of 5 threads have been finished” завершало листинг. Для этого нам надо дождаться окончания выполнения всех 5 запущенных потоков. Это и есть задача по синхронизации параллельного исполнения.

Нам надо приостановить основной поток, в котором исполняется метод Main(), дождаться завершения ThreadRun() во всех пяти потоках и затем продолжить исполнение в Main(). В .Net Framework для этих целей служат синхронизационные примитивы, унаследованные от System.Threading.WaitHandle: AutoResetEvent, ManualResetEvent, Mutex. Все они представляют собой managed обертки над различными объектами синхронизации операционной системы. Стоящую перед нами задачу можно решить при помощи ManualResetEvent и статического поля для подсчета числа завершившихся потоков, но лучше всего упрятать детали в специальный класс – Barrier


public class Barrier
{
ManualResetEvent _event;
int _count;
volatile int _current;

/// <summary>
/// Конструктор
/// </summary>
/// <param name="count">"высота" барьера</param>
public Barrier(int count)
{
_event = new ManualResetEvent(false);
_count = count;
_current = 0;
}

/// <summary>
/// Синхронизирующий метод ожидания барьера
/// </summary>
public void Await()
{
if (_current == _count || Interlocked.Increment(ref _current) == _count)
{
_event.Set();
Interlocked.Decrement(ref _current);
}
else
{
_event.WaitOne();
if (Interlocked.Decrement(ref _current) == 0) _event.Reset();
}
}
}


Barrier очень простой класс. Его предназначение, приостанавливать исполняющиеся потоки до тех пор, пока число ожидающих не превысит определенного порога. Величина этого порога (высота барьера) задается в конструкторе. А постановку потока в ожидание осуществляет вызов метода Await().
Использовать его тоже очень просто. В код примера добавляем всего четыре строки (выделены жирным):


static Barrier barrier;
static void Main(string[] args)
{
// число параллельных потоков
int threadCount = 5;
barrier = new Barrier(threadCount+1);
//запускаем потоки
for (int i = 0; i < threadCount; ++i)
{
Thread t = new Thread(new ThreadStart(ThreadRun));
t.Name = "Thread " + i.ToString();
t.Start();
Console.WriteLine("{0} started", t.Name);
}
barrier.Await();// ожидаем преодоления барьера
Console.WriteLine("All of {0} threads have been finished", threadCount);
Console.WriteLine("End of Main()");
Console.ReadLine();
}
// код для выполнения в потоке
static void ThreadRun()
{
for (int i = 0; i < 10; ++i) Thread.Sleep(1); // do something
Console.WriteLine("{0} has finished.", Thread.CurrentThread.Name);
barrier.Await(); // ожидаем преодоления барьера
}


Во-первых, нам понадобится экземпляр Barrier. В конструкторе мы инициализируем его числом threadCount+1 (число запускаемых потоков + 1 главный поток). Далее расставляем вызовы barrier.Await() в тех точках кода, где необходима синхронизация: в конце потокового метода ThreadRun() и в главном методе Main() после запуска всех потоков. Поскольку в конструкторе мы задали «высоту» барьера в 6, то первые пять потоков достигшие вызова метода Await() будут приостановлены, а шестой вызов этого метода разблокирует все ожидающие потоки. Результат налицо:


Thread 0 started
Thread 0 has finished.
Thread 1 started
Thread 2 started
Thread 3 started
Thread 4 started
Thread 1 has finished.
Thread 2 has finished.
Thread 3 has finished.
Thread 4 has finished.
All of 5 threads have been finished
End of Main()


Внутри метода Barrier.Await() увеличивается значение внутреннего счетчика _current и проверяется, что оно не превысило порога _count. В этом случае поток попадает на блокировку _event.WaitOne(). Иначе, блокировка ManualResetEvent сбрасывается и ранее заблокированные потоки освобождаются. Однако код класса Barrier.Await() выглядит, на первый взгляд, довольно странно. Вместо простой проверки if (_current < _count) мы используем странную конструкцию if(_current == _count || Interlocked.Increment(ref _current) == _count). Это сделано для того, чтобы избежать неприятной ситуации, известной под именем “race conditions”. Дело в том, что от момента начала проверки условия до выполнения обусловленного действия (в нашем случае это _event.Set() или _event.WaitOne()), проходит определенное время. В течении этого времени, другой поток может начать исполнять этот же код, и принять не верное решение в операторе проверки (например, оба потока отправятся выполнять _event.WaitOne() и мы получим классический deadlock). Именно поэтому счетчик _current объявлен volatile, а все изменения его значения выполняются при помощи класса Interlocked, который предоставляет нам набор атомарных операций для работы с данными, разделяемыми между потоками. Дополнительная проверка _current == _count введена тоже во избежание “race conditions”.

Почему в методе Barrier.Await не используется обычная блокировка lock(object){….}? Ответ очевиден. Наложение такой блокировки на весь код метода Await() неизбежно приводит в deadlock, а использование lock() только на этапе проверки условия никак не спасает нас от “race conditions”.

Ну и напоследок ложка дегтя. Дело в том, что приведенный в примере класс Barrier не является реентерабельным. Т.е. его экземпляр можно использовать только один раз, несмотря на то, что в Await() есть код, который сбрасывает значение внутреннего счетчика до нуля после преодоления барьера. Для того чтобы сделать наш Barrier реентерабельным и избежать “race conditions”, нам пришлось бы использовать два внутренних ManualResetEvent, и это существенно усложнило бы код.



6 комментариев:

Bacek комментирует...

Чудовищно...

Collection<Thread> threads;

Thread t = new Thread()
...

threads.Add(t);

foreach(Thread thread in threads)
thread.Join();

Те же 4 строчки, без таких вот суровых перверсий.

Sergey Rozovik комментирует...

to Bacek
>Чудовищно...
Ok. А если точка синхронизации находится не в конце епотокового метода, а где-то посередине? Join() позволяет дождаться окончания работы потока, за что ему большое спасибо. Барьер предназначен для другого.

Bacek комментирует...

Не, идею с барьерами я в принципе понимаю... Вот только за всю мою жизнь потребность в них у меня ни разу не возникла. Неуловимый Джо в чистом виде...

Kirill комментирует...

imho, это не очень удачный пример использования барьера: сообщение говорит о том, что все треды уже завершились, тогда как это может быть неверно. Вариант с Join для поставленной задачи выглядит элегантнее.

Анонимный комментирует...

"Почему в методе Barrier.Await не используется обычная блокировка lock(object){….}? Ответ очевиден. Наложение такой блокировки на весь код метода Await() неизбежно приводит в deadlock, а использование lock() только на этапе проверки условия никак не спасает нас от “race conditions”."

Мне кажется Double If решил бы эту проблему элегантнее

Sergey Rozovik комментирует...

to Анонимный
Блокировка с двойной проверкой пожалуй может справиться с проблемой "гонок" в нашем случае. Но элегантным я такое решение назвать не могу. Хотя бы потому, что предложенное решение вообще не использует никаких блокировок.

to Kirill
Согласен, пример предельно прост, и к тому же имеет альтернативное решение через Join(). Но я сознательно старался придумать самый простой и привычный пример. Это облегчает понимание всего остального, что важнее.