diff --git a/shard.lock b/shard.lock
index b441099..001d832 100644
--- a/shard.lock
+++ b/shard.lock
@@ -151,7 +151,7 @@ shards:
 
   openssl_ext:
     git: https://github.com/spider-gazelle/openssl_ext.git
-    version: 2.8.2
+    version: 2.8.3
 
   pars: # Overridden
     git: https://github.com/spider-gazelle/pars.git
@@ -187,7 +187,7 @@ shards:
 
   placeos-models:
     git: https://github.com/placeos/models.git
-    version: 9.81.1
+    version: 9.82.0
 
   placeos-resource:
     git: https://github.com/place-labs/resource.git
diff --git a/spec/api/command_spec.cr b/spec/api/command_spec.cr
index dc85864..db6b1a2 100644
--- a/spec/api/command_spec.cr
+++ b/spec/api/command_spec.cr
@@ -44,6 +44,50 @@ module PlaceOS::Core::Api
       ensure
         resource_manager.try &.stop
       end
+
+      it "executes a command on a lazy module (launch_on_execute)" do
+        _, _, mod, resource_manager = create_resources
+        mod_id = mod.id.as(String)
+
+        # Set module as lazy-load
+        mod.launch_on_execute = true
+        mod.running = true
+        mod.save!
+
+        module_manager = module_manager_mock
+        # Register as lazy (don't spawn driver)
+        module_manager.load_module(mod)
+        Command.mock_module_manager = module_manager
+
+        # Verify driver is not spawned
+        module_manager.local_processes.module_loaded?(mod_id).should be_false
+        module_manager.lazy_module?(mod_id).should be_true
+
+        # Execute should work (will spawn driver on demand)
+        route = File.join(namespace, mod_id, "execute")
+        response = client.post(route, headers: json_headers, body: EXEC_PAYLOAD)
+        response.status_code.should eq 200
+
+        result = response.body rescue nil
+        result.should eq %("you can delete this file")
+      ensure
+        resource_manager.try &.stop
+      end
+
+      it "returns 404 for non-lazy module that is not loaded" do
+        _, _, mod = setup(role: PlaceOS::Model::Driver::Role::Service)
+        mod_id = mod.id.as(String)
+
+        # Don't load the module, but it's not lazy either
+        module_manager = module_manager_mock
+        Command.mock_module_manager = module_manager
+
+        route = File.join(namespace, mod_id, "execute")
+        response = client.post(route, headers: json_headers, body: EXEC_PAYLOAD)
+        response.status_code.should eq 404
+      ensure
+        module_manager.try &.stop
+      end
     end
 
     describe "command/:module_id/debugger" do
diff --git a/spec/module_manager_spec.cr b/spec/module_manager_spec.cr
index db4f377..d88fc75 100644
--- a/spec/module_manager_spec.cr
+++ b/spec/module_manager_spec.cr
@@ -52,6 +52,198 @@ module PlaceOS::Core
       end
     end
 
