Skip to content
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 12 additions & 2 deletions .github/workflows/pr.yml
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,12 @@ jobs:
rust:
- "1.85"
- "1.88"
os:
- windows
- linux
include:
- os: windows
target: x86_64-pc-windows-gnu
timeout-minutes: 10
runs-on: ubuntu-24.04
steps:
Expand All @@ -170,10 +176,14 @@ jobs:
components: clippy
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683
- uses: Swatinem/rust-cache@98c8021b550208e191a6a3145459bfc9fb29c4c0
- run: just fetch
- run: just clippy-all
- run: sudo apt-get update && sudo apt-get install mingw-w64 -y
if: matrix.os == 'windows'
- run: just "target=${{ matrix.target }}" fetch
- run: just "target=${{ matrix.target }}" clippy-all
- run: just test-build
if: matrix.os == 'linux'
- run: just test
if: matrix.os == 'linux'

kubert-check-feature:
needs: changed
Expand Down
6 changes: 4 additions & 2 deletions justfile
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ export CXX := 'clang++-19'
export RUSTFLAGS := env_var_or_default('RUSTFLAGS', '--cfg tokio_unstable')
export RUSTDOCFLAGS := env_var_or_default('RUSTDOCFLAGS', '--cfg tokio_unstable')

target := ''

#
# Recipes
#
Expand All @@ -28,13 +30,13 @@ md-lint:

# Fetch dependencies
fetch:
@just-cargo fetch
@just-cargo target={{ target }} fetch

check *args:
@just-cargo check --workspace --all-targets {{ _features }} {{ args }}

clippy-all *args:
@just-cargo clippy --workspace --all-targets --all-features {{ args }}
@just-cargo target={{ target }} clippy --workspace --all-targets --all-features {{ args }}
Comment on lines +33 to +39
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we need to make this consistent -- if we need it in clippy-all, we should have it on clippy (and check, etc)


clippy *args:
@just-cargo clippy -p kubert --all-targets {{ _features }} {{ args }}
Expand Down
2 changes: 1 addition & 1 deletion kubert/src/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ impl<S> Builder<S> {
cb.build().await?
};

