Skip to content

Commit 7adb929

Browse files
authored
coro::semaphore uses coro::mutex internally (#389)
There has been a semaphore test using a thread_pool that constantly hangs and causes test failures. After reviewing the code this is likely due to how the m_counter on the semaphore is handled. To remove the ambiguity of incrementing and decrementing in a coroutine this class now uses a coro::mutex to correctly acquire and release without going over or under 0/max_value bounds. Consequences of this change: * coro::semaphore::release() is now a task and must be awaited Closes #388
1 parent 6b5ee6c commit 7adb929

File tree

2 files changed

+66
-48
lines changed

2 files changed

+66
-48
lines changed

include/coro/semaphore.hpp

Lines changed: 51 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,10 @@
33
#include "coro/detail/awaiter_list.hpp"
44
#include "coro/expected.hpp"
55
#include "coro/export.hpp"
6+
#include "coro/mutex.hpp"
67

78
#include <atomic>
89
#include <coroutine>
9-
#include <mutex>
1010
#include <string>
1111

1212
namespace coro
@@ -38,29 +38,33 @@ class acquire_operation
3838
public:
3939
explicit acquire_operation(semaphore<max_value>& s) : m_semaphore(s) { }
4040

41-
auto await_ready() const noexcept -> bool
41+
[[nodiscard]] auto await_ready() const noexcept -> bool
4242
{
43-
if (m_semaphore.m_shutdown.load(std::memory_order::acquire))
43+
// If the semaphore is shutdown or a resources can be acquired without suspending release the lock and resume execution.
44+
if (m_semaphore.m_shutdown.load(std::memory_order::acquire) || m_semaphore.try_acquire())
4445
{
46+
m_semaphore.m_mutex.unlock();
4547
return true;
4648
}
47-
return m_semaphore.try_acquire();
49+
50+
return false;
4851
}
4952

50-
auto await_suspend(std::coroutine_handle<> awaiting_coroutine) noexcept -> bool
53+
auto await_suspend(const std::coroutine_handle<> awaiting_coroutine) noexcept -> bool
5154
{
52-
// Check again now that we've setup the coroutine frame, the state could have changed.
55+
// Check again now that we've set up the coroutine frame, the state could have changed.
5356
if (await_ready())
5457
{
5558
return false;
5659
}
5760

5861
m_awaiting_coroutine = awaiting_coroutine;
5962
detail::awaiter_list_push(m_semaphore.m_acquire_waiters, this);
63+
m_semaphore.m_mutex.unlock();
6064
return true;
6165
}
6266

63-
auto await_resume() const -> semaphore_acquire_result
67+
[[nodiscard]] auto await_resume() const -> semaphore_acquire_result
6468
{
6569
if (m_semaphore.m_shutdown.load(std::memory_order::acquire))
6670
{
@@ -69,9 +73,9 @@ class acquire_operation
6973
return semaphore_acquire_result::acquired;
7074
}
7175

72-
acquire_operation<max_value>* m_next{nullptr};
73-
semaphore<max_value>& m_semaphore;
74-
std::coroutine_handle<> m_awaiting_coroutine;
76+
acquire_operation<max_value>* m_next{nullptr};
77+
semaphore<max_value>& m_semaphore;
78+
std::coroutine_handle<> m_awaiting_coroutine;
7579
};
7680

7781
} // namespace detail
@@ -80,7 +84,7 @@ template<std::ptrdiff_t max_value>
8084
class semaphore
8185
{
8286
public:
83-
explicit semaphore(std::ptrdiff_t starting_value)
87+
explicit semaphore(const std::ptrdiff_t starting_value)
8488
: m_counter(starting_value)
8589
{ }
8690

@@ -92,36 +96,47 @@ class semaphore
9296
auto operator=(const semaphore&) noexcept -> semaphore& = delete;
9397
auto operator=(semaphore&&) noexcept -> semaphore& = delete;
9498

95-
auto release() -> void
99+
/**
100+
* @brief Acquires a resource from the semaphore, if the semaphore has no resources available then
101+
* this will suspend and wait until a resource becomes available.
102+
*/
103+
[[nodiscard]] auto acquire() -> coro::task<semaphore_acquire_result>
104+
{
105+
co_await m_mutex.lock();
106+
co_return co_await detail::acquire_operation<max_value>{*this};
107+
}
108+
109+
/**
110+
* @brief Releases a resources back to the semaphore, if the semaphore is already at value() == max() this does nothing.
111+
* @return
112+
*/
113+
[[nodiscard]] auto release() -> coro::task<void>
96114
{
97-
// If there are any waiters just transfer ownership to the waiter.
115+
co_await m_mutex.lock();
116+
// Do not resume or increment resources past the max_value.
117+
if (value() == max())
118+
{
119+
m_mutex.unlock();
120+
co_return;
121+
}
122+
123+
// If there are any waiters just transfer resource ownership to the waiter.
98124
auto* waiter = detail::awaiter_list_pop(m_acquire_waiters);
99125
if (waiter != nullptr)
100126
{
127+
m_mutex.unlock();
101128
waiter->m_awaiting_coroutine.resume();
102129
}
103130
else
104131
{
105-
// Attempt to increment the counter only up to max_value.
106-
auto current = m_counter.load(std::memory_order::acquire);
107-
do
108-
{
109-
if (current >= max_value)
110-
{
111-
return;
112-
}
113-
} while (!m_counter.compare_exchange_weak(current, current + 1, std::memory_order::acq_rel, std::memory_order::acquire));
132+
// Release the resource.
133+
m_counter.fetch_add(1, std::memory_order::release);
134+
m_mutex.unlock();
114135
}
115136
}
116137

117138
/**
118-
* Acquires a resource from the semaphore, if the semaphore has no resources available then
119-
* this will wait until a resource becomes available.
120-
*/
121-
[[nodiscard]] auto acquire() -> detail::acquire_operation<max_value> { return detail::acquire_operation<max_value>{*this}; }
122-
123-
/**
124-
* Attemtps to acquire a resource if there is any resources available.
139+
* @brief Attempts to acquire a resource if there are any resources available.
125140
* @return True if the acquire operation was able to acquire a resource.
126141
*/
127142
auto try_acquire() -> bool
@@ -144,7 +159,7 @@ class semaphore
144159
[[nodiscard]] static constexpr auto max() noexcept -> std::ptrdiff_t { return max_value; }
145160

146161
/**
147-
* The current number of resources available in this semaphore.
162+
* @return The current number of resources available to acquire for this semaphore.
148163
*/
149164
[[nodiscard]] auto value() const noexcept -> std::ptrdiff_t { return m_counter.load(std::memory_order::acquire); }
150165

@@ -167,14 +182,20 @@ class semaphore
167182
}
168183
}
169184

