Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ version = "0.1.0"
edition = "2021"

[workspace.dependencies]
async-lock = "3"
futures = "0.3"
rand = "0.8"
bytes = "1"
Expand All @@ -43,6 +42,9 @@ clap = { version = "4", features = ["derive"] }
dashmap = "6"
derive_builder = "0.20"
env_logger = "0.11"
parking_lot = "0.11"


[workspace.dependencies.qbase]
path = "./qbase"
version = "0.1.0"
Expand Down
1 change: 1 addition & 0 deletions qbase/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ deref-derive = { workspace = true }
rustls = { workspace = true }
log = { workspace = true }
derive_builder = { workspace = true }
parking_lot = { workspace = true }

[dev-dependencies]
tokio = { workspace = true }
12 changes: 6 additions & 6 deletions qbase/src/cid/local_cid.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
use std::sync::{Arc, Mutex};
use std::sync::Arc;

use parking_lot::Mutex;

use super::{ConnectionId, UniqueCid};
use crate::{
Expand All @@ -8,7 +10,6 @@ use crate::{
util::IndexDeque,
varint::{VarInt, VARINT_MAX},
};

/// 我方负责发放足够的cid,poll_issue_cid,将当前有效的cid注册到连接id路由。
/// 当cid不足时,就发放新的连接id,包括增大active_cid_limit,以及对方淘汰旧的cid。
#[derive(Debug)]
Expand Down Expand Up @@ -140,15 +141,14 @@ where
pub fn active_cids(&self) -> Vec<ConnectionId> {
self.0
.lock()
.unwrap()
.cid_deque
.iter()
.filter_map(|v| v.map(|(cid, _)| cid))
.collect()
}

pub fn set_limit(&self, active_cid_limit: u64) -> Result<(), Error> {
self.0.lock().unwrap().set_limit(active_cid_limit)
self.0.lock().set_limit(active_cid_limit)
}
}

Expand All @@ -163,7 +163,7 @@ where
&mut self,
frame: &RetireConnectionIdFrame,
) -> Result<Self::Output, crate::error::Error> {
self.0.lock().unwrap().recv_retire_cid_frame(frame)
self.0.lock().recv_retire_cid_frame(frame)
}
}