+    describe "lazy modules (launch_on_execute)" do
+      it "registers lazy module without spawning driver" do
+        _, driver, mod = setup(role: PlaceOS::Model::Driver::Role::Service)
+
+        module_manager = module_manager_mock
+        builder = DriverResource.new(startup: true, module_manager: module_manager)
+        resource_manager = ResourceManager.new(driver_builder: builder)
+        resource_manager.start { }
+
+        mod_id = mod.id.as(String)
+
+        mod.reload!
+        mod.driver = mod.driver.not_nil!.reload!
+
+        # Set module as lazy-load
+        mod.launch_on_execute = true
+        mod.running = true
+        mod.save!
+
+        # Load the lazy module
+        module_manager.load_module(mod)
+
+        # Driver should NOT be spawned
+        module_manager.local_processes.run_count.modules.should eq 0
+        module_manager.local_processes.module_loaded?(mod_id).should be_false
+
+        # But module should be registered as lazy
+        module_manager.lazy_module?(mod_id).should be_true
+
+        # Metadata should be populated in Redis
+        metadata = Driver::RedisStorage.with_redis { |r| r.get("interface/#{mod_id}") }
+        metadata.should_not be_nil
+      ensure
+        module_manager.try &.stop
+        resource_manager.try &.stop
+      end
+
+      it "spawns driver on execute and unloads after idle" do
+        # Use a short unload delay for testing
+        original_delay = ModuleManager.lazy_unload_delay
+        ModuleManager.lazy_unload_delay = 500.milliseconds
+
+        _, driver, mod = setup(role: PlaceOS::Model::Driver::Role::Service)
+
+        module_manager = module_manager_mock
+        builder = DriverResource.new(startup: true, module_manager: module_manager)
+        resource_manager = ResourceManager.new(driver_builder: builder)
+        resource_manager.start { }
+
+        mod_id = mod.id.as(String)
+
+        mod.reload!
+        mod.driver = mod.driver.not_nil!.reload!
+
+        # Set module as lazy-load
+        mod.launch_on_execute = true
+        mod.running = true
+        mod.save!
+
+        # Reload to get fresh associations
+        mod = Model::Module.find!(mod_id)
+        mod.driver = driver.reload!
+
+        # Register the lazy module
+        module_manager.load_module(mod)
+        module_manager.lazy_module?(mod_id).should be_true
+        module_manager.local_processes.module_loaded?(mod_id).should be_false
+
+        # Execute should spawn driver and load module
+        result, code = module_manager.local_processes.execute(
+          module_id: mod_id,
+          payload: ModuleManager.execute_payload(:used_for_place_testing),
+          user_id: nil
+        )
+
+        result.should eq %("you can delete this file")
+        code.should eq 200
+
+        # Module should now be loaded
+        module_manager.local_processes.module_loaded?(mod_id).should be_true
+
+        # Wait for idle unload
+        sleep 1.second
+
+        # Module should be unloaded and back to lazy state
+        module_manager.local_processes.module_loaded?(mod_id).should be_false
+        module_manager.lazy_module?(mod_id).should be_true
+
+        # Metadata should still be in Redis
+        metadata = Driver::RedisStorage.with_redis { |r| r.get("interface/#{mod_id}") }
+        metadata.should_not be_nil
+      ensure
+        ModuleManager.lazy_unload_delay = original_delay.not_nil!
+        module_manager.try &.stop
+        resource_manager.try &.stop
+      end
+
+      it "does not unload while executions are active" do
+        original_delay = ModuleManager.lazy_unload_delay
+        ModuleManager.lazy_unload_delay = 200.milliseconds
+
+        _, driver, mod = setup(role: PlaceOS::Model::Driver::Role::Service)
+
+        module_manager = module_manager_mock
+        builder = DriverResource.new(startup: true, module_manager: module_manager)
+        resource_manager = ResourceManager.new(driver_builder: builder)
+        resource_manager.start { }
+
+        mod_id = mod.id.as(String)
+
+        mod.reload!
+        mod.driver = mod.driver.not_nil!.reload!
+
+        mod.launch_on_execute = true
+        mod.running = true
+        mod.save!
+
+        module_manager.load_module(mod)
+
+        # Start multiple concurrent executions
+        results = Channel(Tuple(String, Int32)).new(3)
+
+        3.times do
+          spawn do
+            r, c = module_manager.local_processes.execute(
+              module_id: mod_id,
+              payload: ModuleManager.execute_payload(:used_for_place_testing),
+              user_id: nil
+            )
+            results.send({r, c})
+          end
+        end
+
+        # Collect results
+        3.times do
+          result, code = results.receive
+          result.should eq %("you can delete this file")
+          code.should eq 200
+        end
+
+        # Module should still be loaded (unload scheduled but not executed yet)
+        # Give a tiny bit of time for the last execution to complete
+        sleep 50.milliseconds
+        module_manager.local_processes.module_loaded?(mod_id).should be_true
+
+        # Wait for unload
+        sleep 500.milliseconds
+        module_manager.local_processes.module_loaded?(mod_id).should be_false
+      ensure
+        ModuleManager.lazy_unload_delay = original_delay.not_nil!
+        module_manager.try &.stop
+        resource_manager.try &.stop
+      end
+
+      it "clears metadata when lazy module is stopped" do
+        _, driver, mod = setup(role: PlaceOS::Model::Driver::Role::Service)
+
+        module_manager = module_manager_mock
+        builder = DriverResource.new(startup: true, module_manager: module_manager)
+        resource_manager = ResourceManager.new(driver_builder: builder)
+        resource_manager.start { }
+
+        mod_id = mod.id.as(String)
+
+        mod.reload!
+        mod.driver = mod.driver.not_nil!.reload!
+
+        mod.launch_on_execute = true
+        mod.running = true
+        mod.save!
+
+        module_manager.load_module(mod)
+
+        # Metadata should exist
+        metadata = Driver::RedisStorage.with_redis { |r| r.get("interface/#{mod_id}") }
+        metadata.should_not be_nil
+
+        # Stop the module
+        module_manager.stop_module(mod)
+
+        # Metadata should be cleared
+        metadata = Driver::RedisStorage.with_redis { |r| r.get("interface/#{mod_id}") }
+        metadata.should be_nil
+
+        # Module should not be in lazy tracking
+        module_manager.lazy_module?(mod_id).should be_false
+      ensure
+        module_manager.try &.stop
+        resource_manager.try &.stop
+      end
+    end
+
     describe "startup" do
       it "registers to redis" do
         # Clear relevant tables
