From ded94994774c46a1dbb99e9c6644760c4de9ff6b Mon Sep 17 00:00:00 2001
From: Yernar Sadybekov
Date: Wed, 16 Jul 2025 16:01:22 -0700
Subject: [PATCH] Refactor benchmark utilities: centralize core logic into `_run_benchmark_core` (#3191)

Summary:
Pull Request resolved: https://github.com/pytorch/torchrec/pull/3191

Removed redundant logic in `benchmark` and `benchmark_func` by moving
shared timing, memory, and profiling code into a new `_run_benchmark_core`
function. Ensured backward compatibility.

Differential Revision: D78290979
---
 .../distributed/benchmark/benchmark_utils.py | 372 ++++++++----------
 1 file changed, 166 insertions(+), 206 deletions(-)

diff --git a/torchrec/distributed/benchmark/benchmark_utils.py b/torchrec/distributed/benchmark/benchmark_utils.py
index 950e3bd6f..2bf78757f 100644
--- a/torchrec/distributed/benchmark/benchmark_utils.py
+++ b/torchrec/distributed/benchmark/benchmark_utils.py
@@ -652,62 +652,76 @@ def fx_script_module(eager_module: torch.nn.Module) -> torch.nn.Module:
 
     return sharded_module if not benchmark_unsharded_module else module
 
 
-def benchmark(
+def _run_benchmark_core(
     name: str,
-    model: torch.nn.Module,
-    warmup_inputs: Union[List[KeyedJaggedTensor], List[Dict[str, Any]]],
-    bench_inputs: Union[List[KeyedJaggedTensor], List[Dict[str, Any]]],
-    prof_inputs: Union[List[KeyedJaggedTensor], List[Dict[str, Any]]],
+    run_iter_fn: Callable[[], None],
+    profile_iter_fn: Optional[Callable[[Any], None]],  # pyre-ignore [2]
     world_size: int,
-    output_dir: str,
-    num_benchmarks: int,
-    # pyre-ignore[2]
-    func_to_benchmark: Any,
-    benchmark_func_kwargs: Optional[Dict[str, Any]],
     rank: int,
-    enable_logging: bool = True,
-    device_type: str = "cuda",
-    benchmark_unsharded_module: bool = False,
+    num_benchmarks: int,
+    device_type: str,
+    output_dir: str,
+    pre_gpu_load: int = 0,
+    export_stacks: bool = False,
+    reset_accumulated_memory_stats: bool = False,
 ) -> BenchmarkResult:
-    memory_stats: List[MemoryStats] = []
-    if enable_logging:
-        logger.info(f" BENCHMARK_MODEL[{name}]:\n{model}")
+    """Internal helper that contains the core benchmarking logic shared by
+    ``benchmark`` and ``benchmark_func``. All heavy lifting (timing, memory
+    accounting, optional profiling) happens here so the public helpers can stay
+    small and focused on preparing the callables to execute.
 
-    for _input in warmup_inputs:
-        model(_input)
+    Args:
+        name: Human-readable benchmark name.
+        run_iter_fn: Zero-arg callable that executes one measured iteration.
+        profile_iter_fn: Optional callable that receives a ``torch.profiler``
+            instance and runs the iterations that should be captured.
+        world_size, rank: Distributed context to correctly reset / collect GPU
+            stats. ``rank == -1`` means single-process mode.
+        num_benchmarks: Number of measured iterations.
+        device_type: "cuda" or "cpu".
+        output_dir: Where to write chrome traces / stack files.
+        pre_gpu_load: Number of dummy matmul operations to run before the first
+            measured iteration (helps simulate a loaded allocator).
+        export_stacks: Whether to export flamegraph-compatible stack files.
+        reset_accumulated_memory_stats: Whether to reset accumulated memory
+            stats in addition to peak memory stats.
+    """
+    # Preparation & memory reset
     if device_type == "cuda":
         if rank == -1:
-            # Reset memory for measurement, no process per rank so do all
             for di in range(world_size):
                 torch.cuda.reset_peak_memory_stats(di)
+                if reset_accumulated_memory_stats:
+                    torch.cuda.reset_accumulated_memory_stats(di)
         else:
             torch.cuda.reset_peak_memory_stats(rank)
+            if reset_accumulated_memory_stats:
+                torch.cuda.reset_accumulated_memory_stats(rank)
 
-    start = []
-    end = []
-    if device_type == "cuda":
-        # Measure time taken for batches in bench_inputs
-        start = [torch.cuda.Event(enable_timing=True) for _ in range(num_benchmarks)]
-        end = [torch.cuda.Event(enable_timing=True) for _ in range(num_benchmarks)]
-
-    if benchmark_func_kwargs is None:
-        # Need this to unwrap
-        benchmark_func_kwargs = {}
+    # Optional allocator warm-up to create fragmentation similar to production
+    if pre_gpu_load and device_type == "cuda":
+        _tmp = torch.rand(16384, 16384, device="cuda")
+        for _ in range(pre_gpu_load):
+            _tmp = _tmp * torch.rand(16384, 16384, device="cuda")
 
-    times = []
+    # Timing loop
+    start_events, end_events, times = [], [], []
     if device_type == "cuda":
+        start_events = [
+            torch.cuda.Event(enable_timing=True) for _ in range(num_benchmarks)
+        ]
+        end_events = [
+            torch.cuda.Event(enable_timing=True) for _ in range(num_benchmarks)
+        ]
         for i in range(num_benchmarks):
-            start[i].record()
-            func_to_benchmark(model, bench_inputs, **benchmark_func_kwargs)
-            end[i].record()
-    elif device_type == "cpu":
-        times = timeit.repeat(
-            lambda: func_to_benchmark(model, bench_inputs, **benchmark_func_kwargs),
-            number=1,
-            repeat=num_benchmarks,
-        )
+            start_events[i].record()
+            run_iter_fn()
+            end_events[i].record()
+    else:
+        times = timeit.repeat(run_iter_fn, number=1, repeat=num_benchmarks)
 
+    # Make sure all kernels are finished before reading timers / stats
     if device_type == "cuda":
         if rank == -1:
             for di in range(world_size):
@@ -715,79 +729,118 @@
         else:
             torch.cuda.synchronize(rank)
 
-    # TODO: First Benchmark Run for Eager Mode produces outlier
+    # First Benchmark Run for Eager Mode produces outlier
     # Start counting after first as workaround for standard deviation
     if device_type == "cuda":
         elapsed_time = torch.tensor(
-            [si.elapsed_time(ei) for si, ei in zip(start[1:], end[1:])]
+            [s.elapsed_time(e) for s, e in zip(start_events[1:], end_events[1:])]
         )
     else:
-        elapsed_time = torch.tensor(times) * 1e3
+        elapsed_time = torch.tensor(times) * 1e3  # convert seconds ➜ milliseconds
 
+    # Memory statistics collection
+    mem_stats: List[MemoryStats] = []
     if device_type == "cuda":
         if rank == -1:
-            # Add up all memory allocated in inference mode
             for di in range(world_size):
-                memory_stats.append(MemoryStats.for_device(di))
+                mem_stats.append(MemoryStats.for_device(di))
         else:
-            # Only add up memory allocated for current rank in training mode
-            memory_stats.append(MemoryStats.for_device(rank))
+            mem_stats.append(MemoryStats.for_device(rank))
 
-    if output_dir != "":
-        # Only do profiling if output_dir is set
+    # Optional detailed profiling
+    if output_dir and profile_iter_fn and device_type == "cuda":
 
-        # pyre-ignore[2]
-        def trace_handler(prof) -> None:
-            total_average = prof.profiler.total_average()
-            logger.info(f" TOTAL_AVERAGE:\n{name}\n{total_average}")
-            dir_path: str = output_dir
-
-            # only 1 rank should output in pg case, rank = 0
+        def _trace_handler(prof: torch.profiler.profile) -> None:
+            total_avg = prof.profiler.total_average()
+            logger.info(f" TOTAL_AVERAGE:\n{name}\n{total_avg}")
             if rank > 0:
                 return
-
-            trace_file: str = f"{dir_path}/trace-{name}.json"
f"{dir_path}/trace-{name}.json" - stacks_cpu_file = f"{dir_path}/stacks-cpu-{name}.stacks" - stacks_cuda_file = f"{dir_path}/stacks-cuda-{name}.stacks" + trace_file = f"{output_dir}/trace-{name}.json" logger.info(f" PROFILE[{name}].chrome_trace:{trace_file}") - prof.export_chrome_trace(trace_file) - prof.export_stacks(stacks_cpu_file, "self_cpu_time_total") - prof.export_stacks(stacks_cuda_file, "self_cuda_time_total") - - # - git clone https://github.com/brendangregg/FlameGraph - # - cd FlameGraph - # - ./flamegraph.pl --title "CPU time" --countname "us." profiler.stacks > perf_viz.svg - - if device_type == "cuda": - with torch.profiler.profile( - activities=[ - torch.profiler.ProfilerActivity.CPU, - torch.profiler.ProfilerActivity.CUDA, - ], - record_shapes=True, - profile_memory=True, - with_stack=True, - with_flops=True, - with_modules=True, - on_trace_ready=trace_handler, - ) as p: - for _input in prof_inputs: - with record_function("## forward ##"): - model(_input) - p.step() - - if rank == -1: - for di in range(torch.cuda.device_count()): - torch.cuda.synchronize(torch.device(f"cuda:{di}")) - else: - torch.cuda.synchronize() + if export_stacks: + prof.export_stacks( + f"{output_dir}/stacks-cpu-{name}.stacks", "self_cpu_time_total" + ) + prof.export_stacks( + f"{output_dir}/stacks-cuda-{name}.stacks", "self_cuda_time_total" + ) + + with torch.profiler.profile( + activities=[ + torch.profiler.ProfilerActivity.CPU, + torch.profiler.ProfilerActivity.CUDA, + ], + record_shapes=True, + profile_memory=True, + with_flops=True, + with_modules=True, + with_stack=export_stacks, + on_trace_ready=_trace_handler, + ) as prof: + profile_iter_fn(prof) + + # Synchronize again after profiling to guarantee deterministic ordering + if rank == -1: + for di in range(torch.cuda.device_count()): + torch.cuda.synchronize(torch.device(f"cuda:{di}")) + else: + torch.cuda.synchronize(rank) return BenchmarkResult( - short_name=name, - elapsed_time=elapsed_time, - mem_stats=memory_stats, + short_name=name, elapsed_time=elapsed_time, mem_stats=mem_stats, rank=rank + ) + + +def benchmark( + name: str, + model: torch.nn.Module, + warmup_inputs: Union[List[KeyedJaggedTensor], List[Dict[str, Any]]], + bench_inputs: Union[List[KeyedJaggedTensor], List[Dict[str, Any]]], + prof_inputs: Union[List[KeyedJaggedTensor], List[Dict[str, Any]]], + world_size: int, + output_dir: str, + num_benchmarks: int, + # pyre-ignore[2] + func_to_benchmark: Any, + benchmark_func_kwargs: Optional[Dict[str, Any]], + rank: int, + enable_logging: bool = True, + device_type: str = "cuda", + benchmark_unsharded_module: bool = False, +) -> BenchmarkResult: + if enable_logging: + logger.info(f" BENCHMARK_MODEL[{name}]:\n{model}") + + # Warm-up forwards to stabilize kernels / JIT compilation + for _input in warmup_inputs: + model(_input) + + if benchmark_func_kwargs is None: + benchmark_func_kwargs = {} + + run_iter_fn: Callable[[], None] = lambda: func_to_benchmark( + model, bench_inputs, **benchmark_func_kwargs + ) + + def _profile_iter_fn(prof: torch.profiler.profile) -> None: + for _input in prof_inputs: + with record_function("## forward ##"): + model(_input) + prof.step() + + return _run_benchmark_core( + name=name, + run_iter_fn=run_iter_fn, + profile_iter_fn=_profile_iter_fn if output_dir else None, + world_size=world_size, rank=rank, + num_benchmarks=num_benchmarks, + device_type=device_type, + output_dir=output_dir, + pre_gpu_load=0, + export_stacks=True, + reset_accumulated_memory_stats=False, ) @@ -806,124 +859,31 @@ def benchmark_func( 
     device_type: str = "cuda",
     pre_gpu_load: int = 0,
 ) -> BenchmarkResult:
-    memory_stats: List[MemoryStats] = []
-    if device_type == "cuda":
-        if rank == -1:
-            # Reset memory for measurement, no process per rank so do all
-            for di in range(world_size):
-                torch.cuda.reset_peak_memory_stats(di)
-                torch.cuda.reset_accumulated_memory_stats(di)
-        else:
-            torch.cuda.reset_peak_memory_stats(rank)
-            torch.cuda.reset_accumulated_memory_stats(rank)
-
-    start = []
-    end = []
-    if device_type == "cuda":
-        # Measure time taken for batches in bench_inputs
-        start = [torch.cuda.Event(enable_timing=True) for _ in range(num_benchmarks)]
-        end = [torch.cuda.Event(enable_timing=True) for _ in range(num_benchmarks)]
-
     if benchmark_func_kwargs is None:
-        # Need this to unwrap
         benchmark_func_kwargs = {}
 
-    times = []
-    if device_type == "cuda":
-        a = torch.rand(16384, 16384, device="cuda")
-        for _ in range(pre_gpu_load):
-            a = a * torch.rand(16384, 16384, device="cuda")
-        for i in range(num_benchmarks):
-            start[i].record()
-            func_to_benchmark(bench_inputs, **benchmark_func_kwargs)
-            end[i].record()
-    elif device_type == "cpu":
-        if bench_inputs is None or len(bench_inputs) == 0:
-            times = timeit.repeat(
-                lambda: func_to_benchmark(**benchmark_func_kwargs),
-                number=1,
-                repeat=num_benchmarks,
-            )
-        else:
-            times = timeit.repeat(
-                lambda: func_to_benchmark(bench_inputs, **benchmark_func_kwargs),
-                number=1,
-                repeat=num_benchmarks,
-            )
-
-    if device_type == "cuda":
-        if rank == -1:
-            for di in range(world_size):
-                torch.cuda.synchronize(di)
-        else:
-            torch.cuda.synchronize(rank)
-
-    # TODO: First Benchmark Run for Eager Mode produces outlier
-    # Start counting after first as workaround for standard deviation
-    if device_type == "cuda":
-        elapsed_time = torch.tensor(
-            [si.elapsed_time(ei) for si, ei in zip(start[1:], end[1:])]
-        )
-    else:
-        elapsed_time = torch.tensor(times) * 1e3
-
-    if device_type == "cuda":
-        if rank == -1:
-            # Add up all memory allocated in inference mode
-            for di in range(world_size):
-                memory_stats.append(MemoryStats.for_device(di))
-        else:
-            # Only add up memory allocated for current rank in training mode
-            memory_stats.append(MemoryStats.for_device(rank))
-
-    if profile_dir != "":
-        # Only do profiling if output_dir is set
-
-        # pyre-ignore[2]
-        def trace_handler(prof) -> None:
-            total_average = prof.profiler.total_average()
-            logger.info(f" TOTAL_AVERAGE:\n{name}\n{total_average}")
-            dir_path: str = profile_dir
-            if rank == 0:
-                trace_file: str = f"{dir_path}/trace-{name}.json"
-            else:
-                trace_file: str = f"{dir_path}/trace-{name}-{rank}.json"
-                return  # only 1 rank should output in pg case, rank = 0
-            logger.info(f" PROFILE[{name}].chrome_trace:{trace_file}")
-            prof.export_chrome_trace(trace_file)
-
-        if device_type == "cuda":
-            a = torch.rand(16384, 16384, device="cuda")
-            for _ in range(pre_gpu_load):
-                a = a * torch.rand(16384, 16384, device="cuda")
-            with torch.profiler.profile(
-                activities=[
-                    torch.profiler.ProfilerActivity.CPU,
-                    torch.profiler.ProfilerActivity.CUDA,
-                ],
-                record_shapes=True,
-                profile_memory=True,
-                with_flops=True,
-                with_modules=True,
-                with_stack=False,  # usually we don't want to show the entire stack in the trace
-                on_trace_ready=trace_handler,
-            ) as p:
-                for i in range(num_profiles):
-                    with record_function(f"## profile {i} ##"):
-                        func_to_benchmark(prof_inputs, **benchmark_func_kwargs)
-                    p.step()
-
-            if rank == -1:
-                for di in range(torch.cuda.device_count()):
-                    torch.cuda.synchronize(torch.device(f"cuda:{di}"))
-            else:
-                torch.cuda.synchronize()
+    run_iter_fn: Callable[[], None] = lambda: func_to_benchmark(
+        bench_inputs, **benchmark_func_kwargs
+    )
 
-    return BenchmarkResult(
-        short_name=name,
-        elapsed_time=elapsed_time,
-        mem_stats=memory_stats,
+    def _profile_iter_fn(prof: torch.profiler.profile) -> None:
+        for i in range(num_profiles):
+            with record_function(f"## profile {i} ##"):
+                func_to_benchmark(prof_inputs, **benchmark_func_kwargs)
+            prof.step()
+
+    return _run_benchmark_core(
+        name=name,
+        run_iter_fn=run_iter_fn,
+        profile_iter_fn=_profile_iter_fn if profile_dir else None,
+        world_size=world_size, rank=rank,
+        num_benchmarks=num_benchmarks,
+        device_type=device_type,
+        output_dir=profile_dir,
+        pre_gpu_load=pre_gpu_load,
+        export_stacks=False,
+        reset_accumulated_memory_stats=True,
     )
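
A minimal usage sketch of the refactored `benchmark_func` (illustrative only, not part of the patch). The keyword names come from the hunks above (`bench_inputs`, `prof_inputs`, `num_benchmarks`, `num_profiles`, `profile_dir`, `func_to_benchmark`, `benchmark_func_kwargs`, `rank`, `device_type`); the toy function and values are assumptions, and the full signature and defaults live in benchmark_utils.py. An empty `profile_dir` makes `_run_benchmark_core` skip the torch.profiler block, and `device_type="cpu"` takes the timeit path.

# Illustrative sketch: drive benchmark_func on CPU with a toy callable.
# Keyword names follow the patch; the toy function and values are assumptions.
from typing import List

import torch

from torchrec.distributed.benchmark.benchmark_utils import benchmark_func


def toy_fn(bench_inputs: List[torch.Tensor], scale: float = 1.0) -> None:
    # One measured iteration: a small matmul per input batch.
    for x in bench_inputs:
        torch.mm(x, x).mul_(scale)


inputs = [torch.rand(256, 256) for _ in range(8)]

result = benchmark_func(
    name="toy_matmul",
    bench_inputs=inputs,
    prof_inputs=inputs,
    world_size=1,
    profile_dir="",  # empty -> profiling block is skipped
    num_benchmarks=5,
    num_profiles=1,
    func_to_benchmark=toy_fn,
    benchmark_func_kwargs={"scale": 2.0},
    rank=-1,  # single-process mode
    device_type="cpu",  # CPU path measures iterations with timeit.repeat
)
# elapsed_time is a tensor of per-iteration latencies in milliseconds.
print(result.short_name, float(result.elapsed_time.mean()), "ms")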