185+
/**
186+
* @return True if this semaphore has been shutdown.
187+
*/
170188
[[nodiscard]] auto is_shutdown() const -> bool { return m_shutdown.load(std::memory_order::acquire); }
171189

172190
private:
173191
friend class detail::acquire_operation<max_value>;
174192

193+
/// @brief The current number of resources that are available to acquire.
175194
std::atomic<std::ptrdiff_t> m_counter;
176195
/// @brief The current list of awaiters attempting to acquire the semaphore.
177196
std::atomic<detail::acquire_operation<max_value>*> m_acquire_waiters{nullptr};
197+
/// @brief mutex used to do acquire and release operations
198+
coro::mutex m_mutex;
178199
/// @brief Flag to denote that all waiters should be woken up with the shutdown result.
179200
std::atomic<bool> m_shutdown{false};
180201
};

test/test_semaphore.cpp

Lines changed: 15 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -28,12 +28,12 @@ TEST_CASE("semaphore binary", "[semaphore]")
2828
output.emplace_back(1);
2929
std::cerr << "coroutine done with resource, releasing\n";
3030
REQUIRE(s.value() == 0);
31-
s.release();
31+
co_await s.release();
3232

3333
REQUIRE(s.value() == 1);
3434

3535
REQUIRE(s.try_acquire());
36-
s.release();
36+
co_await s.release();
3737

