Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
164 changes: 157 additions & 7 deletions fz/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -262,18 +262,157 @@ def _parse_argument(arg, alias_type=None):
return arg


def _resolve_calculators_arg(calculators):
def _find_all_calculators(model_name=None):
"""
Find all calculator JSON files in .fz/calculators/ directories.

Searches in:
1. Current working directory: ./.fz/calculators/
2. Home directory: ~/.fz/calculators/

Args:
model_name: Optional model name to filter calculators (only include calculators that support this model)

Returns:
List of calculator specifications (URIs or dicts)
"""
search_dirs = [Path.cwd() / ".fz" / "calculators", Path.home() / ".fz" / "calculators"]
calculators = []

for calc_dir in search_dirs:
if not calc_dir.exists() or not calc_dir.is_dir():
continue

# Find all .json files in the calculator directory
for calc_file in calc_dir.glob("*.json"):
try:
with open(calc_file, 'r') as f:
calc_data = json.load(f)

# Check if calculator supports the model (if model_name is provided)
if model_name and not _calculator_supports_model(calc_data, model_name):
log_debug(f"Skipping calculator {calc_file.name}: does not support model '{model_name}'")
continue

# Extract calculator specification
if "uri" in calc_data:
# Calculator with URI specification
uri = calc_data["uri"]

# If models dict exists and model_name is provided, get model-specific command
if "models" in calc_data and isinstance(calc_data["models"], dict) and model_name:
if model_name in calc_data["models"]:
# Use model-specific command from models dict
model_command = calc_data["models"][model_name]
# Append command to URI if it doesn't already contain it
if not uri.endswith(model_command):
uri = f"{uri.rstrip('/')}/{model_command}" if '://' in uri else model_command
Copy link

Copilot AI Nov 27, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The URI construction has a bug. When uri = "sh://" and we call uri.rstrip('/'), it removes BOTH trailing slashes, resulting in "sh:". Then appending "/{model_command}" produces "sh:/bash model1.sh" (single slash after colon), which is an invalid URI format.

The correct format should be "sh://bash model1.sh" (double slash). Consider using a different approach, such as:

if uri.endswith("://"):
    uri = f"{uri}{model_command}"
else:
    uri = f"{uri}/{model_command}"
Suggested change
uri = f"{uri.rstrip('/')}/{model_command}" if '://' in uri else model_command
if uri.endswith("://"):
uri = f"{uri}{model_command}"
else:
uri = f"{uri}/{model_command}"

Copilot uses AI. Check for mistakes.

calculators.append(uri)
log_debug(f"Found calculator: {calc_file.name} -> {uri}")
elif "command" in calc_data:
# Simple calculator with command
calculators.append(calc_data["command"])
log_debug(f"Found calculator: {calc_file.name} -> {calc_data['command']}")
else:
# Just add the whole dict as a calculator spec
calculators.append(calc_data)
log_debug(f"Found calculator: {calc_file.name} (dict spec)")

except (json.JSONDecodeError, IOError, KeyError) as e:
log_warning(f"Could not load calculator file {calc_file}: {e}")
continue

return calculators
Comment on lines +279 to +326
Copy link

Copilot AI Nov 27, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Potential for duplicate calculators: the function searches both ./.fz/calculators/ and ~/.fz/calculators/ without deduplication. If the same calculator file exists in both locations (e.g., calc1.json in both directories), it will be added to the list twice, potentially leading to duplicate calculator executions.

Consider adding deduplication logic, either by:

  1. Tracking seen calculator names and skipping duplicates
  2. Using a set/dict to store unique calculators
  3. Documenting that local calculators override home directory calculators

Example fix:

seen_names = set()
for calc_dir in search_dirs:
    if not calc_dir.exists() or not calc_dir.is_dir():
        continue
    for calc_file in calc_dir.glob("*.json"):
        if calc_file.stem in seen_names:
            continue  # Skip duplicate
        seen_names.add(calc_file.stem)
        # ... rest of processing

Copilot uses AI. Check for mistakes.


def _calculator_supports_model(calc_data, model_name):
"""
Check if calculator supports a given model.

A calculator supports a model if:
1. It has no "models" field (supports all models)
2. It has a "models" dict that includes the model_name as a key
3. It has a "models" list that includes the model_name

Args:
calc_data: Calculator data dict from JSON
model_name: Model name to check

Returns:
True if calculator supports the model, False otherwise
"""
if "models" not in calc_data:
# No models field - supports all models
return True

models = calc_data["models"]

if isinstance(models, dict):
# Models is a dict - check if model_name is a key
return model_name in models
elif isinstance(models, list):
# Models is a list - check if model_name is in list
return model_name in models
else:
# Unknown format - assume it supports the model
return True


def _filter_calculators_by_model(calculators, model_name):
"""
Filter a list of calculator specifications by model support.

This is used when calculators are explicitly provided (not wildcarded)
to filter out calculators that don't support the specified model.

Args:
calculators: List of calculator specifications
model_name: Model name to filter by

Returns:
Filtered list of calculator specifications
"""
filtered = []

for calc in calculators:
if isinstance(calc, dict):
if _calculator_supports_model(calc, model_name):
filtered.append(calc)
else:
log_debug(f"Filtered out calculator (dict): does not support model '{model_name}'")
else:
# String URIs - include them (we don't have metadata to filter)
filtered.append(calc)

return filtered


def _resolve_calculators_arg(calculators, model_name=None):
"""
Parse and resolve calculator argument.

Handles:
- None (defaults to ["sh://"])
- None (defaults to "*" - find all calculators in .fz/calculators/)
- "*" (wildcard - find all calculators in .fz/calculators/)
- JSON string, JSON file, or alias string
- Single calculator dict (wraps in list)
- List of calculator specs

Args:
calculators: Calculator specification (None, "*", string, dict, or list)
model_name: Optional model name to filter calculators (only include calculators that support this model)

Returns:
List of calculator specifications
"""
if calculators is None:
return ["sh://"]
if calculators is None or calculators == "*":
# Find all calculator files in .fz/calculators/
calc_specs = _find_all_calculators(model_name)
if not calc_specs:
# Fallback to default sh:// if no calculators found
return ["sh://"]
return calc_specs

# Parse the argument (could be JSON string, file, or alias)
calculators = _parse_argument(calculators, alias_type='calculators')
Expand All @@ -282,6 +421,14 @@ def _resolve_calculators_arg(calculators):
if isinstance(calculators, dict):
calculators = [calculators]

# Wrap string in list if it's a single calculator specification
if isinstance(calculators, str):
calculators = [calculators]

# Filter calculators by model if model_name is provided
if model_name and isinstance(calculators, list):
calculators = _filter_calculators_by_model(calculators, model_name)

return calculators


Expand Down Expand Up @@ -1012,11 +1159,14 @@ def fzr(
from .config import get_interpreter
interpreter = get_interpreter()

# Get model ID for calculator filtering
model_id = model.get("id") if isinstance(model, dict) else None

# Parse calculator argument (handles JSON string, file, or alias)
calculators = _resolve_calculators_arg(calculators)
# Pass model_id to filter calculators by model support
calculators = _resolve_calculators_arg(calculators, model_name=model_id)

# Get model ID for calculator resolution
model_id = model.get("id") if isinstance(model, dict) else None
# Resolve calculators (convert aliases to URIs)
calculators = resolve_calculators(calculators, model_id)

# Convert to absolute paths immediately while we're in the correct working directory
Expand Down
Loading