Skip to content

Commit 66bff71

Browse files
Merge pull request #48 from CleverCloud/clg/sse
feat(sse): add SSE client
2 parents 9a09db0 + 6d71c95 commit 66bff71

File tree

5 files changed

+1597
-82
lines changed

5 files changed

+1597
-82
lines changed

Cargo.toml

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,11 @@ keywords = ["clevercloud", "client", "logging", "metrics", "oauth1a"]
1616
base64 = { version = "^0.22.1", optional = true }
1717
bytes = { version = "^1.10.1", features = ["serde"], optional = true }
1818
crypto-common = { version = "^0.1.6", optional = true }
19+
futures = { version = "^0.3.31", optional = true }
1920
hmac = { version = "^0.12.1", features = ["std"], optional = true }
2021
log = { version = "^0.4.27", optional = true }
22+
memchr = { version = "^2.7.4", optional = true }
23+
mime = { version = "^0.3.17", optional = true }
2124
prometheus = { version = "^0.14.0", optional = true }
2225
reqwest = { version = "^0.12.15", default-features = true, features = [
2326
"rustls-tls-webpki-roots",
@@ -28,6 +31,7 @@ reqwest = { version = "^0.12.15", default-features = true, features = [
2831
"zstd",
2932
"json",
3033
"hickory-dns",
34+
"stream",
3135
], optional = true }
3236
serde = { version = "^1.0.219", features = ["derive"], optional = true }
3337
serde_json = { version = "^1.0.140", features = [
@@ -36,13 +40,21 @@ serde_json = { version = "^1.0.140", features = [
3640
], optional = true }
3741
sha2 = { version = "^0.10.8", optional = true }
3842
thiserror = { version = "^2.0.12", optional = true }
43+
tokio = { version = "^1.44.2", optional = true, default-features = false, features = [
44+
"time",
45+
] }
3946
tracing = { version = "^0.1.41", optional = true }
4047
url = { version = "^2.5.4", default-features = false, features = [
4148
"serde",
4249
], optional = true }
4350
urlencoding = { version = "^2.1.3", optional = true }
4451
uuid = { version = "^1.16.0", features = ["serde", "v4"], optional = true }
4552

53+
[dev-dependencies]
54+
anyhow = { version = "^1.0.98" }
55+
axum = { version = "^0.8.3" }
56+
tokio = { version = "^1.44.2", features = ["full"] }
57+
4658
[features]
4759
default = ["client", "logging"]
4860
client = [
@@ -62,3 +74,4 @@ client = [
6274
logging = ["log", "tracing/log-always"]
6375
tracing = ["dep:tracing"]
6476
metrics = ["prometheus"]
77+
sse = ["dep:futures", "dep:mime", "dep:memchr", "dep:tokio"]

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
4444
| logging | Use the `log` facility crate to print logs |
4545
| metrics | Use `prometheus` crates to register metrics |
4646
| tracing | Use `tracing` crate to add `tracing::instrument` on functions |
47+
| sse | Enables streaming Server-Sent Events (SSE) |
4748

4849
### Metrics
4950

src/client/mod.rs

Lines changed: 82 additions & 82 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,9 @@ use serde::{Deserialize, Serialize, de::DeserializeOwned};
3030
use sha2::Sha512;
3131
use uuid::Uuid;
3232

33+
#[cfg(feature = "sse")]
34+
pub mod sse;
35+
3336
// -----------------------------------------------------------------------------
3437
// Exports
3538

@@ -86,7 +89,7 @@ pub trait Request {
8689
fn execute(
8790
&self,
8891
request: reqwest::Request,
89-
) -> impl Future<Output = Result<reqwest::Response, Self::Error>> + Send;
92+
) -> impl Future<Output = Result<reqwest::Response, Self::Error>> + Send + 'static;
9093
}
9194

9295
// -----------------------------------------------------------------------------
@@ -362,7 +365,7 @@ impl OAuth1 for Signer {
362365
hasher.update(base.as_bytes());
363366

364367
let digest = hasher.finalize().into_bytes();
365-
Ok(BASE64_ENGINE.encode(digest.as_slice()))
368+
Ok(urlencoding::encode(&BASE64_ENGINE.encode(digest.as_slice())).into_owned())
366369
}
367370

368371
#[cfg_attr(feature = "tracing", tracing::instrument)]
@@ -433,8 +436,9 @@ pub enum ClientError {
433436
// -----------------------------------------------------------------------------
434437
// Client structure
435438

436-
pub const APPLICATION_JSON: &str = "application/json";
437-
pub const UTF8: &str = "utf-8";
439+
pub const APPLICATION_JSON: HeaderValue = HeaderValue::from_static("application/json");
440+
441+
pub const UTF8: HeaderValue = HeaderValue::from_static("utf-8");
438442

439443
#[derive(Clone, Debug)]
440444
pub struct Client {
@@ -462,22 +466,19 @@ impl Request for Client {
462466
endpoint.parse().map_err(ClientError::ParseUrlEndpoint)?,
463467
);
464468

465-
request.headers_mut().insert(
466-
header::CONTENT_TYPE,
467-
HeaderValue::from_static(APPLICATION_JSON),
468-
);
469-
470469
request
471470
.headers_mut()
472-
.insert(header::CONTENT_LENGTH, HeaderValue::from(buf.len()));
471+
.insert(header::CONTENT_TYPE, APPLICATION_JSON);
473472

474473
request
475474
.headers_mut()
476-
.insert(header::ACCEPT_CHARSET, HeaderValue::from_static(UTF8));
475+
.insert(header::CONTENT_LENGTH, HeaderValue::from(buf.len()));
476+
477+
request.headers_mut().insert(header::ACCEPT_CHARSET, UTF8);
477478

478479
request
479480
.headers_mut()
480-
.insert(header::ACCEPT, HeaderValue::from_static(APPLICATION_JSON));
481+
.insert(header::ACCEPT, APPLICATION_JSON);
481482

482483
*request.body_mut() = Some(buf.into());
483484

@@ -503,80 +504,81 @@ impl Request for Client {
503504
serde_json::from_reader(buf.reader()).map_err(ClientError::Deserialize)
504505
}
505506

506-
#[cfg_attr(feature = "tracing", tracing::instrument)]
507-
async fn execute(
507+
#[cfg_attr(feature = "tracing", tracing::instrument(skip(self)))]
508+
fn execute(
508509
&self,
509510
mut request: reqwest::Request,
510-
) -> Result<reqwest::Response, Self::Error> {
511-
let method = request.method().to_string();
512-
let endpoint = request.url().to_string();
513-
if !request.headers().contains_key(&header::AUTHORIZATION) {
514-
match &self.credentials {
515-
Some(Credentials::Bearer { token }) => {
516-
request.headers_mut().insert(
517-
header::AUTHORIZATION,
518-
HeaderValue::from_str(&format!("Bearer {token}"))
519-
.map_err(ClientError::SerializeHeaderValue)?,
520-
);
511+
) -> impl Future<Output = Result<reqwest::Response, Self::Error>> + 'static {
512+
let client = self.clone();
513+
514+
async move {
515+
let method = request.method().to_string();
516+
let endpoint = request.url().to_string();
517+
518+
if !request.headers().contains_key(&header::AUTHORIZATION) {
519+
match &client.credentials {
520+
Some(Credentials::Bearer { token }) => {
521+
request.headers_mut().insert(
522+
header::AUTHORIZATION,
523+
HeaderValue::from_str(&format!("Bearer {token}"))
524+
.map_err(ClientError::SerializeHeaderValue)?,
525+
);
526+
}
527+
Some(Credentials::Basic { username, password }) => {
528+
let token = BASE64_ENGINE.encode(format!("{username}:{password}"));
529+
530+
request.headers_mut().insert(
531+
header::AUTHORIZATION,
532+
HeaderValue::from_str(&format!("Basic {token}",))
533+
.map_err(ClientError::SerializeHeaderValue)?,
534+
);
535+
}
536+
Some(credentials) => {
537+
request.headers_mut().insert(
538+
header::AUTHORIZATION,
539+
Signer::try_from(credentials.to_owned())
540+
.map_err(ClientError::Signer)?
541+
.sign(&method, &endpoint)
542+
.map_err(ClientError::Digest)?
543+
.parse()
544+
.map_err(ClientError::SerializeHeaderValue)?,
545+
);
546+
}
547+
_ => {}
521548
}
522-
Some(Credentials::Basic { username, password }) => {
523-
let token = BASE64_ENGINE.encode(format!("{username}:{password}"));
524-
525-
request.headers_mut().insert(
526-
header::AUTHORIZATION,
527-
HeaderValue::from_str(&format!("Basic {token}"))
528-
.map_err(ClientError::SerializeHeaderValue)?,
529-
);
530-
}
531-
Some(credentials) => {
532-
let signer =
533-
Signer::try_from(credentials.to_owned()).map_err(ClientError::Signer)?;
534-
535-
request.headers_mut().insert(
536-
header::AUTHORIZATION,
537-
signer
538-
.sign(&method, &endpoint)
539-
.map_err(ClientError::Digest)?
540-
.parse()
541-
.map_err(ClientError::SerializeHeaderValue)?,
542-
);
543-
}
544-
_ => {}
545549
}
546-
}
547550

548-
#[cfg(feature = "logging")]
549-
if log_enabled!(Level::Trace) {
551+
#[cfg(feature = "logging")]
550552
trace!("execute request, endpoint: '{endpoint}', method: '{method}'");
551-
}
552553

553-
#[cfg(feature = "metrics")]
554-
let instant = Instant::now();
555-
let res = self
556-
.inner
557-
.execute(request)
558-
.await
559-
.map_err(ClientError::Request)?;
560-
561-
#[cfg(feature = "metrics")]
562-
{
563-
let status = res.status();
564-
565-
CLIENT_REQUEST
566-
.with_label_values(&[&endpoint, &method, &status.as_u16().to_string()])
567-
.inc();
568-
569-
CLIENT_REQUEST_DURATION
570-
.with_label_values(&[
571-
&endpoint,
572-
&method,
573-
&status.as_u16().to_string(),
574-
&"us".to_string(),
575-
])
576-
.inc_by(Instant::now().duration_since(instant).as_micros() as f64);
577-
}
554+
#[cfg(feature = "metrics")]
555+
let instant = Instant::now();
556+
let res = client
557+
.inner
558+
.execute(request)
559+
.await
560+
.map_err(ClientError::Request)?;
561+
562+
#[cfg(feature = "metrics")]
563+
{
564+
let status = res.status();
565+
566+
CLIENT_REQUEST
567+
.with_label_values(&[&endpoint, &method, &status.as_u16().to_string()])
568+
.inc();
569+
570+
CLIENT_REQUEST_DURATION
571+
.with_label_values(&[
572+
&endpoint,
573+
&method,
574+
&status.as_u16().to_string(),
575+
&"us".to_string(),
576+
])
577+
.inc_by(Instant::now().duration_since(instant).as_micros() as f64);
578+
}
578579

579-
Ok(res)
580+
Ok(res)
581+
}
580582
}
581583
}
582584

@@ -593,11 +595,9 @@ impl RestClient for Client {
593595
endpoint.parse().map_err(ClientError::ParseUrlEndpoint)?,
594596
);
595597

596-
req.headers_mut()
597-
.insert(header::ACCEPT_CHARSET, HeaderValue::from_static(UTF8));
598+
req.headers_mut().insert(header::ACCEPT_CHARSET, UTF8);
598599

599-
req.headers_mut()
600-
.insert(header::ACCEPT, HeaderValue::from_static(APPLICATION_JSON));
600+
req.headers_mut().insert(header::ACCEPT, APPLICATION_JSON);
601601

602602
let res = self.execute(req).await?;
603603
let status = res.status();

0 commit comments

Comments
 (0)