25
25
26
26
import pandas as pd
27
27
28
- from smac .runhistory .runhistory import RunHistory
28
+ from smac .runhistory .runhistory import DataOrigin , RunHistory
29
29
from smac .stats .stats import Stats
30
30
from smac .tae import StatusType
31
31
@@ -172,11 +172,10 @@ def __init__(
172
172
self .search_space : Optional [ConfigurationSpace ] = None
173
173
self ._metric : Optional [autoPyTorchMetric ] = None
174
174
self ._logger : Optional [PicklableClientLogger ] = None
175
- self .run_history : Optional [ RunHistory ] = None
175
+ self .run_history : RunHistory = RunHistory ()
176
176
self .trajectory : Optional [List ] = None
177
177
self .dataset_name : Optional [str ] = None
178
178
self .cv_models_ : Dict = {}
179
- self .num_run : int = 1
180
179
self .experiment_task_name : str = 'runSearch'
181
180
182
181
# By default try to use the TCP logging port or get a new port
@@ -492,6 +491,9 @@ def _do_dummy_prediction(self) -> None:
492
491
assert self ._metric is not None
493
492
assert self ._logger is not None
494
493
494
+ # For dummy estimator, we always expect the num_run to be 1
495
+ num_run = 1
496
+
495
497
self ._logger .info ("Starting to create dummy predictions." )
496
498
497
499
memory_limit = self ._memory_limit
@@ -511,14 +513,14 @@ def _do_dummy_prediction(self) -> None:
511
513
logger_port = self ._logger_port ,
512
514
cost_for_crash = get_cost_of_crash (self ._metric ),
513
515
abort_on_first_run_crash = False ,
514
- initial_num_run = self . num_run ,
516
+ initial_num_run = num_run ,
515
517
stats = stats ,
516
518
memory_limit = memory_limit ,
517
519
disable_file_output = True if len (self ._disable_file_output ) > 0 else False ,
518
520
all_supported_metrics = self ._all_supported_metrics
519
521
)
520
522
521
- status , cost , runtime , additional_info = ta .run (self . num_run , cutoff = self ._total_walltime_limit )
523
+ status , cost , runtime , additional_info = ta .run (num_run , cutoff = self ._total_walltime_limit )
522
524
if status == StatusType .SUCCESS :
523
525
self ._logger .info ("Finished creating dummy predictions." )
524
526
else :
@@ -560,20 +562,12 @@ def _do_traditional_prediction(self, time_left: int) -> None:
560
562
This method currently only supports classification.
561
563
562
564
Args:
563
- num_run: (int)
564
- An identifier to indicate the current machine learning algorithm
565
- being processed
566
565
time_left: (int)
567
566
Hard limit on how many machine learning algorithms can be fit. Depending on how
568
567
fast a traditional machine learning algorithm trains, it will allow multiple
569
568
models to be fitted.
570
569
func_eval_time_limit_secs: (int)
571
570
Maximum training time each algorithm is allowed to take, during training
572
-
573
- Returns:
574
- num_run: (int)
575
- The incremented identifier index. This depends on how many machine learning
576
- models were fitted.
577
571
"""
578
572
579
573
# Mypy Checkings -- Traditional prediction is only called for search
@@ -582,16 +576,19 @@ def _do_traditional_prediction(self, time_left: int) -> None:
582
576
assert self ._logger is not None
583
577
assert self ._dask_client is not None
584
578
585
- self .num_run += 1
579
+ self ._logger .info ("Starting to create traditional classifier predictions." )
580
+
581
+ # Initialise run history for the traditional classifiers
582
+ run_history = RunHistory ()
586
583
587
584
memory_limit = self ._memory_limit
588
585
if memory_limit is not None :
589
586
memory_limit = int (math .ceil (memory_limit ))
590
587
available_classifiers = get_available_classifiers ()
591
588
dask_futures = []
592
589
593
- total_number_classifiers = len (available_classifiers ) + self . num_run
594
- for n_r , classifier in enumerate (available_classifiers , start = self . num_run ):
590
+ total_number_classifiers = len (available_classifiers )
591
+ for n_r , classifier in enumerate (available_classifiers ):
595
592
596
593
# Only launch a task if there is time
597
594
start_time = time .time ()
@@ -610,7 +607,7 @@ def _do_traditional_prediction(self, time_left: int) -> None:
610
607
logger_port = self ._logger_port ,
611
608
cost_for_crash = get_cost_of_crash (self ._metric ),
612
609
abort_on_first_run_crash = False ,
613
- initial_num_run = n_r ,
610
+ initial_num_run = self . _backend . get_next_num_run () ,
614
611
stats = stats ,
615
612
memory_limit = memory_limit ,
616
613
disable_file_output = True if len (self ._disable_file_output ) > 0 else False ,
@@ -624,9 +621,6 @@ def _do_traditional_prediction(self, time_left: int) -> None:
624
621
)
625
622
])
626
623
627
- # Increment the launched job index
628
- self .num_run = n_r
629
-
630
624
# When managing time, we need to take into account the allocated time resources,
631
625
# which are dependent on the number of cores. 'dask_futures' is a proxy to the number
632
626
# of workers /n_jobs that we have, in that if there are 4 cores allocated, we can run at most
@@ -653,6 +647,11 @@ def _do_traditional_prediction(self, time_left: int) -> None:
653
647
if status == StatusType .SUCCESS :
654
648
self ._logger .info (
655
649
f"Fitting { cls } took { runtime } s, performance:{ cost } /{ additional_info } " )
650
+ configuration = additional_info ['pipeline_configuration' ]
651
+ origin = additional_info ['configuration_origin' ]
652
+ run_history .add (config = configuration , cost = cost ,
653
+ time = runtime , status = status , seed = self .seed ,
654
+ origin = origin )
656
655
else :
657
656
if additional_info .get ('exitcode' ) == - 6 :
658
657
self ._logger .error (
@@ -679,6 +678,13 @@ def _do_traditional_prediction(self, time_left: int) -> None:
679
678
"Please consider increasing the run time to further improve performance." )
680
679
break
681
680
681
+ self ._logger .debug ("Run history traditional: {}" .format (run_history ))
682
+ # add run history of traditional to api run history
683
+ self .run_history .update (run_history , DataOrigin .EXTERNAL_SAME_INSTANCES )
684
+ run_history .save_json (os .path .join (self ._backend .internals_directory , 'traditional_run_history.json' ),
685
+ save_external = True )
686
+ return
687
+
682
688
def _run_dummy_predictions (self ) -> None :
683
689
dummy_task_name = 'runDummy'
684
690
self ._stopwatch .start_task (dummy_task_name )
@@ -727,13 +733,17 @@ def _run_ensemble(self, dataset: BaseDataset, optimize_metric: str,
727
733
dataset_name = dataset .dataset_name ,
728
734
output_type = STRING_TO_OUTPUT_TYPES [dataset .output_type ],
729
735
task_type = STRING_TO_TASK_TYPES [self .task_type ],
730
- metrics = [self ._metric ], opt_metric = optimize_metric ,
736
+ metrics = [self ._metric ],
737
+ opt_metric = optimize_metric ,
731
738
ensemble_size = self .ensemble_size ,
732
739
ensemble_nbest = self .ensemble_nbest ,
733
740
max_models_on_disc = self .max_models_on_disc ,
741
+ seed = self .seed ,
742
+ max_iterations = None ,
743
+ read_at_most = sys .maxsize ,
734
744
ensemble_memory_limit = self ._memory_limit ,
735
- seed = self . seed , max_iterations = None , random_state = self .seed ,
736
- read_at_most = sys . maxsize , precision = precision ,
745
+ random_state = self .seed ,
746
+ precision = precision ,
737
747
logger_port = self ._logger_port
738
748
)
739
749
self ._stopwatch .stop_task (ensemble_task_name )
@@ -756,8 +766,9 @@ def _start_smac(self, proc_smac: AutoMLSMBO) -> None:
756
766
assert self ._logger is not None
757
767
758
768
try :
759
- self . run_history , self .trajectory , budget_type = \
769
+ run_history , self .trajectory , budget_type = \
760
770
proc_smac .run_smbo ()
771
+ self .run_history .update (run_history , DataOrigin .INTERNAL )
761
772
trajectory_filename = os .path .join (
762
773
self ._backend .get_smac_output_directory_for_run (self .seed ),
763
774
'trajectory.json' )
@@ -802,8 +813,10 @@ def _run_smac(self, dataset: BaseDataset, proc_ensemble: EnsembleBuilderManager,
802
813
func_eval_time_limit_secs = self ._func_eval_time_limit_secs ,
803
814
dask_client = self ._dask_client ,
804
815
memory_limit = self ._memory_limit ,
805
- n_jobs = self .n_jobs , watcher = self ._stopwatch ,
806
- metric = self ._metric , seed = self .seed ,
816
+ n_jobs = self .n_jobs ,
817
+ watcher = self ._stopwatch ,
818
+ metric = self ._metric ,
819
+ seed = self .seed ,
807
820
include = self .include_components ,
808
821
exclude = self .exclude_components ,
809
822
disable_file_output = self ._disable_file_output ,
@@ -813,7 +826,7 @@ def _run_smac(self, dataset: BaseDataset, proc_ensemble: EnsembleBuilderManager,
813
826
pipeline_config = {** self .pipeline_options , ** budget_config },
814
827
ensemble_callback = proc_ensemble ,
815
828
logger_port = self ._logger_port ,
816
- start_num_run = self .num_run ,
829
+ start_num_run = self ._backend . get_next_num_run ( peek = True ) ,
817
830
search_space_updates = self .search_space_updates
818
831
)
819
832
@@ -930,18 +943,23 @@ def _finish_experiment(self, proc_ensemble: EnsembleBuilderManager,
930
943
self ._logger .info ("Starting to clean up the logger" )
931
944
self ._clean_logger ()
932
945
933
- def _search (self , optimize_metric : str ,
934
- dataset : BaseDataset , budget_type : Optional [str ] = None ,
935
- budget : Optional [float ] = None ,
936
- total_walltime_limit : int = 100 ,
937
- func_eval_time_limit_secs : Optional [int ] = None ,
938
- enable_traditional_pipeline : bool = True ,
939
- memory_limit : Optional [int ] = 4096 ,
940
- smac_scenario_args : Optional [Dict [str , Any ]] = None ,
941
- get_smac_object_callback : Optional [Callable ] = None ,
942
- all_supported_metrics : bool = True ,
943
- precision : int = 32 , disable_file_output : List = [],
944
- load_models : bool = True ) -> 'BaseTask' :
946
+ def _search (
947
+ self ,
948
+ optimize_metric : str ,
949
+ dataset : BaseDataset ,
950
+ budget_type : Optional [str ] = None ,
951
+ budget : Optional [float ] = None ,
952
+ total_walltime_limit : int = 100 ,
953
+ func_eval_time_limit_secs : Optional [int ] = None ,
954
+ enable_traditional_pipeline : bool = True ,
955
+ memory_limit : Optional [int ] = 4096 ,
956
+ smac_scenario_args : Optional [Dict [str , Any ]] = None ,
957
+ get_smac_object_callback : Optional [Callable ] = None ,
958
+ all_supported_metrics : bool = True ,
959
+ precision : int = 32 ,
960
+ disable_file_output : List = [],
961
+ load_models : bool = True
962
+ ) -> 'BaseTask' :
945
963
"""
946
964
Search for the best pipeline configuration for the given dataset.
947
965
@@ -1033,7 +1051,6 @@ def _search(self, optimize_metric: str,
1033
1051
total_walltime_limit = total_walltime_limit )
1034
1052
1035
1053
self ._adapt_time_resource_allocation ()
1036
- self .num_run = 1
1037
1054
self ._run_dummy_predictions ()
1038
1055
1039
1056
if enable_traditional_pipeline :
@@ -1098,7 +1115,7 @@ def refit(
1098
1115
'train_indices' : dataset .splits [split_id ][0 ],
1099
1116
'val_indices' : dataset .splits [split_id ][1 ],
1100
1117
'split_id' : split_id ,
1101
- 'num_run' : 0
1118
+ 'num_run' : self . _backend . get_next_num_run (),
1102
1119
})
1103
1120
X .update ({** self .pipeline_options , ** budget_config })
1104
1121
if self .models_ is None or len (self .models_ ) == 0 or self .ensemble_ is None :
@@ -1175,7 +1192,7 @@ def fit(self,
1175
1192
'train_indices' : dataset .splits [split_id ][0 ],
1176
1193
'val_indices' : dataset .splits [split_id ][1 ],
1177
1194
'split_id' : split_id ,
1178
- 'num_run' : 0
1195
+ 'num_run' : self . _backend . get_next_num_run (),
1179
1196
})
1180
1197
X .update ({** self .pipeline_options , ** budget_config })
1181
1198
0 commit comments