Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.hl7.fhir.instance.model.api.IBaseParameters;
import org.hl7.fhir.r4.model.InstantType;
import org.hl7.fhir.r4.model.Parameters;
import org.jspecify.annotations.Nullable;

/**
* This class contains methods to trigger and fetch the details of bulk export api on the FHIR
Expand All @@ -64,7 +65,7 @@ public class BulkExportApiClient {
* @return the absolute url via which the status and details of the job can be fetched
*/
public String triggerBulkExportJob(
List<String> resourceTypes, String since, FhirVersionEnum fhirVersionEnum) {
List<String> resourceTypes, @Nullable String since, FhirVersionEnum fhirVersionEnum) {
Map<String, List<String>> headers = new HashMap<>();
headers.put(HttpHeaders.ACCEPT, Arrays.asList("application/fhir+ndjson"));
headers.put("Prefer", Arrays.asList("respond-async"));
Expand Down Expand Up @@ -129,7 +130,7 @@ public BulkExportHttpResponse fetchBulkExportHttpResponse(String bulkExportStatu
}

private IBaseParameters fetchBulkExportParameters(
FhirVersionEnum fhirVersionEnum, List<String> resourceTypes, String since) {
FhirVersionEnum fhirVersionEnum, List<String> resourceTypes, @Nullable String since) {
since = Strings.nullToEmpty(since);
return switch (fhirVersionEnum) {
case R4 -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,20 +133,21 @@ public void writeResource(HapiRowDescriptor element)
return;
}
}
totalParseTimeMillisMap.get(resourceType).inc(System.currentTimeMillis() - startTime);
if (forcedId == null || forcedId.equals("")) {
incrementElapsedTimeCounter(totalParseTimeMillisMap, resourceType, startTime);
if (forcedId == null || forcedId.isEmpty()) {
resource.setId(resourceId);
} else {
resource.setId(forcedId);
}
resource.setMeta(meta);

numFetchedResourcesMap.get(resourceType).inc(1);
if (numFetchedResourcesMap.get(resourceType) != null)
numFetchedResourcesMap.get(resourceType).inc(1);

if (parquetUtil != null) {
startTime = System.currentTimeMillis();
parquetUtil.write(resource);
totalGenerateTimeMillisMap.get(resourceType).inc(System.currentTimeMillis() - startTime);
incrementElapsedTimeCounter(totalGenerateTimeMillisMap, resourceType, startTime);
}
if (!sinkPath.isEmpty()) {
startTime = System.currentTimeMillis();
Expand All @@ -155,7 +156,8 @@ public void writeResource(HapiRowDescriptor element)
} else {
fhirStoreUtil.uploadResource(resource);
}
totalPushTimeMillisMap.get(resourceType).inc(System.currentTimeMillis() - startTime);

incrementElapsedTimeCounter(totalPushTimeMillisMap, resourceType, startTime);
}
if (sinkDbConfig != null) {
if (isResourceDeleted) {
Expand Down Expand Up @@ -229,6 +231,14 @@ private Resource createNewFhirResource(String fhirVersion, String resourceType)
}
}

private void incrementElapsedTimeCounter(
HashMap<String, Counter> counterMap, String resourceType, long startTime) {
Counter counter = counterMap.get(resourceType);
if (counter != null) {
counter.inc(System.currentTimeMillis() - startTime);
}
}

private String getFhirBasePackageName(String fhirVersion) {
FhirVersionEnum fhirVersionEnum = FhirVersionEnum.forVersionString(fhirVersion);
return switch (fhirVersionEnum) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.hl7.fhir.r4.model.Reference;
import org.hl7.fhir.r4.model.Resource;
import org.joda.time.Instant;
import org.jspecify.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -61,6 +62,7 @@ public class FetchResources
}

@VisibleForTesting
@Nullable
static String getSubjectPatientIdOrNull(Resource resource) {
String patientId = null;
// TODO: Instead of hard-coding `subject` here, use Patient Compartment:
Expand Down Expand Up @@ -104,6 +106,9 @@ static class SearchFn extends FetchSearchPageFn<SearchSegmentDescriptor> {
// StartBundle; we don't need to `synchronized` access of it because each instance of the DoFn
// class is accessed by a single thread.
// https://beam.apache.org/documentation/programming-guide/#user-code-thread-compatibility

// Suppressing warning as this is initialized in StartBundle.
@SuppressWarnings("NullAway")
private Map<String, Integer> patientCount = null;

SearchFn(FhirEtlOptions options, String stageIdentifier) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import ca.uhn.fhir.parser.IParser;
import com.cerner.bunsen.exception.ProfileException;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.fhir.analytics.JdbcConnectionPools.DataSourceConfig;
import com.google.fhir.analytics.model.DatabaseConfiguration;
Expand Down Expand Up @@ -128,6 +129,8 @@ abstract class FetchSearchPageFn<T> extends DoFn<T, KV<String, Integer>> {

protected AvroConversionUtil avroConversionUtil;

// Suppressing NullAway warning because all final fields are initialized in the constructor.
@SuppressWarnings("NullAway.Init")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: consider adding a comment why we have this here.

FetchSearchPageFn(FhirEtlOptions options, String stageIdentifier) {
this.outputParquetViewPath = options.getOutputParquetViewPath();
this.sinkPath = options.getFhirSinkPath();
Expand Down Expand Up @@ -220,6 +223,7 @@ public void setup() throws SQLException, ProfileException {
oAuthClientSecret,
checkPatientEndpoint,
fhirContext);
Preconditions.checkNotNull(fetchUtil);
fhirSearchUtil = new FhirSearchUtil(fetchUtil);
// TODO remove generateParquetFiles and instead rely on not setting outputParquetPath.
if (generateParquetFiles
Expand Down Expand Up @@ -291,12 +295,12 @@ protected void addFetchTime(long millis) {
totalFetchTimeMillis.inc(millis);
}

protected void processBundle(Bundle bundle)
protected void processBundle(@Nullable Bundle bundle)
throws IOException, SQLException, ViewApplicationException, ProfileException {
this.processBundle(bundle, null);
}

protected void processBundle(Bundle bundle, @Nullable Set<String> resourceTypes)
protected void processBundle(@Nullable Bundle bundle, @Nullable Set<String> resourceTypes)
throws IOException, SQLException, ViewApplicationException, ProfileException {
if (bundle != null && bundle.getEntry() != null) {
numFetchedResources.inc(bundle.getEntry().size());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.commons.collections.CollectionUtils;
import org.jspecify.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -142,6 +143,7 @@ static void fetchPatientHistory(
}
}

@Nullable
private static List<Pipeline> buildFhirSearchPipeline(
FhirEtlOptions options, AvroConversionUtil avroConversionUtil) throws ProfileException {
FhirSearchUtil fhirSearchUtil =
Expand Down Expand Up @@ -316,6 +318,7 @@ static void validateOptions(FhirEtlOptions options) {
}

// TODO: Implement active period feature for JDBC mode with a HAPI source server (issue #278).
@Nullable
private static List<Pipeline> buildHapiJdbcPipeline(FhirEtlOptions options)
throws SQLException, IOException {
Preconditions.checkArgument(!Strings.isNullOrEmpty(options.getFhirDatabaseConfigPath()));
Expand All @@ -331,7 +334,8 @@ private static List<Pipeline> buildHapiJdbcPipeline(FhirEtlOptions options)
List<Pipeline> pipelines = new ArrayList<>();
long totalNumOfResources = 0L;
for (String resourceType : Splitter.on(',').splitToList(options.getResourceList())) {
int numResources = resourceCount.get(resourceType);
int numResources =
resourceCount.get(resourceType) != null ? resourceCount.get(resourceType) : 0;
if (numResources == 0) {
continue;
}
Expand Down Expand Up @@ -513,6 +517,7 @@ private static Map<String, List<String>> validateAndFetchNdjsonFileMappings(
* @param options the pipeline options to be used.
* @return the created Pipeline instance or null if nothing needs to be done.
*/
@Nullable
static List<Pipeline> setupAndBuildPipelines(
FhirEtlOptions options, AvroConversionUtil avroConversionUtil)
throws IOException, SQLException, ViewDefinitionException, ProfileException {
Expand Down Expand Up @@ -553,7 +558,11 @@ public static void main(String[] args)
options.getStructureDefinitionsPath(),
options.getRecursiveDepth());
List<Pipeline> pipelines = setupAndBuildPipelines(options, avroConversionUtil);
EtlUtils.runMultiplePipelinesWithTimestamp(pipelines, options);
if (pipelines != null) {
EtlUtils.runMultiplePipelinesWithTimestamp(pipelines, options);
} else {
log.info("No work needs to be done!");
}

log.info("DONE!");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.apache.http.NameValuePair;
import org.apache.http.client.utils.URLEncodedUtils;
import org.hl7.fhir.r4.model.Bundle;
import org.jspecify.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -54,6 +55,7 @@ public class FhirSearchUtil {
}

// TODO update all types to FHIR version independent interfaces!
@Nullable
public Bundle searchByUrl(String searchUrl, int count, SummaryEnum summaryMode) {
try {
IGenericClient client = fetchUtil.getSourceClient();
Expand Down Expand Up @@ -106,7 +108,8 @@ public Map<String, Integer> searchResourceCounts(String resourceList, String sin
return hashMap;
}

public String getNextUrl(Bundle bundle) {
@Nullable
public String getNextUrl(@Nullable Bundle bundle) {
if (bundle != null && bundle.getLink(Bundle.LINK_NEXT) != null) {
return bundle.getLink(Bundle.LINK_NEXT).getUrl();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,5 +64,5 @@ static HapiRowDescriptor create(
abstract String jsonResource();

// FHIR tags.
List<ResourceTag> tags;
@Nullable List<ResourceTag> tags;
}
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,8 @@ public HapiRowDescriptor mapRow(ResultSet resultSet) throws Exception {
String lastUpdated = resultSet.getString("res_updated");
String fhirVersion = resultSet.getString("res_version");
String resourceVersion = resultSet.getString("res_ver");
numMappedResourcesMap.get(resourceType).inc();
if (numMappedResourcesMap.get(resourceType) != null)
numMappedResourcesMap.get(resourceType).inc();
return HapiRowDescriptor.create(
resourceId,
forcedId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import java.util.stream.Collectors;
import javax.sql.DataSource;
import org.hl7.fhir.r4.model.Resource;
import org.jspecify.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -53,7 +54,7 @@ public class JdbcResourceWriter {

private static final String ID_COLUMN = "id";

private final ViewManager viewManager;
@Nullable private final ViewManager viewManager;

private final IParser parser;

Expand Down Expand Up @@ -136,16 +137,16 @@ static void createTables(FhirEtlOptions options)
ViewManager viewManager = ViewManager.createForDir(viewDir);
for (String resourceType : Splitter.on(',').split(options.getResourceList())) {
ImmutableList<ViewDefinition> views = viewManager.getViewsForType(resourceType);
if (views == null || views.isEmpty()) {
if (views.isEmpty()) {
log.warn("No views found for resource type {} in directory {}!", resourceType, viewDir);
} else {
for (ViewDefinition vDef : views) {
if (Strings.isNullOrEmpty(vDef.getName())) {
throw new ViewDefinitionException("Field `name` in ViewDefinition is not defined.");
}
if (vDef.getAllColumns().get(ID_COLUMN) == null
|| !ViewApplicator.GET_RESOURCE_KEY.equals(
vDef.getAllColumns().get(ID_COLUMN).getPath())) {
ViewDefinition.Column idColumn =
vDef.getAllColumns() != null ? vDef.getAllColumns().get(ID_COLUMN) : null;
if (idColumn == null || !ViewApplicator.GET_RESOURCE_KEY.equals(idColumn.getPath())) {
throw new ViewDefinitionException(
String.format(
"To write view '%s' to DB, there should be a column '%s' with path '%s'.",
Expand Down Expand Up @@ -197,18 +198,22 @@ private static void deleteRowsById(DataSource dataSource, String tableName, Stri
*
* @param resourceType the type of resource to be deleted
* @param id the id of the resource to be deleted
* @throws SQLException
* @throws SQLException if a database access error occurs
*/
public void deleteResourceById(String resourceType, String id) throws SQLException {
if (viewManager == null) {
deleteRowsById(jdbcDataSource, resourceType, id);
} else {
ImmutableList<ViewDefinition> views = viewManager.getViewsForType(resourceType);
for (ViewDefinition vDef : views) {
if (Strings.isNullOrEmpty(vDef.getName())) {
throw new SQLException("Field `name` in ViewDefinition is not defined.");
if (!views.isEmpty()) {
for (ViewDefinition vDef : views) {
if (Strings.isNullOrEmpty(vDef.getName())) {
throw new SQLException("Field `name` in ViewDefinition is not defined.");
}
deleteRowsById(jdbcDataSource, vDef.getName(), id);
}
deleteRowsById(jdbcDataSource, vDef.getName(), id);
} else {
log.warn("No views found for resource type {}!", resourceType);
}
}
}
Expand All @@ -233,7 +238,7 @@ public void writeResource(Resource resource) throws SQLException, ViewApplicatio
}
} else {
ImmutableList<ViewDefinition> views = viewManager.getViewsForType(resource.fhirType());
if (views != null) {
if (!views.isEmpty()) {
for (ViewDefinition vDef : views) {
if (Strings.isNullOrEmpty(vDef.getName())) {
throw new SQLException("Field `name` in ViewDefinition is not defined.");
Expand Down
Loading
Loading