diff --git a/src/api/command.cr b/src/api/command.cr
index b7e0a98..179fbd4 100644
--- a/src/api/command.cr
+++ b/src/api/command.cr
@@ -27,9 +27,17 @@ module PlaceOS::Core::Api
       @[AC::Param::Info(description: "the user context for the execution", example: "user-1234")]
       user_id : String? = nil,
     ) : Nil
+      # Check if module is loaded, or is a lazy module that can be loaded on demand
       unless module_manager.process_manager(module_id, &.module_loaded?(module_id))
-        Log.info { {module_id: module_id, message: "module not loaded"} }
-        raise Error::NotFound.new("module #{module_id} not loaded")
+        # Not loaded - check if it's a lazy module
+        unless module_manager.lazy_module?(module_id)
+          # Not a registered lazy module - nilable DB lookup so unknown ids reach the NotFound below
+          mod = Model::Module.find?(module_id)
+          unless mod && mod.launch_on_execute
+            Log.info { {module_id: module_id, message: "module not loaded"} }
+            raise Error::NotFound.new("module #{module_id} not loaded")
+          end
+        end
       end
 
       # NOTE:: we don't use the AC body helper for performance reasons.
diff --git a/src/placeos-core/module_manager.cr b/src/placeos-core/module_manager.cr
index 06951e8..d3f21d2 100644
--- a/src/placeos-core/module_manager.cr
+++ b/src/placeos-core/module_manager.cr
@@ -22,11 +22,19 @@ module PlaceOS::Core
 
     class_property uri : URI = URI.new("http", CORE_HOST, CORE_PORT)
 
+    # Delay before unloading an idle lazy module
+    class_property lazy_unload_delay : Time::Span = ENV["LAZY_UNLOAD_DELAY"]?.try(&.to_i.seconds) || 30.seconds
+
     getter clustering : Clustering
     getter discovery : Clustering::Discovery
 
     protected getter store : DriverStore
 
+    # Track registered lazy modules (module_id => true)
+    # These are modules with launch_on_execute that are "running" but driver not spawned
+    getter lazy_modules : Hash(String, Bool) = {} of String => Bool
+    private getter lazy_modules_lock : Mutex = Mutex.new
+
     def stop
       clustering.unregister
       stop_process_check
