1
- use std:: sync:: Arc ;
1
+ use std:: { collections :: HashMap , sync:: Arc } ;
2
2
3
- use axum:: { Json , Router , routing:: get} ;
3
+ use axum:: {
4
+ Json , Router ,
5
+ extract:: {
6
+ WebSocketUpgrade ,
7
+ ws:: { Message , WebSocket } ,
8
+ } ,
9
+ response:: IntoResponse ,
10
+ routing:: { any, get} ,
11
+ } ;
4
12
use but_settings:: AppSettingsWithDiskSync ;
13
+ use futures_util:: { SinkExt , StreamExt as _} ;
5
14
use serde:: { Deserialize , Serialize } ;
6
15
use serde_json:: json;
7
16
use tokio:: sync:: Mutex ;
@@ -34,11 +43,13 @@ mod virtual_branches;
34
43
mod workspace;
35
44
mod zip;
36
45
46
+ #[ derive( Clone ) ]
37
47
pub ( crate ) struct RequestContext {
38
48
app_settings : Arc < AppSettingsWithDiskSync > ,
39
49
user_controller : Arc < gitbutler_user:: Controller > ,
40
50
project_controller : Arc < gitbutler_project:: Controller > ,
41
51
active_projects : Arc < Mutex < ActiveProjects > > ,
52
+ broadcaster : Arc < Mutex < Broadcaster > > ,
42
53
}
43
54
44
55
#[ derive( Serialize , Deserialize ) ]
@@ -55,6 +66,13 @@ pub(crate) struct Request {
55
66
params : serde_json:: Value ,
56
67
}
57
68
69
+ #[ derive( Debug , Serialize , Clone ) ]
70
+ #[ serde( rename_all = "camelCase" ) ]
71
+ pub ( crate ) struct FrontendEvent {
72
+ name : String ,
73
+ payload : serde_json:: Value ,
74
+ }
75
+
58
76
pub async fn run ( ) {
59
77
let cors = CorsLayer :: new ( )
60
78
. allow_methods ( Any )
@@ -70,25 +88,35 @@ pub async fn run() {
70
88
. expect ( "missing config dir" )
71
89
. join ( "gitbutler-server" ) ;
72
90
73
- let app_settings = Arc :: new (
74
- AppSettingsWithDiskSync :: new ( config_dir. clone ( ) ) . expect ( "failed to create app settings" ) ,
75
- ) ;
76
- let user_controller = Arc :: new ( gitbutler_user:: Controller :: from_path ( & app_data_dir) ) ;
77
- let project_controller = Arc :: new ( gitbutler_project:: Controller :: from_path ( & app_data_dir) ) ;
78
- let active_projects = Arc :: new ( Mutex :: new ( ActiveProjects :: new ( ) ) ) ;
91
+ let broadcaster = Arc :: new ( Mutex :: new ( Broadcaster {
92
+ senders : HashMap :: new ( ) ,
93
+ } ) ) ;
94
+
95
+ let ctx = RequestContext {
96
+ app_settings : Arc :: new (
97
+ AppSettingsWithDiskSync :: new ( config_dir. clone ( ) )
98
+ . expect ( "failed to create app settings" ) ,
99
+ ) ,
100
+ user_controller : Arc :: new ( gitbutler_user:: Controller :: from_path ( & app_data_dir) ) ,
101
+ project_controller : Arc :: new ( gitbutler_project:: Controller :: from_path ( & app_data_dir) ) ,
102
+ active_projects : Arc :: new ( Mutex :: new ( ActiveProjects :: new ( ) ) ) ,
103
+ broadcaster : broadcaster. clone ( ) ,
104
+ } ;
79
105
80
106
// build our application with a single route
81
107
let app = Router :: new ( )
82
108
. route (
83
109
"/" ,
84
- get ( || async { "Hello, World!" } ) . post ( move |req| {
85
- let ctx = RequestContext {
86
- app_settings : Arc :: clone ( & app_settings) ,
87
- user_controller : Arc :: clone ( & user_controller) ,
88
- project_controller : Arc :: clone ( & project_controller) ,
89
- active_projects : Arc :: clone ( & active_projects) ,
90
- } ;
91
- handle_command ( req, ctx)
110
+ get ( || async { "Hello, World!" } ) . post ( {
111
+ let ctx = ctx. clone ( ) ;
112
+ move |req| handle_command ( req, ctx)
113
+ } ) ,
114
+ )
115
+ . route (
116
+ "/ws" ,
117
+ any ( {
118
+ let broadcaster = broadcaster. clone ( ) ;
119
+ async move |req| handle_ws_request ( req, broadcaster) . await
92
120
} ) ,
93
121
)
94
122
. layer ( ServiceBuilder :: new ( ) . layer ( cors) ) ;
@@ -99,6 +127,54 @@ pub async fn run() {
99
127
axum:: serve ( listener, app) . await . unwrap ( ) ;
100
128
}
101
129
130
+ struct Broadcaster {
131
+ senders : HashMap < uuid:: Uuid , tokio:: sync:: mpsc:: UnboundedSender < FrontendEvent > > ,
132
+ }
133
+
134
+ impl Broadcaster {
135
+ fn send ( & self , event : FrontendEvent ) {
136
+ for sender in self . senders . values ( ) {
137
+ let _ = sender. send ( event. clone ( ) ) ;
138
+ }
139
+ }
140
+ }
141
+
142
+ async fn handle_ws_request (
143
+ ws : WebSocketUpgrade ,
144
+ broadcaster : Arc < Mutex < Broadcaster > > ,
145
+ ) -> impl IntoResponse {
146
+ ws. on_upgrade ( move |socket| handle_websocket ( socket, broadcaster) )
147
+ }
148
+
149
+ async fn handle_websocket ( socket : WebSocket , broadcaster : Arc < Mutex < Broadcaster > > ) {
150
+ let ( send, mut recv) = tokio:: sync:: mpsc:: unbounded_channel ( ) ;
151
+ let id = uuid:: Uuid :: new_v4 ( ) ;
152
+ broadcaster. lock ( ) . await . senders . insert ( id, send) ;
153
+
154
+ let ( mut socket_send, mut socket_recv) = socket. split ( ) ;
155
+ let thread = tokio:: spawn ( async move {
156
+ while let Some ( event) = recv. recv ( ) . await {
157
+ socket_send
158
+ . send ( Message :: Text ( serde_json:: to_string ( & event) . unwrap ( ) . into ( ) ) )
159
+ . await
160
+ . unwrap ( ) ;
161
+ }
162
+ } ) ;
163
+
164
+ while let Some ( Ok ( msg) ) = socket_recv. next ( ) . await {
165
+ #[ allow( clippy:: single_match) ]
166
+ match msg {
167
+ Message :: Close ( _) => {
168
+ thread. abort ( ) ;
169
+ break ;
170
+ }
171
+ _ => { }
172
+ }
173
+ }
174
+
175
+ broadcaster. lock ( ) . await . senders . remove ( & id) ;
176
+ }
177
+
102
178
async fn handle_command (
103
179
Json ( request) : Json < Request > ,
104
180
ctx : RequestContext ,
0 commit comments