Skip to content

Commit 91dd51c

Browse files
committed
fix: fix logs for new actors
1 parent 83b5b26 commit 91dd51c

File tree

34 files changed

+248
-81
lines changed

34 files changed

+248
-81
lines changed

examples/system-test-actor/src/managerClient.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,8 @@ export function connectToManager() {
4848
};
4949
client.write(encodeFrame(response));
5050

51+
console.log(`actor_${packet.start_actor.actor_id}`, 'fweh');
52+
5153
const kvMessage = {
5254
kv: {
5355
actor_id: packet.start_actor.actor_id,

packages/common/fdb-util/src/lib.rs

Lines changed: 21 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -11,12 +11,12 @@ use foundationdb::{
1111
self as fdb,
1212
future::FdbValue,
1313
options::DatabaseOption,
14-
tuple::{self, PackResult, PackError, TuplePack, TupleUnpack},
14+
tuple::{self, PackError, PackResult, TuplePack, TupleUnpack},
1515
KeySelector, RangeOption,
1616
};
1717

18-
pub mod keys;
1918
pub mod codes;
19+
pub mod keys;
2020
mod metrics;
2121

2222
/// Makes the code blatantly obvious if its using a snapshot read.
@@ -193,34 +193,34 @@ pub fn end_of_key_range(key: &[u8]) -> Vec<u8> {
193193
// Copied from foundationdb crate
194194
#[inline]
195195
pub fn parse_bytes(input: &[u8], num: usize) -> PackResult<(&[u8], &[u8])> {
196-
if input.len() < num {
197-
Err(PackError::MissingBytes)
198-
} else {
199-
Ok((&input[num..], &input[..num]))
200-
}
196+
if input.len() < num {
197+
Err(PackError::MissingBytes)
198+
} else {
199+
Ok((&input[num..], &input[..num]))
200+
}
201201
}
202202

203203
// Copied from foundationdb crate
204204
#[inline]
205205
pub fn parse_byte(input: &[u8]) -> PackResult<(&[u8], u8)> {
206-
if input.is_empty() {
207-
Err(PackError::MissingBytes)
208-
} else {
209-
Ok((&input[1..], input[0]))
210-
}
206+
if input.is_empty() {
207+
Err(PackError::MissingBytes)
208+
} else {
209+
Ok((&input[1..], input[0]))
210+
}
211211
}
212212

213213
// Copied from foundationdb crate
214214
pub fn parse_code(input: &[u8], expected: u8) -> PackResult<&[u8]> {
215-
let (input, found) = parse_byte(input)?;
216-
if found == expected {
217-
Ok(input)
218-
} else {
219-
Err(PackError::BadCode {
220-
found,
221-
expected: Some(expected),
222-
})
223-
}
215+
let (input, found) = parse_byte(input)?;
216+
if found == expected {
217+
Ok(input)
218+
} else {
219+
Err(PackError::BadCode {
220+
found,
221+
expected: Some(expected),
222+
})
223+
}
224224
}
225225

226226
pub mod prelude {

packages/common/util/core/src/lib.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
1+
pub use id::Id;
12
use rand::Rng;
2-
pub use rivet_util_macros as macros;
33
pub use rivet_util_id as id;
4-
pub use id::Id;
4+
pub use rivet_util_macros as macros;
55
use tokio::time::{Duration, Instant};
66

77
pub mod billing;

packages/common/util/id/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -242,7 +242,7 @@ impl TuplePack for Id {
242242
let mut size = 1;
243243

244244
w.write_all(&[fdb_util::codes::ID])?;
245-
245+
246246
// IMPORTANT: While the normal bytes representation of a v0 ID doesn't include the version, we write
247247
// it here so that we can unpack without a terminating NIL.
248248
if let Id::V0(_) = self {

packages/core/api/actor/src/route/logs.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,11 @@ pub async fn get_logs(
167167
.iter()
168168
.map(|x| x.stream_type as i32)
169169
.collect::<Vec<_>>();
170+
let mut foreigns = logs_res
171+
.entries
172+
.iter()
173+
.map(|x| x.foreign)
174+
.collect::<Vec<_>>();
170175
let mut actor_indices = logs_res
171176
.entries
172177
.iter()
@@ -177,6 +182,7 @@ pub async fn get_logs(
177182
lines.reverse();
178183
timestamps.reverse();
179184
streams.reverse();
185+
foreigns.reverse();
180186
actor_indices.reverse();
181187

182188
let watch_nts = logs_res.entries.first().map_or(before_nts, |x| x.ts);
@@ -185,6 +191,7 @@ pub async fn get_logs(
185191
lines,
186192
timestamps,
187193
streams,
194+
foreigns,
188195
actor_indices,
189196
watch: WatchResponse::new_as_model(watch_nts),
190197
})

packages/core/services/cluster/src/ops/datacenter/get_for_label.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use chirp_workflow::prelude::*;
22

3-
use crate::types::Datacenter;
43
use crate::ops::datacenter::get::DatacenterRow;
4+
use crate::types::Datacenter;
55

66
#[derive(Debug)]
77
pub struct Input {
@@ -14,7 +14,10 @@ pub struct Output {
1414
}
1515

1616
#[operation]
17-
pub async fn cluster_datacenter_get_for_label(ctx: &OperationCtx, input: &Input) -> GlobalResult<Output> {
17+
pub async fn cluster_datacenter_get_for_label(
18+
ctx: &OperationCtx,
19+
input: &Input,
20+
) -> GlobalResult<Output> {
1821
let datacenters = ctx
1922
.cache()
2023
.fetch_all_json("cluster.datacenters_get_for_label", input.labels.clone(), {
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
pub mod get;
2+
pub mod get_for_label;
23
pub mod list;
34
pub mod location_get;
45
pub mod resolve_for_name_id;
56
pub mod server_discovery;
67
pub mod server_spec_get;
78
pub mod tls_get;
89
pub mod topology_get;
9-
pub mod get_for_label;

packages/edge/api/actor/src/route/actors.rs

Lines changed: 21 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -451,14 +451,13 @@ pub async fn destroy(
451451

452452
if let Some(WorkflowError::WorkflowNotFound) = res.as_workflow_error() {
453453
// Try old actors
454-
ctx
455-
.signal(pegboard::workflows::actor::Destroy {
456-
override_kill_timeout_ms: query.override_kill_timeout,
457-
})
458-
.to_workflow::<pegboard::workflows::actor::Workflow>()
459-
.tag("actor_id", actor_id)
460-
.send()
461-
.await?;
454+
ctx.signal(pegboard::workflows::actor::Destroy {
455+
override_kill_timeout_ms: query.override_kill_timeout,
456+
})
457+
.to_workflow::<pegboard::workflows::actor::Workflow>()
458+
.tag("actor_id", actor_id)
459+
.send()
460+
.await?;
462461

463462
old_sub.next().await?;
464463
} else {
@@ -514,14 +513,13 @@ pub async fn upgrade(
514513

515514
if let Some(WorkflowError::WorkflowNotFound) = res.as_workflow_error() {
516515
// Try old actors
517-
ctx
518-
.signal(pegboard::workflows::actor::Upgrade {
519-
image_id: build.build_id,
520-
})
521-
.to_workflow::<pegboard::workflows::actor::Workflow>()
522-
.tag("actor_id", actor_id)
523-
.send()
524-
.await?;
516+
ctx.signal(pegboard::workflows::actor::Upgrade {
517+
image_id: build.build_id,
518+
})
519+
.to_workflow::<pegboard::workflows::actor::Workflow>()
520+
.tag("actor_id", actor_id)
521+
.send()
522+
.await?;
525523
} else {
526524
res?;
527525
}
@@ -635,14 +633,13 @@ pub async fn upgrade_all(
635633

636634
if let Some(WorkflowError::WorkflowNotFound) = res.as_workflow_error() {
637635
// Try old actors
638-
ctx
639-
.signal(pegboard::workflows::actor::Upgrade {
640-
image_id: build.build_id,
641-
})
642-
.to_workflow::<pegboard::workflows::actor::Workflow>()
643-
.tag("actor_id", actor.actor_id)
644-
.send()
645-
.await?;
636+
ctx.signal(pegboard::workflows::actor::Upgrade {
637+
image_id: build.build_id,
638+
})
639+
.to_workflow::<pegboard::workflows::actor::Workflow>()
640+
.tag("actor_id", actor.actor_id)
641+
.send()
642+
.await?;
646643
} else {
647644
res?;
648645
}

packages/edge/infra/client/actor-kv/src/key.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use foundationdb::tuple::{
22
Bytes, PackResult, TupleDepth, TuplePack, TupleUnpack, VersionstampOffset,
33
};
4-
use serde::{Serialize, Deserialize};
4+
use serde::{Deserialize, Serialize};
55

66
// TODO: Custom deser impl that uses arrays instead of objects?
77
#[derive(Clone, Serialize, Deserialize)]

packages/edge/infra/client/actor-kv/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ impl ActorKv {
5959
if let Some(subspace) = &*guard {
6060
return Ok(subspace.clone());
6161
}
62-
62+
6363
tracing::info!(actor_id=?self.actor_id, "initializing actor KV");
6464

6565
let root = fdb::directory::DirectoryLayer::default();

0 commit comments

Comments
 (0)