From 6d25b5b2040074d551200ff4d2732e04331d650d Mon Sep 17 00:00:00 2001 From: Martin Ndegwa Date: Wed, 1 Oct 2025 19:35:04 +0300 Subject: [PATCH 1/3] Fixes Error prone null checks in Pipelines batch submodule --- .../fhir/analytics/BulkExportApiClient.java | 5 +-- .../fhir/analytics/ConvertResourceFn.java | 13 +++++--- .../google/fhir/analytics/FetchResources.java | 4 ++- .../fhir/analytics/FetchSearchPageFn.java | 7 ++-- .../com/google/fhir/analytics/FhirEtl.java | 13 ++++++-- .../google/fhir/analytics/FhirSearchUtil.java | 5 ++- .../fhir/analytics/HapiRowDescriptor.java | 2 +- .../google/fhir/analytics/JdbcFetchHapi.java | 5 +-- .../fhir/analytics/JdbcResourceWriter.java | 25 +++++++++------ .../google/fhir/analytics/ParquetMerger.java | 32 ++++++++++++------- .../fhir/analytics/ProcessGenericRecords.java | 8 +++-- .../analytics/metrics/FlinkJobListener.java | 9 ++++-- 12 files changed, 87 insertions(+), 41 deletions(-) diff --git a/pipelines/batch/src/main/java/com/google/fhir/analytics/BulkExportApiClient.java b/pipelines/batch/src/main/java/com/google/fhir/analytics/BulkExportApiClient.java index f073553ef..a7b255320 100644 --- a/pipelines/batch/src/main/java/com/google/fhir/analytics/BulkExportApiClient.java +++ b/pipelines/batch/src/main/java/com/google/fhir/analytics/BulkExportApiClient.java @@ -40,6 +40,7 @@ import org.hl7.fhir.instance.model.api.IBaseParameters; import org.hl7.fhir.r4.model.InstantType; import org.hl7.fhir.r4.model.Parameters; +import org.jspecify.annotations.Nullable; /** * This class contains methods to trigger and fetch the details of bulk export api on the FHIR @@ -64,7 +65,7 @@ public class BulkExportApiClient { * @return the absolute url via which the status and details of the job can be fetched */ public String triggerBulkExportJob( - List resourceTypes, String since, FhirVersionEnum fhirVersionEnum) { + List resourceTypes, @Nullable String since, FhirVersionEnum fhirVersionEnum) { Map> headers = new HashMap<>(); 
headers.put(HttpHeaders.ACCEPT, Arrays.asList("application/fhir+ndjson")); headers.put("Prefer", Arrays.asList("respond-async")); @@ -129,7 +130,7 @@ public BulkExportHttpResponse fetchBulkExportHttpResponse(String bulkExportStatu } private IBaseParameters fetchBulkExportParameters( - FhirVersionEnum fhirVersionEnum, List resourceTypes, String since) { + FhirVersionEnum fhirVersionEnum, List resourceTypes, @Nullable String since) { since = Strings.nullToEmpty(since); switch (fhirVersionEnum) { case R4: diff --git a/pipelines/batch/src/main/java/com/google/fhir/analytics/ConvertResourceFn.java b/pipelines/batch/src/main/java/com/google/fhir/analytics/ConvertResourceFn.java index 553a8b363..67d43a54c 100644 --- a/pipelines/batch/src/main/java/com/google/fhir/analytics/ConvertResourceFn.java +++ b/pipelines/batch/src/main/java/com/google/fhir/analytics/ConvertResourceFn.java @@ -133,7 +133,8 @@ public void writeResource(HapiRowDescriptor element) return; } } - totalParseTimeMillisMap.get(resourceType).inc(System.currentTimeMillis() - startTime); + if (totalParseTimeMillisMap.get(resourceType) != null) + totalParseTimeMillisMap.get(resourceType).inc(System.currentTimeMillis() - startTime); if (forcedId == null || forcedId.equals("")) { resource.setId(resourceId); } else { @@ -141,12 +142,14 @@ public void writeResource(HapiRowDescriptor element) } resource.setMeta(meta); - numFetchedResourcesMap.get(resourceType).inc(1); + if (numFetchedResourcesMap.get(resourceType) != null) + numFetchedResourcesMap.get(resourceType).inc(1); if (parquetUtil != null) { startTime = System.currentTimeMillis(); parquetUtil.write(resource); - totalGenerateTimeMillisMap.get(resourceType).inc(System.currentTimeMillis() - startTime); + if (totalGenerateTimeMillisMap.get(resourceType) != null) + totalGenerateTimeMillisMap.get(resourceType).inc(System.currentTimeMillis() - startTime); } if (!sinkPath.isEmpty()) { startTime = System.currentTimeMillis(); @@ -155,7 +158,9 @@ public void 
writeResource(HapiRowDescriptor element) } else { fhirStoreUtil.uploadResource(resource); } - totalPushTimeMillisMap.get(resourceType).inc(System.currentTimeMillis() - startTime); + + if (totalPushTimeMillisMap.get(resourceType) != null) + totalPushTimeMillisMap.get(resourceType).inc(System.currentTimeMillis() - startTime); } if (sinkDbConfig != null) { if (isResourceDeleted) { diff --git a/pipelines/batch/src/main/java/com/google/fhir/analytics/FetchResources.java b/pipelines/batch/src/main/java/com/google/fhir/analytics/FetchResources.java index cabf267dc..3557c3292 100644 --- a/pipelines/batch/src/main/java/com/google/fhir/analytics/FetchResources.java +++ b/pipelines/batch/src/main/java/com/google/fhir/analytics/FetchResources.java @@ -38,6 +38,7 @@ import org.hl7.fhir.r4.model.Reference; import org.hl7.fhir.r4.model.Resource; import org.joda.time.Instant; +import org.jspecify.annotations.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -61,6 +62,7 @@ public class FetchResources } @VisibleForTesting + @Nullable static String getSubjectPatientIdOrNull(Resource resource) { String patientId = null; // TODO: Instead of hard-coding `subject` here, use Patient Compartment: @@ -104,7 +106,7 @@ static class SearchFn extends FetchSearchPageFn { // StartBundle; we don't need to `synchronized` access of it because each instance of the DoFn // class is accessed by a single thread. 
// https://beam.apache.org/documentation/programming-guide/#user-code-thread-compatibility - private Map patientCount = null; + private Map patientCount = Maps.newHashMap(); SearchFn(FhirEtlOptions options, String stageIdentifier) { super(options, stageIdentifier); diff --git a/pipelines/batch/src/main/java/com/google/fhir/analytics/FetchSearchPageFn.java b/pipelines/batch/src/main/java/com/google/fhir/analytics/FetchSearchPageFn.java index 9356037f0..944739573 100644 --- a/pipelines/batch/src/main/java/com/google/fhir/analytics/FetchSearchPageFn.java +++ b/pipelines/batch/src/main/java/com/google/fhir/analytics/FetchSearchPageFn.java @@ -21,6 +21,7 @@ import ca.uhn.fhir.parser.IParser; import com.cerner.bunsen.exception.ProfileException; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; import com.google.common.base.Strings; import com.google.fhir.analytics.JdbcConnectionPools.DataSourceConfig; import com.google.fhir.analytics.model.DatabaseConfiguration; @@ -128,6 +129,7 @@ abstract class FetchSearchPageFn extends DoFn> { protected AvroConversionUtil avroConversionUtil; + @SuppressWarnings("NullAway.Init") FetchSearchPageFn(FhirEtlOptions options, String stageIdentifier) { this.outputParquetViewPath = options.getOutputParquetViewPath(); this.sinkPath = options.getFhirSinkPath(); @@ -220,6 +222,7 @@ public void setup() throws SQLException, ProfileException { oAuthClientSecret, checkPatientEndpoint, fhirContext); + Preconditions.checkNotNull(fetchUtil); fhirSearchUtil = new FhirSearchUtil(fetchUtil); // TODO remove generateParquetFiles and instead rely on not setting outputParquetPath. 
if (generateParquetFiles @@ -291,12 +294,12 @@ protected void addFetchTime(long millis) { totalFetchTimeMillis.inc(millis); } - protected void processBundle(Bundle bundle) + protected void processBundle(@Nullable Bundle bundle) throws IOException, SQLException, ViewApplicationException, ProfileException { this.processBundle(bundle, null); } - protected void processBundle(Bundle bundle, @Nullable Set resourceTypes) + protected void processBundle(@Nullable Bundle bundle, @Nullable Set resourceTypes) throws IOException, SQLException, ViewApplicationException, ProfileException { if (bundle != null && bundle.getEntry() != null) { numFetchedResources.inc(bundle.getEntry().size()); diff --git a/pipelines/batch/src/main/java/com/google/fhir/analytics/FhirEtl.java b/pipelines/batch/src/main/java/com/google/fhir/analytics/FhirEtl.java index 8b444b00b..41862cea7 100644 --- a/pipelines/batch/src/main/java/com/google/fhir/analytics/FhirEtl.java +++ b/pipelines/batch/src/main/java/com/google/fhir/analytics/FhirEtl.java @@ -56,6 +56,7 @@ import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionList; import org.apache.commons.collections.CollectionUtils; +import org.jspecify.annotations.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -141,6 +142,7 @@ static void fetchPatientHistory( } } + @Nullable private static List buildFhirSearchPipeline( FhirEtlOptions options, AvroConversionUtil avroConversionUtil) throws ProfileException { FhirSearchUtil fhirSearchUtil = @@ -314,6 +316,7 @@ static void validateOptions(FhirEtlOptions options) { } // TODO: Implement active period feature for JDBC mode with a HAPI source server (issue #278). 
+ @Nullable private static List buildHapiJdbcPipeline(FhirEtlOptions options) throws SQLException, IOException { Preconditions.checkArgument(!Strings.isNullOrEmpty(options.getFhirDatabaseConfigPath())); @@ -329,7 +332,8 @@ private static List buildHapiJdbcPipeline(FhirEtlOptions options) List pipelines = new ArrayList<>(); long totalNumOfResources = 0l; for (String resourceType : options.getResourceList().split(",")) { - int numResources = resourceCount.get(resourceType); + int numResources = + resourceCount.get(resourceType) != null ? resourceCount.get(resourceType) : 0; if (numResources == 0) { continue; } @@ -509,6 +513,7 @@ private static Map> validateAndFetchNdjsonFileMappings( * @param options the pipeline options to be used. * @return the created Pipeline instance or null if nothing needs to be done. */ + @Nullable static List setupAndBuildPipelines( FhirEtlOptions options, AvroConversionUtil avroConversionUtil) throws IOException, SQLException, ViewDefinitionException, ProfileException { @@ -549,7 +554,11 @@ public static void main(String[] args) options.getStructureDefinitionsPath(), options.getRecursiveDepth()); List pipelines = setupAndBuildPipelines(options, avroConversionUtil); - EtlUtils.runMultiplePipelinesWithTimestamp(pipelines, options); + if (pipelines != null) { + EtlUtils.runMultiplePipelinesWithTimestamp(pipelines, options); + } else { + log.info("No work needs to be done!"); + } log.info("DONE!"); } diff --git a/pipelines/batch/src/main/java/com/google/fhir/analytics/FhirSearchUtil.java b/pipelines/batch/src/main/java/com/google/fhir/analytics/FhirSearchUtil.java index 5c291d6e8..84314a793 100644 --- a/pipelines/batch/src/main/java/com/google/fhir/analytics/FhirSearchUtil.java +++ b/pipelines/batch/src/main/java/com/google/fhir/analytics/FhirSearchUtil.java @@ -40,6 +40,7 @@ import org.apache.http.NameValuePair; import org.apache.http.client.utils.URLEncodedUtils; import org.hl7.fhir.r4.model.Bundle; +import org.jspecify.annotations.Nullable; 
import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -54,6 +55,7 @@ public class FhirSearchUtil { } // TODO update all types to FHIR version independent interfaces! + @Nullable public Bundle searchByUrl(String searchUrl, int count, SummaryEnum summaryMode) { try { IGenericClient client = fetchUtil.getSourceClient(); @@ -106,7 +108,8 @@ public Map searchResourceCounts(String resourceList, String sin return hashMap; } - public String getNextUrl(Bundle bundle) { + @Nullable + public String getNextUrl(@Nullable Bundle bundle) { if (bundle != null && bundle.getLink(Bundle.LINK_NEXT) != null) { return bundle.getLink(Bundle.LINK_NEXT).getUrl(); } diff --git a/pipelines/batch/src/main/java/com/google/fhir/analytics/HapiRowDescriptor.java b/pipelines/batch/src/main/java/com/google/fhir/analytics/HapiRowDescriptor.java index 26aac9210..001180998 100644 --- a/pipelines/batch/src/main/java/com/google/fhir/analytics/HapiRowDescriptor.java +++ b/pipelines/batch/src/main/java/com/google/fhir/analytics/HapiRowDescriptor.java @@ -64,5 +64,5 @@ static HapiRowDescriptor create( abstract String jsonResource(); // FHIR tags. - List tags; + @Nullable List tags; } diff --git a/pipelines/batch/src/main/java/com/google/fhir/analytics/JdbcFetchHapi.java b/pipelines/batch/src/main/java/com/google/fhir/analytics/JdbcFetchHapi.java index 5a7ad792d..87dada597 100644 --- a/pipelines/batch/src/main/java/com/google/fhir/analytics/JdbcFetchHapi.java +++ b/pipelines/batch/src/main/java/com/google/fhir/analytics/JdbcFetchHapi.java @@ -1,5 +1,5 @@ /* - * Copyright 2020-2024 Google LLC + * Copyright 2020-2025 Google LLC * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. 
@@ -104,7 +104,8 @@ public HapiRowDescriptor mapRow(ResultSet resultSet) throws Exception { String lastUpdated = resultSet.getString("res_updated"); String fhirVersion = resultSet.getString("res_version"); String resourceVersion = resultSet.getString("res_ver"); - numMappedResourcesMap.get(resourceType).inc(); + if (numMappedResourcesMap.get(resourceType) != null) + numMappedResourcesMap.get(resourceType).inc(); return HapiRowDescriptor.create( resourceId, forcedId, diff --git a/pipelines/batch/src/main/java/com/google/fhir/analytics/JdbcResourceWriter.java b/pipelines/batch/src/main/java/com/google/fhir/analytics/JdbcResourceWriter.java index f9f02cc5e..f1be7d065 100644 --- a/pipelines/batch/src/main/java/com/google/fhir/analytics/JdbcResourceWriter.java +++ b/pipelines/batch/src/main/java/com/google/fhir/analytics/JdbcResourceWriter.java @@ -1,5 +1,5 @@ /* - * Copyright 2020-2024 Google LLC + * Copyright 2020-2025 Google LLC * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -38,6 +38,7 @@ import java.util.stream.Collectors; import javax.sql.DataSource; import org.hl7.fhir.r4.model.Resource; +import org.jspecify.annotations.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -52,7 +53,7 @@ public class JdbcResourceWriter { private static final String ID_COLUMN = "id"; - private final ViewManager viewManager; + @Nullable private final ViewManager viewManager; private final IParser parser; @@ -142,9 +143,9 @@ static void createTables(FhirEtlOptions options) if (Strings.isNullOrEmpty(vDef.getName())) { throw new ViewDefinitionException("Field `name` in ViewDefinition is not defined."); } - if (vDef.getAllColumns().get(ID_COLUMN) == null - || !ViewApplicator.GET_RESOURCE_KEY.equals( - vDef.getAllColumns().get(ID_COLUMN).getPath())) { + ViewDefinition.Column idColumn = + vDef.getAllColumns() != null ? 
vDef.getAllColumns().get(ID_COLUMN) : null; + if (idColumn == null || !ViewApplicator.GET_RESOURCE_KEY.equals(idColumn.getPath())) { throw new ViewDefinitionException( String.format( "To write view '%s' to DB, there should be a column '%s' with path '%s'.", @@ -196,18 +197,22 @@ private static void deleteRowsById(DataSource dataSource, String tableName, Stri * * @param resourceType the type of resource to be deleted * @param id the id of the resource to be deleted - * @throws SQLException + * @throws SQLException if a database access error occurs */ public void deleteResourceById(String resourceType, String id) throws SQLException { if (viewManager == null) { deleteRowsById(jdbcDataSource, resourceType, id); } else { ImmutableList views = viewManager.getViewsForType(resourceType); - for (ViewDefinition vDef : views) { - if (Strings.isNullOrEmpty(vDef.getName())) { - throw new SQLException("Field `name` in ViewDefinition is not defined."); + if (views != null) { + for (ViewDefinition vDef : views) { + if (Strings.isNullOrEmpty(vDef.getName())) { + throw new SQLException("Field `name` in ViewDefinition is not defined."); + } + deleteRowsById(jdbcDataSource, vDef.getName(), id); } - deleteRowsById(jdbcDataSource, vDef.getName(), id); + } else { + log.warn("No views found for resource type {}!", resourceType); } } } diff --git a/pipelines/batch/src/main/java/com/google/fhir/analytics/ParquetMerger.java b/pipelines/batch/src/main/java/com/google/fhir/analytics/ParquetMerger.java index e7f7c2ff1..1cd96f425 100644 --- a/pipelines/batch/src/main/java/com/google/fhir/analytics/ParquetMerger.java +++ b/pipelines/batch/src/main/java/com/google/fhir/analytics/ParquetMerger.java @@ -202,14 +202,11 @@ private static String getUpdateTime(GenericRecord record) { * This method identifies if the record is a deleted record. 
For a deleted record, the meta.tag * would be updated with a ActionType.REMOVE during the incremental parquet file creation, the * same information is being reused to check if the record is deleted or not. - * - * @param record - * @return */ private static Boolean isRecordDeleted(GenericRecord record) { + Preconditions.checkNotNull(record); Object tag = ((GenericRecord) record.get(META_KEY)).get(TAG_KEY); - if (tag != null && tag instanceof Collection) { - Collection tagCollection = (Collection) tag; + if (tag != null && tag instanceof Collection tagCollection) { if (!tagCollection.isEmpty()) { Iterator iterator = tagCollection.iterator(); while (iterator.hasNext()) { @@ -226,8 +223,10 @@ private static Boolean isRecordDeleted(GenericRecord record) { return Boolean.FALSE; } + @Nullable private static GenericRecord findLastRecord( Iterable genericRecords, Counter numDuplicates) { + Preconditions.checkNotNull(genericRecords); // Note we are assuming all times have the same time-zone to avoid parsing date values. String lastUpdated = null; GenericRecord lastRecord = null; @@ -243,7 +242,7 @@ private static GenericRecord findLastRecord( if (numRec > 1) { numDuplicates.inc(); } - if (numRec > 2) { + if (numRec > 2 && lastRecord != null && lastRecord.get(ID_KEY) != null) { log.warn("Record with ID {} repeated more than twice!", lastRecord.get(ID_KEY)); } return lastRecord; @@ -352,7 +351,7 @@ public static Pipeline writeMergedViewsPipeline( throws IOException { Pipeline pipeline = Pipeline.create(options); log.info("Merging materialized view {}", viewName); - ViewDefinition viewDef = viewManager.getViewDefinition(viewName); + ViewDefinition viewDef = viewManager != null ? 
viewManager.getViewDefinition(viewName) : null; if (viewDef != null) { Schema schema = ViewSchema.getAvroSchema(viewDef); PCollection> groupedRecords = @@ -406,10 +405,21 @@ public static Pipeline writeMergedResources( @ProcessElement public void processElement(ProcessContext c) { KV> e = c.element(); - GenericRecord lastRecord = findLastRecord(e.getValue(), numDuplicates); - if (!isRecordDeleted(lastRecord)) { - numOutputRecords.inc(); - c.output(lastRecord); + if (e != null) { + GenericRecord lastRecord = findLastRecord(e.getValue(), numDuplicates); + if (lastRecord != null) { + if (!isRecordDeleted(lastRecord)) { + numOutputRecords.inc(); + c.output(lastRecord); + } + } else { + log.warn( + "Null last record found for resource type {} with id {}", + type, + e.getKey()); + } + } else { + log.warn("Null element found in grouped records for resource type {}", type); } } })); diff --git a/pipelines/batch/src/main/java/com/google/fhir/analytics/ProcessGenericRecords.java b/pipelines/batch/src/main/java/com/google/fhir/analytics/ProcessGenericRecords.java index ada76949b..a3b293d30 100644 --- a/pipelines/batch/src/main/java/com/google/fhir/analytics/ProcessGenericRecords.java +++ b/pipelines/batch/src/main/java/com/google/fhir/analytics/ProcessGenericRecords.java @@ -1,5 +1,5 @@ /* - * Copyright 2020-2024 Google LLC + * Copyright 2020-2025 Google LLC * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. 
@@ -40,10 +40,12 @@ public class ProcessGenericRecords extends FetchSearchPageFn { private static final int BUNDLE_SIZE = 10; private String resourceType; + private List cachedResources; private Counter totalAvroConversionTime; private Counter totalAvroConversions; + @SuppressWarnings("NullAway.Init") ProcessGenericRecords(FhirEtlOptions options, String resourceType) { super(options, "ProcessGenericRecords_" + resourceType); this.resourceType = resourceType; @@ -65,7 +67,7 @@ public void setup() throws SQLException, ProfileException { @Override public void finishBundle(FinishBundleContext context) { - if (!cachedResources.isEmpty()) { + if (cachedResources != null && !cachedResources.isEmpty()) { try { processBundle(flushCachToBundle()); } catch (SQLException | ViewApplicationException | ProfileException | IOException e) { @@ -99,7 +101,7 @@ public void processElement(@Element GenericRecord record) processBundle(flushCachToBundle()); } } catch (IllegalArgumentException e) { - log.error("Dropping bad record because: " + e.getMessage()); + log.error("Dropping bad record because: {}", e.getMessage()); } } } diff --git a/pipelines/batch/src/main/java/com/google/fhir/analytics/metrics/FlinkJobListener.java b/pipelines/batch/src/main/java/com/google/fhir/analytics/metrics/FlinkJobListener.java index 81234e5b0..19144be38 100644 --- a/pipelines/batch/src/main/java/com/google/fhir/analytics/metrics/FlinkJobListener.java +++ b/pipelines/batch/src/main/java/com/google/fhir/analytics/metrics/FlinkJobListener.java @@ -37,7 +37,11 @@ public void onJobSubmitted(@Nullable JobClient jobClient, @Nullable Throwable th logger.info("Submitting the job with ID {} ", this); FlinkPipelineMetrics flinkPipelineMetrics = (FlinkPipelineMetrics) PipelineMetricsProvider.getPipelineMetrics(FlinkRunner.class); - flinkPipelineMetrics.addJobClient(jobClient); + if (flinkPipelineMetrics != null && jobClient != null) { + flinkPipelineMetrics.addJobClient(jobClient); + } else { + 
logger.error("FlinkPipelineMetrics instance or jobClient instance is null"); + } } @Override @@ -56,6 +60,7 @@ public void onJobExecuted( jobExecutionResult); FlinkPipelineMetrics flinkPipelineMetrics = (FlinkPipelineMetrics) PipelineMetricsProvider.getPipelineMetrics(FlinkRunner.class); - flinkPipelineMetrics.removeJobClient(jobExecutionResult.getJobID().toHexString()); + if (flinkPipelineMetrics != null) + flinkPipelineMetrics.removeJobClient(jobExecutionResult.getJobID().toHexString()); } } From 04078dfae8e876f7ed717839f2ba00a50831c133 Mon Sep 17 00:00:00 2001 From: Martin Ndegwa Date: Tue, 21 Oct 2025 16:58:24 +0300 Subject: [PATCH 2/3] Addresses PR feedback --- .../fhir/analytics/ConvertResourceFn.java | 19 ++++++++++++------- .../google/fhir/analytics/FetchResources.java | 5 ++++- .../fhir/analytics/FetchSearchPageFn.java | 1 + .../fhir/analytics/JdbcResourceWriter.java | 6 +++--- .../google/fhir/analytics/ParquetMerger.java | 8 ++++++-- .../fhir/analytics/ProcessGenericRecords.java | 5 +++-- .../analytics/metrics/FlinkJobListener.java | 2 +- .../google/fhir/analytics/ParquetUtil.java | 2 +- .../fhir/analytics/view/ViewManager.java | 3 +-- 9 files changed, 32 insertions(+), 19 deletions(-) diff --git a/pipelines/batch/src/main/java/com/google/fhir/analytics/ConvertResourceFn.java b/pipelines/batch/src/main/java/com/google/fhir/analytics/ConvertResourceFn.java index 6da57bfed..7314a8a0b 100644 --- a/pipelines/batch/src/main/java/com/google/fhir/analytics/ConvertResourceFn.java +++ b/pipelines/batch/src/main/java/com/google/fhir/analytics/ConvertResourceFn.java @@ -133,9 +133,8 @@ public void writeResource(HapiRowDescriptor element) return; } } - if (totalParseTimeMillisMap.get(resourceType) != null) - totalParseTimeMillisMap.get(resourceType).inc(System.currentTimeMillis() - startTime); - if (forcedId == null || forcedId.equals("")) { + incrementElapsedTimeCounter(totalParseTimeMillisMap, resourceType, startTime); + if (forcedId == null || forcedId.isEmpty()) 
{ resource.setId(resourceId); } else { resource.setId(forcedId); @@ -148,8 +147,7 @@ public void writeResource(HapiRowDescriptor element) if (parquetUtil != null) { startTime = System.currentTimeMillis(); parquetUtil.write(resource); - if (totalGenerateTimeMillisMap.get(resourceType) != null) - totalGenerateTimeMillisMap.get(resourceType).inc(System.currentTimeMillis() - startTime); + incrementElapsedTimeCounter(totalGenerateTimeMillisMap, resourceType, startTime); } if (!sinkPath.isEmpty()) { startTime = System.currentTimeMillis(); @@ -159,8 +157,7 @@ public void writeResource(HapiRowDescriptor element) fhirStoreUtil.uploadResource(resource); } - if (totalPushTimeMillisMap.get(resourceType) != null) - totalPushTimeMillisMap.get(resourceType).inc(System.currentTimeMillis() - startTime); + incrementElapsedTimeCounter(totalPushTimeMillisMap, resourceType, startTime); } if (sinkDbConfig != null) { if (isResourceDeleted) { @@ -234,6 +231,14 @@ private Resource createNewFhirResource(String fhirVersion, String resourceType) } } + private void incrementElapsedTimeCounter( + HashMap counterMap, String resourceType, long startTime) { + Counter counter = counterMap.get(resourceType); + if (counter != null) { + counter.inc(System.currentTimeMillis() - startTime); + } + } + private String getFhirBasePackageName(String fhirVersion) { FhirVersionEnum fhirVersionEnum = FhirVersionEnum.forVersionString(fhirVersion); return switch (fhirVersionEnum) { diff --git a/pipelines/batch/src/main/java/com/google/fhir/analytics/FetchResources.java b/pipelines/batch/src/main/java/com/google/fhir/analytics/FetchResources.java index 3557c3292..2508447a3 100644 --- a/pipelines/batch/src/main/java/com/google/fhir/analytics/FetchResources.java +++ b/pipelines/batch/src/main/java/com/google/fhir/analytics/FetchResources.java @@ -106,7 +106,10 @@ static class SearchFn extends FetchSearchPageFn { // StartBundle; we don't need to `synchronized` access of it because each instance of the DoFn // class 
is accessed by a single thread. // https://beam.apache.org/documentation/programming-guide/#user-code-thread-compatibility - private Map patientCount = Maps.newHashMap(); + + // Suppressing warning as this is initialized in StartBundle. + @SuppressWarnings("NullAway") + private Map patientCount = null; SearchFn(FhirEtlOptions options, String stageIdentifier) { super(options, stageIdentifier); diff --git a/pipelines/batch/src/main/java/com/google/fhir/analytics/FetchSearchPageFn.java b/pipelines/batch/src/main/java/com/google/fhir/analytics/FetchSearchPageFn.java index 944739573..e7e6fd9fe 100644 --- a/pipelines/batch/src/main/java/com/google/fhir/analytics/FetchSearchPageFn.java +++ b/pipelines/batch/src/main/java/com/google/fhir/analytics/FetchSearchPageFn.java @@ -129,6 +129,7 @@ abstract class FetchSearchPageFn extends DoFn> { protected AvroConversionUtil avroConversionUtil; + // Suppressing NullAway warning because all final fields are initialized in the constructor. @SuppressWarnings("NullAway.Init") FetchSearchPageFn(FhirEtlOptions options, String stageIdentifier) { this.outputParquetViewPath = options.getOutputParquetViewPath(); diff --git a/pipelines/batch/src/main/java/com/google/fhir/analytics/JdbcResourceWriter.java b/pipelines/batch/src/main/java/com/google/fhir/analytics/JdbcResourceWriter.java index a743373bc..50be4f89f 100644 --- a/pipelines/batch/src/main/java/com/google/fhir/analytics/JdbcResourceWriter.java +++ b/pipelines/batch/src/main/java/com/google/fhir/analytics/JdbcResourceWriter.java @@ -137,7 +137,7 @@ static void createTables(FhirEtlOptions options) ViewManager viewManager = ViewManager.createForDir(viewDir); for (String resourceType : Splitter.on(',').split(options.getResourceList())) { ImmutableList views = viewManager.getViewsForType(resourceType); - if (views == null || views.isEmpty()) { + if (views.isEmpty()) { log.warn("No views found for resource type {} in directory {}!", resourceType, viewDir); } else { for (ViewDefinition vDef 
: views) { @@ -205,7 +205,7 @@ public void deleteResourceById(String resourceType, String id) throws SQLExcepti deleteRowsById(jdbcDataSource, resourceType, id); } else { ImmutableList views = viewManager.getViewsForType(resourceType); - if (views != null) { + if (!views.isEmpty()) { for (ViewDefinition vDef : views) { if (Strings.isNullOrEmpty(vDef.getName())) { throw new SQLException("Field `name` in ViewDefinition is not defined."); @@ -238,7 +238,7 @@ public void writeResource(Resource resource) throws SQLException, ViewApplicatio } } else { ImmutableList views = viewManager.getViewsForType(resource.fhirType()); - if (views != null) { + if (!views.isEmpty()) { for (ViewDefinition vDef : views) { if (Strings.isNullOrEmpty(vDef.getName())) { throw new SQLException("Field `name` in ViewDefinition is not defined."); diff --git a/pipelines/batch/src/main/java/com/google/fhir/analytics/ParquetMerger.java b/pipelines/batch/src/main/java/com/google/fhir/analytics/ParquetMerger.java index 1cd96f425..3204874d0 100644 --- a/pipelines/batch/src/main/java/com/google/fhir/analytics/ParquetMerger.java +++ b/pipelines/batch/src/main/java/com/google/fhir/analytics/ParquetMerger.java @@ -242,8 +242,12 @@ private static GenericRecord findLastRecord( if (numRec > 1) { numDuplicates.inc(); } - if (numRec > 2 && lastRecord != null && lastRecord.get(ID_KEY) != null) { - log.warn("Record with ID {} repeated more than twice!", lastRecord.get(ID_KEY)); + if (numRec + > 2) { // NullAway complains of lastRecord possibly being null here. It is guaranteed for + // numRec>1 + Object lastRecordIdKey = lastRecord != null ? 
lastRecord.get(ID_KEY) : null; + if (lastRecordIdKey != null) + log.warn("Record with ID {} repeated more than twice!", lastRecordIdKey); } return lastRecord; } diff --git a/pipelines/batch/src/main/java/com/google/fhir/analytics/ProcessGenericRecords.java b/pipelines/batch/src/main/java/com/google/fhir/analytics/ProcessGenericRecords.java index a3b293d30..39ba29c33 100644 --- a/pipelines/batch/src/main/java/com/google/fhir/analytics/ProcessGenericRecords.java +++ b/pipelines/batch/src/main/java/com/google/fhir/analytics/ProcessGenericRecords.java @@ -41,7 +41,8 @@ public class ProcessGenericRecords extends FetchSearchPageFn { private String resourceType; - private List cachedResources; + // Beam also re-initializes these during setup, re-initialized here to make NullAway happy. + private List cachedResources = new ArrayList<>(); private Counter totalAvroConversionTime; private Counter totalAvroConversions; @@ -67,7 +68,7 @@ public void setup() throws SQLException, ProfileException { @Override public void finishBundle(FinishBundleContext context) { - if (cachedResources != null && !cachedResources.isEmpty()) { + if (!cachedResources.isEmpty()) { try { processBundle(flushCachToBundle()); } catch (SQLException | ViewApplicationException | ProfileException | IOException e) { diff --git a/pipelines/batch/src/main/java/com/google/fhir/analytics/metrics/FlinkJobListener.java b/pipelines/batch/src/main/java/com/google/fhir/analytics/metrics/FlinkJobListener.java index 19144be38..3df3bdbab 100644 --- a/pipelines/batch/src/main/java/com/google/fhir/analytics/metrics/FlinkJobListener.java +++ b/pipelines/batch/src/main/java/com/google/fhir/analytics/metrics/FlinkJobListener.java @@ -40,7 +40,7 @@ public void onJobSubmitted(@Nullable JobClient jobClient, @Nullable Throwable th if (flinkPipelineMetrics != null && jobClient != null) { flinkPipelineMetrics.addJobClient(jobClient); } else { - logger.error("FlinkPipelineMetrics instance or jobClient instance is null"); + 
logger.warn("FlinkPipelineMetrics instance or jobClient instance is null"); } } diff --git a/pipelines/common/src/main/java/com/google/fhir/analytics/ParquetUtil.java b/pipelines/common/src/main/java/com/google/fhir/analytics/ParquetUtil.java index 06dafb88e..4ca5763bf 100644 --- a/pipelines/common/src/main/java/com/google/fhir/analytics/ParquetUtil.java +++ b/pipelines/common/src/main/java/com/google/fhir/analytics/ParquetUtil.java @@ -273,7 +273,7 @@ public synchronized void write(Resource resource) } if (!Strings.isNullOrEmpty(outputParquetViewPath)) { ImmutableList views = viewManager.getViewsForType(resource.fhirType()); - if (views != null) { + if (!views.isEmpty()) { for (ViewDefinition vDef : views) { write(resource, vDef); } diff --git a/pipelines/common/src/main/java/com/google/fhir/analytics/view/ViewManager.java b/pipelines/common/src/main/java/com/google/fhir/analytics/view/ViewManager.java index 1fcb65e54..29e7a9370 100644 --- a/pipelines/common/src/main/java/com/google/fhir/analytics/view/ViewManager.java +++ b/pipelines/common/src/main/java/com/google/fhir/analytics/view/ViewManager.java @@ -99,10 +99,9 @@ public ViewDefinition getViewDefinition(String viewName) { return viewNameMap.get(viewName); } - @Nullable public ImmutableList getViewsForType(String resourceType) { if (!viewMap.containsKey(resourceType)) { - return null; + return ImmutableList.of(); } return ImmutableList.copyOf(viewMap.get(resourceType)); } From 5ca19dd613251b15e09ca31f33dda41c40911036 Mon Sep 17 00:00:00 2001 From: Martin Ndegwa Date: Tue, 21 Oct 2025 18:00:34 +0300 Subject: [PATCH 3/3] Addresses PR feedback II --- .../main/java/com/google/fhir/analytics/ParquetMerger.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/pipelines/batch/src/main/java/com/google/fhir/analytics/ParquetMerger.java b/pipelines/batch/src/main/java/com/google/fhir/analytics/ParquetMerger.java index 3204874d0..c0df443c5 100644 --- 
a/pipelines/batch/src/main/java/com/google/fhir/analytics/ParquetMerger.java +++ b/pipelines/batch/src/main/java/com/google/fhir/analytics/ParquetMerger.java @@ -333,6 +333,8 @@ static List createMergerPipelines( } if ((!dwhViews1.isEmpty() || !dwhViews2.isEmpty()) && !Strings.isNullOrEmpty(options.getViewDefinitionsDir())) { + // ViewManager is guaranteed to be non-null here, adding Precondition for NullAway + Preconditions.checkNotNull(viewManager); for (String viewName : dwhViews1) { if (!dwhViews2.contains(viewName)) { continue; @@ -351,11 +353,11 @@ public static Pipeline writeMergedViewsPipeline( DwhFiles dwhFiles2, DwhFiles mergedDwhFiles, String viewName, - @Nullable ViewManager viewManager) + ViewManager viewManager) throws IOException { Pipeline pipeline = Pipeline.create(options); log.info("Merging materialized view {}", viewName); - ViewDefinition viewDef = viewManager != null ? viewManager.getViewDefinition(viewName) : null; + ViewDefinition viewDef = viewManager.getViewDefinition(viewName); if (viewDef != null) { Schema schema = ViewSchema.getAvroSchema(viewDef); PCollection> groupedRecords =