Skip to content

Commit 53797ad

Browse files
committed
rebased PR streamnative#267
1 parent cdb1fdf commit 53797ad

File tree

3 files changed

+80
-2
lines changed

3 files changed

+80
-2
lines changed

src/client.rs

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -506,6 +506,25 @@ impl<Exe: Executor> PulsarBuilder<Exe> {
506506
self
507507
}
508508

509+
/// add a certificate and private key to authenticate the client in TLS connections
510+
#[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))]
511+
pub fn with_identity(mut self, certificate: Vec<u8>, private_key: Vec<u8>) -> Self {
512+
match &mut self.tls_options {
513+
Some(tls) => {
514+
tls.certificate = Some(certificate);
515+
tls.private_key = Some(private_key);
516+
}
517+
None => {
518+
self.tls_options = Some(TlsOptions {
519+
certificate: Some(certificate),
520+
private_key: Some(private_key),
521+
..Default::default()
522+
})
523+
}
524+
}
525+
self
526+
}
527+
509528
#[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))]
510529
pub fn with_allow_insecure_connection(mut self, allow: bool) -> Self {
511530
match &mut self.tls_options {
@@ -549,6 +568,26 @@ impl<Exe: Executor> PulsarBuilder<Exe> {
549568
Ok(self.with_certificate_chain(v))
550569
}
551570

571+
/// add a certificate and private key to authenticate the client in TLS connections
572+
#[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))]
573+
pub fn with_identity_files<P: AsRef<std::path::Path>>(
574+
self,
575+
certificate_path: P,
576+
private_key_path: P,
577+
) -> Result<Self, std::io::Error> {
578+
use std::io::Read;
579+
580+
let mut file = std::fs::File::open(certificate_path)?;
581+
let mut certificate = vec![];
582+
file.read_to_end(&mut certificate)?;
583+
584+
let mut file = std::fs::File::open(private_key_path)?;
585+
let mut private_key = vec![];
586+
file.read_to_end(&mut private_key)?;
587+
588+
Ok(self.with_identity(certificate, private_key))
589+
}
590+
552591
/// creates the Pulsar client and connects it
553592
#[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))]
554593
pub async fn build(self) -> Result<Pulsar<Exe>, Error> {

src/connection.rs

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ use futures::{
2121
Future, FutureExt, Sink, SinkExt, Stream, StreamExt,
2222
};
2323
#[cfg(any(feature = "tokio-runtime", feature = "async-std-runtime"))]
24-
use native_tls::Certificate;
24+
use native_tls::{Certificate, Identity};
2525
use proto::MessageIdData;
2626
use rand::{seq::SliceRandom, thread_rng};
2727
#[cfg(all(
@@ -756,6 +756,9 @@ impl<Exe: Executor> Connection<Exe> {
756756
auth_data: Option<Arc<Mutex<Box<dyn crate::authentication::Authentication>>>>,
757757
proxy_to_broker_url: Option<String>,
758758
certificate_chain: &[Certificate],
759+
#[cfg(any(feature = "tokio-runtime", feature = "async-std-runtime"))] identity: &Option<
760+
Identity,
761+
>,
759762
allow_insecure_connection: bool,
760763
tls_hostname_verification_enabled: bool,
761764
connection_timeout: Duration,
@@ -814,6 +817,8 @@ impl<Exe: Executor> Connection<Exe> {
814817
auth_data.clone(),
815818
proxy_to_broker_url.clone(),
816819
certificate_chain,
820+
#[cfg(any(feature = "tokio-runtime", feature = "async-std-runtime"))]
821+
identity.clone(),
817822
allow_insecure_connection,
818823
tls_hostname_verification_enabled,
819824
executor.clone(),
@@ -891,6 +896,9 @@ impl<Exe: Executor> Connection<Exe> {
891896
auth: Option<Arc<Mutex<Box<dyn crate::authentication::Authentication>>>>,
892897
proxy_to_broker_url: Option<String>,
893898
certificate_chain: &[Certificate],
899+
#[cfg(any(feature = "tokio-runtime", feature = "async-std-runtime"))] identity: Option<
900+
Identity,
901+
>,
894902
allow_insecure_connection: bool,
895903
tls_hostname_verification_enabled: bool,
896904
executor: Arc<Exe>,
@@ -906,6 +914,9 @@ impl<Exe: Executor> Connection<Exe> {
906914
for certificate in certificate_chain {
907915
builder.add_root_certificate(certificate.clone());
908916
}
917+
if let Some(identity) = identity {
918+
builder.identity(identity);
919+
}
909920
builder.danger_accept_invalid_hostnames(
910921
allow_insecure_connection && !tls_hostname_verification_enabled,
911922
);
@@ -1016,6 +1027,9 @@ impl<Exe: Executor> Connection<Exe> {
10161027
for certificate in certificate_chain {
10171028
connector = connector.add_root_certificate(certificate.clone());
10181029
}
1030+
if let Some(identity) = identity {
1031+
connector = connector.identity(identity);
1032+
}
10191033
connector = connector.danger_accept_invalid_hostnames(
10201034
allow_insecure_connection && !tls_hostname_verification_enabled,
10211035
);

src/connection_manager.rs

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ use std::{collections::HashMap, sync::Arc, time::Duration};
22

33
use futures::{channel::oneshot, lock::Mutex};
44
#[cfg(any(feature = "tokio-runtime", feature = "async-std-runtime"))]
5-
use native_tls::Certificate;
5+
use native_tls::{Certificate, Identity};
66
use rand::Rng;
77
#[cfg(all(
88
any(feature = "tokio-rustls-runtime", feature = "async-std-rustls-runtime"),
@@ -81,6 +81,12 @@ pub struct TlsOptions {
8181
/// contains a list of PEM encoded certificates
8282
pub certificate_chain: Option<Vec<u8>>,
8383

84+
/// PEM encoded X509 certificates
85+
pub certificate: Option<Vec<u8>>,
86+
87+
/// is a PEM encoded PKCS #8 formatted private key for the leaf certificate
88+
pub private_key: Option<Vec<u8>>,
89+
8490
/// allow insecure TLS connection if set to true
8591
///
8692
/// defaults to *false*
@@ -97,6 +103,8 @@ impl Default for TlsOptions {
97103
fn default() -> Self {
98104
Self {
99105
certificate_chain: None,
106+
certificate: None,
107+
private_key: None,
100108
allow_insecure_connection: false,
101109
tls_hostname_verification_enabled: true,
102110
}
@@ -123,6 +131,8 @@ pub struct ConnectionManager<Exe: Executor> {
123131
pub(crate) operation_retry_options: OperationRetryOptions,
124132
tls_options: TlsOptions,
125133
certificate_chain: Vec<Certificate>,
134+
#[cfg(any(feature = "tokio-runtime", feature = "async-std-runtime"))]
135+
identity: Option<Identity>,
126136
}
127137

128138
impl<Exe: Executor> ConnectionManager<Exe> {
@@ -178,6 +188,17 @@ impl<Exe: Executor> ConnectionManager<Exe> {
178188
}
179189
};
180190

191+
#[cfg(any(feature = "tokio-runtime", feature = "async-std-runtime"))]
192+
let identity = match (
193+
tls_options.certificate.as_ref(),
194+
tls_options.private_key.as_ref(),
195+
) {
196+
(None, _) | (_, None) => None,
197+
(Some(certificate), Some(privatekey)) => {
198+
Some(native_tls::Identity::from_pkcs8(&certificate, &privatekey)?)
199+
}
200+
};
201+
181202
if let Some(auth) = auth.clone() {
182203
auth.lock().await.initialize().await?;
183204
}
@@ -191,6 +212,8 @@ impl<Exe: Executor> ConnectionManager<Exe> {
191212
operation_retry_options,
192213
tls_options,
193214
certificate_chain,
215+
#[cfg(any(feature = "tokio-runtime", feature = "async-std-runtime"))]
216+
identity,
194217
};
195218
let broker_address = BrokerAddress {
196219
url: url.clone(),
@@ -308,6 +331,8 @@ impl<Exe: Executor> ConnectionManager<Exe> {
308331
self.auth.clone(),
309332
proxy_url.clone(),
310333
&self.certificate_chain,
334+
#[cfg(any(feature = "tokio-runtime", feature = "async-std-runtime"))]
335+
&self.identity,
311336
self.tls_options.allow_insecure_connection,
312337
self.tls_options.tls_hostname_verification_enabled,
313338
self.connection_retry_options.connection_timeout,

0 commit comments

Comments
 (0)