
import re

+import numpy as np
import tensorflow as tf


@@ -102,32 +103,102 @@ def output_is_asset(self, output_tensor):
    return self._output_is_asset_map[output_tensor]


-class _NumericCombineSpec(object):
-  """Operation to combine numeric values."""
+class CombinerSpec(object):
+  """Analyze using combiner function.
+
+  This object mirrors a beam.CombineFn that will receive a beam PCollection
+  representing the batched input tensors.
+  """
+
+  def create_accumulator(self):
+    """Return a fresh, empty accumulator.
+
+    Returns: An empty accumulator.  This can be any Python value.
+    """
+    raise NotImplementedError
+
+  def add_input(self, accumulator, element):
+    """Return result of folding element into accumulator.
+
+    Args:
+      accumulator: the current accumulator
+      element: the element to add, which will be an ndarray representing the
+        value of the input for a batch.
+
+    Returns: An accumulator that includes the additional element.
+    """
+    raise NotImplementedError
+
+  def merge_accumulators(self, accumulators):
+    """Merges several accumulators into a single accumulator value.
+
+    Args:
+      accumulators: the accumulators to merge
+
+    Returns: The sole merged accumulator.
+    """
+    raise NotImplementedError
+
+  def extract_output(self, accumulator):
+    """Return result of converting accumulator into the output value.
+
+    Args:
+      accumulator: the final accumulator value.
+
+    Returns: A list of ndarrays representing the result of this combiner.
+    """
+    raise NotImplementedError

-  MIN = 'min'
-  MAX = 'max'
-  SUM = 'sum'

-  def __init__(self, dtype, combiner_type, reduce_instance_dims):
-    self._dtype = dtype
-    self._combiner_type = combiner_type
+def combine_analyzer(x, output_dtype, output_shape, combiner_spec, name):
+  """Applies the combiner over the whole dataset.
+
+  Args:
+    x: An input `Tensor` or `SparseTensor`.
+    output_dtype: The dtype of the output of the analyzer.
+    output_shape: The shape of the output of the analyzer.
+    combiner_spec: An instance of a CombinerSpec subclass.
+    name: Similar to a TF op name.  Used to define a unique scope for this
+      analyzer, which can be used for debugging info.
+
+  Returns:
+    The combined values, as a `Tensor` with type output_dtype and shape
+    `output_shape`.  These must be compatible with the combiner_spec.
+  """
+  return Analyzer([x], [(output_dtype, output_shape, False)], combiner_spec,
+                  name).outputs[0]
+
+
+class _NumPyCombinerSpec(CombinerSpec):
+  """Combines the PCollection only on the 0th dimension using ndarrays."""
+
+  def __init__(self, fn, reduce_instance_dims):
+    self._fn = fn
    self._reduce_instance_dims = reduce_instance_dims

-  @property
-  def dtype(self):
-    return self._dtype
+  def create_accumulator(self):
+    return None

-  @property
-  def combiner_type(self):
-    return self._combiner_type
+  def add_input(self, accumulator, next_input):
+    if self._reduce_instance_dims:
+      batch = self._fn(next_input)
+    else:
+      batch = self._fn(next_input, axis=0)
+    if accumulator is None:
+      return batch
+    else:
+      return self._fn((accumulator, batch), axis=0)

-  @property
-  def reduce_instance_dims(self):
-    return self._reduce_instance_dims
+  def merge_accumulators(self, accumulators):
+    # numpy's sum, min, max, etc functions operate on array-like objects, but
+    # not arbitrary iterables.  Convert the provided accumulators into a list
+    return self._fn(list(accumulators), axis=0)
+
+  def extract_output(self, accumulator):
+    return [accumulator]


-def _numeric_combine(x, combiner_type, reduce_instance_dims=True, name=None):
+def _numeric_combine(x, fn, reduce_instance_dims=True, name=None):
  """Apply an analyzer with _NumericCombineSpec to given input."""
  if not isinstance(x, tf.Tensor):
    raise TypeError('Expected a Tensor, but got %r' % x)
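As a rough standalone sketch (not part of this patch), the hypothetical MeanCombinerSpec below shows how the four CombinerSpec methods compose when driven by hand over two numpy batches; in the real pipeline a Beam runner invokes them through the analyzer implementation.

import numpy as np


class MeanCombinerSpec(object):
  """Hypothetical combiner that accumulates (sum, count) and emits the mean."""

  def create_accumulator(self):
    # Start from an empty (sum, count) pair.
    return (0.0, 0)

  def add_input(self, accumulator, element):
    # `element` is an ndarray holding the values of one batch.
    total, count = accumulator
    return (total + np.sum(element), count + element.size)

  def merge_accumulators(self, accumulators):
    totals, counts = zip(*accumulators)
    return (sum(totals), sum(counts))

  def extract_output(self, accumulator):
    total, count = accumulator
    # One ndarray per output tensor of the analyzer.
    return [np.array(total / count)]


# Drive the protocol by hand over two batches; a Beam runner would normally
# distribute the add_input calls and merge the partial accumulators.
spec = MeanCombinerSpec()
acc_a = spec.add_input(spec.create_accumulator(), np.array([1.0, 2.0]))
acc_b = spec.add_input(spec.create_accumulator(), np.array([3.0, 5.0]))
merged = spec.merge_accumulators([acc_a, acc_b])
print(spec.extract_output(merged))  # [array(2.75)]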
@@ -143,10 +214,9 @@ def _numeric_combine(x, combiner_type, reduce_instance_dims=True, name=None):
    # If reducing over batch dimensions, with unknown shape, the result will
    # also have unknown shape.
    shape = None
-  spec = _NumericCombineSpec(x.dtype, combiner_type, reduce_instance_dims)
-  return Analyzer(
-      [x], [(x.dtype, shape, False)], spec,
-      name if name is not None else combiner_type).outputs[0]
+  return combine_analyzer(
+      x, x.dtype, shape, _NumPyCombinerSpec(fn, reduce_instance_dims),
+      name if name is not None else fn.__name__)


def min(x, reduce_instance_dims=True, name=None):  # pylint: disable=redefined-builtin
@@ -162,8 +232,7 @@ def min(x, reduce_instance_dims=True, name=None): # pylint: disable=redefined-b
  Returns:
    A `Tensor`. Has the same type as `x`.
  """
-  return _numeric_combine(
-      x, _NumericCombineSpec.MIN, reduce_instance_dims, name)
+  return _numeric_combine(x, np.min, reduce_instance_dims, name)


def max(x, reduce_instance_dims=True, name=None):  # pylint: disable=redefined-builtin
@@ -179,8 +248,7 @@ def max(x, reduce_instance_dims=True, name=None): # pylint: disable=redefined-b
  Returns:
    A `Tensor`. Has the same type as `x`.
  """
-  return _numeric_combine(
-      x, _NumericCombineSpec.MAX, reduce_instance_dims, name)
+  return _numeric_combine(x, np.max, reduce_instance_dims, name)


def sum(x, reduce_instance_dims=True, name=None):  # pylint: disable=redefined-builtin
@@ -196,8 +264,7 @@ def sum(x, reduce_instance_dims=True, name=None): # pylint: disable=redefined-b
  Returns:
    A `Tensor`. Has the same type as `x`.
  """
-  return _numeric_combine(
-      x, _NumericCombineSpec.SUM, reduce_instance_dims, name)
+  return _numeric_combine(x, np.sum, reduce_instance_dims, name)


def size(x, reduce_instance_dims=True, name=None):
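As a rough standalone illustration (not part of this patch) of how _NumPyCombinerSpec folds batches for min/max/sum, the snippet below mimics its add_input/merge_accumulators logic with np.min for both settings of reduce_instance_dims.

import numpy as np

# Two batches of a [batch_size, 2] input.
batch_1 = np.array([[1, 8], [4, 2]])
batch_2 = np.array([[0, 9], [5, 3]])

# reduce_instance_dims=True: each batch collapses to a scalar, and the
# per-batch partials are reduced once more, giving a single value.
partials = [np.min(batch_1), np.min(batch_2)]
print(np.min(partials, axis=0))  # 0

# reduce_instance_dims=False: only the batch (0th) dimension is reduced, so
# the result keeps the per-instance shape [2].
partials = [np.min(batch_1, axis=0), np.min(batch_2, axis=0)]
print(np.min(partials, axis=0))  # [0 2]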
@@ -271,18 +338,13 @@ def var(x, reduce_instance_dims=True, name=None):
class _UniquesSpec(object):
  """Operation to compute unique values."""

-  def __init__(self, dtype, top_k, frequency_threshold,
+  def __init__(self, top_k, frequency_threshold,
               vocab_filename, store_frequency):
-    self._dtype = dtype
    self._top_k = top_k
    self._frequency_threshold = frequency_threshold
    self._vocab_filename = vocab_filename
    self._store_frequency = store_frequency

-  @property
-  def dtype(self):
-    return self._dtype
-
  @property
  def top_k(self):
    return self._top_k
@@ -400,8 +462,8 @@ def uniques(x, top_k=None, frequency_threshold=None,
  # Make the file name path safe.
  vocab_filename = sanitized_vocab_filename(vocab_filename, prefix=prefix)

-  spec = _UniquesSpec(tf.string, top_k, frequency_threshold,
-                      vocab_filename, store_frequency)
+  spec = _UniquesSpec(top_k, frequency_threshold, vocab_filename,
+                      store_frequency)
  return Analyzer([x], [(tf.string, [], True)], spec, 'uniques').outputs[0]


@@ -469,50 +531,3 @@ def quantiles(x, num_buckets, epsilon, name=None):
  # Drop the first and last quantile boundaries, so that we end up with
  # num_buckets-1 boundaries, and hence num_buckets buckets.
  return quantile_boundaries[0:1, 1:-1]
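As a quick standalone illustration (not part of this patch) of the slicing in the context line above, assuming the analyzer yields a rank-2 array of num_buckets + 1 boundaries: dropping the first and last values leaves num_buckets - 1 interior boundaries, which delimit num_buckets buckets.

import numpy as np

# Hypothetical boundaries for num_buckets=4: 5 values delimit 4 buckets.
quantile_boundaries = np.array([[0.0, 0.25, 0.5, 0.75, 1.0]])
print(quantile_boundaries[0:1, 1:-1])  # [[0.25 0.5  0.75]], i.e. 4 - 1 boundaries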
-
-
-class _CombinerSpec(object):
-  """Analyze using combiner function.
-
-  Args:
-    combiner: Object of a class that implements beam.CombineFn() interface.
-      In addtion, the combiner class must implement a @property method called
-      output_dtype() that returns the tf.DType of the output of the combiner.
-  """
-
-  def __init__(self, combiner):
-    self._combiner = combiner
-
-  @property
-  def combiner(self):
-    return self._combiner
-
-  @property
-  def output_dtype(self):
-    return self._combiner.output_dtype
-
-
-def combine_analyzer(x, combiner, name=None):
-  """Applies the combiner over the whole dataset.
-
-  Args:
-    x: An input `Tensor` or `SparseTensor`.
-    combiner: Object of a class that implements beam.CombineFn() interface.
-      In addtion, the combiner class must implement a @property method called
-      output_dtype() that returns the type of the output of the combiner.
-    name: (Optional) A name for this operation.
-
-  Returns:
-    The combined values as a list, where the each element in the list
-    is of type combiner.output_dtype().
-  """
-
-  # The TF node name will be of the form:
-  # original_scope/{combine_analyzer|name}/{class-name-of-combiner}
-  with tf.name_scope(name, 'combine_analyzer'):
-    spec = _CombinerSpec(combiner)
-    return Analyzer(
-        [x],
-        [(spec.output_dtype, [1, None], False)],
-        spec,
-        type(combiner).__name__).outputs[0]