Expand Down Expand Up @@ -196,7 +196,7 @@ mod tests {
fn test_issue_cid() {
let initial_scid = ConnectionId::random_gen(8);
let local_cids = ArcLocalCids::new(generator, initial_scid, IssuedCids::default());
let mut guard = local_cids.0.lock().unwrap();
let mut guard = local_cids.0.lock();

assert_eq!(guard.cid_deque.len(), 2);

Expand Down
40 changes: 19 additions & 21 deletions qbase/src/cid/remote_cid.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
use std::{
future::Future,
pin::Pin,
sync::{Arc, Mutex},
sync::Arc,
task::{Context, Poll, Waker},
};

use deref_derive::{Deref, DerefMut};
use parking_lot::Mutex;

use super::ConnectionId;
use crate::{
Expand Down Expand Up @@ -145,7 +146,7 @@ where
// retire the cids before seq, including the applied and unapplied
for seq in self.cid_cells.offset()..max_retired {
let (_, cell) = self.cid_cells.pop_front().unwrap();
let mut guard = cell.0.lock().unwrap();
let mut guard = cell.0.lock();
if guard.is_retired() {
continue;
} else {
Expand Down Expand Up @@ -226,7 +227,7 @@ where
/// - have been allocated again after retirement of last cid
/// - have been retired
pub fn apply_cid(&self) -> ArcCidCell<RETIRED> {
self.0.lock().unwrap().apply_cid()
self.0.lock().apply_cid()
}
}

Expand All @@ -237,7 +238,7 @@ where
type Output = Option<ResetToken>;

fn recv_frame(&mut self, frame: &NewConnectionIdFrame) -> Result<Self::Output, Error> {
self.0.lock().unwrap().recv_new_cid_frame(frame)
self.0.lock().recv_new_cid_frame(frame)
}
}

Expand Down Expand Up @@ -332,15 +333,15 @@ where
}

fn assign(&self, cid: ConnectionId) {
self.0.lock().unwrap().assign(cid);
self.0.lock().assign(cid);
}

pub fn set_cid(&self, cid: ConnectionId) {
self.0.lock().unwrap().set_cid(cid);
self.0.lock().set_cid(cid);
}

pub fn poll_get_cid(&self, cx: &mut Context<'_>) -> Poll<Option<ConnectionId>> {
self.0.lock().unwrap().poll_get_cid(cx)
self.0.lock().poll_get_cid(cx)
}

/// Getting the connection ID, if it is not ready, return a future
Expand All @@ -353,7 +354,7 @@ where
/// is marked as no longer in use, with a RetireConnectionIdFrame being sent to peer.
#[inline]
pub fn retire(&self) {
let mut guard = self.0.lock().unwrap();
let mut guard = self.0.lock();
if !guard.is_retired() {
guard.state.retire();
let seq = guard.seq;
Expand All @@ -371,7 +372,7 @@ where
type Output = Option<ConnectionId>;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
self.0.lock().unwrap().poll_get_cid(cx)
self.0.lock().poll_get_cid(cx)
}
}

Expand All @@ -392,10 +393,7 @@ mod tests {
// Will return Pending, because the peer hasn't issue any connection id
let cid_apply = remote_cids.apply_cid();
assert_eq!(cid_apply.get_cid().poll_unpin(&mut cx), Poll::Pending);
assert!(matches!(
cid_apply.0.lock().unwrap().state,
CidState::Demand(_)
));
assert!(matches!(cid_apply.0.lock().state, CidState::Demand(_)));

let cid = ConnectionId::random_gen(8);
let frame = NewConnectionIdFrame {
Expand Down Expand Up @@ -424,7 +422,7 @@ mod tests {
let mut cx = std::task::Context::from_waker(&waker);
let retired_cids = ArcAsyncDeque::<RetireConnectionIdFrame>::new();
let remote_cids = ArcRemoteCids::with_limit(8, retired_cids, None);
let mut guard = remote_cids.0.lock().unwrap();
let mut guard = remote_cids.0.lock();

let mut cids = vec![];
for seq in 0..8 {
Expand All @@ -441,8 +439,8 @@ mod tests {

let cid_apply1 = guard.apply_cid();
let cid_apply2 = guard.apply_cid();
assert_eq!(cid_apply1.0.lock().unwrap().seq, 0);
assert_eq!(cid_apply2.0.lock().unwrap().seq, 1);
assert_eq!(cid_apply1.0.lock().seq, 0);
assert_eq!(cid_apply2.0.lock().seq, 1);
assert_eq!(
cid_apply1.get_cid().poll_unpin(&mut cx),
Poll::Ready(Some(cids[0]))
Expand All @@ -457,8 +455,8 @@ mod tests {
assert_eq!(guard.cid_cells.offset(), 4);
assert_eq!(guard.retired_cids.len(), 4);

assert_eq!(cid_apply1.0.lock().unwrap().seq, 4);
assert_eq!(cid_apply2.0.lock().unwrap().seq, 5);
assert_eq!(cid_apply1.0.lock().seq, 4);
assert_eq!(cid_apply2.0.lock().seq, 5);

for i in 0..4 {
assert_eq!(
Expand Down Expand Up @@ -498,7 +496,7 @@ mod tests {
let mut cx = std::task::Context::from_waker(&waker);
let retired_cids = ArcAsyncDeque::<RetireConnectionIdFrame>::new();
let remote_cids = ArcRemoteCids::with_limit(8, retired_cids, None);
let mut guard = remote_cids.0.lock().unwrap();
let mut guard = remote_cids.0.lock();

let mut cids = vec![];
for seq in 0..8 {
Expand All @@ -520,8 +518,8 @@ mod tests {

let cid_apply1 = guard.apply_cid();
let cid_apply2 = guard.apply_cid();
assert_eq!(cid_apply1.0.lock().unwrap().seq, 4);
assert_eq!(cid_apply2.0.lock().unwrap().seq, 5);
assert_eq!(cid_apply1.0.lock().seq, 4);
assert_eq!(cid_apply2.0.lock().seq, 5);
assert_eq!(
cid_apply1.get_cid().poll_unpin(&mut cx),
Poll::Ready(Some(cids[4]))
Expand Down
13 changes: 7 additions & 6 deletions qbase/src/flow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,13 @@ use std::{
pin::Pin,
sync::{
atomic::{AtomicBool, AtomicU64, Ordering},
Arc, Mutex, MutexGuard,
Arc,
},
task::{Context, Poll, Waker},
};

use futures::{task::AtomicWaker, Future};
use parking_lot::{Mutex, MutexGuard};
use thiserror::Error;

use crate::{
Expand Down Expand Up @@ -95,7 +96,7 @@ impl ArcSendControler {
}

fn increase_limit(&self, max_data: u64) {
let mut guard = self.0.lock().unwrap();
let mut guard = self.0.lock();
if let Ok(inner) = guard.deref_mut() {
inner.increase_limit(max_data);
}
Expand All @@ -109,7 +110,7 @@ impl ArcSendControler {

/// Apply for sending data. If it has meet error, it will return Err directly.
pub fn credit(&self) -> Result<Credit<'_>, QuicError> {
let guard = self.0.lock().unwrap();
let guard = self.0.lock();
if let Err(e) = guard.deref() {
return Err(e.clone());
}
Expand All @@ -121,15 +122,15 @@ impl ArcSendControler {
/// not require to register the send task on the flow control, as there may still be
/// retransmission data that can be sent.
pub fn register_waker(&self, waker: Waker) {
let mut guard = self.0.lock().unwrap();
let mut guard = self.0.lock();
if let Ok(inner) = guard.deref_mut() {
inner.register_waker(waker);
}
}

/// Flow control can only be terminated if the connection encounters an error
pub fn on_error(&self, error: &QuicError) {
let mut guard = self.0.lock().unwrap();
let mut guard = self.0.lock();
if guard.deref().is_err() {
return;
}
Expand All @@ -156,7 +157,7 @@ impl Future for WouldBlock {
type Output = Result<DataBlockedFrame, QuicError>;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut guard = self.0 .0.lock().unwrap();
let mut guard = self.0 .0.lock();
match guard.deref_mut() {
Ok(inner) => inner.poll_would_block(cx),
Err(e) => Poll::Ready(Err(e.clone())),
Expand Down
9 changes: 5 additions & 4 deletions qbase/src/packet/keys.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@ use std::{
future::Future,
ops::DerefMut,
pin::Pin,
sync::{Arc, Mutex, MutexGuard},
sync::Arc,
task::{Context, Poll, Waker},
};

use parking_lot::{Mutex, MutexGuard};
use rustls::quic::{HeaderProtectionKey, Keys, PacketKey, Secrets};

use super::KeyPhaseBit;
Expand All @@ -22,7 +23,7 @@ pub struct ArcKeys(Arc<Mutex<KeysState>>);

impl ArcKeys {
fn lock_guard(&self) -> MutexGuard<KeysState> {
self.0.lock().unwrap()
self.0.lock()
}

pub fn new_pending() -> Self {
Expand Down Expand Up @@ -156,7 +157,7 @@ pub struct ArcOneRttPacketKeys(Arc<(Mutex<OneRttPacketKeys>, usize)>);

impl ArcOneRttPacketKeys {
pub fn lock_guard(&self) -> MutexGuard<OneRttPacketKeys> {
self.0 .0.lock().unwrap()
self.0 .0.lock()
}

pub fn tag_len(&self) -> usize {
Expand Down Expand Up @@ -184,7 +185,7 @@ pub struct ArcOneRttKeys(Arc<Mutex<OneRttKeysState>>);

impl ArcOneRttKeys {
fn lock_guard(&self) -> MutexGuard<OneRttKeysState> {
self.0.lock().unwrap()
self.0.lock()
}

pub fn new_pending() -> Self {
Expand Down
Loading