diff --git a/geos-trame/pyproject.toml b/geos-trame/pyproject.toml
index 4e98f6ce1..1c823a34b 100644
--- a/geos-trame/pyproject.toml
+++ b/geos-trame/pyproject.toml
@@ -8,7 +8,8 @@ version = "1.0.0"
 description = "Geos Simulation Modeler"
 authors = [{name = "GEOS Contributors" }]
 maintainers = [{name = "Alexandre Benedicto", email = "alexandre.benedicto@external.totalenergies.com" },
-               {name = "Paloma Martinez", email = "paloma.martinez@external.totalenergies.com" }]
+               {name = "Paloma Martinez", email = "paloma.martinez@external.totalenergies.com" },
+               {name = "Jacques Franc", email = "jacques.franc@external.totalenergies.com" },]
 license = {text = "Apache-2.0"}
 classifiers = [
     "Development Status :: 4 - Beta",
@@ -31,12 +32,12 @@ keywords = [
 dependencies = [
     "typing-extensions==4.12.2",
     "trame==3.6.5",
-    "trame-vuetify==2.7.1",
+    "trame-vuetify==3.1.0",
     "trame-code==1.0.1",
     "trame-server==3.2.3",
-    "trame-client==3.5.0",
+    "trame-client==3.11.2",
     "trame-simput==2.4.3",
-    "trame-vtk>=2.8.14",
+    "trame-vtk==2.10.0",
     "matplotlib==3.9.4",
     "trame-matplotlib==2.0.3",
     "trame-components==2.4.2",
diff --git a/geos-trame/src/geos/trame/app/core.py b/geos-trame/src/geos/trame/app/core.py
index 0a8f40973..06a9a54e7 100644
--- a/geos-trame/src/geos/trame/app/core.py
+++ b/geos-trame/src/geos/trame/app/core.py
@@ -24,6 +24,9 @@
 from geos.trame.app.ui.viewer.viewer import DeckViewer
 from geos.trame.app.components.alertHandler import AlertHandler
 
+from geos.trame.app.io.simulation import Simulation, SimRunner
+from geos.trame.app.ui.simulation_view import define_simulation_view
+
 import sys
 
 
@@ -38,10 +41,12 @@ def __init__( self, server: Server, file_name: str ) -> None:
         self.deckEditor: DeckEditor | None = None
         self.timelineEditor: TimelineEditor | None = None
         self.deckInspector: DeckInspector | None = None
+        self.simulationLauncher: Simulation | None = None
 
         self.server = server
         server.enable_module( module )
         self.state.input_file = file_name
+        self.state.user_id = None
 
         # TODO handle hot_reload
 
@@ -67,6 +72,10 @@ def __init__( self, server: Server, file_name: str ) -> None:
         self.region_viewer = RegionViewer()
         self.well_viewer = WellViewer( 5, 5 )
 
+        ######## Simulation runner
+        self.sim_runner: SimRunner = SimRunner( self.state.user_id )
+        self.simulation = Simulation( self.sim_runner, server=server )
+
         # Data loader
         self.data_loader = DataLoader( self.tree, self.region_viewer, self.well_viewer, trame_server=server )
 
@@ -177,24 +186,6 @@ def build_ui( self ) -> None:
                     ):
                         vuetify.VIcon( "mdi-content-save-outline" )
 
-                with html.Div(
-                        style=
-                        "height: 100%; width: 300px; display: flex; align-items: center; justify-content: space-between;",
-                        v_if=( "tab_idx == 1", ),
-                ):
-                    vuetify.VBtn(
-                        "Run",
-                        style="z-index: 1;",
-                    )
-                    vuetify.VBtn(
-                        "Kill",
-                        style="z-index: 1;",
-                    )
-                    vuetify.VBtn(
-                        "Clear",
-                        style="z-index: 1;",
-                    )
-
             # input file editor
             with vuetify.VCol( v_show=( "tab_idx == 0", ), classes="flex-grow-1 pa-0 ma-0" ):
                 if self.tree.input_file is not None:
@@ -208,3 +199,16 @@ def build_ui( self ) -> None:
                     "The file " + self.state.input_file + " cannot be parsed.",
                     file=sys.stderr,
                 )
+
+            with vuetify.VCol( v_show=( "tab_idx == 1", ), classes="flex-grow-1 pa-0 ma-0" ):
+                if self.simulation is not None:
+                    define_simulation_view( self.server )
+                else:
+                    self.ctrl.on_add_error(
+                        "Error",
+                        "The execution context " + self.state.exec_context + " is not consistent.",
+                    )
+                    print(
+                        "The execution context " + self.state.exec_context + " is not consistent.",
+                        file=sys.stderr,
+                    )
diff --git a/geos-trame/src/geos/trame/app/io/simulation.py b/geos-trame/src/geos/trame/app/io/simulation.py
new file mode 100644
index 000000000..d18d0a611
--- /dev/null
+++ b/geos-trame/src/geos/trame/app/io/simulation.py
@@ -0,0 +1,708 @@
+from abc import ABC, abstractmethod
+from pathlib import Path
+from dataclasses import dataclass, field, fields
+from enum import Enum, unique
+from typing import Callable, Optional, Union
+import datetime
+from trame_server.core import Server
+from trame_server.state import State
+from geos.trame.app.utils.async_file_watcher import AsyncPeriodicRunner
+
+from jinja2 import Template
+import paramiko
+import re
+import os
+
+#TODO move outside
+#TODO use Jinja on real launcher
+
+
+@dataclass( frozen=True )
+class SimulationConstant:
+    SIMULATION_GEOS_PATH = "/workrd/users/"
+    HOST = "p4log01"  # Only run on P4 machine
+    REMOTE_HOME_BASE = "/users"
+    PORT = 22
+    SIMULATIONS_INFORMATION_FOLDER_PATH = "/workrd/users/"
+    SIMULATION_DEFAULT_FILE_NAME = "geosDeck.xml"
+
+    # replace by conf-file json
+
+
+# If proxy jumps are needed:
+#
+# proxy_cmd = "ssh -W {host}:{port} proxyuser@bastion.example.com".format(
+#     host=ssh_host, port=ssh_port
+# )
+# from paramiko import ProxyCommand
+# sock = ProxyCommand(proxy_cmd)
+
+# client = paramiko.SSHClient()
+# client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
+# client.connect(
+#     hostname=ssh_host,
+#     port=ssh_port,
+#     username=username,
+#     key_filename=keyfile,
+#     sock=sock,  # <- tunnel created by ProxyCommand
+# )
+
+# Load template from file
+# with open("slurm_job_template.j2") as f:
+#     template = Template(f.read())
+
+#TODO from private-assets
+template_str = """#!/bin/sh
+#SBATCH --job-name="{{ job_name }}"
+#SBATCH --ntasks={{ ntasks }}
+#SBATCH --partition={{ partition }}
+#SBATCH --comment={{ comment_gr }}
+#SBATCH --account={{ account }}
+#SBATCH --nodes={{ nodes }}
+#SBATCH --time={{ time | default('00:10:00') }}
+#SBATCH --mem={{ mem }}
+#SBATCH --output=job_GEOS_%j.out
+#SBATCH --error=job_GEOS_%j.err
+
+ulimit -s unlimited
+ulimit -c unlimited
+
+module purge
+
+export HDF5_USE_FILE_LOCKING=FALSE
+export OMP_NUM_THREADS=1
+
+srun --mpi=pmix_v3 --hint=nomultithread \
+    -n {{ ntasks }} geos \
+    -o Outputs_${SLURM_JOBID} \
+    -i {{ input_file | default('geosDeck.xml') }} | tee Outputs_${SLURM_JOBID}/log_${SLURM_JOBID}.out
+
+"""
+
+template_cb = """#!/bin/sh
+#SBATCH --job-name="{{ job_name }}"
+#SBATCH --ntasks={{ ntasks }}
+#SBATCH --partition={{ partition }}
+#SBATCH --comment={{ comment_gr }}
+#SBATCH --account={{ account }}
+#SBATCH --nodes={{ nodes }}
+#SBATCH --time={{ time | default('00:10:00') }}
+#SBATCH --mem={{ mem }}
+#SBATCH --output=job_GEOS_%j.out
+#SBATCH --error=job_GEOS_%j.err
+#SBATCH --dependency=afterok:{{ dep_job_id }}
+
+srun tar cfz {{ dep_job_id }}.tgz Outputs_{{ dep_job_id }}/ && mv -v {{ dep_job_id }}.tgz {{ target_dl_path }}
+
+"""
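+
+# Illustrative only: how template_str is meant to be instantiated (these
+# values are placeholders, not application defaults):
+#
+#   script = Template( template_str ).render(
+#       job_name="demo", ntasks=4, nodes=1, mem="0",
+#       partition="p4_dev", comment_gr="demo", account="myaccount" )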
+
+
+class Authentificator:  # namespacing more than anything else
+
+    ssh_client: Optional[ paramiko.SSHClient ] = None
+
+    @staticmethod
+    def _sftp_copy_tree( ssh_client, file_tree, remote_root ):
+        # Connect to remote server
+        sftp = ssh_client.open_sftp()
+
+        Authentificator.dfs_tree( file_tree[ "structure" ], file_tree[ "root" ], sftp=sftp, remote_root=remote_root )
+
+        sftp.close()
+
+    @staticmethod
+    def dfs_tree( node, path, sftp, remote_root ):
+
+        lp = Path( path )
+        rp = Path( remote_root ) / lp
+
+        if isinstance( node, list ):
+            for file in node:
+                # sftp.put(lp/Path(file), rp/Path(file))
+                with sftp.file( str( rp / Path( file.get( 'name' ) ) ), 'w' ) as f:
+                    f.write( file.get( 'content' ) )
+                print( f"copying {lp/Path(file.get('name'))} to {rp/Path(file.get('name'))}" )
+        elif isinstance( node, dict ):
+            if "files" in node:
+                for file in node[ "files" ]:
+                    # sftp.put( str(lp/Path(file)), str(rp/Path(file)) )
+                    with sftp.file( str( rp / Path( file.get( 'name' ) ) ), 'w' ) as f:
+                        f.write( file.get( 'content' ) )
+                    print( f"copying {lp/Path(file.get('name'))} to {rp/Path(file.get('name'))}" )
+            if "subfolders" in node:
+                for subfolder, content in node[ "subfolders" ].items():
+                    try:
+                        sftp.stat( str( rp / Path( subfolder ) ) )
+                    except FileNotFoundError:
+                        print( f"creating {rp/Path(subfolder)}" )
+                        sftp.mkdir( str( rp / Path( subfolder ) ) )
+                    Authentificator.dfs_tree( content, lp / Path( subfolder ), sftp, remote_root )
+
+            for folder, content in node.items():
+                if folder not in [ "files", "subfolders" ]:
+                    try:
+                        sftp.stat( str( rp / Path( folder ) ) )
+                    except FileNotFoundError:
+                        print( f"creating {rp/Path(folder)}" )
+                        sftp.mkdir( str( rp / Path( folder ) ) )
+                    Authentificator.dfs_tree( content, lp / Path( folder ), sftp, remote_root )
+
+    @staticmethod
+    def kill_job( id ):
+        if Authentificator.ssh_client:
+            Authentificator._execute_remote_command( Authentificator.ssh_client, f"scancel {id}" )
+        return None
+
+    @staticmethod
+    def get_key( id, pword ):
+
+        try:
+            home = os.environ.get( "HOME" )
+            PRIVATE_KEY = paramiko.RSAKey.from_private_key_file( f"{home}/.ssh/id_trame" )
+            return PRIVATE_KEY
+        except paramiko.SSHException as e:
+            print( f"Error loading private key: {e}\n" )
+        except FileNotFoundError as e:
+            print( f"Private key not found: {e}\n Generating key ..." )
+            PRIVATE_KEY = Authentificator.gen_key()
+            temp_client = paramiko.SSHClient()
+            temp_client.set_missing_host_key_policy( paramiko.AutoAddPolicy() )
+            temp_client.connect( SimulationConstant.HOST,
+                                 SimulationConstant.PORT,
+                                 username=id,
+                                 password=pword,
+                                 timeout=10 )
+            Authentificator._transfer_file_sftp( temp_client, f"{home}/.ssh/id_trame.pub",
+                                                 f"{SimulationConstant.REMOTE_HOME_BASE}/{id}/.ssh/id_trame.pub" )
+            Authentificator._execute_remote_command(
+                temp_client,
+                f" cat {SimulationConstant.REMOTE_HOME_BASE}/{id}/.ssh/id_trame.pub | tee -a {SimulationConstant.REMOTE_HOME_BASE}/{id}/.ssh/authorized_keys"
+            )
+
+            return PRIVATE_KEY
+
+    @staticmethod
+    def gen_key():
+
+        home = os.environ.get( "HOME" )
+        file_path = f"{home}/.ssh/id_trame"
+        key = paramiko.RSAKey.generate( bits=4096 )
+        key.write_private_key_file( file_path )
+
+        # Get public key in OpenSSH format
+        public_key = f"{key.get_name()} {key.get_base64()}"
+        with open( file_path + ".pub", "w" ) as pub_file:
+            pub_file.write( public_key )
+
+        print( "SSH key pair generated: id_trame (private), id_trame.pub (public)" )
+
+        return key
+    @staticmethod
+    def _create_ssh_client( host, port, username, password=None, key=None ) -> paramiko.SSHClient:
+        """
+        Initializes and returns an SSH client connection, or None on failure.
+        """
+        client = paramiko.SSHClient()
+        # Automatically adds the hostname and new host keys to the host files (~/.ssh/known_hosts)
+        client.set_missing_host_key_policy( paramiko.AutoAddPolicy() )
+
+        try:
+            print( f"Connecting to {host} using key-based authentication..." )
+            client.connect( host, port, username, pkey=key, timeout=10 )
+
+            return client
+        except paramiko.AuthenticationException:
+            print( "Authentication failed. Check your credentials or key." )
+            return None
+        except paramiko.SSHException as e:
+            print( f"Could not establish SSH connection: {e}" )
+            return None
+        except Exception as e:
+            print( f"An unexpected error occurred: {e}" )
+            return None
+
+    @staticmethod
+    def _execute_remote_command( client, command ):
+        """
+        Executes a single command on the remote server and prints the output.
+        """
+        if not client:
+            return
+
+        print( f"\n--- Executing Command: '{command}' ---" )
+        try:
+            # Executes the command. stdin, stdout, and stderr are file-like objects.
+            # Ensure command ends with a newline character for some shell environments.
+            stdin, stdout, stderr = client.exec_command( command )
+
+            # Wait for the command to finish and read the output
+            exit_status = stdout.channel.recv_exit_status()
+
+            # Print standard output
+            stdout_data = stdout.read().decode().strip()
+            if stdout_data:
+                print( "STDOUT:" )
+                print( stdout_data )
+
+            # Print standard error (if any)
+            stderr_data = stderr.read().decode().strip()
+            if stderr_data:
+                print( "STDERR:" )
+                print( stderr_data )
+
+            print( f"Command exited with status: {exit_status}" )
+            return ( exit_status, stdout_data, stderr_data )
+
+        except Exception as e:
+            print( f"Error executing command: {e}" )
+            return ( -1, "", "" )
+
+    @staticmethod
+    def _transfer_file_sftp( client, local_path, remote_path, direction="put" ):
+        """
+        Transfers a file using SFTP (Secure File Transfer Protocol).
+        Direction can be 'put' (upload) or 'get' (download).
+        """
+        if not client:
+            return
+
+        print( f"\n--- Starting SFTP Transfer ({direction.upper()}) ---" )
+
+        try:
+            # Establish an SFTP connection session
+            sftp = client.open_sftp()
+
+            if direction == "put":
+                print( f"Uploading '{local_path}' to '{remote_path}'..." )
+                sftp.put( local_path, remote_path )
+                print( "Upload complete." )
+            elif direction == "get":
+                print( f"Downloading '{remote_path}' to '{local_path}'..." )
+                sftp.get( remote_path, local_path )
+                print( "Download complete." )
+            else:
+                print( "Invalid transfer direction. Use 'put' or 'get'." )
+
+            sftp.close()
+            return True
+
+        except FileNotFoundError:
+            print( f"Error: Local file '{local_path}' not found." )
+            return False
+        except IOError as e:
+            print( f"Error accessing remote file or path: {e}" )
+            return False
+        except Exception as e:
+            print( f"An error occurred during SFTP: {e}" )
+            return False
+
+
+@unique
+class SlurmJobStatus( Enum ):
+    PENDING = "PEND"
+    RUNNING = "R"
+    COMPLETING = "CG"
+    COMPLETED = "CD"
+    SUSPENDED = "S"
+    UNKNOWN = "UNKNOWN"
+
+    @classmethod
+    def from_string( cls, job_str ) -> "SlurmJobStatus":
+        try:
+            return cls( job_str )
+        except ValueError:
+            return cls.UNKNOWN
+
+
+@dataclass
+class LauncherParams:
+    simulation_files_path: Optional[ str ] = None
+    simulation_cmd_filename: Optional[ str ] = None
+    simulation_job_name: Optional[ str ] = None
+    simulation_nb_process: int = 1
+
+    @classmethod
+    def from_server_state( cls, server_state: State ) -> "LauncherParams":
+        state = cls()
+        for f in fields( cls ):
+            setattr( state, f.name, server_state[ f.name ] )
+        return state
+
+    def is_complete( self ) -> bool:
+        return None not in [ getattr( self, f.name ) for f in fields( self ) ]
+
+    def assert_is_complete( self ) -> None:
+        if not self.is_complete():
+            raise RuntimeError( f"Incomplete simulation launch parameters: {self}." )
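+
+# Sketch of the intended call sequence (the state keys mirror the dataclass
+# field names, which is an assumption based on from_server_state):
+#   params = LauncherParams.from_server_state( server.state )
+#   params.assert_is_complete()  # raises RuntimeError if any field is None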
+
+
+def get_timestamp() -> str:
+    return datetime.datetime.utcnow().strftime( "%Y-%m-%d_%H-%M-%S.%f" )[ :-3 ]
+
+
+def get_simulation_output_file_name( timestamp: str, user_name: str = "user_name" ):
+    return f"{user_name}_{timestamp}.json"
+
+
+# def write_simulation_information_to_repo(info: SimulationInformation, sim_info_path: Path) -> Optional[Path]:
+#     return write_file(
+#         sim_info_path.as_posix(),
+#         get_simulation_output_file_name(info.timestamp, info.user_igg),
+#         json.dumps(info.to_dict()),  # type: ignore
+#     )
+
+# def get_simulation_screenshot_timestep(filename: str) -> int:
+#     """
+#     From a given file name returns the time step.
+#     Filename is defined as: RenderView0_000000.png with 000000 the time step to parse and return
+#     """
+#     if not filename:
+#         print("Simulation filename is not defined")
+#         return -1
+
+#     pattern = re.compile(r"RenderView[0-9]_[0-9]{6}\.png", re.IGNORECASE)
+#     if pattern.match(filename) is None:
+#         print("Simulation filename does not match the pattern: RenderView0_000000.png")
+#         return -1
+
+#     timestep = os.path.splitext(filename)[0].split("_")[-1]
+#     return int(timestep) if timestep else -1
+
+# def get_most_recent_file_from_list(files_list: list[str]) -> Optional[str]:
+#     if not files_list:
+#         return None
+#     return max(files_list, key=get_simulation_screenshot_timestep)
+
+# def get_most_recent_simulation_screenshot(folder_path: Path) -> Optional[str]:
+#     return get_most_recent_file_from_list(os.listdir(folder_path)) if folder_path.exists() else None
+
+
+class ISimRunner( ABC ):
+    """
+    Abstract interface for a sim runner.
+    Provides methods to trigger a simulation, get the simulation output path and know whether the simulation is done.
+    """
+    pass
+    # @abstractmethod
+    # def launch_simulation(self, launcher_params: LauncherParams) -> tuple[Path, SimulationInformation]:
+    #     pass
+
+    # @abstractmethod
+    # def get_user_igg(self) -> str:
+    #     pass
+
+    # @abstractmethod
+    # def get_running_user_jobs(self) -> list[tuple[str, SlurmJobStatus]]:
+    #     pass
+
+
+class SimRunner( ISimRunner ):
+    """
+    Runs the simulation on HPC. Wraps paramiko use.
+    """
+
+    def __init__( self, user ):
+        super().__init__()
+
+        # early test
+        self.local_upload_file = "test_upload.txt"
+        import time
+        with open( self.local_upload_file, "w" ) as f:
+            f.write( f"This file was uploaded at {time.ctime()}\n" )
+        print( f"Created local file: {self.local_upload_file}" )
+
+
+class Simulation:
+    """
+    Simulation component.
+    Fills the UI with the screenshot as read from the simulation outputs folder and a graph with the time series
+    from the simulation.
+    Requires a simulation runner providing information on the output path of the simulation to monitor and ways to
+    trigger the simulation.
+    """
+
+    def __init__( self, sim_runner: ISimRunner, server: Server, sim_info_dir: Optional[ Path ] = None ) -> None:
+        self._server = server
+        controller = server.controller
+        self._sim_runner = sim_runner
+        self._sim_info_dir = sim_info_dir or SimulationConstant.SIMULATIONS_INFORMATION_FOLDER_PATH
+        server.state.job_ids = []
+
+        server.state.status_colors = {
+            "PENDING": "#4CAF50",  # PD
+            "RUNNING": "#3F51B5",  # R
+            "CANCELLED": "#FFC107",  # CA
+            "COMPLETED": "#484B45",  # CD
+            "FAILED": "#E53935",  # F
+        }
+        self._job_status_watcher: Optional[ AsyncPeriodicRunner ] = None
+        self._job_status_watcher_period_ms = 2000
+
+        # define triggers
+        @controller.trigger( "run_try_login" )
+        def run_try_login() -> None:
+
+            # if server.state.key:
+            Authentificator.ssh_client = Authentificator._create_ssh_client(
+                SimulationConstant.HOST,  # test
+                SimulationConstant.PORT,
+                server.state.login,
+                key=Authentificator.get_key( server.state.login, server.state.password ) )
+
+            if Authentificator.ssh_client:
+                # id = os.environ.get('USER')
+                # Authentificator._execute_remote_command(Authentificator.ssh_client, f"ps aux")
+                # Authentificator._execute_remote_command(Authentificator.ssh_client, f"ls -l {SimulationConstant.REMOTE_HOME_BASE}/{id}")
+
+                # server.state.update({"access_granted" : True, "key_path" : f"{SimulationConstant.REMOTE_HOME_BASE}/{id}/.ssh/id_trame" })
+                # server.state.flush()
+                server.state.access_granted = True
+                print( "login granted" )
+
+        def gen_tree( xml_filename ):
+
+            xml_pattern = re.compile( r"\.xml$", re.IGNORECASE )
+            mesh_pattern = re.compile( r"\.(vtu|vtm|pvtu|pvtm)$", re.IGNORECASE )
+            table_pattern = re.compile( r"\.(txt|dat|csv|geos)$", re.IGNORECASE )
+            xml_matches = []
+            mesh_matches = []
+            table_matches = []
+
+            pattern_file = r"[\w\-.]+\.(?:vtu|pvtu|dat|txt|xml|geos)\b"  # all files
+            pattern_xml_path = r"\"(.*/)*([\w\-.]+\.(?:xml))\b"
+            pattern_mesh_path = r"\"(.*/)*([\w\-.]+\.(?:vtu|pvtu|vtm|pvtm))\b"
+            pattern_table_curly_path = r"((?:[\w\-/]+/)+)*([\w\-.]+\.(?:geos|csv|dat|txt))"
+
+            for file in xml_filename:
+                if xml_pattern.search( file.get( "name", "" ) ):
+                    xml_matches.append( file )
+                elif mesh_pattern.search( file.get( "name", "" ) ):
+                    mesh_matches.append( file )
+                elif table_pattern.search( file.get( "name", "" ) ):
+                    table_matches.append( file )
+
+            # assume the first XML is the main xml
+            # TODO relocate
+            xml_expected_file_matches = re.findall( pattern_file, xml_matches[ 0 ][ 'content' ].decode( "utf-8" ) )
+            test_assert = { item.get( "name" )
+                            for item in xml_filename }.intersection( set( xml_expected_file_matches ) )
+
+            decoded = re.sub( pattern_xml_path, r'"\2', xml_matches[ 0 ][ 'content' ].decode( "utf-8" ) )
+            decoded = re.sub( pattern_mesh_path, r'"mesh/\2', decoded )
+            decoded = re.sub( pattern_table_curly_path, r"tables/\2", decoded )
+
+            xml_matches[ 0 ][ 'content' ] = decoded.encode( "utf-8" )
+
+            file_tree = {
+                'root': '.',
+                "structure": {
+                    "files": xml_matches,
+                    "subfolders": {
+                        "mesh": mesh_matches,
+                        "tables": table_matches
+                    }
+                }
+            }
+            return file_tree
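+
+        # Shape of the tree produced by gen_tree, for a deck plus one mesh
+        # and one table file (the file names here are illustrative):
+        #   {
+        #       "root": ".",
+        #       "structure": {
+        #           "files": [ { "name": "geosDeck.xml", "content": b"..." } ],
+        #           "subfolders": {
+        #               "mesh": [ { "name": "domain.vtu", ... } ],
+        #               "tables": [ { "name": "perm.geos", ... } ],
+        #           },
+        #       },
+        #   }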
+
+        @controller.trigger( "run_simulation" )
+        def run_simulation() -> None:
+
+            # if server.state.access_granted and server.state.sd and server.state.simulation_xml_filename:
+            if server.state.access_granted and server.state.simulation_xml_filename:
+                template = Template( template_str )
+                # sdi = server.state.sd
+                ci = { 'nodes': 1, 'total_ranks': 2 }
+                # TODO profile to use the correct amount
+                rendered = template.render( job_name=server.state.simulation_job_name,
+                                            input_file=[
+                                                item for item in server.state.simulation_xml_filename
+                                                if item.get( 'type' ) == 'text/xml'
+                                            ][ 0 ].get( 'name' ),
+                                            nodes=ci[ 'nodes' ],
+                                            ntasks=ci[ 'total_ranks' ],
+                                            mem="0",
+                                            comment_gr=server.state.slurm_comment,
+                                            partition='p4_dev',
+                                            account='myaccount' )
+
+                if Authentificator.ssh_client:
+                    # write slurm directly on remote
+                    try:
+                        sftp = Authentificator.ssh_client.open_sftp()
+                        remote_path = Path( server.state.simulation_remote_path ) / Path( 'job.slurm' )
+                        with sftp.file( str( remote_path ), 'w' ) as f:
+                            f.write( rendered )
+
+                    # except FileExistsError:
+                    #     print(f"Error: Local file '{remote_path}' not found.")
+                    except PermissionError as e:
+                        print( f"Permission error: {e}" )
+                    except IOError as e:
+                        print( f"Error accessing remote file or path: {e}" )
+                    except Exception as e:
+                        print( f"An error occurred during SFTP: {e}" )
+
+                    Authentificator._sftp_copy_tree( Authentificator.ssh_client,
+                                                     gen_tree( server.state.simulation_xml_filename ),
+                                                     server.state.simulation_remote_path )
+
+                    _, sout, serr = Authentificator._execute_remote_command(
+                        Authentificator.ssh_client, f'cd {server.state.simulation_remote_path} && sbatch job.slurm' )
+
+                    # TODO encapsulate
+                    job_lines = sout.strip()
+                    job_id = re.search( r"Submitted batch job (\d+)", job_lines )
+
+                    server.state.job_ids.append( { 'job_id': job_id[ 1 ] } )
+
+                    self.start_result_streams()
+
+                    Authentificator._transfer_file_sftp( Authentificator.ssh_client,
+                                                         remote_path=f'{server.state.simulation_remote_path}/log.out',
+                                                         local_path=f'{server.state.simulation_dl_path}/dl.test',
+                                                         direction="get" )
+
+                else:
+                    raise paramiko.SSHException( "No active SSH connection" )
+
+        @controller.trigger( "kill_all_simulations" )
+        def kill_all_simulations() -> None:
+            # exec scancel jobid
+            for jobs in server.state.job_ids:
+                Authentificator.kill_job( jobs[ 'job_id' ] )
+
+    def __del__( self ):
+        self.stop_result_streams()
+
+    def set_status_watcher_period_ms( self, period_ms ):
+        self._job_status_watcher_period_ms = period_ms
+        if self._job_status_watcher:
+            self._job_status_watcher.set_period_ms( period_ms )
+
+    def _update_job_status( self ) -> None:
+        sim_info = self.get_last_user_simulation_info()
+        job_status = sim_info.get_simulation_status( self._sim_runner.get_running_user_jobs )
+        sim_path = sim_info.get_simulation_dir( job_status )
+
+        self._server.controller.set_simulation_status( job_status )
+        self._server.controller.set_simulation_time_stamp( sim_info.timestamp )
+
+        self._update_screenshot_display( sim_info.get_screenshot_path( sim_path ) )
+        self._update_plots( sim_info.get_timeseries_path( sim_path ) )
+
+        # Stop results stream if job is done
+        if job_status == SimulationStatus.DONE:
+            self.stop_result_streams()
+
+    # TODO: might be useful for history
+    #
+    # def get_last_user_simulation_info(self) -> SimulationInformation:
+    #     last_sim_information = self.get_last_information_path()
+    #     return SimulationInformation.from_file(last_sim_information)
+
+    # def get_last_information_path(self) -> Optional[Path]:
+    #     user_igg = self._sim_runner.get_user_igg()
+
+    #     user_files = list(reversed(sorted(self._sim_info_dir.glob(f"{user_igg}*.json"))))
+    #     if not user_files:
+    #         return None
+    #
+    #     return user_files[0]
+
+    def stop_result_streams( self ):
+        if self._job_status_watcher is not None:
+            self._job_status_watcher.stop()
+
+    def start_result_streams( self ) -> None:
+        self.stop_result_streams()
+
+        self._job_status_watcher = AsyncPeriodicRunner( self.check_jobs,
+                                                        period_ms=self._job_status_watcher_period_ms )
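+
+    # check_jobs() parses `sacct -o JobID,JobName,State --noheader` output and
+    # keeps only the last line, e.g. (assumed formatting):
+    #   123456        geosJob  RUNNING
+    #   123456.batch  batch    RUNNING
+    # so when job steps are listed, the step line is the one actually parsed.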
+    def check_jobs( self ):
+        if Authentificator.ssh_client:
+            try:
+                jid = self._server.state.job_ids
+                for index, job in enumerate( jid ):
+                    job_id = job[ 'job_id' ]
+                    _, sout, serr = Authentificator._execute_remote_command(
+                        Authentificator.ssh_client, f'sacct -j {job_id} -o JobID,JobName,State --noheader' )
+                    job_line = sout.strip().split( "\n" )[ -1 ]
+
+                    jid[ index ][ 'status' ] = job_line.split()[ 2 ]
+                    if ( jid[ index ][ 'status' ] == 'COMPLETED' ):
+                        # tar and copy back
+                        Authentificator._execute_remote_command(
+                            Authentificator.ssh_client,
+                            f'cd {self._server.state.simulation_remote_path} && tar cvfz {job_id}.tgz Outputs_{job_id}/'
+                        )
+                        Authentificator._transfer_file_sftp(
+                            Authentificator.ssh_client,
+                            f'{self._server.state.simulation_dl_path}/{job_id}.tgz',
+                            f'{self._server.state.simulation_remote_path}/{job_id}.tgz',
+                            direction='get' )
+                    elif ( jid[ index ][ 'status' ] == 'RUNNING' ):
+                        # parse the completion percentage from the job log
+                        pattern = re.compile( r'\((\d+(?:\.\d+)?)%\s*completed\)' )
+                        with Authentificator.ssh_client.open_sftp().file(
+                                str(
+                                    Path( self._server.state.simulation_remote_path ) /
+                                    Path( f"job_GEOS_{job_id}.out" ) ), "r" ) as f:
+                            for line in f:
+                                m = pattern.search( line )
+                                if m:
+                                    self._server.state.simulation_progress = str( m.group( 1 ) )
+
+                    jid[ index ][ 'name' ] = job_line.split()[ 1 ]
+                    print(
+                        f"{job_line}-{job_id}\n job id:{jid[index]['job_id']}\n status:{jid[index]['status']}\n name:{jid[index]['name']} \n --- \n"
+                    )
+                self._server.state.job_ids = jid
+                self._server.state.dirty( "job_ids" )
+                self._server.state.flush()
+
+            except PermissionError as e:
+                print( f"Permission error: {e}" )
+            except IOError as e:
+                print( f"Error accessing remote file or path: {e}" )
+            except Exception as e:
+                print( f"An error occurred during SFTP: {e}" )
+        else:
+            return None
+
+    def start_simulation( self ) -> None:
+        state = self._server.state
+        script_path = None
+        try:
+            launcher_params = LauncherParams.from_server_state( self._server.state )
+            launcher_params.assert_is_complete()
+
+            script_path, sim_info = self._sim_runner.launch_simulation( launcher_params )
+            self._write_sim_info( launcher_params, sim_info )
+            self.start_result_streams()
+            state.simulation_error = ""
+        except Exception as e:
+            print( "Error occurred: ", e )
+            state.simulation_error = str( e )
+        finally:
+            state.avoid_rewriting = False
+            if isinstance( script_path, Path ) and script_path.is_file():
+                os.remove( script_path )
+
+
+def path_to_string( p: Union[ str, Path ] ) -> str:
+    return Path( p ).as_posix()
+
+
+def write_file( folder_path: str, filename: str, file_content: str ) -> Optional[ Path ]:
+    try:
+        Path( folder_path ).mkdir( exist_ok=True )
+        file_path = Path( f"{folder_path}/{filename}" )
+        with open( file_path, "w" ) as f:
+            f.write( file_content )
+        return file_path.absolute()
+    except Exception as e:
+        print( "error occurred when copying file to", folder_path, e )
+        return None
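+
+# e.g. write_file( "/tmp/sim", "geosDeck.xml", "<Problem/>" ) returns the
+# absolute Path on success; on failure it prints the error and returns None.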
diff --git a/geos-trame/src/geos/trame/app/main.py b/geos-trame/src/geos/trame/app/main.py
index 2ad3b293a..d2629b752 100644
--- a/geos-trame/src/geos/trame/app/main.py
+++ b/geos-trame/src/geos/trame/app/main.py
@@ -7,6 +7,10 @@
 from trame.app import get_server  # type: ignore
 from trame_server import Server
 
+import sys
+
+sys.path.insert( 0, "/data/pau901/SIM_CS/users/jfranc/geosPythonPackages/geos-trame/src" )
+
 from geos.trame.app.core import GeosTrame
diff --git a/geos-trame/src/geos/trame/app/ui/simulationStatusView.py b/geos-trame/src/geos/trame/app/ui/simulationStatusView.py
new file mode 100644
index 000000000..8b85d441b
--- /dev/null
+++ b/geos-trame/src/geos/trame/app/ui/simulationStatusView.py
@@ -0,0 +1,80 @@
+from enum import Enum, auto, unique
+
+from trame_client.widgets.html import H3, Div
+from trame_server import Server
+from trame_vuetify.widgets.vuetify3 import VCard
+
+
+@unique
+class SimulationStatus( Enum ):
+    SCHEDULED = auto()
+    RUNNING = auto()
+    COMPLETING = auto()
+    COPY_BACK = auto()
+    DONE = auto()
+    NOT_RUN = auto()
+    UNKNOWN = auto()
+
+
+class SimulationStatusView:
+    """
+    Simple component containing the simulation status in a VCard with some coloring depending on the status.
+    """
+
+    def __init__( self, server: Server ):
+
+        def state_name( state_str ):
+            return f"{type(self).__name__}_{state_str}_{id(self)}"
+
+        self._text_state = state_name( "text" )
+        self._date_state = state_name( "date" )
+        self._time_state = state_name( "time" )
+        self._color_state = state_name( "color" )
+        self._state = server.state
+
+        for s in [ self._text_state, self._date_state, self._time_state, self._color_state ]:
+            self._state.client_only( s )
+
+        with VCard(
+                classes="p-8",
+                style=( f"`border: 4px solid ${{{self._color_state}}}; width: 300px; margin:auto; padding: 4px;`", ),
+        ) as self.ui:
+            H3( f"{{{{{self._text_state}}}}}", style="text-align:center;" )
+            Div( f"{{{{{self._date_state}}}}} {{{{{self._time_state}}}}}", style="text-align:center;" )
+
+        self.set_status( SimulationStatus.NOT_RUN )
+        self.set_time_stamp( "" )
+
+    def set_status( self, status: SimulationStatus ):
+        self._state[ self._text_state ] = status.name
+        self._state[ self._color_state ] = self.status_color( status )
+        self._state.flush()
+
+    def set_time_stamp( self, time_stamp: str ):
+        date, time = self.split_time_stamp( time_stamp )
+        self._state[ self._time_state ] = time
+        self._state[ self._date_state ] = date
+        self._state.flush()
+
+    @staticmethod
+    def split_time_stamp( time_stamp: str ) -> tuple[ str, str ]:
+        default_time_stamp = "", ""
+        if not time_stamp:
+            return default_time_stamp
+
+        time_stamp = time_stamp.split( "_" )
+        if len( time_stamp ) < 2:
+            return default_time_stamp
+
+        return time_stamp[ 0 ].replace( "-", "/" ), time_stamp[ 1 ].split( "." )[ 0 ].replace( "-", ":" )
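+
+    # e.g. split_time_stamp( "2024-11-01_12-30-05.123" )
+    #   -> ( "2024/11/01", "12:30:05" )
+    # matching the get_timestamp() format used in io/simulation.py.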
+
+    @staticmethod
+    def status_color( status: SimulationStatus ) -> str:
+        return {
+            SimulationStatus.DONE: "#4CAF50",
+            SimulationStatus.RUNNING: "#3F51B5",
+            SimulationStatus.SCHEDULED: "#FFC107",
+            SimulationStatus.COMPLETING: "#C5E1A5",
+            SimulationStatus.COPY_BACK: "#C5E1A5",
+            SimulationStatus.UNKNOWN: "#E53935",
+        }.get( status, "#607D8B" )
diff --git a/geos-trame/src/geos/trame/app/ui/simulation_view.py b/geos-trame/src/geos/trame/app/ui/simulation_view.py
new file mode 100644
index 000000000..17dbef3d8
--- /dev/null
+++ b/geos-trame/src/geos/trame/app/ui/simulation_view.py
@@ -0,0 +1,277 @@
+from trame.widgets import html
+from trame.widgets import vuetify3 as vuetify
+
+from geos.trame.app.io.simulation import SimulationConstant, Authentificator
+import json
+
+
+class SuggestDecomposition:
+
+    def __init__( self, cluster_name, n_unknowns, job_type='cpu' ):
+
+        # return ["P4: 1x22", "P4: 2x11"]
+        with open( '/assets/cluster.json', 'r' ) as file:
+            all_cluster = json.load( file )
+        self.selected_cluster = list( filter( lambda d: d.get( 'name' ) == cluster_name,
+                                              all_cluster[ "clusters" ] ) )[ 0 ]
+        self.n_unknowns = n_unknowns
+        self.job_type = job_type
+
+    # @property
+    # def selected_cluster(self):
+    #     return self.selected_cluster
+
+    @staticmethod
+    def compute( n_unknowns,
+                 memory_per_unknown_bytes,
+                 node_memory_gb,
+                 cores_per_node,
+                 min_unknowns_per_rank=10000,
+                 strong_scaling=True ):
+        """
+        Suggests a node/rank distribution for a cluster computation.
+
+        Parameters:
+        - n_unknowns: total number of unknowns
+        - memory_per_unknown_bytes: estimated memory per unknown
+        - node_memory_gb: available memory per node
+        - cores_per_node: cores available per node
+        - min_unknowns_per_rank: minimum for efficiency
+        - strong_scaling: True if problem size is fixed
+
+        Note:
+        - 10,000-100,000 unknowns per rank is often a sweet spot for many PDE solvers
+        - Use power-of-2 decompositions when possible (helps with communication patterns)
+        - For 3D problems, try to maintain cubic subdomains (minimizes surface-to-volume ratio, reducing communication)
+        - Don't oversubscribe: avoid using more ranks than provide parallel efficiency
+        """
+
+        # Memory constraint
+        node_memory_bytes = node_memory_gb * 1e9
+        max_unknowns_per_node = int( 0.8 * node_memory_bytes / memory_per_unknown_bytes )
+
+        # Compute minimum nodes needed
+        min_nodes = max( 1, ( n_unknowns + max_unknowns_per_node - 1 ) // max_unknowns_per_node )
+
+        # Determine ranks per node
+        unknowns_per_node = n_unknowns // min_nodes
+        unknowns_per_rank = max( min_unknowns_per_rank, unknowns_per_node // cores_per_node )
+
+        # Calculate total ranks needed
+        n_ranks = max( 1, n_unknowns // unknowns_per_rank )
+
+        # Distribute across nodes
+        ranks_per_node = min( cores_per_node, ( n_ranks + min_nodes - 1 ) // min_nodes )
+        n_nodes = ( n_ranks + ranks_per_node - 1 ) // ranks_per_node
+
+        return {
+            'nodes': n_nodes,
+            'ranks_per_node': ranks_per_node,
+            'total_ranks': n_nodes * ranks_per_node,
+            'unknowns_per_rank': n_unknowns // ( n_nodes * ranks_per_node )
+        }
+
+    def to_list( self ):
+
+        if self.job_type == 'cpu':  # make it an enum
+            sd = SuggestDecomposition.compute( self.n_unknowns, 64, self.selected_cluster[ 'mem_per_node' ],
+                                               self.selected_cluster[ 'cpu' ][ 'per_node' ] )
+        # elif job_type == 'gpu':
+        #     selected_cluster['n_nodes']*selected_cluster['gpu']['per_node']
+
+        return [
+            f"{self.selected_cluster['name']}: {sd['nodes']} x {sd['ranks_per_node']}",
+            f"{self.selected_cluster['name']}: {sd['nodes'] * 2} x {sd['ranks_per_node'] // 2}"
+        ]
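+
+    # Worked example of compute(), assuming 1e8 unknowns at 64 B each on a
+    # p4-like node (768 GB, 192 cores): the memory bound allows ~9.6e9
+    # unknowns per node, so one node suffices, and 1e8 // 192 = 520,833
+    # unknowns per rank clears the 10,000 minimum:
+    #   compute( 100_000_000, 64, 768, 192 )
+    #   -> { 'nodes': 1, 'ranks_per_node': 192, 'total_ranks': 192,
+    #        'unknowns_per_rank': 520833 }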
+
+
+def define_simulation_view( server ) -> None:
+
+    @server.state.change( "simulation_xml_temp" )
+    def on_temp_change( simulation_xml_temp: list, **_ ):
+        current_list = server.state.simulation_xml_filename
+
+        new_list = current_list + simulation_xml_temp
+        server.state.simulation_xml_filename = new_list
+        server.state.simulation_xml_temp = []
+
+    @server.state.change( "simulation_xml_filename" )
+    def on_simfiles_change( simulation_xml_filename: list, **_ ):
+        import re
+        pattern = re.compile( r"\.xml$", re.IGNORECASE )
+        has_xml = any(
+            pattern.search( file if isinstance( file, str ) else file.get( "name", "" ) )
+            for file in simulation_xml_filename )
+        server.state.is_valid_jobfiles = has_xml
+
+    def kill_job( index_to_remove: int ) -> None:
+        # for now just check there is an xml
+        jid = list( server.state.job_ids )
+        if 0 <= index_to_remove < len( jid ):
+            # remove the entry from the copy of the list
+            removed_id = jid[ index_to_remove ][ 'job_id' ]
+            Authentificator.kill_job( removed_id )
+            del jid[ index_to_remove ]
+
+            server.state.job_ids = jid
+            print( f"Job {removed_id} killed. Still running: {len(jid)}" )
+        else:
+            print( f"Error: removal index does not exist ({index_to_remove})." )
+
+    def run_remove_jobfile( index_to_remove: int ) -> None:
+        current_files = list( server.state.simulation_xml_filename )
+        if 0 <= index_to_remove < len( current_files ):
+            del current_files[ index_to_remove ]
+
+            server.state.simulation_xml_filename = current_files
+            print( f"File at index {index_to_remove} removed. Remaining files: {len(current_files)}" )
+        else:
+            print( f"Error: invalid removal index ({index_to_remove})." )
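+
+    # Entries of server.state.job_ids, as filled in progressively by
+    # Simulation.check_jobs, end up shaped like:
+    #   { "job_id": "123456", "status": "RUNNING", "name": "geosJob" }
+    # which is what the kill/status list below binds to.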
+    with vuetify.VContainer():
+        with vuetify.VRow():
+            with vuetify.VCol( cols=4 ):
+                vuetify.VTextField( v_model=( "login", None ),
+                                    label="Login",
+                                    dense=True,
+                                    hide_details=True,
+                                    clearable=True,
+                                    prepend_icon="mdi-login" )
+            with vuetify.VCol( cols=4 ):
+                vuetify.VTextField( v_model=( "password", None ),
+                                    label="Password",
+                                    type="password",
+                                    dense=True,
+                                    hide_details=True,
+                                    clearable=True,
+                                    prepend_icon="mdi-onepassword" )
+
+            #
+            server.state.access_granted = False
+            server.state.is_valid_jobfiles = False
+            server.state.simulation_xml_filename = []
+
+            sd = SuggestDecomposition( 'p4', 12 )
+            items = sd.to_list()
+            vuetify.VDivider( vertical=True, thickness=5, classes="mx-4" )
+            with vuetify.VCol( cols=2 ):
+                vuetify.VSelect( label="Cluster", items=( "items", items ) )
+
+        with vuetify.VRow():
+            with vuetify.VCol( cols=8 ):
+                vuetify.VTextField( v_model=( "key_path", None ),
+                                    label="Path to ssh key",
+                                    dense=True,
+                                    hide_details=True,
+                                    clearable=True,
+                                    prepend_icon="mdi-key-chain-variant" )
+
+            #
+            vuetify.VDivider( vertical=True, thickness=5, classes="mx-4" )
+            with vuetify.VCol( cols=1 ):
+                vuetify.VBtn( "Log in", click="trigger('run_try_login')",
+                              disabled=( "access_granted", ) )  # type: ignore
+            #
+            vuetify.VDivider( vertical=True, thickness=5, classes="mx-4" )
+            with vuetify.VCol( cols=1 ):
+                vuetify.VTextField(
+                    v_model=( "slurm_comment", ),
+                    label="Comment to slurm",
+                    dense=True,
+                    hide_details=True,
+                    clearable=True,
+                )  # type: ignore
+
+        vuetify.VDivider( thickness=5, classes="my-4" )
+
+        with vuetify.VRow():
+            with vuetify.VCol( cols=4 ):
+                vuetify.VFileUpload( v_model=( "simulation_xml_temp", [] ),
+                                     title="Simulation file name",
+                                     density='comfortable',
+                                     hide_details=True,
+                                     # clearable=True,
+                                     multiple=True,
+                                     filter_by_type='.xml,.vtu,.vtm,.pvtu,.pvtm,.dat,.csv,.txt',
+                                     # readonly=True,
+                                     disabled=( "!access_granted", ) )
+            with vuetify.VCol( cols=4 ):
+                with vuetify.VList():
+                    with vuetify.VListItem( v_for=( "(file,i) in simulation_xml_filename" ),
+                                            key="i",
+                                            value="file",
+                                            prepend_icon="mdi-minus-circle-outline",
+                                            click=( run_remove_jobfile, "[i]" ) ):
+                        vuetify.VListItemTitle( "{{ file.name }}" )
+                        vuetify.VListItemSubtitle( "{{ file.size ? (file.size / 1024).toFixed(1) + ' KB' : 'URL' }}" )
+
+        with vuetify.VRow(), vuetify.VCol():
+            vuetify.VTextField( v_model=( "simulation_remote_path", ),
+                                label="Path where to write files and launch code",
+                                prepend_icon="mdi-upload",
+                                dense=True,
+                                hide_details=True,
+                                clearable=True,
+                                disabled=( "!access_granted", )
+                                # TODO callback validation of path
+                              )
+
+        with vuetify.VRow(), vuetify.VCol():
+            vuetify.VTextField( v_model=( "simulation_dl_path", ),
+                                label="Simulation download path",
+                                dense=True,
+                                clearable=True,
+                                prepend_icon="mdi-download",
+                                disabled=( "!access_granted", )
+                                # TODO callback validation of path
+                              )
+
+        with vuetify.VRow():
+            with vuetify.VCol( cols=4 ):
+                vuetify.VTextField( v_model=( "simulation_job_name", "geosJob" ),
+                                    label="Job Name",
+                                    dense=True,
+                                    hide_details=True,
+                                    clearable=True,
+                                    disabled=( "!access_granted", ) )
+
+            vuetify.VSpacer()
+            with vuetify.VCol( cols=1 ):
+                vuetify.VBtn( "Run",
+                              click="trigger('run_simulation')",
+                              disabled=( "!is_valid_jobfiles", ),
+                              classes="ml-auto" )  # type: ignore
+
+        vuetify.VDivider( thickness=5, classes="my-4" )
+
+        with vuetify.VRow():
+            vuetify.VSpacer()
+            with vuetify.VCol( cols=1 ):
+                vuetify.VBtn( "Kill All", click="trigger('kill_all_simulations')" )  # type: ignore
+
+        color_expression = "status_colors[job_ids[i].status] || '#607D8B'"
+        with vuetify.VRow():
+            with vuetify.VCol( cols=4 ):
+                with vuetify.VList():
+                    with vuetify.VListItem( v_for=( "(jobs,i) in job_ids" ),
+                                            key="i",
+                                            value="jobs",
+                                            base_color=( color_expression, ),
+                                            prepend_icon="mdi-minus-circle-outline",
+                                            click=( kill_job, "[i]" ) ):
+                        vuetify.VListItemTitle( "{{ jobs.status }} -- {{ jobs.name }} -- {{ jobs.job_id }}" )
+        vuetify.VProgressLinear( v_model=( "simulation_progress", "0" ) )
+
+        with vuetify.VRow( v_if="simulation_error" ):
+            html.Div( "An error occurred while running simulation: {{simulation_error}}", style="color:red;" )
diff --git a/geos-trame/src/geos/trame/app/ui/timeline.py b/geos-trame/src/geos/trame/app/ui/timeline.py
index d6961c0ed..6d3559f9f 100644
--- a/geos-trame/src/geos/trame/app/ui/timeline.py
+++ b/geos-trame/src/geos/trame/app/ui/timeline.py
@@ -3,7 +3,7 @@
 # SPDX-FileContributor: Lionel Untereiner
 
 from typing import Any
-from trame.widgets import gantt
+# from trame.widgets import gantt
 from trame.widgets import vuetify3 as vuetify
 from trame_simput import get_simput_manager
 
@@ -72,18 +72,18 @@ def __init__( self, source: DeckTree, **kwargs: Any ) -> None:
                         vuetify.VAlert( "{{ item.summary }}" )
                         vuetify.Template( "{{ item.start_date }}", raw_attrs=[ "v-slot:opposite" ] )
 
-        with vuetify.VContainer( "Events chart" ):
-            gantt.Gantt(
-                canEdit=True,
-                dateLimit=30,
-                startDate="2024-11-01 00:00",
-                endDate="2024-12-01 00:00",
-                # title='Gantt-pre-test',
-                fields=fields,
-                update=( self.update_from_js, "items" ),
-                items=( "items", items ),
-                classes="fill_height",
-            )
+        # with vuetify.VContainer( "Events chart" ):
+        #     gantt.Gantt(
+        #         canEdit=True,
+        #         dateLimit=30,
+        #         startDate="2024-11-01 00:00",
+        #         endDate="2024-12-01 00:00",
+        #         # title='Gantt-pre-test',
+        #         fields=fields,
+        #         update=( self.update_from_js, "items" ),
+        #         items=( "items", items ),
+        #         classes="fill_height",
+        #     )
 
     def update_from_js( self, *items: tuple ) -> None:
         """Update method called from javascript."""
diff --git a/geos-trame/src/geos/trame/app/utils/async_file_watcher.py b/geos-trame/src/geos/trame/app/utils/async_file_watcher.py
new file mode 100644
index 000000000..17b3df3aa
--- /dev/null
+++ b/geos-trame/src/geos/trame/app/utils/async_file_watcher.py
@@ -0,0 +1,115 @@
+import asyncio
+import os
+from asyncio import CancelledError, ensure_future
+from io import TextIOWrapper
+from pathlib import Path
+from typing import Callable, Optional, Union
+
+from trame_server.utils import asynchronous
+
+
+class AsyncPeriodicRunner:
+    """
+    While started, runs the given callback at the given period.
+    """
+
+    def __init__( self, callback: Callable, period_ms=100 ):
+        self.last_m_time = None
+        self.callback = callback
+        self.period_ms = period_ms
+        self.task = None
+        self.start()
+
+    def __del__( self ):
+        self.stop()
+
+    def set_period_ms( self, period_ms ):
+        self.period_ms = period_ms
+
+    def start( self ):
+        self.stop()
+        self.task = asynchronous.create_task( self._runner() )
+
+    def stop( self ):
+        if not self.task:
+            return
+
+        ensure_future( self._wait_for_cancel() )
+
+    async def _wait_for_cancel( self ):
+        """
+        Cancel and await the cancellation error for the task.
+        If the cancel is done outside async, it may raise warnings as the cancelled exception may be triggered
+        outside the async loop.
+        """
+        if not self.task or self.task.done() or self.task.cancelled():
+            self.task = None
+            return
+
+        try:
+            self.task.cancel()
+            await self.task
+        except CancelledError:
+            self.task = None
+
+    async def _runner( self ):
+        while True:
+            self.callback()
+            await asyncio.sleep( self.period_ms / 1000.0 )
+
+
+class AsyncFileWatcher( AsyncPeriodicRunner ):
+
+    def __init__( self, path_to_watch: Path, on_modified_callback: Callable, check_time_out_ms=100 ):
+        super().__init__( self._check_modified, check_time_out_ms )
+        self.path_to_watch = Path( path_to_watch )
+        self.last_m_time = None
+        self.on_modified_callback = on_modified_callback
+
+    def get_m_time( self ):
+        if not self.path_to_watch.exists():
+            return None
+        return os.stat( self.path_to_watch ).st_mtime
+
+    def _check_modified( self ):
+        if self.get_m_time() != self.last_m_time:
+            self.last_m_time = self.get_m_time()
+            self.on_modified_callback()
+
+
+class AsyncSubprocess:
+
+    def __init__(
+        self,
+        args,
+        timeout: Union[ float, None ] = None,
+    ) -> None:
+        self.args = args
+        self.timeout = timeout
+        self._writer: Optional[ TextIOWrapper ] = None
+
+        self.stdout: Optional[ bytes ] = None
+        self.stderr: Optional[ bytes ] = None
+        self.process: Optional[ asyncio.subprocess.Process ] = None
+        self.exception: Optional[ RuntimeError ] = None
+
+    async def run( self ) -> None:
+        cmd = " ".join( map( str, self.args ) )
+        self.process = await self._init_subprocess( cmd )
+
+        try:
+            self.stdout, self.stderr = await asyncio.wait_for( self.process.communicate(), timeout=self.timeout )
+        except asyncio.exceptions.TimeoutError:
+            self.process.kill()
+            self.stdout, self.stderr = await self.process.communicate()
+            self.exception = RuntimeError( "Process timed out" )
+        finally:
+            if self.process.returncode != 0:
+                self.exception = RuntimeError( f"Process exited with code {self.process.returncode}" )
+
+    async def _init_subprocess( self, cmd: str ) -> asyncio.subprocess.Process:
+        return await asyncio.create_subprocess_shell(
+            cmd=cmd,
+            stdout=asyncio.subprocess.PIPE,
+            stderr=asyncio.subprocess.PIPE,
+        )
diff --git a/geos-trame/src/geos/trame/assets/cluster.json b/geos-trame/src/geos/trame/assets/cluster.json
new file mode 100644
index 000000000..d8bfa4e3c
--- /dev/null
+++ b/geos-trame/src/geos/trame/assets/cluster.json
@@ -0,0 +1,14 @@
+{
+  "clusters": [
+    {
+      "name": "p4",
+      "simulation_default_path": "/www",
+      "geos_version_default": "daily_rhel",
+      "simulation_information_default_path": "/www",
+      "simulation_default_filename": "geosDeck.xml",
+      "n_nodes": 212,
+      "cpu": { "types": ["AMD EPYC 4th gen"], "per_node": 192 },
+      "mem_per_node": 768
+    }
+  ]
+}