@@ -142,6 +150,12 @@ module PlaceOS::Core
       allocated_uri = ModuleManager.core_uri(mod, rendezvous_hash)
 
       if allocated_uri == @clustering.uri
+        # Handle lazy-load modules: register without spawning driver
+        if mod.launch_on_execute && mod.running
+          register_lazy_module(mod)
+          return
+        end
+
         driver = mod.driver!
         driver_id = driver.id.as(String)
         # repository_folder = driver.repository.not_nil!.folder_name
@@ -178,9 +192,13 @@ module PlaceOS::Core
     # Stop and unload the module from node
     #
     def unload_module(mod : Model::Module)
+      module_id = mod.id.as(String)
+
+      # Remove from lazy modules tracking if present
+      unregister_lazy_module(module_id)
+
       stop_module(mod)
 
-      module_id = mod.id.as(String)
       process_manager(mod, &.unload(module_id))
       Log.info { {message: "unloaded module", module_id: mod.id, name: mod.name, custom_name: mod.custom_name} }
     end
@@ -188,6 +206,12 @@ module PlaceOS::Core
     def start_module(mod : Model::Module)
       module_id = mod.id.as(String)
 
+      # For lazy modules, just ensure metadata is in Redis (driver not spawned yet)
+      if mod.launch_on_execute
+        register_lazy_module(mod)
+        return
+      end
+
       process_manager(mod) { |manager| manager.start(module_id, ModuleManager.start_payload(mod)) }
 
       Log.info { {message: "started module", module_id: mod.id, name: mod.name, custom_name: mod.custom_name} }
@@ -211,6 +235,14 @@ module PlaceOS::Core
     def stop_module(mod : Model::Module)
       module_id = mod.id.as(String)
 
+      # For lazy modules, just remove from tracking and clear metadata
+      if mod.launch_on_execute
+        unregister_lazy_module(module_id)
+        clear_module_metadata(module_id)
+        Log.info { {message: "stopped lazy module", module_id: mod.id, name: mod.name, custom_name: mod.custom_name} }
+        return
+      end
+
       process_manager(mod, &.stop(module_id))
       Log.info { {message: "stopped module", module_id: mod.id, name: mod.name, custom_name: mod.custom_name} }
     end
@@ -412,7 +444,7 @@ module PlaceOS::Core
         # Merge module settings
         merged_settings = mod.merge_settings
       rescue e
-        merged_settings = "{}".to_json
+        merged_settings = "{}"
         Log.error(exception: e) { {message: "Failed to merge module settings", module_id: mod.id, name: mod.name, custom_name: mod.custom_name} }
       end
 
@@ -433,5 +465,73 @@ module PlaceOS::Core
     def self.needs_restart?(mod : Model::Module) : Bool
       mod.ip_changed? || mod.port_changed? || mod.tls_changed? || mod.udp_changed? || mod.makebreak_changed? || mod.uri_changed?
     end
