Skip to content

Commit 2dd8a09

Browse files
committed
added celerity blockchain for task divergence checking
1 parent cde3587 commit 2dd8a09

18 files changed

+929
-25
lines changed

CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -188,6 +188,7 @@ set(SOURCES
188188
src/command_graph.cc
189189
src/config.cc
190190
src/device_queue.cc
191+
src/divergence_block_chain.cc
191192
src/executor.cc
192193
src/distributed_graph_generator.cc
193194
src/graph_serializer.cc

include/debug.h

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,6 @@ namespace debug {
1515
return detail::get_buffer_name(buff);
1616
}
1717

18-
inline void set_task_name(celerity::handler& cgh, const std::string& debug_name) {
19-
detail::set_task_name(cgh, debug_name);
20-
}
18+
inline void set_task_name(celerity::handler& cgh, const std::string& debug_name) { detail::set_task_name(cgh, debug_name); }
2119
} // namespace debug
2220
} // namespace celerity

include/divergence_block_chain.h

Lines changed: 179 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,179 @@
1+
#pragma once
2+
3+
#include "recorders.h"
4+
#include <mutex>
5+
#include <thread>
6+
#include <vector>
7+
8+
namespace celerity::detail {
9+
// in c++23 replace this with mdspan
10+
template <typename T>
11+
struct mpi_multidim_send_wrapper {
12+
public:
13+
const T& operator[](std::pair<int, int> ij) const {
14+
assert(ij.first * m_width + ij.second < m_data.size());
15+
return m_data[ij.first * m_width + ij.second];
16+
}
17+
18+
T* data() { return m_data.data(); }
19+
20+
mpi_multidim_send_wrapper(size_t width, size_t height) : m_data(width * height), m_width(width){};
21+
22+
private:
23+
std::vector<T> m_data;
24+
const size_t m_width;
25+
};
26+
27+
// Probably replace this in c++20 with span
28+
template <typename T>
29+
struct window {
30+
public:
31+
window(const std::vector<T>& value) : m_value(value) {}
32+
33+
const T& operator[](size_t i) const {
34+
assert(i >= 0 && i < m_width);
35+
return m_value[m_offset + i];
36+
}
37+
38+
size_t size() {
39+
m_width = m_value.size() - m_offset;
40+
return m_width;
41+
}
42+
43+
void slide(size_t i) {
44+
assert(i == 0 || (i >= 0 && i <= m_width));
45+
m_offset += i;
46+
m_width -= i;
47+
}
48+
49+
private:
50+
const std::vector<T>& m_value;
51+
size_t m_offset = 0;
52+
size_t m_width = 0;
53+
};
54+
55+
using task_hash = size_t;
56+
using task_hash_data = mpi_multidim_send_wrapper<task_hash>;
57+
using divergence_map = std::unordered_map<task_hash, std::vector<node_id>>;
58+
59+
class abstract_block_chain {
60+
friend struct abstract_block_chain_testspy;
61+
62+
public:
63+
virtual void stop() { m_is_running = false; };
64+
65+
abstract_block_chain(const abstract_block_chain&) = delete;
66+
abstract_block_chain& operator=(const abstract_block_chain&) = delete;
67+
abstract_block_chain& operator=(abstract_block_chain&&) = delete;
68+
69+
abstract_block_chain(abstract_block_chain&&) = default;
70+
71+
abstract_block_chain(size_t num_nodes, node_id local_nid, const std::vector<task_record>& task_recorder, MPI_Comm comm)
72+
: m_local_nid(local_nid), m_num_nodes(num_nodes), m_sizes(num_nodes), m_task_recorder_window(task_recorder), m_comm(comm) {}
73+
74+
virtual ~abstract_block_chain() = default;
75+
76+
protected:
77+
void start() { m_is_running = true; };
78+
79+
virtual void run() = 0;
80+
81+
virtual void divergence_out(const divergence_map& check_map, const int task_num) = 0;
82+
83+
void add_new_hashes();
84+
void clear(const int min_progress);
85+
virtual void allgather_sizes();
86+
virtual void allgather_hashes(const int max_size, task_hash_data& data);
87+
std::pair<int, int> collect_sizes();
88+
task_hash_data collect_hashes(const int max_size);
89+
divergence_map create_check_map(const task_hash_data& task_graphs, const int task_num) const;
90+
91+
void check_for_deadlock() const;
92+
93+
static void print_node_divergences(const divergence_map& check_map, const int task_num);
94+
95+
static void print_task_record(const divergence_map& check_map, const task_record& task, const task_hash hash);
96+
97+
virtual void dedub_print_task_record(const divergence_map& check_map, const int task_num) const;
98+
99+
bool check_for_divergence();
100+
101+
protected:
102+
node_id m_local_nid;
103+
size_t m_num_nodes;
104+
105+
std::vector<task_hash> m_hashes;
106+
std::vector<int> m_sizes;
107+
108+
bool m_is_running = true;
109+
110+
window<task_record> m_task_recorder_window;
111+
112+
std::chrono::time_point<std::chrono::steady_clock> m_last_cleared = std::chrono::steady_clock::now();
113+
114+
MPI_Comm m_comm;
115+
};
116+
117+
class single_node_test_divergence_block_chain : public abstract_block_chain {
118+
public:
119+
single_node_test_divergence_block_chain(size_t num_nodes, node_id local_nid, const std::vector<task_record>& task_recorder, MPI_Comm comm,
120+
const std::vector<std::reference_wrapper<const std::vector<task_record>>>& other_task_records)
121+
: abstract_block_chain(num_nodes, local_nid, task_recorder, comm), m_other_hashes(other_task_records.size()) {
122+
for(auto& tsk_rcd : other_task_records) {
123+
m_other_task_records.push_back(window<task_record>(tsk_rcd));
124+
}
125+
}
126+
127+
private:
128+
void run() override {}
129+
130+
void divergence_out(const divergence_map& check_map, const int task_num) override;
131+
void allgather_sizes() override;
132+
void allgather_hashes(const int max_size, task_hash_data& data) override;
133+
134+
void dedub_print_task_record(const divergence_map& check_map, const int task_num) const override;
135+
136+
std::vector<std::vector<task_hash>> m_other_hashes;
137+
std::vector<window<task_record>> m_other_task_records;
138+
139+
int m_injected_delete_size = 0;
140+
};
141+
142+
class distributed_test_divergence_block_chain : public abstract_block_chain {
143+
public:
144+
distributed_test_divergence_block_chain(size_t num_nodes, node_id local_nid, const std::vector<task_record>& task_record, MPI_Comm comm)
145+
: abstract_block_chain(num_nodes, local_nid, task_record, comm) {}
146+
147+
private:
148+
void run() override {}
149+
150+
void divergence_out(const divergence_map& check_map, const int task_num) override;
151+
};
152+
153+
class divergence_block_chain : public abstract_block_chain {
154+
public:
155+
void start();
156+
void stop() override;
157+
158+
divergence_block_chain(size_t num_nodes, node_id local_nid, const std::vector<task_record>& task_record, MPI_Comm comm)
159+
: abstract_block_chain(num_nodes, local_nid, task_record, comm) {
160+
start();
161+
}
162+
163+
divergence_block_chain(const divergence_block_chain&) = delete;
164+
divergence_block_chain& operator=(const divergence_block_chain&) = delete;
165+
divergence_block_chain& operator=(divergence_block_chain&&) = delete;
166+
167+
divergence_block_chain(divergence_block_chain&&) = default;
168+
169+
~divergence_block_chain() override { divergence_block_chain::stop(); }
170+
171+
private:
172+
void run() override;
173+
174+
void divergence_out(const divergence_map& check_map, const int task_num) override;
175+
176+
private:
177+
std::thread m_thread;
178+
};
179+
} // namespace celerity::detail

