Skip to content

Commit

Permalink
Add Latch exercise, remove old exercises 2 and 3
Browse files Browse the repository at this point in the history
  • Loading branch information
Yukigaru committed May 11, 2024
1 parent b980fa4 commit 63397dc
Show file tree
Hide file tree
Showing 6 changed files with 349 additions and 342 deletions.
83 changes: 65 additions & 18 deletions src/task-2/main.cpp
Original file line number Diff line number Diff line change
@@ -1,34 +1,81 @@
#include <atomic>
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <thread>
#include <iostream>
#include <vector>
#include <atomic>
#include "tests.h"

// ThreadFlag позволяет нескольким потокам ждать, пока другой поток не установит флаг на старт (set_flag).
// Флаг устанавливается один раз и навсегда. Если флаг уже установлен к моменту вызова wait(), тогда функция завершается сразу.
class ThreadFlag {
public:
void wait() {
// TODO
}

void set_flag() {
// TODO
}

// Пример lost wakeup: второй поток вероятнее всего не проснётся, несмотря на то, что был вызван notify_one.
// Нужно исправить ошибку.
private:
std::mutex _m;
std::condition_variable _cv;
bool _flag{};
};

std::condition_variable cv;
std::mutex m;
/*
* Тесты
*/
void test_set_flag_before_wait() {
ThreadFlag flag;
flag.set_flag(); // Ставим флаг еще до ожидания

void another_thread_func() {
std::cout << "another: waiting..." << std::endl;
std::thread test_thread([&]() {
flag.wait(); // Не должно быть заблокировано
});

std::unique_lock l{m};
cv.wait(l);
test_thread.join();

std::cout << "another: got the signal and resumed" << std::endl;
PASS();
}

void main_thread_func() {
std::cout << "main: signaling the other thread to resume" << std::endl;
void test_wait_then_set_flag() {
ThreadFlag flag;
std::atomic_int waits_passed{0};

cv.notify_one();
static constexpr auto NumThreads = 8;

// Стартуем несколько потоков, которые будут ждать флага
std::vector<std::thread> test_threads;
for (auto i = 0; i < NumThreads; i++) {
test_threads.emplace_back(std::thread{[&]() {
flag.wait(); // Заблокируется
waits_passed++;
}});
test_threads.back().detach();
}

std::this_thread::sleep_for(std::chrono::milliseconds(10));
EXPECT(waits_passed == 0); // Проверка, что потоки были и всё еще в ожидании

flag.set_flag(); // Ставим флаг и разблокируем потоки

std::this_thread::sleep_for(std::chrono::milliseconds(10));
EXPECT(waits_passed == NumThreads); // Проверяем счетчик

// Не завершается этот тест? Используешь ли ты notify_all, вместо notify_one?
PASS();
}

int main() {
std::thread t1{another_thread_func};
std::thread t2{main_thread_func};
try {
test_set_flag_before_wait();
test_wait_then_set_flag();

} catch (const std::exception& e) {
std::cerr << e.what() << std::endl;
}

t1.join();
t2.join();
return 0;
}
92 changes: 68 additions & 24 deletions src/task-3/main.cpp
Original file line number Diff line number Diff line change
@@ -1,45 +1,89 @@
#include <atomic>
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <thread>
#include <iostream>
#include <vector>
#include <atomic>
#include "tests.h"

// Пример race condition: переменная resume меняется не под мьютексом.
// Нужно исправить ошибку.
// Latch - примитив синхронизации, позволяющий набору потоков подождать друг друга и стартовать вместе.
// Защёлка инициализируется со счётчиком, равным количеству ожидаемых потоков.
// Потоки блокируются в arrive_and_wait, уменьшая при этом счётчик, пока он не обнулится,
// после чего все потоки разблокируются.
class Latch {
public:
Latch(int64_t threads_expected): _counter(threads_expected) {
}

void arrive_and_wait() {
std::unique_lock l{m};
// ...
}
private:
std::condition_variable cv;
std::mutex m;
int64_t _counter;
};

bool resume{false};
std::condition_variable cv;
std::mutex m;
/*
* Тесты
*/
void test_latch_synchronizes_threads() {
constexpr auto num_threads = 16;

void first_thread_func() {
std::cout << "thread 1: waiting..." << std::endl;
Latch latch{num_threads};
std::atomic<int> counter{0};

std::unique_lock l{m};
auto worker = [&]() {
counter.fetch_add(1);
latch.arrive_and_wait();
EXPECT(counter.load(std::memory_order_relaxed) == num_threads);
};

while (!resume) {
std::this_thread::sleep_for(std::chrono::microseconds(1));
cv.wait(l);
std::vector<std::thread> threads;
for (int i = 0; i < num_threads; ++i) {
threads.emplace_back(worker);
}

std::cout << "thread 1: got the signal and resumed" << std::endl;
for (auto& thread : threads) {
thread.join();
}
PASS();
}

void second_thread_func() {
std::cout << "thread 2: signaling the other thread to resume" << std::endl;
void test_latch_doesnt_reset() {
Latch latch{2};
bool passed = false;

auto func = [&]() {
latch.arrive_and_wait();
passed = true;
};
std::thread t1{func};
std::thread t2{func};
t1.join();
t2.join();

resume = true;
cv.notify_one();
// не должно заблокироваться
std::thread t3{[&]() {
latch.arrive_and_wait();
}};
t3.join();
PASS();
}

int main() {
std::thread t1(first_thread_func);
std::thread t2(second_thread_func);
try {
test_latch_synchronizes_threads();
test_latch_doesnt_reset();

} catch (const std::exception& e) {
std::cerr << e.what() << std::endl;
}

t1.join();
t2.join();
return 0;
}
/*
* Усложнение:
* - почему использование std::atomic_bool вместо bool не поможет?
* - изучите и попробуйте std::condition_variable_any вместе с std::shared_lock
* - реализовать многоразовый Barrier с тем же методом arrive_and_wait
* */
135 changes: 96 additions & 39 deletions src/task-4/main.cpp
Original file line number Diff line number Diff line change
@@ -1,83 +1,140 @@
#include <condition_variable>
#include <iostream>
#include <deque>
#include <mutex>
#include <condition_variable>
#include <atomic>
#include <algorithm>
#include <thread>
#include <iostream>
#include <chrono>
#include <vector>
#include <atomic>
#include "tests.h"

// ThreadFlag позволяет нескольким потокам ждать, пока другой поток не установит флаг на старт (set_flag).
// Флаг устанавливается один раз и навсегда. Если флаг уже установлен к моменту вызова wait(), тогда функция завершается сразу.

class ThreadFlag {
// Требования к очереди:
// - first-in-first-out очередь
// - thread-safe
// - pop блокируется, если в очереди нет элементов, и разблокируется как только, как появляется хотя бы один элемент

template <typename T>
class ConcurrentFIFOQueue {
public:
void wait() {
void push(const T &val) {
// TODO
}

void set_flag() {
T pop() {
// TODO
return T{};
}

private:
std::mutex _m;
std::condition_variable _cv;
bool _flag{};
std::condition_variable _not_empty_cv;
std::deque<T> _queue;
};


/*
* Тесты
*/
void test_set_flag_before_wait() {
ThreadFlag flag;
flag.set_flag(); // Ставим флаг еще до ожидания
void test_multiple_push_pop() {
ConcurrentFIFOQueue<int> queue;

std::thread test_thread([&]() {
flag.wait(); // Не должно быть заблокировано
});
queue.push(1);
queue.push(2);
queue.push(3);

test_thread.join();
EXPECT(queue.pop() == 1);
EXPECT(queue.pop() == 2);
EXPECT(queue.pop() == 3);

PASS();
}

void test_wait_then_set_flag() {
ThreadFlag flag;
std::atomic_int waits_passed{0};
void test_pop_wait() {
ConcurrentFIFOQueue<int> queue;
std::atomic<bool> item_popped{false};

static constexpr auto NumThreads = 8;
std::thread consumer{[&]() {
queue.pop();
item_popped.store(true);
}};

std::this_thread::sleep_for(std::chrono::milliseconds(100));
EXPECT(item_popped.load() == false);

queue.push(1);

consumer.join();

EXPECT(item_popped.load() == true);

PASS();
}

// Стартуем несколько потоков, которые будут ждать флага
std::vector<std::thread> test_threads;
for (auto i = 0; i < NumThreads; i++) {
test_threads.emplace_back(std::thread{[&]() {
flag.wait(); // Заблокируется
waits_passed++;
}});
test_threads.back().detach();
void test_multiple_threads() {
constexpr auto NumThreads = 4;
constexpr auto N = 100; // каждый producer поток производит N чисел

ConcurrentFIFOQueue<int> queue;

std::vector<int> consumed;
std::mutex consumed_mutex;

auto producer_func = [&](int thread_id) {
for (int i = 0; i < N; ++i) {
int num = thread_id * N + i;
queue.push(num);
}
};

auto consumer_func = [&]() {
for (int i = 0; i < N; ++i) {
int num = queue.pop();

std::lock_guard<std::mutex> lock(consumed_mutex);
consumed.push_back(num);
}
};

std::vector<std::thread> threads;

for (int i = 0; i < NumThreads; ++i) {
threads.emplace_back(producer_func, i);
threads.emplace_back(consumer_func);
}

std::this_thread::sleep_for(std::chrono::milliseconds(10));
EXPECT(waits_passed == 0); // Проверка, что потоки были и всё еще в ожидании
for (auto& t : threads) {
t.join();
}

EXPECT(consumed.size() == N * NumThreads);

flag.set_flag(); // Ставим флаг и разблокируем потоки
std::sort(std::begin(consumed), std::end(consumed));

std::this_thread::sleep_for(std::chrono::milliseconds(10));
EXPECT(waits_passed == NumThreads); // Проверяем счетчик
for (int i = 1; i < N; ++i) {
EXPECT(consumed[i] == consumed[i-1] + 1);
}

// Не завершается этот тест? Используешь ли ты notify_all, вместо notify_one?
PASS();
}

int main() {
try {
test_set_flag_before_wait();
test_wait_then_set_flag();
test_multiple_push_pop();
test_pop_wait();
test_multiple_threads();

} catch (const std::exception& e) {
std::cerr << e.what() << std::endl;
}

return 0;
}

/*
* Усложнение:
* - вопрос: в push должен быть notify_one или notify_all? в чем разница?
*
* - добавьте ConcurrentFIFOQueue::push(T &&) метод, который перемещает объект в контейнер, а не копирует
*
* - добавьте ConcurrentFIFOQueue::emplace метод, конструирующий объект in-place
*/
Loading

0 comments on commit 63397dc

Please sign in to comment.