Skip to content

Commit 7c8d3c9

Browse files
authored
[improve][broker] PIP-402: Optionally prevent role/originalPrincipal logging (#23386)
1 parent 7504538 commit 7c8d3c9

File tree

10 files changed

+220
-13
lines changed

10 files changed

+220
-13
lines changed

conf/broker.conf

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -909,6 +909,9 @@ metadataStoreBatchingMaxSizeKb=128
909909
# Enable authentication
910910
authenticationEnabled=false
911911

912+
# Enable authentication role anonymizer, can be REDACTED, hash:SHA256, hash:MD5, default is NONE
913+
authenticationRoleLoggingAnonymizer=NONE
914+
912915
# Authentication provider name list, which is comma separated list of class names
913916
authenticationProviders=
914917

conf/proxy.conf

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -184,6 +184,9 @@ forwardAuthorizationCredentials=false
184184
# Whether authentication is enabled for the Pulsar proxy
185185
authenticationEnabled=false
186186

187+
# Enable authentication role anonymizer, can be REDACTED, hash:SHA256, hash:MD5, default is NONE
188+
authenticationRoleLoggingAnonymizer=NONE
189+
187190
# Authentication provider name list (a comma-separated list of class names)
188191
authenticationProviders=
189192

conf/standalone.conf

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -572,6 +572,9 @@ authenticateOriginalAuthData=false
572572
# Enable authentication
573573
authenticationEnabled=false
574574

575+
# Enable authentication role anonymizer, can be REDACTED, hash:SHA256, hash:MD5, default is NONE
576+
authenticationRoleLoggingAnonymizer=NONE
577+
575578
# Authentication provider name list, which is comma separated list of class names
576579
authenticationProviders=
577580

pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -445,6 +445,14 @@ The max allowed delay for delayed delivery (in milliseconds). If the broker rece
445445
)
446446
private String clusterName;
447447

