Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import com.google.gson.Gson;
import java.io.IOException;
import java.io.Reader;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
Expand Down Expand Up @@ -95,7 +97,7 @@ public String triggerBulkExportJob(
*
* @param bulkExportStatusUrl the url of the bulk export job
* @return BulkExportHttpResponse - the status and details of the bulk export job
* @throws IOException
* @throws IOException in case of any error while fetching the status of bulk export job
*/
public BulkExportHttpResponse fetchBulkExportHttpResponse(String bulkExportStatusUrl)
throws IOException {
Expand All @@ -104,12 +106,14 @@ public BulkExportHttpResponse fetchBulkExportHttpResponse(String bulkExportStatu
httpResponseBuilder.httpStatus(httpResponse.getStatus());
if (httpResponse.getHeaders(EXPIRES) != null && !httpResponse.getHeaders(EXPIRES).isEmpty()) {
String expiresString = httpResponse.getHeaders(EXPIRES).get(0);
Date expires = new Date(expiresString);
DateTimeFormatter formatter = DateTimeFormatter.RFC_1123_DATE_TIME;
ZonedDateTime zonedDateTime = ZonedDateTime.parse(expiresString, formatter);
Date expires = Date.from(zonedDateTime.toInstant());
httpResponseBuilder.expires(expires);
}
if (!CollectionUtils.isEmpty(httpResponse.getHeaders(RETRY_AFTER))) {
String retryHeaderString = httpResponse.getHeaders(RETRY_AFTER).get(0);
httpResponseBuilder.retryAfter(Integer.valueOf(retryHeaderString));
httpResponseBuilder.retryAfter(Integer.parseInt(retryHeaderString));
}
if (!CollectionUtils.isEmpty(httpResponse.getHeaders(X_PROGRESS))) {
String xProgress = httpResponse.getHeaders(X_PROGRESS).get(0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ public class BulkExportUtil {
* @param since - the fhir resources fetched should have updated timestamp greater than this
* @param fhirVersionEnum - the fhir version of resource types
* @return the BulkExportResponse
* @throws IOException
* @throws IOException in case of any error while triggering or fetching the status of bulk export
* job
*/
public BulkExportResponse triggerBulkExport(
List<String> resourceTypes, @Nullable String since, FhirVersionEnum fhirVersionEnum)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.sql.SQLException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
Expand Down Expand Up @@ -121,7 +122,7 @@ public void writeResource(HapiRowDescriptor element)
// deleted
resource = createNewFhirResource(element.fhirVersion(), resourceType);
ActionType removeAction = ActionType.REMOVE;
meta.setLastUpdated(new Date());
meta.setLastUpdated(Date.from(Instant.now()));
meta.addTag(
new Coding(removeAction.getSystem(), removeAction.toCode(), removeAction.getDisplay()));
} else {
Expand Down Expand Up @@ -215,10 +216,10 @@ private Resource createNewFhirResource(String fhirVersion, String resourceType)
try {
// TODO create tests for this method and different versions of FHIR; casting to R4 resource
// does not seem right!
return (Resource)
Class.forName(getFhirBasePackageName(fhirVersion) + "." + resourceType)
.getConstructor()
.newInstance();
return Class.forName(getFhirBasePackageName(fhirVersion) + "." + resourceType)
.asSubclass(Resource.class)
.getConstructor()
.newInstance();
} catch (InstantiationException
| IllegalAccessException
| ClassNotFoundException
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2020-2024 Google LLC
* Copyright 2020-2025 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -21,8 +21,6 @@
import com.google.fhir.analytics.view.ViewApplicationException;
import java.io.IOException;
import java.sql.SQLException;
import java.util.List;
import org.apache.avro.Schema;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
Expand All @@ -40,15 +38,10 @@ public class FetchPatients extends PTransform<PCollection<KV<String, Integer>>,

private final FetchSearchPageFn<KV<String, Integer>> fetchSearchPageFn;

private final Schema schema;

FetchPatients(FhirEtlOptions options, Schema schema) {
FetchPatients(FhirEtlOptions options) {
Preconditions.checkState(!options.getActivePeriod().isEmpty());
List<String> dateRange = FhirSearchUtil.getDateRange(options.getActivePeriod());

int count = options.getBatchSize();
this.schema = schema;

fetchSearchPageFn =
new FetchSearchPageFn<KV<String, Integer>>(options, "PatientById") {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,9 +133,7 @@ static void fetchPatientHistory(
PCollection<KV<String, Integer>> flattenedPatients =
patientIdList.apply(Flatten.pCollections());
PCollection<KV<String, Integer>> mergedPatients = flattenedPatients.apply(Sum.integersPerKey());
final String patientType = "Patient";
FetchPatients fetchPatients =
new FetchPatients(options, avroConversionUtil.getResourceSchema(patientType));
FetchPatients fetchPatients = new FetchPatients(options);
mergedPatients.apply(fetchPatients);
for (String resourceType : patientAssociatedResources) {
FetchPatientHistory fetchPatientHistory = new FetchPatientHistory(options, resourceType);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,21 +214,25 @@ public interface FhirEtlOptions extends BasePipelineOptions {
void setActivePeriod(String value);

@Description(
"Fetch only FHIR resources that were updated after the given timestamp."
+ "The date format follows the dateTime format in the FHIR standard, without time-zone:\n"
+ "https://www.hl7.org/fhir/datatypes.html#dateTime\n"
+ "This feature is currently implemented only for HAPI JDBC mode.")
"""
Fetch only FHIR resources that were updated after the given timestamp.
The date format follows the dateTime format in the FHIR standard, without time-zone:
https://www.hl7.org/fhir/datatypes.html#dateTime
This feature is currently implemented only for HAPI JDBC mode.
""")
@Default.String("")
String getSince();

void setSince(String value);

@Description(
"Path to the sink database config; if not set, no sink DB is used.\n"
+ "If viewDefinitionsDir is set, the output tables will be the generated views\n"
+ "(the `name` field value will be used as the table name); if not, one table\n"
+ "per resource type is created with the JSON content of a resource and its\n"
+ "`id` column for each row.")
"""
Path to the sink database config; if not set, no sink DB is used.
If viewDefinitionsDir is set, the output tables will be the generated views
(the `name` field value will be used as the table name); if not, one table
per resource type is created with the JSON content of a resource and its
`id` column for each row.
""")
@Default.String("")
String getSinkDbConfigPath();

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2020-2023 Google LLC
* Copyright 2020-2025 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -43,12 +43,10 @@ public boolean equals(Object other) {
return true;
}

if (!(other instanceof ResourceTag)) {
if (!(other instanceof ResourceTag resourceTag)) {
return false;
}

ResourceTag resourceTag = (ResourceTag) other;

return coding != null
&& coding.equalsDeep(resourceTag.coding)
&& StringUtils.compare(resourceId, resourceTag.getResourceId()) == 0
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2020-2023 Google LLC
* Copyright 2020-2025 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -27,10 +27,6 @@ public interface PipelineMetrics {
/** Clears all the metrics */
void clearAllMetrics();

/**
* Sets the total number of resources
*
* @param totalNumOfResources
*/
/** Sets the total number of resources */
void setTotalNumOfResources(long totalNumOfResources);
}
Original file line number Diff line number Diff line change
Expand Up @@ -164,10 +164,11 @@ Resource convertToHapi(GenericRecord record, String resourceType) throws Profile
AvroConverter converter = getConverter(resourceType);
IBaseResource resource = converter.avroToResource(record);
// TODO: fix this for other FHIR versions: https://github.com/google/fhir-data-pipes/issues/400
if (!(resource instanceof Resource)) {
if (resource instanceof Resource res) {
return res;
} else {
throw new IllegalArgumentException("Cannot convert input record to resource!");
}
return (Resource) resource;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ public class DwhFiles {
* @param dwhRoot the root of the DWH
* @param viewRoot the root of the view dir under DWH; note this should be an absolute path, but
* it is usually "under" `dwhRoot`.
* @param fhirContext
* @param fhirContext the FHIR Context of the FHIR version we are working with.
*/
private DwhFiles(String dwhRoot, @Nullable String viewRoot, FhirContext fhirContext) {
Preconditions.checkArgument(!Strings.isNullOrEmpty(dwhRoot));
Expand Down Expand Up @@ -199,8 +199,10 @@ public ResourceId getViewPath(String viewName) {
}

/**
* @param resourceType the type of the FHIR resources
* @return The file pattern for Parquet files of `resourceType` in this DWH.
* Returns the file-pattern for all Parquet files of a specific resource type.
*
* @param resourceType the type of the FHIR resources {@return The file pattern for Parquet files
* of `resourceType` in this DWH.}
*/
public String getResourceFilePattern(String resourceType) {
return String.format(
Expand All @@ -221,14 +223,14 @@ public String getViewFilePattern(String viewName) {
/**
* Returns all the child directories under the given base directory which are 1-level deep. Note
* in many cloud/distributed file-systems, we do not have "directories"; there are only buckets
* and files in those buckets. We use file-seprators (e.g., `/`) to simulate the concept of
* and files in those buckets. We use file-separators (e.g., `/`) to simulate the concept of
* directories. So for example, this method returns an empty set if `baseDir` is `bucket/test` and
* the only file in that bucket is `bucket/test/dir1/dir2/file.txt`. If `baseDir` is
* `bucket/test/dir1`, in the above example, `dir2` is returned.
*
* @param baseDir the path under which "directories" are looked for.
* @return The list of all child directories under the base directory
* @throws IOException
* @throws IOException if there is an error accessing the base directory
*/
static Set<ResourceId> getAllChildDirectories(String baseDir) throws IOException {
return getAllChildDirectories(baseDir, "");
Expand All @@ -241,8 +243,8 @@ static Set<ResourceId> getAllChildDirectories(String baseDir) throws IOException
*
* @param baseDir the baseDir of the DWH
* @param commonPrefix the prefix of directories under `baseDir` that we are looking for
* @return the list of found directory names.
* @throws IOException
* @return the set of found directory names.
* @throws IOException if there is an error accessing the base directory
*/
static Set<ResourceId> getAllChildDirectories(String baseDir, String commonPrefix)
throws IOException {
Expand Down Expand Up @@ -280,7 +282,7 @@ static Set<ResourceId> getAllChildDirectories(String baseDir, String commonPrefi
*
* @param commonPrefix the common prefix for the timestamped directories to search.
* @return the latest directory with the `commonPrefix`.
* @throws IOException
* @throws IOException if there is an error accessing the dwhRoot directory
*/
@Nullable
private static ResourceId getLatestPath(String dwhRoot, String commonPrefix) throws IOException {
Expand All @@ -300,6 +302,8 @@ private static ResourceId newTimestampedPath(String dwhRoot, String commonPrefix
}

/**
* Gets the latest incremental-run path under the given DWH root.
*
* @return the current incremental run path if one found; null otherwise.
*/
@Nullable
Expand Down Expand Up @@ -435,7 +439,7 @@ public static void copyDirToDwh(String srcDwh, String dirName, String destDwh)
}

List<ResourceId> destResourceIdList = new ArrayList<>();
sourceResourceIdList.stream()
sourceResourceIdList
.forEach(
resourceId -> {
if (resourceId.getFilename() != null) {
Expand Down Expand Up @@ -580,7 +584,7 @@ public static String getFileSeparatorForDwhFiles(String dwhRootPrefix) {
};
}

/** This method returns the {@code path} by appending the {@code fileseparator} if required. */
/** This method returns the {@code path} by appending the {@code fileSeparator} if required. */
public static String getPathEndingWithFileSeparator(String path, String fileSeparator) {
Preconditions.checkNotNull(path);
Preconditions.checkNotNull(fileSeparator);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,8 +184,8 @@ protected Collection<MethodOutcome> uploadBundle(
}

Resource outcome = responseComponent.getOutcome();
if (outcome instanceof IBaseOperationOutcome) {
methodOutcome.setOperationOutcome((IBaseOperationOutcome) outcome);
if (outcome instanceof IBaseOperationOutcome operationOutcome) {
methodOutcome.setOperationOutcome(operationOutcome);
}

responses.add(methodOutcome);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -291,11 +291,10 @@ FlatRow applyColumns(@Nullable IBase element, List<Column> columns)
List<RowElement> rowElements = new ArrayList<>();
for (Column col : columns) {
if (GET_RESOURCE_KEY.equals(col.getPath())) {
if (element == null || !(element instanceof IBaseResource)) {
if (element == null || !(element instanceof IBaseResource baseResource)) {
throw new ViewApplicationException(
GET_RESOURCE_KEY + " can only be applied at the root!");
}
IBaseResource baseResource = (IBaseResource) element;
rowElements.add(
new RowElement(
// TODO move all type inference to a single place outside View application.
Expand All @@ -322,11 +321,11 @@ FlatRow applyColumns(@Nullable IBase element, List<Column> columns)
eval = evaluateFhirPath(element, fhirPathForRef);
}
for (IBase refElem : eval) {
if (!(refElem instanceof IBaseReference)) {
if (!(refElem instanceof IBaseReference refElemBaseReference)) {
throw new ViewApplicationException(
"getReferenceKey can only be applied to Reference elements; got " + fhirPathForRef);
}
IIdType ref = ((IBaseReference) refElem).getReferenceElement();
IIdType ref = refElemBaseReference.getReferenceElement();
if (resType.isEmpty()) {
refs.add(ref);
} else {
Expand Down Expand Up @@ -365,7 +364,9 @@ public static class RowList {

private final ImmutableList<FlatRow> rows;

private RowList(List<FlatRow> rows, LinkedHashMap<String, Column> columnInfos) {
private RowList(
List<FlatRow> rows,
@SuppressWarnings("NonApiType") LinkedHashMap<String, Column> columnInfos) {
this.rows = ImmutableList.copyOf(rows);
this.columnInfos = ImmutableMap.copyOf(columnInfos);
}
Expand Down Expand Up @@ -466,6 +467,7 @@ private static Builder builder() {
}

// This can eventually be public, but currently it is not used outside this file.
@SuppressWarnings("EffectivelyPrivate")
private static class Builder {
private final LinkedHashMap<String, Column> columnInfos = new LinkedHashMap<>();
private final List<FlatRow> rows = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ void validateAndSetUp(boolean checkName, @Nullable String fhirVersion)
* @return the [ordered] map of new column names and their types as string.
* @throws ViewDefinitionException for repeated columns or other requirements not satisfied.
*/
@SuppressWarnings("NonApiType")
private LinkedHashMap<String, Column> validateAndReplaceConstantsInSelects(
@Nullable List<Select> selects, LinkedHashMap<String, Column> currentColumns)
throws ViewDefinitionException {
Expand All @@ -157,10 +158,12 @@ private LinkedHashMap<String, Column> validateAndReplaceConstantsInSelects(
return newCols;
}

@SuppressWarnings("NonApiType")
private static LinkedHashMap<String, Column> newTypeMap() {
return new LinkedHashMap<>();
}

@SuppressWarnings("NonApiType")
private static LinkedHashMap<String, Column> unionTypeMaps(
LinkedHashMap<String, Column> m1, LinkedHashMap<String, Column> m2) {
LinkedHashMap<String, Column> u = new LinkedHashMap<>();
Expand All @@ -169,6 +172,7 @@ private static LinkedHashMap<String, Column> unionTypeMaps(
return u;
}

@SuppressWarnings("NonApiType")
private LinkedHashMap<String, Column> validateAndReplaceConstantsInOneSelect(
Select select, LinkedHashMap<String, Column> currentColumns) throws ViewDefinitionException {
LinkedHashMap<String, Column> newCols = newTypeMap();
Expand Down Expand Up @@ -329,6 +333,8 @@ private String quoteString(String s) {
}

/**
* Converts the value of this constant to a string that can be used in FHIRPath expressions.
*
* @return a string that can replace this constant in FHIRPaths.
* @throws ViewDefinitionException if zero or more than one value is defined.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ public static ImmutableMap<String, JDBCType> getDbSchema(ViewDefinition view) {
*
* @param rowElements the input elements of the row
* @param statement the statement on which `set*()` methods are called to write the row elements
* @throws SQLException
* @throws SQLException if an error occurs while setting values in the statement
*/
public static void setValueInStatement(
ImmutableList<RowElement> rowElements, PreparedStatement statement) throws SQLException {
Expand Down
Loading
Loading