3838
co_return;
3939
};
@@ -43,7 +43,7 @@ TEST_CASE("semaphore binary", "[semaphore]")
4343
REQUIRE(s.value() == 1);
4444
REQUIRE(s.try_acquire());
4545
REQUIRE(s.value() == 0);
46-
s.release();
46+
coro::sync_wait(s.release());
4747
REQUIRE(s.value() == 1);
4848

4949
REQUIRE(output.size() == 1);
@@ -71,7 +71,7 @@ TEST_CASE("semaphore binary many waiters until event", "[semaphore]")
7171
std::cerr << "id = " << id << " semaphore acquired\n";
7272
value.fetch_add(1, std::memory_order::relaxed);
7373
std::cerr << "id = " << id << " semaphore release\n";
74-
s.release();
74+
co_await s.release();
7575
co_return;
7676
};
7777

@@ -83,7 +83,7 @@ TEST_CASE("semaphore binary many waiters until event", "[semaphore]")
8383
std::cerr << "block task acquired semaphore, waiting on event\n";
8484
co_await e;
8585
std::cerr << "block task releasing semaphore\n";
86-
s.release();
86+
co_await s.release();
8787
co_return;
8888
};
8989

@@ -113,11 +113,11 @@ TEST_CASE("semaphore binary many waiters until event", "[semaphore]")
113113
TEST_CASE("semaphore release over max", "[semaphore]")
114114
{
115115
coro::semaphore<2> s{0};
116-
s.release();
116+
coro::sync_wait(s.release());
117117
REQUIRE(s.value() == 1);
118-
s.release();
118+
coro::sync_wait(s.release());
119119
REQUIRE(s.value() == 2);
120-
s.release();
120+
coro::sync_wait(s.release());
121121
REQUIRE(s.value() == 2);
122122
}
123123

@@ -132,10 +132,9 @@ TEST_CASE("semaphore try_acquire", "[semaphore]")
132132
TEST_CASE("semaphore produce consume", "[semaphore]")
133133
{
134134
std::cerr << "BEGIN semaphore produce consume\n";
135-
const std::size_t iterations = 10;
135+
constexpr std::size_t iterations = 10;
136136

137-
// This test is run in the context of a thread pool so the producer task can yield. Otherwise
138-
// the producer will just run wild!
137+
// This test is run in the context of a thread pool so the producer task can yield. Otherwise, the producer will just run wild!
139138
auto tp = coro::thread_pool::make_shared(coro::thread_pool::options{.thread_count = 1});
140139
std::atomic<uint64_t> value{0};
141140
std::vector<coro::task<void>> tasks;
@@ -156,7 +155,6 @@ TEST_CASE("semaphore produce consume", "[semaphore]")
156155
std::cerr << "id = " << id << " semaphore acquired, consuming value\n";
157156

158157
value.fetch_add(1, std::memory_order::release);
159-
// In the ringbfuffer acquire is 'consuming', we never release back into the buffer
160158
}
161159
else
162160
{
@@ -180,7 +178,7 @@ TEST_CASE("semaphore produce consume", "[semaphore]")
180178
co_await tp->yield();
181179

182180
std::cerr << "producer: releasing\n";
183-
s.release();
181+
co_await s.release();
184182
std::cerr << "producer: produced\n";
185183
}
186184

@@ -201,9 +199,9 @@ TEST_CASE("semaphore produce consume", "[semaphore]")
201199
TEST_CASE("semaphore 1 producers and many consumers", "[semaphore]")
202200
{
203201
std::cerr << "BEGIN semaphore 1 producers and many consumers\n";
204-
const std::size_t consumers = 16;
205-
const std::size_t producers = 1;
206-
const std::size_t iterations = 100'000;
202+
constexpr std::size_t consumers = 16;
203+
constexpr std::size_t producers = 1;
204+
constexpr std::size_t iterations = 100'000;
207205

208206
std::atomic<uint64_t> value{0};
209207

@@ -243,8 +241,7 @@ TEST_CASE("semaphore 1 producers and many consumers", "[semaphore]")
243241

244242
for (size_t i = 0; i < iterations; ++i)
245243
{
246-
// std::cerr << "producer " << id << " s.release()\n";
247-
s.release();
244+
co_await s.release();
248245
co_await tp->yield();
249246
}
250247

0 commit comments

Comments
 (0)