+
+    # Lazy Module Support
+    ###########################################################################
+
+    # Register a lazy module: populate metadata in Redis without spawning driver
+    def register_lazy_module(mod : Model::Module)
+      module_id = mod.id.as(String)
+
+      lazy_modules_lock.synchronize do
+        return if lazy_modules[module_id]?
+        lazy_modules[module_id] = true
+      end
+
+      populate_lazy_module_metadata(mod)
+      Log.info { {message: "registered lazy module (driver not spawned)", module_id: module_id, name: mod.name, custom_name: mod.custom_name} }
+    end
+
+    # Unregister a lazy module from tracking
+    def unregister_lazy_module(module_id : String)
+      lazy_modules_lock.synchronize do
+        lazy_modules.delete(module_id)
+      end
+    end
+
+    # Check if a module is registered as lazy (running but driver not spawned)
+    def lazy_module?(module_id : String) : Bool
+      lazy_modules_lock.synchronize do
+        lazy_modules[module_id]? || false
+      end
+    end
+
+    # Populate module metadata in Redis from build service (without spawning driver)
+    def populate_lazy_module_metadata(mod : Model::Module)
+      module_id = mod.id.as(String)
+      driver = mod.driver!
+      repository = driver.repository!
+
+      # Fetch metadata from build service
+      result = store.metadata(driver.file_name, driver.commit, repository.branch, repository.uri)
+      unless result.success
+        Log.warn { {message: "failed to fetch lazy module metadata from build service", module_id: module_id, error: result.output} }
+        return
+      end
+
+      begin
+        metadata = JSON.parse(result.output)
+        interface_data = metadata.as_h.dup
+
+        # Add module-specific notes (same as driver does)
+        interface_data["notes"] = JSON::Any.new(mod.notes)
+
+        # Store in Redis at same key driver would use
+        Driver::RedisStorage.with_redis do |redis|
+          redis.set("interface/#{module_id}", interface_data.to_json)
+        end
+
+        Log.debug { {message: "populated lazy module metadata", module_id: module_id} }
+      rescue e
+        Log.error(exception: e) { {message: "failed to parse/store lazy module metadata", module_id: module_id} }
+      end
+    end
+
+    # Clear module metadata from Redis
+    def clear_module_metadata(module_id : String)
+      Driver::RedisStorage.with_redis do |redis|
+        redis.del("interface/#{module_id}")
+      end
+    end
   end
 end
diff --git a/src/placeos-core/process_manager/local.cr b/src/placeos-core/process_manager/local.cr
index 722bb4e..9587ed0 100644
--- a/src/placeos-core/process_manager/local.cr
+++ b/src/placeos-core/process_manager/local.cr
@@ -12,6 +12,13 @@ module PlaceOS::Core
     private getter discovery : Clustering::Discovery
     private getter store : DriverStore
 
+    # Track active execute requests for lazy modules (module_id => count)
+    private getter lazy_execute_counts : Hash(String, Atomic(Int32)) = {} of String => Atomic(Int32)
+    private getter lazy_execute_lock : Mutex = Mutex.new
+
+    # Flag a pending idle unload per module (module_id => true); removed when a new execution starts
+    private getter lazy_unload_scheduled : Hash(String, Bool) = {} of String => Bool
+
     def initialize(@discovery : Clustering::Discovery)
       @store = DriverStore.new
     end
@@ -54,6 +61,12 @@ module PlaceOS::Core
     end
 
     def execute(module_id : String, payload : String | IO, user_id : String?)
+      # Check if this is a lazy module that needs to be loaded
+      mod = Model::Module.find?(module_id)
+      if mod && mod.launch_on_execute
+        return execute_lazy(mod, payload, user_id)
+      end
+
       super
     rescue exception : ModuleError
       if exception.message =~ /module #{module_id} not running on this host/
@@ -63,6 +76,137 @@ module PlaceOS::Core
       end
     end
 
