Skip to content

Commit aa2bc75

Browse files
authored
Add checks for BigQuery ingestion failures (#99)
* added checks for BigQuery ingestion failures * updated error messages
1 parent 1779549 commit aa2bc75

File tree

3 files changed

+145
-20
lines changed

3 files changed

+145
-20
lines changed

kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/GcsToBqLoadRunnable.java

Lines changed: 60 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import com.google.cloud.storage.BlobId;
3636
import com.google.cloud.storage.Bucket;
3737
import com.google.cloud.storage.StorageException;
38+
import com.google.common.annotations.VisibleForTesting;
3839
import com.google.re2j.Matcher;
3940
import com.google.re2j.Pattern;
4041
import com.wepay.kafka.connect.bigquery.write.row.GcsToBqWriter;
@@ -67,18 +68,24 @@ public class GcsToBqLoadRunnable implements Runnable {
6768
private static String SOURCE_URI_FORMAT = "gs://%s/%s";
6869
private final BigQuery bigQuery;
6970

70-
// these numbers are intended to try to make this task not excede Google Cloud Quotas.
71+
// these numbers are intended to try to make this task not exceed Google Cloud Quotas.
7172
// see: https://cloud.google.com/bigquery/quotas#load_jobs
7273
private final Bucket bucket;
7374
private final Map<Job, List<BlobId>> activeJobs;
75+
/**
76+
* The set of blob Ids that the system is currently processing or are queued to process.
77+
*/
7478
private final Set<BlobId> claimedBlobIds;
79+
/**
80+
* The set of blob Ids that the system can delete.
81+
*/
7582
private final Set<BlobId> deletableBlobIds;
7683

7784
/**
7885
* Create a {@link GcsToBqLoadRunnable} with the given bigquery, bucket, and ms wait interval.
7986
*
8087
* @param bigQuery the {@link BigQuery} instance.
81-
* @param bucket the the GCS bucket to read from.
88+
* @param bucket the GCS bucket to read from.
8289
*/
8390
public GcsToBqLoadRunnable(BigQuery bigQuery, Bucket bucket) {
8491
this.bigQuery = bigQuery;
@@ -88,6 +95,24 @@ public GcsToBqLoadRunnable(BigQuery bigQuery, Bucket bucket) {
8895
this.deletableBlobIds = new HashSet<>();
8996
}
9097

98+
/**
99+
* Create a {@link GcsToBqLoadRunnable} with the given bigquery, bucket, and ms wait interval.
100+
*
101+
* @param bigQuery the {@link BigQuery} instance.
102+
* @param bucket the GCS bucket to read from.
103+
* @param activeJobs the map of job to the list of blobs it contains.
104+
* @param claimedBlobIds the list of Blob Ids being processed.
105+
* @param deletableBlobIds the list of Blob Ids that can be deleted.
106+
*/
107+
@VisibleForTesting
108+
GcsToBqLoadRunnable(BigQuery bigQuery, Bucket bucket, Map<Job, List<BlobId>> activeJobs, Set<BlobId> claimedBlobIds, Set<BlobId> deletableBlobIds) {
109+
this.bigQuery = bigQuery;
110+
this.bucket = bucket;
111+
this.activeJobs = activeJobs;
112+
this.claimedBlobIds = claimedBlobIds;
113+
this.deletableBlobIds = deletableBlobIds;
114+
}
115+
91116
/**
92117
* Given a blob, return the {@link TableId} this blob should be inserted into.
93118
*
@@ -217,7 +242,8 @@ private Job triggerBigQueryLoadJob(TableId table, List<Blob> blobs) {
217242
* any jobs that failed. We only log a message for failed jobs because those blobs will be
218243
* retried during the next run.
219244
*/
220-
private void checkJobs() {
245+
@VisibleForTesting
246+
void checkJobs() {
221247
if (activeJobs.isEmpty()) {
222248
// quick exit if nothing needs to be done.
223249
logger.debug("No active jobs to check. Skipping check jobs.");
@@ -237,31 +263,50 @@ private void checkJobs() {
237263
try {
238264
if (job.isDone()) {
239265
logger.trace("Job is marked done: id={}, status={}", job.getJobId(), job.getStatus());
240-
final List<BlobId> blobIdsToDelete = jobEntry.getValue();
266+
if (job.getStatus().getError() == null) {
267+
processSuccessfulJob(job, jobEntry.getValue());
268+
successCount++;
269+
} else {
270+
processFailedJob(job, jobEntry.getValue());
271+
failureCount++;
272+
}
241273
jobIterator.remove();
242274
logger.trace("Job is removed from iterator: {}", job.getJobId());
243-
successCount++;
244-
claimedBlobIds.removeAll(blobIdsToDelete);
245-
logger.trace("Completed blobs have been removed from claimed set: {}", blobIdsToDelete);
246-
deletableBlobIds.addAll(blobIdsToDelete);
247-
logger.trace("Completed blobs marked as deletable: {}", blobIdsToDelete);
248275
}
249276
} catch (BigQueryException ex) {
250277
// log a message.
251278
logger.warn("GCS to BQ load job failed", ex);
252-
// remove job from active jobs (it's not active anymore)
253-
List<BlobId> blobIds = activeJobs.get(job);
254-
jobIterator.remove();
255-
// unclaim blobs
256-
claimedBlobIds.removeAll(blobIds);
279+
processFailedJob(job, jobEntry.getValue());
257280
failureCount++;
281+
jobIterator.remove();
282+
logger.trace("Job is removed from iterator: {}", job.getJobId());
258283
} finally {
259284
logger.info("GCS To BQ job tally: {} successful jobs, {} failed jobs.",
260285
successCount, failureCount);
261286
}
262287
}
263288
}
264289

290+
private void processSuccessfulJob(final Job job, final List<BlobId> blobIdsToDelete) {
291+
blobIdsToDelete.forEach(claimedBlobIds::remove);
292+
logger.trace("Completed blobs have been removed from claimed set: {}", blobIdsToDelete);
293+
deletableBlobIds.addAll(blobIdsToDelete);
294+
logger.trace("Completed blobs marked as deletable: {}", blobIdsToDelete);
295+
}
296+
297+
private void processFailedJob(final Job job, final List<BlobId> blobsNotCompleted) {
298+
logger.warn("Job {} failed with {}", job.getJobId(), job.getStatus().getError());
299+
if (job.getStatus().getExecutionErrors().isEmpty()) {
300+
logger.warn("No additional errors associated with job {}", job.getJobId());
301+
} else {
302+
logger.warn("Additional errors associated with job {}: {}", job.getJobId(), job.getStatus().getExecutionErrors());
303+
}
304+
logger.warn("Blobs in job {}: {}", job.getJobId(), blobsNotCompleted);
305+
// unclaim blobs
306+
blobsNotCompleted.forEach(claimedBlobIds::remove);
307+
logger.trace("Failed blobs reset as processable");
308+
}
309+
265310
/**
266311
* Delete deletable blobs.
267312
*/
@@ -298,7 +343,7 @@ private void deleteBlobs() {
298343
// Calculate number of successful deletes, remove the successful deletes from
299344
// the deletableBlobIds.
300345
successfulDeletes = numberOfBlobs - failedDeletes;
301-
deletableBlobIds.removeAll(blobIdsToDelete);
346+
blobIdsToDelete.forEach(deletableBlobIds::remove);
302347

303348
logger.info("Successfully deleted {} blobs; failed to delete {} blobs",
304349
successfulDeletes,

kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/GcsToBqLoadRunnableTest.java

Lines changed: 82 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,13 +25,33 @@
2525

2626
import static org.junit.jupiter.api.Assertions.assertEquals;
2727
import static org.junit.jupiter.api.Assertions.assertNull;
28+
import static org.junit.jupiter.api.Assertions.assertTrue;
2829
import static org.mockito.Mockito.mock;
29-
30+
import static org.mockito.Mockito.when;
31+
32+
import com.google.cloud.bigquery.BigQuery;
33+
import com.google.cloud.bigquery.BigQueryError;
34+
import com.google.cloud.bigquery.BigQueryException;
35+
import com.google.cloud.bigquery.Job;
36+
import com.google.cloud.bigquery.JobId;
37+
import com.google.cloud.bigquery.JobStatus;
3038
import com.google.cloud.bigquery.TableId;
3139
import com.google.cloud.storage.Blob;
40+
41+
import java.util.ArrayList;
3242
import java.util.Collections;
43+
import java.util.HashMap;
44+
import java.util.HashSet;
45+
import java.util.List;
3346
import java.util.Map;
47+
import java.util.Set;
48+
49+
import com.google.cloud.storage.BlobId;
50+
import com.google.cloud.storage.Bucket;
3451
import org.junit.jupiter.api.Test;
52+
import org.junit.jupiter.params.ParameterizedTest;
53+
import org.junit.jupiter.params.provider.Arguments;
54+
import org.junit.jupiter.params.provider.MethodSource;
3555
import org.mockito.Mockito;
3656

3757
public class GcsToBqLoadRunnableTest {
@@ -63,7 +83,7 @@ public void testGetTableFromBlobWithoutProject() {
6383
@Test
6484
public void testGetTableFromBlobWithoutMetadata() {
6585
Blob mockBlob = mock(Blob.class);
66-
Mockito.when(mockBlob.getMetadata()).thenReturn(null);
86+
when(mockBlob.getMetadata()).thenReturn(null);
6787

6888
TableId tableId = GcsToBqLoadRunnable.getTableFromBlob(mockBlob);
6989
assertNull(tableId);
@@ -91,9 +111,68 @@ private String serializeTableId(TableId tableId) {
91111

92112
private Blob createMockBlobWithTableMetadata(Map<String, String> metadata) {
93113
Blob mockBlob = mock(Blob.class);
94-
Mockito.when(mockBlob.getMetadata()).thenReturn(metadata);
114+
when(mockBlob.getMetadata()).thenReturn(metadata);
95115
return mockBlob;
96116
}
97117

118+
private static Job createJob(String jobId) {
119+
Job job = mock(Job.class);
120+
when(job.getJobId()).thenReturn(JobId.of(jobId));
121+
return job;
122+
123+
}
124+
125+
@ParameterizedTest
126+
@MethodSource("checkJobsData")
127+
void testCheckJobsFailure(String name, Job job, List<BlobId> blobIds, int activeCount, int claimedCount, int deletableCount) {
128+
BigQuery bigQuery = mock(BigQuery.class);
129+
Bucket bucket = mock(Bucket.class);
130+
final Map<Job, List<BlobId>> activeJobs = new HashMap<>();
131+
final Set<BlobId> deletableBlobIds = new HashSet<>();
132+
final Set<BlobId> claimedBlobIds = new HashSet<>(blobIds);
133+
134+
activeJobs.put(job, blobIds);
135+
136+
GcsToBqLoadRunnable runnable = new GcsToBqLoadRunnable(bigQuery, bucket, activeJobs, claimedBlobIds, deletableBlobIds);
137+
runnable.checkJobs();
138+
assertEquals(activeCount, activeJobs.size(), "Wrong active count" );
139+
assertEquals(claimedCount, claimedBlobIds.size(), "Wrong claimed count");
140+
assertEquals(deletableCount, deletableBlobIds.size(), "Wrong deletable count");
141+
}
142+
143+
static List<Arguments> checkJobsData() {
144+
List<Arguments> args = new ArrayList<>();
145+
146+
Job job = createJob("errorInProcessing");
147+
BlobId blob = BlobId.of("bucket", "blob1");
148+
when(job.isDone()).thenReturn(true);
149+
BigQueryError error = new BigQueryError("reason","location", "message", "debugInfo");
150+
JobStatus jobStatus = mock(JobStatus.class);
151+
when(jobStatus.getError()).thenReturn(error);
152+
when(jobStatus.getExecutionErrors()).thenReturn(Collections.singletonList(new BigQueryError("executionError","location", "message", "debugInfo")));
153+
when(job.getStatus()).thenReturn(jobStatus);
154+
args.add(Arguments.of(job.getJobId().getJob(), job, Collections.singletonList(blob), 0, 0, 0));
155+
156+
job = createJob("goodCompleted");
157+
blob = BlobId.of("bucket", "blob2");
158+
when(job.isDone()).thenReturn(true);
159+
jobStatus = mock(JobStatus.class);
160+
when(job.getStatus()).thenReturn(jobStatus);
161+
args.add(Arguments.of(job.getJobId().getJob(), job, Collections.singletonList(blob), 0, 0, 1));
162+
163+
job = createJob("exception");
164+
blob = BlobId.of("bucket", "blob3");
165+
when(job.isDone()).thenThrow(BigQueryException.class);
166+
jobStatus = mock(JobStatus.class);
167+
when(job.getStatus()).thenReturn(jobStatus);
168+
args.add(Arguments.of(job.getJobId().getJob(), job, Collections.singletonList(blob), 0, 0, 0));
169+
170+
job = createJob("stillRunning");
171+
blob = BlobId.of("bucket", "blob2");
172+
jobStatus = mock(JobStatus.class);
173+
when(job.getStatus()).thenReturn(jobStatus);
174+
args.add(Arguments.of(job.getJobId().getJob(), job, Collections.singletonList(blob), 1, 1, 0));
175+
return args;
176+
}
98177

99178
}

kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/integration/BigQuerySinkConnectorIT.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@
6262
import org.junit.jupiter.api.Disabled;
6363
import org.junit.jupiter.api.Named;
6464
import org.junit.jupiter.api.Tag;
65+
import org.junit.jupiter.api.Test;
6566
import org.junit.jupiter.params.ParameterizedTest;
6667
import org.junit.jupiter.params.provider.Arguments;
6768
import org.junit.jupiter.params.provider.MethodSource;
@@ -222,8 +223,8 @@ public void cleanup() {
222223
testBase.connect.deleteConnector(connectorName);
223224
}
224225

225-
@ParameterizedTest
226-
@MethodSource
226+
@Test
227+
@Disabled("unknown configuration for test")
227228
public void runTestCase() throws Exception {
228229
final int tasksMax = 1;
229230

0 commit comments

Comments
 (0)