let (shutdown, shutdown_rx) = shutdown::sigint_or_sigterm()?;
let (shutdown, shutdown_rx) = shutdown::register()?;
let admin = self.admin.bind()?;
Ok(Runtime {
client,
Expand Down
61 changes: 35 additions & 26 deletions kubert/src/shutdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,28 @@ use std::{
pin::Pin,
task::{Context, Poll},
};
use tokio::signal::unix::{signal, Signal, SignalKind};
use tracing::debug;

#[cfg_attr(docsrs, doc(cfg(feature = "shutdown")))]
pub use drain::Watch;

#[cfg_attr(docsrs, doc(cfg(feature = "shutdown")))]
mod signals;

#[cfg(windows)]
#[cfg_attr(docsrs, doc(cfg(feature = "shutdown")))]
use signals::windows::Signals;

#[cfg(unix)]
#[cfg_attr(docsrs, doc(cfg(feature = "shutdown")))]
use signals::unix::Signals;

/// Drives shutdown by watching signals
#[derive(Debug)]
#[must_use = "call `Shutdown::on_signal` to await a signal"]
#[cfg_attr(docsrs, doc(cfg(feature = "shutdown")))]
pub struct Shutdown {
interrupt: Signal,
terminate: Signal,
signals: Signals,
tx: drain::Signal,
}

Expand Down Expand Up @@ -52,44 +61,49 @@ pin_project_lite::pin_project! {
/// signal is received while waiting for watches to be dropped, the shutdown is aborted.
///
/// If a second signal is received while waiting for shutdown to complete, the process
#[cfg(unix)]
#[cfg_attr(docsrs, doc(cfg(feature = "shutdown")))]
#[deprecated(note = "please use `register` instead")]
pub fn sigint_or_sigterm() -> Result<(Shutdown, Watch), RegisterError> {
let interrupt = signal(SignalKind::interrupt())?;
let terminate = signal(SignalKind::terminate())?;
register()
}

/// Creates a shutdown channel
///
/// [`Shutdown`] watches for `SIGINT` and `SIGTERM` signals on Linux or Ctrl-Shutdown on
/// Windows. When a signal is received, [`Watch`] instances are notifed and, when all watches are
/// dropped, the shutdown is completed. If a second signal is received while waiting for watches
/// to be dropped, the shutdown is aborted.
///
/// If a second signal is received while waiting for shutdown to complete, the process
#[cfg_attr(docsrs, doc(cfg(feature = "shutdown")))]
pub fn register() -> Result<(Shutdown, Watch), RegisterError> {
let signals = Signals::new()?;

let (tx, rx) = drain::channel();
let shutdown = Shutdown {
interrupt,
terminate,
tx,
};
let shutdown = Shutdown { signals, tx };
Ok((shutdown, rx))
}

impl Shutdown {
/// Watches for signals and drives shutdown
///
/// When a `SIGINT` or `SIGTERM` signal is received, the shutdown is initiated, notifying all
/// When a signal is received, the shutdown is initiated, notifying all
/// [`Watch`] instances. When all watches are dropped, the shutdown is completed.
///
/// If a second signal is received while waiting for watches to be dropped, this future
/// completes immediately with an [`Aborted`] error.
pub async fn signaled(self) -> Result<(), Aborted> {
let Self {
mut interrupt,
mut terminate,
mut signals,
mut tx,
} = self;

tokio::select! {
_ = interrupt.recv() => {
debug!("Received SIGINT; draining");
_ = signals.recv() => {
debug!("draining");
},

_ = terminate.recv() => {
debug!("Received SIGTERM; draining");
}

_ = tx.closed() => {
debug!("All shutdown receivers dropped");
// Drain can't do anything if the receivers have been dropped
Expand All @@ -103,15 +117,10 @@ impl Shutdown {
Ok(())
},

_ = interrupt.recv() => {
debug!("Received SIGINT; aborting");
_ = signals.recv() => {
debug!("aborting");
Err(Aborted(()))
},

_ = terminate.recv() => {
debug!("Received SIGTERM; aborting");
Err(Aborted(()))
}
}
}
}
Expand Down
62 changes: 62 additions & 0 deletions kubert/src/shutdown/signals.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
#[cfg(unix)]
pub(crate) mod unix {
use crate::shutdown::RegisterError;
use tokio::signal::unix::{signal, Signal, SignalKind};

#[derive(Debug)]
#[must_use = "call `Shutdown::on_signal` to await a signal"]
#[cfg_attr(docsrs, doc(cfg(feature = "shutdown")))]
pub(crate) struct Signals {
interrupt: Signal,
terminate: Signal,
}

impl Signals {
pub(crate) fn new() -> Result<Self, RegisterError> {
let interrupt = signal(SignalKind::interrupt())?;
let terminate = signal(SignalKind::terminate())?;
Ok(Self {
interrupt,
terminate,
})
}

pub(crate) async fn recv(&mut self) {
tokio::select! {
_ = self.interrupt.recv() => {
tracing::debug!("Received SIGINT");

},
_ = self.terminate.recv() => {
tracing::debug!("Received SIGTERM");
}
}
}
}
}

#[cfg(windows)]
pub(crate) mod windows {
use crate::shutdown::RegisterError;
use tokio::signal::windows::{ctrl_shutdown, CtrlShutdown};

#[derive(Debug)]
#[must_use = "call `Shutdown::on_signal` to await a signal"]
#[cfg_attr(docsrs, doc(cfg(feature = "shutdown")))]
pub(crate) struct Signals(CtrlShutdown);

impl Signals {
pub(crate) fn new() -> Result<Self, RegisterError> {
// On Windows, we use Ctrl-Shutdown as this is what Kubernetes uses to signal
// shutdown to windows containers. For reference, see:
// https://kubernetes.io/docs/concepts/windows/intro/#compatibility-v1-pod
let ctrl_shutdown = ctrl_shutdown()?;
Ok(Self(ctrl_shutdown))
}

pub(crate) async fn recv(&mut self) {
self.0.recv().await;
tracing::debug!("Received Ctrl-Shutdown");
}
}
}
Loading