|
1 |
| -from typing import Optional |
| 1 | +from typing import List, Optional, Tuple |
2 | 2 |
|
3 | 3 | from dedoc.data_structures.unstructured_document import UnstructuredDocument
|
4 | 4 | from dedoc.readers.base_reader import BaseReader
|
| 5 | +from dedoc.readers.pdf_reader.pdf_auto_reader.txtlayer_result import TxtLayerResult |
5 | 6 |
|
6 | 7 |
|
7 | 8 | class PdfAutoReader(BaseReader):
|
@@ -54,100 +55,91 @@ def read(self, file_path: str, parameters: Optional[dict] = None) -> Unstructure
|
54 | 55 | """
|
55 | 56 | parameters = {} if parameters is None else parameters
|
56 | 57 | warnings = []
|
57 |
| - txtlayer_parameters = self.txtlayer_detector.detect_txtlayer(path=file_path, parameters=parameters) |
| 58 | + txtlayer_result = self.txtlayer_detector.detect_txtlayer(path=file_path, parameters=parameters) |
58 | 59 |
|
59 |
| - if txtlayer_parameters.is_correct_text_layer: |
60 |
| - result = self.__handle_correct_text_layer(is_first_page_correct=txtlayer_parameters.is_first_page_correct, |
61 |
| - parameters=parameters, |
62 |
| - path=file_path, |
63 |
| - warnings=warnings) |
64 |
| - else: |
65 |
| - result = self.__handle_incorrect_text_layer(parameters, file_path, warnings) |
| 60 | + documents = [] |
| 61 | + for txtlayer_result_chunk in txtlayer_result: |
| 62 | + document = self.__parse_document(txtlayer_result=txtlayer_result_chunk, parameters=parameters, path=file_path, warnings=warnings) |
| 63 | + documents.append(document) |
66 | 64 |
|
67 |
| - result.warnings.extend(warnings) |
68 |
| - return result |
| 65 | + result_document = self.__merge_documents(documents) |
| 66 | + result_document.warnings.extend(warnings) |
| 67 | + return result_document |
69 | 68 |
|
70 |
| - def __handle_incorrect_text_layer(self, parameters_copy: dict, path: str, warnings: list) -> UnstructuredDocument: |
| 69 | + def __parse_document(self, txtlayer_result: TxtLayerResult, parameters: dict, path: str, warnings: list) -> UnstructuredDocument: |
71 | 70 | import os
|
72 | 71 |
|
73 |
| - self.logger.info(f"Assume document {os.path.basename(path)} has incorrect textual layer") |
74 |
| - warnings.append("Assume document has incorrect textual layer") |
75 |
| - result = self.pdf_image_reader.read(file_path=path, parameters=parameters_copy) |
76 |
| - return result |
| 72 | + end = "" if txtlayer_result.end is None else txtlayer_result.end |
| 73 | + correct_text = "correct" if txtlayer_result.correct else "incorrect" |
| 74 | + log_text = f"Assume document {os.path.basename(path)} has {correct_text} textual layer on pages [{txtlayer_result.start}:{end}]" |
| 75 | + self.logger.info(log_text) |
| 76 | + warnings.append(log_text) |
| 77 | + if txtlayer_result.document: |
| 78 | + return txtlayer_result.document |
77 | 79 |
|
78 |
| - def __handle_correct_text_layer(self, is_first_page_correct: bool, parameters: dict, path: str, warnings: list) -> UnstructuredDocument: |
79 |
| - import os |
| 80 | + import copy |
80 | 81 | from dedoc.utils.parameter_utils import get_param_pdf_with_txt_layer
|
81 | 82 |
|
82 |
| - self.logger.info(f"Assume document {os.path.basename(path)} has a correct textual layer") |
83 |
| - warnings.append("Assume document has a correct textual layer") |
84 |
| - recognized_first_page = None |
85 |
| - |
86 |
| - if not is_first_page_correct: |
87 |
| - message = "Assume the first page hasn't a textual layer" |
88 |
| - warnings.append(message) |
89 |
| - self.logger.info(message) |
90 |
| - |
91 |
| - # GET THE FIRST PAGE: recognize the first page like a scanned page |
92 |
| - scan_parameters = self.__preparing_first_page_parameters(parameters) |
93 |
| - recognized_first_page = self.pdf_image_reader.read(file_path=path, parameters=scan_parameters) |
94 |
| - |
95 |
| - # PREPARE PARAMETERS: from the second page we recognize the content like PDF with a textual layer |
96 |
| - parameters = self.__preparing_other_pages_parameters(parameters) |
| 83 | + if txtlayer_result.correct: |
| 84 | + pdf_with_txt_layer = get_param_pdf_with_txt_layer(parameters) |
| 85 | + reader = self.pdf_txtlayer_reader if pdf_with_txt_layer == "auto" else self.pdf_tabby_reader |
| 86 | + else: |
| 87 | + reader = self.pdf_image_reader |
97 | 88 |
|
98 |
| - pdf_with_txt_layer = get_param_pdf_with_txt_layer(parameters) |
99 |
| - reader = self.pdf_txtlayer_reader if pdf_with_txt_layer == "auto" else self.pdf_tabby_reader |
100 |
| - result = reader.read(file_path=path, parameters=parameters) |
101 |
| - result = self.__merge_documents(recognized_first_page, result) if recognized_first_page is not None else result |
| 89 | + copy_parameters = copy.deepcopy(parameters) |
| 90 | + copy_parameters["pages"] = f"{txtlayer_result.start}:{end}" |
| 91 | + result = reader.read(file_path=path, parameters=copy_parameters) |
102 | 92 | return result
|
103 | 93 |
|
104 |
| - def __preparing_first_page_parameters(self, parameters: dict) -> dict: |
105 |
| - import copy |
106 |
| - from dedoc.utils.parameter_utils import get_param_page_slice |
107 |
| - |
108 |
| - first_page, last_page = get_param_page_slice(parameters) |
109 |
| - # calculate indexes for the first page parsing |
110 |
| - first_page_index = 0 if first_page is None else first_page |
111 |
| - last_page_index = 0 |
112 |
| - scan_parameters = copy.deepcopy(parameters) |
113 |
| - |
114 |
| - # page numeration in parameters starts with 1, both ends are included |
115 |
| - scan_parameters["pages"] = f"{first_page_index + 1}:{last_page_index + 1}" |
116 |
| - # if the first page != 0 then we won't read it (because first_page_index > last_page_index) |
117 |
| - return scan_parameters |
| 94 | + def __merge_documents(self, documents: List[UnstructuredDocument]) -> UnstructuredDocument: |
| 95 | + if len(documents) == 0: |
| 96 | + raise ValueError("No documents to merge") |
118 | 97 |
|
119 |
| - def __preparing_other_pages_parameters(self, parameters: dict) -> dict: |
120 |
| - from dedoc.utils.parameter_utils import get_param_page_slice |
| 98 | + if len(documents) == 1: |
| 99 | + return documents[0] |
121 | 100 |
|
122 |
| - first_page, last_page = get_param_page_slice(parameters) |
123 |
| - # parameters for reading pages from the second page |
124 |
| - first_page_index = 1 if first_page is None else first_page |
125 |
| - last_page_index = "" if last_page is None else last_page |
126 |
| - parameters["pages"] = f"{first_page_index + 1}:{last_page_index}" |
127 |
| - |
128 |
| - return parameters |
129 |
| - |
130 |
| - def __merge_documents(self, first: UnstructuredDocument, second: UnstructuredDocument) -> UnstructuredDocument: |
131 | 101 | from itertools import chain
|
| 102 | + from dedoc.data_structures.concrete_annotations.attach_annotation import AttachAnnotation |
132 | 103 | from dedoc.data_structures.concrete_annotations.table_annotation import TableAnnotation
|
133 | 104 | from dedoc.data_structures.line_with_meta import LineWithMeta
|
134 | 105 |
|
135 |
| - tables = first.tables |
136 |
| - dropped_tables = set() |
137 |
| - for table in second.tables: |
138 |
| - if table.metadata.page_id != 0: |
139 |
| - tables.append(table) |
140 |
| - else: |
141 |
| - dropped_tables.add(table.metadata.uid) |
142 |
| - |
143 |
| - lines = [] |
144 |
| - line_id = 0 |
145 |
| - for line in chain(first.lines, second.lines): |
| 106 | + tables, attachments = self.__prepare_tables_attachments(documents) |
| 107 | + warnings = list(set(chain.from_iterable([document.warnings for document in documents]))) |
| 108 | + table_uids = set([table.metadata.uid for table in tables]) |
| 109 | + attachment_uids = set([attachment.uid for attachment in attachments]) |
| 110 | + lines, line_id = [], 0 |
| 111 | + |
| 112 | + for line in chain.from_iterable([document.lines for document in documents]): |
146 | 113 | line.metadata.line_id = line_id
|
147 | 114 | line_id += 1
|
148 |
| - annotations = [ |
149 |
| - annotation for annotation in line.annotations if not (isinstance(annotation, TableAnnotation) and annotation.value in dropped_tables) |
150 |
| - ] |
151 |
| - new_line = LineWithMeta(line=line.line, metadata=line.metadata, annotations=annotations, uid=line.uid) |
152 |
| - lines.append(new_line) |
153 |
| - return UnstructuredDocument(tables=tables, lines=lines, attachments=first.attachments + second.attachments, metadata=second.metadata) |
| 115 | + annotations = [] |
| 116 | + for annotation in line.annotations: |
| 117 | + if isinstance(annotation, TableAnnotation) and annotation.value not in table_uids: |
| 118 | + continue |
| 119 | + if isinstance(annotation, AttachAnnotation) and annotation.value not in attachment_uids: |
| 120 | + continue |
| 121 | + annotations.append(annotation) |
| 122 | + lines.append(LineWithMeta(line=line.line, metadata=line.metadata, annotations=annotations, uid=line.uid)) |
| 123 | + |
| 124 | + return UnstructuredDocument(tables=tables, lines=lines, attachments=attachments, metadata=documents[0].metadata, warnings=warnings) |
| 125 | + |
| 126 | + def __prepare_tables_attachments(self, documents: List[UnstructuredDocument]) -> Tuple[list, list]: |
| 127 | + from dedoc.readers.pdf_reader.data_classes.pdf_image_attachment import PdfImageAttachment |
| 128 | + |
| 129 | + tables, attachments, attachment_uids = [], [], set() |
| 130 | + for document in documents: |
| 131 | + if not document.lines: |
| 132 | + continue |
| 133 | + |
| 134 | + lines = sorted(document.lines, key=lambda l: l.metadata.page_id) |
| 135 | + min_page, max_page = lines[0].metadata.page_id, lines[-1].metadata.page_id |
| 136 | + tables.extend([table for table in document.tables if min_page <= table.metadata.page_id <= max_page]) |
| 137 | + for attachment in document.attachments: |
| 138 | + if not isinstance(attachment, PdfImageAttachment) and attachment.uid not in attachment_uids: |
| 139 | + attachment_uids.add(attachment.uid) |
| 140 | + attachments.append(attachment) |
| 141 | + |
| 142 | + if isinstance(attachment, PdfImageAttachment) and min_page <= attachment.location.page_number <= max_page: |
| 143 | + attachments.append(attachment) |
| 144 | + |
| 145 | + return tables, attachments |
0 commit comments