Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions ffi/rodbus-ffi/src/iterator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ impl<'a> RegisterValueIterator<'a> {
}

pub(crate) unsafe fn bit_value_iterator_next(
it: *mut crate::BitValueIterator,
it: *mut crate::BitValueIterator<'_>,
) -> Option<&crate::ffi::BitValue> {
match it.as_mut() {
Some(it) => match it.inner.next() {
Expand All @@ -46,7 +46,7 @@ pub(crate) unsafe fn bit_value_iterator_next(
}

pub(crate) unsafe fn register_value_iterator_next(
it: *mut crate::RegisterValueIterator,
it: *mut crate::RegisterValueIterator<'_>,
) -> Option<&crate::ffi::RegisterValue> {
match it.as_mut() {
Some(it) => match it.inner.next() {
Expand Down
43 changes: 37 additions & 6 deletions rodbus/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ pub use crate::client::channel::*;
pub use crate::client::listener::*;
pub use crate::client::requests::write_multiple::WriteMultiple;
pub use crate::retry::*;
use crate::ClientOptions;

#[cfg(feature = "ffi")]
pub use ffi_channel::*;
Expand Down Expand Up @@ -103,15 +104,42 @@ pub fn spawn_tcp_client_task(
host: HostAddr,
max_queued_requests: usize,
retry: Box<dyn RetryStrategy>,
decode: DecodeLevel,
decode_level: DecodeLevel,
listener: Option<Box<dyn Listener<ClientState>>>,
) -> Channel {
let options = ClientOptions::default()
.decode_level(decode_level)
.max_queued_requests(max_queued_requests);
crate::tcp::client::spawn_tcp_channel(
host,
retry,
listener.unwrap_or_else(|| NullListener::create()),
options,
)
}

/// Spawns a channel task onto the runtime that maintains a TCP connection and processes
/// requests. The task completes when the returned channel handle is dropped.
///
/// The channel uses the provided [`RetryStrategy`] to pause between failed connection attempts
///
/// * `host` - Address/port of the remote server. Can be a IP address or name on which to perform DNS resolution.
/// * `retry` - A boxed trait object that controls when the connection is retried on failure
/// * `listener` - Optional callback to monitor the TCP connection state
/// * `client_options` - A builder that contains various client options.
///
/// `WARNING`: This function must be called from with the context of the Tokio runtime or it will panic.
pub fn spawn_tcp_client_task_with_options(
host: HostAddr,
retry: Box<dyn RetryStrategy>,
listener: Option<Box<dyn Listener<ClientState>>>,
client_options: ClientOptions,
) -> Channel {
crate::tcp::client::spawn_tcp_channel(
host,
max_queued_requests,
retry,
decode,
listener.unwrap_or_else(|| NullListener::create()),
client_options,
)
}

Expand Down Expand Up @@ -169,15 +197,18 @@ pub fn spawn_tls_client_task(
max_queued_requests: usize,
retry: Box<dyn RetryStrategy>,
tls_config: TlsClientConfig,
decode: DecodeLevel,
decode_level: DecodeLevel,
listener: Option<Box<dyn Listener<ClientState>>>,
) -> Channel {
let options = ClientOptions::default()
.decode_level(decode_level)
.max_queued_requests(max_queued_requests);

spawn_tls_channel(
host,
max_queued_requests,
retry,
tls_config,
decode,
options,
listener.unwrap_or_else(|| NullListener::create()),
)
}
26 changes: 15 additions & 11 deletions rodbus/src/tcp/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,44 +2,41 @@ use tracing::Instrument;

use crate::client::{Channel, ClientState, HostAddr, Listener};
use crate::common::phys::PhysLayer;
use crate::decode::DecodeLevel;

use crate::client::message::Command;
use crate::client::task::{ClientLoop, SessionError, StateChange};
use crate::common::frame::{FrameWriter, FramedReader};
use crate::error::Shutdown;
use crate::retry::RetryStrategy;
use crate::{ChannelLoggingType, ClientOptions};

use tokio::net::TcpStream;

pub(crate) fn spawn_tcp_channel(
host: HostAddr,
max_queued_requests: usize,
connect_retry: Box<dyn RetryStrategy>,
decode: DecodeLevel,
listener: Box<dyn Listener<ClientState>>,
client_options: ClientOptions,
) -> Channel {
let (handle, task) =
create_tcp_channel(host, max_queued_requests, connect_retry, decode, listener);
let (handle, task) = create_tcp_channel(host, connect_retry, listener, client_options);
tokio::spawn(task);
handle
}

pub(crate) fn create_tcp_channel(
host: HostAddr,
max_queued_requests: usize,
connect_retry: Box<dyn RetryStrategy>,
decode: DecodeLevel,
listener: Box<dyn Listener<ClientState>>,
options: ClientOptions,
) -> (Channel, impl std::future::Future<Output = ()>) {
let (tx, rx) = tokio::sync::mpsc::channel(max_queued_requests);
let (tx, rx) = tokio::sync::mpsc::channel(options.max_queued_requests);
let task = async move {
TcpChannelTask::new(
host.clone(),
rx.into(),
TcpTaskConnectionHandler::Tcp,
connect_retry,
decode,
options,
listener,
)
.run()
Expand Down Expand Up @@ -75,6 +72,7 @@ pub(crate) struct TcpChannelTask {
connection_handler: TcpTaskConnectionHandler,
client_loop: ClientLoop,
listener: Box<dyn Listener<ClientState>>,
_channel_logging: ChannelLoggingType,
}

impl TcpChannelTask {
Expand All @@ -83,15 +81,21 @@ impl TcpChannelTask {
rx: crate::channel::Receiver<Command>,
connection_handler: TcpTaskConnectionHandler,
connect_retry: Box<dyn RetryStrategy>,
decode: DecodeLevel,
options: ClientOptions,
listener: Box<dyn Listener<ClientState>>,
) -> Self {
Self {
host,
connect_retry,
connection_handler,
client_loop: ClientLoop::new(rx, FrameWriter::tcp(), FramedReader::tcp(), decode),
client_loop: ClientLoop::new(
rx,
FrameWriter::tcp(),
FramedReader::tcp(),
options.decode_level,
),
listener,
_channel_logging: options.channel_logging,
}
}

Expand Down
21 changes: 6 additions & 15 deletions rodbus/src/tcp/tls/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use crate::common::phys::PhysLayer;
use crate::tcp::client::{TcpChannelTask, TcpTaskConnectionHandler};
use crate::tcp::tls::{CertificateMode, MinTlsVersion, TlsError};

use crate::DecodeLevel;
use crate::ClientOptions;

/// TLS configuration
pub struct TlsClientConfig {
Expand All @@ -25,40 +25,31 @@ pub struct TlsClientConfig {

pub(crate) fn spawn_tls_channel(
host: HostAddr,
max_queued_requests: usize,
connect_retry: Box<dyn RetryStrategy>,
tls_config: TlsClientConfig,
decode: DecodeLevel,
options: ClientOptions,
listener: Box<dyn Listener<ClientState>>,
) -> Channel {
let (handle, task) = create_tls_channel(
host,
max_queued_requests,
connect_retry,
tls_config,
decode,
listener,
);
let (handle, task) = create_tls_channel(host, connect_retry, tls_config, options, listener);
tokio::spawn(task);
handle
}

pub(crate) fn create_tls_channel(
host: HostAddr,
max_queued_requests: usize,
connect_retry: Box<dyn RetryStrategy>,
tls_config: TlsClientConfig,
decode: DecodeLevel,
options: ClientOptions,
listener: Box<dyn Listener<ClientState>>,
) -> (Channel, impl std::future::Future<Output = ()>) {
let (tx, rx) = tokio::sync::mpsc::channel(max_queued_requests);
let (tx, rx) = tokio::sync::mpsc::channel(options.max_queued_requests);
let task = async move {
TcpChannelTask::new(
host.clone(),
rx.into(),
TcpTaskConnectionHandler::Tls(tls_config),
connect_retry,
decode,
options,
listener,
)
.run()
Expand Down
63 changes: 62 additions & 1 deletion rodbus/src/types.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::decode::AppDecodeLevel;
use crate::error::{AduParseError, InvalidRange};
use crate::DecodeLevel;

use scursor::ReadCursor;

Expand Down Expand Up @@ -375,6 +376,66 @@ impl Default for UnitId {
}
}

/// How verbose to make the logging of events of communication channel itself
#[derive(Default, Clone, Copy)]
pub enum ChannelLoggingType {
/// Log every event, e.g. even failed connections when the client is already disconnected
///
/// This is the default, but can get noisy if connection attempts are repeatedly failing
/// depending on how the backoff is configured.
#[default]
Verbose,
/// Log only state changes, e.g. transitions from "connected" to "disconnected"
///
/// This can greatly reduce verbosity of logging during disrupted communication, but comes
/// with a loss of visibility
StateChanges,
}

/// A ClientOptions builder
#[derive(Copy, Clone)]
pub struct ClientOptions {
pub(crate) channel_logging: ChannelLoggingType,
pub(crate) max_queued_requests: usize,
pub(crate) decode_level: DecodeLevel,
}

impl ClientOptions {
/// Set the channel logging type
pub fn channel_logging(self, channel_logging: ChannelLoggingType) -> Self {
Self {
channel_logging,
..self
}
}

/// Set the maximum number of queued requests
pub fn max_queued_requests(self, max_queued_requests: usize) -> Self {
Self {
max_queued_requests,
..self
}
}

/// Set the decode level
pub fn decode_level(self, decode_level: DecodeLevel) -> Self {
Self {
decode_level,
..self
}
}
}

impl Default for ClientOptions {
fn default() -> Self {
Self {
channel_logging: ChannelLoggingType::default(),
max_queued_requests: 16,
decode_level: DecodeLevel::default(),
}
}
}

#[cfg(test)]
mod tests {
use crate::error::*;
Expand All @@ -383,7 +444,7 @@ mod tests {

#[test]
fn address_start_max_count_of_one_is_allowed() {
AddressRange::try_from(std::u16::MAX, 1).unwrap();
AddressRange::try_from(u16::MAX, 1).unwrap();
}

#[test]
Expand Down