Skip to content

Commit 69799f6

Browse files
authored
Fixes Error prone null checks in Pipelines batch submodule (#1484)
1 parent ce29a08 commit 69799f6

File tree

14 files changed

+105
-44
lines changed

14 files changed

+105
-44
lines changed

pipelines/batch/src/main/java/com/google/fhir/analytics/BulkExportApiClient.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import org.hl7.fhir.instance.model.api.IBaseParameters;
4141
import org.hl7.fhir.r4.model.InstantType;
4242
import org.hl7.fhir.r4.model.Parameters;
43+
import org.jspecify.annotations.Nullable;
4344

4445
/**
4546
* This class contains methods to trigger and fetch the details of bulk export api on the FHIR
@@ -64,7 +65,7 @@ public class BulkExportApiClient {
6465
* @return the absolute url via which the status and details of the job can be fetched
6566
*/
6667
public String triggerBulkExportJob(
67-
List<String> resourceTypes, String since, FhirVersionEnum fhirVersionEnum) {
68+
List<String> resourceTypes, @Nullable String since, FhirVersionEnum fhirVersionEnum) {
6869
Map<String, List<String>> headers = new HashMap<>();
6970
headers.put(HttpHeaders.ACCEPT, Arrays.asList("application/fhir+ndjson"));
7071
headers.put("Prefer", Arrays.asList("respond-async"));
@@ -129,7 +130,7 @@ public BulkExportHttpResponse fetchBulkExportHttpResponse(String bulkExportStatu
129130
}
130131

131132
private IBaseParameters fetchBulkExportParameters(
132-
FhirVersionEnum fhirVersionEnum, List<String> resourceTypes, String since) {
133+
FhirVersionEnum fhirVersionEnum, List<String> resourceTypes, @Nullable String since) {
133134
since = Strings.nullToEmpty(since);
134135
return switch (fhirVersionEnum) {
135136
case R4 -> {

pipelines/batch/src/main/java/com/google/fhir/analytics/ConvertResourceFn.java

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -133,20 +133,21 @@ public void writeResource(HapiRowDescriptor element)
133133
return;
134134
}
135135
}
136-
totalParseTimeMillisMap.get(resourceType).inc(System.currentTimeMillis() - startTime);
137-
if (forcedId == null || forcedId.equals("")) {
136+
incrementElapsedTimeCounter(totalParseTimeMillisMap, resourceType, startTime);
137+
if (forcedId == null || forcedId.isEmpty()) {
138138
resource.setId(resourceId);
139139
} else {
140140
resource.setId(forcedId);
141141
}
142142
resource.setMeta(meta);
143143

144-
numFetchedResourcesMap.get(resourceType).inc(1);
144+
if (numFetchedResourcesMap.get(resourceType) != null)
145+
numFetchedResourcesMap.get(resourceType).inc(1);
145146

146147
if (parquetUtil != null) {
147148
startTime = System.currentTimeMillis();
148149
parquetUtil.write(resource);
149-
totalGenerateTimeMillisMap.get(resourceType).inc(System.currentTimeMillis() - startTime);
150+
incrementElapsedTimeCounter(totalGenerateTimeMillisMap, resourceType, startTime);
150151
}
151152
if (!sinkPath.isEmpty()) {
152153
startTime = System.currentTimeMillis();
@@ -155,7 +156,8 @@ public void writeResource(HapiRowDescriptor element)
155156
} else {
156157
fhirStoreUtil.uploadResource(resource);
157158
}
158-
totalPushTimeMillisMap.get(resourceType).inc(System.currentTimeMillis() - startTime);
159+
160+
incrementElapsedTimeCounter(totalPushTimeMillisMap, resourceType, startTime);
159161
}
160162
if (sinkDbConfig != null) {
161163
if (isResourceDeleted) {
@@ -229,6 +231,14 @@ private Resource createNewFhirResource(String fhirVersion, String resourceType)
229231
}
230232
}
231233

234+
private void incrementElapsedTimeCounter(
235+
HashMap<String, Counter> counterMap, String resourceType, long startTime) {
236+
Counter counter = counterMap.get(resourceType);
237+
if (counter != null) {
238+
counter.inc(System.currentTimeMillis() - startTime);
239+
}
240+
}
241+
232242
private String getFhirBasePackageName(String fhirVersion) {
233243
FhirVersionEnum fhirVersionEnum = FhirVersionEnum.forVersionString(fhirVersion);
234244
return switch (fhirVersionEnum) {

pipelines/batch/src/main/java/com/google/fhir/analytics/FetchResources.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import org.hl7.fhir.r4.model.Reference;
3939
import org.hl7.fhir.r4.model.Resource;
4040
import org.joda.time.Instant;
41+
import org.jspecify.annotations.Nullable;
4142
import org.slf4j.Logger;
4243
import org.slf4j.LoggerFactory;
4344

@@ -61,6 +62,7 @@ public class FetchResources
6162
}
6263

6364
@VisibleForTesting
65+
@Nullable
6466
static String getSubjectPatientIdOrNull(Resource resource) {
6567
String patientId = null;
6668
// TODO: Instead of hard-coding `subject` here, use Patient Compartment:
@@ -104,6 +106,9 @@ static class SearchFn extends FetchSearchPageFn<SearchSegmentDescriptor> {
104106
// StartBundle; we don't need to `synchronized` access of it because each instance of the DoFn
105107
// class is accessed by a single thread.
106108
// https://beam.apache.org/documentation/programming-guide/#user-code-thread-compatibility
109+
110+
// Suppressing warning as this is initialized in StartBundle.
111+
@SuppressWarnings("NullAway")
107112
private Map<String, Integer> patientCount = null;
108113

109114
SearchFn(FhirEtlOptions options, String stageIdentifier) {

pipelines/batch/src/main/java/com/google/fhir/analytics/FetchSearchPageFn.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import ca.uhn.fhir.parser.IParser;
2222
import com.cerner.bunsen.exception.ProfileException;
2323
import com.google.common.annotations.VisibleForTesting;
24+
import com.google.common.base.Preconditions;
2425
import com.google.common.base.Strings;
2526
import com.google.fhir.analytics.JdbcConnectionPools.DataSourceConfig;
2627
import com.google.fhir.analytics.model.DatabaseConfiguration;
@@ -128,6 +129,9 @@ abstract class FetchSearchPageFn<T> extends DoFn<T, KV<String, Integer>> {
128129

129130
protected AvroConversionUtil avroConversionUtil;
130131

132+
// Suppressing NullAway warning because all fields are initialized before being used, either in
133+
// the constructor on in the @Setup method.
134+
@SuppressWarnings("NullAway.Init")
131135
FetchSearchPageFn(FhirEtlOptions options, String stageIdentifier) {
132136
this.outputParquetViewPath = options.getOutputParquetViewPath();
133137
this.sinkPath = options.getFhirSinkPath();
@@ -220,6 +224,7 @@ public void setup() throws SQLException, ProfileException {
220224
oAuthClientSecret,
221225
checkPatientEndpoint,
222226
fhirContext);
227+
Preconditions.checkNotNull(fetchUtil);
223228
fhirSearchUtil = new FhirSearchUtil(fetchUtil);
224229
// TODO remove generateParquetFiles and instead rely on not setting outputParquetPath.
225230
if (generateParquetFiles
@@ -291,12 +296,12 @@ protected void addFetchTime(long millis) {
291296
totalFetchTimeMillis.inc(millis);
292297
}
293298

294-
protected void processBundle(Bundle bundle)
299+
protected void processBundle(@Nullable Bundle bundle)
295300
throws IOException, SQLException, ViewApplicationException, ProfileException {
296301
this.processBundle(bundle, null);
297302
}
298303

299-
protected void processBundle(Bundle bundle, @Nullable Set<String> resourceTypes)
304+
protected void processBundle(@Nullable Bundle bundle, @Nullable Set<String> resourceTypes)
300305
throws IOException, SQLException, ViewApplicationException, ProfileException {
301306
if (bundle != null && bundle.getEntry() != null) {
302307
numFetchedResources.inc(bundle.getEntry().size());

pipelines/batch/src/main/java/com/google/fhir/analytics/FhirEtl.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@
5757
import org.apache.beam.sdk.values.PCollection;
5858
import org.apache.beam.sdk.values.PCollectionList;
5959
import org.apache.commons.collections.CollectionUtils;
60+
import org.jspecify.annotations.Nullable;
6061
import org.slf4j.Logger;
6162
import org.slf4j.LoggerFactory;
6263

@@ -142,6 +143,7 @@ static void fetchPatientHistory(
142143
}
143144
}
144145

146+
@Nullable
145147
private static List<Pipeline> buildFhirSearchPipeline(
146148
FhirEtlOptions options, AvroConversionUtil avroConversionUtil) throws ProfileException {
147149
FhirSearchUtil fhirSearchUtil =
@@ -316,6 +318,7 @@ static void validateOptions(FhirEtlOptions options) {
316318
}
317319

318320
// TODO: Implement active period feature for JDBC mode with a HAPI source server (issue #278).
321+
@Nullable
319322
private static List<Pipeline> buildHapiJdbcPipeline(FhirEtlOptions options)
320323
throws SQLException, IOException {
321324
Preconditions.checkArgument(!Strings.isNullOrEmpty(options.getFhirDatabaseConfigPath()));
@@ -331,7 +334,8 @@ private static List<Pipeline> buildHapiJdbcPipeline(FhirEtlOptions options)
331334
List<Pipeline> pipelines = new ArrayList<>();
332335
long totalNumOfResources = 0L;
333336
for (String resourceType : Splitter.on(',').splitToList(options.getResourceList())) {
334-
int numResources = resourceCount.get(resourceType);
337+
int numResources =
338+
resourceCount.get(resourceType) != null ? resourceCount.get(resourceType) : 0;
335339
if (numResources == 0) {
336340
continue;
337341
}
@@ -513,6 +517,7 @@ private static Map<String, List<String>> validateAndFetchNdjsonFileMappings(
513517
* @param options the pipeline options to be used.
514518
* @return the created Pipeline instance or null if nothing needs to be done.
515519
*/
520+
@Nullable
516521
static List<Pipeline> setupAndBuildPipelines(
517522
FhirEtlOptions options, AvroConversionUtil avroConversionUtil)
518523
throws IOException, SQLException, ViewDefinitionException, ProfileException {
@@ -553,7 +558,11 @@ public static void main(String[] args)
553558
options.getStructureDefinitionsPath(),
554559
options.getRecursiveDepth());
555560
List<Pipeline> pipelines = setupAndBuildPipelines(options, avroConversionUtil);
556-
EtlUtils.runMultiplePipelinesWithTimestamp(pipelines, options);
561+
if (pipelines != null) {
562+
EtlUtils.runMultiplePipelinesWithTimestamp(pipelines, options);
563+
} else {
564+
log.info("No work needs to be done!");
565+
}
557566

558567
log.info("DONE!");
559568
}

pipelines/batch/src/main/java/com/google/fhir/analytics/FhirSearchUtil.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import org.apache.http.NameValuePair;
4141
import org.apache.http.client.utils.URLEncodedUtils;
4242
import org.hl7.fhir.r4.model.Bundle;
43+
import org.jspecify.annotations.Nullable;
4344
import org.slf4j.Logger;
4445
import org.slf4j.LoggerFactory;
4546

@@ -54,6 +55,7 @@ public class FhirSearchUtil {
5455
}
5556

5657
// TODO update all types to FHIR version independent interfaces!
58+
@Nullable
5759
public Bundle searchByUrl(String searchUrl, int count, SummaryEnum summaryMode) {
5860
try {
5961
IGenericClient client = fetchUtil.getSourceClient();
@@ -106,7 +108,8 @@ public Map<String, Integer> searchResourceCounts(String resourceList, String sin
106108
return hashMap;
107109
}
108110

109-
public String getNextUrl(Bundle bundle) {
111+
@Nullable
112+
public String getNextUrl(@Nullable Bundle bundle) {
110113
if (bundle != null && bundle.getLink(Bundle.LINK_NEXT) != null) {
111114
return bundle.getLink(Bundle.LINK_NEXT).getUrl();
112115
}

pipelines/batch/src/main/java/com/google/fhir/analytics/HapiRowDescriptor.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,5 +64,5 @@ static HapiRowDescriptor create(
6464
abstract String jsonResource();
6565

6666
// FHIR tags.
67-
List<ResourceTag> tags;
67+
@Nullable List<ResourceTag> tags;
6868
}

pipelines/batch/src/main/java/com/google/fhir/analytics/JdbcFetchHapi.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,8 @@ public HapiRowDescriptor mapRow(ResultSet resultSet) throws Exception {
100100
String lastUpdated = resultSet.getString("res_updated");
101101
String fhirVersion = resultSet.getString("res_version");
102102
String resourceVersion = resultSet.getString("res_ver");
103-
numMappedResourcesMap.get(resourceType).inc();
103+
if (numMappedResourcesMap.get(resourceType) != null)
104+
numMappedResourcesMap.get(resourceType).inc();
104105
return HapiRowDescriptor.create(
105106
resourceId,
106107
forcedId,

pipelines/batch/src/main/java/com/google/fhir/analytics/JdbcResourceWriter.java

Lines changed: 16 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import java.util.stream.Collectors;
4040
import javax.sql.DataSource;
4141
import org.hl7.fhir.r4.model.Resource;
42+
import org.jspecify.annotations.Nullable;
4243
import org.slf4j.Logger;
4344
import org.slf4j.LoggerFactory;
4445

@@ -53,7 +54,7 @@ public class JdbcResourceWriter {
5354

5455
private static final String ID_COLUMN = "id";
5556

56-
private final ViewManager viewManager;
57+
@Nullable private final ViewManager viewManager;
5758

5859
private final IParser parser;
5960

@@ -136,16 +137,16 @@ static void createTables(FhirEtlOptions options)
136137
ViewManager viewManager = ViewManager.createForDir(viewDir);
137138
for (String resourceType : Splitter.on(',').split(options.getResourceList())) {
138139
ImmutableList<ViewDefinition> views = viewManager.getViewsForType(resourceType);
139-
if (views == null || views.isEmpty()) {
140+
if (views.isEmpty()) {
140141
log.warn("No views found for resource type {} in directory {}!", resourceType, viewDir);
141142
} else {
142143
for (ViewDefinition vDef : views) {
143144
if (Strings.isNullOrEmpty(vDef.getName())) {
144145
throw new ViewDefinitionException("Field `name` in ViewDefinition is not defined.");
145146
}
146-
if (vDef.getAllColumns().get(ID_COLUMN) == null
147-
|| !ViewApplicator.GET_RESOURCE_KEY.equals(
148-
vDef.getAllColumns().get(ID_COLUMN).getPath())) {
147+
ViewDefinition.Column idColumn =
148+
vDef.getAllColumns() != null ? vDef.getAllColumns().get(ID_COLUMN) : null;
149+
if (idColumn == null || !ViewApplicator.GET_RESOURCE_KEY.equals(idColumn.getPath())) {
149150
throw new ViewDefinitionException(
150151
String.format(
151152
"To write view '%s' to DB, there should be a column '%s' with path '%s'.",
@@ -197,18 +198,22 @@ private static void deleteRowsById(DataSource dataSource, String tableName, Stri
197198
*
198199
* @param resourceType the type of resource to be deleted
199200
* @param id the id of the resource to be deleted
200-
* @throws SQLException
201+
* @throws SQLException if a database access error occurs
201202
*/
202203
public void deleteResourceById(String resourceType, String id) throws SQLException {
203204
if (viewManager == null) {
204205
deleteRowsById(jdbcDataSource, resourceType, id);
205206
} else {
206207
ImmutableList<ViewDefinition> views = viewManager.getViewsForType(resourceType);
207-
for (ViewDefinition vDef : views) {
208-
if (Strings.isNullOrEmpty(vDef.getName())) {
209-
throw new SQLException("Field `name` in ViewDefinition is not defined.");
208+
if (!views.isEmpty()) {
209+
for (ViewDefinition vDef : views) {
210+
if (Strings.isNullOrEmpty(vDef.getName())) {
211+
throw new SQLException("Field `name` in ViewDefinition is not defined.");
212+
}
213+
deleteRowsById(jdbcDataSource, vDef.getName(), id);
210214
}
211-
deleteRowsById(jdbcDataSource, vDef.getName(), id);
215+
} else {
216+
log.warn("No views found for resource type {}!", resourceType);
212217
}
213218
}
214219
}
@@ -233,7 +238,7 @@ public void writeResource(Resource resource) throws SQLException, ViewApplicatio
233238
}
234239
} else {
235240
ImmutableList<ViewDefinition> views = viewManager.getViewsForType(resource.fhirType());
236-
if (views != null) {
241+
if (!views.isEmpty()) {
237242
for (ViewDefinition vDef : views) {
238243
if (Strings.isNullOrEmpty(vDef.getName())) {
239244
throw new SQLException("Field `name` in ViewDefinition is not defined.");

0 commit comments

Comments
 (0)