@@ -26,13 +26,8 @@
import static com.wepay.kafka.connect.bigquery.utils.TableNameUtils.intTable;

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryException;
import com.google.cloud.bigquery.InsertAllRequest.RowToInsert;
import com.google.cloud.bigquery.StandardTableDefinition;
import com.google.cloud.bigquery.Table;
import com.google.cloud.bigquery.TableId;
import com.google.cloud.bigquery.TimePartitioning;
import com.google.cloud.bigquery.TimePartitioning.Type;
import com.google.cloud.bigquery.storage.v1.BigQueryWriteSettings;
import com.google.cloud.storage.Bucket;
import com.google.cloud.storage.BucketInfo;
@@ -42,13 +37,10 @@
import com.wepay.kafka.connect.bigquery.config.BigQuerySinkConfig;
import com.wepay.kafka.connect.bigquery.config.BigQuerySinkTaskConfig;
import com.wepay.kafka.connect.bigquery.convert.SchemaConverter;
import com.wepay.kafka.connect.bigquery.exception.BigQueryConnectException;
import com.wepay.kafka.connect.bigquery.exception.BigQueryErrorResponses;
import com.wepay.kafka.connect.bigquery.exception.BigQueryStorageWriteApiConnectException;
import com.wepay.kafka.connect.bigquery.exception.ConversionConnectException;
import com.wepay.kafka.connect.bigquery.utils.PartitionedTableId;
import com.wepay.kafka.connect.bigquery.utils.SinkRecordConverter;
import com.wepay.kafka.connect.bigquery.utils.TableNameUtils;
import com.wepay.kafka.connect.bigquery.utils.Time;
import com.wepay.kafka.connect.bigquery.write.batch.GcsBatchTableWriter;
import com.wepay.kafka.connect.bigquery.write.batch.KcbqThreadPoolExecutor;
@@ -85,7 +77,6 @@
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.RetriableException;
import org.apache.kafka.connect.sink.ErrantRecordReporter;
@@ -118,15 +109,13 @@ public class BigQuerySinkTask extends SinkTask {
private GcsToBqWriter gcsToBqWriter;
private BigQuerySinkTaskConfig config;
private SinkRecordConverter recordConverter;
private boolean useMessageTimeDatePartitioning;
private boolean usePartitionDecorator;
private RecordTableResolver recordTableResolver;
private boolean upsertDelete;
private MergeBatches mergeBatches;
private MergeQueries mergeQueries;
private volatile boolean stopped;
private TopicPartitionManager topicPartitionManager;
private KcbqThreadPoolExecutor executor;
private Map<TableId, Table> cache;
private int remainingRetries;
private boolean enableRetries;
private ErrantRecordHandler errantRecordHandler;
@@ -137,10 +126,7 @@ public class BigQuerySinkTask extends SinkTask {
private boolean autoCreateTables;
private int retry;
private long retryWait;
private Map<String, PartitionedTableId> topicToPartitionTableId;

private boolean allowNewBigQueryFields;
private boolean useCredentialsProjectId;
private boolean allowRequiredFieldRelaxation;
private boolean allowSchemaUnionization;

@@ -167,15 +153,13 @@ public BigQuerySinkTask() {
* @param time {@link Time} used to wait during backoff periods; should be mocked for testing
* @see BigQuerySinkTask#BigQuerySinkTask()
*/
public BigQuerySinkTask(BigQuery testBigQuery, SchemaRetriever schemaRetriever, Storage testGcs,
SchemaManager testSchemaManager, Map<TableId, Table> testCache,
StorageWriteApiBase testStorageWriteApi, StorageApiBatchModeHandler testStorageApiBatchHandler,
Time time) {
BigQuerySinkTask(BigQuery testBigQuery, SchemaRetriever schemaRetriever, Storage testGcs,
SchemaManager testSchemaManager, StorageWriteApiBase testStorageWriteApi,
StorageApiBatchModeHandler testStorageApiBatchHandler, Time time) {
this.testBigQuery = testBigQuery;
this.schemaRetriever = schemaRetriever;
this.testGcs = testGcs;
this.testSchemaManager = testSchemaManager;
this.cache = testCache;
this.testStorageWriteApi = testStorageWriteApi;
this.testStorageApiBatchHandler = testStorageApiBatchHandler;
this.time = time;
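
With the table cache gone from the task, the test-only constructor loses its `Map<TableId, Table> testCache` parameter and is narrowed from public to package-private. A test in the same package would now construct the task roughly as below; the Mockito mocks and the config map are illustrative assumptions, not code from this PR's test changes.

```java
// Illustrative only — assumes Mockito; imports and the connector config map ("properties") omitted.
BigQuery bigQuery = mock(BigQuery.class);
SchemaRetriever schemaRetriever = mock(SchemaRetriever.class);
Storage gcs = mock(Storage.class);
SchemaManager schemaManager = mock(SchemaManager.class);
StorageWriteApiBase storageWriteApi = mock(StorageWriteApiBase.class);
StorageApiBatchModeHandler batchHandler = mock(StorageApiBatchModeHandler.class);
Time time = mock(Time.class);  // the Javadoc above says Time should be mocked for testing

BigQuerySinkTask task = new BigQuerySinkTask(
    bigQuery, schemaRetriever, gcs, schemaManager,
    storageWriteApi, batchHandler, time);  // no testCache argument any more
task.start(properties);
```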
@@ -224,66 +208,6 @@ public Map<TopicPartition, OffsetAndMetadata> preCommit(Map<TopicPartition, Offs
return offsets;
}

private PartitionedTableId getStorageApiRecordTable(String topic) {
return topicToPartitionTableId.computeIfAbsent(topic, topicName -> {
String project = config.getString(BigQuerySinkConfig.PROJECT_CONFIG);
String[] datasetAndtableName = TableNameUtils.getDataSetAndTableName(config, topicName);
return new PartitionedTableId.Builder(TableId.of(project, datasetAndtableName[0], datasetAndtableName[1])).build();
});

}

private PartitionedTableId getRecordTable(SinkRecord record) {
String[] datasetAndtableName = TableNameUtils.getDataSetAndTableName(config, record.topic());
String dataset = datasetAndtableName[0];
String tableName = datasetAndtableName[1];
// TODO: Order of execution of topic/table name modifications =>
// regex router SMT modifies topic name in sinkrecord.
// It could be either : separated or not.

// should we use topic2table map with sanitize table name? doesn't make sense.

// we use table name from above to sanitize table name further.


String project = null;
if (useCredentialsProjectId) {
project = config.getString(BigQuerySinkConfig.PROJECT_CONFIG);
}
TableId baseTableId = project == null
? TableId.of(dataset, tableName)
: TableId.of(project, dataset, tableName);
if (upsertDelete) {
TableId intermediateTableId = mergeBatches.intermediateTableFor(baseTableId);
// If upsert/delete is enabled, we want to stream into a non-partitioned intermediate table
return new PartitionedTableId.Builder(intermediateTableId).build();
}

PartitionedTableId.Builder builder = new PartitionedTableId.Builder(baseTableId);
if (usePartitionDecorator) {
Table bigQueryTable = retrieveCachedTable(baseTableId);
TimePartitioning timePartitioning = TimePartitioning.of(Type.DAY);
if (bigQueryTable != null) {
StandardTableDefinition standardTableDefinition = bigQueryTable.getDefinition();
if (standardTableDefinition != null && standardTableDefinition.getTimePartitioning() != null) {
timePartitioning = standardTableDefinition.getTimePartitioning();
}
}

if (useMessageTimeDatePartitioning) {
if (record.timestampType() == TimestampType.NO_TIMESTAMP_TYPE) {
throw new ConnectException(
"Message has no timestamp type, cannot use message timestamp to partition.");
}
setTimePartitioningForTimestamp(baseTableId, builder, timePartitioning, record.timestamp());
} else {
setTimePartitioning(baseTableId, builder, timePartitioning);
}
}

return builder.build();
}

private void writeSinkRecords(Collection<SinkRecord> records) {
// Periodically poll for errors here instead of doing a stop-the-world check in flush()
maybeThrowErrors();
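
The hunk above deletes the per-record table resolution that previously lived on the task (`getStorageApiRecordTable(String)` for the Storage Write API path, `getRecordTable(SinkRecord)` for the legacy path); the call site in `writeSinkRecords` below now asks a single `recordTableResolver.getRecordTable(record)` instead. The `RecordTableResolver` class itself is not part of this file's diff, so the sketch below is only a reconstruction from the deleted code and from the constructor call added in `start()` further down; everything beyond that constructor signature and `getRecordTable(SinkRecord)` is an assumption.

```java
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.TableId;
import com.wepay.kafka.connect.bigquery.config.BigQuerySinkConfig;
import com.wepay.kafka.connect.bigquery.config.BigQuerySinkTaskConfig;
import com.wepay.kafka.connect.bigquery.utils.PartitionedTableId;
import com.wepay.kafka.connect.bigquery.utils.TableNameUtils;
import com.wepay.kafka.connect.bigquery.write.batch.MergeBatches;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.connect.sink.SinkRecord;

// Reconstruction only: the real RecordTableResolver ships elsewhere in this PR
// and is not shown in this file's diff.
public class RecordTableResolver {

  private final BigQuerySinkTaskConfig config;
  private final MergeBatches mergeBatches;
  private final BigQuery bigQuery;              // would back the cached table-definition lookup
  private final boolean upsertDelete;
  private final boolean useStorageApiBatchMode; // not exercised in this sketch
  private final boolean useStorageApi;
  // Replaces the task-level topicToPartitionTableId map deleted above.
  private final Map<String, PartitionedTableId> topicToTableId = new HashMap<>();

  public RecordTableResolver(BigQuerySinkTaskConfig config, MergeBatches mergeBatches,
                             BigQuery bigQuery, boolean upsertDelete,
                             boolean useStorageApiBatchMode, boolean useStorageApi) {
    this.config = config;
    this.mergeBatches = mergeBatches;
    this.bigQuery = bigQuery;
    this.upsertDelete = upsertDelete;
    this.useStorageApiBatchMode = useStorageApiBatchMode;
    this.useStorageApi = useStorageApi;
  }

  public PartitionedTableId getRecordTable(SinkRecord record) {
    if (useStorageApi) {
      // Storage Write API path: resolve once per topic, mirroring the deleted
      // getStorageApiRecordTable(String).
      return topicToTableId.computeIfAbsent(record.topic(), topic -> {
        String project = config.getString(BigQuerySinkConfig.PROJECT_CONFIG);
        String[] dsAndTable = TableNameUtils.getDataSetAndTableName(config, topic);
        return new PartitionedTableId.Builder(
            TableId.of(project, dsAndTable[0], dsAndTable[1])).build();
      });
    }
    return resolveLegacyTable(record);
  }

  private PartitionedTableId resolveLegacyTable(SinkRecord record) {
    String[] dsAndTable = TableNameUtils.getDataSetAndTableName(config, record.topic());
    // The credentials-project handling from the deleted code is omitted here.
    TableId baseTableId = TableId.of(dsAndTable[0], dsAndTable[1]);
    if (upsertDelete) {
      // Upsert/delete streams into a non-partitioned intermediate table.
      return new PartitionedTableId.Builder(mergeBatches.intermediateTableFor(baseTableId)).build();
    }
    // The partition-decorator handling (day-partition checks, optional message-time
    // partitioning) from the deleted getRecordTable(SinkRecord) above would go here.
    return new PartitionedTableId.Builder(baseTableId).build();
  }
}
```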
@@ -295,13 +219,13 @@ private void writeSinkRecords(Collection<SinkRecord> records) {

for (SinkRecord record : records) {
if (record.value() != null || config.getBoolean(BigQuerySinkConfig.DELETE_ENABLED_CONFIG)) {
PartitionedTableId table = useStorageApi ? getStorageApiRecordTable(record.topic()) : getRecordTable(record);
PartitionedTableId table = recordTableResolver.getRecordTable(record);
if (!tableWriterBuilders.containsKey(table)) {
TableWriterBuilder tableWriterBuilder;
if (useStorageApi) {
tableWriterBuilder = new StorageWriteApiWriter.Builder(
storageApiWriter,
TableNameUtils.tableName(table.getBaseTableId()),
table,
recordConverter,
batchHandler
);
@@ -400,53 +324,6 @@ private BigQuery getBigQuery() {
return bigQuery.updateAndGet(bq -> bq != null ? bq : newBigQuery());
}

private void setTimePartitioningForTimestamp(
TableId table, PartitionedTableId.Builder builder, TimePartitioning timePartitioning, Long timestamp
) {
if (timePartitioning.getType() != Type.DAY) {
throw new ConnectException(String.format(
"Cannot use decorator syntax to write to %s as it is partitioned by %s and not by day",
TableNameUtils.table(table),
timePartitioning.getType().toString().toLowerCase()
));
}
builder.setDayPartition(timestamp);
}

private void setTimePartitioning(TableId table, PartitionedTableId.Builder builder, TimePartitioning timePartitioning) {
if (timePartitioning.getType() != Type.DAY) {
throw new ConnectException(String.format(
"Cannot use decorator syntax to write to %s as it is partitioned by %s and not by day",
TableNameUtils.table(table),
timePartitioning.getType().toString().toLowerCase()
));
}
builder.setDayPartitionForNow();
}

private Table retrieveCachedTable(TableId tableId) {
return getCache().computeIfAbsent(tableId, this::retrieveTable);
}

private Table retrieveTable(TableId tableId) {
try {
return getBigQuery().getTable(tableId);
} catch (BigQueryException e) {
/* 1. Authentication error thrown by bigquery is a type of IOException
and the error code is 0. That's why we create a separate
check function for Authentication error otherwise this falls under IOError check */

/* 2. For Authentication, we don't need Retry logic. Instead, we throw Bigquery exception directly. */
if (BigQueryErrorResponses.isAuthenticationError(e)) {
throw new BigQueryConnectException("Failed to authenticate client for table " + tableId + " with error " + e, e);
} else if (BigQueryErrorResponses.isIoError(e)) {
throw new RetriableException("Failed to retrieve information for table " + tableId, e);
} else {
throw e;
}
}
}

private BigQuery newBigQuery() {
return new GcpClientBuilder.BigQueryBuilder()
.withConfig(config)
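
The two partitioning helpers removed in the hunk above enforce that decorator-based writes only target day-partitioned tables, and `retrieveCachedTable`/`retrieveTable` supplied the cached table metadata those checks relied on, rethrowing auth failures as `BigQueryConnectException` and transient I/O failures as `RetriableException`; presumably all of this now lives behind the new resolver. For context on what the day-partition check guards: BigQuery's partition decorator syntax appends the target day to the table name, which is what `setDayPartition(timestamp)` ultimately encodes. A minimal illustration with made-up names:

```java
// Hypothetical table name and date, for illustration only.
// Streaming with a partition decorator addresses a single daily partition,
// here the one for 2024-01-15:
String decorated = "my_table" + "$20240115";   // -> "my_table$20240115"
```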
@@ -534,14 +411,6 @@ private SinkRecordConverter getConverter(BigQuerySinkTaskConfig config) {
return new SinkRecordConverter(config, mergeBatches, mergeQueries);
}

private synchronized Map<TableId, Table> getCache() {
if (cache == null) {
cache = new HashMap<>();
}

return cache;
}

// visible for testing
boolean isRunning() {
return !stopped;
@@ -553,19 +422,17 @@ public void start(Map<String, String> properties) {
stopped = false;
config = new BigQuerySinkTaskConfig(properties);
autoCreateTables = config.getBoolean(BigQuerySinkConfig.TABLE_CREATE_CONFIG);
upsertDelete = config.getBoolean(BigQuerySinkConfig.UPSERT_ENABLED_CONFIG)
|| config.getBoolean(BigQuerySinkConfig.DELETE_ENABLED_CONFIG);

useCredentialsProjectId = config.getBoolean(BigQuerySinkConfig.USE_CREDENTIALS_PROJECT_ID_CONFIG);
useStorageApi = config.getBoolean(BigQuerySinkConfig.USE_STORAGE_WRITE_API_CONFIG);
useStorageApiBatchMode = useStorageApi && config.getBoolean(BigQuerySinkConfig.ENABLE_BATCH_MODE_CONFIG);
upsertDelete = !useStorageApi && (config.getBoolean(BigQuerySinkConfig.UPSERT_ENABLED_CONFIG)
|| config.getBoolean(BigQuerySinkConfig.DELETE_ENABLED_CONFIG));

retry = config.getInt(BigQuerySinkConfig.BIGQUERY_RETRY_CONFIG);
retryWait = config.getLong(BigQuerySinkConfig.BIGQUERY_RETRY_WAIT_CONFIG);
allowNewBigQueryFields = config.getBoolean(BigQuerySinkConfig.ALLOW_NEW_BIGQUERY_FIELDS_CONFIG);
allowRequiredFieldRelaxation = config.getBoolean(BigQuerySinkConfig.ALLOW_BIGQUERY_REQUIRED_FIELD_RELAXATION_CONFIG);
allowSchemaUnionization = config.getBoolean(BigQuerySinkConfig.ALLOW_SCHEMA_UNIONIZATION_CONFIG);
topicToPartitionTableId = new HashMap<>();
bigQuery = new AtomicReference<>();
schemaManager = new AtomicReference<>();

Expand All @@ -590,7 +457,6 @@ public void start(Map<String, String> properties) {
mergeBatches = new MergeBatches(intermediateTableSuffix);
}

cache = getCache();
bigQueryWriter = getBigQueryWriter(errantRecordHandler);
gcsToBqWriter = getGcsWriter();
executor = new KcbqThreadPoolExecutor(
@@ -599,10 +465,9 @@
new MdcContextThreadFactory()
);
topicPartitionManager = new TopicPartitionManager();
useMessageTimeDatePartitioning =
config.getBoolean(BigQuerySinkConfig.BIGQUERY_MESSAGE_TIME_PARTITIONING_CONFIG);
usePartitionDecorator =
config.getBoolean(BigQuerySinkConfig.BIGQUERY_PARTITION_DECORATOR_CONFIG);
recordTableResolver = new RecordTableResolver(config, mergeBatches, getBigQuery(), upsertDelete,
useStorageApiBatchMode, useStorageApi);

if (config.getBoolean(BigQuerySinkTaskConfig.GCS_BQ_TASK_CONFIG)) {
startGcsToBqLoadTask();
} else if (upsertDelete) {