include/grid.h

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
#include <gch/small_vector.hpp>
99

1010
#include "ranges.h"
11+
#include "utils.h"
1112
#include "workaround.h"
1213

1314
namespace celerity::detail {
@@ -257,6 +258,27 @@ class region {
257258

258259
} // namespace celerity::detail
259260

261+
template <int Dims>
262+
struct std::hash<celerity::detail::box<Dims>> {
263+
std::size_t operator()(const celerity::detail::box<Dims> r) {
264+
std::size_t seed = 0;
265+
celerity::detail::utils::hash_combine(seed, std::hash<celerity::id<Dims>>{}(r.get_min()), std::hash<celerity::id<Dims>>{}(r.get_max()));
266+
return seed;
267+
};
268+
};
269+
270+
template <int Dims>
271+
struct std::hash<celerity::detail::region<Dims>> {
272+
std::size_t operator()(const celerity::detail::region<Dims> r) {
273+
std::size_t seed = 0;
274+
for(size_t i = 0; i < r.get_boxes().size(); ++i) {
275+
celerity::detail::utils::hash_combine(seed, std::hash<celerity::detail::box<Dims>>{}(r.get_boxes()[i]));
276+
}
277+
return seed;
278+
};
279+
};
280+
281+
260282
namespace celerity::detail::grid_detail {
261283

262284
// forward-declaration for tests (explicitly instantiated)

include/handler.h

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -376,7 +376,7 @@ class handler {
376376
friend void detail::add_reduction(handler& cgh, const detail::reduction_info& rinfo);
377377
friend void detail::extend_lifetime(handler& cgh, std::shared_ptr<detail::lifetime_extending_state> state);
378378

379-
friend void detail::set_task_name(handler &cgh, const std::string& debug_name);
379+
friend void detail::set_task_name(handler& cgh, const std::string& debug_name);
380380

381381
detail::task_id m_tid;
382382
detail::buffer_access_map m_access_map;
@@ -462,11 +462,9 @@ class handler {
462462
}
463463
// Note that cgf_diagnostics has a similar check, but we don't catch void side effects there.
464464
if(!m_side_effects.empty()) { throw std::runtime_error{"Side effects cannot be used in device kernels"}; }
465-
m_task =
466-
detail::task::make_device_compute(m_tid, geometry, std::move(launcher), std::move(m_access_map), std::move(m_reductions));
465+
m_task = detail::task::make_device_compute(m_tid, geometry, std::move(launcher), std::move(m_access_map), std::move(m_reductions));
467466

468467
m_task->set_debug_name(m_usr_def_task_name.value_or(debug_name));
469-
470468
}
471469

472470
void create_collective_task(detail::collective_group_id cgid, std::unique_ptr<detail::command_launcher_storage_base> launcher) {
@@ -588,9 +586,7 @@ namespace detail {
588586

589587
inline void extend_lifetime(handler& cgh, std::shared_ptr<detail::lifetime_extending_state> state) { cgh.extend_lifetime(std::move(state)); }
590588

591-
inline void set_task_name(handler& cgh, const std::string& debug_name) {
592-
cgh.m_usr_def_task_name = {debug_name};
593-
}
589+
inline void set_task_name(handler& cgh, const std::string& debug_name) { cgh.m_usr_def_task_name = {debug_name}; }
594590

595591
// TODO: The _impl functions in detail only exist during the grace period for deprecated reductions on const buffers; move outside again afterwards.
596592
template <typename DataT, int Dims, typename BinaryOperation>

include/print_utils.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@
33
#include "grid.h"
44
#include "ranges.h"
55

6+
#include "intrusive_graph.h"
7+
68
#include <spdlog/fmt/fmt.h>
79

810
template <typename Interface, int Dims>
@@ -70,4 +72,4 @@ struct fmt::formatter<celerity::chunk<Dims>> : fmt::formatter<celerity::subrange
7072
out = formatter<celerity::id<Dims>>::format(celerity::id(chunk.global_size), ctx); // cast to id to avoid multiple inheritance
7173
return out;
7274
}
73-
};
75+
};

include/ranges.h

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
#pragma once
22

33
#include "sycl_wrappers.h"
4+
#include "utils.h"
45
#include "workaround.h"
56

67
namespace celerity {
@@ -229,6 +230,17 @@ struct ones_t {
229230

230231
}; // namespace celerity::detail
231232

233+
template <typename Interface, int Dims>
234+
struct std::hash<celerity::detail::coordinate<Interface, Dims>> {
235+
std::size_t operator()(const celerity::detail::coordinate<Interface, Dims>& r) const noexcept {
236+
std::size_t seed = 0;
237+
for(int i = 0; i < Dims; ++i) {
238+
celerity::detail::utils::hash_combine(seed, std::hash<int>{}(r[i]));
239+
}
240+
return seed;
241+
};
242+
};
243+
232244
namespace celerity {
233245

234246
template <int Dims>
@@ -401,6 +413,17 @@ nd_range(range<3> global_range, range<3> local_range)->nd_range<3>;
401413

402414
} // namespace celerity
403415

416+
417+
template <int Dims>
418+
struct std::hash<celerity::range<Dims>> {
419+
std::size_t operator()(const celerity::range<Dims>& r) const noexcept { return std::hash<celerity::detail::coordinate<celerity::range<Dims>, Dims>>{}(r); };
420+
};
421+
422+
template <int Dims>
423+
struct std::hash<celerity::id<Dims>> {
424+
std::size_t operator()(const celerity::id<Dims>& r) const noexcept { return std::hash<celerity::detail::coordinate<celerity::id<Dims>, Dims>>{}(r); };
425+
};
426+
404427
namespace celerity {
405428
namespace detail {
406429

0 commit comments

Comments
 (0)