diff --git a/src/client.rs b/src/client.rs index 3a115815..a8afeab8 100644 --- a/src/client.rs +++ b/src/client.rs @@ -506,6 +506,25 @@ impl PulsarBuilder { self } + /// add a certificate and private key to authenticate the client in TLS connections + #[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))] + pub fn with_identity(mut self, certificate: Vec, private_key: Vec) -> Self { + match &mut self.tls_options { + Some(tls) => { + tls.certificate = Some(certificate); + tls.private_key = Some(private_key); + } + None => { + self.tls_options = Some(TlsOptions { + certificate: Some(certificate), + private_key: Some(private_key), + ..Default::default() + }) + } + } + self + } + #[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))] pub fn with_allow_insecure_connection(mut self, allow: bool) -> Self { match &mut self.tls_options { @@ -549,6 +568,26 @@ impl PulsarBuilder { Ok(self.with_certificate_chain(v)) } + /// add a certificate and private key to authenticate the client in TLS connections + #[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))] + pub fn with_identity_files>( + self, + certificate_path: P, + private_key_path: P, + ) -> Result { + use std::io::Read; + + let mut file = std::fs::File::open(certificate_path)?; + let mut certificate = vec![]; + file.read_to_end(&mut certificate)?; + + let mut file = std::fs::File::open(private_key_path)?; + let mut private_key = vec![]; + file.read_to_end(&mut private_key)?; + + Ok(self.with_identity(certificate, private_key)) + } + /// creates the Pulsar client and connects it #[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))] pub async fn build(self) -> Result, Error> { diff --git a/src/connection.rs b/src/connection.rs index 0e538415..0f962424 100644 --- a/src/connection.rs +++ b/src/connection.rs @@ -20,7 +20,7 @@ use futures::{ task::{Context, Poll}, Future, FutureExt, Sink, SinkExt, Stream, StreamExt, }; -use native_tls::Certificate; +use native_tls::{Certificate, Identity}; use proto::MessageIdData; use rand::{seq::SliceRandom, thread_rng}; use url::Url; @@ -721,6 +721,7 @@ impl Connection { auth_data: Option>>>, proxy_to_broker_url: Option, certificate_chain: &[Certificate], + identity: &Option, allow_insecure_connection: bool, tls_hostname_verification_enabled: bool, connection_timeout: Duration, @@ -779,6 +780,7 @@ impl Connection { auth_data.clone(), proxy_to_broker_url.clone(), certificate_chain, + identity.clone(), allow_insecure_connection, tls_hostname_verification_enabled, executor.clone(), @@ -854,6 +856,7 @@ impl Connection { auth: Option>>>, proxy_to_broker_url: Option, certificate_chain: &[Certificate], + identity: Option, allow_insecure_connection: bool, tls_hostname_verification_enabled: bool, executor: Arc, @@ -869,6 +872,9 @@ impl Connection { for certificate in certificate_chain { builder.add_root_certificate(certificate.clone()); } + if let Some(identity) = identity { + builder.identity(identity); + } builder.danger_accept_invalid_hostnames( allow_insecure_connection && !tls_hostname_verification_enabled, ); @@ -917,6 +923,9 @@ impl Connection { for certificate in certificate_chain { connector = connector.add_root_certificate(certificate.clone()); } + if let Some(identity) = identity { + connector = connector.identity(identity); + } connector = connector.danger_accept_invalid_hostnames( allow_insecure_connection && !tls_hostname_verification_enabled, ); diff --git a/src/connection_manager.rs b/src/connection_manager.rs index b686d889..7cd03f54 100644 --- a/src/connection_manager.rs +++ b/src/connection_manager.rs @@ -1,7 +1,7 @@ use std::{collections::HashMap, sync::Arc, time::Duration}; use futures::{channel::oneshot, lock::Mutex}; -use native_tls::Certificate; +use native_tls::{Certificate, Identity}; use rand::Rng; use url::Url; @@ -75,6 +75,12 @@ pub struct TlsOptions { /// contains a list of PEM encoded certificates pub certificate_chain: Option>, + /// PEM encoded X509 certificates + pub certificate: Option>, + + /// is a PEM encoded PKCS #8 formatted private key for the leaf certificate + pub private_key: Option>, + /// allow insecure TLS connection if set to true /// /// defaults to *false* @@ -91,6 +97,8 @@ impl Default for TlsOptions { fn default() -> Self { Self { certificate_chain: None, + certificate: None, + private_key: None, allow_insecure_connection: false, tls_hostname_verification_enabled: true, } @@ -117,6 +125,7 @@ pub struct ConnectionManager { pub(crate) operation_retry_options: OperationRetryOptions, tls_options: TlsOptions, certificate_chain: Vec, + identity: Option, } impl ConnectionManager { @@ -162,6 +171,13 @@ impl ConnectionManager { } }; + let identity = match (tls_options.certificate.as_ref(), tls_options.private_key.as_ref()) { + (None, _) | (_, None) => None, + (Some(certificate), Some(privatekey)) => { + Some(native_tls::Identity::from_pkcs8(&certificate, &privatekey)?) + } + }; + if let Some(auth) = auth.clone() { auth.lock().await.initialize().await?; } @@ -175,6 +191,7 @@ impl ConnectionManager { operation_retry_options, tls_options, certificate_chain, + identity, }; let broker_address = BrokerAddress { url: url.clone(), @@ -292,6 +309,7 @@ impl ConnectionManager { self.auth.clone(), proxy_url.clone(), &self.certificate_chain, + &self.identity, self.tls_options.allow_insecure_connection, self.tls_options.tls_hostname_verification_enabled, self.connection_retry_options.connection_timeout,