+    # Execute on a lazy-load module: spawn driver if needed, execute, schedule unload
+    private def execute_lazy(mod : Model::Module, payload : String | IO, user_id : String?)
+      module_id = mod.id.as(String)
+
+      # Track this execution
+      increment_lazy_execute_count(module_id)
+
+      begin
+        # Ensure driver is spawned and module is loaded
+        ensure_lazy_module_loaded(mod)
+
+        # Execute the request
+        manager = protocol_manager_by_module?(module_id)
+        raise ModuleError.new("No protocol manager for lazy module #{module_id}") if manager.nil?
+
+        request_body = payload.is_a?(IO) ? payload.gets_to_end : payload
+        manager.execute(module_id, request_body, user_id: user_id)
+      ensure
+        # Decrement and potentially schedule unload
+        remaining = decrement_lazy_execute_count(module_id)
+        schedule_lazy_unload(mod) if remaining == 0
+      end
+    end
+
+    # Spawn driver and load module for lazy execution
+    private def ensure_lazy_module_loaded(mod : Model::Module)
+      module_id = mod.id.as(String)
+
+      # Already loaded?
+      if protocol_manager_by_module?(module_id)
+        Log.debug { {message: "lazy module already loaded", module_id: module_id} }
+        return
+      end
+
+      driver = mod.driver!
+      repository = driver.repository!
+
+      driver_path = store.built?(driver.file_name, driver.commit, repository.branch, repository.uri)
+      raise ModuleError.new("Driver not compiled for lazy module #{module_id}") if driver_path.nil?
+
+      ::Log.with_context(module_id: module_id, driver_key: driver_path) do
+        # Spawn driver and register module
+        load(module_id, driver_path.to_s)
+
+        # Start the module instance
+        manager = protocol_manager_by_module?(module_id)
+        raise ModuleError.new("Failed to load lazy module #{module_id}") if manager.nil?
+
+        manager.start(module_id, ModuleManager.start_payload(mod))
+
+        Log.info { {message: "spawned driver for lazy module execution", module_id: module_id, name: mod.name} }
+      end
+    end
+
+    # Schedule unload of lazy module after idle timeout
+    private def schedule_lazy_unload(mod : Model::Module)
+      module_id = mod.id.as(String)
+
+      lazy_execute_lock.synchronize do
+        # Mark that unload is scheduled
+        lazy_unload_scheduled[module_id] = true
+      end
+
+      spawn do
+        sleep ModuleManager.lazy_unload_delay
+
+        # Check if still no active executions and unload still scheduled
+        should_unload = lazy_execute_lock.synchronize do
+          scheduled = lazy_unload_scheduled.delete(module_id)
+          count = lazy_execute_counts[module_id]?.try(&.get) || 0
+          scheduled && count == 0
+        end
+
+        if should_unload
+          unload_lazy_module(mod)
+        end
+      end
+    end
+
+    # Unload a lazy module after idle timeout
+    private def unload_lazy_module(mod : Model::Module)
+      module_id = mod.id.as(String)
+
+      # Double-check no active executions
+      count = lazy_execute_lock.synchronize do
+        lazy_execute_counts[module_id]?.try(&.get) || 0
+      end
+
+      if count > 0
+        Log.debug { {message: "skipping lazy unload, active executions", module_id: module_id, count: count} }
+        return
+      end
+
+      # Stop and unload the module
+      stop(module_id)
+      unload(module_id)
+
+      Log.info { {message: "unloaded lazy module after idle timeout", module_id: module_id, name: mod.name} }
+    end
+
+    # Increment active execution count for a lazy module
+    private def increment_lazy_execute_count(module_id : String)
+      lazy_execute_lock.synchronize do
+        lazy_execute_counts[module_id] ||= Atomic(Int32).new(0)
+        lazy_execute_counts[module_id].add(1)
+
+        # Cancel any scheduled unload
+        lazy_unload_scheduled.delete(module_id)
+      end
+    end
+
+    # Decrement active execution count, returns remaining count
+    private def decrement_lazy_execute_count(module_id : String) : Int32
+      lazy_execute_lock.synchronize do
+        counter = lazy_execute_counts[module_id]?
+        return 0 unless counter
+        # Atomic#sub returns the value held before subtracting, so derive the remaining count
+        new_count = counter.sub(1) - 1
+        lazy_execute_counts.delete(module_id) if new_count == 0
+        new_count
+      end
+    end
+
+    # Check if a lazy module has active executions
+    def lazy_module_active?(module_id : String) : Bool
+      lazy_execute_lock.synchronize do
+        counter = lazy_execute_counts[module_id]?
+        counter ? counter.get > 0 : false
+      end
+    end
+
     private def driver_manager(driver_key : String)
       path = driver_path(driver_key).to_s
       Log.info { {driver_path: path, message: "creating new driver protocol manager"} }