From 4e35f687044d4157c3f2285fc99fd32ded1a2cff Mon Sep 17 00:00:00 2001 From: Vasiliy Yorkin Date: Sun, 27 Jul 2025 15:59:28 +0300 Subject: [PATCH 1/9] Fix clippy warnings --- src/decoder.rs | 22 +++++++++------------- src/encoder.rs | 10 +++++----- src/event.rs | 14 ++++---------- src/lines.rs | 4 ++-- tests/decode.rs | 8 ++++---- 5 files changed, 24 insertions(+), 34 deletions(-) diff --git a/src/decoder.rs b/src/decoder.rs index 481a86a..a4c347c 100644 --- a/src/decoder.rs +++ b/src/decoder.rs @@ -15,7 +15,6 @@ where Decoder { lines: Lines::new(reader), processed_bom: false, - buffer: vec![], last_event_id: None, event_type: None, data: vec![], @@ -29,9 +28,6 @@ pub struct Decoder { lines: Lines, /// Have we processed the optional Byte Order Marker on the first line? processed_bom: bool, - /// Was the last character of the previous line a \r? - /// Bytes that were fed to the decoder but do not yet form a message. - buffer: Vec, /// The _last event ID_ buffer. last_event_id: Option, /// The _event type_ buffer. @@ -49,11 +45,11 @@ impl Decoder { None } else { // Removing tailing newlines - if self.data.ends_with(&[b'\n']) { + if self.data.ends_with(b"\n") { self.data.pop(); } let name = self.event_type.take().unwrap_or("message".to_string()); - let data = std::mem::replace(&mut self.data, vec![]); + let data = std::mem::take(&mut self.data); // The _last event ID_ buffer persists between messages. let id = self.last_event_id.clone(); Some(Event::new_msg(name, data, id)) @@ -83,7 +79,7 @@ impl Stream for Decoder { &line }; - log::trace!("> new line: {:?}", line); + log::trace!("> new line: {line}"); let mut parts = line.splitn(2, ':'); loop { match (parts.next(), parts.next()) { @@ -105,7 +101,7 @@ impl Stream for Decoder { } // If the field name is "data": (Some("data"), value) => { - log::trace!("> data: {:?}", &value); + log::trace!("> data: {value:?}"); // Append the field value to the data buffer, if let Some(value) = value { self.data.extend(strip_leading_space_b(value.as_bytes())); @@ -122,13 +118,13 @@ impl Stream for Decoder { // return Poll::Ready(Ok(self.take_message()).transpose()); } // Comment - (Some(""), Some(_)) => (log::trace!("> comment")), + (Some(""), Some(_)) => log::trace!("> comment"), // End of frame (Some(""), None) => { log::trace!("> end of frame"); match self.take_message() { Some(event) => { - log::trace!("> end of frame [event]: {:?}", event); + log::trace!("> end of frame [event]: {event:?}"); return Poll::Ready(Some(Ok(event))); } None => { @@ -149,15 +145,15 @@ impl Stream for Decoder { /// Remove a leading space (code point 0x20) from a string slice. fn strip_leading_space(input: &str) -> &str { if input.starts_with(' ') { - &input[1..] + input.strip_prefix(' ').unwrap() } else { input } } fn strip_leading_space_b(input: &[u8]) -> &[u8] { - if input.starts_with(&[b' ']) { - &input[1..] + if input.starts_with(b" ") { + input.strip_prefix(b" ").unwrap() } else { input } diff --git a/src/encoder.rs b/src/encoder.rs index 6528527..2fbb75d 100644 --- a/src/encoder.rs +++ b/src/encoder.rs @@ -108,17 +108,17 @@ impl Sender { ) -> io::Result<()> { // Write the event name if let Some(name) = name.into() { - self.inner_send(format!("event:{}\n", name)).await?; + self.inner_send(format!("event:{name}\n")).await?; } // Write the id if let Some(id) = id { - self.inner_send(format!("id:{}\n", id)).await?; + self.inner_send(format!("id:{id}\n")).await?; } // Write the data section, and end. for line in data.lines() { - let msg = format!("data:{}\n", line); + let msg = format!("data:{line}\n"); self.inner_send(msg).await?; } self.inner_send("\n").await?; @@ -130,12 +130,12 @@ impl Sender { pub async fn send_retry(&self, dur: Duration, id: Option<&str>) -> io::Result<()> { // Write the id if let Some(id) = id { - self.inner_send(format!("id:{}\n", id)).await?; + self.inner_send(format!("id:{id}\n")).await?; } // Write the retry section, and end. let dur = dur.as_secs_f64() as u64; - let msg = format!("retry:{}\n\n", dur); + let msg = format!("retry:{dur}\n\n"); self.inner_send(msg).await?; Ok(()) } diff --git a/src/event.rs b/src/event.rs index 224164c..f8e0ac7 100644 --- a/src/event.rs +++ b/src/event.rs @@ -5,7 +5,7 @@ use std::time::Duration; /// The kind of SSE event sent. #[derive(Debug, Eq, PartialEq)] pub enum Event { - /// A retry frame, signaling a new retry duration must be used.. + /// A retry frame, signaling a new retry duration must be used. Retry(Duration), /// A data frame containing a message. Message(Message), @@ -22,19 +22,13 @@ impl Event { Self::Retry(Duration::from_secs_f64(dur as f64)) } - /// Check whether this is a Retry variant. + /// Check whether this is a `Retry` variant. pub fn is_retry(&self) -> bool { - match *self { - Self::Retry(_) => true, - _ => false, - } + matches!(*self, Self::Retry(_)) } /// Check whether this is a `Message` variant. pub fn is_message(&self) -> bool { - match *self { - Self::Message(_) => true, - _ => false, - } + matches!(*self, Self::Message(_)) } } diff --git a/src/lines.rs b/src/lines.rs index 26a494d..76a604b 100644 --- a/src/lines.rs +++ b/src/lines.rs @@ -64,7 +64,7 @@ impl Stream for Lines { if this.buf.ends_with('\r') { this.buf.pop(); } - Poll::Ready(Some(Ok(mem::replace(this.buf, String::new())))) + Poll::Ready(Some(Ok(std::mem::take(this.buf)))) } } @@ -76,7 +76,7 @@ fn read_line_internal( read: &mut usize, ) -> Poll> { let ret = ready!(read_until_internal(reader, cx, bytes, read)); - if str::from_utf8(&bytes).is_err() { + if str::from_utf8(bytes).is_err() { Poll::Ready(ret.and_then(|_| { Err(io::Error::new( io::ErrorKind::InvalidData, diff --git a/tests/decode.rs b/tests/decode.rs index 83e67cb..63ae088 100644 --- a/tests/decode.rs +++ b/tests/decode.rs @@ -39,7 +39,7 @@ async fn decode_stream_when_fed_by_line() -> http_types::Result<()> { let reader = decode(Cursor::new(":ok\nevent:message\nid:id1\ndata:data1\n\n")); let res = reader.map(|i| i.unwrap()).collect::>().await; assert_eq!(res.len(), 1); - assert_message(res.get(0).unwrap(), "message", "data1", Some("id1")); + assert_message(res.first().unwrap(), "message", "data1", Some("id1")); Ok(()) } @@ -127,12 +127,12 @@ async fn comments() -> http_types::Result<()> { let longstring = "x".repeat(2049); let mut input = concat!("data:1\r", ":\0\n", ":\r\n", "data:2\n", ":").to_string(); input.push_str(&longstring); - input.push_str("\r"); + input.push('\r'); input.push_str("data:3\n"); input.push_str(":data:fail\r"); - input.push_str(":"); + input.push(':'); input.push_str(&longstring); - input.push_str("\n"); + input.push('\n'); input.push_str("data:4\n\n"); let mut reader = decode(Cursor::new(input)); assert_message( From aaeb58d04085b429482f1050521a7d9a31df1e32 Mon Sep 17 00:00:00 2001 From: Vasiliy Yorkin Date: Sun, 27 Jul 2025 16:01:41 +0300 Subject: [PATCH 2/9] Upgrade memchr: 2.3.3 -> 2.7.5 --- Cargo.toml | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 051c4c2..22425d7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,10 +9,7 @@ readme = "README.md" edition = "2018" keywords = ["async", "server-sent-events", "sse", "client", "server"] categories = [] -authors = [ - "Renée Kooi ", - "Yoshua Wuyts ", -] +authors = ["Renée Kooi ", "Yoshua Wuyts "] [features] @@ -20,7 +17,7 @@ authors = [ futures-lite = "1.11.3" http-types = { version = "2.10.0", default-features = false } log = "0.4.8" -memchr = "2.3.3" +memchr = "2.7.5" pin-project-lite = "0.2.7" async-channel = "1.1.1" From b878cfff43ba95d16b92b9373d6003085ba7ecfe Mon Sep 17 00:00:00 2001 From: Vasiliy Yorkin Date: Sun, 27 Jul 2025 16:02:33 +0300 Subject: [PATCH 3/9] Upgrade pin-project-lite: 0.2.7 -> 0.2.16 --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index 22425d7..b96af93 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,7 +18,7 @@ futures-lite = "1.11.3" http-types = { version = "2.10.0", default-features = false } log = "0.4.8" memchr = "2.7.5" -pin-project-lite = "0.2.7" +pin-project-lite = "0.2.16" async-channel = "1.1.1" [dev-dependencies] From 94d5bd701dc2a1717a083a98d48f7aac87f295a4 Mon Sep 17 00:00:00 2001 From: Vasiliy Yorkin Date: Sun, 27 Jul 2025 16:03:07 +0300 Subject: [PATCH 4/9] Upgrade log: 0.4.8 -> 0.4.27 --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index b96af93..bac06aa 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,7 +16,7 @@ authors = ["Renée Kooi ", "Yoshua Wuyts " [dependencies] futures-lite = "1.11.3" http-types = { version = "2.10.0", default-features = false } -log = "0.4.8" +log = "0.4.27" memchr = "2.7.5" pin-project-lite = "0.2.16" async-channel = "1.1.1" From 5e03441908a24187cc103e2bc51124cb486d5c23 Mon Sep 17 00:00:00 2001 From: Vasiliy Yorkin Date: Sun, 27 Jul 2025 16:03:50 +0300 Subject: [PATCH 5/9] Upgrade http-types: 2.10.0 -> 2.12.0 --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index bac06aa..2104b73 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,7 +15,7 @@ authors = ["Renée Kooi ", "Yoshua Wuyts " [dependencies] futures-lite = "1.11.3" -http-types = { version = "2.10.0", default-features = false } +http-types = { version = "2.12.0", default-features = false } log = "0.4.27" memchr = "2.7.5" pin-project-lite = "0.2.16" From 7d29efef60b49d96bf8f61f3a5ed3c4f6e94a1d9 Mon Sep 17 00:00:00 2001 From: Vasiliy Yorkin Date: Sun, 27 Jul 2025 16:05:32 +0300 Subject: [PATCH 6/9] Upgrade femme: 2.0.0 -> 2.2.1 --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index 2104b73..4b3edd6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -22,5 +22,5 @@ pin-project-lite = "0.2.16" async-channel = "1.1.1" [dev-dependencies] -femme = "2.0.0" +femme = "2.2.1" async-std = { version = "1.6.0", features = ["attributes", "unstable"] } From 3051c13ec519a58853337b94ce9ce0a1faf750f2 Mon Sep 17 00:00:00 2001 From: Vasiliy Yorkin Date: Sun, 27 Jul 2025 16:06:12 +0300 Subject: [PATCH 7/9] Upgrade async-std: 1.6.0 -> 1.13.1 --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index 4b3edd6..faccbbb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -23,4 +23,4 @@ async-channel = "1.1.1" [dev-dependencies] femme = "2.2.1" -async-std = { version = "1.6.0", features = ["attributes", "unstable"] } +async-std = { version = "1.13.1", features = ["attributes", "unstable"] } From 139be43defc8e12a9c08a6c42fefe270e2d193ad Mon Sep 17 00:00:00 2001 From: Vasiliy Yorkin Date: Sun, 27 Jul 2025 16:09:08 +0300 Subject: [PATCH 8/9] Upgrade futures-lite: 1.11.3 -> 2.6.0 --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index faccbbb..f2ebee6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,7 +14,7 @@ authors = ["Renée Kooi ", "Yoshua Wuyts " [features] [dependencies] -futures-lite = "1.11.3" +futures-lite = "2.6.0" http-types = { version = "2.12.0", default-features = false } log = "0.4.27" memchr = "2.7.5" From 7b4603d25ee4186e4438717afe51ded919103758 Mon Sep 17 00:00:00 2001 From: Vasiliy Yorkin Date: Mon, 28 Jul 2025 22:54:07 +0300 Subject: [PATCH 9/9] Upgrade async-channel: 1.1.1 -> 2.5.0 --- Cargo.toml | 2 +- src/encoder.rs | 11 +++++++---- src/lib.rs | 2 +- src/lines.rs | 2 +- tests/encode.rs | 16 ++++++++-------- 5 files changed, 18 insertions(+), 15 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index f2ebee6..4353cab 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,7 +19,7 @@ http-types = { version = "2.12.0", default-features = false } log = "0.4.27" memchr = "2.7.5" pin-project-lite = "0.2.16" -async-channel = "1.1.1" +async-channel = "2.5.0" [dev-dependencies] femme = "2.2.1" diff --git a/src/encoder.rs b/src/encoder.rs index 2fbb75d..6dcb00b 100644 --- a/src/encoder.rs +++ b/src/encoder.rs @@ -6,7 +6,9 @@ use std::io; use std::pin::Pin; use std::time::Duration; -pin_project_lite::pin_project! { +use pin_project_lite::pin_project; + +pin_project! { /// An SSE protocol encoder. #[derive(Debug)] pub struct Encoder { @@ -24,12 +26,13 @@ impl AsyncRead for Encoder { buf: &mut [u8], ) -> Poll> { let mut this = self.project(); + // Request a new buffer if current one is exhausted. if this.buf.len() <= *this.cursor { match ready!(this.receiver.as_mut().poll_next(cx)) { - Some(buf) => { - log::trace!("> Received a new buffer with len {}", buf.len()); - *this.buf = buf.into_boxed_slice(); + Some(new_buf) => { + log::trace!("> Received a new buffer with len {}", new_buf.len()); + *this.buf = new_buf.into_boxed_slice(); *this.cursor = 0; } None => { diff --git a/src/lib.rs b/src/lib.rs index 5a28592..4890ff9 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -17,7 +17,7 @@ //! }); //! //! // Decode messages using a decoder. -//! let mut reader = decode(BufReader::new(encoder)); +//! let mut reader = decode(BufReader::new(Box::pin(encoder))); //! let event = reader.next().await.unwrap()?; //! // Match and handle the event //! diff --git a/src/lines.rs b/src/lines.rs index 76a604b..0aaea7e 100644 --- a/src/lines.rs +++ b/src/lines.rs @@ -32,7 +32,7 @@ pin_project! { impl Lines { pub(crate) fn new(reader: R) -> Lines where - R: AsyncBufRead + Unpin + Sized, + R: AsyncBufRead + Sized, { Lines { reader, diff --git a/tests/encode.rs b/tests/encode.rs index 56b0149..eda2ca0 100644 --- a/tests/encode.rs +++ b/tests/encode.rs @@ -31,7 +31,7 @@ async fn encode_message() -> http_types::Result<()> { let (sender, encoder) = encode(); task::spawn(async move { sender.send("cat", "chashu", None).await }); - let mut reader = decode(BufReader::new(encoder)); + let mut reader = decode(BufReader::new(Box::pin(encoder))); let event = reader.next().await.unwrap()?; assert_message(&event, "cat", "chashu", None); Ok(()) @@ -42,7 +42,7 @@ async fn encode_message_some() -> http_types::Result<()> { let (sender, encoder) = encode(); task::spawn(async move { sender.send(Some("cat"), "chashu", None).await }); - let mut reader = decode(BufReader::new(encoder)); + let mut reader = decode(BufReader::new(Box::pin(encoder))); let event = reader.next().await.unwrap()?; assert_message(&event, "cat", "chashu", None); Ok(()) @@ -53,7 +53,7 @@ async fn encode_message_data_only() -> http_types::Result<()> { let (sender, encoder) = encode(); task::spawn(async move { sender.send(None, "chashu", None).await }); - let mut reader = decode(BufReader::new(encoder)); + let mut reader = decode(BufReader::new(Box::pin(encoder))); let event = reader.next().await.unwrap()?; assert_message(&event, "message", "chashu", None); Ok(()) @@ -64,7 +64,7 @@ async fn encode_message_with_id() -> http_types::Result<()> { let (sender, encoder) = encode(); task::spawn(async move { sender.send("cat", "chashu", Some("0")).await }); - let mut reader = decode(BufReader::new(encoder)); + let mut reader = decode(BufReader::new(Box::pin(encoder))); let event = reader.next().await.unwrap()?; assert_message(&event, "cat", "chashu", Some("0")); Ok(()) @@ -75,7 +75,7 @@ async fn encode_message_data_only_with_id() -> http_types::Result<()> { let (sender, encoder) = encode(); task::spawn(async move { sender.send(None, "chashu", Some("0")).await }); - let mut reader = decode(BufReader::new(encoder)); + let mut reader = decode(BufReader::new(Box::pin(encoder))); let event = reader.next().await.unwrap()?; assert_message(&event, "message", "chashu", Some("0")); Ok(()) @@ -89,7 +89,7 @@ async fn encode_retry() -> http_types::Result<()> { sender.send_retry(dur, None).await.unwrap(); }); - let mut reader = decode(BufReader::new(encoder)); + let mut reader = decode(BufReader::new(Box::pin(encoder))); let event = reader.next().await.unwrap()?; assert_retry(&event, 12); Ok(()) @@ -100,7 +100,7 @@ async fn encode_multiline_message() -> http_types::Result<()> { let (sender, encoder) = encode(); task::spawn(async move { sender.send("cats", "chashu\nnori", None).await }); - let mut reader = decode(BufReader::new(encoder)); + let mut reader = decode(BufReader::new(Box::pin(encoder))); let event = reader.next().await.unwrap()?; assert_message(&event, "cats", "chashu\nnori", None); Ok(()) @@ -112,7 +112,7 @@ async fn dropping_encoder() -> http_types::Result<()> { let sender_clone = sender.clone(); task::spawn(async move { sender_clone.send("cat", "chashu", None).await }); - let mut reader = decode(BufReader::new(encoder)); + let mut reader = decode(BufReader::new(Box::pin(encoder))); let event = reader.next().await.unwrap()?; assert_message(&event, "cat", "chashu", None);