@@ -2,6 +2,7 @@ import * as net from "net";
2
2
import * as fs from "fs" ;
3
3
import { setInterval , clearInterval } from "timers" ;
4
4
import * as util from "util" ;
5
+ import { encodeFrame , decodeFrames , types as protocol } from "@rivet-gg/runner-protocol" ;
5
6
6
7
export function connectToManager ( ) {
7
8
const socketPath = process . env . RIVET_MANAGER_SOCKET_PATH ;
@@ -25,77 +26,79 @@ export function connectToManager() {
25
26
26
27
// Start ping loop to keep connection alive
27
28
pingInterval = setInterval ( ( ) => {
28
- const pingMessage = { ping : null } ;
29
+ const pingMessage = new protocol . ToManager ( {
30
+ ping : new protocol . ToManager . Ping ( )
31
+ } ) ;
29
32
client . write ( encodeFrame ( pingMessage ) ) ;
30
33
} , 2000 ) ;
31
34
} ) ;
32
35
33
36
client . on ( "data" , ( data ) => {
34
- const packets = decodeFrames ( data ) ;
37
+ const packets = decodeFrames ( data , protocol . ToRunner ) ;
35
38
36
39
for ( let packet of packets ) {
37
40
console . log ( "Received packet from manager:" , util . inspect ( packet , { depth : null } ) ) ;
38
41
39
42
if ( packet . start_actor ) {
40
- const response = {
41
- actor_state_update : {
43
+ const response = new protocol . ToManager ( {
44
+ actor_state_update : new protocol . ToManager . ActorStateUpdate ( {
42
45
actor_id : packet . start_actor . actor_id ,
43
46
generation : packet . start_actor . generation ,
44
- state : {
45
- running : null ,
46
- } ,
47
- } ,
48
- } ;
47
+ state : new protocol . ActorState ( {
48
+ running : new protocol . ActorState . Running ( )
49
+ } )
50
+ } )
51
+ } ) ;
49
52
client . write ( encodeFrame ( response ) ) ;
50
53
51
54
console . log ( `actor_${ packet . start_actor . actor_id } ` , 'fweh' ) ;
52
55
53
- const kvMessage = {
54
- kv : {
56
+ const kvMessage = new protocol . ToManager ( {
57
+ kv : new rivet . pegboard . kv . Request ( {
55
58
actor_id : packet . start_actor . actor_id ,
56
59
generation : packet . start_actor . generation ,
57
60
request_id : 1 ,
58
- data : {
59
- put : {
60
- keys : [
61
- [ [ 1 , 2 , 3 ] , [ 4 , 5 , 6 ] ] ,
62
- ] ,
63
- values : [
64
- [ 11 , 12 , 13 , 14 , 15 , 16 ]
65
- ] ,
66
- }
67
- }
68
- }
69
- } ;
61
+ put : new rivet . pegboard . kv . Request . Put ( {
62
+ keys : [
63
+ new rivet . pegboard . kv . Key ( {
64
+ segments : [ new Uint8Array ( [ 1 , 2 , 3 ] ) , new Uint8Array ( [ 4 , 5 , 6 ] ) ]
65
+ } )
66
+ ] ,
67
+ values : [
68
+ new Uint8Array ( [ 11 , 12 , 13 , 14 , 15 , 16 ] )
69
+ ]
70
+ } )
71
+ } )
72
+ } ) ;
70
73
client . write ( encodeFrame ( kvMessage ) ) ;
71
74
72
- const kvMessage2 = {
73
- kv : {
75
+ const kvMessage2 = new protocol . ToManager ( {
76
+ kv : new rivet . pegboard . kv . Request ( {
74
77
actor_id : packet . start_actor . actor_id ,
75
78
generation : packet . start_actor . generation ,
76
79
request_id : 2 ,
77
- data : {
78
- get : {
79
- keys : [
80
- [ [ 1 , 2 , 3 ] , [ 4 , 5 , 6 ] ]
81
- ] ,
82
- }
83
- }
84
- }
85
- } ;
80
+ get : new rivet . pegboard . kv . Request . Get ( {
81
+ keys : [
82
+ new rivet . pegboard . kv . Key ( {
83
+ segments : [ new Uint8Array ( [ 1 , 2 , 3 ] ) , new Uint8Array ( [ 4 , 5 , 6 ] ) ]
84
+ } )
85
+ ]
86
+ } )
87
+ } )
88
+ } ) ;
86
89
client . write ( encodeFrame ( kvMessage2 ) ) ;
87
90
} else if ( packet . signal_actor ) {
88
- const response = {
89
- actor_state_update : {
91
+ const response = new protocol . ToManager ( {
92
+ actor_state_update : new protocol . ToManager . ActorStateUpdate ( {
90
93
actor_id : packet . signal_actor . actor_id ,
91
94
generation : packet . signal_actor . generation ,
92
- state : {
93
- exited : {
94
- exit_code : 0 ,
95
- } ,
96
- } ,
97
- } ,
98
- } ;
95
+ state : new protocol . ActorState ( {
96
+ exited : new protocol . ActorState . Exited ( {
97
+ exit_code : 0
98
+ } )
99
+ } )
100
+ } )
101
+ } ) ;
99
102
client . write ( encodeFrame ( response ) ) ;
100
103
}
101
104
}
@@ -115,33 +118,3 @@ export function connectToManager() {
115
118
} ) ;
116
119
}
117
120
118
- function encodeFrame ( payload : any ) : Buffer {
119
- const json = JSON . stringify ( payload ) ;
120
- const payloadLength = Buffer . alloc ( 4 ) ;
121
- payloadLength . writeUInt32BE ( json . length , 0 ) ;
122
-
123
- const header = Buffer . alloc ( 4 ) ; // All zeros for now
124
-
125
- return Buffer . concat ( [ payloadLength , header , Buffer . from ( json ) ] ) ;
126
- }
127
-
128
- function decodeFrames ( buffer : Buffer ) : any [ ] {
129
- const packets = [ ] ;
130
- let offset = 0 ;
131
-
132
- while ( offset < buffer . length ) {
133
- if ( buffer . length - offset < 8 ) break ; // Incomplete frame length + header
134
- const payloadLength = buffer . readUInt32BE ( offset ) ;
135
- offset += 4 ;
136
-
137
- // Skip the header (4 bytes)
138
- offset += 4 ;
139
-
140
- if ( buffer . length - offset < payloadLength ) break ; // Incomplete frame data
141
- const json = buffer . subarray ( offset , offset + payloadLength ) . toString ( ) ;
142
- packets . push ( JSON . parse ( json ) ) ;
143
- offset += payloadLength ;
144
- }
145
-
146
- return packets ;
147
- }
0 commit comments