From b7b5fed9ea3c1109e9df5b2bb98075cadda8ce73 Mon Sep 17 00:00:00 2001 From: Jordi Deu-Pons Date: Wed, 18 Oct 2023 12:09:30 +0200 Subject: [PATCH] Allow to add custom traces Signed-off-by: Jordi Deu-Pons --- .../executor/BashWrapperBuilder.groovy | 23 +++++++ .../groovy/nextflow/processor/TaskBean.groovy | 3 + .../nextflow/processor/TaskConfig.groovy | 13 ++++ .../nextflow/script/ProcessConfig.groovy | 3 +- .../nextflow/script/WorkflowMetadata.groovy | 7 ++ .../trace/DefaultObserverFactory.groovy | 5 ++ .../trace/TraceMetadataObserver.groovy | 64 +++++++++++++++++++ .../groovy/nextflow/trace/TraceRecord.groovy | 15 +++-- .../nextflow/executor/command-trace.txt | 5 ++ 9 files changed, 130 insertions(+), 8 deletions(-) create mode 100644 modules/nextflow/src/main/groovy/nextflow/trace/TraceMetadataObserver.groovy diff --git a/modules/nextflow/src/main/groovy/nextflow/executor/BashWrapperBuilder.groovy b/modules/nextflow/src/main/groovy/nextflow/executor/BashWrapperBuilder.groovy index bc21186d1a..6a33bd4442 100644 --- a/modules/nextflow/src/main/groovy/nextflow/executor/BashWrapperBuilder.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/executor/BashWrapperBuilder.groovy @@ -323,6 +323,9 @@ class BashWrapperBuilder { // patch root ownership problem on files created with docker binding.fix_ownership = fixOwnership() ? "[ \${NXF_OWNER:=''} ] && (shopt -s extglob; GLOBIGNORE='..'; chown -fR --from root \$NXF_OWNER ${workDir}/{*,.*}) || true" : null + binding.custom_trace_collect = getCustomTraceCollect() + binding.custom_trace_write = getCustomTraceWrite() + binding.trace_script = isTraceRequired() ? getTraceScript(binding) : null return binding @@ -428,6 +431,26 @@ class BashWrapperBuilder { return result } + private String getCustomTraceCollect() { + if( !customTraces ) + return null + String result='' + for( String key : customTraces.keySet() ) { + result += "local custom_${key}=\$(${customTraces.get(key)})" + } + return result + } + + private String getCustomTraceWrite() { + if( !customTraces ) + return null + String result='' + for( String key : customTraces.keySet() ) { + result += "echo \"custom_${key}=\$custom_${key}\" >> \$trace_file" + } + return result + } + private String getCondaActivateSnippet() { if( !condaEnv ) return null diff --git a/modules/nextflow/src/main/groovy/nextflow/processor/TaskBean.groovy b/modules/nextflow/src/main/groovy/nextflow/processor/TaskBean.groovy index c20a8ac2eb..a6d5791252 100644 --- a/modules/nextflow/src/main/groovy/nextflow/processor/TaskBean.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/processor/TaskBean.groovy @@ -51,6 +51,8 @@ class TaskBean implements Serializable, Cloneable { List moduleNames + Map customTraces + Path workDir Path targetDir @@ -156,6 +158,7 @@ class TaskBean implements Serializable, Cloneable { this.stageOutMode = task.config.getStageOutMode() this.resourceLabels = task.config.getResourceLabels() + this.customTraces = task.config.getCustomTraces() } @Override diff --git a/modules/nextflow/src/main/groovy/nextflow/processor/TaskConfig.groovy b/modules/nextflow/src/main/groovy/nextflow/processor/TaskConfig.groovy index 3a24dc6c04..b081c2e3b4 100644 --- a/modules/nextflow/src/main/groovy/nextflow/processor/TaskConfig.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/processor/TaskConfig.groovy @@ -206,6 +206,19 @@ class TaskConfig extends LazyMap implements Cloneable { return get('stageOutMode') } + Map getCustomTraces() { + def value = get('customTraces') + if( value == null ) + return null + + if( value instanceof Map ) { + //TODO validate key names + return (Map) value + } + + throw new IllegalArgumentException("Not a valid `customTraces` value: ${value}") + } + boolean getDebug() { // check both `debug` and `echo` for backward // compatibility until `echo` is not removed diff --git a/modules/nextflow/src/main/groovy/nextflow/script/ProcessConfig.groovy b/modules/nextflow/src/main/groovy/nextflow/script/ProcessConfig.groovy index d88a11cf1d..fb345b2ba6 100644 --- a/modules/nextflow/src/main/groovy/nextflow/script/ProcessConfig.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/script/ProcessConfig.groovy @@ -91,7 +91,8 @@ class ProcessConfig implements Map, Cloneable { 'stdout', 'stageInMode', 'stageOutMode', - 'resourceLabels' + 'resourceLabels', + 'customTraces' ] /** diff --git a/modules/nextflow/src/main/groovy/nextflow/script/WorkflowMetadata.groovy b/modules/nextflow/src/main/groovy/nextflow/script/WorkflowMetadata.groovy index e8c066cea0..6b88d5e7c2 100644 --- a/modules/nextflow/src/main/groovy/nextflow/script/WorkflowMetadata.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/script/WorkflowMetadata.groovy @@ -16,6 +16,8 @@ package nextflow.script +import nextflow.trace.TraceRecord + import java.nio.file.Path import java.nio.file.Paths import java.time.OffsetDateTime @@ -207,6 +209,11 @@ class WorkflowMetadata { */ Manifest manifest + /** + * The workflow completed traces + */ + List traces = [] + private Session session final private List onCompleteActions = [] diff --git a/modules/nextflow/src/main/groovy/nextflow/trace/DefaultObserverFactory.groovy b/modules/nextflow/src/main/groovy/nextflow/trace/DefaultObserverFactory.groovy index 6c391625c9..ab14e5b222 100644 --- a/modules/nextflow/src/main/groovy/nextflow/trace/DefaultObserverFactory.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/trace/DefaultObserverFactory.groovy @@ -25,6 +25,7 @@ class DefaultObserverFactory implements TraceObserverFactory { createTimelineObserver(result) createDagObserver(result) createAnsiLogObserver(result) + createTraceMetadataObserver(result) return result } @@ -101,4 +102,8 @@ class DefaultObserverFactory implements TraceObserverFactory { result << observer } + protected void createTraceMetadataObserver(Collection result) { + result << new TraceMetadataObserver(session) + } + } diff --git a/modules/nextflow/src/main/groovy/nextflow/trace/TraceMetadataObserver.groovy b/modules/nextflow/src/main/groovy/nextflow/trace/TraceMetadataObserver.groovy new file mode 100644 index 0000000000..02e11f58ad --- /dev/null +++ b/modules/nextflow/src/main/groovy/nextflow/trace/TraceMetadataObserver.groovy @@ -0,0 +1,64 @@ +/* + * Copyright 2013-2023, Seqera Labs + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package nextflow.trace + +import groovy.transform.CompileStatic +import groovy.transform.PackageScope +import groovy.util.logging.Slf4j +import groovyx.gpars.agent.Agent +import nextflow.ISession +import nextflow.Session +import nextflow.processor.TaskHandler +import nextflow.processor.TaskId +import nextflow.processor.TaskProcessor +import nextflow.script.WorkflowMetadata + +import java.nio.file.Files +import java.nio.file.Path +import java.util.concurrent.ConcurrentHashMap + +@Slf4j +@CompileStatic +class TraceMetadataObserver implements TraceObserver { + + Session session + + TraceMetadataObserver(Session session) { + this.session = session + } + + @Override + void onProcessComplete(TaskHandler handler, TraceRecord trace) { + final taskId = handler.task.id + if( !trace ) { + log.debug "[WARN] Unable to find record for task run with id: ${taskId}" + return + } + + session.workflowMetadata.traces.add(trace) + } + + @Override + void onProcessCached(TaskHandler handler, TraceRecord trace) { + onProcessComplete(handler, trace) + } + + @Override + boolean enableMetrics() { + return true + } +} diff --git a/modules/nextflow/src/main/groovy/nextflow/trace/TraceRecord.groovy b/modules/nextflow/src/main/groovy/nextflow/trace/TraceRecord.groovy index 34f2072751..15914f34a1 100644 --- a/modules/nextflow/src/main/groovy/nextflow/trace/TraceRecord.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/trace/TraceRecord.groovy @@ -242,11 +242,6 @@ class TraceRecord implements Serializable { Map getStore() { store } - @Memoized - Set keySet() { - FIELDS.keySet() - } - def propertyMissing(String name, value) { put(name,value) } @@ -261,7 +256,7 @@ class TraceRecord implements Serializable { } def get( String name ) { - assert keySet().contains(name), "Not a valid TraceRecord field: '$name'" + assert name.startsWith("custom_") || FIELDS.containsKey(name), "Not a valid TraceRecord field: '$name'" if( name == 'env' ) { final ret = store.get(name) return ret ? secureEnvString(ret.toString()) : ret @@ -275,7 +270,7 @@ class TraceRecord implements Serializable { void put( String name, def value ) { - if( !keySet().contains(name) ) { + if( !name.startsWith('custom_') && !FIELDS.containsKey(name) ) { log.warn1 "Unknown trace record field: $name" return } @@ -434,6 +429,12 @@ class TraceRecord implements Serializable { if( value == null ) continue + // Parse custom traces always as string + if( name.startsWith("custom_") ) { + this.put(name, value) + continue + } + switch (name) { case '%cpu': case '%mem': diff --git a/modules/nextflow/src/main/resources/nextflow/executor/command-trace.txt b/modules/nextflow/src/main/resources/nextflow/executor/command-trace.txt index 62da5a46ce..228e615b6c 100644 --- a/modules/nextflow/src/main/resources/nextflow/executor/command-trace.txt +++ b/modules/nextflow/src/main/resources/nextflow/executor/command-trace.txt @@ -170,6 +170,10 @@ nxf_trace_linux() { local cpu_time0=$(2> /dev/null < /proc/$pid/stat awk '{printf "%.0f", ($16+$17)*10 }' || echo -n 'X') local io_stat0=($(2> /dev/null < /proc/$pid/io sed 's/^.*:\s*//' | head -n 6 | tr '\n' ' ' || echo -n '0 0 0 0 0 0')) local start_millis=$(nxf_date) + + ## collect custom traces + {{custom_trace_collect}} + ## capture error and kill mem watcher trap 'kill $mem_proc' ERR @@ -209,6 +213,7 @@ nxf_trace_linux() { echo "syscw=${io_stat1[3]}" >> $trace_file echo "read_bytes=${io_stat1[4]}" >> $trace_file echo "write_bytes=${io_stat1[5]}" >> $trace_file + {{custom_trace_write}} ## join nxf_mem_watch [ -e /proc/$mem_proc ] && eval "echo 'DONE' >&$mem_fd" || true