Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions docs/handlers.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
| [`NETGEAR TRX V1`](#netgear-trx-v1) | ARCHIVE | :octicons-check-16: |
| [`NETGEAR TRX V2`](#netgear-trx-v2) | ARCHIVE | :octicons-check-16: |
| [`NTFS`](#ntfs) | FILESYSTEM | :octicons-check-16: |
| [`PAR2 (MULTI-VOLUME)`](#par2-multi-volume) | ARCHIVE | :octicons-check-16: |
| [`PARTCLONE`](#partclone) | ARCHIVE | :octicons-check-16: |
| [`QNAP NAS`](#qnap-nas) | ARCHIVE | :octicons-check-16: |
| [`RAR`](#rar) | ARCHIVE | :octicons-alert-fill-12: |
Expand Down Expand Up @@ -793,6 +794,22 @@
=== "References"

- [NTFS Wikipedia](https://en.wikipedia.org/wiki/NTFS){ target="_blank" }
## PAR2 (multi-volume)

!!! success "Fully supported"

=== "Description"

Parchive, or PAR2, is a format for creating redundant data that helps detect and repair corrupted files. These archives typically accompany split-file sets (like multi-volume RAR or ZIP archives). Each PAR2 file is composed of multiple 'packets'.

---

- **Handler type:** Archive


=== "References"

- [Parchive Documentation](https://parchive.github.io/){ target="_blank" }
## Partclone

!!! success "Fully supported"
Expand Down
2 changes: 2 additions & 0 deletions python/unblob/handlers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
cab,
cpio,
dmg,
par2,
partclone,
rar,
sevenzip,
Expand Down Expand Up @@ -126,4 +127,5 @@
BUILTIN_DIR_HANDLERS: DirectoryHandlers = (
sevenzip.MultiVolumeSevenZipHandler,
gzip.MultiVolumeGzipHandler,
par2.MultiVolumePAR2Handler,
)
80 changes: 80 additions & 0 deletions python/unblob/handlers/archive/par2.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
import hashlib
import io
from pathlib import Path
from typing import Optional

from unblob.file_utils import Endian, StructParser
from unblob.models import (
DirectoryHandler,
Glob,
HandlerDoc,
HandlerType,
MultiFile,
Reference,
)

C_DEFINITIONS = r"""
typedef struct par2_header{
char magic[8];
uint64 packet_length;
char md5_hash[16];
char recovery_set_id[16];
char type[16];
} par2_header_t;
"""

PAR2_MAGIC = b"PAR2\x00PKT"
HEADER_STRUCT = "par2_header_t"
HEADER_PARSER = StructParser(C_DEFINITIONS)


class MultiVolumePAR2Handler(DirectoryHandler):
NAME = "multi-par2"
PATTERN = Glob("*.par2")
EXTRACTOR = None

DOC = HandlerDoc(
name="PAR2 (multi-volume)",
description="Parchive or PAR2, is a format for creating redundant data that helps detect and repair corrupted files. These archives typically accompany split-file sets (like multi-volume RAR or ZIP archives). Each PAR2 file is composed of multiple 'packets'.",
handler_type=HandlerType.ARCHIVE,
vendor=None,
references=[
Reference(
title="Parchive Documentation",
url="https://parchive.github.io/",
),
],
limitations=[],
)

def is_valid_header(self, file_paths: list) -> bool:
for path in file_paths:
with path.open("rb") as f:
header = HEADER_PARSER.parse(HEADER_STRUCT, f, Endian.LITTLE)
if header.magic != PAR2_MAGIC:
return False

offset_to_recovery_id = 32
# seek to beginning of recovery set ID
f.seek(offset_to_recovery_id, io.SEEK_SET)
packet_content = f.read(
header.packet_length - len(header) + offset_to_recovery_id
)
packet_checksum = hashlib.md5(packet_content).digest() # noqa: S324

if packet_checksum != header.md5_hash:
return False
return True

def calculate_multifile(self, file: Path) -> Optional[MultiFile]:
paths = sorted(
[p for p in file.parent.glob(f"{file.stem}.*") if p.resolve().exists()]
)

if len(paths) <= 1 or not self.is_valid_header(paths):
return None

return MultiFile(
name=file.stem,
paths=paths,
)
9 changes: 6 additions & 3 deletions python/unblob/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -422,12 +422,15 @@ def get_files(self, directory: Path) -> Iterable[Path]:
return [path] if path.exists() else []


class DirectoryHandler(abc.ABC):
DExtractor = TypeVar("DExtractor", bound=Union[None, DirectoryExtractor])


class DirectoryHandler(abc.ABC, Generic[DExtractor]):
"""A directory type handler is responsible for searching, validating and "unblobbing" files from multiple files in a directory."""

NAME: str

EXTRACTOR: DirectoryExtractor
EXTRACTOR: DExtractor

PATTERN: DirectoryPattern

Expand All @@ -436,7 +439,7 @@ class DirectoryHandler(abc.ABC):
@classmethod
def get_dependencies(cls):
"""Return external command dependencies needed for this handler to work."""
if cls.EXTRACTOR:
if cls.EXTRACTOR is not None:
return cls.EXTRACTOR.get_dependencies()
return []

Expand Down
3 changes: 3 additions & 0 deletions tests/integration/archive/par2/__input__/foo.erofs.img.par2
Git LFS file not shown
Git LFS file not shown
Git LFS file not shown
Git LFS file not shown
Git LFS file not shown
Git LFS file not shown
Git LFS file not shown
Empty file.
Loading