448+
@FieldContext(
449+
category = CATEGORY_SERVER,
450+
doc = "Defines how the broker will anonymize the role and originalAuthRole before logging. "
451+
+ "Possible values are: NONE (no anonymization), REDACTED (replaces with '[REDACTED]'), "
452+
+ "hash:SHA256 (hashes using SHA-256), and hash:MD5 (hashes using MD5). Default is NONE."
453+
)
454+
private String authenticationRoleLoggingAnonymizer = "NONE";
455+
448456
@FieldContext(
449457
category = CATEGORY_SERVER,
450458
dynamic = true,
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.pulsar.common.configuration.anonymizer;
20+
21+
import static org.apache.pulsar.common.configuration.anonymizer.DefaultRoleAnonymizerType.NONE;
22+
23+
/**
24+
* This class provides a utility to anonymize authentication roles before logging,
25+
* based on a configured anonymization strategy. The anonymization strategy is
26+
* determined by the provided value of the {@link DefaultRoleAnonymizerType} enum.
27+
*
28+
* The primary purpose of this class is to enable flexible anonymization of sensitive
29+
* information (such as user roles) in logs, ensuring compliance with security and
30+
* privacy requirements while allowing customization of the anonymization behavior.
31+
*
32+
* Usage:
33+
* - The class constructor accepts a string that represents the desired anonymization
34+
* strategy (e.g., "NONE", "REDACTED", "SHA256", "MD5"), and it initializes the
35+
* anonymizer type accordingly.
36+
* - The {@code anonymize} method applies the selected anonymization strategy to
37+
* the provided role and returns the anonymized value.
38+
*
39+
* Example:
40+
* <pre>
41+
* DefaultAuthenticationRoleLoggingAnonymizer roleAnonymizer =
42+
* new DefaultAuthenticationRoleLoggingAnonymizer("SHA256");
43+
* String anonymizedRole = roleAnonymizer.anonymize("admin");
44+
* </pre>
45+
*
46+
* Anonymization strategies:
47+
* - NONE: No anonymization (returns the role as-is).
48+
* - REDACTED: Replaces the role with "[REDACTED]".
49+
* - hash:SHA256: Hashes the role using the SHA-256 algorithm and prefixes it with "SHA-256:".
50+
* - hash:MD5: Hashes the role using the MD5 algorithm and prefixes it with "MD5:"
51+
*/
52+
public final class DefaultAuthenticationRoleLoggingAnonymizer {
53+
private static DefaultRoleAnonymizerType anonymizerType = NONE;
54+
55+
public DefaultAuthenticationRoleLoggingAnonymizer(String authenticationRoleLoggingAnonymizer) {
56+
if (authenticationRoleLoggingAnonymizer.startsWith("hash:")) {
57+
anonymizerType = DefaultRoleAnonymizerType.valueOf(authenticationRoleLoggingAnonymizer
58+
.substring("hash:".length()).toUpperCase());
59+
} else {
60+
anonymizerType = DefaultRoleAnonymizerType.valueOf(authenticationRoleLoggingAnonymizer);
61+
}
62+
}
63+
64+
public String anonymize(String role) {
65+
return anonymizerType.anonymize(role);
66+
}
67+
}
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.pulsar.common.configuration.anonymizer;
20+
21+
import java.security.MessageDigest;
22+
import java.security.NoSuchAlgorithmException;
23+
import java.util.Base64;
24+
25+
public enum DefaultRoleAnonymizerType {
26+
NONE {
27+
@Override
28+
public String anonymize(String role) {
29+
return role;
30+
}
31+
},
32+
REDACTED {
33+
@Override
34+
public String anonymize(String role) {
35+
return REDACTED_VALUE;
36+
}
37+
},
38+
SHA256 {
39+
private static final String PREFIX = "SHA-256:";
40+
private final MessageDigest digest;
41+
42+
{
43+
// Initializing the MessageDigest once for SHA-256
44+
try {
45+
digest = MessageDigest.getInstance("SHA-256");
46+
} catch (NoSuchAlgorithmException e) {
47+
throw new RuntimeException("SHA-256 algorithm not found", e);
48+
}
49+
}
50+
51+
@Override
52+
public String anonymize(String role) {
53+
byte[] hash = digest.digest(role.getBytes());
54+
return PREFIX + Base64.getEncoder().encodeToString(hash);
55+
}
56+
},
57+
MD5 {
58+
private static final String PREFIX = "MD5:";
59+
private final MessageDigest digest;
60+
61+
{
62+
// Initializing the MessageDigest once for MD5
63+
try {
64+
// codeql[java/weak-cryptographic-algorithm] - md5 is sufficient for this use case&
65+
digest = MessageDigest.getInstance("MD5");
66+
} catch (NoSuchAlgorithmException e) {
67+
throw new RuntimeException("MD5 algorithm not found", e);
68+
}
69+
}
70+
71+
@Override
72+
public String anonymize(String role) {
73+
byte[] hash = digest.digest(role.getBytes());
74+
return PREFIX + Base64.getEncoder().encodeToString(hash);
75+
}
76+
};
77+
78+
private static final String REDACTED_VALUE = "[REDACTED]";
79+
public abstract String anonymize(String role);
80+
}
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
/**
20+
* Pulsar Client API.
21+
*/
22+
package org.apache.pulsar.common.configuration.anonymizer;
23+

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,7 @@
148148
import org.apache.pulsar.common.api.proto.Schema;
149149
import org.apache.pulsar.common.api.proto.ServerError;
150150
import org.apache.pulsar.common.api.proto.TxnAction;
151+
import org.apache.pulsar.common.configuration.anonymizer.DefaultAuthenticationRoleLoggingAnonymizer;
151152
import org.apache.pulsar.common.intercept.InterceptException;
152153
import org.apache.pulsar.common.lookup.data.LookupData;
153154
import org.apache.pulsar.common.naming.Metadata;
@@ -215,6 +216,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
215216
private AuthData originalAuthDataCopy;
216217
private boolean pendingAuthChallengeResponse = false;
217218
private ScheduledFuture<?> authRefreshTask;
219+
private final DefaultAuthenticationRoleLoggingAnonymizer authenticationRoleLoggingAnonymizer;
218220

219221
// Max number of pending requests per connections. If multiple producers are sharing the same connection the flow
220222
// control done by a single producer might not be enough to prevent write spikes on the broker.
@@ -352,6 +354,8 @@ public ServerCnx(PulsarService pulsar, String listenerName) {
352354
this.brokerInterceptor = this.service != null ? this.service.getInterceptor() : null;
353355
this.throttleTracker = new ServerCnxThrottleTracker(this);
354356
topicsPatternImplementation = conf.getTopicsPatternRegexImplementation();
357+
this.authenticationRoleLoggingAnonymizer = new DefaultAuthenticationRoleLoggingAnonymizer(
358+
conf.getAuthenticationRoleLoggingAnonymizer());
355359
}
356360

357361
@Override
@@ -821,12 +825,14 @@ private void completeConnect(int clientProtoVersion, String clientVersion) {
821825
clientVersion, clientProtoVersion, proxyVersion);
822826
} else if (originalPrincipal != null) {
823827
log.info("[{}] connected role={} and originalAuthRole={} using authMethod={}, clientVersion={}, "
824-
+ "clientProtocolVersion={}, proxyVersion={}", remoteAddress, authRole, originalPrincipal,
825-
authMethod, clientVersion, clientProtoVersion, proxyVersion);
828+
+ "clientProtocolVersion={}, proxyVersion={}", remoteAddress,
829+
authenticationRoleLoggingAnonymizer.anonymize(authRole),
830+
authenticationRoleLoggingAnonymizer.anonymize(originalPrincipal), authMethod, clientVersion,
831+
clientProtoVersion, proxyVersion);
826832
} else {
827833
log.info("[{}] connected with role={} using authMethod={}, clientVersion={}, clientProtocolVersion={}, "
828-
+ "proxyVersion={}", remoteAddress, authRole, authMethod, clientVersion, clientProtoVersion,
829-
proxyVersion);
834+
+ "proxyVersion={}", remoteAddress, authenticationRoleLoggingAnonymizer.anonymize(authRole),
835+
authMethod, clientVersion, clientProtoVersion, proxyVersion);
830836
}
831837
if (brokerInterceptor != null) {
832838
brokerInterceptor.onConnectionCreated(this);
@@ -1214,7 +1220,8 @@ protected void handleSubscribe(final CommandSubscribe subscribe) {
12141220

12151221
if (log.isDebugEnabled()) {
12161222
log.debug("[{}] Handle subscribe command: auth role = {}, original auth role = {}",
1217-
remoteAddress, authRole, originalPrincipal);
1223+
remoteAddress, authenticationRoleLoggingAnonymizer.anonymize(authRole),
1224+
authenticationRoleLoggingAnonymizer.anonymize(originalPrincipal));
12181225
}
12191226

12201227
final String subscriptionName = subscribe.getSubscription();
@@ -2433,11 +2440,11 @@ private CompletableFuture<Boolean> isNamespaceOperationAllowed(NamespaceName nam
24332440
return isProxyAuthorizedFuture.thenCombine(isAuthorizedFuture, (isProxyAuthorized, isAuthorized) -> {
24342441
if (!isProxyAuthorized) {
24352442
log.warn("OriginalRole {} is not authorized to perform operation {} on namespace {}",
2436-
originalPrincipal, operation, namespaceName);
2443+
authenticationRoleLoggingAnonymizer.anonymize(originalPrincipal), operation, namespaceName);
24372444
}
24382445
if (!isAuthorized) {
24392446
log.warn("Role {} is not authorized to perform operation {} on namespace {}",
2440-
authRole, operation, namespaceName);
2447+
authenticationRoleLoggingAnonymizer.anonymize(authRole), operation, namespaceName);
24412448
}
24422449
return isProxyAuthorized && isAuthorized;
24432450
});

pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -295,6 +295,13 @@ public class ProxyConfiguration implements PulsarConfiguration {
295295
+ "is enabled.")
296296
private Boolean webServiceLogDetailedAddresses;
297297

298+
@FieldContext(category = CATEGORY_SERVER, doc =
299+
"Defines how the broker will anonymize the role and originalAuthRole before logging. "
300+
+ "Possible values are: NONE (no anonymization), REDACTED (replaces with '[REDACTED]'), "
301+
+ "hash:SHA256 (hashes using SHA-256), and hash:MD5 (hashes using MD5). Default is NONE."
302+
)
303+
private String authenticationRoleLoggingAnonymizer = "NONE";
304+
298305
@FieldContext(category = CATEGORY_SERVER,
299306
doc = "Enables zero-copy transport of data across network interfaces using the spice. "
300307
+ "Zero copy mode cannot be used when TLS is enabled or when proxyLogLevel is > 0.")

pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@
7272
import org.apache.pulsar.common.api.proto.FeatureFlags;
7373
import org.apache.pulsar.common.api.proto.ProtocolVersion;
7474
import org.apache.pulsar.common.api.proto.ServerError;
75+
import org.apache.pulsar.common.configuration.anonymizer.DefaultAuthenticationRoleLoggingAnonymizer;
7576
import org.apache.pulsar.common.protocol.Commands;
7677
import org.apache.pulsar.common.protocol.PulsarHandler;
7778
import org.apache.pulsar.common.util.Runnables;
@@ -120,6 +121,7 @@ public class ProxyConnection extends PulsarHandler {
120121
private int protocolVersionToAdvertise;
121122
private String proxyToBrokerUrl;
122123
private HAProxyMessage haProxyMessage;
124+
private final DefaultAuthenticationRoleLoggingAnonymizer authenticationRoleLoggingAnonymizer;
123125

124126
protected static final Integer SPLICE_BYTES = 1024 * 1024 * 1024;
125127
private static final byte[] EMPTY_CREDENTIALS = new byte[0];
@@ -161,6 +163,8 @@ public ProxyConnection(ProxyService proxyService, DnsAddressResolverGroup dnsAdd
161163
this.state = State.Init;
162164
this.brokerProxyValidator = service.getBrokerProxyValidator();
163165
this.connectionController = proxyService.getConnectionController();
166+
this.authenticationRoleLoggingAnonymizer = new DefaultAuthenticationRoleLoggingAnonymizer(
167+
proxyService.getConfiguration().getAuthenticationRoleLoggingAnonymizer());
164168
}
165169

166170
@Override
@@ -343,16 +347,17 @@ protected static boolean isTlsChannel(Channel channel) {
343347

344348
private synchronized void completeConnect() throws PulsarClientException {
345349
checkArgument(state == State.Connecting);
350+
String maybeAnonymizedClientAuthRole = authenticationRoleLoggingAnonymizer.anonymize(clientAuthRole);
346351
LOG.info("[{}] complete connection, init proxy handler. authenticated with {} role {}, hasProxyToBrokerUrl: {}",
347-
remoteAddress, authMethod, clientAuthRole, hasProxyToBrokerUrl);
352+
remoteAddress, authMethod, maybeAnonymizedClientAuthRole, hasProxyToBrokerUrl);
348353
if (hasProxyToBrokerUrl) {
349354
// Optimize proxy connection to fail-fast if the target broker isn't active
350355
// Pulsar client will retry connecting after a back off timeout
351356
if (service.getConfiguration().isCheckActiveBrokers()
352357
&& !isBrokerActive(proxyToBrokerUrl)) {
353358
state = State.Closing;
354359
LOG.warn("[{}] Target broker '{}' isn't available. authenticated with {} role {}.",
355-
remoteAddress, proxyToBrokerUrl, authMethod, clientAuthRole);
360+
remoteAddress, proxyToBrokerUrl, authMethod, maybeAnonymizedClientAuthRole);
356361
final ByteBuf msg = Commands.newError(-1,
357362
ServerError.ServiceNotReady, "Target broker isn't available.");
358363
writeAndFlushAndClose(msg);
@@ -371,10 +376,11 @@ private synchronized void completeConnect() throws PulsarClientException {
371376

372377
LOG.warn("[{}] Target broker '{}' cannot be validated. {}. authenticated with {} role {}.",
373378
remoteAddress, proxyToBrokerUrl, targetAddressDeniedException.getMessage(),
374-
authMethod, clientAuthRole);
379+
authMethod, maybeAnonymizedClientAuthRole);
375380
} else {
376381
LOG.error("[{}] Error validating target broker '{}'. authenticated with {} role {}.",
377-
remoteAddress, proxyToBrokerUrl, authMethod, clientAuthRole, throwable);
382+
remoteAddress, proxyToBrokerUrl, authMethod, maybeAnonymizedClientAuthRole,
383+
throwable);
378384
}
379385
final ByteBuf msg = Commands.newError(-1, ServerError.ServiceNotReady,
380386
"Target broker cannot be validated.");
@@ -401,7 +407,7 @@ private synchronized void completeConnect() throws PulsarClientException {
401407
Optional.of(dnsAddressResolverGroup.getResolver(service.getWorkerGroup().next())), null);
402408
} else {
403409
LOG.error("BUG! Connection Pool has already been created for proxy connection to {} state {} role {}",
404-
remoteAddress, state, clientAuthRole);
410+
remoteAddress, state, maybeAnonymizedClientAuthRole);
405411
}
406412

407413
state = State.ProxyLookupRequests;
@@ -488,7 +494,7 @@ protected void authChallengeSuccessCallback(AuthData authChallenge) {
488494
clientAuthRole = authState.getAuthRole();
489495
if (LOG.isDebugEnabled()) {
490496
LOG.debug("[{}] Client successfully authenticated with {} role {}",
491-
remoteAddress, authMethod, clientAuthRole);
497+
remoteAddress, authMethod, authenticationRoleLoggingAnonymizer.anonymize(clientAuthRole));
492498
}
493499

494500
// First connection

0 commit comments

Comments
 (0)