30
30
from apache_beam .io import tfrecordio
31
31
from tensorflow .contrib import learn
32
32
from tensorflow .contrib .learn .python .learn .utils import input_fn_utils
33
+ from tensorflow_transform .beam import impl as beam_impl
34
+ from tensorflow_transform .beam .tft_beam_io import transform_fn_io
35
+ from tensorflow_transform .coders import example_proto_coder
36
+ from tensorflow_transform .saved import saved_transform_io
33
37
from tensorflow_transform .tf_metadata import dataset_metadata
34
38
from tensorflow_transform .tf_metadata import dataset_schema
35
39
from tensorflow_transform .tf_metadata import metadata_io
@@ -137,14 +141,16 @@ def read_and_shuffle_data(
137
141
(train_neg_filepattern , train_pos_filepattern ))
138
142
| 'WriteTrainData' >> tfrecordio .WriteToTFRecord (
139
143
os .path .join (working_dir , SHUFFLED_TRAIN_DATA_FILEBASE ),
140
- coder = tft .ExampleProtoCoder (RAW_DATA_METADATA .schema )))
144
+ coder = example_proto_coder .ExampleProtoCoder (
145
+ RAW_DATA_METADATA .schema )))
141
146
_ = (
142
147
pipeline
143
148
| 'ReadAndShuffleTest' >> ReadAndShuffleData (
144
149
(test_neg_filepattern , test_pos_filepattern ))
145
150
| 'WriteTestData' >> tfrecordio .WriteToTFRecord (
146
151
os .path .join (working_dir , SHUFFLED_TEST_DATA_FILEBASE ),
147
- coder = tft .ExampleProtoCoder (RAW_DATA_METADATA .schema )))
152
+ coder = example_proto_coder .ExampleProtoCoder (
153
+ RAW_DATA_METADATA .schema )))
148
154
# pylint: enable=no-value-for-parameter
149
155
150
156
@@ -161,20 +167,22 @@ def transform_data(working_dir):
161
167
"""
162
168
163
169
with beam .Pipeline () as pipeline :
164
- with tft .Context (temp_dir = tempfile .mkdtemp ()):
170
+ with beam_impl .Context (temp_dir = tempfile .mkdtemp ()):
165
171
train_data = (
166
172
pipeline |
167
173
'ReadTrain' >> tfrecordio .ReadFromTFRecord (
168
174
os .path .join (working_dir ,
169
175
SHUFFLED_TRAIN_DATA_FILEBASE + '*' ),
170
- coder = tft .ExampleProtoCoder (RAW_DATA_METADATA .schema )))
176
+ coder = example_proto_coder .ExampleProtoCoder (
177
+ RAW_DATA_METADATA .schema )))
171
178
172
179
test_data = (
173
180
pipeline |
174
181
'ReadTest' >> tfrecordio .ReadFromTFRecord (
175
182
os .path .join (working_dir ,
176
183
SHUFFLED_TEST_DATA_FILEBASE + '*' ),
177
- coder = tft .ExampleProtoCoder (RAW_DATA_METADATA .schema )))
184
+ coder = example_proto_coder .ExampleProtoCoder (
185
+ RAW_DATA_METADATA .schema )))
178
186
179
187
def preprocessing_fn (inputs ):
180
188
"""Preprocess input columns into transformed columns."""
@@ -193,34 +201,36 @@ def preprocessing_fn(inputs):
193
201
194
202
(transformed_train_data , transformed_metadata ), transform_fn = (
195
203
(train_data , RAW_DATA_METADATA )
196
- | 'AnalyzeAndTransform' >> tft .AnalyzeAndTransformDataset (
204
+ | 'AnalyzeAndTransform' >> beam_impl .AnalyzeAndTransformDataset (
197
205
preprocessing_fn ))
198
206
199
207
transformed_test_data , _ = (
200
208
((test_data , RAW_DATA_METADATA ), transform_fn )
201
- | 'Transform' >> tft .TransformDataset ())
209
+ | 'Transform' >> beam_impl .TransformDataset ())
202
210
203
211
_ = (
204
212
transformed_train_data
205
213
| 'WriteTrainData' >> tfrecordio .WriteToTFRecord (
206
214
os .path .join (working_dir ,
207
215
TRANSFORMED_TRAIN_DATA_FILEBASE ),
208
- coder = tft .ExampleProtoCoder (transformed_metadata .schema )))
216
+ coder = example_proto_coder .ExampleProtoCoder (
217
+ transformed_metadata .schema )))
209
218
210
219
_ = (
211
220
transformed_test_data
212
221
| 'WriteTestData' >> tfrecordio .WriteToTFRecord (
213
222
os .path .join (working_dir ,
214
223
TRANSFORMED_TEST_DATA_FILEBASE ),
215
- coder = tft .ExampleProtoCoder (transformed_metadata .schema )))
224
+ coder = example_proto_coder .ExampleProtoCoder (
225
+ transformed_metadata .schema )))
216
226
217
227
# Will write a SavedModel and metadata to two subdirectories of
218
- # working_dir, given by tft .TRANSFORM_FN_DIR and
219
- # tft .TRANSFORMED_METADATA_DIR respectively.
228
+ # working_dir, given by transform_fn_io .TRANSFORM_FN_DIR and
229
+ # transform_fn_io .TRANSFORMED_METADATA_DIR respectively.
220
230
_ = (
221
231
transform_fn
222
232
| 'WriteTransformFn' >>
223
- tft .WriteTransformFn (working_dir ))
233
+ transform_fn_io .WriteTransformFn (working_dir ))
224
234
225
235
226
236
# Functions for training
@@ -239,7 +249,8 @@ def _make_training_input_fn(working_dir, filebase, batch_size):
239
249
The input function for training or eval.
240
250
"""
241
251
transformed_metadata = metadata_io .read_metadata (
242
- os .path .join (working_dir , tft .TRANSFORMED_METADATA_DIR ))
252
+ os .path .join (
253
+ working_dir , transform_fn_io .TRANSFORMED_METADATA_DIR ))
243
254
transformed_feature_spec = transformed_metadata .schema .as_feature_spec ()
244
255
245
256
def input_fn ():
@@ -282,8 +293,9 @@ def serving_input_fn():
282
293
# Apply the transform function that was used to generate the materialized
283
294
# data.
284
295
_ , transformed_features = (
285
- tft .partially_apply_saved_transform (
286
- os .path .join (working_dir , tft .TRANSFORM_FN_DIR ), raw_features ))
296
+ saved_transform_io .partially_apply_saved_transform (
297
+ os .path .join (working_dir , transform_fn_io .TRANSFORM_FN_DIR ),
298
+ raw_features ))
287
299
288
300
return input_fn_utils .InputFnOps (transformed_features , None , default_inputs )
289
301
0 commit comments