Skip to content

Commit d7e99f3

Browse files
committed
Split broadcast command polling, reading logic
1 parent 799e7d6 commit d7e99f3

File tree

2 files changed

+35
-21
lines changed

2 files changed

+35
-21
lines changed

src/command/manager.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -73,12 +73,12 @@ pub struct BsonCommandManager<S> {
7373
}
7474

7575
impl<S> BsonCommandManager<S> {
76-
/// Create new [BsonCommandSession] from Stream
76+
/// Create new [BsonCommandManager] from Stream
7777
pub fn new(stream: S) -> Self {
7878
Self::with_capacity(stream, 2048)
7979
}
8080

81-
/// Create new [BsonCommandSession] from Stream with specific max write chunk size.
81+
/// Create new [BsonCommandManager] from Stream with specific max write chunk size.
8282
pub fn with_capacity(stream: S, max_write_chunk_size: usize) -> Self {
8383
Self {
8484
current_id: 0,

src/command/session.rs

Lines changed: 33 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
* Copyright (c) storycraft. Licensed under the MIT Licence.
55
*/
66

7-
use std::{collections::VecDeque, io::{self, Read, Write}, time::Duration};
7+
use std::{io::{self, Read, Write}, time::Duration, vec::Drain};
88

99
use bson::Document;
1010
use loco_protocol::command::codec::StreamError;
@@ -36,26 +36,26 @@ impl From<ReadError> for RequestError {
3636
/// Command session with command cache.
3737
/// Provide methods for requesting command response and broadcast command handling.
3838
/// Useful when creating client.
39-
/// You must use non blocking mode to prevent blocking. Also using async is recommended.
39+
/// Using non blocking mode highly recommended to prevent blocking.
4040
#[derive(Debug)]
4141
pub struct BsonCommandSession<S> {
42-
store: VecDeque<BsonCommand<Document>>,
42+
store: Vec<BsonCommand<Document>>,
4343
manager: BsonCommandManager<S>,
4444
}
4545

4646
impl<S> BsonCommandSession<S> {
4747
/// Create new [BsonCommandSession]
4848
pub fn new(stream: S) -> Self {
4949
Self {
50-
store: VecDeque::new(),
50+
store: Vec::new(),
5151
manager: BsonCommandManager::new(stream),
5252
}
5353
}
5454

5555
/// Create new [BsonCommandSession] with specific max write chunk size.
5656
pub fn with_capacity(stream: S, max_write_chunk_size: usize) -> Self {
5757
Self {
58-
store: VecDeque::new(),
58+
store: Vec::new(),
5959
manager: BsonCommandManager::with_capacity(stream, max_write_chunk_size),
6060
}
6161
}
@@ -93,14 +93,15 @@ impl<S: Read + Write> BsonCommandSession<S> {
9393
if id == request_id && read.method == command.method {
9494
return Ok(read.data);
9595
} else {
96-
self.store.push_back(read);
96+
self.store.push(read);
9797
}
9898
}
9999

100100
Err(ReadError::Codec(StreamError::Io(err)))
101-
if err.kind() == io::ErrorKind::WouldBlock => {
102-
std::thread::sleep(Duration::from_millis(1));
103-
}
101+
if err.kind() == io::ErrorKind::WouldBlock =>
102+
{
103+
std::thread::sleep(Duration::from_millis(1));
104+
}
104105

105106
Err(err) => return Err(RequestError::from(err)),
106107
}
@@ -109,22 +110,35 @@ impl<S: Read + Write> BsonCommandSession<S> {
109110
}
110111

111112
impl<S: Read> BsonCommandSession<S> {
112-
/// Poll next broadcast command.
113-
pub fn poll_broadcast(&mut self) -> Result<Option<BsonCommand<Document>>, ReadError> {
114-
match self.store.pop_front() {
115-
Some(command) => Ok(Some(command)),
116-
117-
None => match self.manager.read() {
118-
Ok((_, read)) => Ok(Some(read)),
113+
/// Poll one broadcast command.
114+
/// This method can block depending on stream.
115+
pub fn poll(&mut self) -> Result<(), ReadError> {
116+
self.poll_many(1)
117+
}
119118

119+
/// Poll broadcast commands up to max limit
120+
pub fn poll_many(&mut self, max: usize) -> Result<(), ReadError> {
121+
for _ in 0..max {
122+
match self.manager.read() {
123+
Ok((_, read)) => {
124+
self.store.push(read)
125+
},
126+
120127
Err(ReadError::Codec(StreamError::Io(err)))
121128
if err.kind() == io::ErrorKind::WouldBlock =>
122129
{
123-
Ok(None)
130+
break;
124131
}
125-
126-
Err(err) => Err(ReadError::from(err)),
132+
133+
Err(err) => return Err(ReadError::from(err)),
127134
}
128135
}
136+
137+
Ok(())
138+
}
139+
140+
/// Drain every broadcast commands stored
141+
pub fn broadcasts(&mut self) -> Drain<'_, BsonCommand<Document>> {
142+
self.store.drain(..)
129143
}
130144
}

0 commit comments

Comments
 (0)