
Commit fd3fd1f

Sample optimized tables (#715)
* sample optimized table with tests
1 parent 4090580 commit fd3fd1f

3 files changed: +186, -44 lines


gcp_variant_transforms/libs/bigquery_util.py

Lines changed: 51 additions & 0 deletions
@@ -365,6 +365,57 @@ def compose_table_name(base_name, suffix):
   # type: (str, str) -> str
   return TABLE_SUFFIX_SEPARATOR.join([base_name, suffix])
 
+
+def compose_temp_table_base(base_name, prefix):
+  # type: (str, str) -> str
+  table_re_match = re.match(
+      r'^((?P<project>.+):)(?P<dataset>\w+)\.(?P<table>[\w\$]+)$', base_name)
+  project_id = table_re_match.group('project')
+  dataset_id = table_re_match.group('dataset')
+  table_id = table_re_match.group('table')
+  temp_table_base_name = '{}:{}.{}{}'.format(project_id, dataset_id, prefix,
+                                             table_id)
+  return temp_table_base_name
+
+
+def get_non_temp_table_name(temp_table, prefix):
+  table_re_match = re.match(
+      rf'^((?P<project>.+):)(?P<dataset>\w+)\.(?P<tmpname>{prefix})(?P<table>[\w\$]+)$',
+      temp_table)
+  project_id = table_re_match.group('project')
+  dataset_id = table_re_match.group('dataset')
+  table_id = table_re_match.group('table')
+  non_temp_table_name = ('{}:{}.{}'.format(project_id, dataset_id, table_id))
+  return non_temp_table_name
+
+
+def copy_table(source, destination):
+  source_table_re_match = re.match(
+      r'^((?P<project>.+):)(?P<dataset>\w+)\.(?P<table>[\w\$]+)$', source)
+  source_project_id = source_table_re_match.group('project')
+  source_dataset_id = source_table_re_match.group('dataset')
+  source_table_id = source_table_re_match.group('table')
+  destination_table_re_match = re.match(
+      r'^((?P<project>.+):)(?P<dataset>\w+)\.(?P<table>[\w\$]+)$', destination)
+  destination_project_id = destination_table_re_match.group('project')
+  destination_dataset_id = destination_table_re_match.group('dataset')
+  destination_table_id = destination_table_re_match.group('table')
+  client = bigquery.Client(project=source_project_id)
+  source_dataset = client.dataset(
+      dataset_id=source_dataset_id, project=source_project_id)
+  source_table = source_dataset.table(source_table_id)
+  destination_dataset = client.dataset(
+      dataset_id=destination_dataset_id, project=destination_project_id)
+  destination_table = destination_dataset.table(destination_table_id)
+  job_config = bigquery.job.CopyJobConfig(write_disposition='WRITE_APPEND')
+  copy_job = client.copy_table(
+      source_table, destination_table, job_config=job_config)
+  try:
+    results = copy_job.result(timeout=600)
+  except TimeoutError as e:
+    logging.warning('Time out copying from temp table: %s', source_table)
+
+
 def get_table_base_name(table_name):
   return table_name.split(TABLE_SUFFIX_SEPARATOR)[0]
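The two naming helpers are inverses of each other: compose_temp_table_base injects the temp prefix into the table id of a project:dataset.table spec, and get_non_temp_table_name strips it back off; copy_table then appends the temp table's rows to the real table via a BigQuery copy job (WRITE_APPEND, 10-minute timeout). A standalone sketch of the naming round trip, re-implemented here for illustration (the table spec my-project:vcf.variants is made up):

import re

TMP_PREFIX = '_tmp_'  # same prefix vcf_to_bq.py passes in below

def compose_temp_table_base(base_name, prefix):
  # 'project:dataset.table' -> 'project:dataset.<prefix>table'
  m = re.match(
      r'^((?P<project>.+):)(?P<dataset>\w+)\.(?P<table>[\w\$]+)$', base_name)
  return '{}:{}.{}{}'.format(
      m.group('project'), m.group('dataset'), prefix, m.group('table'))

def get_non_temp_table_name(temp_table, prefix):
  # Inverse: drop '<prefix>' from the table id again.
  m = re.match(
      rf'^((?P<project>.+):)(?P<dataset>\w+)\.(?P<tmpname>{prefix})(?P<table>[\w\$]+)$',
      temp_table)
  return '{}:{}.{}'.format(
      m.group('project'), m.group('dataset'), m.group('table'))

temp = compose_temp_table_base('my-project:vcf.variants', TMP_PREFIX)
assert temp == 'my-project:vcf._tmp_variants'
assert get_non_temp_table_name(temp, TMP_PREFIX) == 'my-project:vcf.variants'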

gcp_variant_transforms/libs/partitioning.py

Lines changed: 1 addition & 1 deletion
@@ -82,7 +82,7 @@ def __init__(self, base_table_id, suffixes, append):
     self._sub_fields = []
 
     job_config = bigquery.job.QueryJobConfig(
-        write_disposition='WRITE_TRUNCATE' if append else 'WRITE_EMPTY')
+        write_disposition='WRITE_APPEND' if append else 'WRITE_EMPTY')
     self._client = bigquery.Client(project=self._project_id,
                                    default_query_job_config=job_config)
     self._find_one_non_empty_table()
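This one-word change is the behavioral core of the commit for FlattenCallColumn: in append runs the flatten queries now add rows to the destination tables instead of truncating them. For reference, the three write dispositions as the BigQuery client defines them (a hedged summary; only the line above actually changed in this commit):

from google.cloud import bigquery

# WRITE_EMPTY:    only write if the destination table is empty (fresh runs).
# WRITE_TRUNCATE: replace the destination's existing contents -- the previous
#                 append behavior, which dropped rows loaded by earlier runs.
# WRITE_APPEND:   add the query results to the existing rows (append runs).
job_config = bigquery.job.QueryJobConfig(write_disposition='WRITE_APPEND')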

gcp_variant_transforms/vcf_to_bq.py

Lines changed: 134 additions & 43 deletions
@@ -99,6 +99,10 @@ def _record_newly_created_table(full_table_id):
   global _newly_created_tables  # pylint: disable=global-statement
   _newly_created_tables.append(full_table_id)
 
+_new_temp_tables = []
+def _record_new_temp_table(full_table_id):
+  global _new_temp_tables  # pylint: disable=global-statement
+  _new_temp_tables.append(full_table_id)
 
 def _read_variants(all_patterns,  # type: List[str]
                    pipeline,  # type: beam.Pipeline
@@ -569,20 +573,137 @@ def run(argv=None):
                              known_args.output_table))
 
     suffixes.append(sample_info_table_schema_generator.SAMPLE_INFO_TABLE_SUFFIX)
-    load_avro = avro_util.LoadAvro(
-        avro_root_path, known_args.output_table, suffixes, False)
-    not_empty_variant_suffixes = load_avro.start_loading()
-    logging.info('Following tables were loaded with at least 1 row:')
-    for suffix in not_empty_variant_suffixes:
-      logging.info(bigquery_util.compose_table_name(known_args.output_table,
-                                                    suffix))
-    # Remove sample_info table from both lists to avoid duplicating it when
-    # --sample_lookup_optimized_output_table flag is set
-    suffixes.remove(sample_info_table_schema_generator.SAMPLE_INFO_TABLE_SUFFIX)
-    if sample_info_table_schema_generator.SAMPLE_INFO_TABLE_SUFFIX in\
-        not_empty_variant_suffixes:
-      not_empty_variant_suffixes.remove(
+
+    # If creating sample optimized tables and running in append mode, create
+    # temp tables first, to be able to copy just new records to sample
+    # optimized tables.
+    if (known_args.append and known_args.sample_lookup_optimized_output_table):
+      tmp_prefix = '_tmp_'
+      temp_table_base_name = bigquery_util.compose_temp_table_base(
+          known_args.output_table, tmp_prefix)
+
+      temp_suffixes = []
+      for i in range(num_shards):
+        temp_suffixes.append(sharding.get_output_table_suffix(i))
+        temp_partition_range_end = sharding.get_output_table_partition_range_end(
+            i)
+        temp_table_name = bigquery_util.compose_table_name(
+            temp_table_base_name, temp_suffixes[i])
+        partitioning.create_bq_table(
+            temp_table_name, schema_file,
+            bigquery_util.ColumnKeyConstants.START_POSITION,
+            temp_partition_range_end)
+        _record_newly_created_table(temp_table_name)
+        logging.info('Integer range partitioned table %s was created.',
+                     temp_table_name)
+        _record_new_temp_table(temp_table_name)
+      temp_sample_table_id = sample_info_table_schema_generator.create_sample_info_table(
+          temp_table_base_name)
+      _record_newly_created_table(temp_sample_table_id)
+      _record_new_temp_table(temp_sample_table_id)
+      temp_suffixes.append(
+          sample_info_table_schema_generator.SAMPLE_INFO_TABLE_SUFFIX)
+      temp_load_avro = avro_util.LoadAvro(avro_root_path, temp_table_base_name,
+                                          temp_suffixes, False)
+      temp_not_empty_variant_suffixes = temp_load_avro.start_loading()
+
+      # Copy tables
+      for temp_t in _new_temp_tables:
+        try:
+          output_table = bigquery_util.get_non_temp_table_name(
+              temp_t, tmp_prefix)
+          bigquery_util.copy_table(temp_t, output_table)
+        except Exception as e:
+          logging.error(
+              'Something unexpected during the copy of the temp '
+              'table: %s to the target table %s: %s', temp_t, output_table,
+              str(e))
+
+      # Remove sample_info table from both lists to avoid duplicating it when
+      # --sample_lookup_optimized_output_table flag is set
+      temp_suffixes.remove(
          sample_info_table_schema_generator.SAMPLE_INFO_TABLE_SUFFIX)
+      if sample_info_table_schema_generator.SAMPLE_INFO_TABLE_SUFFIX in temp_not_empty_variant_suffixes:
+        temp_not_empty_variant_suffixes.remove(
+            sample_info_table_schema_generator.SAMPLE_INFO_TABLE_SUFFIX)
+
+      # Copy to sample optimized tables
+      temp_flatten_call_column = partitioning.FlattenCallColumn(
+          temp_table_base_name, temp_not_empty_variant_suffixes,
+          known_args.append)
+      try:
+        temp_flatten_schema_file = tempfile.mkstemp(
+            suffix=_BQ_SCHEMA_FILE_SUFFIX)[1]
+        if not temp_flatten_call_column.get_flatten_table_schema(
+            temp_flatten_schema_file):
+          raise ValueError('Failed to extract schema of flatten table')
+        # Copy to flatten sample lookup tables from the variant lookup tables.
+        temp_flatten_call_column.copy_to_flatten_table(
+            known_args.sample_lookup_optimized_output_table)
+        logging.info('All sample lookup optimized tables are fully loaded.')
+      except Exception as e:
+        logging.error(
+            'Something unexpected happened during the loading rows to '
+            'sample optimized table stage. Since this copy failed, the '
+            'temporary tables were not deleted. To avoid extra storage '
+            'charges, delete the temporary tables in your dataset that '
+            'will begin with %s. Error: %s', tmp_prefix, str(e))
+        raise e
+      else:
+        for temp_t in _new_temp_tables:
+          if bigquery_util.delete_table(temp_t) != 0:
+            logging.error('Deletion of temporary table "%s" has failed.',
+                          temp_t)
+
+    else:
+      load_avro = avro_util.LoadAvro(avro_root_path, known_args.output_table,
+                                     suffixes, False)
+      not_empty_variant_suffixes = load_avro.start_loading()
+      logging.info('Following tables were loaded with at least 1 row:')
+      for suffix in not_empty_variant_suffixes:
+        logging.info(
+            bigquery_util.compose_table_name(known_args.output_table, suffix))
+      # Remove sample_info table from both lists to avoid duplicating it when
+      # --sample_lookup_optimized_output_table flag is set
+      suffixes.remove(
+          sample_info_table_schema_generator.SAMPLE_INFO_TABLE_SUFFIX)
+      if sample_info_table_schema_generator.SAMPLE_INFO_TABLE_SUFFIX in\
+          not_empty_variant_suffixes:
+        not_empty_variant_suffixes.remove(
+            sample_info_table_schema_generator.SAMPLE_INFO_TABLE_SUFFIX)
+
+      if known_args.sample_lookup_optimized_output_table:
+        flatten_call_column = partitioning.FlattenCallColumn(
+            known_args.output_table, not_empty_variant_suffixes,
+            known_args.append)
+        try:
+          flatten_schema_file = tempfile.mkstemp(
+              suffix=_BQ_SCHEMA_FILE_SUFFIX)[1]
+          if not flatten_call_column.get_flatten_table_schema(
+              flatten_schema_file):
+            raise ValueError('Failed to extract schema of flatten table')
+
+          # Create all sample optimized tables including those that will be empty.
+          for suffix in suffixes:
+            output_table_id = bigquery_util.compose_table_name(
+                known_args.sample_lookup_optimized_output_table, suffix)
+            partitioning.create_bq_table(
+                output_table_id, flatten_schema_file,
+                bigquery_util.ColumnKeyConstants.CALLS_SAMPLE_ID,
+                partitioning.MAX_RANGE_END)
+            _record_newly_created_table(output_table_id)
+            logging.info('Sample lookup optimized table %s was created.',
+                         output_table_id)
+          # Copy to flatten sample lookup tables from the variant lookup tables.
+          flatten_call_column.copy_to_flatten_table(
+              known_args.sample_lookup_optimized_output_table)
+          logging.info('All sample lookup optimized tables are fully loaded.')
+        except Exception as e:
+          logging.error(
+              'Something unexpected happened during the loading rows to '
+              'sample optimized table stage: %s', str(e))
+          raise e
+
   except Exception as e:
     logging.error('Something unexpected happened during the loading of AVRO '
                   'files to BigQuery: %s', str(e))
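One detail worth calling out in the append branch above: the else clause on the try means the temporary tables are deleted only when the flatten copy succeeds; on failure they are deliberately kept, and the error message tells the user how to find them. A generic sketch of that control-flow pattern (illustrative names, not code from this commit):

import logging

def copy_then_cleanup(temp_tables, copy_fn, delete_fn, tmp_prefix='_tmp_'):
  """Delete temp tables only if the copy step ran without raising."""
  try:
    copy_fn()          # may raise, e.g. on a schema mismatch or timeout
  except Exception as e:
    # Keep the temp tables so the partial state can be inspected; the log
    # tells the user how to find them (they all share tmp_prefix).
    logging.error('Copy failed; temp tables starting with %s were kept: %s',
                  tmp_prefix, e)
    raise
  else:
    # Runs only if no exception escaped the try block.
    for table in temp_tables:
      if delete_fn(table) != 0:
        logging.error('Deletion of temporary table "%s" has failed.', table)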
@@ -603,36 +724,6 @@ def run(argv=None):
                   'failed.', avro_root_path)
 
 
-  if known_args.sample_lookup_optimized_output_table:
-    flatten_call_column = partitioning.FlattenCallColumn(
-        known_args.output_table, not_empty_variant_suffixes, known_args.append)
-    try:
-      flatten_schema_file = tempfile.mkstemp(suffix=_BQ_SCHEMA_FILE_SUFFIX)[1]
-      if not flatten_call_column.get_flatten_table_schema(flatten_schema_file):
-        raise ValueError('Failed to extract schema of flatten table')
-      # Create output flatten tables if needed
-      if not known_args.append:
-        # Create all sample optimized tables including those that will be empty.
-        for suffix in suffixes:
-          output_table_id = bigquery_util.compose_table_name(
-              known_args.sample_lookup_optimized_output_table, suffix)
-          partitioning.create_bq_table(
-              output_table_id, flatten_schema_file,
-              bigquery_util.ColumnKeyConstants.CALLS_SAMPLE_ID,
-              partitioning.MAX_RANGE_END)
-          _record_newly_created_table(output_table_id)
-          logging.info('Sample lookup optimized table %s was created.',
-                       output_table_id)
-      # Copy to flatten sample lookup tables from the variant lookup tables.
-      # Note: uses WRITE_TRUNCATE to overwrite the existing tables (issue #607).
-      flatten_call_column.copy_to_flatten_table(
-          known_args.sample_lookup_optimized_output_table)
-      logging.info('All sample lookup optimized tables are fully loaded.')
-    except Exception as e:
-      logging.error('Something unexpected happened during the loading rows to '
-                    'sample optimized table stage: %s', str(e))
-      raise e
-
 if __name__ == '__main__':
   logging.getLogger().setLevel(logging.INFO)
   try:
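For context, the new temp-table code path only triggers when both flags are set, e.g. in a run along these lines (a hypothetical invocation; the bucket, project, and table names are made up):

python -m gcp_variant_transforms.vcf_to_bq \
  --input_pattern gs://my-bucket/samples/*.vcf \
  --output_table my-project:vcf.variants \
  --sample_lookup_optimized_output_table my-project:vcf.variants_sample_lookup \
  --append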
