Skip to content

Commit 3e765e1

Browse files
committed
refactor: improve error callback handling and disconnect listener robustness
- Use try_lock to avoid panics and recover from poisoned locks when invoking error callbacks - Skip error callback if lock is busy - Enhance disconnect listener thread to report creation errors and ensure proper synchronization - Update stream lock error message for clarity
1 parent b6507ac commit 3e765e1

File tree

2 files changed

+86
-29
lines changed

2 files changed

+86
-29
lines changed

src/host/coreaudio/macos/device.rs

Lines changed: 42 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -686,7 +686,17 @@ impl Device {
686686

687687
let callback = match host_time_to_stream_instant(args.time_stamp.mHostTime) {
688688
Err(err) => {
689-
(error_callback.lock().unwrap())(err.into());
689+
// Try to invoke error callback, recovering from poison if needed
690+
match error_callback.try_lock() {
691+
Ok(mut cb) => cb(err.into()),
692+
Err(std::sync::TryLockError::Poisoned(guard)) => {
693+
// Recover from poisoned lock to still report this error
694+
guard.into_inner()(err.into());
695+
}
696+
Err(std::sync::TryLockError::WouldBlock) => {
697+
// Skip if callback is busy
698+
}
699+
}
690700
return Err(());
691701
}
692702
Ok(cb) => cb,
@@ -715,8 +725,15 @@ impl Device {
715725
} else {
716726
let error_callback_clone = error_callback_disconnect.clone();
717727
Box::new(move |err: StreamError| {
718-
if let Ok(mut cb) = error_callback_clone.lock() {
719-
cb(err);
728+
match error_callback_clone.try_lock() {
729+
Ok(mut cb) => cb(err),
730+
Err(std::sync::TryLockError::Poisoned(guard)) => {
731+
// Recover from poisoned lock to still report this error
732+
guard.into_inner()(err);
733+
}
734+
Err(std::sync::TryLockError::WouldBlock) => {
735+
// Skip if callback is busy
736+
}
720737
}
721738
})
722739
};
@@ -736,7 +753,7 @@ impl Device {
736753
.lock()
737754
.map_err(|_| BuildStreamError::BackendSpecific {
738755
err: BackendSpecificError {
739-
description: "Failed to acquire stream lock".to_string(),
756+
description: "A cpal stream operation panicked while holding the lock - this is a bug, please report it".to_string(),
740757
},
741758
})?
742759
.audio_unit
@@ -791,7 +808,17 @@ impl Device {
791808

792809
let callback = match host_time_to_stream_instant(args.time_stamp.mHostTime) {
793810
Err(err) => {
794-
(error_callback.lock().unwrap())(err.into());
811+
// Try to invoke error callback, recovering from poison if needed
812+
match error_callback.try_lock() {
813+
Ok(mut cb) => cb(err.into()),
814+
Err(std::sync::TryLockError::Poisoned(guard)) => {
815+
// Recover from poisoned lock to still report this error
816+
guard.into_inner()(err.into());
817+
}
818+
Err(std::sync::TryLockError::WouldBlock) => {
819+
// Skip if callback is busy
820+
}
821+
}
795822
return Err(());
796823
}
797824
Ok(cb) => cb,
@@ -820,8 +847,15 @@ impl Device {
820847
} else {
821848
let error_callback_clone = error_callback_disconnect.clone();
822849
Box::new(move |err: StreamError| {
823-
if let Ok(mut cb) = error_callback_clone.lock() {
824-
cb(err);
850+
match error_callback_clone.try_lock() {
851+
Ok(mut cb) => cb(err),
852+
Err(std::sync::TryLockError::Poisoned(guard)) => {
853+
// Recover from poisoned lock to still report this error
854+
guard.into_inner()(err);
855+
}
856+
Err(std::sync::TryLockError::WouldBlock) => {
857+
// Skip if callback is busy
858+
}
825859
}
826860
})
827861
};
@@ -841,7 +875,7 @@ impl Device {
841875
.lock()
842876
.map_err(|_| BuildStreamError::BackendSpecific {
843877
err: BackendSpecificError {
844-
description: "Failed to acquire stream lock".to_string(),
878+
description: "A cpal stream operation panicked while holding the lock - this is a bug, please report it".to_string(),
845879
},
846880
})?
847881
.audio_unit

src/host/coreaudio/macos/mod.rs

Lines changed: 44 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,12 @@ type ErrorCallback = Box<dyn FnMut(crate::StreamError) + Send + 'static>;
6161
/// Manages device disconnection listener on a dedicated thread to ensure the
6262
/// AudioObjectPropertyListener is always created and dropped on the same thread.
6363
/// This avoids potential threading issues with CoreAudio APIs.
64+
///
65+
/// When a device disconnects, this manager:
66+
/// 1. Attempts to pause the stream to stop audio I/O
67+
/// 2. Calls the error callback with `StreamError::DeviceNotAvailable`
68+
///
69+
/// The dedicated thread architecture ensures `Stream` can implement `Send`.
6470
struct DisconnectManager {
6571
_shutdown_tx: mpsc::Sender<()>,
6672
}
@@ -74,6 +80,7 @@ impl DisconnectManager {
7480
) -> Result<Self, crate::BuildStreamError> {
7581
let (shutdown_tx, shutdown_rx) = mpsc::channel();
7682
let (disconnect_tx, disconnect_rx) = mpsc::channel();
83+
let (ready_tx, ready_rx) = mpsc::channel();
7784

7885
// Spawn dedicated thread to own the AudioObjectPropertyListener
7986
let disconnect_tx_clone = disconnect_tx.clone();
@@ -85,16 +92,29 @@ impl DisconnectManager {
8592
};
8693

8794
// Create the listener on this dedicated thread
88-
let _listener =
89-
AudioObjectPropertyListener::new(device_id, property_address, move || {
90-
let _ = disconnect_tx_clone.send(());
91-
})
92-
.unwrap();
93-
94-
// Drop the listener on this thread after receiving a shutdown signal
95-
let _ = shutdown_rx.recv();
95+
match AudioObjectPropertyListener::new(device_id, property_address, move || {
96+
let _ = disconnect_tx_clone.send(());
97+
}) {
98+
Ok(_listener) => {
99+
let _ = ready_tx.send(Ok(()));
100+
// Drop the listener on this thread after receiving a shutdown signal
101+
let _ = shutdown_rx.recv();
102+
}
103+
Err(e) => {
104+
let _ = ready_tx.send(Err(e));
105+
}
106+
}
96107
});
97108

109+
// Wait for listener creation to complete or fail
110+
ready_rx
111+
.recv()
112+
.map_err(|_| crate::BuildStreamError::BackendSpecific {
113+
err: BackendSpecificError {
114+
description: "Disconnect listener thread terminated unexpectedly".to_string(),
115+
},
116+
})??;
117+
98118
// Handle disconnect events on the main thread pool
99119
let stream_weak_clone = stream_weak.clone();
100120
let error_callback_clone = error_callback.clone();
@@ -103,21 +123,24 @@ impl DisconnectManager {
103123
// Check if stream still exists
104124
if let Some(stream_arc) = stream_weak_clone.upgrade() {
105125
// First, try to pause the stream to stop playback
106-
match stream_arc.lock() {
107-
Ok(mut stream_inner) => {
108-
let _ = stream_inner.pause();
109-
}
110-
Err(_) => {
111-
// Could not acquire lock. This can occur if there are
112-
// overlapping locks, if the stream is already in use, or if a panic
113-
// occurred during a previous lock. Still notify about device
114-
// disconnection even if we can't pause.
115-
}
126+
if let Ok(mut stream_inner) = stream_arc.try_lock() {
127+
let _ = stream_inner.pause();
116128
}
117129

118-
// Call the error callback to notify about device disconnection
119-
if let Ok(mut cb) = error_callback_clone.lock() {
120-
cb(crate::StreamError::DeviceNotAvailable);
130+
// Always try to notify about device disconnection
131+
match error_callback_clone.try_lock() {
132+
Ok(mut cb) => {
133+
cb(crate::StreamError::DeviceNotAvailable);
134+
}
135+
Err(std::sync::TryLockError::WouldBlock) => {
136+
// Error callback is being invoked - skip this notification
137+
}
138+
Err(std::sync::TryLockError::Poisoned(guard)) => {
139+
// Error callback panicked - try to recover and still notify
140+
// This is critical: device disconnected AND callback is broken
141+
let mut cb = guard.into_inner();
142+
cb(crate::StreamError::DeviceNotAvailable);
143+
}
121144
}
122145
} else {
123146
// Stream is gone, exit the handler thread

0 commit comments

Comments
 (0)