117 changes: 65 additions & 52 deletions crates/rbuilder-utils/src/clickhouse/backup/mod.rs
@@ -33,8 +33,6 @@ use crate::{
tasks::TaskExecutor,
};

const TARGET: &str = "clickhouse_with_backup::backup";

/// A type alias for disk backup keys.
type DiskBackupKey = u128;
/// A type alias for disk backup tables.
@@ -402,14 +400,20 @@ impl<T: ClickhouseRowExt> DiskBackup<T> {
async fn flush_routine(mut self) {
loop {
self.config.flush_interval.tick().await;
let start = Instant::now();
match self.flush().await {
Ok(_) => {
tracing::debug!(target: TARGET, elapsed = ?start.elapsed(), "flushed backup write buffer to disk");
}
Err(e) => {
tracing::error!(target: TARGET, ?e, "failed to flush backup write buffer to disk");
}
self.flush_routine_inner().await;
}
}

/// The inner flush routine, instrumented for tracing.
#[tracing::instrument(name = "disk_backup_flush_routine", skip(self))]
async fn flush_routine_inner(&mut self) {
let start = Instant::now();
match self.flush().await {
Ok(_) => {
tracing::debug!(elapsed = ?start.elapsed(), "flushed backup write buffer to disk");
}
Err(e) => {
tracing::error!(?e, "failed to flush backup write buffer to disk");
}
}
}
@@ -551,9 +555,10 @@ impl<T: ClickhouseRowExt, MetricsType: Metrics> Backup<T, MetricsType> {
}

/// Backs up a failed commit, first trying to write to disk, then to memory.
#[tracing::instrument(name = "indexer_backup", skip_all, fields(order = T::ORDER))]
fn backup(&mut self, failed_commit: FailedCommit<T>) {
let quantities = failed_commit.quantities;
tracing::debug!(target: TARGET, order = T::ORDER, bytes = ?quantities.bytes, rows = ?quantities.rows, "backing up failed commit");
tracing::debug!(bytes = ?quantities.bytes, rows = ?quantities.rows, "backing up failed commit");

#[cfg(any(test, feature = "test-utils"))]
if self.use_only_memory_backup {
@@ -568,23 +573,26 @@
let start = Instant::now();
match self.disk_backup.save(&failed_commit) {
Ok(stats) => {
tracing::debug!(target: TARGET, order = T::ORDER, total_size = stats.size_bytes.format_bytes(), elapsed = ?start.elapsed(), "saved failed commit to disk");
tracing::debug!(total_size = stats.size_bytes.format_bytes(), elapsed = ?start.elapsed(), "saved failed commit to disk");
MetricsType::set_disk_backup_size(stats.size_bytes, stats.total_batches, T::ORDER);

return;
}
Err(e) => {
tracing::error!(target: TARGET, order = T::ORDER, ?e, "failed to write commit, trying in-memory");
tracing::error!(?e, "failed to write commit, trying in-memory");
MetricsType::increment_backup_disk_errors(T::ORDER, e.as_ref());
}
};

let stats = self.memory_backup.save(failed_commit);
MetricsType::set_memory_backup_size(stats.size_bytes, stats.total_batches, T::ORDER);
tracing::debug!(target: TARGET, order = T::ORDER, bytes = ?quantities.bytes, rows = ?quantities.rows, ?stats, "saved failed commit in-memory");
tracing::debug!(bytes = ?quantities.bytes, rows = ?quantities.rows, ?stats, "saved failed commit in-memory");

if let Some((stats, oldest_quantities)) = self.memory_backup.drop_excess() {
tracing::warn!(target: TARGET, order = T::ORDER, ?stats, "failed commits exceeded max memory backup size, dropping oldest");
tracing::warn!(
?stats,
"failed commits exceeded max memory backup size, dropping oldest"
);
MetricsType::process_backup_data_lost_quantities(&oldest_quantities);
// Clear the cached last commit if it was from memory and we just dropped it.
self.last_cached = self
@@ -595,46 +603,47 @@
}

/// Retrieves the oldest failed commit, first trying from memory, then from disk.
#[tracing::instrument(name = "indexer_backup_retrieve", skip_all, fields(order = T::ORDER))]
fn retrieve_oldest(&mut self) -> Option<RetrievedFailedCommit<T>> {
if let Some(cached) = self.last_cached.take() {
tracing::debug!(target: TARGET, order = T::ORDER, rows = cached.commit.rows.len(), "retrieved last cached failed commit");
tracing::debug!(rows = cached.commit.rows.len(), "last cached commit");
return Some(cached);
}

if let Some(commit) = self.memory_backup.retrieve_oldest() {
tracing::debug!(target: TARGET, order = T::ORDER, rows = commit.rows.len(), "retrieved oldest failed commit from memory");
tracing::debug!(rows = commit.rows.len(), "oldest commit from memory");
return Some(RetrievedFailedCommit {
source: BackupSource::Memory,
commit,
});
}

match self.disk_backup.retrieve_oldest() {
Ok(maybe_commit) => {
maybe_commit.inspect(|data| {
tracing::debug!(target: TARGET, order = T::ORDER, rows = data.stats.total_batches, "retrieved oldest failed commit from disk");
Ok(maybe_commit) => maybe_commit
.inspect(|data| {
tracing::debug!(rows = data.stats.total_batches, "oldest commit from disk");
})
.map(|data| RetrievedFailedCommit {
source: BackupSource::Disk(data.key),
commit: data.value,
})
}
}),
Err(e) => {
tracing::error!(target: TARGET, order = T::ORDER, ?e, "failed to retrieve oldest failed commit from disk");
tracing::error!(?e, "failed to retrieve oldest commit from disk");
MetricsType::increment_backup_disk_errors(T::ORDER, e.as_ref());
None
}
}
}

/// Populates the inserter with the rows from the given failed commit.
#[tracing::instrument(name = "indexer_backup_populate_inserter", skip_all, fields(order = T::ORDER))]
async fn populate_inserter(&mut self, commit: &FailedCommit<T>) {
for row in &commit.rows {
let value_ref = T::to_row_ref(row);

if let Err(e) = self.inserter.write(value_ref).await {
MetricsType::increment_write_failures(e.to_string());
tracing::error!(target: TARGET, order = T::ORDER, ?e, "failed to write to backup inserter");
tracing::error!(?e, "failed to write to inserter");
continue;
}
}
@@ -646,18 +655,37 @@
let start = Instant::now();
match self.disk_backup.delete(key) {
Ok(stats) => {
tracing::debug!(target: TARGET, order = T::ORDER, total_size = stats.size_bytes.format_bytes(), elapsed = ?start.elapsed(), "deleted failed commit from disk");
tracing::debug!(total_size = stats.size_bytes.format_bytes(), elapsed = ?start.elapsed(), "deleted failed commit from disk");
MetricsType::set_disk_backup_size(
stats.size_bytes,
stats.total_batches,
T::ORDER,
);
}
Err(e) => {
tracing::error!(target: TARGET, order = T::ORDER, ?e, "failed to purge failed commit from disk");
tracing::error!(?e, "failed to purge failed commit from disk");
}
}
tracing::debug!(target: TARGET, order = T::ORDER, "purged committed failed commit from disk");
tracing::debug!("purged committed failed commit from disk");
}
}

#[tracing::instrument(name = "indexer_backup_force_commit", skip_all, fields(order = T::ORDER))]
async fn force_commit(&mut self, oldest: RetrievedFailedCommit<T>) {
let start = Instant::now();
match self.inserter.force_commit().await {
Ok(quantities) => {
tracing::info!(?quantities, "successfully backed up");
MetricsType::process_backup_data_quantities(&quantities.into());
MetricsType::record_batch_commit_time(start.elapsed());
self.interval.reset();
self.purge_commit(&oldest).await;
}
Err(e) => {
tracing::error!(?e, quantities = ?oldest.commit.quantities, "failed to commit bundle to clickhouse from backup");
MetricsType::increment_commit_failures(e.to_string());
self.last_cached = Some(oldest);
}
}
}

@@ -669,7 +697,7 @@ impl<T: ClickhouseRowExt, MetricsType: Metrics> Backup<T, MetricsType> {
tokio::select! {
maybe_failed_commit = self.rx.recv() => {
let Some(failed_commit) = maybe_failed_commit else {
tracing::error!(target: TARGET, order = T::ORDER, "backup channel closed");
tracing::warn!(order = T::ORDER, "backup channel closed");
break;
};

@@ -683,63 +711,48 @@ impl<T: ClickhouseRowExt, MetricsType: Metrics> Backup<T, MetricsType> {
};

self.populate_inserter(&oldest.commit).await;

let start = Instant::now();
match self.inserter.force_commit().await {
Ok(quantities) => {
tracing::info!(target: TARGET, order = T::ORDER, ?quantities, "successfully backed up");
MetricsType::process_backup_data_quantities(&quantities.into());
MetricsType::record_batch_commit_time(start.elapsed());
self.interval.reset();
self.purge_commit(&oldest).await;
}
Err(e) => {
tracing::error!(target: TARGET, order = T::ORDER, ?e, quantities = ?oldest.commit.quantities, "failed to commit bundle to clickhouse from backup");
MetricsType::increment_commit_failures(e.to_string());
self.last_cached = Some(oldest);
continue;
}
}
self.force_commit(oldest).await;
}
}
}
}

/// To call on shutdown; makes a last-resort attempt to post all in-memory
/// data back to Clickhouse.
#[tracing::instrument(name = "indexer_backup_end", skip(self), fields(order = T::ORDER))]
pub async fn end(mut self) {
for failed_commit in self.memory_backup.failed_commits.drain(..) {
for row in &failed_commit.rows {
let value_ref = T::to_row_ref(row);

if let Err(e) = self.inserter.write(value_ref).await {
tracing::error!( target: TARGET, order = T::ORDER, ?e, "failed to write to backup inserter during shutdown");
tracing::error!(?e, "failed to write to inserter");
MetricsType::increment_write_failures(e.to_string());
continue;
}
}
if let Err(e) = self.inserter.force_commit().await {
tracing::error!(target: TARGET, order = T::ORDER, ?e, "failed to commit backup to CH during shutdown, trying disk");
tracing::error!(?e, "failed to force commit, trying disk");
MetricsType::increment_commit_failures(e.to_string());
}

if let Err(e) = self.disk_backup.save(&failed_commit) {
tracing::error!(target: TARGET, order = T::ORDER, ?e, "failed to write commit to disk backup during shutdown");
tracing::error!(?e, "failed to write commit to disk");
MetricsType::increment_backup_disk_errors(T::ORDER, e.as_ref());
}
}

if let Err(e) = self.disk_backup.flush().await {
tracing::error!(target: TARGET, order = T::ORDER, ?e, "failed to flush disk backup during shutdown");
tracing::error!(?e, "failed to flush disk");
MetricsType::increment_backup_disk_errors(T::ORDER, e.as_ref());
} else {
tracing::info!(target: TARGET, order = T::ORDER, "flushed disk backup during shutdown");
tracing::info!("flushed disk");
}

if let Err(e) = self.inserter.end().await {
tracing::error!(target: TARGET, order = T::ORDER, ?e, "failed to end backup inserter during shutdown");
tracing::error!(?e, "failed to end inserter");
} else {
tracing::info!(target: TARGET, order = T::ORDER, "successfully ended backup inserter during shutdown");
tracing::info!("ended backup inserter during shutdown");
}
}
}
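Note on the pattern adopted above: `#[tracing::instrument]` opens a span whose fields are attached to every event emitted inside the function, which is why the repeated `target: TARGET, order = T::ORDER` fields can be dropped from the individual `tracing::debug!` / `tracing::error!` calls. A minimal standalone sketch of the mechanism, assuming a `tracing-subscriber` dependency; the `Order` trait and `Bundle` type are illustrative stand-ins, not items from this crate:

use tracing::instrument;

// Illustrative stand-in for the crate's `ClickhouseRowExt::ORDER` constant.
trait Order {
    const ORDER: &'static str;
}

struct Bundle;
impl Order for Bundle {
    const ORDER: &'static str = "bundle";
}

// The "indexer_backup" span records `order` once; the event inside
// inherits it without restating the field on every log line.
#[instrument(name = "indexer_backup", skip_all, fields(order = T::ORDER))]
fn backup<T: Order>(rows: usize) {
    tracing::debug!(rows, "backing up failed commit");
}

fn main() {
    tracing_subscriber::fmt()
        .with_max_level(tracing::Level::DEBUG)
        .init();
    backup::<Bundle>(42);
}
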
37 changes: 22 additions & 15 deletions crates/rbuilder-utils/src/clickhouse/indexer.rs
@@ -5,13 +5,11 @@ use std::{
time::{Duration, Instant},
};

/// The tracing target for this indexer crate. @PendingDX REMOVE
const TARGET: &str = "indexer";

use clickhouse::{
error::Result as ClickhouseResult, inserter::Inserter, Client as ClickhouseClient, Row,
};
use tokio::sync::mpsc;
use tracing::Instrument;

use crate::{
clickhouse::{
@@ -91,12 +89,11 @@ impl<T: ClickhouseRowExt, MetricsType: Metrics> ClickhouseInserter<T, MetricsTyp

/// Writes the provided order into the inner Clickhouse writer buffer.
async fn write(&mut self, row: T) {
let hash = row.hash();
let value_ref = ClickhouseRowExt::to_row_ref(&row);

if let Err(e) = self.inner.write(value_ref).await {
MetricsType::increment_write_failures(e.to_string());
tracing::error!(target: TARGET, order = T::ORDER, ?e, %hash, "failed to write to clickhouse inserter");
tracing::error!(?e, "failed to write to inserter");
return;
}

@@ -114,9 +111,9 @@ impl<T: ClickhouseRowExt, MetricsType: Metrics> ClickhouseInserter<T, MetricsTyp
match self.inner.commit().await {
Ok(quantities) => {
if quantities == Quantities::ZERO.into() {
tracing::trace!(target: TARGET, order = T::ORDER, "committed to inserter");
tracing::trace!("committed batch to inserter");
} else {
tracing::debug!(target: TARGET, order = T::ORDER, ?quantities, "inserted batch to clickhouse");
tracing::debug!(?quantities, "committed batch to server");
MetricsType::process_quantities(&quantities.into());
MetricsType::record_batch_commit_time(start.elapsed());
// Clear the backup rows.
@@ -125,13 +122,13 @@ impl<T: ClickhouseRowExt, MetricsType: Metrics> ClickhouseInserter<T, MetricsTyp
}
Err(e) => {
MetricsType::increment_commit_failures(e.to_string());
tracing::error!(target: TARGET, order = T::ORDER, ?e, "failed to commit bundle to clickhouse");
tracing::error!(?e, "failed to commit bundle");

let rows = std::mem::take(&mut self.rows_backup);
let failed_commit = FailedCommit::new(rows, pending);

if let Err(e) = self.backup_tx.try_send(failed_commit) {
tracing::error!(target: TARGET, order = T::ORDER, ?e, "failed to send rows backup");
tracing::error!(?e, "failed to send rows backup");
}
}
}
@@ -194,16 +191,26 @@ impl<T: ClickhouseIndexableOrder, MetricsType: Metrics> InserterRunner<T, Metric
.with_interval(Duration::from_secs(4));

while let Some(order) = self.rx.recv().await {
tracing::trace!(target: TARGET, order = T::ORDER, hash = %order.hash(), "received data to index");
sampler.sample(|| {
MetricsType::set_queue_size(self.rx.len(), T::ORDER);
});

let row = order.to_row(self.builder_name.clone());
self.inserter.write(row).await;
self.inserter.commit().await;
self.on_order(order).await;
}
tracing::error!(target: TARGET, order = T::ORDER, "tx channel closed, indexer will stop running");
tracing::error!(
order = T::ORDER,
"tx channel closed, indexer will stop running"
);
}

/// Process a new order to index.
#[tracing::instrument(skip_all, name = "indexer_inserter", fields(order = T::ORDER, hash = %order.hash()))]
pub async fn on_order(&mut self, order: T) {
tracing::trace!("received order to index");

let span = tracing::Span::current();
let row = order.to_row(self.builder_name.clone());
self.inserter.write(row).instrument(span.clone()).await;
self.inserter.commit().instrument(span).await;
}

pub async fn end(self) -> ClickhouseResult<Quantities> {
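
The `tracing::Instrument` extension imported in indexer.rs attaches a span to a future, so events fired on either side of an `.await` point land inside it; this is what `on_order` relies on when it clones the current span into the `write` and `commit` calls. A minimal sketch of that mechanism, assuming `tokio` and `tracing-subscriber` as dependencies; the span name and field values are illustrative:

use tracing::{info_span, Instrument};

async fn write_row(rows: usize) {
    // Emitted inside whatever span is instrumenting this future.
    tracing::debug!(rows, "writing row");
}

#[tokio::main]
async fn main() {
    tracing_subscriber::fmt()
        .with_max_level(tracing::Level::DEBUG)
        .init();

    let span = info_span!("indexer_inserter", order = "bundle");
    // `.instrument(span)` enters the span each time the future is polled,
    // so the debug! above is recorded with `order = "bundle"` attached.
    write_row(1).instrument(span).await;
}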