diff --git a/cloudbuild.yaml b/cloudbuild.yaml
index 5be8e01bc..9dfe60cd8 100644
--- a/cloudbuild.yaml
+++ b/cloudbuild.yaml
@@ -79,9 +79,8 @@ steps:
   entrypoint: /bin/bash
   args:
   - -c
-  - cd e2e-tests;
-    docker build -t ${_REPOSITORY}/e2e-tests:${_TAG} .;
-    docker build -t ${_REPOSITORY}/e2e-tests/controller-spark:${_TAG} ./controller-spark/.;
+  - docker build -t ${_REPOSITORY}/e2e-tests:${_TAG} -f e2e-tests/Dockerfile ./e2e-tests/;
+    docker build -t ${_REPOSITORY}/e2e-tests/controller-spark:${_TAG} -f e2e-tests/controller-spark/Dockerfile .;
   waitFor: ['-']

 - name: '${_REPOSITORY}/synthea-uploader:${_TAG}'
@@ -203,17 +202,18 @@ steps:
   id: 'Run E2E Test for Dockerized Controller and Spark Thriftserver'
   waitFor: [ 'Bring up controller and Spark containers', 'Run E2E Test for BULK_EXPORT mode with HAPI source' ]
+#  allowFailure: true

 # The controller logs don't appear in Cloud Build output because we run it in
 # the detached mode. For debugging controller failures we can use something like
 # the following (and forcing the previous step to have 0 exit code or by
 # adding allowFailure: true).
-# - name: 'gcr.io/cloud-builders/docker'
-#   id: 'PRINT CONTROLLER LOGS'
-#   entrypoint: /bin/bash
-#   args:
-#   - -c
-#   - docker logs pipeline-controller
+#- name: 'gcr.io/cloud-builders/docker'
+#  id: 'PRINT CONTROLLER LOGS'
+#  entrypoint: /bin/bash
+#  args:
+#  - -c
+#  - docker logs pipeline-controller

 - name: 'docker/compose'
   id: 'Bring down controller and Spark containers'
diff --git a/doc/docs/additional.md b/doc/docs/additional.md
index df50e64ad..38350bd0b 100644
--- a/doc/docs/additional.md
+++ b/doc/docs/additional.md
@@ -213,17 +213,11 @@ controller dwh delete --snapshot-id
 first. A valid snapshot-id is the full id as shown in the list e.g.
 `dwh/controller_DEV_DWH_TIMESTAMP_2025_08_14T17_47_15_357080Z`

-**Note on CLI Access in Docker container:** If you are running the pipeline
-controller in a docker container as defined in ./Dockerfile e.g. such as when
-using the Single Machine docker compose configuration, you can access the CLI
-tool by running the following command from the host machine.
+**Note on CLI Access with Docker:** If you are running the pipeline controller
+in a Docker container as defined in ./Dockerfile, e.g. when using the
+Single Machine docker compose configuration, you can access the CLI tool by
+running the following command from the host machine.

 ```sh
-docker exec -it /bin/bash
-```
-
-And then access the CLI tool as described above.
-
-```sh
-controller [options]
+docker exec <container-name> controller [options]
 ```
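For readers of the doc change above: the note assumes the CLI is reached through `docker exec` against the running controller container. A minimal sketch of what an invocation can look like, mirroring how the e2e script calls the CLI; the container name and URL here are placeholders, not values taken from this change:

```sh
# Hypothetical container name and controller URL; substitute the ones from your compose setup.
docker exec pipeline-controller controller http://localhost:8080 status
docker exec pipeline-controller controller http://localhost:8080 run --mode FULL
```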
diff --git a/e2e-tests/controller-spark/Dockerfile b/e2e-tests/controller-spark/Dockerfile
index 980489dc8..f18bef01c 100644
--- a/e2e-tests/controller-spark/Dockerfile
+++ b/e2e-tests/controller-spark/Dockerfile
@@ -33,5 +33,10 @@ ENV PARQUET_SUBDIR="dwh"
 ENV DOCKER_NETWORK="--use_docker_network"
 ENV HOME_DIR="/workspace/e2e-tests/controller-spark"
 ENV DWH_TYPE="PARQUET"
+ENV CONTROLLER_CLI_DIR="./controller-cli"
+
+COPY ./pipelines/controller-cli/ ${CONTROLLER_CLI_DIR}/
+WORKDIR ${CONTROLLER_CLI_DIR}
+RUN pip3 install .

 ENTRYPOINT cd ${HOME_DIR}; ./controller_spark_sql_validation.sh ${HOME_DIR} ${PARQUET_SUBDIR} ${DOCKER_NETWORK} ${DWH_TYPE}
diff --git a/e2e-tests/controller-spark/controller_spark_sql_validation.sh b/e2e-tests/controller-spark/controller_spark_sql_validation.sh
index daabae49c..e505498a7 100755
--- a/e2e-tests/controller-spark/controller_spark_sql_validation.sh
+++ b/e2e-tests/controller-spark/controller_spark_sql_validation.sh
@@ -187,29 +187,39 @@ function fhir_source_query() {
 # runs; should be one of "FULL", "INCREMENTAL", "VIEWS".
 #######################################################################
 function run_pipeline() {
+  print_message "DEBUG LOG :: RUNNING PIPELINE_CONTROLLER_URL"
   local runMode=$1
-  curl --location --request POST "${PIPELINE_CONTROLLER_URL}/run?runMode=${runMode}" \
-    --connect-timeout 5 \
-    --header 'Content-Type: application/json' \
-    --header 'Accept: */*' -v
+  controller "${PIPELINE_CONTROLLER_URL}" run --mode "${runMode}"
+  print_message "DEBUG LOG :: END RUNNING PIPELINE_CONTROLLER_URL"
 }

 function wait_for_completion() {
   local runtime="15 minute"
   local end_time=$(date -ud "$runtime" +%s)
+  print_message "DEBUG LOG :: WAIT FOR COMPLETION OF PIPELINE, end_time = ${end_time}"
+
   while [[ $(date -u +%s) -le ${end_time} ]]
   do
+    print_message "DEBUG LOG :: WAIT FOR COMPLETION OF PIPELINE INSIDE LOOP"
+    local controller_output
+    controller_output=$(controller "${PIPELINE_CONTROLLER_URL}" status 2>/dev/null | sed -n '/{/,/}/p')
+    print_message "DEBUG LOG :: WFC PIPELINE LOOP, CONTROLLER OUTPUT = ${controller_output}"
+    local pipeline_status=""
+    sleep 5
+    if [[ -n "${controller_output}" ]]; then
+      pipeline_status=$(echo "${controller_output}" | jq -r '.pipelineStatus // ""')
+      print_message "DEBUG LOG :: WFC PIPELINE LOOP, PIPELINE STATUS = ${pipeline_status}"
+    fi
+
+    if [[ "${pipeline_status}" == "RUNNING" ]]; then
+      print_message "DEBUG LOG :: WFC PIPELINE LOOP, SLEEPING FOR 5 SECONDS"
       sleep 5
     else
+      print_message "DEBUG LOG :: WFC PIPELINE LOOP, BREAKING OUT OF LOOP"
       break
     fi
   done
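The polling loop above relies on the CLI printing the controller's `/status` JSON, which the `sed` call isolates and `jq` then filters. A quick illustration of that extraction step, using the field names from the script with an illustrative status value:

```sh
# Illustrative payload only; in the script the JSON comes from `controller <url> status`.
echo '{"pipelineStatus": "RUNNING", "stats": null}' | jq -r '.pipelineStatus // ""'
# prints: RUNNING
```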
diff --git a/pipelines/controller-cli/src/main.py b/pipelines/controller-cli/src/main.py
index 7c590d59a..4287cd431 100644
--- a/pipelines/controller-cli/src/main.py
+++ b/pipelines/controller-cli/src/main.py
@@ -29,8 +29,7 @@ def process_response(response: str, args: argparse.Namespace):
   print(
-      f"Command: {args.command} "
-      f"{args.subcommand if hasattr(args, 'subcommand') else ''}"
+      f"Command: {args.command} {args.subcommand if hasattr(args, 'subcommand') else ''}"
   )
   print(f"Request url: {args.url}")
   print("Response:")
@@ -44,10 +43,14 @@ def _make_api_request(
     verb: str, url: str, params: Optional[Dict[str, Any]] = None
 ) -> Optional[Dict[str, Any]]:
   try:
+    headers = {"Content-Type": "application/json", "Accept": "*/*"}
+
     if verb == HTTP_POST:
-      response = requests.post(url, json={}, timeout=5)
+      response = requests.post(
+          url, json={}, params=params, headers=headers, timeout=5
+      )
     else:
-      response = requests.get(url, params=params, timeout=5)
+      response = requests.get(url, params=params, headers=headers, timeout=5)

     response.raise_for_status()
@@ -85,7 +88,7 @@ def next_scheduled(args: argparse.Namespace) -> str:
     print(f"Error processing: {e}")


-def status(args: str) -> str:
+def status(args: argparse.Namespace) -> str:
   try:
     response = _make_api_request(HTTP_GET, f"{args.url}/status")
     process_response(response, args)
@@ -93,11 +96,10 @@ def status(args: str) -> str:
     print(f"Error processing: {e}")


-def run(args: str) -> str:
+def run(args: argparse.Namespace) -> str:
   try:
-    response = _make_api_request(
-        HTTP_POST, f"{args.url}/run?runMode={args.mode.upper()}"
-    )
+    params = {"runMode": args.mode.upper()}
+    response = _make_api_request(HTTP_POST, f"{args.url}/run", params=params)
     process_response(response, args)
   except requests.exceptions.RequestException as e:
     print(f"Error processing: {e}")
@@ -178,10 +180,7 @@ def main():

   config_parser = subparsers.add_parser("config", help="show config values")
   config_parser.add_argument(
-      "--config-name",
-      "-cn",
-      required=False,
-      help="name of the configuration key",
+      "--config-name", "-cn", required=False, help="name of the configuration key"
   )
   config_parser.set_defaults(func=config)

diff --git a/pipelines/controller/src/main/java/com/google/fhir/analytics/ApiController.java b/pipelines/controller/src/main/java/com/google/fhir/analytics/ApiController.java
index 14047c11d..9a28ce547 100644
--- a/pipelines/controller/src/main/java/com/google/fhir/analytics/ApiController.java
+++ b/pipelines/controller/src/main/java/com/google/fhir/analytics/ApiController.java
@@ -34,6 +34,7 @@
 import java.nio.channels.ReadableByteChannel;
 import java.time.LocalDateTime;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
@@ -74,10 +75,10 @@ public class ApiController {
   @Autowired private DwhFilesManager dwhFilesManager;

   @PostMapping("/run")
-  public ResponseEntity runBatch(
+  public ResponseEntity<Map<String, String>> runBatch(
       @RequestParam(name = "runMode", required = true) String runMode) {
     if (pipelineManager.isRunning()) {
-      return new ResponseEntity<>("Another pipeline is running.", HttpStatus.INTERNAL_SERVER_ERROR);
+      return new ResponseEntity<>(Collections.singletonMap("message", "Another pipeline is running."), HttpStatus.INTERNAL_SERVER_ERROR);
     }
     logger.info("Received request to start the pipeline ...");
     try {
@@ -96,9 +97,9 @@ public ResponseEntity runBatch(
     } catch (Exception e) {
       logger.error("Error in starting the pipeline", e);
       return new ResponseEntity<>(
-          "An unknown error has occurred.", HttpStatus.INTERNAL_SERVER_ERROR);
+          Collections.singletonMap("message", "An unknown error has occurred."), HttpStatus.INTERNAL_SERVER_ERROR);
     }
-    return new ResponseEntity<>(SUCCESS, HttpStatus.OK);
+    return new ResponseEntity<>(Collections.singletonMap("message", SUCCESS), HttpStatus.OK);
   }

   @GetMapping("/status")
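With the runBatch changes above, the `/run` endpoint now responds with a small JSON object (a single "message" field) instead of a bare string, so callers such as the new CLI can parse a JSON body consistently. A rough manual check might look like this; the host and port are placeholders, and the success value stands in for whatever the SUCCESS constant resolves to:

```sh
# Placeholder URL; point it at the running controller instance.
curl -s -X POST 'http://localhost:8080/run?runMode=FULL' -H 'Accept: */*'
# Expected shape on success:                  {"message": "<SUCCESS constant value>"}
# If a pipeline is already running (HTTP 500): {"message": "Another pipeline is running."}
```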