Skip to content

MINOR: Cleanups in Tools Module (3/n) #20332

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Aug 19, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
168 changes: 79 additions & 89 deletions tools/src/main/java/org/apache/kafka/tools/AclCommand.java
Original file line number Diff line number Diff line change
Expand Up @@ -73,14 +73,13 @@ public class AclCommand {

public static void main(String[] args) {
AclCommandOptions opts = new AclCommandOptions(args);
AdminClientService aclCommandService = new AdminClientService(opts);
try (Admin admin = Admin.create(adminConfigs(opts))) {
if (opts.options.has(opts.addOpt)) {
aclCommandService.addAcls(admin);
addAcls(admin, opts);
} else if (opts.options.has(opts.removeOpt)) {
aclCommandService.removeAcls(admin);
removeAcls(admin, opts);
} else if (opts.options.has(opts.listOpt)) {
aclCommandService.listAcls(admin);
listAcls(admin, opts);
}
} catch (Throwable e) {
System.out.println("Error while executing ACL command: " + e.getMessage());
Expand All @@ -102,106 +101,97 @@ private static Properties adminConfigs(AclCommandOptions opts) throws IOExceptio
return props;
}

private static class AdminClientService {

private final AclCommandOptions opts;

AdminClientService(AclCommandOptions opts) {
this.opts = opts;
}

void addAcls(Admin admin) throws ExecutionException, InterruptedException {
Map<ResourcePattern, Set<AccessControlEntry>> resourceToAcl = getResourceToAcls(opts);
for (Map.Entry<ResourcePattern, Set<AccessControlEntry>> entry : resourceToAcl.entrySet()) {
ResourcePattern resource = entry.getKey();
Set<AccessControlEntry> acls = entry.getValue();
System.out.println("Adding ACLs for resource `" + resource + "`: " + NL + " " + acls.stream().map(a -> "\t" + a).collect(Collectors.joining(NL)) + NL);
Collection<AclBinding> aclBindings = acls.stream().map(acl -> new AclBinding(resource, acl)).collect(Collectors.toList());
admin.createAcls(aclBindings).all().get();
}
private static void addAcls(Admin admin, AclCommandOptions opts) throws ExecutionException, InterruptedException {
Map<ResourcePattern, Set<AccessControlEntry>> resourceToAcl = getResourceToAcls(opts);
for (Map.Entry<ResourcePattern, Set<AccessControlEntry>> entry : resourceToAcl.entrySet()) {
ResourcePattern resource = entry.getKey();
Set<AccessControlEntry> acls = entry.getValue();
System.out.println("Adding ACLs for resource `" + resource + "`: " + NL + " " + acls.stream().map(a -> "\t" + a).collect(Collectors.joining(NL)) + NL);
Collection<AclBinding> aclBindings = acls.stream().map(acl -> new AclBinding(resource, acl)).collect(Collectors.toList());
admin.createAcls(aclBindings).all().get();
}
}

void removeAcls(Admin admin) throws ExecutionException, InterruptedException {
Map<ResourcePatternFilter, Set<AccessControlEntry>> filterToAcl = getResourceFilterToAcls(opts);
for (Map.Entry<ResourcePatternFilter, Set<AccessControlEntry>> entry : filterToAcl.entrySet()) {
ResourcePatternFilter filter = entry.getKey();
Set<AccessControlEntry> acls = entry.getValue();
if (acls.isEmpty()) {
if (confirmAction(opts, "Are you sure you want to delete all ACLs for resource filter `" + filter + "`? (y/n)")) {
removeAcls(admin, acls, filter);
}
} else {
String msg = "Are you sure you want to remove ACLs: " + NL +
" " + acls.stream().map(a -> "\t" + a).collect(Collectors.joining(NL)) + NL +
" from resource filter `" + filter + "`? (y/n)";
if (confirmAction(opts, msg)) {
removeAcls(admin, acls, filter);
}
private static void removeAcls(Admin admin, AclCommandOptions opts) throws ExecutionException, InterruptedException {
Map<ResourcePatternFilter, Set<AccessControlEntry>> filterToAcl = getResourceFilterToAcls(opts);
for (Map.Entry<ResourcePatternFilter, Set<AccessControlEntry>> entry : filterToAcl.entrySet()) {
ResourcePatternFilter filter = entry.getKey();
Set<AccessControlEntry> acls = entry.getValue();
if (acls.isEmpty()) {
if (confirmAction(opts, "Are you sure you want to delete all ACLs for resource filter `" + filter + "`? (y/n)")) {
removeAcls(admin, acls, filter);
}
} else {
String msg = "Are you sure you want to remove ACLs: " + NL +
" " + acls.stream().map(a -> "\t" + a).collect(Collectors.joining(NL)) + NL +
" from resource filter `" + filter + "`? (y/n)";
if (confirmAction(opts, msg)) {
removeAcls(admin, acls, filter);
}
}
}
}

private void listAcls(Admin admin) throws ExecutionException, InterruptedException {
Set<ResourcePatternFilter> filters = getResourceFilter(opts, false);
Set<KafkaPrincipal> listPrincipals = getPrincipals(opts, opts.listPrincipalsOpt);
Map<ResourcePattern, Set<AccessControlEntry>> resourceToAcls = getAcls(admin, filters);
private static void listAcls(Admin admin, AclCommandOptions opts) throws ExecutionException, InterruptedException {
Set<ResourcePatternFilter> filters = getResourceFilter(opts, false);
Set<KafkaPrincipal> listPrincipals = getPrincipals(opts, opts.listPrincipalsOpt);
Map<ResourcePattern, Set<AccessControlEntry>> resourceToAcls = getAcls(admin, filters);

if (listPrincipals.isEmpty()) {
printResourceAcls(resourceToAcls);
} else {
listPrincipals.forEach(principal -> {
System.out.println("ACLs for principal `" + principal + "`");
Map<ResourcePattern, Set<AccessControlEntry>> filteredResourceToAcls = resourceToAcls.entrySet().stream()
.map(entry -> {
ResourcePattern resource = entry.getKey();
Set<AccessControlEntry> acls = entry.getValue().stream()
.filter(acl -> principal.toString().equals(acl.principal()))
.collect(Collectors.toSet());
return new AbstractMap.SimpleEntry<>(resource, acls);
})
.filter(entry -> !entry.getValue().isEmpty())
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
printResourceAcls(filteredResourceToAcls);
});
}
if (listPrincipals.isEmpty()) {
printResourceAcls(resourceToAcls);
} else {
listPrincipals.forEach(principal -> {
System.out.println("ACLs for principal `" + principal + "`");
Map<ResourcePattern, Set<AccessControlEntry>> filteredResourceToAcls = resourceToAcls.entrySet().stream()
.map(entry -> {
ResourcePattern resource = entry.getKey();
Set<AccessControlEntry> acls = entry.getValue().stream()
.filter(acl -> principal.toString().equals(acl.principal()))
.collect(Collectors.toSet());
return new AbstractMap.SimpleEntry<>(resource, acls);
})
.filter(entry -> !entry.getValue().isEmpty())
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
printResourceAcls(filteredResourceToAcls);
});
}
}

private static void printResourceAcls(Map<ResourcePattern, Set<AccessControlEntry>> resourceToAcls) {
resourceToAcls.forEach((resource, acls) ->
System.out.println("Current ACLs for resource `" + resource + "`:" + NL +
acls.stream().map(acl -> "\t" + acl).collect(Collectors.joining(NL)) + NL)
);
}
private static void printResourceAcls(Map<ResourcePattern, Set<AccessControlEntry>> resourceToAcls) {
resourceToAcls.forEach((resource, acls) ->
System.out.println("Current ACLs for resource `" + resource + "`:" + NL +
acls.stream().map(acl -> "\t" + acl).collect(Collectors.joining(NL)) + NL)
);
}

private static void removeAcls(Admin adminClient, Set<AccessControlEntry> acls, ResourcePatternFilter filter) throws ExecutionException, InterruptedException {
if (acls.isEmpty()) {
adminClient.deleteAcls(List.of(new AclBindingFilter(filter, AccessControlEntryFilter.ANY))).all().get();
} else {
List<AclBindingFilter> aclBindingFilters = acls.stream().map(acl -> new AclBindingFilter(filter, acl.toFilter())).collect(Collectors.toList());
adminClient.deleteAcls(aclBindingFilters).all().get();
}
private static void removeAcls(Admin adminClient, Set<AccessControlEntry> acls, ResourcePatternFilter filter) throws ExecutionException, InterruptedException {
if (acls.isEmpty()) {
adminClient.deleteAcls(List.of(new AclBindingFilter(filter, AccessControlEntryFilter.ANY))).all().get();
} else {
List<AclBindingFilter> aclBindingFilters = acls.stream().map(acl -> new AclBindingFilter(filter, acl.toFilter())).collect(Collectors.toList());
adminClient.deleteAcls(aclBindingFilters).all().get();
}
}

private Map<ResourcePattern, Set<AccessControlEntry>> getAcls(Admin adminClient, Set<ResourcePatternFilter> filters) throws ExecutionException, InterruptedException {
Collection<AclBinding> aclBindings;
if (filters.isEmpty()) {
aclBindings = adminClient.describeAcls(AclBindingFilter.ANY).values().get();
} else {
aclBindings = new ArrayList<>();
for (ResourcePatternFilter filter : filters) {
aclBindings.addAll(adminClient.describeAcls(new AclBindingFilter(filter, AccessControlEntryFilter.ANY)).values().get());
}
private static Map<ResourcePattern, Set<AccessControlEntry>> getAcls(Admin adminClient, Set<ResourcePatternFilter> filters) throws ExecutionException, InterruptedException {
Collection<AclBinding> aclBindings;
if (filters.isEmpty()) {
aclBindings = adminClient.describeAcls(AclBindingFilter.ANY).values().get();
} else {
aclBindings = new ArrayList<>();
for (ResourcePatternFilter filter : filters) {
aclBindings.addAll(adminClient.describeAcls(new AclBindingFilter(filter, AccessControlEntryFilter.ANY)).values().get());
}
}

Map<ResourcePattern, Set<AccessControlEntry>> resourceToAcls = new HashMap<>();
for (AclBinding aclBinding : aclBindings) {
ResourcePattern resource = aclBinding.pattern();
Set<AccessControlEntry> acls = resourceToAcls.getOrDefault(resource, new HashSet<>());
acls.add(aclBinding.entry());
resourceToAcls.put(resource, acls);
}
return resourceToAcls;
Map<ResourcePattern, Set<AccessControlEntry>> resourceToAcls = new HashMap<>();
for (AclBinding aclBinding : aclBindings) {
ResourcePattern resource = aclBinding.pattern();
Set<AccessControlEntry> acls = resourceToAcls.getOrDefault(resource, new HashSet<>());
acls.add(aclBinding.entry());
resourceToAcls.put(resource, acls);
}
return resourceToAcls;
}

private static Map<ResourcePattern, Set<AccessControlEntry>> getResourceToAcls(AclCommandOptions opts) {
Expand Down
35 changes: 5 additions & 30 deletions tools/src/main/java/org/apache/kafka/tools/ConnectPluginPath.java
Original file line number Diff line number Diff line change
Expand Up @@ -196,22 +196,8 @@ enum Command {
LIST, SYNC_MANIFESTS
}

private static class Config {
private final Command command;
private final Set<Path> locations;
private final boolean dryRun;
private final boolean keepNotFound;
private final PrintStream out;
private final PrintStream err;

private Config(Command command, Set<Path> locations, boolean dryRun, boolean keepNotFound, PrintStream out, PrintStream err) {
this.command = command;
this.locations = locations;
this.dryRun = dryRun;
this.keepNotFound = keepNotFound;
this.out = out;
this.err = err;
}
private record Config(Command command, Set<Path> locations, boolean dryRun, boolean keepNotFound, PrintStream out,
PrintStream err) {

@Override
public String toString() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how about trusting the generated toString?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The auto generated one will also have the out and err params which the record takes in. Hence I prefered keeping this custom one.

Expand Down Expand Up @@ -262,16 +248,9 @@ public static void runCommand(Config config) throws TerseException {
* <p>This is unique to the (source, class, type) tuple, and contains additional pre-computed information
* that pertains to this specific plugin.
*/
private static class Row {
private final ManifestWorkspace.SourceWorkspace<?> workspace;
private final String className;
private final PluginType type;
private final String version;
private final List<String> aliases;
private final boolean loadable;
private final boolean hasManifest;

public Row(ManifestWorkspace.SourceWorkspace<?> workspace, String className, PluginType type, String version, List<String> aliases, boolean loadable, boolean hasManifest) {
private record Row(ManifestWorkspace.SourceWorkspace<?> workspace, String className, PluginType type,
String version, List<String> aliases, boolean loadable, boolean hasManifest) {
private Row(ManifestWorkspace.SourceWorkspace<?> workspace, String className, PluginType type, String version, List<String> aliases, boolean loadable, boolean hasManifest) {
this.workspace = Objects.requireNonNull(workspace, "workspace must be non-null");
this.className = Objects.requireNonNull(className, "className must be non-null");
this.version = Objects.requireNonNull(version, "version must be non-null");
Expand All @@ -281,10 +260,6 @@ public Row(ManifestWorkspace.SourceWorkspace<?> workspace, String className, Plu
this.hasManifest = hasManifest;
}

private boolean loadable() {
return loadable;
}

private boolean compatible() {
return loadable && hasManifest;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@

import static java.lang.String.format;
import static java.lang.String.valueOf;
import static java.util.Arrays.asList;

/**
* A tool for describing quorum status
Expand Down Expand Up @@ -206,7 +205,7 @@ private static void handleDescribeReplication(Admin admin, boolean humanReadable
rows.addAll(quorumInfoToRows(leader, quorumInfo.observers().stream(), "Observer", humanReadable));

ToolsUtils.prettyPrintTable(
asList("NodeId", "DirectoryId", "LogEndOffset", "Lag", "LastFetchTimestamp", "LastCaughtUpTimestamp", "Status"),
List.of("NodeId", "DirectoryId", "LogEndOffset", "Lag", "LastFetchTimestamp", "LastCaughtUpTimestamp", "Status"),
rows,
System.out
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -292,14 +292,7 @@ private Argument addArgument(String option, String help, Class<?> clazz) {

}

private static class ConfigHandler {

private final Namespace namespace;


private ConfigHandler(Namespace namespace) {
this.namespace = namespace;
}
private record ConfigHandler(Namespace namespace) {

private Map<String, ?> getConfigs() {
Map<String, Object> m = new HashMap<>();
Expand Down
11 changes: 1 addition & 10 deletions tools/src/main/java/org/apache/kafka/tools/OffsetsUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -500,16 +500,7 @@ private static void printError(String msg, Optional<Throwable> e) {

public interface LogOffsetResult { }

public static class LogOffset implements LogOffsetResult {
final long value;

public LogOffset(long value) {
this.value = value;
}

public long value() {
return value;
}
public record LogOffset(long value) implements LogOffsetResult {
}

public static class Unknown implements LogOffsetResult { }
Expand Down
Loading
Loading