# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
14- from bson .codec_options import DEFAULT_CODEC_OPTIONS
15- from pyarrow import Table , timestamp
14+ from pyarrow import ListArray , StructArray , Table
1615
1716from pymongoarrow .types import _BsonArrowTypes , _get_internal_typemap
1817
19- try :
20- from pymongoarrow .lib import (
21- BinaryBuilder ,
22- BoolBuilder ,
23- CodeBuilder ,
24- Date32Builder ,
25- Date64Builder ,
26- DatetimeBuilder ,
27- Decimal128Builder ,
28- DocumentBuilder ,
29- DoubleBuilder ,
30- Int32Builder ,
31- Int64Builder ,
32- ListBuilder ,
33- NullBuilder ,
34- ObjectIdBuilder ,
35- StringBuilder ,
36- )
37-
38- _TYPE_TO_BUILDER_CLS = {
39- _BsonArrowTypes .int32 : Int32Builder ,
40- _BsonArrowTypes .int64 : Int64Builder ,
41- _BsonArrowTypes .double : DoubleBuilder ,
42- _BsonArrowTypes .datetime : DatetimeBuilder ,
43- _BsonArrowTypes .objectid : ObjectIdBuilder ,
44- _BsonArrowTypes .decimal128 : Decimal128Builder ,
45- _BsonArrowTypes .string : StringBuilder ,
46- _BsonArrowTypes .bool : BoolBuilder ,
47- _BsonArrowTypes .document : DocumentBuilder ,
48- _BsonArrowTypes .array : ListBuilder ,
49- _BsonArrowTypes .binary : BinaryBuilder ,
50- _BsonArrowTypes .code : CodeBuilder ,
51- _BsonArrowTypes .date32 : Date32Builder ,
52- _BsonArrowTypes .date64 : Date64Builder ,
53- _BsonArrowTypes .null : NullBuilder ,
54- }
55- except ImportError :
56- pass
57-
5818
class PyMongoArrowContext:
    """A context for converting BSON-formatted data to an Arrow Table."""

    def __init__(self, schema, codec_options=None):
        """Initialize the context.

        :Parameters:
          - `schema`: Instance of :class:`~pymongoarrow.schema.Schema`, or
            ``None`` when no schema is supplied.
          - `codec_options` (optional): An object exposing a ``tzinfo``
            attribute (presumably an instance of
            :class:`~bson.codec_options.CodecOptions`). Its ``tzinfo`` is
            only read when `schema` is ``None``.
        """
        self.schema = schema
        # The codec's timezone is only picked up in the schema-less case;
        # when a schema is given, tzinfo is left as None.
        if self.schema is None and codec_options is not None:
            self.tzinfo = codec_options.tzinfo
        else:
            self.tzinfo = None

        # Flat mapping of field name -> (type marker, arrow type). Nested
        # document/list children are added by _parse_types under
        # "parent.child" and "parent[]" keys.
        schema_map = {}
        if self.schema is not None:
            str_type_map = _get_internal_typemap(schema.typemap)
            _parse_types(str_type_map, schema_map, self.tzinfo)

        # Delayed import to prevent import errors for unbuilt library.
        from pymongoarrow.lib import BuilderManager

        self.manager = BuilderManager(schema_map, self.schema is not None, self.tzinfo)

    def process_bson_stream(self, stream):
        """Hand a BSON stream to the builder manager.

        :Parameters:
          - `stream`: A sized object (``len(stream)`` is taken); presumably
            a bytes-like buffer of BSON documents -- confirm against
            ``BuilderManager.process_bson_stream``.
        """
        self.manager.process_bson_stream(stream, len(stream))

    def finish(self):
        """Finish all builders and assemble the resulting Arrow Table.

        :Returns:
          A :class:`pyarrow.Table`. When a schema was supplied the table is
          built against ``self.schema.to_arrow()``; otherwise the column
          names are the keys of the finished builder map.
        """
        array_map = _parse_builder_map(self.manager.finish())
        # Arrays are passed positionally, in the iteration order of the
        # finished map.
        arrays = list(array_map.values())
        if self.schema is not None:
            return Table.from_arrays(arrays=arrays, schema=self.schema.to_arrow())
        return Table.from_arrays(arrays=arrays, names=list(array_map.keys()))
54+
55+
56+ def _parse_builder_map (builder_map ):
57+ # Handle nested builders.
58+ to_remove = []
59+ # Traverse the builder map right to left.
60+ for key , value in reversed (builder_map .items ()):
61+ if value .type_marker == _BsonArrowTypes .document .value :
62+ names = value .finish ()
63+ full_names = [f"{ key } .{ name } " for name in names ]
64+ arrs = [builder_map [c ] for c in full_names ]
65+ builder_map [key ] = StructArray .from_arrays (arrs , names = names )
66+ to_remove .extend (full_names )
67+ elif value .type_marker == _BsonArrowTypes .array .value :
68+ child_name = key + "[]"
69+ to_remove .append (child_name )
70+ child = builder_map [child_name ]
71+ builder_map [key ] = ListArray .from_arrays (value .finish (), child )
72+ else :
73+ builder_map [key ] = value .finish ()
74+
75+ for key in to_remove :
76+ if key in builder_map :
77+ del builder_map [key ]
78+
79+ return builder_map
80+
81+
82+ def _parse_types (str_type_map , schema_map , tzinfo ):
83+ for fname , (ftype , arrow_type ) in str_type_map .items ():
84+ schema_map [fname ] = ftype , arrow_type
85+
86+ # special-case nested builders
87+ if ftype == _BsonArrowTypes .document .value :
88+ # construct a sub type map here
89+ sub_type_map = {}
90+ for i in range (arrow_type .num_fields ):
91+ field = arrow_type [i ]
92+ sub_name = f"{ fname } .{ field .name } "
93+ sub_type_map [sub_name ] = field .type
94+ sub_type_map = _get_internal_typemap (sub_type_map )
95+ _parse_types (sub_type_map , schema_map , tzinfo )
96+ elif ftype == _BsonArrowTypes .array .value :
97+ sub_type_map = {}
98+ sub_name = f"{ fname } []"
99+ sub_value_type = arrow_type .value_type
100+ sub_type_map [sub_name ] = sub_value_type
101+ sub_type_map = _get_internal_typemap (sub_type_map )
102+ _parse_types (sub_type_map , schema_map , tzinfo )
0 commit comments