Skip to content

Commit 66f394c

Browse files
committed
Add stdexec::associate
This diff defines `stdexec::associate` and adds some initial tests to confirm it works properly. Still a work in progress.
1 parent 91c4158 commit 66f394c

File tree

5 files changed

+341
-1
lines changed

5 files changed

+341
-1
lines changed
Lines changed: 216 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,216 @@
1+
/*
2+
* Copyright (c) 2025 Ian Petersen
3+
* Copyright (c) 2025 NVIDIA Corporation
4+
*
5+
* Licensed under the Apache License Version 2.0 with LLVM Exceptions
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* https://llvm.org/LICENSE.txt
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
#pragma once
18+
19+
#include "__execution_fwd.hpp"
20+
21+
#include "__basic_sender.hpp"
22+
#include "__concepts.hpp"
23+
#include "__diagnostics.hpp"
24+
#include "__queries.hpp"
25+
#include "__scope_concepts.hpp"
26+
#include "__senders.hpp"
27+
#include "__sender_adaptor_closure.hpp"
28+
29+
namespace stdexec {
30+
/////////////////////////////////////////////////////////////////////////////
31+
// [exec.associate]
32+
namespace __associate {
33+
template <scope_token _Token, sender _Sender>
34+
struct __associate_data {
35+
using __wrap_result_t = decltype(__declval<_Token&>().wrap(__declval<_Sender>()));
36+
using __wrap_sender_t = std::remove_cvref_t<__wrap_result_t>;
37+
38+
using __assoc_t = decltype(__declval<_Token&>().try_associate());
39+
40+
using __sender_ref =
41+
std::unique_ptr<__wrap_sender_t, decltype([](auto* p) noexcept { std::destroy_at(p); })>;
42+
43+
// BUGBUG: should the spec require __token to be declared as a const _Token, or should this be
44+
// changed to declare __token as a mutable _Token?
45+
explicit __associate_data(const _Token __token, _Sender&& __sndr) noexcept(
46+
__nothrow_constructible_from<__wrap_sender_t, __wrap_result_t>
47+
&& noexcept(__token.wrap(static_cast<_Sender&&>(__sndr)))
48+
&& noexcept(__token.try_associate()))
49+
: __sndr_(__token.wrap(static_cast<_Sender&&>(__sndr)))
50+
, __assoc_([&] {
51+
__sender_ref guard{std::addressof(__sndr_)};
52+
53+
auto assoc = __token.try_associate();
54+
55+
if (assoc) {
56+
(void) guard.release();
57+
}
58+
59+
return assoc;
60+
}()) {
61+
}
62+
63+
__associate_data(const __associate_data& __other) noexcept(
64+
__nothrow_copy_constructible<__wrap_sender_t> && noexcept(__other.__assoc_.try_associate()))
65+
requires copy_constructible<__wrap_sender_t>
66+
: __assoc_(__other.__assoc_.try_associate()) {
67+
if (__assoc_) {
68+
std::construct_at(&__sndr_, __other.__sndr_);
69+
}
70+
}
71+
72+
__associate_data(__associate_data&& __other)
73+
noexcept(__nothrow_move_constructible<__wrap_sender_t>)
74+
: __associate_data(std::move(__other).release()) {
75+
}
76+
77+
~__associate_data() {
78+
if (__assoc_) {
79+
std::destroy_at(&__sndr_);
80+
}
81+
}
82+
83+
std::pair<__assoc_t, __sender_ref> release() && noexcept {
84+
__sender_ref u(__assoc_ ? std::addressof(__sndr_) : nullptr);
85+
return {std::move(__assoc_), std::move(u)};
86+
}
87+
88+
private:
89+
__associate_data(std::pair<__assoc_t, __sender_ref> __parts)
90+
: __assoc_(std::move(__parts.first)) {
91+
if (__assoc_) {
92+
std::construct_at(&__sndr_, std::move(*__parts.second));
93+
}
94+
}
95+
96+
union {
97+
__wrap_sender_t __sndr_;
98+
};
99+
__assoc_t __assoc_;
100+
};
101+
102+
template <scope_token _Token, sender _Sender>
103+
__associate_data(_Token, _Sender&&) -> __associate_data<_Token, _Sender>;
104+
105+
////////////////////////////////////////////////////////////////////////////////////////////////
106+
struct associate_t {
107+
template <sender _Sender, scope_token _Token>
108+
auto operator()(_Sender&& __sndr, _Token&& __token) const
109+
noexcept(__nothrow_constructible_from<
110+
__associate_data<std::remove_cvref_t<_Token>, _Sender>,
111+
_Token,
112+
_Sender
113+
>) -> __well_formed_sender auto {
114+
return __make_sexpr<associate_t>(
115+
__associate_data(static_cast<_Token&&>(__token), static_cast<_Sender&&>(__sndr)));
116+
}
117+
118+
template <scope_token _Token>
119+
STDEXEC_ATTRIBUTE(always_inline)
120+
auto operator()(_Token&& __token) const noexcept {
121+
return __closure(*this, static_cast<_Token&&>(__token));
122+
}
123+
};
124+
125+
struct __associate_impl : __sexpr_defaults {
126+
static constexpr auto get_attrs = []<class _Child>(__ignore, const _Child& __child) noexcept {
127+
return __sync_attrs{__child};
128+
};
129+
130+
static constexpr auto get_completion_signatures =
131+
[]<class _Sender, class... _Env>(_Sender&&, _Env&&...) noexcept
132+
-> transform_completion_signatures<
133+
__completion_signatures_of_t<typename __data_of<_Sender>::__wrap_sender_t>,
134+
completion_signatures<set_stopped_t()>
135+
> {
136+
static_assert(sender_expr_for<_Sender, associate_t>);
137+
return {};
138+
};
139+
140+
static constexpr auto get_state =
141+
[]<class _Self, class _Receiver>(_Self&& __self, _Receiver& __rcvr) noexcept(
142+
(same_as<_Self, std::remove_cvref_t<_Self>>
143+
|| __nothrow_constructible_from<std::remove_cvref_t<_Self>, _Self>) &&
144+
__nothrow_callable<
145+
connect_t,
146+
typename std::remove_cvref_t<__data_of<_Self>>::__wrap_sender_t,
147+
_Receiver
148+
>) {
149+
auto&& [_, data] = std::forward<_Self>(__self);
150+
151+
using associate_data_t = std::remove_cvref_t<decltype(data)>;
152+
using assoc_t = associate_data_t::__assoc_t;
153+
using sender_ref_t = associate_data_t::__sender_ref;
154+
155+
using op_t = connect_result_t<typename sender_ref_t::element_type, _Receiver>;
156+
157+
struct op_state {
158+
assoc_t __assoc_;
159+
union {
160+
_Receiver* __rcvr_;
161+
op_t __op_;
162+
};
163+
164+
explicit op_state(std::pair<assoc_t, sender_ref_t> parts, _Receiver r)
165+
: __assoc_(std::move(parts.first)) {
166+
if (__assoc_) {
167+
::new ((void*) std::addressof(__op_))
168+
op_t(connect(std::move(*parts.second), std::move(r)));
169+
} else {
170+
__rcvr_ = std::addressof(r);
171+
}
172+
}
173+
174+
explicit op_state(associate_data_t&& ad, _Receiver& r)
175+
: op_state(std::move(ad).release(), r) {
176+
}
177+
178+
explicit op_state(const associate_data_t& ad, _Receiver& r)
179+
requires copy_constructible<associate_data_t>
180+
: op_state(associate_data_t(ad).release(), r) {
181+
}
182+
183+
~op_state() {
184+
if (__assoc_) {
185+
std::destroy_at(&__op_);
186+
}
187+
}
188+
189+
void __run() noexcept {
190+
if (__assoc_) {
191+
stdexec::start(__op_);
192+
} else {
193+
stdexec::set_stopped(std::move(*__rcvr_));
194+
}
195+
}
196+
};
197+
198+
return op_state{__forward_like<_Self>(data), __rcvr};
199+
};
200+
201+
static constexpr auto start = [](auto& __state, auto&) noexcept -> void {
202+
__state.__run();
203+
};
204+
};
205+
} // namespace __associate
206+
207+
using __associate::associate_t;
208+
209+
/// @brief The associate sender adaptor, which associates a sender with the
210+
/// async scope referred to by the given token
211+
/// @hideinitializer
212+
inline constexpr associate_t associate{};
213+
214+
template <>
215+
struct __sexpr_impl<associate_t> : __associate::__associate_impl { };
216+
} // namespace stdexec

include/stdexec/__detail/__sender_adaptor_closure.hpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,8 @@ namespace stdexec {
6969

7070
template <sender _Sender, __sender_adaptor_closure_for<_Sender> _Closure>
7171
STDEXEC_ATTRIBUTE(always_inline)
72-
auto operator|(_Sender&& __sndr, _Closure&& __clsur) -> __call_result_t<_Closure, _Sender> {
72+
auto operator|(_Sender&& __sndr, _Closure&& __clsur)
73+
noexcept(__nothrow_callable<_Closure, _Sender>) -> __call_result_t<_Closure, _Sender> {
7374
return static_cast<_Closure&&>(__clsur)(static_cast<_Sender&&>(__sndr));
7475
}
7576

include/stdexec/execution.hpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
// include these after __execution_fwd.hpp
2121
#include "__detail/__as_awaitable.hpp" // IWYU pragma: export
22+
#include "__detail/__associate.hpp" // IWYU pragma: export
2223
#include "__detail/__basic_sender.hpp" // IWYU pragma: export
2324
#include "__detail/__bulk.hpp" // IWYU pragma: export
2425
#include "__detail/__completion_signatures.hpp" // IWYU pragma: export

test/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ set(stdexec_test_sources
3838
stdexec/algos/factories/test_just_stopped.cpp
3939
stdexec/algos/factories/test_read.cpp
4040
stdexec/algos/factories/test_schedule.cpp
41+
stdexec/algos/adaptors/test_associate.cpp
4142
stdexec/algos/adaptors/test_starts_on.cpp
4243
stdexec/algos/adaptors/test_on.cpp
4344
stdexec/algos/adaptors/test_on2.cpp
Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
/*
2+
* Copyright (c) 2025 Ian Petersen
3+
* Copyright (c) 2025 NVIDIA Corporation
4+
*
5+
* Licensed under the Apache License Version 2.0 with LLVM Exceptions
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* https://llvm.org/LICENSE.txt
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
#include <catch2/catch.hpp>
19+
#include <stdexec/execution.hpp>
20+
21+
namespace ex = stdexec;
22+
23+
namespace {
24+
struct null_token {
25+
struct assoc {
26+
constexpr operator bool() const noexcept {
27+
return true;
28+
}
29+
30+
constexpr assoc try_associate() const noexcept {
31+
return {};
32+
}
33+
};
34+
35+
template <ex::sender Sender>
36+
constexpr Sender&& wrap(Sender&& sndr) const noexcept {
37+
return std::forward<Sender>(sndr);
38+
}
39+
40+
constexpr assoc try_associate() const noexcept {
41+
return {};
42+
}
43+
};
44+
45+
TEST_CASE("associate returns a sender", "[adaptors][associate]") {
46+
auto snd = ex::associate(ex::just(), null_token{});
47+
STATIC_REQUIRE(ex::sender<decltype(snd)>);
48+
(void) snd;
49+
}
50+
51+
TEST_CASE("associate is appropriately noexcept", "[adaptors][associate]") {
52+
// double-check our dependencies
53+
STATIC_REQUIRE(noexcept(ex::just()));
54+
STATIC_REQUIRE(noexcept(null_token{}));
55+
56+
// null_token is no-throw default constructible and tokens must be no-throw
57+
// copyable and movable so this whole thing had better be no-throw
58+
STATIC_REQUIRE(noexcept(ex::associate(null_token{})));
59+
60+
// constructing and passing in a no-throw sender should let the whole
61+
// expression be no-throw
62+
STATIC_REQUIRE(noexcept(ex::associate(ex::just(), null_token{})));
63+
STATIC_REQUIRE(noexcept(ex::just() | ex::associate(null_token{})));
64+
65+
// conversely, trafficking in senders with potentially-throwing copy
66+
// constructors should lead to the whole expression becoming potentially-throwing
67+
const auto justString = ex::just(std::string{"Copying strings is potentially-throwing"});
68+
STATIC_REQUIRE(!noexcept(ex::associate(justString, null_token{})));
69+
STATIC_REQUIRE(!noexcept(justString | ex::associate(null_token{})));
70+
(void) justString;
71+
}
72+
73+
template <class Sender, class... CompSig>
74+
constexpr bool expected_completion_signatures() {
75+
using expected_sigs = ex::completion_signatures<CompSig...>;
76+
using actual_sigs = ex::completion_signatures_of_t<Sender>;
77+
return expected_sigs{} == actual_sigs{};
78+
}
79+
80+
TEST_CASE("associate has appropriate completion signatures", "[adaptors][associate]") {
81+
STATIC_REQUIRE(
82+
expected_completion_signatures<
83+
decltype(ex::associate(ex::just(), null_token{})),
84+
ex::set_value_t(),
85+
ex::set_stopped_t()
86+
>());
87+
88+
STATIC_REQUIRE(
89+
expected_completion_signatures<
90+
decltype(ex::associate(ex::just(std::string{}), null_token{})),
91+
ex::set_value_t(std::string),
92+
ex::set_stopped_t()
93+
>);
94+
95+
STATIC_REQUIRE(
96+
expected_completion_signatures<
97+
decltype(ex::associate(ex::just_stopped(), null_token{})),
98+
ex::set_stopped_t()
99+
>);
100+
101+
STATIC_REQUIRE(
102+
expected_completion_signatures<
103+
decltype(ex::associate(ex::just_error(5), null_token{})),
104+
ex::set_error_t(int),
105+
ex::set_stopped_t()
106+
>);
107+
}
108+
109+
// TODO: confirm that running an associate-sender produces the expected output
110+
// variations:
111+
// - with a null_token, it's the identity
112+
// - with an always_expired_token, it's just_stopped
113+
// - change the state of a token between copies; the copy is just_stopped
114+
// TODO: confirm that `associate(foo(), token{})` destroys resources owned by foo()
115+
// when token{} is expired
116+
// TODO: check the pass-through nature of __sync_attrs
117+
// TODO: check the pass-through stop request behaviour
118+
// TODO: confirm that senders-of-references forward references when associated
119+
// TODO: confirm timing of destruction of opstate relative to release of association
120+
// TODO: confirm that the TODO list is exhaustive
121+
} // namespace

0 commit comments

Comments
 (0)