From 7a6e91585b4157edcf9af8cc5a0bf17c012bd92c Mon Sep 17 00:00:00 2001 From: Alex Piche Date: Tue, 29 Jul 2025 02:01:18 +0000 Subject: [PATCH 01/21] sim --- pipelinerl/actor.py | 38 +++++++++++++++++++++++++++++++++++--- 1 file changed, 35 insertions(+), 3 deletions(-) diff --git a/pipelinerl/actor.py b/pipelinerl/actor.py index b0908d08..e4dbb955 100644 --- a/pipelinerl/actor.py +++ b/pipelinerl/actor.py @@ -410,7 +410,24 @@ def run(self, dataset: list[tuple[str, dict]]): logger.info( f"Max lag is {self.cfg.finetune.max_lag} samples, that makes {lag_groups} additional starting chunks" ) - can_submit_before_update = lag_groups + groups_per_update + #TODO: rm conv RL code + times_per_data_chunk = [] + time_on_desired_number_of_llms = 0 + desired_number_of_llms = 128 + current_number_of_llms = len(self.llms) # assumes 1 llm per gpu + assert groups_per_update * current_number_of_llms % desired_number_of_llms == 0, ( + f"groups_per_update * current_number_of_llms {groups_per_update * current_number_of_llms} " + f"should be divisible by desired_number_of_llms {desired_number_of_llms}" + ) + groups_per_update_adjusted = groups_per_update * current_number_of_llms / desired_number_of_llms + can_submit_before_update_non_adjusted = lag_groups + groups_per_update + can_submit_before_update = lag_groups + groups_per_update_adjusted + logger.info( + f"We only have {current_number_of_llms} llms instead of {desired_number_of_llms}," + f" thus instead of {groups_per_update} groups per update," + f" we can submit {groups_per_update_adjusted} groups per update," + ) + loop_start_time = time.time() else: groups_per_update = None can_submit_before_update = math.inf @@ -426,11 +443,25 @@ def run(self, dataset: list[tuple[str, dict]]): if self.trainer_state.propagated_weight_version > last_trainer_version: if max_lag is not None: - assert groups_per_update is not None - can_submit_before_update += groups_per_update + assert groups_per_update_adjusted is not None + can_submit_before_update += groups_per_update_adjusted + can_submit_before_update_non_adjusted += groups_per_update # the weights have been updated, publish the stats of the previous trainer version trainer_version_to_publish = last_trainer_version last_trainer_version = self.trainer_state.propagated_weight_version + time_on_desired_number_of_llms = max(times_per_data_chunk) + times_per_data_chunk = [] + loop_start_time = time.time() + elif published_samples == can_submit_before_update and published_samples < can_submit_before_update_non_adjusted: + logger.info( + f"Published {published_samples} samples which is less than {can_submit_before_update_non_adjusted}," + f" will now increment the number of samples that can be submitted before update to {can_submit_before_update+groups_per_update_adjusted}" + ) + end_time = time.time() + times_per_data_chunk.append(end_time - loop_start_time) + loop_start_time = end_time + if max_lag is not None: + can_submit_before_update += groups_per_update_adjusted # First, submit all problems you can until the problem queue is full if not self.is_scheduling_paused: @@ -499,6 +530,7 @@ def run(self, dataset: list[tuple[str, dict]]): "trainer_model_version": trainer_version_to_publish, "time_since_start": time.time() - loop_start_time, "groups_in_progress": in_progress, + "time_on_desired_number_of_llms": time_on_desired_number_of_llms, } trainer_version_to_publish = None else: From 9c33dcccf906e38763ac2f267042b21580960010 Mon Sep 17 00:00:00 2001 From: Alex Piche Date: Tue, 29 Jul 2025 02:16:47 +0000 Subject: [PATCH 02/21] add logging --- pipelinerl/actor.py | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/pipelinerl/actor.py b/pipelinerl/actor.py index e4dbb955..8ddf033f 100644 --- a/pipelinerl/actor.py +++ b/pipelinerl/actor.py @@ -411,15 +411,15 @@ def run(self, dataset: list[tuple[str, dict]]): f"Max lag is {self.cfg.finetune.max_lag} samples, that makes {lag_groups} additional starting chunks" ) #TODO: rm conv RL code - times_per_data_chunk = [] + times_per_current_llms = [] time_on_desired_number_of_llms = 0 desired_number_of_llms = 128 current_number_of_llms = len(self.llms) # assumes 1 llm per gpu - assert groups_per_update * current_number_of_llms % desired_number_of_llms == 0, ( + assert (groups_per_update * current_number_of_llms) % desired_number_of_llms == 0, ( f"groups_per_update * current_number_of_llms {groups_per_update * current_number_of_llms} " f"should be divisible by desired_number_of_llms {desired_number_of_llms}" ) - groups_per_update_adjusted = groups_per_update * current_number_of_llms / desired_number_of_llms + groups_per_update_adjusted = groups_per_update * current_number_of_llms // desired_number_of_llms can_submit_before_update_non_adjusted = lag_groups + groups_per_update can_submit_before_update = lag_groups + groups_per_update_adjusted logger.info( @@ -449,8 +449,12 @@ def run(self, dataset: list[tuple[str, dict]]): # the weights have been updated, publish the stats of the previous trainer version trainer_version_to_publish = last_trainer_version last_trainer_version = self.trainer_state.propagated_weight_version - time_on_desired_number_of_llms = max(times_per_data_chunk) - times_per_data_chunk = [] + time_on_desired_number_of_llms = max(times_per_current_llms) + logger.info( + f"Time on current number of llms {sum(times_per_current_llms)}," + f" time on desired number of llms: {time_on_desired_number_of_llms:.2f} seconds" + ) + times_per_current_llms = [] loop_start_time = time.time() elif published_samples == can_submit_before_update and published_samples < can_submit_before_update_non_adjusted: logger.info( @@ -458,7 +462,7 @@ def run(self, dataset: list[tuple[str, dict]]): f" will now increment the number of samples that can be submitted before update to {can_submit_before_update+groups_per_update_adjusted}" ) end_time = time.time() - times_per_data_chunk.append(end_time - loop_start_time) + times_per_current_llms.append(end_time - loop_start_time) loop_start_time = end_time if max_lag is not None: can_submit_before_update += groups_per_update_adjusted From 6c9364925addd70c677d8c2cb7b664118d95cabd Mon Sep 17 00:00:00 2001 From: Alex Piche Date: Tue, 29 Jul 2025 02:21:35 +0000 Subject: [PATCH 03/21] upd --- pipelinerl/actor.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/pipelinerl/actor.py b/pipelinerl/actor.py index 8ddf033f..ab09ef86 100644 --- a/pipelinerl/actor.py +++ b/pipelinerl/actor.py @@ -411,8 +411,8 @@ def run(self, dataset: list[tuple[str, dict]]): f"Max lag is {self.cfg.finetune.max_lag} samples, that makes {lag_groups} additional starting chunks" ) #TODO: rm conv RL code - times_per_current_llms = [] - time_on_desired_number_of_llms = 0 + times_for_current_num_llms = [] + time_for_desired_num_of_llms = 0 desired_number_of_llms = 128 current_number_of_llms = len(self.llms) # assumes 1 llm per gpu assert (groups_per_update * current_number_of_llms) % desired_number_of_llms == 0, ( @@ -449,12 +449,12 @@ def run(self, dataset: list[tuple[str, dict]]): # the weights have been updated, publish the stats of the previous trainer version trainer_version_to_publish = last_trainer_version last_trainer_version = self.trainer_state.propagated_weight_version - time_on_desired_number_of_llms = max(times_per_current_llms) + time_for_desired_num_of_llms = max(times_for_current_num_llms) logger.info( - f"Time on current number of llms {sum(times_per_current_llms)}," - f" time on desired number of llms: {time_on_desired_number_of_llms:.2f} seconds" + f"Time on current number of llms {sum(times_for_current_num_llms)}," + f" time on desired number of llms: {time_for_desired_num_of_llms:.2f} seconds" ) - times_per_current_llms = [] + times_for_current_num_llms = [] loop_start_time = time.time() elif published_samples == can_submit_before_update and published_samples < can_submit_before_update_non_adjusted: logger.info( @@ -462,7 +462,7 @@ def run(self, dataset: list[tuple[str, dict]]): f" will now increment the number of samples that can be submitted before update to {can_submit_before_update+groups_per_update_adjusted}" ) end_time = time.time() - times_per_current_llms.append(end_time - loop_start_time) + times_for_current_num_llms.append(end_time - loop_start_time) loop_start_time = end_time if max_lag is not None: can_submit_before_update += groups_per_update_adjusted @@ -534,7 +534,7 @@ def run(self, dataset: list[tuple[str, dict]]): "trainer_model_version": trainer_version_to_publish, "time_since_start": time.time() - loop_start_time, "groups_in_progress": in_progress, - "time_on_desired_number_of_llms": time_on_desired_number_of_llms, + "time_for_desired_num_of_llms": time_for_desired_num_of_llms, } trainer_version_to_publish = None else: From 4096941de8906cbfc6a81d8b9eb74c6f5d11c815 Mon Sep 17 00:00:00 2001 From: Alex Piche Date: Tue, 29 Jul 2025 10:15:02 +0000 Subject: [PATCH 04/21] cumulative --- pipelinerl/actor.py | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/pipelinerl/actor.py b/pipelinerl/actor.py index ab09ef86..b25686af 100644 --- a/pipelinerl/actor.py +++ b/pipelinerl/actor.py @@ -374,6 +374,7 @@ def run(self, dataset: list[tuple[str, dict]]): published_samples = 0 submitted_groups = 0 finished_groups = 0 + cumulative_time_to_deduct = 0 expected_rollouts = -1 if self.is_training else len(dataset) if expected_rollouts > 0: logger.info(f"Will stop after {expected_rollouts} rollouts") @@ -449,10 +450,14 @@ def run(self, dataset: list[tuple[str, dict]]): # the weights have been updated, publish the stats of the previous trainer version trainer_version_to_publish = last_trainer_version last_trainer_version = self.trainer_state.propagated_weight_version - time_for_desired_num_of_llms = max(times_for_current_num_llms) + time_for_desired_num_of_llms = max(times_for_current_num_llms) + time_for_current_num_of_llms = sum(times_for_current_num_llms) + time_to_deduct = time_for_current_num_of_llms - time_for_desired_num_of_llms + cumulative_time_to_deduct += time_to_deduct logger.info( - f"Time on current number of llms {sum(times_for_current_num_llms)}," + f"Time on current number of llms {time_for_current_num_of_llms}," f" time on desired number of llms: {time_for_desired_num_of_llms:.2f} seconds" + f" time to deduct {time_to_deduct} seconds. Total time to deduct {cumulative_time_to_deduct:.2f} seconds" ) times_for_current_num_llms = [] loop_start_time = time.time() @@ -534,7 +539,7 @@ def run(self, dataset: list[tuple[str, dict]]): "trainer_model_version": trainer_version_to_publish, "time_since_start": time.time() - loop_start_time, "groups_in_progress": in_progress, - "time_for_desired_num_of_llms": time_for_desired_num_of_llms, + "cumulative_time_to_deduct": cumulative_time_to_deduct, } trainer_version_to_publish = None else: From 6e5bf8d5c71a83cc4e71f1c05470018ef859841e Mon Sep 17 00:00:00 2001 From: Alex Piche Date: Tue, 29 Jul 2025 10:33:10 +0000 Subject: [PATCH 05/21] finetune tiem to deduct --- pipelinerl/finetune_loop.py | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/pipelinerl/finetune_loop.py b/pipelinerl/finetune_loop.py index 0948e056..f4994b08 100644 --- a/pipelinerl/finetune_loop.py +++ b/pipelinerl/finetune_loop.py @@ -301,6 +301,9 @@ def run_finetuning_loop( args = cfg.finetune if "finetune" in cfg else cfg validate_packing_config(args) + cumulative_time_to_deduct = 0.0 + desired_num_of_processes = 128 + if not args.gradient_accumulation_passes % num_processes == 0: raise ValueError("gradient_accumulation_passes must be divisible by num_processes") gradient_accumulation_passes_per_gpu = args.seq_parallel * (args.gradient_accumulation_passes // num_processes) @@ -581,6 +584,7 @@ def batch_generator_fn(): logger.info("next batch should be a sentinel batch") time_waiting_for_data += time.time() - before_getting_next_batch + after_getting_next_batch = time.time() # check if too old, don't drop but count if ( args.max_lag is not None @@ -683,8 +687,15 @@ def toggle_sync(sync: bool): writer.write(trigger_message) if not do_optimizer_step: + forward_pass_took = time.time() - after_getting_next_batch + forward_pass_took_for_desired_num_of_processes = ( + forward_pass_took * (desired_num_of_processes / get_accelerator().state.num_processes) + ) + time_to_deduct = forward_pass_took - forward_pass_took_for_desired_num_of_processes + cumulative_time_to_deduct += time_to_deduct continue + target_samples_per_lead += samples_per_lead_per_step target_samples += samples_per_step @@ -710,6 +721,12 @@ def toggle_sync(sync: bool): optimizer_step_and_zero_grad() lr_scheduler.step() + forward_pass_took = time.time() - after_getting_next_batch + forward_pass_took_for_desired_num_of_processes = ( + forward_pass_took * (desired_num_of_processes / get_accelerator().state.num_processes) + ) + time_to_deduct = forward_pass_took - forward_pass_took_for_desired_num_of_processes + cumulative_time_to_deduct += time_to_deduct metrics_dict = {} time_to_stop = training_metrics.completed_steps >= final_train_steps time_to_log = training_metrics.completed_steps % args.log_each_n_steps == 0 @@ -739,6 +756,7 @@ def toggle_sync(sync: bool): "stats/queue/batches": batch_queue.qsize(), "stats/time_waiting_for_data": training_metrics.time_waiting_for_data, "stats/lag": training_metrics.last_broadcasted_version - lag_stats["min_version"], + "stats/cumulative_time_to_deduct": cumulative_time_to_deduct, "throughput/tokens_perGPU_per_sec": this_worker_tokens / sum(passes_took) if passes_took else 0, "throughput/tokens_per_step": this_worker_tokens * get_accelerator().state.num_processes, "throughput/micro_batches_per_step": len(tokens_processed), From d2fb2881f08378b492ededfda29c23595a806eb8 Mon Sep 17 00:00:00 2001 From: Alex Piche Date: Tue, 29 Jul 2025 10:34:30 +0000 Subject: [PATCH 06/21] typo --- pipelinerl/finetune_loop.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pipelinerl/finetune_loop.py b/pipelinerl/finetune_loop.py index f4994b08..87578191 100644 --- a/pipelinerl/finetune_loop.py +++ b/pipelinerl/finetune_loop.py @@ -689,7 +689,7 @@ def toggle_sync(sync: bool): if not do_optimizer_step: forward_pass_took = time.time() - after_getting_next_batch forward_pass_took_for_desired_num_of_processes = ( - forward_pass_took * (desired_num_of_processes / get_accelerator().state.num_processes) + forward_pass_took * (get_accelerator().state.num_processes / desired_num_of_processes) ) time_to_deduct = forward_pass_took - forward_pass_took_for_desired_num_of_processes cumulative_time_to_deduct += time_to_deduct @@ -723,7 +723,7 @@ def toggle_sync(sync: bool): forward_pass_took = time.time() - after_getting_next_batch forward_pass_took_for_desired_num_of_processes = ( - forward_pass_took * (desired_num_of_processes / get_accelerator().state.num_processes) + forward_pass_took * (get_accelerator().state.num_processes / desired_num_of_processes) ) time_to_deduct = forward_pass_took - forward_pass_took_for_desired_num_of_processes cumulative_time_to_deduct += time_to_deduct From 6f2688e969dacc888cfcdd4b56006c82df3a883e Mon Sep 17 00:00:00 2001 From: Alex Piche Date: Wed, 30 Jul 2025 02:44:20 +0000 Subject: [PATCH 07/21] typo --- pipelinerl/finetune_loop.py | 1 + 1 file changed, 1 insertion(+) diff --git a/pipelinerl/finetune_loop.py b/pipelinerl/finetune_loop.py index 87578191..7a2a64f7 100644 --- a/pipelinerl/finetune_loop.py +++ b/pipelinerl/finetune_loop.py @@ -568,6 +568,7 @@ def batch_generator_fn(): rl_config = RLConfig(**args.rl) # samples_per_step will be used to normalize the loss rl_config.batch_size = samples_per_step + desired_num_of_processes = 128 while training_metrics.completed_steps < final_train_steps: # We include time waiting for data in the step time if first_pass: From 1e2dd92d1b2ff0caa046962b630d470bf1215b98 Mon Sep 17 00:00:00 2001 From: Alex Piche Date: Wed, 30 Jul 2025 03:07:23 +0000 Subject: [PATCH 08/21] upd --- pipelinerl/finetune_loop.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/pipelinerl/finetune_loop.py b/pipelinerl/finetune_loop.py index 7a2a64f7..f7355198 100644 --- a/pipelinerl/finetune_loop.py +++ b/pipelinerl/finetune_loop.py @@ -301,9 +301,6 @@ def run_finetuning_loop( args = cfg.finetune if "finetune" in cfg else cfg validate_packing_config(args) - cumulative_time_to_deduct = 0.0 - desired_num_of_processes = 128 - if not args.gradient_accumulation_passes % num_processes == 0: raise ValueError("gradient_accumulation_passes must be divisible by num_processes") gradient_accumulation_passes_per_gpu = args.seq_parallel * (args.gradient_accumulation_passes // num_processes) @@ -569,6 +566,7 @@ def batch_generator_fn(): # samples_per_step will be used to normalize the loss rl_config.batch_size = samples_per_step desired_num_of_processes = 128 + cumulative_time_to_deduct = 0.0 while training_metrics.completed_steps < final_train_steps: # We include time waiting for data in the step time if first_pass: From 2c17ab64e4e34272bf7adbeab205dfc96b462c9c Mon Sep 17 00:00:00 2001 From: Alex Piche Date: Thu, 31 Jul 2025 16:50:17 +0000 Subject: [PATCH 09/21] more logging --- pipelinerl/actor.py | 6 ++++++ pipelinerl/finetune_loop.py | 8 ++++++++ 2 files changed, 14 insertions(+) diff --git a/pipelinerl/actor.py b/pipelinerl/actor.py index b25686af..74bf25cc 100644 --- a/pipelinerl/actor.py +++ b/pipelinerl/actor.py @@ -374,6 +374,8 @@ def run(self, dataset: list[tuple[str, dict]]): published_samples = 0 submitted_groups = 0 finished_groups = 0 + cumulative_time_for_current_num_llms = 0 + cumulative_time_for_desired_num_llms = 0 cumulative_time_to_deduct = 0 expected_rollouts = -1 if self.is_training else len(dataset) if expected_rollouts > 0: @@ -454,6 +456,8 @@ def run(self, dataset: list[tuple[str, dict]]): time_for_current_num_of_llms = sum(times_for_current_num_llms) time_to_deduct = time_for_current_num_of_llms - time_for_desired_num_of_llms cumulative_time_to_deduct += time_to_deduct + cumulative_time_for_current_num_llms += time_for_current_num_of_llms + cumulative_time_for_desired_num_llms += time_for_desired_num_of_llms logger.info( f"Time on current number of llms {time_for_current_num_of_llms}," f" time on desired number of llms: {time_for_desired_num_of_llms:.2f} seconds" @@ -540,6 +544,8 @@ def run(self, dataset: list[tuple[str, dict]]): "time_since_start": time.time() - loop_start_time, "groups_in_progress": in_progress, "cumulative_time_to_deduct": cumulative_time_to_deduct, + "cumulative_time_for_current_num_llms": cumulative_time_for_current_num_llms, + "cumulative_time_for_desired_num_llms": cumulative_time_for_desired_num_ll } trainer_version_to_publish = None else: diff --git a/pipelinerl/finetune_loop.py b/pipelinerl/finetune_loop.py index f7355198..1a153b60 100644 --- a/pipelinerl/finetune_loop.py +++ b/pipelinerl/finetune_loop.py @@ -567,6 +567,8 @@ def batch_generator_fn(): rl_config.batch_size = samples_per_step desired_num_of_processes = 128 cumulative_time_to_deduct = 0.0 + cumulative_time_for_desired_num_of_processes = 0.0 + cumulative_time_for_current_num_of_processes = 0.0 while training_metrics.completed_steps < final_train_steps: # We include time waiting for data in the step time if first_pass: @@ -692,6 +694,8 @@ def toggle_sync(sync: bool): ) time_to_deduct = forward_pass_took - forward_pass_took_for_desired_num_of_processes cumulative_time_to_deduct += time_to_deduct + cumulative_time_for_desired_num_of_processes += forward_pass_took_for_desired_num_of_processes + cumulative_time_for_current_num_of_processes += forward_pass_took continue @@ -726,6 +730,8 @@ def toggle_sync(sync: bool): ) time_to_deduct = forward_pass_took - forward_pass_took_for_desired_num_of_processes cumulative_time_to_deduct += time_to_deduct + cumulative_time_for_desired_num_of_processes += forward_pass_took_for_desired_num_of_processes + cumulative_time_for_current_num_of_processes += forward_pass_took metrics_dict = {} time_to_stop = training_metrics.completed_steps >= final_train_steps time_to_log = training_metrics.completed_steps % args.log_each_n_steps == 0 @@ -756,6 +762,8 @@ def toggle_sync(sync: bool): "stats/time_waiting_for_data": training_metrics.time_waiting_for_data, "stats/lag": training_metrics.last_broadcasted_version - lag_stats["min_version"], "stats/cumulative_time_to_deduct": cumulative_time_to_deduct, + "stats/cumulative_time_for_desired_num_of_processes": cumulative_time_for_desired_num_of_processes, + "stats/cumulative_time_for_current_num_of_processes": cumulative_time_for_current_num_of_processes, "throughput/tokens_perGPU_per_sec": this_worker_tokens / sum(passes_took) if passes_took else 0, "throughput/tokens_per_step": this_worker_tokens * get_accelerator().state.num_processes, "throughput/micro_batches_per_step": len(tokens_processed), From 399cd7c2dcca4ca59bbfd40f97685fad0705ce66 Mon Sep 17 00:00:00 2001 From: Alex Piche Date: Thu, 31 Jul 2025 17:41:28 +0000 Subject: [PATCH 10/21] typo --- pipelinerl/actor.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pipelinerl/actor.py b/pipelinerl/actor.py index 74bf25cc..2ca08437 100644 --- a/pipelinerl/actor.py +++ b/pipelinerl/actor.py @@ -545,7 +545,7 @@ def run(self, dataset: list[tuple[str, dict]]): "groups_in_progress": in_progress, "cumulative_time_to_deduct": cumulative_time_to_deduct, "cumulative_time_for_current_num_llms": cumulative_time_for_current_num_llms, - "cumulative_time_for_desired_num_llms": cumulative_time_for_desired_num_ll + "cumulative_time_for_desired_num_llms": cumulative_time_for_desired_num_llms, } trainer_version_to_publish = None else: From 064af74227ae769e957eb5953ac693a4ca8a6fc6 Mon Sep 17 00:00:00 2001 From: Alex Piche Date: Wed, 6 Aug 2025 17:40:29 +0000 Subject: [PATCH 11/21] upd --- conf/base.yaml | 1 + pipelinerl/actor.py | 2 +- pipelinerl/finetune_loop.py | 2 +- 3 files changed, 3 insertions(+), 2 deletions(-) diff --git a/conf/base.yaml b/conf/base.yaml index ac44fdde..9f44646d 100644 --- a/conf/base.yaml +++ b/conf/base.yaml @@ -5,6 +5,7 @@ defaults: - _self_ seed: 42 +desired_num_gpus: 128 finetune: seed: ${..seed} diff --git a/pipelinerl/actor.py b/pipelinerl/actor.py index 2ca08437..ce98d417 100644 --- a/pipelinerl/actor.py +++ b/pipelinerl/actor.py @@ -416,7 +416,7 @@ def run(self, dataset: list[tuple[str, dict]]): #TODO: rm conv RL code times_for_current_num_llms = [] time_for_desired_num_of_llms = 0 - desired_number_of_llms = 128 + desired_number_of_llms = self.cfg.desired_num_gpus current_number_of_llms = len(self.llms) # assumes 1 llm per gpu assert (groups_per_update * current_number_of_llms) % desired_number_of_llms == 0, ( f"groups_per_update * current_number_of_llms {groups_per_update * current_number_of_llms} " diff --git a/pipelinerl/finetune_loop.py b/pipelinerl/finetune_loop.py index 1a153b60..40351824 100644 --- a/pipelinerl/finetune_loop.py +++ b/pipelinerl/finetune_loop.py @@ -565,7 +565,7 @@ def batch_generator_fn(): rl_config = RLConfig(**args.rl) # samples_per_step will be used to normalize the loss rl_config.batch_size = samples_per_step - desired_num_of_processes = 128 + desired_num_of_processes = args.desired_num_gpus cumulative_time_to_deduct = 0.0 cumulative_time_for_desired_num_of_processes = 0.0 cumulative_time_for_current_num_of_processes = 0.0 From fff1fb4811c76a9386840b46575fd07cc30f4bb8 Mon Sep 17 00:00:00 2001 From: Alex Piche Date: Tue, 12 Aug 2025 14:58:30 +0000 Subject: [PATCH 12/21] writing took --- pipelinerl/preprocess.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/pipelinerl/preprocess.py b/pipelinerl/preprocess.py index d758ff36..27b0edb8 100644 --- a/pipelinerl/preprocess.py +++ b/pipelinerl/preprocess.py @@ -447,6 +447,7 @@ def run_preprocessing_loop( current_length = 0 batch_boundary = published_samples + train_batch_size target_samples_per_lead = samples_per_trainer[0] + samples_per_lead_per_step + cumulative_writing_took = 0.0 # Per-trainer sample tracking (similar to finetune_loop.py) total_filtered_out = 0 # Track total filtered samples across all batches @@ -622,6 +623,7 @@ def run_preprocessing_loop( f"batch done: {batch_done}" ) writing_took += time.time() - start_writing + cumulative_writing_took += writing_took if ( published_samples > last_published_samples @@ -638,6 +640,7 @@ def run_preprocessing_loop( "preprocessor/filtered_out_samples": num_filtered_out, "preprocessor/total_filtered_out_samples": total_filtered_out, "preprocessor/dropped_after_preprocessing": processed_entries_queue_popped_data, + "preprocessor/cumulative_writing_took": cumulative_writing_took, } if stats_aggregator.has_enough_data(): stats.update({"preprocessor/" + k: v for k, v in stats_aggregator.get_stats().items()}) From e65c46fa0e9fc36edb7027800da0daf6dbf8ffc6 Mon Sep 17 00:00:00 2001 From: Alex Piche Date: Tue, 12 Aug 2025 15:18:11 +0000 Subject: [PATCH 13/21] upd --- conf/base.yaml | 1 + 1 file changed, 1 insertion(+) diff --git a/conf/base.yaml b/conf/base.yaml index 9f44646d..89e86ce0 100644 --- a/conf/base.yaml +++ b/conf/base.yaml @@ -9,6 +9,7 @@ desired_num_gpus: 128 finetune: seed: ${..seed} + desired_num_gpus: ${..desired_num_gpus} actor: log_each_n_secs: 0 From ba6e75455561c71ffd9f0255c01b37c16b94daef Mon Sep 17 00:00:00 2001 From: Alex Piche Date: Tue, 12 Aug 2025 17:46:56 +0000 Subject: [PATCH 14/21] track last time update --- pipelinerl/actor.py | 25 +++++++++++++++++++------ 1 file changed, 19 insertions(+), 6 deletions(-) diff --git a/pipelinerl/actor.py b/pipelinerl/actor.py index ce98d417..0523820a 100644 --- a/pipelinerl/actor.py +++ b/pipelinerl/actor.py @@ -430,7 +430,7 @@ def run(self, dataset: list[tuple[str, dict]]): f" thus instead of {groups_per_update} groups per update," f" we can submit {groups_per_update_adjusted} groups per update," ) - loop_start_time = time.time() + start_sampling_time = time.time() else: groups_per_update = None can_submit_before_update = math.inf @@ -453,6 +453,10 @@ def run(self, dataset: list[tuple[str, dict]]): trainer_version_to_publish = last_trainer_version last_trainer_version = self.trainer_state.propagated_weight_version time_for_desired_num_of_llms = max(times_for_current_num_llms) + assert len(times_for_current_num_llms) == current_number_of_llms // desired_number_of_llms, ( + f"Expected {current_number_of_llms // desired_number_of_llms} times for current number of llms," + f" but got {len(times_for_current_num_llms)}" + ) time_for_current_num_of_llms = sum(times_for_current_num_llms) time_to_deduct = time_for_current_num_of_llms - time_for_desired_num_of_llms cumulative_time_to_deduct += time_to_deduct @@ -464,17 +468,26 @@ def run(self, dataset: list[tuple[str, dict]]): f" time to deduct {time_to_deduct} seconds. Total time to deduct {cumulative_time_to_deduct:.2f} seconds" ) times_for_current_num_llms = [] - loop_start_time = time.time() + start_sampling_time = time.time() elif published_samples == can_submit_before_update and published_samples < can_submit_before_update_non_adjusted: + end_time = time.time() + time_for_current_num_of_llms = end_time - start_sampling_time logger.info( - f"Published {published_samples} samples which is less than {can_submit_before_update_non_adjusted}," + f"Published {published_samples} samples which is less than {can_submit_before_update_non_adjusted}, took {time_for_current_num_of_llms:.2f} seconds." f" will now increment the number of samples that can be submitted before update to {can_submit_before_update+groups_per_update_adjusted}" ) - end_time = time.time() - times_for_current_num_llms.append(end_time - loop_start_time) - loop_start_time = end_time + times_for_current_num_llms.append(time_for_current_num_of_llms) + start_sampling_time = end_time if max_lag is not None: can_submit_before_update += groups_per_update_adjusted + elif published_samples == can_submit_before_update: + end_time = time.time() + time_for_current_num_of_llms = end_time - start_sampling_time + logger.info( + f"Published {published_samples} samples which is equal to {can_submit_before_update}, took {time_for_current_num_of_llms:.2f} seconds." + f" will now increment the number of samples that can be submitted before update to {can_submit_before_update+groups_per_update_adjusted}" + ) + times_for_current_num_llms.append(time_for_current_num_of_llms) # First, submit all problems you can until the problem queue is full if not self.is_scheduling_paused: From a634b394192af0d1b77b159c5e54a44140c47dfe Mon Sep 17 00:00:00 2001 From: Alex Piche Date: Tue, 12 Aug 2025 18:40:33 +0000 Subject: [PATCH 15/21] be more careful --- pipelinerl/actor.py | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/pipelinerl/actor.py b/pipelinerl/actor.py index 0523820a..ad32f4e1 100644 --- a/pipelinerl/actor.py +++ b/pipelinerl/actor.py @@ -481,13 +481,14 @@ def run(self, dataset: list[tuple[str, dict]]): if max_lag is not None: can_submit_before_update += groups_per_update_adjusted elif published_samples == can_submit_before_update: - end_time = time.time() - time_for_current_num_of_llms = end_time - start_sampling_time - logger.info( - f"Published {published_samples} samples which is equal to {can_submit_before_update}, took {time_for_current_num_of_llms:.2f} seconds." - f" will now increment the number of samples that can be submitted before update to {can_submit_before_update+groups_per_update_adjusted}" - ) - times_for_current_num_llms.append(time_for_current_num_of_llms) + if len(times_for_current_num_llms) < current_number_of_llms // desired_number_of_llms: + end_time = time.time() + time_for_current_num_of_llms = end_time - start_sampling_time + logger.info( + f"Published {published_samples} samples which is equal to {can_submit_before_update}, took {time_for_current_num_of_llms:.2f} seconds." + f" will now increment the number of samples that can be submitted before update to {can_submit_before_update+groups_per_update_adjusted}" + ) + times_for_current_num_llms.append(time_for_current_num_of_llms) # First, submit all problems you can until the problem queue is full if not self.is_scheduling_paused: From de245a70fff5f40dccee9ebc86f4b33cea2b6ba5 Mon Sep 17 00:00:00 2001 From: Alex Piche Date: Wed, 13 Aug 2025 01:22:50 +0000 Subject: [PATCH 16/21] typo --- pipelinerl/actor.py | 36 ++++++++++++++++++------------------ 1 file changed, 18 insertions(+), 18 deletions(-) diff --git a/pipelinerl/actor.py b/pipelinerl/actor.py index ad32f4e1..daf020d8 100644 --- a/pipelinerl/actor.py +++ b/pipelinerl/actor.py @@ -452,22 +452,6 @@ def run(self, dataset: list[tuple[str, dict]]): # the weights have been updated, publish the stats of the previous trainer version trainer_version_to_publish = last_trainer_version last_trainer_version = self.trainer_state.propagated_weight_version - time_for_desired_num_of_llms = max(times_for_current_num_llms) - assert len(times_for_current_num_llms) == current_number_of_llms // desired_number_of_llms, ( - f"Expected {current_number_of_llms // desired_number_of_llms} times for current number of llms," - f" but got {len(times_for_current_num_llms)}" - ) - time_for_current_num_of_llms = sum(times_for_current_num_llms) - time_to_deduct = time_for_current_num_of_llms - time_for_desired_num_of_llms - cumulative_time_to_deduct += time_to_deduct - cumulative_time_for_current_num_llms += time_for_current_num_of_llms - cumulative_time_for_desired_num_llms += time_for_desired_num_of_llms - logger.info( - f"Time on current number of llms {time_for_current_num_of_llms}," - f" time on desired number of llms: {time_for_desired_num_of_llms:.2f} seconds" - f" time to deduct {time_to_deduct} seconds. Total time to deduct {cumulative_time_to_deduct:.2f} seconds" - ) - times_for_current_num_llms = [] start_sampling_time = time.time() elif published_samples == can_submit_before_update and published_samples < can_submit_before_update_non_adjusted: end_time = time.time() @@ -480,8 +464,8 @@ def run(self, dataset: list[tuple[str, dict]]): start_sampling_time = end_time if max_lag is not None: can_submit_before_update += groups_per_update_adjusted - elif published_samples == can_submit_before_update: - if len(times_for_current_num_llms) < current_number_of_llms // desired_number_of_llms: + elif published_samples == can_submit_before_update_non_adjusted: + if len(times_for_current_num_llms) < desired_number_of_llms // current_number_of_llms: end_time = time.time() time_for_current_num_of_llms = end_time - start_sampling_time logger.info( @@ -489,6 +473,22 @@ def run(self, dataset: list[tuple[str, dict]]): f" will now increment the number of samples that can be submitted before update to {can_submit_before_update+groups_per_update_adjusted}" ) times_for_current_num_llms.append(time_for_current_num_of_llms) + time_for_desired_num_of_llms = max(times_for_current_num_llms) + assert len(times_for_current_num_llms) == desired_number_of_llms // current_number_of_llms , ( + f"Expected {desired_number_of_llms // current_number_of_llms} times for current number of llms," + f" but got {len(times_for_current_num_llms)}" + ) + time_for_current_num_of_llms = sum(times_for_current_num_llms) + time_to_deduct = time_for_current_num_of_llms - time_for_desired_num_of_llms + cumulative_time_to_deduct += time_to_deduct + cumulative_time_for_current_num_llms += time_for_current_num_of_llms + cumulative_time_for_desired_num_llms += time_for_desired_num_of_llms + logger.info( + f"Time on current number of llms {time_for_current_num_of_llms}," + f" time on desired number of llms: {time_for_desired_num_of_llms:.2f} seconds" + f" time to deduct {time_to_deduct} seconds. Total time to deduct {cumulative_time_to_deduct:.2f} seconds" + ) + times_for_current_num_llms = [] # First, submit all problems you can until the problem queue is full if not self.is_scheduling_paused: From c7a848242302b48bdcb89f92cbcd9a81599c8dd5 Mon Sep 17 00:00:00 2001 From: Alex Piche Date: Wed, 13 Aug 2025 02:04:41 +0000 Subject: [PATCH 17/21] upd --- pipelinerl/actor.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pipelinerl/actor.py b/pipelinerl/actor.py index daf020d8..f7fb2e68 100644 --- a/pipelinerl/actor.py +++ b/pipelinerl/actor.py @@ -462,6 +462,7 @@ def run(self, dataset: list[tuple[str, dict]]): ) times_for_current_num_llms.append(time_for_current_num_of_llms) start_sampling_time = end_time + times_for_current_num_llms = [] if max_lag is not None: can_submit_before_update += groups_per_update_adjusted elif published_samples == can_submit_before_update_non_adjusted: @@ -488,7 +489,6 @@ def run(self, dataset: list[tuple[str, dict]]): f" time on desired number of llms: {time_for_desired_num_of_llms:.2f} seconds" f" time to deduct {time_to_deduct} seconds. Total time to deduct {cumulative_time_to_deduct:.2f} seconds" ) - times_for_current_num_llms = [] # First, submit all problems you can until the problem queue is full if not self.is_scheduling_paused: From 079a95d4490f3c2d048bf34b3f96202c677a9943 Mon Sep 17 00:00:00 2001 From: Alex Piche Date: Wed, 13 Aug 2025 02:07:29 +0000 Subject: [PATCH 18/21] upd --- pipelinerl/actor.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pipelinerl/actor.py b/pipelinerl/actor.py index f7fb2e68..901fb9a4 100644 --- a/pipelinerl/actor.py +++ b/pipelinerl/actor.py @@ -453,6 +453,7 @@ def run(self, dataset: list[tuple[str, dict]]): trainer_version_to_publish = last_trainer_version last_trainer_version = self.trainer_state.propagated_weight_version start_sampling_time = time.time() + times_for_current_num_llms = [] elif published_samples == can_submit_before_update and published_samples < can_submit_before_update_non_adjusted: end_time = time.time() time_for_current_num_of_llms = end_time - start_sampling_time @@ -462,7 +463,6 @@ def run(self, dataset: list[tuple[str, dict]]): ) times_for_current_num_llms.append(time_for_current_num_of_llms) start_sampling_time = end_time - times_for_current_num_llms = [] if max_lag is not None: can_submit_before_update += groups_per_update_adjusted elif published_samples == can_submit_before_update_non_adjusted: From 97b96818b4ab380b8bdf1bee7db3a65b87f3e498 Mon Sep 17 00:00:00 2001 From: Alex Piche Date: Wed, 13 Aug 2025 02:11:08 +0000 Subject: [PATCH 19/21] upd --- pipelinerl/actor.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/pipelinerl/actor.py b/pipelinerl/actor.py index 901fb9a4..24b1a582 100644 --- a/pipelinerl/actor.py +++ b/pipelinerl/actor.py @@ -484,6 +484,11 @@ def run(self, dataset: list[tuple[str, dict]]): cumulative_time_to_deduct += time_to_deduct cumulative_time_for_current_num_llms += time_for_current_num_of_llms cumulative_time_for_desired_num_llms += time_for_desired_num_of_llms + wandb.log({ + "actor/cumulative_time_for_current_num_llms2": cumulative_time_for_current_num_llms, + "actor/cumulative_time_for_desired_num_llms2": cumulative_time_for_desired_num_llms, + "actor/cumulative_time_to_deduct2": cumulative_time_to_deduct, + }) logger.info( f"Time on current number of llms {time_for_current_num_of_llms}," f" time on desired number of llms: {time_for_desired_num_of_llms:.2f} seconds" From a92b5f5ce3e1dad1c541616cd93928ecbbd383d7 Mon Sep 17 00:00:00 2001 From: Alex Piche Date: Thu, 11 Sep 2025 01:50:26 +0000 Subject: [PATCH 20/21] upd conf --- conf/base.yaml | 2 +- conf/debug_finetune_preprocessor_sft.yaml | 89 +++++++++++++++++++++++ 2 files changed, 90 insertions(+), 1 deletion(-) create mode 100644 conf/debug_finetune_preprocessor_sft.yaml diff --git a/conf/base.yaml b/conf/base.yaml index 89e86ce0..0237e0c2 100644 --- a/conf/base.yaml +++ b/conf/base.yaml @@ -5,7 +5,7 @@ defaults: - _self_ seed: 42 -desired_num_gpus: 128 +desired_num_gpus: 64 finetune: seed: ${..seed} diff --git a/conf/debug_finetune_preprocessor_sft.yaml b/conf/debug_finetune_preprocessor_sft.yaml new file mode 100644 index 00000000..d9786874 --- /dev/null +++ b/conf/debug_finetune_preprocessor_sft.yaml @@ -0,0 +1,89 @@ +# Debug configuration for finetune+preprocessor+sft mode +# This configuration runs all three components together for testing + +# @package _global_ +defaults: + - _self_ + - streams: local + - wandb: default + +# Debug mode configuration +debug: + mode: "finetune+preprocessor+sft" + streams_from: null + place_inference_workers: true + use_existing_llms: false + +# Experiment configuration +output_dir: ??? +seed: 42 +force_restart: false + +# Model configuration +model_path: /mnt/llmd/base_models/Qwen2.5-7B +max_seq_length: 2048 +batch_size: 100 + +# Dataset configuration +dataset_loader: pipelinerl.domains.math.load_datasets +dataset_loader_params: {} +train_dataset_names: + - open_reasoner_zero_57k +test_dataset_names: + - aime_2024 + +# World configuration +world: + replicas: 1 + actor_fraction: 1 + preprocessor_fraction: 1 + finetune_fraction: 2 + env_replicas: 0 + actor_group_port: 9000 + environment_start_port: 7777 + +# LLM configuration +me: + llm_urls: "http://localhost:8000" + +llm: + parameters: + temperature: 1.0 + top_p: 0.95 + top_k: 50 + +# Finetune configuration +finetune: + input: "sft_data" # Use SFT data as input + model_class: "causal-language-modeling" + train_batch_size: 1 + gradient_accumulation_passes: 1 + seq_parallel: 1 + seq_packing: false + rl: + kl_coef: 0.0 + value_loss_coef: 0.0 + +# Preprocess configuration +preprocess: + input: "actor" + output: "sft_data" + dataset_buffer_size: 0 + ring_buffer_size: 1000 + +# Streams configuration +streams: + backend: local + base_path: null + port: 6379 + save: "" + +# Wandb configuration +wandb: + use_wandb: true + project: "debug-finetune-preprocessor-sft" + name: null + tags: ["debug", "finetune", "preprocessor", "sft"] + + + From a84dd611866e2648c98af80a5e7e2590f90c7387 Mon Sep 17 00:00:00 2001 From: Alex Piche Date: Fri, 3 Oct 2025 22:41:41 +0000 Subject: [PATCH 21/21] shuffle --- pipelinerl/preprocess.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pipelinerl/preprocess.py b/pipelinerl/preprocess.py index 27b0edb8..c62e52ea 100644 --- a/pipelinerl/preprocess.py +++ b/pipelinerl/preprocess.py @@ -16,6 +16,7 @@ from queue import Empty, Full from typing import List +import random import datasets import transformers from litellm import BaseModel, Field @@ -555,6 +556,7 @@ def run_preprocessing_loop( batch_done = False start_writing = time.time() + random.shuffle(processed_entries_queue) while (len(processed_entries_queue) > 0 and not batch_done) or (cfg.preprocess.dataset_buffer_size and not batch_done): logger.debug(f"[inner loop] trainer {trainer_id} has {samples_per_trainer[trainer_id]} samples, target is {target_samples_per_lead}") if cfg.finetune.seq_packing: