Skip to content

Commit 449c5ce

Browse files
committed
Run formatter and fix clippy errors
Signed-off-by: James Rhodes <[email protected]>
1 parent 22a4149 commit 449c5ce

File tree

5 files changed

+81
-47
lines changed

5 files changed

+81
-47
lines changed

crates/extensions/tedge_flows/src/bin/db_bench.rs

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,11 @@ async fn main() -> anyhow::Result<()> {
3232
.unwrap_or(1000);
3333
let db_path = format!("bench.{backend}");
3434

35-
println!("Benchmarking {backend} backend with {} inserts and {} drains from {db_path}", count * 5 * 3, count * 5);
35+
println!(
36+
"Benchmarking {backend} backend with {} inserts and {} drains from {db_path}",
37+
count * 5 * 3,
38+
count * 5
39+
);
3640

3741
let mut db: Box<dyn MeaDb> = match backend.as_str() {
3842
#[cfg(feature = "fjall-db")]
@@ -81,7 +85,11 @@ async fn main() -> anyhow::Result<()> {
8185
.unwrap();
8286
}
8387
}
84-
println!("Inserted {} items (across 15 batches of {count}) in {:?}", count * 5 * 3, time.elapsed());
88+
println!(
89+
"Inserted {} items (across 15 batches of {count}) in {:?}",
90+
count * 5 * 3,
91+
time.elapsed()
92+
);
8593

8694
let time = Instant::now();
8795

crates/extensions/tedge_flows/src/bin/db_dump.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
use anyhow::Context;
22
use cfg_if::cfg_if;
3+
use tedge_flows::database;
34
use tedge_flows::database::MeaDb;
45
use tedge_flows::flow::DateTime;
56
use tedge_flows::flow::Message;
6-
use tedge_flows::database;
77

88
const DEFAULT_DB_PATH: &str = "/etc/tedge/tedge-flows.db";
99

crates/extensions/tedge_flows/src/config.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -143,11 +143,11 @@ impl StepConfig {
143143
flow: &Utf8Path,
144144
) -> Result<FlowStep, ConfigError> {
145145
let path = match self.script {
146-
ScriptSpec::JavaScript(path) if path.is_absolute() => path.into(),
147-
ScriptSpec::JavaScript(path) if path.starts_with(config_dir) => path.into(),
146+
ScriptSpec::JavaScript(path) if path.is_absolute() => path,
147+
ScriptSpec::JavaScript(path) if path.starts_with(config_dir) => path,
148148
ScriptSpec::JavaScript(path) => config_dir.join(path),
149149
};
150-
let script = JsScript::new(flow.to_owned().into(), index, path)
150+
let script = JsScript::new(flow.to_owned(), index, path)
151151
.with_config(self.config)
152152
.with_interval(self.interval);
153153
let config_topics = topic_filters(self.meta_topics)?;

crates/extensions/tedge_flows/src/database.rs

Lines changed: 59 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,9 @@ use fjall::Slice;
1616
use tokio::task::spawn_blocking;
1717

1818
#[cfg(feature = "sqlite-db")]
19-
use sqlx::sqlite::{SqlitePool, SqlitePoolOptions};
19+
use sqlx::sqlite::SqlitePool;
20+
#[cfg(feature = "sqlite-db")]
21+
use sqlx::sqlite::SqlitePoolOptions;
2022
#[cfg(feature = "sqlite-db")]
2123
use sqlx::Row;
2224

@@ -307,7 +309,10 @@ impl SqliteMeaDb {
307309
.connect(&database_url)
308310
.await
309311
.with_context(|| format!("opening SQLite database at {path:?}"))
310-
.map_err(|source| DatabaseError::OpenError { path: path.to_owned(), source })?;
312+
.map_err(|source| DatabaseError::OpenError {
313+
path: path.to_owned(),
314+
source,
315+
})?;
311316

312317
// Create the messages table if it doesn't exist
313318
// Store timestamp as nanoseconds since epoch (i64) for simpler ordering
@@ -322,7 +327,7 @@ impl SqliteMeaDb {
322327
message_timestamp_nanos INTEGER,
323328
PRIMARY KEY (series, timestamp_nanos)
324329
)
325-
"#
330+
"#,
326331
)
327332
.execute(&pool)
328333
.await
@@ -331,7 +336,7 @@ impl SqliteMeaDb {
331336

332337
// Create index for efficient querying
333338
sqlx::query(
334-
"CREATE INDEX IF NOT EXISTS idx_messages_series ON messages (series, timestamp_nanos)"
339+
"CREATE INDEX IF NOT EXISTS idx_messages_series ON messages (series, timestamp_nanos)",
335340
)
336341
.execute(&pool)
337342
.await
@@ -356,7 +361,7 @@ impl MeaDb for SqliteMeaDb {
356361
INSERT OR REPLACE INTO messages
357362
(series, timestamp_nanos, topic, payload, message_timestamp_nanos)
358363
VALUES (?, ?, ?, ?, ?)
359-
"#
364+
"#,
360365
)
361366
.bind(series)
362367
.bind(Self::datetime_to_nanos(timestamp))
@@ -366,7 +371,10 @@ impl MeaDb for SqliteMeaDb {
366371
.execute(&self.pool)
367372
.await
368373
.with_context(|| format!("storing message in series {series:?}"))
369-
.map_err(|source| DatabaseError::StoreError { series: series.to_owned(), source })?;
374+
.map_err(|source| DatabaseError::StoreError {
375+
series: series.to_owned(),
376+
source,
377+
})?;
370378

371379
Ok(())
372380
}
@@ -376,32 +384,43 @@ impl MeaDb for SqliteMeaDb {
376384
series: &str,
377385
data: Vec<(DateTime, Message)>,
378386
) -> Result<(), DatabaseError> {
379-
let mut tx = self.pool.begin().await.map_err(|source| DatabaseError::Internal { source: source.into() })?;
387+
let mut tx = self
388+
.pool
389+
.begin()
390+
.await
391+
.map_err(|source| DatabaseError::Internal {
392+
source: source.into(),
393+
})?;
380394
for (timestamp, payload) in data {
381-
sqlx::query(
382-
r#"
395+
sqlx::query(
396+
r#"
383397
INSERT OR REPLACE INTO messages
384398
(series, timestamp_nanos, topic, payload, message_timestamp_nanos)
385399
VALUES (?, ?, ?, ?, ?)
386-
"#
387-
)
388-
.bind(series)
389-
.bind(Self::datetime_to_nanos(timestamp))
390-
.bind(&payload.topic)
391-
.bind(&payload.payload)
392-
.bind(payload.timestamp.map(Self::datetime_to_nanos))
393-
.execute(&mut *tx)
394-
.await
395-
.with_context(|| format!("storing message in series {series:?}"))
396-
.map_err(|source| DatabaseError::StoreError { series: series.to_owned(), source })?;
400+
"#,
401+
)
402+
.bind(series)
403+
.bind(Self::datetime_to_nanos(timestamp))
404+
.bind(&payload.topic)
405+
.bind(&payload.payload)
406+
.bind(payload.timestamp.map(Self::datetime_to_nanos))
407+
.execute(&mut *tx)
408+
.await
409+
.with_context(|| format!("storing message in series {series:?}"))
410+
.map_err(|source| DatabaseError::StoreError {
411+
series: series.to_owned(),
412+
source,
413+
})?;
397414
}
398-
tx.commit().await.map_err(|source| DatabaseError::Internal { source: anyhow::Error::from(source) })?;
415+
tx.commit()
416+
.await
417+
.map_err(|source| DatabaseError::Internal {
418+
source: anyhow::Error::from(source),
419+
})?;
399420

400421
Ok(())
401422
}
402423

403-
404-
405424
async fn drain_older_than(
406425
&mut self,
407426
cutoff: DateTime,
@@ -415,14 +434,17 @@ impl MeaDb for SqliteMeaDb {
415434
FROM messages
416435
WHERE series = ? AND timestamp_nanos <= ?
417436
ORDER BY timestamp_nanos
418-
"#
437+
"#,
419438
)
420439
.bind(series)
421440
.bind(cutoff_nanos)
422441
.fetch_all(&self.pool)
423442
.await
424443
.with_context(|| format!("querying messages to drain from series {series:?}"))
425-
.map_err(|source| DatabaseError::DrainError { series: series.to_owned(), source })?;
444+
.map_err(|source| DatabaseError::DrainError {
445+
series: series.to_owned(),
446+
source,
447+
})?;
426448

427449
// Convert rows to messages
428450
let mut messages = Vec::new();
@@ -450,14 +472,17 @@ impl MeaDb for SqliteMeaDb {
450472
r#"
451473
DELETE FROM messages
452474
WHERE series = ? AND timestamp_nanos <= ?
453-
"#
475+
"#,
454476
)
455477
.bind(series)
456478
.bind(cutoff_nanos)
457479
.execute(&self.pool)
458480
.await
459481
.with_context(|| format!("deleting drained messages from series {series:?}"))
460-
.map_err(|source| DatabaseError::DrainError { series: series.to_owned(), source })?;
482+
.map_err(|source| DatabaseError::DrainError {
483+
series: series.to_owned(),
484+
source,
485+
})?;
461486

462487
Ok(messages)
463488
}
@@ -469,13 +494,16 @@ impl MeaDb for SqliteMeaDb {
469494
FROM messages
470495
WHERE series = ?
471496
ORDER BY timestamp_nanos
472-
"#
497+
"#,
473498
)
474499
.bind(series)
475500
.fetch_all(&self.pool)
476501
.await
477502
.with_context(|| format!("querying all messages from series {series:?}"))
478-
.map_err(|source| DatabaseError::QueryError { series: series.to_owned(), source })?;
503+
.map_err(|source| DatabaseError::QueryError {
504+
series: series.to_owned(),
505+
source,
506+
})?;
479507

480508
let mut messages = Vec::new();
481509
for row in rows {
@@ -751,9 +779,7 @@ mod tests {
751779
}
752780

753781
fn create_inmemory_db() -> BoxFuture<'static, Box<dyn MeaDb>> {
754-
Box::pin(async {
755-
Box::new(InMemoryMeaDb::default()) as Box<dyn MeaDb>
756-
})
782+
Box::pin(async { Box::new(InMemoryMeaDb::default()) as Box<dyn MeaDb> })
757783
}
758784

759785
fn test_message(topic: &str, payload: &str) -> Message {
@@ -763,4 +789,4 @@ mod tests {
763789
timestamp: Some(DateTime::now()),
764790
}
765791
}
766-
}
792+
}

crates/extensions/tedge_flows/src/flow.rs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -223,15 +223,15 @@ impl Flow {
223223

224224
// Only then process the tick if it's time to execute
225225
if step.script.should_execute_interval() {
226-
let step_started_at = stats.flow_step_start(&js, "onInterval");
227-
let tick_output = step.script.on_interval(js_runtime, timestamp).await;
228-
match &tick_output {
229-
Ok(messages) => {
230-
stats.flow_step_done(&js, "onInterval", step_started_at, messages.len())
226+
let step_started_at = stats.flow_step_start(&js, "onInterval");
227+
let tick_output = step.script.on_interval(js_runtime, timestamp).await;
228+
match &tick_output {
229+
Ok(messages) => {
230+
stats.flow_step_done(&js, "onInterval", step_started_at, messages.len())
231+
}
232+
Err(_) => stats.flow_step_failed(&js, "onInterval"),
231233
}
232-
Err(_) => stats.flow_step_failed(&js, "onInterval"),
233-
}
234-
transformed_messages.extend(tick_output?);
234+
transformed_messages.extend(tick_output?);
235235
}
236236

237237
// Iterate with all the messages collected at this step

0 commit comments

Comments
 (0)