import Base: launch, kill, manage, connect
export MPIWindowIOManager, launch, kill, manage, connect, @cluster

"""
Stores the buffers needed for communication, in one instance per rank. The worker
event loop stops when the `_stop_requested` condition is notified.
"""
mutable struct MPIWindowIOManager <: ClusterManager
  comm::MPI.Comm
  connection_windows::Vector{WindowIO}
  stdio_windows::Vector{WindowIO}
  workers_wait::Bool

  function MPIWindowIOManager(comm::MPI.Comm, workers_wait::Bool)
    nb_procs = MPI.Comm_size(comm)
    connection_windows = Vector{WindowIO}(nb_procs)
    stdio_windows = Vector{WindowIO}(nb_procs)

    for i in 1:nb_procs
      connection_windows[i] = WindowIO(comm)
      stdio_windows[i] = WindowIO(comm)
    end

    # Make sure all windows are created before continuing
    MPI.Barrier(comm)

    return new(comm, connection_windows, stdio_windows, workers_wait)
  end
end
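
# A minimal construction sketch (illustrative comment, not part of the original code):
# the constructor calls MPI.Barrier, so it must run collectively on every rank after
# MPI.Init(). The argument values shown here are assumptions for the example only.
#
#   MPI.Init()
#   mgr = MPIWindowIOManager(MPI.COMM_WORLD, false)  # workers_wait = false
#   # ... use the manager, typically via start_window_worker defined below ...
#   closeall(mgr)  # also collective, see below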

# Closes all local MPI Windows in a manager. Must be called collectively on all ranks
function closeall(manager::MPIWindowIOManager)
  for w in manager.connection_windows
    close(w)
  end
  for w in manager.stdio_windows
    close(w)
  end
end

# The MPI processes are already started externally (e.g. by mpirun), so launch only
# registers one WorkerConfig per non-root rank and signals that launching is done
function launch(mgr::MPIWindowIOManager, params::Dict,
                instances::Array, cond::Condition)
  try
    nprocs = MPI.Comm_size(mgr.comm)
    for cnt in 1:(nprocs-1)
      push!(instances, WorkerConfig())
    end
    notify(cond)
  catch e
    println("Error in MPI launch $e")
    rethrow(e)
  end
end

# Wake up the event loop on worker pid and mark it as terminated
function kill(mgr::MPIWindowIOManager, pid::Int, config::WorkerConfig)
  @spawnat pid notify(_stop_requested)
  Distributed.set_worker_state(Distributed.Worker(pid), Distributed.W_TERMINATED)
end

# Required by the ClusterManager interface; nothing to do for MPI windows
function manage(mgr::MPIWindowIOManager, id::Integer, config::WorkerConfig, op::Symbol) end

# Return the (read, write) streams used to talk to worker pid. On the root, also
# forward the redirected stdout of that worker to the local stdout
function connect(mgr::MPIWindowIOManager, pid::Int, config::WorkerConfig)
  myrank = MPI.Comm_rank(mgr.comm)
  if myrank == 0
    proc_stdio = mgr.stdio_windows[pid]
    @schedule while !eof(proc_stdio)
      try
        println("\tFrom worker $(pid):\t$(readline(proc_stdio))")
      catch e
      end
    end
  end
  return (mgr.connection_windows[pid], WindowWriter(mgr.connection_windows[myrank+1], pid-1))
end

# Redirect the local stdout to the WindowWriter s, so the root can print it
function redirect_to_mpi(s::WindowWriter)
  (rd, wr) = redirect_stdout()
  @schedule while !eof(rd) && isopen(s.winio)
    av = readline(rd)
    if isopen(s.winio)
      println(s,av)
      flush(s)
    end
  end
end

# Verify that every worker responds to a remote call and reports its own id
function checkworkers()
  for w in workers()
    if w != (@fetchfrom w myid())
      error("worker $w is not waiting")
    end
  end
end

# Release all workers that are blocked in wait_for_events
function notify_workers()
  for w in workers()
    @spawnat(w, notify(_stop_requested))
  end
end

# Block the current task until _stop_requested is notified
function wait_for_events()
  global _stop_requested
  wait(_stop_requested)
end

"""
Initialize the current process as a Julia parallel worker. Must be called on all ranks.
If comm is not supplied, MPI is initialized and MPI_COMM_WORLD is used.
"""
function start_window_worker(comm::Comm, workers_wait)
  rank = MPI.Comm_rank(comm)
  N = MPI.Comm_size(comm)

  manager = MPIWindowIOManager(comm, workers_wait)
  cookie = string(comm)
  if length(cookie) > Base.Distributed.HDR_COOKIE_LEN
    cookie = cookie[1:Base.Distributed.HDR_COOKIE_LEN]
  end

  try
    if rank == 0
      Base.cluster_cookie(cookie)
      MPI.Barrier(comm)
      addprocs(manager)
      @assert nprocs() == N
      @assert nworkers() == (N == 1 ? 1 : N-1)

      if !workers_wait
        checkworkers()
        notify_workers()
      end
    else
      init_worker(cookie, manager)
      MPI.Barrier(comm)
      redirect_to_mpi(WindowWriter(manager.stdio_windows[rank+1], 0))
      for i in vcat([1], (rank+2):N)
        # Receiving end of connections to all higher workers and master
        Base.process_messages(manager.connection_windows[i], WindowWriter(manager.connection_windows[rank+1], i-1))
      end

      global _stop_requested = Condition()
      wait_for_events()
    end
  catch e
    Base.display_error(STDERR, "exception $e on rank $rank", backtrace())
  end

  if workers_wait && rank != 0
    closeall(manager)
    MPI.Finalize()
    exit(0)
  end

  return manager
end
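
# Usage sketch for the waiting mode (illustrative; the script name and launch command
# are assumptions): with workers_wait = true only rank 0 returns from
# start_window_worker, while the other ranks serve requests until they are removed.
#
#   # launched as: mpirun -np 4 julia script.jl
#   using MPI
#   MPI.Init()
#   manager = start_window_worker(MPI.COMM_WORLD, true)
#   # Only rank 0 gets here; the workers are blocked in wait_for_events()
#   @everywhere println("hello from process $(myid())")
#   stop_main_loop(manager)  # defined below; removes the workers and finalizes MPI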

"""
Stop the manager. This closes all windows and calls MPI.Finalize on all workers
"""
function stop_main_loop(manager::MPIWindowIOManager)
  if myid() != 1
    wait_for_events()
  else
    checkworkers()
    if nprocs() > 1
      rmprocs(workers())
    end
  end
  closeall(manager)
  MPI.Finalize()
end
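
# Shutdown sketch for the non-waiting mode (illustrative): with workers_wait = false
# every rank returns from start_window_worker and runs the script, so stop_main_loop
# is executed on every rank. Rank 0 removes the workers; the other ranks wait until
# they are notified, after which all ranks close their windows and finalize MPI.
#
#   manager = start_window_worker(MPI.COMM_WORLD, false)
#   @cluster @everywhere x = myid()  # see the @cluster macro below
#   stop_main_loop(manager)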

"""
Runs the given expression using the Julia parallel cluster. Useful when running with MPI_WINDOW_NOWAIT,
since this will temporarily activate the worker event loops to listen for messages.
"""
macro cluster(expr)
  quote
    if myid() != 1
      wait_for_events()
    else
      $(esc(expr))
      notify_workers()
    end
  end
end
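
# Example of a @cluster block (illustrative; assumes the non-waiting setup sketched
# above): the body runs on rank 0 only, while the other ranks block in
# wait_for_events() so their message loops can serve the parallel work, and are
# released by notify_workers() once the block finishes.
#
#   @cluster begin
#       squares = pmap(x -> x^2, 1:100)
#       println(sum(squares))
#   end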