Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 100 additions & 0 deletions src/nodetool/nodes/nodetool/os.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import os
import shutil
import glob
import zipfile
from datetime import datetime
import pandas as pd
from pydantic import Field
Expand Down Expand Up @@ -863,6 +864,105 @@ async def process(self, context: ProcessingContext) -> VideoRef:
return VideoRef(uri="file://" + expanded_path, data=video_data)


class ZipFiles(BaseNode):
    """
    Create a zip archive from a list of files.
    files, zip, archive, compress

    Use cases:
    - Archive workflow outputs
    - Bundle files for download
    - Create backups
    """

    files: list[FilePath] = Field(default=[], description="Files to include in the zip")
    zip_path: FilePath = Field(
        default=FilePath(), description="Path to the output zip file"
    )

    async def process(self, context: ProcessingContext) -> FilePath:
        """
        Write every entry of ``files`` that has a path into a new archive.

        ``~`` is expanded in both the archive path and the input paths, and
        the archive's parent directory is created when it is missing. Each
        file is stored under its basename only.

        Returns:
            A FilePath pointing at the expanded archive path.

        Raises:
            ValueError: when running in production, when ``files`` is empty,
                or when ``zip_path`` has no path.
        """
        if Environment.is_production():
            raise ValueError("This node is not available in production")
        if not self.files:
            raise ValueError("files cannot be empty")
        if not self.zip_path.path:
            raise ValueError("zip_path cannot be empty")

        expanded_zip = os.path.expanduser(self.zip_path.path)
        # A bare filename such as "archive.zip" has an empty dirname, and
        # os.makedirs("") raises FileNotFoundError, so only create the parent
        # directory when there is one to create.
        parent_dir = os.path.dirname(expanded_zip)
        if parent_dir:
            os.makedirs(parent_dir, exist_ok=True)

        with zipfile.ZipFile(expanded_zip, "w") as zipf:
            for file in self.files:
                # Entries without a path are skipped rather than rejected.
                if not file.path:
                    continue
                expanded_file = os.path.expanduser(file.path)
                # NOTE(review): directory structure is dropped, so inputs that
                # share a basename end up as duplicate entries in the archive.
                zipf.write(expanded_file, arcname=os.path.basename(expanded_file))
        return FilePath(path=expanded_zip)


class UnzipFile(BaseNode):
    """
    Extract a zip archive into a folder.
    files, zip, archive, extract

    Use cases:
    - Unpack downloaded archives
    - Prepare datasets from zipped files
    - Access individual files from an archive
    """

    zip_path: FilePath = Field(default=FilePath(), description="Path to the zip file")
    output_folder: FolderPath = Field(
        default=FolderPath(), description="Destination folder"
    )

    async def process(self, context: ProcessingContext) -> list[FilePath]:
        """
        Extract every member of the archive into ``output_folder``.

        ``~`` is expanded in both paths and the destination folder is created
        when it is missing.

        Returns:
            One FilePath per extracted file, pointing at the location the
            file was actually written to. Directory entries are extracted
            but not listed, since they are not files.

        Raises:
            ValueError: when running in production, when ``zip_path`` or
                ``output_folder`` has no path, or when the archive does not
                exist.
        """
        if Environment.is_production():
            raise ValueError("This node is not available in production")
        if not self.zip_path.path:
            raise ValueError("zip_path cannot be empty")
        if not self.output_folder.path:
            raise ValueError("output_folder cannot be empty")

        expanded_zip = os.path.expanduser(self.zip_path.path)
        expanded_folder = os.path.expanduser(self.output_folder.path)
        if not os.path.exists(expanded_zip):
            raise ValueError(f"Zip file not found: {expanded_zip}")
        os.makedirs(expanded_folder, exist_ok=True)

        file_paths: list[FilePath] = []
        with zipfile.ZipFile(expanded_zip, "r") as zipf:
            for member in zipf.infolist():
                # zipfile sanitizes member names on extraction (absolute paths
                # and ".." components are stripped), so the raw archive name
                # joined onto the folder can point somewhere the file was never
                # written. extract() returns the real on-disk path instead.
                written_path = zipf.extract(member, path=expanded_folder)
                if member.is_dir():
                    continue
                file_paths.append(FilePath(path=written_path))
        return file_paths


class ListZipContents(BaseNode):
    """
    List the files inside a zip archive.
    files, zip, archive, list

    Use cases:
    - Inspect archive contents
    - Validate zipped datasets
    """

    zip_path: FilePath = Field(default=FilePath(), description="Path to the zip file")

    async def process(self, context: ProcessingContext) -> list[str]:
        """
        Return the member names stored in the archive at ``zip_path``.

        ``~`` in the path is expanded before the archive is opened.

        Raises:
            ValueError: when running in production, when ``zip_path`` has no
                path, or when the archive does not exist.
        """
        if Environment.is_production():
            raise ValueError("This node is not available in production")

        raw_path = self.zip_path.path
        if not raw_path:
            raise ValueError("zip_path cannot be empty")

        archive_location = os.path.expanduser(raw_path)
        if not os.path.exists(archive_location):
            raise ValueError(f"Zip file not found: {archive_location}")

        # Read the names while the archive is open, return once it is closed.
        with zipfile.ZipFile(archive_location, "r") as archive:
            member_names = archive.namelist()
        return member_names


class FileNameMatch(BaseNode):
"""
Match a filename against a pattern using Unix shell-style wildcards.
Expand Down
36 changes: 35 additions & 1 deletion tests/nodetool/test_os_nodes.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
import os
import pytest
from nodetool.workflows.processing_context import ProcessingContext
from nodetool.metadata.types import FilePath
from nodetool.metadata.types import FilePath, FolderPath
from nodetool.nodes.nodetool.os import (
SetEnvironmentVariable,
GetEnvironmentVariable,
FileExists,
ListFiles,
CreateDirectory,
ZipFiles,
UnzipFile,
ListZipContents,
)


Expand Down Expand Up @@ -39,3 +43,33 @@ async def test_file_operations(context: ProcessingContext, tmp_path):
files = await list_node.process(context)
assert len(files) == 1
assert files[0].path == str(file_path)


@pytest.mark.asyncio
async def test_zipfile_nodes(context: ProcessingContext, tmp_path):
    """Round-trip two text files through ZipFiles, ListZipContents and UnzipFile."""
    # Fixture files, written in the same order they are archived.
    inputs = []
    for filename, text in (("a.txt", "one"), ("b.txt", "two")):
        source = tmp_path / filename
        source.write_text(text)
        inputs.append(FilePath(path=str(source)))

    # Zip: the node reports the archive path and the archive exists on disk.
    archive = tmp_path / "archive.zip"
    zipper = ZipFiles(files=inputs, zip_path=FilePath(path=str(archive)))
    zipped = await zipper.process(context)
    assert zipped.path == str(archive)
    assert archive.exists()

    # List: members are stored under their basenames.
    lister = ListZipContents(zip_path=FilePath(path=str(archive)))
    listed = await lister.process(context)
    assert sorted(listed) == ["a.txt", "b.txt"]

    # Unzip: both files come back out into a fresh folder.
    destination = tmp_path / "out"
    unzipper = UnzipFile(
        zip_path=FilePath(path=str(archive)),
        output_folder=FolderPath(path=str(destination)),
    )
    unpacked = await unzipper.process(context)
    unpacked_names = {os.path.basename(item.path) for item in unpacked}
    assert unpacked_names == {"a.txt", "b.txt"}
Loading