Experiment helpers

utilities.experiment_helpers.experiment_launcher

Experiment launcher script.

This script helps run multiple experiments by taking a list of commands and executing them with a process pool, so that a large list of experiments can be left running unattended.

To display a help message listing all the relevant arguments, run:

python experiment_launcher.py --help
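
For instance, given a hypothetical command list file commands.txt (script name and arguments are illustrative):

python train_model.py --epochs 10
python train_model.py --epochs 20

the experiments could be launched with four parallel processes via:

python experiment_launcher.py --command_list commands.txt --processes 4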

Authors:

Alberto Garcia Garcia (garciagarcia@ice.csic.es)

log_experiment(process_result)

Callback to log all the info returned from an experiment run.

Parameters:

    process_result (Tuple[str, str], required): Tuple containing the experiment command and the full console output of the process.

Source code in utilities/experiment_helpers/experiment_launcher.py
def log_experiment(process_result: typing.Tuple[str, str]) -> None:
    """
    Callback to log all the info returned from an experiment run.

    Args:
        process_result (Tuple[str, str]): Tuple containing the experiment command and the
            full console output of the process.
    """

    log.info("")
    log.info(
        "****************************************************************"
    )
    log.info('Ran experiment "{}"!'.format(process_result[0]))
    log.info("Process output:\n {}".format(process_result[1]))
    log.info("Process finished...")

main(args)

Execute different experiments using a multiprocessing pool.

This function initializes a multiprocessing pool to run several scripts defined by the user. It queues the different jobs and manages their execution in parallel.

Parameters:

    args (argparse.Namespace, required): Command-line arguments parsed by argparse. Required parameters include:

      • command_list (str): Path to a .txt file containing a list of commands to execute.
      • processes (int): Number of simultaneous processes for the pool.

Source code in utilities/experiment_helpers/experiment_launcher.py
def main(args) -> None:
    """
    Execute different experiments using a multiprocessing pool.

    This function initializes a multiprocessing pool to run several scripts
    defined by the user. It queues the different jobs and manages their execution in parallel.

    Args:
        args (argparse.Namespace): Command-line arguments parsed by argparse. Required parameters include:

            - command_list (str): Path to a `.txt` file containing a list of commands to execute.
            - processes (int): Number of simultaneous processes for the pool.
    """

    # Event on the master process that will be used to synchronize the child
    # processes and signal them for execution in the pool.
    event = mp.Event()
    # Lock on the master process to impose a delay in the process execution
    # so that none of them can be launched exactly at the same time.
    lock = mp.Lock()
    # A pool of processes with a defined capacity, a process spawning setup
    # routine and a general event to signal process execution.
    pool = mp.Pool(
        args.processes,
        setup_process_pool,
        (
            event,
            lock,
        ),
    )

    # Read the command list file, each command should be one single line.
    with open(args.command_list) as f:
        commands = f.readlines()
    commands = [x.strip() for x in commands]

    # Fill the pool with one process for each command in the list. Each one
    # of them will execute the experiment subroutine with the specified
    # command and will log its results upon completion.
    for command in commands:
        pool.apply_async(
            run_experiment, args=(command,), callback=log_experiment
        )

        log.info("Experiment process sent to pool for execution...")

    log.info("")
    log.info("***************************************************************")
    log.info("Launching experiments")
    log.info("***************************************************************")

    # Signal the processes to begin execution in the pool.
    event.set()

    # Wait for all processes to finish.
    pool.close()
    pool.join()

run_experiment(command)

Run experiment command.

This is the main routine for running a particular experiment. It runs the provided experiment command (a Python call to the experiment script with a set of CLI arguments) and captures all the output of the process.

Parameters:

    command (str, required): Full command to execute the experiment.

Returns:

    Tuple[str, str]: The experiment command and the combined console output of the process.

Source code in utilities/experiment_helpers/experiment_launcher.py
def run_experiment(command: str) -> typing.Tuple[str, str]:
    """
    Run experiment command.

    This is the main routine for running a particular experiment. It runs the
    provided experiment command (a Python call to the experiment script with a
    set of CLI arguments) and captures all the output of the process.

    Args:
        command (str): Full command to execute the experiment.

    Returns:
        (Tuple[str, str]): The experiment command and the combined output of the process.
    """

    # Acquire the lock and block any other process from executing
    # for two seconds. We do this in order not to launch two processes
    # at the exact same second so that their output folders (which are
    # named automatically MMDD_HHMMSS) are not overwritten.
    starting.acquire()
    threading.Timer(2, starting.release).start()

    # Once the process has released the lock for another process to wait
    # it can proceed with the execution of the experiment.
    log.info("Launching experiment {}".format(command))

    process_output = subprocess.check_output(
        command, stderr=subprocess.STDOUT, shell=True
    )

    log.info("Experiment finished...")

    return command, process_output.decode("utf-8")

setup_process_pool(event, lock)

Set up the process pool for multiprocessing with a global pause/resume event.

Parameters:

    event (mp.Event, required): Reference to a master-process event that signals the child processes to pause or resume execution.

    lock (mp.Lock, required): Reference to a master-process lock that coordinates child-process launching with waiting times.

Source code in utilities/experiment_helpers/experiment_launcher.py
def setup_process_pool(event: mp.Event, lock: mp.Lock) -> None:
    """
    Set up the process pool for multiprocessing with a global pause/resume event.

    Args:
        event (mp.Event): Reference to a master process event that will signal the child
            processes to pause or resume execution.
        lock (mp.Lock): A reference to a master process lock that will coordinate the
            child process launching with waiting times.
    """

    global unpaused
    unpaused = event

    global starting
    starting = lock
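
For context, this relies on the standard multiprocessing initializer pattern: the initializer runs once in each worker process and stashes shared primitives in module-level globals, where later pool tasks can find them. A minimal, self-contained sketch of that pattern (the names _init_worker, pause_event, and task are illustrative and not part of this module):

import multiprocessing as mp

def _init_worker(pause_event):
    # Runs once per worker; store the event in a module-level global
    # so that every task executed in this worker can see it.
    global unpaused
    unpaused = pause_event

def task(i):
    # Block until the master process signals that execution may begin.
    unpaused.wait()
    return i * i

if __name__ == "__main__":
    event = mp.Event()
    with mp.Pool(2, _init_worker, (event,)) as pool:
        results = pool.map_async(task, range(4))
        event.set()  # release all queued tasks at once
        print(results.get())  # [0, 1, 4, 9]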

utilities.experiment_helpers.parameter_set_generator

Parameter-set generator module.

This module contains the methods necessary to produce a sweep of the simulation parameters, used in the parameter_sweeper.py and run_simulation_set.py scripts.

If the --sampling_type argument is set to "grid", we require the following for each tunable parameter:

--argument low high count

This expands the parameter into a "count" number of linearly spaced values over the range [low, high].

If the --sampling_type argument is set to "random", we require the following for each tunable parameter:

--argument low high

This expands the parameter to a list of values drawn from a uniform distribution over [low, high]. In this case, the number of values to be drawn for each parameter is specified by the argument --sampling_size.

Both expansion types are evaluated for each specified argument. Subsequently, a generator produces all possible parameter combinations if in "grid" mode or sets of random parameter values if in "random" mode.
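
As an illustration, the snippet below reproduces the two expansion modes in isolation, using sigma_k and vk_c as example tunable parameters (the numeric values are illustrative):

import itertools
import numpy as np

# Grid mode: --sigma_k 100 500 5 expands to 5 linearly spaced values,
# --vk_c 500 1000 3 to 3 values.
sigma_k = np.linspace(100, 500, 5)   # [100., 200., 300., 400., 500.]
vk_c = np.linspace(500, 1000, 3)     # [500., 750., 1000.]

# "grid": every combination of the expanded ranges (5 x 3 = 15 sets).
grid_sets = list(itertools.product(sigma_k, vk_c))

# "random": --sigma_k 100 500 and --vk_c 500 1000 with --sampling_size 10
# draw 10 uniform values per parameter; the i-th set pairs the i-th draws.
random_sets = list(
    zip(np.random.uniform(100, 500, 10), np.random.uniform(500, 1000, 10))
)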

Authors:

Michele Ronchi (ronchi@ice.csic.es)

check_expand_args(args_dict)

Check that the parsed input arguments are coherent and have the correct shape. If in grid mode: expand each simulation parameter in linear space over the specified range. If in random mode: draw random sets of parameter values from uniform distributions over the specified ranges.

Parameters:

    args_dict (dict, required): Dictionary of the arguments parsed via the CLI.

Returns:

    Tuple[list, list]: A list containing the names of the expanded parameters and a list containing their expanded ranges.

Source code in utilities/experiment_helpers/parameter_set_generator.py
def check_expand_args(args_dict: dict) -> Tuple[list, list]:
    """
    Check that the parsed input arguments are coherent and have the correct shape.
    If in grid mode: expand each simulation parameter in linear space over the specified range.
    If in random mode: draw random sets of parameter values from uniform distributions over the specified ranges.

    Args:
        args_dict (dict): Dictionary of the arguments parsed via the CLI.

    Returns:
        (Tuple[list, list]): A list containing the names of the expanded parameters and a list
            containing their expanded ranges.
    """

    # Check if the provided parameters are compatible with the simulation configuration file.
    check_parameter_compatibility(args_dict)

    cli_args: list = []
    cli_str: list = []

    path_to_software = cfg["path_to_software"]
    # Open the parameter dictionary to load requirements.
    config_sweeper_path = pathlib.Path().joinpath(
        path_to_software,
        "utilities/experiment_helpers/config_sweeper.json",
    )
    with open(config_sweeper_path) as f:
        check_arg = json.load(f)

    required_parameters = []
    forbidden_parameters = []

    var_names = []
    var_expanded_ranges = []

    for arg in args_dict.keys():
        log.info(arg)
        value = args_dict[arg]
        log.info(value)

        if value is None:
            if arg == "sampling_size":
                if args_dict["sampling_type"] == "random":
                    raise ValueError(
                        "In random mode you have to specify the parameter sampling_size."
                    )
            else:
                continue

        elif type(value) is str:
            # If the value of this parameter is a string, this can be either
            # the directory path where the multirun output is saved, the directory path where
            # a dynamical database is saved or a selection parameter.
            # In the latter case we have to: (a) check whether the selection is valid,
            # (b) capture the list of required parameters, and (c) gather the forbidden ones
            # (i.e., those that belong to other types of selections).
            if arg == "save_dir":
                cli_args.append("--save_dir")
                cli_str.append(value)
            elif arg == "dyn_data":
                cli_args.append("--dyn_data")
                cli_str.append(value)
            elif value in check_arg[arg]:
                cli_args.append(arg)
                cli_str.append(value)
                required_parameters.extend(check_arg[arg][value])
                forbidden_parameters.extend(
                    [
                        item
                        for sublist in [
                            v for k, v in check_arg[arg].items() if k != value
                        ]
                        for item in sublist
                    ]
                )
            else:
                # If the value for an argument is not in the dictionary of
                # possible values, we throw an exception.
                raise ValueError(
                    f"The value {value} is not feasible for parameter {arg}"
                )

        elif type(value) is list:
            # If the value is a list, we assume it will be a specification of three values
            # if sampling_type = grid or two values if sampling_type = random.
            # We then expand the parameter accordingly.
            var_range = expand_parameter(args_dict, arg, value)
            var_expanded_ranges.append(list(var_range))
            var_names.append(arg)

    log.info(f"Required parameters {required_parameters}")
    log.info(f"Forbidden parameters {forbidden_parameters}")

    # Check if all the required parameters are specified.
    for p in required_parameters:
        if p not in args_dict.keys() or args_dict[p] is None:
            raise ValueError(f"Required parameter {p} not present.")

    # Check if none of the incompatible parameters are required.
    for p in forbidden_parameters:
        if p in args_dict.keys() and args_dict[p] is not None:
            raise ValueError(f"Forbidden parameter {p} is present")

    return var_names, var_expanded_ranges

check_parameter_compatibility(args_dict)

Check if the parsed input arguments are coherent with the ones provided in the configuration file of the simulator.

Parameters:

    args_dict (dict, required): Dictionary of the arguments parsed via the CLI.

Source code in utilities/experiment_helpers/parameter_set_generator.py
def check_parameter_compatibility(args_dict: dict) -> None:
    """
    Check if the parsed input arguments are coherent with the ones provided in the configuration file of the simulator.

    Args:
        args_dict (dict): Dictionary of the arguments parsed via the CLI.
    """

    if (
        (cfg["kick_model"] == "km_maxwell") and (args_dict["vk_c"] is not None)
    ) or (
        (cfg["kick_model"] == "km_exp") and (args_dict["sigma_k"] is not None)
    ):
        raise ValueError(
            "The provided kick-velocity distribution parameter is not compatible "
            "with the model {} in the configuration file.".format(
                cfg["kick_model"]
            )
        )
    elif (
        (cfg["spin_period_model"] == "normal")
        and (
            (args_dict["P_initial_log10_mean"] is not None)
            or (args_dict["P_initial_log10_sigma"] is not None)
        )
    ) or (
        (cfg["spin_period_model"] == "log-normal")
        and (
            (args_dict["P_initial_mean"] is not None)
            or (args_dict["P_initial_sigma"] is not None)
        )
    ):
        raise ValueError(
            "The provided spin-period distribution parameters are not compatible with the model {} "
            "in the configuration file.".format(cfg["spin_period_model"])
        )
    elif (
        (
            (cfg["magnetic_field_model"] == "log-normal")
            and (
                (args_dict["B_initial_log10_mean_comp1"] is not None)
                or (args_dict["B_initial_log10_sigma_comp1"] is not None)
                or (args_dict["B_initial_log10_mean_comp2"] is not None)
                or (args_dict["B_initial_log10_sigma_comp2"] is not None)
                or (args_dict["B_initial_log10_weight_comp1"] is not None)
                or (args_dict["B_initial_log10_rise_mean"] is not None)
                or (args_dict["B_initial_log10_rise_sigma"] is not None)
                or (args_dict["B_initial_log10_decay_mean"] is not None)
                or (args_dict["B_initial_log10_decay_sigma"] is not None)
                or (args_dict["B_initial_log10_slope"] is not None)
            )
        )
        or (
            (cfg["magnetic_field_model"] == "double_log-normal")
            and (
                (args_dict["B_initial_log10_mean"] is not None)
                or (args_dict["B_initial_log10_sigma"] is not None)
                or (args_dict["B_initial_log10_rise_mean"] is not None)
                or (args_dict["B_initial_log10_rise_sigma"] is not None)
                or (args_dict["B_initial_log10_decay_mean"] is not None)
                or (args_dict["B_initial_log10_decay_sigma"] is not None)
                or (args_dict["B_initial_log10_slope"] is not None)
            )
        )
        or (
            (cfg["magnetic_field_model"] == "smooth_top-hat")
            and (
                (args_dict["B_initial_log10_mean"] is not None)
                or (args_dict["B_initial_log10_sigma"] is not None)
                or (args_dict["B_initial_log10_mean_comp1"] is not None)
                or (args_dict["B_initial_log10_sigma_comp1"] is not None)
                or (args_dict["B_initial_log10_mean_comp2"] is not None)
                or (args_dict["B_initial_log10_sigma_comp2"] is not None)
                or (args_dict["B_initial_log10_weight_comp1"] is not None)
            )
        )
    ):
        raise ValueError(
            "The provided magnetic-field distribution parameters are not compatible with the model {} "
            "in the configuration file.".format(cfg["magnetic_field_model"])
        )
    else:
        log.info(
            "The provided parameters are compatible with the configuration file."
        )

expand_parameter(args_dict, parameter_name, range_values)

Expand the simulation parameters. If in grid mode: expand each simulation parameter in linear space over the specified range. If in random mode: draw random sets of parameter values from uniform distributions over the specified ranges.

Parameters:

    args_dict (dict, required): Dictionary of the arguments parsed via the CLI.

    parameter_name (str, required): Name of the parameter to expand.

    range_values (list, required): Range of values over which to expand the parameter.

Returns:

    np.ndarray: An array containing the expanded range of the parameter.

Source code in utilities/experiment_helpers/parameter_set_generator.py
def expand_parameter(
    args_dict: dict, parameter_name: str, range_values: list
) -> np.ndarray:
    """
    Expand the simulation parameters.
    If in grid mode: expand each simulation parameter in linear space in the specified ranges.
    If in random mode: draw random set of parameter values from uniform distributions in the specified ranges.

    Args:
        args_dict (dict): Dictionary of the parsed argument via CLI.
        parameter_name (str): Name of the parameter to expand.
        range_values (list): Range of values where to expand the parameter.

    Returns:
        (np.ndarray): A list containing the expanded range of the parameter.
    """

    expanded_parameter = []

    if args_dict["sampling_type"] == "grid":
        # The three values [low, high, steps] are used to expand each of the arguments
        # with linear spacing in the range [low, high] with a number of specified steps.
        if len(range_values) != 3:
            raise ValueError(
                f"In grid mode the list must have length 3 for parameter {parameter_name}"
            )
        expanded_parameter = np.linspace(
            range_values[0], range_values[1], int(range_values[2])
        )

    elif args_dict["sampling_type"] == "random":
        # The two values [low, high] define the range from which a number of values
        # (specified according to the sampling_size argument) is drawn from a uniform distribution.
        if len(range_values) != 2:
            raise ValueError(
                f"In random mode the list must have length 2 for parameter {parameter_name}"
            )
        expanded_parameter = np.random.uniform(
            range_values[0], range_values[1], int(args_dict["sampling_size"])
        )

    return expanded_parameter
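
A hypothetical usage sketch (parameter name and values are illustrative): in grid mode the three list entries feed np.linspace, while in random mode the two bounds feed np.random.uniform together with sampling_size:

# Grid mode: [low, high, steps] -> "steps" linearly spaced values.
args_dict = {"sampling_type": "grid", "sampling_size": None}
expand_parameter(args_dict, "sigma_k", [100, 500, 5])
# -> array([100., 200., 300., 400., 500.])

# Random mode: [low, high] -> sampling_size uniform draws.
args_dict = {"sampling_type": "random", "sampling_size": 10}
expand_parameter(args_dict, "sigma_k", [100, 500])
# -> array of 10 values drawn uniformly from [100, 500)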

utilities.experiment_helpers.parameter_sweeper

Parameter-sweeper script.

This script generates the files necessary to launch multiple simulations with different parameter values using HTCondor at the PIC. It uses methods from the module parameter_set_generator.py.

If the --sampling_type argument is set to "grid", we require the following for each tunable parameter:

--argument low high count

This expands the parameter into a "count" number of linearly spaced values over the range [low, high].

If the --sampling_type argument is set to "random", we require the following for each tunable parameter:

--argument low high

This expands the parameter to a list of values drawn from a uniform distribution over [low, high]. In this case, the number of values to be drawn for each parameter is specified by the argument --sampling_size.

Both expansion types are evaluated for each specified argument. Subsequently, a generator produces all possible parameter combinations if in "grid" mode or sets of random parameter values if in "random" mode. Each set will be saved in a JSON "parameter_override" file that will be used as input to a simulation.

To display a help message listing all the relevant arguments, run:

python parameter_sweeper.py --help
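
For example, a hypothetical grid sweep over two parameters (paths and values are illustrative) could look like:

python parameter_sweeper.py --save_dir sweeps/test_sweep --sampling_type grid --sigma_k 100 500 5 --vk_c 500 1000 3

This writes a simulation_arguments.txt file to the save directory and, for each of the 5 x 3 = 15 parameter combinations, a numbered output folder (000000, 000001, ...) containing an override.json file.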

Authors:

Michele Ronchi (ronchi@ice.csic.es)

main(args)

Generate parameter sets for running simulations based on provided arguments.

This function takes command-line arguments, parses them, and generates parameter sets for running simulations. It supports two types of sampling: grid and random. The arguments to run the simulations are saved in a text file, and override JSON files are created for each simulation containing the corresponding generated parameter sets.

Parameters:

    args (argparse.Namespace, required): An argparse.Namespace object containing the following attributes:

  • save_dir (str): Path to the directory where the multi-run output will be saved.
  • sampling_type (str): Type of sampling for the parameter space, either 'grid' or 'random'.
  • sampling_size (int): Number of random values to draw for each simulation parameter (required only if sampling_type is 'random').
  • sigma_k (list[float]): Range and number of values for the kick velocity sigma parameter.
  • vk_c (list[float]): Range and number of values for the kick velocity vk_c parameter.
  • h_c (list[float]): Range and number of values for the scale height h_c parameter.
  • P_initial_mean (list[float]): Range and number of values for the mean initial spin period (if spin_period_model = normal).
  • P_initial_sigma (list[float]): Range and number of values for the dispersion of the initial spin period (if spin_period_model = normal).
  • P_initial_log10_mean (list[float]): Range and number of values for the log10 mean initial spin period (if spin_period_model = log-normal).
  • P_initial_log10_sigma (list[float]): Range and number of values for the log10 dispersion of the initial spin period (if spin_period_model = log-normal).
  • B_initial_log10_mean (list[float]): Range and number of values for the mean of the log10 initial magnetic field strength (if magnetic_field_model = log-normal).
  • B_initial_log10_sigma (list[float]): Range and number of values for the dispersion of the log10 initial magnetic field strength (if magnetic_field_model = log-normal).
  • B_initial_log10_mean_comp1 (list[float]): Range and number of values for the mean of the log10 initial magnetic field strength of the first log-normal component (if magnetic_field_model = double_log-normal).
  • B_initial_log10_sigma_comp1 (list[float]): Range and number of values for the dispersion of the log10 initial magnetic field strength of the first log-normal component (if magnetic_field_model = double_log-normal).
  • B_initial_log10_mean_comp2 (list[float]): Range and number of values for the mean of the log10 initial magnetic field strength of the second log-normal component (if magnetic_field_model = double_log-normal).
  • B_initial_log10_sigma_comp2 (list[float]): Range and number of values for the dispersion of the log10 initial magnetic field strength of the second log-normal component (if magnetic_field_model = double_log-normal).
  • B_initial_log10_weight_comp1 (list[float]): Range and number of values for the relative weight of the first log-normal component with respect to the full pdf (if magnetic_field_model = double_log-normal).
  • B_initial_log10_rise_mean (list[float]): Range and number of values for the mean of the log10 initial magnetic field strength of the first log-normal component (if magnetic_field_model = smooth_tophat).
  • B_initial_log10_rise_sigma (list[float]): Range and number of values for the dispersion of the log10 initial magnetic field strength of the first log-normal component (if magnetic_field_model = smooth_tophat).
  • B_initial_log10_decay_mean (list[float]): Range and number of values for the mean of the log10 initial magnetic field strength of the second log-normal component (if magnetic_field_model = smooth_tophat).
  • B_initial_log10_decay_sigma (list[float]): Range and number of values for the dispersion of the log10 initial magnetic field strength of the second log-normal component (if magnetic_field_model = smooth_tophat).
  • B_initial_log10_slope (list[float]): Range and number of values for the slope connecting the first log-normal component to the second one (if magnetic_field_model = smooth_tophat).
  • a_late (list[float]): Range and number of values for the power-law slope of the late time magnetic field evolution.
  • L_radio_log10_mean (list[float]): Range and number of values for the mean of the log10 radio luminosity normalization.
  • epsilon_L (list[float]): Range and number of values for the power-law index of the log10 radio luminosity.
Source code in utilities/experiment_helpers/parameter_sweeper.py
def main(args):
    """
    Generate parameter sets for running simulations based on provided arguments.

    This function takes command-line arguments, parses them, and generates
    parameter sets for running simulations. It supports two types of sampling:
    grid and random. The arguments to run the simulations are saved in a text file, and
    override JSON files are created for each simulation containing the corresponding
    generated parameter sets.

    Args:
        args (argparse.Namespace): An argparse.Namespace object containing the following attributes:

            - save_dir (str): Path to the directory where the multi-run output will be saved.
            - sampling_type (str): Type of sampling for the parameter space, either 'grid' or 'random'.
            - sampling_size (int): Number of random values to draw for each simulation parameter
                (required only if sampling_type is 'random').
            - sigma_k (list[float]): Range and number of values for the kick velocity sigma parameter.
            - vk_c (list[float]): Range and number of values for the kick velocity vk_c parameter.
            - h_c (list[float]): Range and number of values for the scale height h_c parameter.
            - P_initial_mean (list[float]): Range and number of values for the mean initial spin period (if
                spin_period_model = normal).
            - P_initial_sigma (list[float]): Range and number of values for the dispersion of the initial
                spin period (if spin_period_model = normal).
            - P_initial_log10_mean (list[float]): Range and number of values for the log10 mean initial
                spin period (if spin_period_model = log-normal).
            - P_initial_log10_sigma (list[float]): Range and number of values for the log10 dispersion
                of the initial spin period (if spin_period_model = log-normal).
            - B_initial_log10_mean (list[float]): Range and number of values for the mean of the log10
                initial magnetic field strength (if magnetic_field_model = log-normal).
            - B_initial_log10_sigma (list[float]): Range and number of values for the dispersion of the
                log10 initial magnetic field strength (if magnetic_field_model = log-normal).
            - B_initial_log10_mean_comp1 (list[float]): Range and number of values for the mean of the log10
                initial magnetic field strength of the first log-normal component (if
                magnetic_field_model = double_log-normal).
            - B_initial_log10_sigma_comp1 (list[float]): Range and number of values for the dispersion of the
                log10 initial magnetic field strength of the first log-normal component (if
                magnetic_field_model = double_log-normal).
            - B_initial_log10_mean_comp2 (list[float]): Range and number of values for the mean of the log10
                initial magnetic field strength of the second log-normal component (if
                magnetic_field_model = double_log-normal).
            - B_initial_log10_sigma_comp2 (list[float]): Range and number of values for the dispersion of the
                log10 initial magnetic field strength of the second log-normal component (if
                magnetic_field_model = double_log-normal).
            - B_initial_log10_weight_comp1 (list[float]): Range and number of values for the relative weight of the first
                log-normal component with respect to the full pdf (if magnetic_field_model = double_log-normal).
            - B_initial_log10_rise_mean (list[float]): Range and number of values for the mean of the log10
                initial magnetic field strength of the first log-normal component (if
                magnetic_field_model = smooth_tophat).
            - B_initial_log10_rise_sigma (list[float]): Range and number of values for the dispersion of the
                log10 initial magnetic field strength of the first log-normal component (if
                magnetic_field_model = smooth_tophat).
            - B_initial_log10_decay_mean (list[float]): Range and number of values for the mean of the log10
                initial magnetic field strength of the second log-normal component (if
                magnetic_field_model = smooth_tophat).
            - B_initial_log10_decay_sigma (list[float]): Range and number of values for the dispersion of the
                log10 initial magnetic field strength of the second log-normal component (if
                magnetic_field_model = smooth_tophat).
            - B_initial_log10_slope (list[float]): Range and number of values for the slope connecting the first
                log-normal component to the second one (if magnetic_field_model = smooth_tophat).
            - a_late (list[float]): Range and number of values for the power-law slope of the late time
                magnetic field evolution.
            - L_radio_log10_mean (list[float]): Range and number of values for the mean of the log10
                radio luminosity normalization.
            - epsilon_L (list[float]): Range and number of values for the power-law index of the log10
                radio luminosity.
    """
    # Parse arguments provided to the parameter-sweeper script.
    log.info("Parsing arguments...")

    args_dict = vars(args)

    # Check and expand the parameters in the provided ranges.
    var_names, var_expanded_ranges = psg.check_expand_args(args_dict)

    if args_dict["sampling_type"] == "grid":
        # Create a generator of all possible combinations of parameters based on their expanded range lists.
        parameter_sets_gen = itertools.product(*var_expanded_ranges)

    elif args_dict["sampling_type"] == "random":
        # Create a generator of the random sets of parameters.
        list_var_expanded_ranges = np.array(var_expanded_ranges).T.tolist()
        parameter_sets_gen = list(map(tuple, list_var_expanded_ranges))

    else:
        raise ValueError(
            "The specified sampling type is not feasible, choose between grid or random."
        )

    # Save the input arguments to run each simulation in a file.
    log.info("Generating simulation parameter sets...")

    output_path = pathlib.Path(args.save_dir)
    output_path.mkdir(parents=True, exist_ok=True)

    simulation_arguments_path = pathlib.Path().joinpath(
        output_path, "simulation_arguments.txt"
    )

    simulation_number: int = 0

    with open(simulation_arguments_path, "w") as f_sa:
        for s in parameter_sets_gen:
            log.info(f"Parameter set for simulation {simulation_number}:")
            log.info(s)

            # Generate output folders for the simulations.
            # Note that the numbering of the folders is limited to 6 digits here,
    # i.e., we can only generate simulations below 1 million.
            path_to_output = cfg["path_to_output"]
            output_path = pathlib.Path().joinpath(path_to_output, output_path)
            simulation_output_path = pathlib.Path().joinpath(
                output_path, f"{simulation_number:06}"
            )
            simulation_output_path.mkdir(parents=True, exist_ok=True)

            # Pack combination into a JSON override file and write it to the folder for a given simulation.
            simulation_override_json = {}
            for i in range(len(s)):
                simulation_override_json[var_names[i]] = s[i]

            simulation_output_path = pathlib.Path().joinpath(
                path_to_output, simulation_output_path
            )
            simulation_override_json_path = pathlib.Path().joinpath(
                simulation_output_path, "override.json"
            )

            with open(simulation_override_json_path, "w") as f:
                json.dump(simulation_override_json, f, indent=4)

            f_sa.write(
                f"{simulation_output_path} {simulation_override_json_path}"
                + "\n"
            )

            simulation_number += 1

    log.info("Generating parameter sets completed!")
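
Each generated override.json simply maps parameter names to the values of one parameter set. For the hypothetical grid sweep shown earlier, the file for the first simulation might contain:

{
    "sigma_k": 100.0,
    "vk_c": 500.0
}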

utilities.experiment_helpers.run_simulation_set

Simulator helper script.

This script allows us to run the various simulator utilities in parallel using a multiprocessing pool.

If the --sampling_type argument is set to "grid", we require the following for each tunable parameter:

--argument low high count

This expands the parameter into a "count" number of linearly spaced values over the range [low, high].

If the --sampling_type argument is set to "random", we require the following for each tunable parameter:

--argument low high

This expands the parameter to a list of values drawn from a uniform distribution over [low, high]. In this case, the number of values to be drawn for each parameter is specified by the argument --sampling_size.

Both expansion types are evaluated for each specified argument. Subsequently, a generator produces all possible parameter combinations if in "grid" mode or sets of random parameter values if in "random" mode. Each parameter combination spawns a new process that enters a multiprocessing pool for later execution, allowing many populations to be simulated asynchronously in parallel with a defined maximum number of worker processes.

NOTE: if an error occurs in one of the simulations, the script does not stop until all the processes have terminated. In this case, the error is only shown on the terminal.

To display a help message listing all the relevant arguments, run:

python run_simulation_set.py --help
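
For example, a hypothetical random sweep (paths and values are illustrative) could be launched as:

python run_simulation_set.py --simulator_type simulate_population_dyn --save_dir runs/random_sweep --sampling_type random --sampling_size 10 --processes 4 --sigma_k 100 500 --vk_c 500 1000

This queues 10 simulations in a pool of 4 worker processes, each simulation writing to its own numbered output folder with its own override.json.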

Authors:

Alberto Garcia Garcia (garciagarcia@ice.csic.es)
Michele Ronchi (ronchi@ice.csic.es)
Celsa Pardo Araujo (pardo@ice.csic.es)

log_error(e)

Log an exception raised during the simulation process.

Parameters:

    e (Exception, required): The exception to log.

Source code in utilities/experiment_helpers/run_simulation_set.py
def log_error(e: Exception) -> None:
    """
    Log an exception raised during the simulation process.

    Args:
        e (Exception): The exception to log.
    """
    log.error("An error occurred during the simulation.", exc_info=e)

log_simulation(process_result)

Callback to log all the info returned from a simulation run.

Parameters:

    process_result (Tuple[str, str], required): Tuple containing the simulation command and the full console output of the process.

Source code in utilities/experiment_helpers/run_simulation_set.py
def log_simulation(process_result: typing.Tuple[str, str]) -> None:
    """
    Callback to log all the info returned from a simulation run.

    Args:
        process_result (Tuple[str, str]): Tuple containing the simulation command and the
            full console output of the process.
    """

    log.info("")
    log.info(
        "****************************************************************"
    )
    log.info(f"Ran simulation {process_result[0]}!")
    log.info(f"Process output:\n {process_result[1]}")
    log.info("Process finished...")

main(args)

Execute parameterized simulations using a multiprocessing pool.

This function initializes a multiprocessing pool to run simulations based on a set of parameters defined by the user. It parses the command-line arguments, expands parameter ranges for sampling, queues the simulations, and manages their execution in parallel. It also generates JSON files for each simulation that contain the parameters used for that run.

Parameters:

    args (argparse.Namespace, required): Command-line arguments parsed by argparse. Required parameters include:

  • simulator_type (str): The name of the simulator script to run. Options include 'simulate_population_full', 'simulate_population_dyn', or 'simulate_population_magrot_det'.
  • dyn_data (str): (Optional) Path to the dynamically evolved population database, required if using 'simulate_population_magrot_det'.
  • save_dir (str): Directory path where the output of the simulations will be saved.
  • sampling_type (str): Method for sampling the parameter space. Choose between 'grid' and 'random'.
  • sampling_size (int): Number of random values to draw for each simulation parameter (required if sampling_type is 'random').
  • processes (int): Number of simultaneous processes for the multiprocessing pool (default is 1).
  • sigma_k (list[float]): Range and number of values for the kick velocity sigma parameter.
  • vk_c (list[float]): Range and number of values for the kick velocity vk_c parameter.
  • h_c (list[float]): Range and number of values for the scale height h_c parameter.
  • P_initial_mean (list[float]): Range and number of values for the mean initial spin period (if spin_period_model = normal).
  • P_initial_sigma (list[float]): Range and number of values for the dispersion of the initial spin period (if spin_period_model = normal).
  • P_initial_log10_mean (list[float]): Range and number of values for the log10 mean initial spin period (if spin_period_model = log-normal).
  • P_initial_log10_sigma (list[float]): Range and number of values for the log10 dispersion of the initial spin period (if spin_period_model = log-normal).
  • B_initial_log10_mean (list[float]): Range and number of values for the mean of the log10 initial magnetic field strength (if magnetic_field_model = log-normal).
  • B_initial_log10_sigma (list[float]): Range and number of values for the dispersion of the log10 initial magnetic field strength (if magnetic_field_model = log-normal).
  • B_initial_log10_mean_comp1 (list[float]): Range and number of values for the mean of the log10 initial magnetic field strength of the first log-normal component (if magnetic_field_model = double_log-normal).
  • B_initial_log10_sigma_comp1 (list[float]): Range and number of values for the dispersion of the log10 initial magnetic field strength of the first log-normal component (if magnetic_field_model = double_log-normal).
  • B_initial_log10_mean_comp2 (list[float]): Range and number of values for the mean of the log10 initial magnetic field strength of the second log-normal component (if magnetic_field_model = double_log-normal).
  • B_initial_log10_sigma_comp2 (list[float]): Range and number of values for the dispersion of the log10 initial magnetic field strength of the second log-normal component (if magnetic_field_model = double_log-normal).
  • B_initial_log10_weight_comp1 (list[float]): Range and number of values for the relative weight of the first log-normal component with respect to the full pdf (if magnetic_field_model = double_log-normal).
  • B_initial_log10_rise_mean (list[float]): Range and number of values for the mean of the log10 initial magnetic field strength of the first log-normal component (if magnetic_field_model = smooth_tophat).
  • B_initial_log10_rise_sigma (list[float]): Range and number of values for the dispersion of the log10 initial magnetic field strength of the first log-normal component (if magnetic_field_model = smooth_tophat).
  • B_initial_log10_decay_mean (list[float]): Range and number of values for the mean of the log10 initial magnetic field strength of the second log-normal component (if magnetic_field_model = smooth_tophat).
  • B_initial_log10_decay_sigma (list[float]): Range and number of values for the dispersion of the log10 initial magnetic field strength of the second log-normal component (if magnetic_field_model = smooth_tophat).
  • B_initial_log10_slope (list[float]): Range and number of values for the slope connecting the first log-normal component to the second one (if magnetic_field_model = smooth_tophat).
  • a_late (list[float]): Range and number of values for the power-law slope of the late time magnetic field evolution.
  • L_radio_log10_mean (list[float]): Range and number of values for the mean of the log10 radio luminosity normalization.
  • epsilon_L (list[float]): Range and number of values for the power-law index of the log10 radio luminosity.
Source code in utilities/experiment_helpers/run_simulation_set.py
def main(args) -> None:
    """
    Execute parameterized simulations using a multiprocessing pool.

    This function initializes a multiprocessing pool to run simulations based on a set of parameters
    defined by the user. It parses the command-line arguments, expands parameter ranges for sampling,
    queues the simulations, and manages their execution in parallel. It also generates JSON files for
    each simulation that contain the parameters used for that run.

    Args:
        args (argparse.Namespace): Command-line arguments parsed by argparse. Required parameters include:

            - simulator_type (str): The name of the simulator script to run. Options include
              'simulate_population_full', 'simulate_population_dyn', or 'simulate_population_magrot_det'.
            - dyn_data (str): (Optional) Path to the dynamically evolved population database, required
              if using 'simulate_population_magrot_det'.
            - save_dir (str): Directory path where the output of the simulations will be saved.
            - sampling_type (str): Method for sampling the parameter space. Choose between 'grid' and 'random'.
            - sampling_size (int): Number of random values to draw for each simulation parameter (required
              if sampling_type is 'random').
            - processes (int): Number of simultaneous processes for the multiprocessing pool (default is 1).
            - sigma_k (list[float]): Range and number of values for the kick velocity sigma parameter.
            - vk_c (list[float]): Range and number of values for the kick velocity vk_c parameter.
            - h_c (list[float]): Range and number of values for the scale height h_c parameter.
            - P_initial_mean (list[float]): Range and number of values for the mean initial spin period (if
                spin_period_model = normal).
            - P_initial_sigma (list[float]): Range and number of values for the dispersion of the initial
                spin period (if spin_period_model = normal).
            - P_initial_log10_mean (list[float]): Range and number of values for the log10 mean initial
                spin period (if spin_period_model = log-normal).
            - P_initial_log10_sigma (list[float]): Range and number of values for the log10 dispersion
                of the initial spin period (if spin_period_model = log-normal).
            - B_initial_log10_mean (list[float]): Range and number of values for the mean of the log10
                initial magnetic field strength (if magnetic_field_model = log-normal).
            - B_initial_log10_sigma (list[float]): Range and number of values for the dispersion of the
                log10 initial magnetic field strength (if magnetic_field_model = log-normal).
            - B_initial_log10_mean_comp1 (list[float]): Range and number of values for the mean of the log10
                initial magnetic field strength of the first log-normal component (if
                magnetic_field_model = double_log-normal).
            - B_initial_log10_sigma_comp1 (list[float]): Range and number of values for the dispersion of the
                log10 initial magnetic field strength of the first log-normal component (if
                magnetic_field_model = double_log-normal).
            - B_initial_log10_mean_comp2 (list[float]): Range and number of values for the mean of the log10
                initial magnetic field strength of the second log-normal component (if
                magnetic_field_model = double_log-normal).
            - B_initial_log10_sigma_comp2 (list[float]): Range and number of values for the dispersion of the
                log10 initial magnetic field strength of the second log-normal component (if
                magnetic_field_model = double_log-normal).
            - B_initial_log10_weight_comp1 (list[float]): Range and number of values for the relative weight of the first
                log-normal component with respect to the full pdf (if magnetic_field_model = double_log-normal).
            - B_initial_log10_rise_mean (list[float]): Range and number of values for the mean of the log10
                initial magnetic field strength of the first log-normal component (if
                magnetic_field_model = smooth_tophat).
            - B_initial_log10_rise_sigma (list[float]): Range and number of values for the dispersion of the
                log10 initial magnetic field strength of the first log-normal component (if
                magnetic_field_model = smooth_tophat).
            - B_initial_log10_decay_mean (list[float]): Range and number of values for the mean of the log10
                initial magnetic field strength of the second log-normal component (if
                magnetic_field_model = smooth_tophat).
            - B_initial_log10_decay_sigma (list[float]): Range and number of values for the dispersion of the
                log10 initial magnetic field strength of the second log-normal component (if
                magnetic_field_model = smooth_tophat).
            - B_initial_log10_slope (list[float]): Range and number of values for the slope connecting the first
                log-normal component to the second one (if magnetic_field_model = smooth_tophat).
            - a_late (list[float]): Range and number of values for the power-law slope of the late time
                magnetic field evolution.
            - L_radio_log10_mean (list[float]): Range and number of values for the mean of the log10
                radio luminosity normalization.
            - epsilon_L (list[float]): Range and number of values for the power-law index of the log10
                radio luminosity.
    """
    # Event on the master process that will be used to synchronize the child
    # processes and signal them for execution in the pool.
    event = mp.Event()

    # Lock on the master process to impose a delay in the process execution
    # so that none of them can be launched exactly at the same time.
    lock = mp.Lock()

    # A pool of processes with a defined capacity, a process spawning setup
    # routine and a general event to signal process execution.
    log.info(f"Initializing pool with {args.processes} processes...")

    pool = mp.Pool(
        args.processes,
        setup_process_pool,
        (
            event,
            lock,
        ),
    )

    # Parse arguments provided to the simulation helper script.
    log.info("Parsing arguments...")

    args_dict = vars(args)

    # Check and expand the parameters in the provided ranges.
    var_names, var_expanded_ranges = psg.check_expand_args(args_dict)

    if args_dict["sampling_type"] == "grid":
        # Create a generator of all the possible combinations of parameters based on their expanded range lists.
        parameter_sets_gen = itertools.product(*var_expanded_ranges)

    elif args_dict["sampling_type"] == "random":
        # Create a generator of the random sets of parameters.
        var_expanded_ranges = np.array(var_expanded_ranges).T.tolist()
        parameter_sets_gen = list(map(tuple, var_expanded_ranges))

    else:
        raise ValueError(
            "The specified sampling type is not feasible, choose between grid or random."
        )

    # Set the simulation type and the path to the dynamical database if required.
    simulator_type = args_dict["simulator_type"]
    dyn_data_path = ""
    if simulator_type == "simulate_population_magrot_det":
        dyn_data_path = args_dict["dyn_data"]

    # Queue each set of parameters as a different simulation in the pool.
    log.info("Queuing simulations...")

    simulation_number: int = 0
    for s in parameter_sets_gen:
        log.info("Queuing simulation: ")
        log.info(s)

        # Generate output folder for the simulation.
        # Note that the numbering of the folders is limited to 6 digits here,
        # i.e., we can only generate simulations below 1 million.
        simulation_output_path = pathlib.Path().joinpath(
            args.save_dir, f"{simulation_number:06}"
        )
        simulation_output_path.mkdir(parents=True, exist_ok=True)

        # Save the set of parameter values into a JSON override file and write it to the folder for a given simulation.
        simulation_override_json = {}
        for i in range(len(s)):
            simulation_override_json[var_names[i]] = s[i]

        simulation_override_json_path = pathlib.Path().joinpath(
            simulation_output_path, "override.json"
        )

        with open(simulation_override_json_path, "w") as f:
            json.dump(simulation_override_json, f, indent=4, sort_keys=True)

        # Generate list for the command which consists of the python interpreter,
        # the script path and the path for the JSON override.
        server_path = cfg["path_to_software"]
        cmd: str = (
            f"python {server_path}/mlpoppyns/simulator/{simulator_type}.py"
        )
        cmd += f" --save_dir {simulation_output_path}"
        cmd += f" --parameter_override {simulation_override_json_path}"
        if simulator_type == "simulate_population_magrot_det":
            cmd += f" --dyn_data {dyn_data_path}"

        pool.apply_async(
            run_simulation,
            args=(cmd,),
            callback=log_simulation,
            error_callback=log_error,
        )

        simulation_number += 1

    log.info("")
    log.info("***************************************************************")
    log.info("Launching simulations")
    log.info("***************************************************************")

    # Signal the processes to begin execution in the pool.
    event.set()

    # Wait for all processes to finish.
    pool.close()
    pool.join()
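
The command string assembled for each queued simulation follows the pattern below (all paths are hypothetical; cfg["path_to_software"] supplies the prefix):

python /path/to/software/mlpoppyns/simulator/simulate_population_dyn.py --save_dir runs/random_sweep/000000 --parameter_override runs/random_sweep/000000/override.json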

robust_run_simulation_dask(*args, max_attempts=3, delay=5, **kwargs)

This function wraps run_simulation_dask, adding retry logic to handle transient issues (e.g., connection problems). If run_simulation_dask raises an error, the operation is retried a specified number of times, with a delay between attempts.

Parameters:

    args (Any): Variable-length argument list, forwarded to run_simulation_dask. Default: ().

    max_attempts (int, optional): Maximum number of retry attempts. Default: 3.

    delay (int, optional): Delay between retry attempts in seconds. Default: 5.

    kwargs (Any): Arbitrary keyword arguments, forwarded to run_simulation_dask. Default: {}.

Source code in utilities/experiment_helpers/run_simulation_set.py
def robust_run_simulation_dask(
    *args: Any, max_attempts: int = 3, delay: int = 5, **kwargs: Any
) -> None:
    """
    This function wraps around the run_simulation_dask function, adding retry logic to handle transient issues
    (e.g., connection problems). If an error occurs during the run_simulation_dask function, it will retry the operation
    a specified number of times with a delay between each attempt.

    Args:
        args (Any): Variable length argument list.
        max_attempts (int, optional): Maximum number of retry attempts. Default is 3.
        delay (int, optional): Delay between retry attempts in seconds. Default is 5.
        kwargs (Any): Arbitrary keyword arguments.
    """
    attempts = 0
    while attempts < max_attempts:
        try:
            run_simulation_dask(*args, **kwargs)
            # If run_simulation_dask finishes successfully, exit function.
            return
        except Exception as e:
            # If an error occurs during the run_simulation_dask function, log the error message, including the current
            # time and the machine name.
            current_time = time.strftime("%Y-%m-%d %H:%M:%S")
            machine_name = platform.node()
            log.error(
                f"Attempt {attempts + 1} failed with error at {current_time} on {machine_name}: {e}"
            )
            # Wait for the seconds defined in the delay variable before retrying.
            time.sleep(delay)
            attempts += 1
            if attempts == max_attempts:
                # If the number of attempts reaches the maximum number raise an exception.
                log.error("Maximum retry attempts reached, failing task.")
                raise
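
A hypothetical call site (argument order taken from run_simulation_dask's signature documented below; all values are illustrative):

robust_run_simulation_dask(
    args,                       # argparse.Namespace with the CLI arguments
    "simulate_population_dyn",  # simulator_type
    "runs/random_sweep/000000", # simulation_output_path
    {"sigma_k": 100.0},         # simulation_override_json
    "",                         # dyn_data_path (unused for this simulator type)
    max_attempts=5,
    delay=10,
)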

run_simulation(command)

Run simulation command.

This is the main routine for running a particular simulation. It runs the provided simulation command (a Python call to the simulation script with a set of CLI arguments) and captures all the output of the process.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| command | str | Full command to execute the simulation. | required |

Returns:

| Type | Description |
| --- | --- |
| Tuple[str, str] | The simulation command and the output of the process. |

Source code in utilities/experiment_helpers/run_simulation_set.py
def run_simulation(command: str) -> typing.Tuple[str, str]:
    """
    Run simulation command.

    This is the main routine for running a particular simulation. It runs the
    provided simulation command (a Python call to the simulation script with a
    set of CLI arguments) and captures all the output of the process.

    Args:
        command (str): Full command to execute the simulation.

    Returns:
        (Tuple[str, str]): The simulation command and the output of the process.
    """

    # Acquire the lock and block any other process from executing for one second.
    starting.acquire()
    threading.Timer(1, starting.release).start()

    # Once the process has released the lock for another process
    # it can proceed with the execution of the experiment.
    log.info(f"Launching simulation {command}")
    try:
        process_output = subprocess.check_output(
            command, stderr=subprocess.STDOUT, shell=True
        )

        log.info("Experiment finished...")
        return command, process_output.decode("utf-8")

    except subprocess.CalledProcessError as e:
        log.error(f"Simulation failed with error code {e.returncode}")
        log.error(e.output.decode("utf-8"))
        return command, e.output.decode("utf-8")

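Because run_simulation blocks on the module-level starting lock installed by setup_process_pool, it is meant to be dispatched through a pool initialized with that routine rather than called directly. A hedged sketch condensed from the launcher functions in this module (the command string is an example):

import multiprocessing as mp

event = mp.Event()
lock = mp.Lock()
pool = mp.Pool(2, setup_process_pool, (event, lock))

pool.apply_async(
    run_simulation,
    args=("python simulator/simulate_population_dyn.py --save_dir out/000000",),
)

event.set()   # signal the pool to begin execution, as the launchers do
pool.close()
pool.join()
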
run_simulation_dask(args, simulator_type, simulation_output_path, simulation_override_json, dyn_data_path)

Run the simulation command, copying the output folder to the node before execution and back to the original location afterward to prevent overload at PIC. Unlike the run_simulation function, this function does not capture the terminal output of the process. It is necessary for running train_tsnpe.py using Dask and HTCondor.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| args | Namespace | Arguments required for the simulation, including output directory, parameter overrides, and optional dynamic data path. | required |
| simulator_type | str | The type of simulator to use, determining the specific simulation script to run. | required |
| simulation_output_path | str | Path to the simulation output folder. | required |
| simulation_override_json | dict | Dictionary with the parameter values for the override.json file. | required |
| dyn_data_path | str | Dynamical database path. | required |
Source code in utilities/experiment_helpers/run_simulation_set.py
def run_simulation_dask(
    args: argparse.Namespace,
    simulator_type: str,
    simulation_output_path: str,
    simulation_override_json: dict,
    dyn_data_path: str,
) -> None:
    """
    Run the simulation command, copying the output folder to the node before execution and back to the original
    location afterward to prevent overload at PIC. Unlike the run_simulation function, this function does not
    capture the terminal output of the process. It is necessary for running `train_tsnpe.py` using Dask and HTCondor.

    Args:
        args (argparse.Namespace): Arguments required for the simulation, including output directory, parameter overrides,
            and optional dynamic data path.
        simulator_type (str): The type of simulator to use, determining the specific simulation script to run.
        simulation_output_path (str): Path to the simulation output folder.
        simulation_override_json (dict): Dictionary with the parameter values for the override.json file.
        dyn_data_path (str): Dynamical database path.
    """

    # Copy the dynamical database and the repository to the node if it is not already there.
    # This action prevents overloading the PIC with too many calls.
    try:
        if not os.path.exists(os.path.basename(dyn_data_path)):
            safe_copytree(
                dyn_data_path,
                os.path.basename(dyn_data_path),
            )
        if not os.path.exists("ML-Poppyns"):
            safe_copytree(
                "/data/magnesia/software/ML-Poppyns",
                "ML-Poppyns",
            )
        # Generate the output folder with the parameter_override.json file in each node.
        output_dir_path = pathlib.Path(args.save_dir)
        output_dir_path.mkdir(parents=True, exist_ok=True)

        with open(args.parameter_override, "w") as f:
            json.dump(simulation_override_json, f, indent=4)

        # Call either the simulate_population_magrot or simulate_population_dyn module depending on the case.
        if simulator_type == "simulate_population_magrot_det":
            magrot.simulate_population(args)
        else:
            dyn.simulate_population(args)

        # Copy the output folder back to the original location.
        safe_copytree(output_dir_path, simulation_output_path)
        # Remove the folder to prevent issues with overwriting.
        shutil.rmtree(output_dir_path)

        log.info(
            f"Copied output folder back to original location: {simulation_output_path}"
        )

    except subprocess.CalledProcessError as e:
        # Log any errors raised during the simulation.
        log.error(f"Error executing simulation with args: {args}")
        log.error(f"Error details: {str(e)}")
        raise

    log.info("Simulation finished")

safe_copytree(src, dst, max_attempts=3, delay=5)

Safely copy a directory tree with retries. This function attempts to copy a directory tree from the source path to the destination path. If the copy operation fails (e.g., due to connection issues), it will retry the operation a specified number of times with a delay between each attempt.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| src | str | Source directory path. | required |
| dst | str | Destination directory path. | required |
| max_attempts | int | Maximum number of retry attempts. | 3 |
| delay | int | Delay between retry attempts in seconds. | 5 |
Source code in utilities/experiment_helpers/run_simulation_set.py
def safe_copytree(
    src: str, dst: str, max_attempts: int = 3, delay: int = 5
) -> None:
    """
    Safely copy a directory tree with retries. This function attempts to copy a directory tree from the source path to
    the destination path. If the copy operation fails (e.g., due to connection issues), it will retry the operation
    a specified number of times with a delay between each attempt.

    Args:
        src (str): Source directory path.
        dst (str): Destination directory path.
        max_attempts (int): Maximum number of retry attempts. Default is 3 retries.
        delay (int): Delay between retry attempts in seconds. Default is 5 seconds.
    """
    attempts = 0

    while attempts < max_attempts:
        try:
            shutil.copytree(src, dst, dirs_exist_ok=True)
            # If copytree finishes successfully, exit function.
            return
        except Exception as e:
            # If an error occurs during the copy operation, log the error message, including the
            # current time and the machine name.
            current_time = time.strftime("%Y-%m-%d %H:%M:%S")
            machine_name = platform.node()
            log.error(
                f"Attempt {attempts + 1} failed with error at {current_time} on {machine_name}: {e}"
            )
            # Wait for the seconds defined in the delay variable before retrying.
            time.sleep(delay)
            attempts += 1
            if attempts == max_attempts:
                # If the number of attempts reaches the maximum number, raise an exception.
                log.error("Maximum retry attempts reached, failing task.")
                raise

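For example, run_simulation_dask stages shared data onto the worker's local disk with calls such as:

# Retry up to 3 times with a 5-second pause (the defaults) on transient failures.
safe_copytree("/data/magnesia/software/ML-Poppyns", "ML-Poppyns")
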
setup_process_pool(event, lock)

Set up the process pool for multiprocessing with a global pause/resume event.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| event | Event | Reference to a master process event that will signal the child processes to pause or resume execution. | required |
| lock | Lock | Reference to a master process lock that will coordinate the child process launching with waiting times. | required |
Source code in utilities/experiment_helpers/run_simulation_set.py
def setup_process_pool(event: mp.Event, lock: mp.Lock) -> None:
    """
    Set up the process pool for multiprocessing with a global pause/resume event.

    Args:
        event (mp.Event): Reference to a master process event that will signal the child
            processes to pause or resume execution.
        lock (mp.Lock): A reference to a master process lock that will coordinate the
            child process launching with waiting times.
    """

    global unpaused
    unpaused = event

    global starting
    starting = lock

utilities.experiment_helpers.run_simulation_set_sbi

Simulator helper script.

This script allows us to run various simulator scripts in parallel. Unlike the run_simulation_set.py script, which samples parameters randomly or on a grid, this script follows a prior distribution for parameter sampling. Note that this script can only be run with a prior distribution from the sbi package that has the .sample() method available.

The number of values to be drawn for each parameter is specified by the argument --sampling_size.

Each parameter combination spawns a new process when the simulator_multiprocess function is called. These processes enter a multiprocessing pool for later execution, allowing the asynchronous simulation of many populations in parallel with a defined maximum number of processes. When the simulator_dask function is called instead, parallelism is handled with Dask, enabling parallel execution of simulations across the Dask cluster in HTCondor.

NOTE: if an error occurs in one of the simulations, the script will not stop until all the processes have terminated. In this case, the error is only shown in the terminal.

Display help message to run the code:

python run_simulation_set_sbi.py --help

Displays all the relevant arguments that can be used.
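
A hypothetical invocation (apart from --sampling_size, which the text above names explicitly, the flag names are inferred from the argument keys read in the functions below and may differ; check --help for the authoritative list):

python run_simulation_set_sbi.py \
    --sampling_size 1000 \
    --processes 8 \
    --simulator_type simulate_population_magrot_det \
    --dyn_data /path/to/dyn_database \
    --save_dir /path/to/simulations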

Authors:

Celsa Pardo Araujo (pardo@ice.csic.es)
Michele Ronchi (ronchi@ice.csic.es)

initialize_dask_cluster(logger, config)

Initialize a Dask cluster for distributed computing.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| logger | Logger | Logger object. | required |
| config | ConfigurationParser | Configuration object; must provide log_dir and workers_dask. | required |

Returns:

| Type | Description |
| --- | --- |
| HTCondorCluster | Initialized Dask cluster object. |

Source code in utilities/experiment_helpers/run_simulation_set_sbi.py
def initialize_dask_cluster(
    logger: Logger, config: configuration_parser.ConfigurationParser
) -> HTCondorCluster:
    """
    Initialize a Dask cluster for distributed computing.

    Args:
        logger (Logger): Logger object.
        config (ConfigurationParser): Configuration object; must provide log_dir and workers_dask.

    Returns:
        (HTCondorCluster): Initialized Dask cluster object.
    """

    # Creating a folder to save the stdout and stderr of the terminal for each worker.
    htcondor_output_folder = f"{config.log_dir}/htcondor_output"
    pathlib.Path(htcondor_output_folder).mkdir(parents=True, exist_ok=True)
    logger.info(
        f"Saving the stdout and stderr of the terminal for each worker in {htcondor_output_folder}."
    )
    # Creating the cluster with Dask for HTCondor.
    extra = {
        "getenv": "True",
        "output": f"{htcondor_output_folder}/$(ClusterId)_$(ProcId)-out.txt",
        "error": f"{htcondor_output_folder}/$(ClusterId)_$(ProcId)-err.txt",
        "+flavour": '"long"',
    }

    # Specifying the computing requirements as needed for a single magneto-thermal simulation.
    # If nanny is set to True, each worker is started by a nanny process which can restart the worker if it fails.
    # We set 1 thread per worker to prevent system overload. The timeout duration to wait for a worker to start is set
    # to 60 seconds.

    cluster = HTCondorCluster(
        cores=1,
        memory="2 GB",
        disk="2 GB",
        job_extra_directives=extra,
        nanny=True,
        death_timeout="60s",
        worker_extra_args=["--nthreads", "1"],
    )

    # Scaling the cluster to the number of workers specified in the configuration file.
    num_workers_dask = config["workers_dask"]
    cluster.scale(num_workers_dask)

    # Wait for at least one worker to be ready.
    cluster.wait_for_workers(1)

    # Create Dask client connected to the cluster.
    client = Client(cluster)

    # Log the Dask dashboard link for monitoring.
    logger.info(f"Dask client {client.dashboard_link}")

    return cluster

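A hedged sketch of how the returned cluster is typically consumed (the function attaches a client internally only to log the dashboard link, so callers create their own; logger and config are assumed to come from the surrounding pipeline):

from dask.distributed import Client

cluster = initialize_dask_cluster(logger, config)  # scales to config["workers_dask"] workers
client = Client(cluster)  # attach a client so dask.compute runs on the HTCondor workers
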
sample_without_nan(distribution, sampling_size, device, max_attempts=20)

Sample a distribution while removing NaN values from the sampled outputs. Stops after max_attempts if sufficient valid samples are not obtained.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| distribution | Any | The distribution to sample from. | required |
| sampling_size | int | The number of samples to draw from the distribution. | required |
| device | torch.device | Device used to run the script. | required |
| max_attempts | int | The maximum number of attempts to sample. | 20 |

Returns:

| Type | Description |
| --- | --- |
| torch.Tensor | A tensor of samples from which rows containing NaN values have been removed. |

Source code in utilities/experiment_helpers/run_simulation_set_sbi.py
def sample_without_nan(
    distribution: Any,
    sampling_size: int,
    device: torch.device,
    max_attempts: int = 20,
) -> torch.Tensor:
    """
    Sample a distribution while removing NaN values from the sampled outputs.
    Stops after max_attempts if sufficient valid samples are not obtained.

    Args:
        distribution (Any): The distribution to sample from.
        sampling_size (int): The number of samples to draw from the distribution.
        device (torch.device): Device used to run the script.
        max_attempts (int): The maximum number of attempts to sample (default is 20).

    Returns:
        torch.Tensor: A tensor of samples from which rows containing NaN values have been removed.
    """

    samples = []
    attempts = 0

    while len(samples) < sampling_size and attempts < max_attempts:
        remaining_samples = sampling_size - len(samples)

        new_samples = distribution.sample(
            (remaining_samples,), show_progress_bars=False
        )

        valid_samples = new_samples[
            ~torch.any(torch.isnan(new_samples), dim=1)
        ]

        samples.extend(valid_samples.tolist())

        attempts += 1

    if len(samples) < sampling_size:
        raise RuntimeError(
            f"Unable to obtain {sampling_size} valid samples after {max_attempts} attempts."
        )

    return torch.tensor(samples).to(device)

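A self-contained sketch of the NaN-filtering behaviour, using a toy stand-in for an sbi prior (ToyPrior is hypothetical and only mimics the sample() signature used above):

import torch

class ToyPrior:
    # Mimics the sbi-style sample((n,), show_progress_bars=False) call.
    def sample(self, shape, show_progress_bars=False):
        x = torch.randn(shape[0], 3)
        x[torch.rand(shape[0]) < 0.1] = float("nan")  # poison roughly 10% of rows
        return x

samples = sample_without_nan(ToyPrior(), 100, torch.device("cpu"))
assert samples.shape == (100, 3) and not torch.isnan(samples).any()
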
simulator_dask(args_dict, prior, dataset, device)

Execute simulations based on the provided prior distribution in parallel using the Dask package.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| args_dict | dict | Dictionary with the arguments. | required |
| prior | DirectPosterior | Prior distribution. | required |
| dataset | DatasetMultichannelArray | Stores statistics and scaling information used in the prior distribution. | required |
| device | torch.device | Device used to run the script. | required |
Source code in utilities/experiment_helpers/run_simulation_set_sbi.py
def simulator_dask(
    args_dict: dict,
    prior: DirectPosterior,
    dataset: DatasetMultichannelArray,
    device: torch.device,
) -> None:
    """
    Execute simulations based on the provided prior distribution in parallel using the Dask package.

    Args:
        args_dict (dict): Dictionary with the arguments.
        prior (DirectPosterior): Prior distribution.
        dataset (DatasetMultichannelArray): Stores statistics and scaling information used in the prior distribution.
        device (torch.device): Device used to run the script.
    """
    # Create a list to hold delayed computations for each simulation.
    delayed_simulations = []

    # Parse arguments provided to the simulation helper script.
    log.info("Parsing arguments...")

    # Extracting the names of the parameters.
    var_names = dataset.target_names

    # Save the statistics for the filtered labels.
    par_max = torch.tensor(dataset.target_max).to(device)
    par_min = torch.tensor(dataset.target_min).to(device)
    par_std = torch.tensor(dataset.target_std).to(device)
    par_mean = torch.tensor(dataset.target_mean).to(device)

    # Create a generator of the random sets of parameters using the prior distribution.
    parameter_sets_gen_tensor = sample_without_nan(
        prior, args_dict["sampling_size"], device, max_attempts=20
    )

    # If the parameters were normalized or standardized, rescale quantities to their physical ranges.
    if dataset.normalize:
        parameter_sets_gen_tensor = (
            parameter_sets_gen_tensor * (par_max - par_min) + par_min
        )

    elif dataset.standardize:
        parameter_sets_gen_tensor = (
            parameter_sets_gen_tensor * par_std + par_mean
        )

    parameter_sets_gen = [
        tuple(subtensor.tolist()) for subtensor in parameter_sets_gen_tensor
    ]

    # Set the simulation type and the path to the dynamical database if required.
    simulator_type = args_dict["simulator_type"]
    dyn_data_path = ""

    if simulator_type == "simulate_population_magrot_det":
        dyn_data_path = args_dict["dyn_data"]

    # Queue each set of parameters as a different simulation in the pool.
    log.info("Queuing simulations...")

    simulation_number: int = 0

    # Create a delayed version of the 'run_simulation_dask' function using Dask that allows for lazy evaluation.
    # This enables parallel processing capabilities within Dask.
    run_simulation_delayed = dask.delayed(robust_run_simulation_dask)

    for s in parameter_sets_gen:
        log.info("Queuing simulation: ")
        log.info(s)

        # Setting output folder path for each simulation.
        # Note that the numbering of the folders is limited to 6 digits here,
        # i.e., we can only generate up to one million simulations.
        folder_name = f"{simulation_number:06}"
        simulation_output_path_original = pathlib.Path().joinpath(
            args_dict["save_dir"], folder_name
        )
        # Save the set of parameter values into a dictionary.
        simulation_override_json = {}
        for i in range(len(s)):
            simulation_override_json[var_names[i]] = s[i]

        simulation_override_json_path = pathlib.Path().joinpath(
            folder_name, "override.json"
        )
        # Prepare the arguments for the simulate_population call as an argparse.Namespace
        # (unlike simulator_multiprocess, no shell command is built here).
        simulation_args = argparse.Namespace(
            save_dir=folder_name,
            parameter_override=simulation_override_json_path,
            dyn_data=os.path.basename(dyn_data_path),
        )

        # Create delayed computation for each simulation. Each simulation will be attempted up to 5 times in case of an
        # error, with a 10-second wait between each attempt. This is done to avoid stopping the entire training process
        # if there is a connection issue with a worker.

        delayed_simulations.append(
            run_simulation_delayed(
                simulation_args,
                simulator_type,
                simulation_output_path_original,
                simulation_override_json,
                dyn_data_path,
                max_attempts=5,
                delay=10,
            )
        )

        simulation_number += 1

    log.info("")
    log.info("***************************************************************")
    log.info("Launching simulations")
    log.info("***************************************************************")

    # Compute the delayed computations, i.e., run the simulations in parallel with HTCondor.
    dask.compute(delayed_simulations)

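The delayed/compute pattern used above, reduced to a minimal sketch: dask.delayed wraps a function so that calls build a task graph instead of executing immediately, and a single dask.compute then runs the whole graph in parallel on the attached cluster.

import dask

@dask.delayed
def square(x):
    return x * x  # stands in for one robust_run_simulation_dask call

tasks = [square(i) for i in range(4)]  # nothing has executed yet
results = dask.compute(tasks)          # runs all tasks in parallel
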
simulator_multiprocess(args_dict, prior, dataset, device)

Execute simulations based on the provided prior distribution in parallel using the multiprocessing package.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| args_dict | dict | Dictionary with the arguments. | required |
| prior | DirectPosterior | Prior distribution. | required |
| dataset | DatasetMultichannelArray | Stores statistics and scaling information used in the prior distribution. | required |
| device | torch.device | Device used to run the script. | required |
Source code in utilities/experiment_helpers/run_simulation_set_sbi.py
def simulator_multiprocess(
    args_dict: dict,
    prior: DirectPosterior,
    dataset: DatasetMultichannelArray,
    device: torch.device,
) -> None:
    """
    Execute simulations based on the provided prior distribution in parallel using the
    multiprocessing package.

    Args:
        args_dict (dict): Dictionary with the arguments.
        prior (DirectPosterior): Prior distribution.
        dataset (DatasetMultichannelArray): Stores statistics and scaling information used in the prior distribution.
        device (torch.device): Device used to run the script.
    """
    # Event on the master process that will be used to synchronize the child
    # processes and signal them for execution in the pool.
    event = mp.Event()

    # Lock on the master process to impose a delay in the process execution
    # so that none of them can be launched exactly at the same time.
    lock = mp.Lock()

    # A pool of processes with a defined capacity, a process spawning setup
    # routine and a general event to signal process execution.
    nprocesses = args_dict["processes"]
    log.info(f"Initializing pool with {nprocesses} processes...")

    pool = mp.Pool(
        nprocesses,
        setup_process_pool,
        (
            event,
            lock,
        ),
    )

    # Parse arguments provided to the simulation helper script.
    log.info("Parsing arguments...")

    # Extracting the names of the parameters.
    var_names = dataset.target_names

    # Save the statistics for the filtered labels.
    par_max = torch.tensor(dataset.target_max).to(device)
    par_min = torch.tensor(dataset.target_min).to(device)
    par_std = torch.tensor(dataset.target_std).to(device)
    par_mean = torch.tensor(dataset.target_mean).to(device)

    # Create a generator of the random sets of parameters using the prior distribution.
    parameter_sets_gen_tensor = sample_without_nan(
        prior, args_dict["sampling_size"], device, max_attempts=20
    )

    # If the parameters were normalized or standardized, rescale quantities to their physical ranges.
    if dataset.normalize:
        parameter_sets_gen_tensor = (
            parameter_sets_gen_tensor * (par_max - par_min) + par_min
        )

    elif dataset.standardize:
        parameter_sets_gen_tensor = (
            parameter_sets_gen_tensor * par_std + par_mean
        )

    parameter_sets_gen = [
        tuple(subtensor.tolist()) for subtensor in parameter_sets_gen_tensor
    ]

    # Set the simulation type and the path to the dynamical database if required.
    simulator_type = args_dict["simulator_type"]
    dyn_data_path = ""
    if simulator_type == "simulate_population_magrot_det":
        dyn_data_path = args_dict["dyn_data"]

    # Queue each set of parameters as a different simulation in the pool.
    log.info("Queuing simulations...")

    simulation_number: int = 0
    for s in parameter_sets_gen:
        log.info("Queuing simulation: ")
        log.info(s)

        # Generate output folder for the simulation.
        # Note that the numbering of the folders is limited to 6 digits here,
        # i.e., we can only generate up to one million simulations.
        simulation_output_path = pathlib.Path().joinpath(
            args_dict["save_dir"], f"{simulation_number:06}"
        )
        simulation_output_path.mkdir(parents=True, exist_ok=True)

        # Save the set of parameter values into a JSON override file and write it to the folder for a given simulation.
        simulation_override_json = {}
        for i in range(len(s)):
            simulation_override_json[var_names[i]] = s[i]

        simulation_override_json_path = pathlib.Path().joinpath(
            simulation_output_path, "override.json"
        )

        with open(simulation_override_json_path, "w") as f:
            json.dump(simulation_override_json, f, indent=4)

        # Generate a list for the command (cmd), including the Python interpreter, the script path specified with
        # 'simulator_type', and the path for the JSON override.
        server_path = cfg["path_to_software"]
        cmd: str = (
            f"python {server_path}/mlpoppyns/simulator/{simulator_type}.py"
        )
        cmd += f" --save_dir {simulation_output_path}"
        cmd += f" --parameter_override {simulation_override_json_path}"
        if simulator_type == "simulate_population_magrot_det":
            cmd += f" --dyn_data {dyn_data_path}"

        pool.apply_async(
            run_simulation,
            args=(cmd,),
            callback=log_simulation,
            error_callback=log.error,
        )

        simulation_number += 1

    log.info("")
    log.info("***************************************************************")
    log.info("Launching simulations")
    log.info("***************************************************************")

    # Signal the processes to begin execution in the pool.
    event.set()

    # Wait for all processes to finish.
    pool.close()
    pool.join()