-
Notifications
You must be signed in to change notification settings - Fork 5
WIP: Add basic filter command as suggested in #834
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,210 @@ | ||
| """*Dfetch* can filter files in the repo. | ||
|
|
||
| It can either accept no input to list all files. A list of files can be piped in (such as through ``find``) | ||
| or it can be used as a wrapper around a certain tool to block or allow files under control by dfetch. | ||
|
|
||
| .. scenario-include:: ../features/filter-projects.feature | ||
|
|
||
| """ | ||
|
|
||
| import argparse | ||
| import os | ||
| import sys | ||
| from enum import Enum | ||
| from pathlib import Path | ||
| from typing import Optional | ||
|
|
||
| import dfetch.commands.command | ||
| import dfetch.log | ||
| import dfetch.manifest.manifest | ||
| from dfetch.log import get_logger | ||
| from dfetch.util.cmdline import run_on_cmdline_uncaptured | ||
| from dfetch.util.util import in_directory | ||
|
|
||
| logger = get_logger(__name__) | ||
|
|
||
|
|
||
| class FilterType(Enum): | ||
| """Types of filtering.""" | ||
|
|
||
| BLOCK_ONLY_PATH_TRAVERSAL = 0 | ||
| BLOCK_IF_INSIDE = 1 | ||
| BLOCK_IF_OUTSIDE = 2 | ||
|
|
||
|
|
||
| class Filter(dfetch.commands.command.Command): | ||
| """Filter files based on flags and pass on any command. | ||
|
|
||
| Based on the provided arguments filter files, and call the given arguments or print them out if no command given. | ||
| """ | ||
|
|
||
| @staticmethod | ||
| def silent() -> bool: | ||
| """If the command is silent the title will not be printed when the command is run.""" | ||
| return True | ||
|
|
||
| @staticmethod | ||
| def create_menu(subparsers: dfetch.commands.command.SubparserActionType) -> None: | ||
| """Add the parser menu for this action.""" | ||
| parser = dfetch.commands.command.Command.parser(subparsers, Filter) | ||
| parser.add_argument( | ||
| "--dfetched", | ||
| "-D", | ||
| action="store_true", | ||
| default=True, | ||
| help="Keep files that came here by dfetching them.", | ||
| ) | ||
|
|
||
| parser.add_argument( | ||
| "--not-dfetched", | ||
| "-N", | ||
| action="store_true", | ||
| default=False, | ||
| help="Keep files that did not came here by dfetching them.", | ||
| ) | ||
|
|
||
| parser.add_argument( | ||
| "cmd", | ||
| metavar="<cmd>", | ||
| type=str, | ||
| nargs="?", | ||
| help="Command to call", | ||
| ) | ||
|
|
||
| parser.add_argument( | ||
| "args", | ||
| metavar="<args>", | ||
| type=str, | ||
| nargs="*", | ||
| help="Arguments to pass to the command", | ||
| ) | ||
|
|
||
| def __call__(self, args: argparse.Namespace) -> None: | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Mixed Responsibilities in Entry Point
Tell me moreWhat is the issue?The call method mixes configuration, business logic, and output handling in a single method. Why this mattersThis violates the Single Responsibility Principle and makes the code less maintainable and harder to test individual components. Suggested change ∙ Feature PreviewSplit the call method into separate methods for configuration, filtering, and output handling: def __call__(self, args: argparse.Namespace) -> None:
self._configure_logging(args)
filtered_args = self._process_filtering(args)
self._handle_output(args, filtered_args)Provide feedback to improve future suggestions💬 Looking for more details? Reply to this comment to chat with Korbit. |
||
| """Perform the filter.""" | ||
| if not args.verbose: | ||
| dfetch.log.set_level("ERROR") | ||
|
|
||
| argument_list = self._get_arguments(args) | ||
|
|
||
| manifest = dfetch.manifest.manifest.get_manifest() | ||
| topdir = Path(manifest.path).parent | ||
|
|
||
| resolved_args = self._resolve_args(argument_list, topdir) | ||
|
|
||
| with in_directory(topdir): | ||
| abs_project_paths = { | ||
| Path(project.destination).resolve() for project in manifest.projects | ||
| } | ||
|
|
||
| if args.dfetched and not args.not_dfetched: | ||
| block_type = FilterType.BLOCK_IF_OUTSIDE | ||
| elif args.not_dfetched: | ||
| block_type = FilterType.BLOCK_IF_INSIDE | ||
| else: | ||
| block_type = FilterType.BLOCK_ONLY_PATH_TRAVERSAL | ||
|
|
||
| filtered_args = self._filter_args( | ||
| topdir, resolved_args, abs_project_paths, block_type | ||
| ) | ||
|
|
||
| if args.cmd: | ||
| run_on_cmdline_uncaptured(logger, [args.cmd] + filtered_args) | ||
| else: | ||
| print(os.linesep.join(filtered_args)) | ||
|
|
||
| def _filter_args( | ||
| self, | ||
| topdir: Path, | ||
| resolved_args: dict[str, Optional[Path]], | ||
| abs_project_paths: set[Path], | ||
| block: FilterType, | ||
| ) -> list[str]: | ||
| blocklist = self._filter_files( | ||
| topdir, | ||
| abs_project_paths, | ||
| {path for path in resolved_args.values() if path}, | ||
| block, | ||
| ) | ||
|
|
||
| filtered_args = [ | ||
| arg for arg in resolved_args.keys() if resolved_args[arg] not in blocklist | ||
| ] | ||
|
|
||
| return filtered_args | ||
|
|
||
| def _resolve_args( | ||
| self, argument_list: list[str], topdir: Path | ||
| ) -> dict[str, Optional[Path]]: | ||
| resolved_args: dict[str, Optional[Path]] = {} | ||
| if argument_list: | ||
| for argument in argument_list: | ||
| path_obj = Path(argument.strip()) | ||
| resolved_args[argument] = ( | ||
| path_obj.resolve() if path_obj.exists() else None | ||
| ) | ||
| else: | ||
| if not argument_list: | ||
| resolved_args = { | ||
| str(file): file.resolve() | ||
| for file in topdir.rglob("*") | ||
| if ".git" not in file.parts | ||
| } | ||
|
|
||
| return resolved_args | ||
|
|
||
| def _get_arguments(self, args: argparse.Namespace) -> list[str]: | ||
| argument_list: list[str] = list(str(arg) for arg in args.args) | ||
| if not sys.stdin.isatty(): | ||
| argument_list.extend( | ||
| non_empty_line for line in sys.stdin if (non_empty_line := line.strip()) | ||
| ) | ||
|
|
||
| return argument_list | ||
|
|
||
| def _filter_files( | ||
| self, | ||
| topdir: Path, | ||
| paths: set[Path], | ||
| input_paths: set[Path], | ||
| block: FilterType = FilterType.BLOCK_IF_OUTSIDE, | ||
| ) -> list[Path]: | ||
| """Filter files in input_set in files in one of the paths or not.""" | ||
| blocklist: list[Path] = [] | ||
|
|
||
| for abs_path in input_paths: | ||
| try: | ||
| abs_path.relative_to(topdir) | ||
| except ValueError: | ||
| logger.print_info_line(str(abs_path), "outside project") | ||
| blocklist.append(abs_path) | ||
| continue | ||
|
|
||
| if block == FilterType.BLOCK_ONLY_PATH_TRAVERSAL: | ||
| continue | ||
|
|
||
| containing_dir = self._is_file_contained_in_any_path(abs_path, paths) | ||
|
|
||
| if containing_dir: | ||
| logger.print_info_line( | ||
| str(abs_path), f"inside project ({containing_dir})" | ||
| ) | ||
| if block == FilterType.BLOCK_IF_INSIDE: | ||
| blocklist.append(abs_path) | ||
| else: | ||
| logger.print_info_line(str(abs_path), "not inside any project") | ||
| if block == FilterType.BLOCK_IF_OUTSIDE: | ||
| blocklist.append(abs_path) | ||
|
|
||
| return blocklist | ||
|
|
||
| def _is_file_contained_in_any_path( | ||
| self, file: Path, paths: set[Path] | ||
| ) -> Optional[Path]: | ||
| """Check if a specific file is somewhere in one of the paths.""" | ||
| for path in paths: | ||
| try: | ||
| file.relative_to(path) | ||
| return path | ||
| except ValueError: | ||
| continue | ||
| return None | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -69,6 +69,34 @@ def run_on_cmdline( | |
| return proc | ||
|
|
||
|
|
||
| def run_on_cmdline_uncaptured( | ||
| logger: logging.Logger, cmd: Union[str, list[str]] | ||
| ) -> "subprocess.CompletedProcess[Any]": | ||
| """Run a command and log the output, and raise if something goes wrong.""" | ||
| logger.debug(f"Running {cmd}") | ||
spoorcc marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
| if not isinstance(cmd, list): | ||
| cmd = cmd.split(" ") | ||
|
Comment on lines
+78
to
+79
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Naive string splitting for command parsing
Tell me moreWhat is the issue?String splitting on single space fails for commands with multiple consecutive spaces or complex arguments. Why this mattersThis naive splitting approach will create empty strings in the command list when there are multiple spaces, potentially causing subprocess execution failures or incorrect argument parsing. Suggested change ∙ Feature PreviewUse import shlex
if not isinstance(cmd, list):
cmd = shlex.split(cmd)Provide feedback to improve future suggestions💬 Looking for more details? Reply to this comment to chat with Korbit. |
||
|
|
||
| try: | ||
| proc = subprocess.run(cmd, capture_output=False, check=True) # nosec | ||
| except subprocess.CalledProcessError as exc: | ||
| raise SubprocessCommandError( | ||
| exc.cmd, | ||
| "", | ||
| "", | ||
| exc.returncode, | ||
| ) from exc | ||
| except FileNotFoundError as exc: | ||
| cmd = cmd[0] | ||
| raise RuntimeError(f"{cmd} not available on system, please install") from exc | ||
|
|
||
| if proc.returncode: | ||
| raise SubprocessCommandError(cmd, "", "", proc.returncode) | ||
|
|
||
| return proc | ||
|
|
||
|
|
||
| def _log_output(proc: subprocess.CompletedProcess, logger: logging.Logger) -> None: # type: ignore | ||
| logger.debug(f"Return code: {proc.returncode}") | ||
|
|
||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.