Learning utilities

mlpoppyns.learning.utils.benchmark

Network benchmarking.

Utility functions for benchmarking model performance.

Authors:

Alberto Garcia Garcia (garciagarcia@ice.csic.es)

benchmark(model, device, input_dummy, output_dummy)

Measure median time for forward/backward passes of a model on a device.

Parameters:

model (torch.nn.Module): Model to benchmark. Required.
device (torch.device): Device on which the model will be executed. Required.
input_dummy (torch.Tensor): Dummy tensor used as the model input. Required.
output_dummy (torch.Tensor): Dummy tensor matching the model output, used to seed the backward pass. Required.

Returns:

Tuple[float, float]: A tuple that contains the median time spent in the forward pass and in the backward pass, both in milliseconds.

Source code in mlpoppyns/learning/utils/benchmark.py
def benchmark(
    model: torch.nn.Module,
    device: torch.device,
    input_dummy: torch.Tensor,
    output_dummy: torch.Tensor,
) -> typing.Tuple[float, float]:
    """
    Measure median time for forward/backward passes of a model on a device.

    Args:
        model (torch.nn.Module): Model to benchmark.
        device (torch.device): Device on which the model will be executed.
        input_dummy (torch.Tensor): Dummy tensor used as the model input.
        output_dummy (torch.Tensor): Dummy tensor matching the model output, used to
            seed the backward pass.

    Returns:
        (Tuple[float, float]): A tuple that contains the median time spent in the
            forward pass and the backward pass, both in milliseconds.
    """

    # Move dummies to the required device (CPU or GPU).
    input_dummy = input_dummy.to(device)
    output_dummy = output_dummy.to(device)

    # Dry runs.
    num_dry_runs = 5
    for i in range(num_dry_runs):
        _, _ = measure(model, device, input_dummy, output_dummy)

    # Benchmarking for a defined number of repetitions.
    num_repetitions = 1024
    t_forward = []
    t_backward = []

    for i in range(num_repetitions):
        t_fp, t_bp = measure(model, device, input_dummy, output_dummy)
        t_forward.append(t_fp)
        t_backward.append(t_bp)

    # Compute medians for forward and backward passes, convert to milliseconds.
    t_forward = np.median(np.asarray(t_forward) * 1e3)
    t_backward = np.median(np.asarray(t_backward) * 1e3)

    return t_forward, t_backward
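
A minimal usage sketch (the toy model, tensor shapes, and device choice below are illustrative assumptions, not part of the API):

import torch

from mlpoppyns.learning.utils.benchmark import benchmark

# Hypothetical toy setup: a small linear model benchmarked on the CPU.
model = torch.nn.Linear(16, 4)
device = torch.device("cpu")
model = model.to(device)

input_dummy = torch.randn(32, 16)  # batch of dummy inputs
output_dummy = torch.ones(32, 4)   # gradient seed passed to backward()

t_fwd_ms, t_bwd_ms = benchmark(model, device, input_dummy, output_dummy)
print(f"forward: {t_fwd_ms:.3f} ms, backward: {t_bwd_ms:.3f} ms")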

measure(model, device, input_dummy, output_dummy)

Measure the timing of a single forward and backward pass of the model on the specified device.

Parameters:

model (torch.nn.Module): Model to benchmark. Required.
device (torch.device): Device on which the model will be executed. Required.
input_dummy (torch.Tensor): Dummy tensor used as the model input. Required.
output_dummy (torch.Tensor): Dummy tensor matching the model output, used to seed the backward pass. Required.

Returns:

Tuple[float, float]: A tuple that contains the time spent in the forward pass and in the backward pass, both in seconds.

Source code in mlpoppyns/learning/utils/benchmark.py
def measure(
    model: torch.nn.Module,
    device: torch.device,
    input_dummy: torch.Tensor,
    output_dummy: torch.Tensor,
) -> typing.Tuple[float, float]:
    """
    Measure the timing of a single forward and backward pass of the model on
    the specified device.

    Args:
        model (torch.nn.Module): Model to benchmark.
        device (torch.device): Device on which the model will be executed.
        input_dummy (torch.Tensor): Dummy tensor used as the model input.
        output_dummy (torch.Tensor): Dummy tensor matching the model output, used to
            seed the backward pass.

    Returns:
        (Tuple[float, float]): A tuple that contains the time spent in the forward pass
            and in the backward pass, both in seconds.
    """

    # Synchronize gpu time and measure forward pass.
    if device.type == "cuda":
        torch.cuda.synchronize()

    t0 = time.time()
    y_pred = model(input_dummy)

    if device.type == "cuda":
        torch.cuda.synchronize()

    elapsed_forward = time.time() - t0

    # Zero gradients, synchronize time and measure backward pass.
    model.zero_grad()
    t0 = time.time()
    y_pred.backward(output_dummy)

    if device.type == "cuda":
        torch.cuda.synchronize()

    elapsed_backward = time.time() - t0

    return elapsed_forward, elapsed_backward
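
The torch.cuda.synchronize() calls are what make the measurement meaningful on a GPU: CUDA kernels are launched asynchronously, so without a synchronization point time.time() would return before the device has finished. A standalone sketch of the same pattern (illustrative, not part of this module):

import time

import torch

def timed_gpu_matmul(a: torch.Tensor, b: torch.Tensor) -> float:
    """Time a matrix multiply on the GPU, synchronizing around the kernel."""
    torch.cuda.synchronize()  # drain any previously queued work
    t0 = time.time()
    _ = a @ b                 # the kernel is launched asynchronously
    torch.cuda.synchronize()  # block until the kernel has finished
    return time.time() - t0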

mlpoppyns.learning.utils.data_folder_struct_sbi

Create data folder structure for running sbi.

Authors:

Celsa Pardo Araujo (pardo@ice.csic.es)

create_structure(base, structure)

Recursively creates a fixed directory and file structure on the filesystem.

Parameters:

base (str): Base directory path where the folder structure will be created. Required.
structure (Dict[str, Optional[dict]]): Nested dictionary representing the folder/file structure; a value of None marks a leaf. Required.
Source code in mlpoppyns/learning/utils/data_folder_struct_sbi.py
def create_structure(base: str, structure: Dict[str, Optional[dict]]) -> None:
    """
    Recursively creates a fixed directory and file structure on the filesystem.

    Args:
        base (str): Base directory path where the folder structure will be created.
        structure (Dict[str, Optional[dict]]): Nested dictionary representing the folder/file structure.
    """
    for name, sub_structure in structure.items():
        path = os.path.join(base, name)
        os.makedirs(path, exist_ok=True)
        # A value of None marks a leaf; only recurse into nested dictionaries.
        if sub_structure is not None:
            create_structure(path, sub_structure)
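
A usage sketch (the base path and directory names are illustrative; a value of None marks a leaf):

from mlpoppyns.learning.utils.data_folder_struct_sbi import create_structure

# Hypothetical experiment layout; every key becomes a directory.
layout = {
    "round_0": {
        "simulations": None,
        "models": None,
    },
    "logs": None,
}
create_structure("/tmp/sbi_experiment", layout)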

mlpoppyns.learning.utils.json_utils

JSON utils.

Authors:

Alberto Garcia Garcia (garciagarcia@ice.csic.es)

read_json(filename)

Read a specified JSON file and generate an ordered dictionary.

Parameters:

filename (str): File path to the JSON file. Required.

Returns:

dict: A dictionary containing the JSON information.

Source code in mlpoppyns/learning/utils/json_utils.py
def read_json(filename: str) -> dict:
    """
    Read a specified JSON file and generate an ordered dictionary.

    Args:
        filename (str): File path to the JSON file.

    Returns:
        (dict): A dictionary containing the JSON information.
    """

    json_filename = pathlib.Path(filename)
    with json_filename.open("rt") as handle:
        return json.load(handle, object_hook=collections.OrderedDict)
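
A usage sketch (the file name is an assumption for illustration):

from mlpoppyns.learning.utils.json_utils import read_json

# Keys come back in file order because json.load is given
# collections.OrderedDict as its object_hook.
config = read_json("config_sbi.json")
print(list(config.keys()))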

mlpoppyns.learning.utils.metric_tracker

Metric tracker.

Authors:

Alberto Garcia Garcia (garciagarcia@ice.csic.es)

MetricTracker

Metric tracker.

This class is responsible for keeping track of metrics throughout training and testing. It is able to update the values, add new metrics to be tracked, reset them all, or extract higher-level information such as averages.

Source code in mlpoppyns/learning/utils/metric_tracker.py
class MetricTracker:
    """
    Metric tracker.

    This class is responsible for keeping track of metrics throughout
    training and testing. It is able to update the values, add new
    metrics to be tracked, reset them all, or extract higher-level
    information such as averages.
    """

    def __init__(
        self,
        keys: Optional[set] = None,
        writer: Optional[SummaryWriter] = None,
    ) -> None:
        """
        Metric tracker initialization.

        Args:
            keys (Optional[set]): A set of metric keys/identifiers to initialize the tracking.
            writer (Optional[SummaryWriter]): A TensorBoard writer to output metric info to.
        """

        self.writer = writer
        self._data = pd.DataFrame(
            index=keys, columns=["total", "counts", "average"]
        )
        self.reset()

    def reset(self) -> None:
        """
        Resets all tracked metric values to zero.
        """

        for col in self._data.columns:
            self._data[col].values[:] = 0

    def update(self, key: str, value: float, n: int = 1) -> None:
        """
        Metric update.

        Updates a given metric adding the provided value and a specified count.
        If the metric is not yet tracked, it creates a new track for it.

        Args:
            key (str): Identifier/key for the metric in the dictionary.
            value (float): Value to add to the metric entry.
            n (int): Count value.
        """

        if key not in self._data.index:
            self._data = pd.concat(
                [
                    self._data,
                    pd.DataFrame(
                        index=[key], columns=["total", "counts", "average"]
                    ),
                ]
            )
            self._data.loc[key, "total"] = 0
            self._data.loc[key, "counts"] = 0
            self._data.loc[key, "average"] = 0

        if self.writer is not None:
            self.writer.add_scalar(key, value)

        self._data.loc[key, "total"] += value * n
        self._data.loc[key, "counts"] += n
        self._data.loc[key, "average"] = (
            self._data.loc[key, "total"] / self._data.loc[key, "counts"]
        )

    def avg(self, key: str) -> float:
        """
        Metric average.

        Args:
            key (str): Key of the metric.

        Returns:
            (float): The average of that metric.
        """

        return self._data.average[key]

    def result(self) -> dict:
        """
        Results dictionary.

        Returns:
            (dict): The averages of all tracked metrics in a dictionary.
        """

        return dict(self._data.average)
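
A usage sketch (metric names and values are illustrative):

from mlpoppyns.learning.utils.metric_tracker import MetricTracker

tracker = MetricTracker()              # metrics can also be pre-registered via `keys`
tracker.update("loss", 0.9, n=32)      # batch of 32 samples with mean loss 0.9
tracker.update("loss", 0.7, n=32)
tracker.update("accuracy", 0.5)        # new metrics are added on the fly
print(tracker.avg("loss"))             # 0.8
print(tracker.result())                # {'loss': 0.8, 'accuracy': 0.5}
tracker.reset()                        # zero everything for the next epoch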

__init__(keys=None, writer=None)

Metric tracker initialization.

Parameters:

keys (Optional[set]): A set of metric keys/identifiers to initialize the tracking. Defaults to None.
writer (Optional[SummaryWriter]): A TensorBoard writer to output metric info to. Defaults to None.
Source code in mlpoppyns/learning/utils/metric_tracker.py
def __init__(
    self,
    keys: Optional[set] = None,
    writer: Optional[SummaryWriter] = None,
) -> None:
    """
    Metric tracker initialization.

    Args:
        keys (Optional[set]): A set of metric keys/identifiers to initialize the tracking.
        writer (Optional[SummaryWriter]): A TensorBoard writer to output metric info to.
    """

    self.writer = writer
    self._data = pd.DataFrame(
        index=keys, columns=["total", "counts", "average"]
    )
    self.reset()

avg(key)

Metric average.

Parameters:

key (str): Key of the metric. Required.

Returns:

float: The average of that metric.

Source code in mlpoppyns/learning/utils/metric_tracker.py
def avg(self, key: str) -> float:
    """
    Metric average.

    Args:
        key (str): Key of the metric.

    Returns:
        (float): The average of that metric.
    """

    return self._data.average[key]

reset()

Resets all tracked metric values to zero.

Source code in mlpoppyns/learning/utils/metric_tracker.py
def reset(self) -> None:
    """
    Resets all tracked metric values to zero.
    """

    for col in self._data.columns:
        self._data[col].values[:] = 0

result()

Results dictionary.

Returns:

dict: The averages of all tracked metrics in a dictionary.

Source code in mlpoppyns/learning/utils/metric_tracker.py
def result(self) -> dict:
    """
    Results dictionary.

    Returns:
        (dict): The averages of all tracked metrics in a dictionary.
    """

    return dict(self._data.average)

update(key, value, n=1)

Metric update.

Updates a given metric adding the provided value and a specified count. If the metric is not yet tracked, it creates a new track for it.

Parameters:

key (str): Identifier/key for the metric in the dictionary. Required.
value (float): Value to add to the metric entry. Required.
n (int): Count value. Defaults to 1.
Source code in mlpoppyns/learning/utils/metric_tracker.py
def update(self, key: str, value: float, n: int = 1) -> None:
    """
    Metric update.

    Updates a given metric adding the provided value and a specified count.
    If the metric is not yet tracked, it creates a new track for it.

    Args:
        key (str): Identifier/key for the metric in the dictionary.
        value (float): Value to add to the metric entry.
        n (int): Count value.
    """

    if key not in self._data.index:
        self._data = pd.concat(
            [
                self._data,
                pd.DataFrame(
                    index=[key], columns=["total", "counts", "average"]
                ),
            ]
        )
        self._data.loc[key, "total"] = 0
        self._data.loc[key, "counts"] = 0
        self._data.loc[key, "average"] = 0

    if self.writer is not None:
        self.writer.add_scalar(key, value)

    self._data.loc[key, "total"] += value * n
    self._data.loc[key, "counts"] += n
    self._data.loc[key, "average"] = (
        self._data.loc[key, "total"] / self._data.loc[key, "counts"]
    )

mlpoppyns.learning.utils.posterior_sampler

Sampler from an sbi posterior distribution.

Display the help message to see how to run the code:

python posterior_sampler.py --help

This displays all the relevant arguments that can be used.

Authors:

Celsa Pardo Araujo (pardo@ice.csic.es)

handler(signum, frame)

Signal handler that raises a TimeoutError when a SIGALRM signal is received. This function is needed in the sample_with_timeout function to raise an error if the sampling exceeds the maximum time.

Parameters:

signum (int): The signal number. Required.
frame (FrameType): The current stack frame. Required.

Raises:

TimeoutError: Indicates that the operation timed out.

Source code in mlpoppyns/learning/utils/posterior_sampler.py
def handler(signum: int, frame: FrameType) -> None:
    """
    Signal handler that raises a TimeoutError when a SIGALRM signal is received. This function is needed in the
    sample_with_timeout function to raise an error if the sampling exceeds the maximum time.

    Args:
        signum (int): The signal number.
        frame (FrameType): The current stack frame.

    Raises:
        (TimeoutError): Indicates that the operation timed out.
    """
    raise TimeoutError("The operation timed out")

sample_with_timeout(posterior, simulation_output, n_samples, timeout=180)

Perform sampling from the posterior distribution with a specified timeout.

Parameters:

posterior (DirectPosterior): Posterior distribution. Required.
simulation_output (torch.Tensor): Simulation output matrix. Required.
n_samples (int): The number of samples to draw from the posterior distribution. Required.
timeout (int): The maximum time in seconds to allow for n_samples to be drawn. Defaults to 180.

Returns:

Tuple[Optional[torch.Tensor], bool]: A tuple containing the result of the sampling (or None if it times out) and a boolean indicating whether the sampling was successful.

Source code in mlpoppyns/learning/utils/posterior_sampler.py
def sample_with_timeout(
    posterior: DirectPosterior,
    simulation_output: torch.Tensor,
    n_samples: int,
    timeout: Optional[int] = 180,
) -> Tuple[Optional[torch.Tensor], bool]:
    """
    Perform sampling from the posterior distribution with a specified timeout.

    Args:
        posterior (DirectPosterior): Posterior distribution.
        simulation_output (torch.Tensor): Simulation output matrix.
        n_samples (int): The number of samples to draw from the posterior distribution.
        timeout (int, optional): The maximum time in seconds to allow for n_samples to be drawn. Defaults to 180 seconds.

    Returns:
        (Tuple[Optional[torch.Tensor], bool]): A tuple containing the result of the sampling (or None if it times out)
            and a boolean indicating whether the sampling was successful.
    """
    # Set the signal handler and start the alarm to enforce the function timeout.
    signal.signal(signal.SIGALRM, handler)
    signal.alarm(timeout)

    try:
        # If the sampling completes before the timeout, return the result and set success to True.
        result = sampler(posterior, simulation_output, n_samples)
        success = True
    except TimeoutError:
        # If the timeout is reached, return None and set success to False.
        result = None
        success = False
    finally:
        # Cancel the timer whether an exception was raised or not to prevent the signal from being sent after
        # completion.
        signal.alarm(0)

    return result, success
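
A usage sketch (the posterior and observation objects are assumed to exist; note that signal.SIGALRM is only available on POSIX systems, so this pattern will not work on Windows):

from mlpoppyns.learning.utils.posterior_sampler import sample_with_timeout

# Hypothetical objects: `posterior` is a trained sbi DirectPosterior and
# `x_obs` is an observation tensor with the shape the network was trained on.
samples, success = sample_with_timeout(posterior, x_obs, n_samples=10_000, timeout=60)
if not success:
    print("Sampling exceeded 60 seconds and was aborted.")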

sampler(posterior, simulation_output, n_samples)

Perform sampling from the posterior distribution. This function is designed to be used with a signal handler to enforce a timeout during sampling; see sample_with_timeout.

Parameters:

posterior (DirectPosterior): Posterior distribution. Required.
simulation_output (torch.Tensor): Simulation output matrix. Required.
n_samples (int): The number of samples to draw from the posterior distribution. Required.

Returns:

torch.Tensor: The samples drawn from the posterior distribution.

Source code in mlpoppyns/learning/utils/posterior_sampler.py
def sampler(
    posterior: DirectPosterior,
    simulation_output: torch.Tensor,
    n_samples: int,
) -> torch.Tensor:
    """
    Perform sampling from the posterior distribution. This function is designed to be used
    with a signal handler to enforce a timeout during sampling; see sample_with_timeout.

    Args:
        posterior (DirectPosterior): Posterior distribution.
        simulation_output (torch.Tensor): Simulation output matrix.
        n_samples (int): The number of samples to draw from the posterior distribution.

    Returns:
        (torch.Tensor): The samples drawn from the posterior distribution.
    """
    return posterior.set_default_x(simulation_output).sample(
        (n_samples,), show_progress_bars=False
    )

mlpoppyns.learning.utils.posterior_sampler_mcmc_sbi

Posterior Sampling Script for SNLE-Trained SBI Models

This script samples from the posterior distribution of an SNLE-trained model and computes the log probability of each sample. Note that this is performed nchains times to create chains of posterior samples and log probabilities, to later use them with the harmonic package (https://github.com/astro-informatics/harmonic) to compute the model evidence at the observed data.

Display the help message to see how to run the code:

python posterior_sampler_mcmc_sbi.py --help

This displays all the relevant arguments that can be used.

Authors:

Celsa Pardo Araujo (pardo@ice.csic.es)

run_posterior_sampling(args)

Samples from the posterior distribution of an SNLE-trained SBI model and computes the log-probability of each sample. The process is repeated across multiple MCMC chains to facilitate evidence estimation using the harmonic package (https://github.com/astro-informatics/harmonic).

Parameters:

args (argparse.Namespace): An argparse.Namespace object containing the following attributes. Required.

- exp_path (str): Path to the experiment directory containing the configuration file.
- round (int): Round number of the SNLE training to use.
- learning_path (str): Directory where the trained inference models are stored.
- mcmc_sampler (str): The MCMC sampler to use for posterior sampling. Options include 'slice_np' (stable) and 'slice_np_vectorized' (faster).
- save_dir (str): Directory to save the sampled posterior chains and log-probabilities.
- nsamples (int): Number of posterior samples to generate per chain.
- nchains (int): Number of MCMC chains to generate.
- thin_mcmc (int): Thinning parameter for the MCMC sampler; keeps every n-th sample to reduce autocorrelation.
- num_chains (int): Number of MCMC chains to run for posterior sampling.
- device (str): Device to run the computation on ('cpu' or 'cuda').
Source code in mlpoppyns/learning/utils/posterior_sampler_mcmc_sbi.py
def run_posterior_sampling(args: argparse.Namespace) -> None:
    """
    Samples from the posterior distribution of an SNLE-trained SBI model and computes the log-probability
    of each sample. The process is repeated across multiple MCMC chains to facilitate evidence estimation
    using the harmonic package (https://github.com/astro-informatics/harmonic).

    Args:
        args (argparse.Namespace): An argparse.Namespace object containing the following attributes:

            - exp_path (str): Path to the experiment directory containing the configuration file.
            - round (int): Round number of the SNLE training to use.
            - learning_path (str): Directory where the trained inference models are stored.
            - mcmc_sampler (str): The MCMC sampler to use for posterior sampling. Options include
              'slice_np' (stable) and 'slice_np_vectorized' (faster).
            - save_dir (str): Directory to save the sampled posterior chains and log-probabilities.
            - nsamples (int): Number of posterior samples to generate per chain.
            - nchains (int): Number of MCMC chains to generate.
            - thin_mcmc (int): Thinning parameter for the MCMC sampler. Keeps every n-th sample to reduce
                autocorrelation.
            - num_chains (int): Number of MCMC chains to run for posterior sampling.
            - device (str): Device to run the computation on ('cpu' or 'cuda').

    """

    logging.basicConfig(stream=sys.stdout, level=logging.INFO)
    config_path = os.path.join(args.exp_path, "config_sbi.json")

    with open(config_path, "r") as file:
        config = json.load(file)

    # Loading the observed sample.
    dataset, _, matrix_obs = ut.prepare_dataset_sbi(
        config["observed_sample"]["dataset_path"], config, log, True
    )
    ensemble = config["trainer"]["ensemble"]
    ensemble_size = config["trainer"]["size_ensemble"] if ensemble else 1
    learning_path = os.path.join(args.learning_path, f"round_{args.round}")

    # Load inference and initialize the prior. To extend the prior, modify prior_ranges in the configuration file.
    inference_list = sbi_builder.load_inference(
        config, args.round, args.learning_path, ensemble
    )
    prior = sbi_builder.initialize_prior(config, args.device, dataset)

    posteriors_list = []
    for index in range(ensemble_size):
        trained_model_path = (
            os.path.join(
                learning_path, f"trained_model_ensemble_{index}.pickle"
            )
            if ensemble
            else os.path.join(learning_path, "trained_model.pickle")
        )

        inference = inference_list[index if ensemble else 0]

        with open(trained_model_path, "rb") as f:
            density_estimator = pickle.load(f)

        posterior = inference.build_posterior(
            density_estimator=density_estimator.to(args.device),
            prior=prior,
            mcmc_method=args.mcmc_sampler,
            mcmc_parameters={
                "num_chains": args.num_chains,
                "thin": args.thin_mcmc,
            },
        )

        posteriors_list.append(posterior)

    if ensemble:
        ensemble_size = len(inference_list)

        # Giving each network in the ensemble an equal weight.
        weights_ensemble = torch.ones(ensemble_size) / ensemble_size
        final_posterior = NeuralPosteriorEnsemble(
            posteriors_list,
            weights=weights_ensemble.to(args.device),
        )
    else:
        final_posterior = posteriors_list[0]

    posterior_obs_chain = [
        final_posterior.set_default_x(matrix_obs.to(args.device)).sample(
            (args.nsamples,), show_progress_bars=True
        )
        for _ in range(args.nchains)
    ]

    log_prob_chain = [
        final_posterior.log_prob(
            posterior_obs_chain[i], matrix_obs.to(args.device)
        )
        for i in range(args.nchains)
    ]

    # Move tensors to CPU and stack.
    posterior_obs_chain = np.stack(
        [tensor.cpu().numpy() for tensor in posterior_obs_chain]
    )
    log_prob_chain = np.stack(
        [tensor.cpu().numpy() for tensor in log_prob_chain]
    )

    torch.save(posterior_obs_chain, f"{args.save_dir}/posterior_obs_chain.pt")
    torch.save(log_prob_chain, f"{args.save_dir}/log_prob_chain.pt")
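
An illustrative invocation, assuming the command-line flags mirror the attribute names documented above (all values are placeholders):

python posterior_sampler_mcmc_sbi.py --exp_path experiments/run_01/ --round 0 --learning_path experiments/run_01/learning --mcmc_sampler slice_np_vectorized --save_dir experiments/run_01/chains --nsamples 5000 --nchains 4 --thin_mcmc 10 --num_chains 20 --device cuda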

mlpoppyns.learning.utils.request_device

Request device.

Authors:

Alberto Garcia Garcia (garciagarcia@ice.csic.es)

request_device(logger, num_gpu=0)

Selects the requested devices for training/testing.

Parameters:

logger (Logger): A logger to log information to. Required.
num_gpu (int): Number of GPUs requested. Defaults to 0.

Returns:

Tuple[torch.device, list]: A tuple containing the kind of device the pipeline can run on and a list of GPU device IDs if available. If no GPUs are available or zero are requested, the returned device is the CPU.

Source code in mlpoppyns/learning/utils/request_device.py
def request_device(
    logger: Logger, num_gpu: int = 0
) -> typing.Tuple[torch.device, list]:
    """
    Selects the requested devices for training/testing.

    Args:
        logger (Logger): A logger to log information to.
        num_gpu (int): Number of GPUs requested.

    Returns:
        (Tuple[torch.device, list]): A tuple containing the kind of device the pipeline can run on and
            a list of devices if available. If no GPUs are available or zero are requested, the returned device
            is CPU.
    """

    num_available_gpus = torch.cuda.device_count()

    # Check if GPUs are requested but no GPUs are available.
    if num_gpu > 0 and num_available_gpus == 0:
        logger.warning(
            "Warning: There's no GPU available on this machine,"
            "training will be performed on CPU."
        )
        num_gpu = 0

    # Check if the number of requested GPUs exceeds the available.
    if num_gpu > num_available_gpus:
        logger.warning(
            "Warning: The number of GPU's configured to use is {} "
            "but only {} are available "
            "on this machine.".format(num_gpu, num_available_gpus)
        )
        num_gpu = num_available_gpus

    device = torch.device("cuda:0" if num_gpu > 0 else "cpu")
    list_ids = list(range(num_gpu))

    return device, list_ids
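
A usage sketch (the logger setup is illustrative):

import logging

from mlpoppyns.learning.utils.request_device import request_device

logger = logging.getLogger(__name__)
device, gpu_ids = request_device(logger, num_gpu=2)
# On a machine without GPUs this logs a warning and returns
# device == torch.device("cpu") and gpu_ids == [].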

mlpoppyns.learning.utils.sbi_builder

Builder of sbi.

Display the help message to see how to run the code:

python sbi_builder.py --help

This displays all the relevant arguments that can be used.

Authors:

Celsa Pardo Araujo (pardo@ice.csic.es)

build_inference_network(model_type, logger, config, device, prior)

Build an inference object (SNPE, SNLE, or SNRE) based on the selected model_type, using the provided configuration, device, and prior.

Parameters:

model_type (str): The inference model_type to use. Must be one of: "snpe", "snle", or "snre". Required.
logger (Logger): Logger object. Required.
config (ConfigurationParser): Configuration object that defines the architecture and training parameters for the model. Required.
device (torch.device): The device (CPU or GPU) on which to build and run the model. Required.
prior (BoxUniform): The prior distribution over the parameters. Required.

Returns:

Union[SNPE_C, SNLE_A, SNRE_B]: An instance of the corresponding sbi inference class, depending on the model_type specified.

Source code in mlpoppyns/learning/utils/sbi_builder.py
def build_inference_network(
    model_type: str,
    logger: Logger,
    config: configuration_parser.ConfigurationParser,
    device: torch.device,
    prior: utils.BoxUniform,
) -> Union[SNPE_C, SNLE_A, SNRE_B]:
    """
    Build an inference object (SNPE, SNLE, or SNRE) based on the selected model_type,
    using the provided configuration, device, and prior.

    Args:
        model_type (str): The inference model_type to use. Must be one of:
            "snpe", "snle", or "snre".
        logger (Logger): Logger object.
        config (ConfigurationParser): Configuration object that defines the architecture
            and training parameters for the model.
        device (torch.device): The device (CPU or GPU) on which to build and run the model.
        prior (utils.BoxUniform): The prior distribution over the parameters.

    Returns:
        Union[SNPE_C, SNLE_A, SNRE_B]: An instance of the corresponding sbi inference class,
            depending on the model_type specified.
    """

    if model_type.lower() == "snpe":
        posterior_nn_args = {
            "model": config["density_estimator"]["type"],
            "hidden_features": config["density_estimator"]["args"][
                "hidden_features"
            ],
            "num_components": config["density_estimator"]["args"][
                "num_components"
            ],
            "device": device,
        }

        # If config["compression_input"]["use_compression"] is False, input data compression will be performed directly
        # within the training pipeline. In this case, the first component of the network is a CNN that compresses the
        # input data, and is trained jointly with the density estimator. Note that this option is only compatible
        # with the NPE approach but not NLE or NRE.
        if not config["compression_input"]["use_compression"]:
            # Build the embedding network.
            embedding_net = config.init_object("arch", learning_models)

            # Initialize weights.
            weight_initializer = config.init_object(
                "weights_initializer", learning_initializers
            )
            embedding_net.apply(weight_initializer)

            posterior_nn_args["embedding_net"] = embedding_net

        # The default density estimator has three hidden layers with a number of neurons = hidden_features.
        # The weights are initialized with the default initialization provided by PyTorch.
        neural_posterior = utils.posterior_nn(**posterior_nn_args)

        # Setting up the inference procedure.
        inference = SNPE(
            density_estimator=neural_posterior,
            device=f"{device}",
            prior=prior,
        )

        return inference

    elif model_type.lower() == "snle":
        inference = SNLE(
            density_estimator=config["density_estimator"]["type"],
            device=f"{device}",
            prior=prior,
        )

        return inference

    elif model_type.lower() == "snre":
        inference = SNRE(
            classifier=config["density_estimator"]["classifier_nre"],
            device=f"{device}",
            prior=prior,
        )

        return inference

    else:
        logger.exception(
            "The model type '{}' is not supported. ".format(model_type)
        )
        sys.exit(1)
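
For orientation, a hypothetical configuration excerpt covering only the keys this function reads (the values are illustrative placeholders, not package defaults):

config_excerpt = {
    "density_estimator": {
        "type": "mdn",  # passed as the `model` argument to utils.posterior_nn for SNPE
        "args": {"hidden_features": 64, "num_components": 4},
        "classifier_nre": "resnet",  # used only when model_type == "snre"
    },
    # False additionally trains an embedding CNN jointly with the estimator (SNPE only).
    "compression_input": {"use_compression": True},
}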

compute_proposal_prior(posterior_obs, config, prior, device)

Compute the proposal prior by restricting the prior to the regions where the posterior of the observation has non-negligible probability mass.

Parameters:

posterior_obs (DirectPosterior): Posterior distribution at the observation. Required.
config (ConfigurationParser): Configuration object specifying the model settings. Required.
prior (BoxUniform): Prior distribution. Required.
device (torch.device): Device used to run the script. Required.

Returns:

RestrictedPrior: The restricted prior based on the posterior distribution of the observation.

Source code in mlpoppyns/learning/utils/sbi_builder.py
def compute_proposal_prior(
    posterior_obs: DirectPosterior,
    config: configuration_parser.ConfigurationParser,
    prior: utils.BoxUniform,
    device: torch.device,
) -> utils.RestrictedPrior:
    """
    Compute the proposal prior by restricting the prior to the regions where the posterior of the
    observation has non-negligible probability mass.

    Args:
        posterior_obs (DirectPosterior): Posterior distribution at the observation.
        config (configuration_parser.ConfigurationParser): Configuration object specifying the model settings.
        prior (utils.BoxUniform): Prior distribution.
        device (torch.device): Device used to run the script.

    Returns:
        (utils.RestrictedPrior): The restricted prior based on the posterior distribution of the observation.
    """
    # Computing the region of the posterior distribution used to constrain the prior. The quantile value is the default
    # in sbi. Instead of using the default 100,000 for the sample number, we sample 10,000 times which is possible as
    # our posteriors are Gaussian-like. This reduction also makes things faster.
    accept_reject_fn = utils.get_density_thresholder(
        posterior_obs,
        quantile=1e-4,
        num_samples_to_estimate_support=10000,
    )
    # Computing the new proposal by restricting the prior to the posterior of the observation.
    # If config["sir"] is set to true, the restricted prior sampling uses sampling importance
    # resampling (Rubin et al., 1988). Otherwise, it employs rejection sampling. Note that the latter
    # method may take longer for a narrower posterior distribution where the rejection rate is high.
    if config["trainer"]["sir"]:
        proposal = utils.RestrictedPrior(
            prior,
            accept_reject_fn,
            posterior=posterior_obs,
            sample_with="sir",
            device=f"{device}",
        )
    else:
        proposal = utils.RestrictedPrior(
            prior,
            accept_reject_fn,
            sample_with="rejection",
            device=f"{device}",
        )
    return proposal

initialize_inference(config, device, prior, logger, ensemble=False)

Initialize inference objects using the provided configuration.

Parameters:

config (ConfigurationParser): Configuration object specifying the settings. Required.
device (torch.device): Device used to run the script. Required.
prior (BoxUniform): Prior distribution used for inference. Required.
logger (Logger): Logger object. Required.
ensemble (bool): Flag indicating if ensemble mode is enabled. Defaults to False.

Returns:

Union[List[SNPE], List[SNLE], List[SNRE]]: List of initialized inference objects.

Source code in mlpoppyns/learning/utils/sbi_builder.py
def initialize_inference(
    config: configuration_parser.ConfigurationParser,
    device: torch.device,
    prior: utils.BoxUniform,
    logger: Logger,
    ensemble: bool = False,
) -> Union[List[SNPE], List[SNLE], List[SNRE]]:
    """
    Initialize inference objects using the provided configuration.

    Args:
        config (configuration_parser.ConfigurationParser): Configuration object specifying the settings.
        device (torch.device): Device used to run the script.
        prior (utils.BoxUniform): Prior distribution used for inference.
        logger (Logger): Logger object.
        ensemble (bool): Flag indicating if ensemble mode is enabled.

    Returns:
        (Union[List[SNPE], List[SNLE], List[SNRE]]): List of initialized inference objects.
    """
    inference_list = []
    model_type = config["trainer"]["type"]

    for _ in range(config["trainer"]["size_ensemble"] if ensemble else 1):
        inference = build_inference_network(
            model_type, logger, config, device, prior
        )
        inference_list.append(inference)

    return inference_list

initialize_prior(config, device, dataset)

Initialize the prior distribution for the model based on the configuration and dataset.

The prior is initialized as a uniform distribution over the parameter space. If the dataset is normalized or standardized, the prior is scaled accordingly to ensure that it fits the transformed space. If no normalization or standardization is applied, the prior is defined over the raw parameter space as specified in the configuration.

Parameters:

config (ConfigurationParser): Configuration object specifying the model settings. Required.
device (torch.device): Device used to run the script. Required.
dataset (DatasetMultichannelArray): Dataset where the statistics are saved. Required.

Returns:

BoxUniform: The initialized prior distribution as a BoxUniform object.

Source code in mlpoppyns/learning/utils/sbi_builder.py
def initialize_prior(
    config: configuration_parser.ConfigurationParser,
    device: torch.device,
    dataset: dl.DatasetMultichannelArray,
) -> BoxUniform:
    """
    Initialize the prior distribution for the model based on the configuration and dataset.

    The prior is initialized as a uniform distribution over the parameter space.
    If the dataset is normalized or standardized, the prior is scaled accordingly to ensure
    that it fits the transformed space. If no normalization or standardization is applied,
    the prior is defined over the raw parameter space as specified in the configuration.

    Args:
        config (configuration_parser.ConfigurationParser): Configuration object specifying the model settings.
        device (torch.device): Device used to run the script.
        dataset (DatasetMultichannelArray): Dataset where the statistics are saved.

    Returns:
        BoxUniform: The initialized prior distribution as a BoxUniform object.
    """
    n_parameters = len(torch.tensor(config["prior_ranges"]["low"]))

    # Setting the prior distribution for the parameters. Note that we need to rescale the prior distribution to
    # ensure that it has the correct limits when restricted.
    if config["training_data_loader"]["normalize"]:
        # All the parameters are rescaled in the range [0, 1].
        prior = BoxUniform(
            low=torch.tensor(np.zeros(n_parameters)),
            high=torch.tensor(np.ones(n_parameters)),
            device=f"{device}",
        )
    elif config["training_data_loader"]["standardize"]:
        low = (
            torch.tensor(config["prior_ranges"]["low"]) - dataset.target_mean
        ) / dataset.target_std
        high = (
            torch.tensor(config["prior_ranges"]["high"]) - dataset.target_mean
        ) / dataset.target_std
        prior = BoxUniform(
            low=low,
            high=high,
            device=f"{device}",
        )
    else:
        # Set the prior range to the range of the parameters.
        prior = BoxUniform(
            low=torch.tensor(config["prior_ranges"]["low"]),
            high=torch.tensor(config["prior_ranges"]["high"]),
            device=f"{device}",
        )
    return prior
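
A worked sketch of the standardized branch (all numbers are illustrative): with a raw prior range of [2, 10] for one parameter and dataset statistics mean 6 and standard deviation 2, the transformed bounds are (2 - 6) / 2 = -2 and (10 - 6) / 2 = 2:

import torch
from sbi.utils import BoxUniform

# Illustrative values, not package defaults.
target_mean, target_std = 6.0, 2.0
low = (torch.tensor([2.0]) - target_mean) / target_std    # tensor([-2.])
high = (torch.tensor([10.0]) - target_mean) / target_std  # tensor([2.])
prior = BoxUniform(low=low, high=high)  # uniform over [-2, 2] in standardized space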

load_inference(config, round_number, save_dir, ensemble=False)

Load inference objects from pickle files. Note that this is used when resume mode is enabled or when doing inference with SNLE.

Parameters:

config (ConfigurationParser): Configuration object specifying the model settings. Required.
round_number (int): The round number to load the inference from. Required.
save_dir (pathlib.Path): The directory to load the inference from. Required.
ensemble (bool): Flag indicating if ensemble mode is enabled. Defaults to False.

Returns:

Union[List[SNPE], List[SNLE], List[SNRE]]: A list of inference objects.

Source code in mlpoppyns/learning/utils/sbi_builder.py
def load_inference(
    config: configuration_parser.ConfigurationParser,
    round_number: int,
    save_dir: pathlib.Path,
    ensemble: bool = False,
) -> Union[List[SNPE], List[SNLE], List[SNRE]]:
    """
    Load inference objects from pickle files. Note that this is used when resume mode is enabled
    or when doing inference with SNLE.

    Args:
        config (configuration_parser.ConfigurationParser): Configuration object specifying the model settings.
        round_number (int): The round number to load the inference from.
        save_dir (pathlib.Path): The directory to load the inference from.
        ensemble (bool): Flag indicating if ensemble mode is enabled. Defaults to False.

    Returns:
        (Union[List[SNPE], List[SNLE], List[SNRE]]): A list of inference objects.
    """
    inference_list = []

    for i in range(config["trainer"]["size_ensemble"] if ensemble else 1):
        inference_path = (
            os.path.join(
                save_dir, f"round_{round_number}/inference_ensemble_{i}.pickle"
            )
            if ensemble
            else os.path.join(
                save_dir, f"round_{round_number}/inference.pickle"
            )
        )

        if not os.path.exists(inference_path):
            raise FileNotFoundError(
                f"The folder specified at {inference_path} in the config file does not contain a inference.pickle file.\n"
                "To use the resume mode or inference with SNLE, you need to specify the correct path."
            )

        with open(inference_path, "rb") as inference_file:
            inference = pickle.load(inference_file)

        inference_list.append(inference)

    return inference_list

load_posterior(config, logger, inference_list, device, round_current)

Load the trained density estimator for a given round.

Parameters:

config (ConfigurationParser): Configuration object specifying the model settings. Required.
logger (Logger): Logger object. Required.
inference_list (Union[List[SNPE], List[SNLE], List[SNRE]]): List of sbi inference objects. Required.
device (torch.device): Device used to run the script. Required.
round_current (int): Current round number. This parameter starts at zero. Required.

Returns:

Union[DirectPosterior, NeuralPosteriorEnsemble]: Trained density estimator or ensemble of estimators.

Source code in mlpoppyns/learning/utils/sbi_builder.py
def load_posterior(
    config: configuration_parser.ConfigurationParser,
    logger: Logger,
    inference_list: Union[List[SNPE], List[SNLE], List[SNRE]],
    device: torch.device,
    round_current: int,
) -> Union[DirectPosterior, NeuralPosteriorEnsemble]:
    """
    Load the trained density estimator for a given round.

    Args:
        config (configuration_parser.ConfigurationParser): Configuration object specifying the model settings.
        logger (Logger): Logger object.
        inference_list (Union[List[SNPE], List[SNLE]]): List of sbi inference object.
        device (torch.device): Device used to run the script.
        round_current (int): Current round number. This parameter starts at zero.

    Returns:
        (Union[DirectPosterior, NeuralPosteriorEnsemble]): Trained density estimator or ensemble of estimators.
    """
    ensemble = config["trainer"]["ensemble"]
    ensemble_size = config["trainer"]["size_ensemble"] if ensemble else 1
    model_type = config["trainer"]["type"]
    posteriors_list = []

    # Load_dir folder is where the trained_model.pickle is saved.
    load_dir = config["infer"]["load_dir"]
    save_dir_round = load_dir + f"/round_{round_current}"

    for index in range(ensemble_size):
        trained_model_path = (
            os.path.join(
                save_dir_round, f"trained_model_ensemble_{index}.pickle"
            )
            if ensemble
            else os.path.join(save_dir_round, "trained_model.pickle")
        )

        inference = inference_list[index if ensemble else 0]

        with open(trained_model_path, "rb") as f:
            density_estimator = pickle.load(f)
        logger.info(
            f"Loaded pre-trained model for round {round_current}, ensemble index {index}."
        )

        if model_type == "snpe":
            posterior = inference.build_posterior(density_estimator.to(device))

        elif model_type in ("snle", "snre"):
            posterior = inference.build_posterior(
                density_estimator=density_estimator.to(device),
                mcmc_method=config["mcmc_sampler"]["type"],
                mcmc_parameters={
                    "num_chains": config["mcmc_sampler"]["num_chains"],
                    "thin": config["mcmc_sampler"]["thin"],
                },
            )

        else:
            logger.exception(
                "The model type '{}' is not supported. ".format(model_type)
            )
            sys.exit(1)

        posteriors_list.append(posterior)

    if ensemble:
        weights_ensemble = torch.ones(ensemble_size) / ensemble_size
        final_posterior = NeuralPosteriorEnsemble(
            posteriors_list, weights=weights_ensemble.to(device)
        )
    else:
        final_posterior = posteriors_list[0]

    return final_posterior

train_posterior(config, save_dir_round, logger, inference_list, parameter_round, matrix_round, device, round_current, prof_log_path, prof_json_path, retrain_from_scratch=False, proposal=None)

Train the density estimator for a given round.

If resume is set to True in the configuration file, this mode allows training to continue from the last completed round if interrupted. It uses the previously saved state to resume training without starting over. If ensemble is set to True in the configuration file, multiple models (an ensemble) that differ only through their initialization are trained and their predictions are combined to ensure conservative coverages. Each of the neural networks will be trained on the same training dataset.

Note that the inference object should be different for each component of the ensemble to ensure independent weights for each component. Moreover, if config['trainer']['type'] is 'snle' or 'snre', then an MCMC sampler is needed to sample from the posterior distribution.

Parameters:

config (ConfigurationParser): Configuration object specifying the model settings. Required.
save_dir_round (pathlib.Path): Directory where the trained model will be saved or is saved already. Required.
logger (Logger): Logger object. Required.
inference_list (Union[List[SNPE], List[SNLE]]): List of sbi inference objects. Required.
parameter_round (torch.Tensor): Tensor containing the parameters for the current round. Required.
matrix_round (torch.Tensor): Tensor containing the matrices for the current round. Required.
device (torch.device): Device used to run the script. Required.
round_current (int): Current round number. This parameter starts at zero. Required.
prof_log_path (str): The profile.log path. Required.
prof_json_path (str): The profile.json path. Required.
retrain_from_scratch (bool): Whether to retrain the conditional density estimator for the posterior from scratch each round. Defaults to False.
proposal (DirectPosterior): The proposal prior used in the current round. Defaults to None.

Returns:

Union[DirectPosterior, NeuralPosteriorEnsemble]: Trained density estimator or ensemble of estimators.

Source code in mlpoppyns/learning/utils/sbi_builder.py
def train_posterior(
    config: configuration_parser.ConfigurationParser,
    save_dir_round: pathlib.Path,
    logger: Logger,
    inference_list: Union[List[SNPE], List[SNLE]],
    parameter_round: torch.Tensor,
    matrix_round: torch.Tensor,
    device: torch.device,
    round_current: int,
    prof_log_path: str,
    prof_json_path: str,
    retrain_from_scratch: bool = False,
    proposal: DirectPosterior = None,
) -> Union[DirectPosterior, NeuralPosteriorEnsemble]:
    """
    Train the density estimator for a given round.

    If resume is set to True in the configuration file, this mode allows training to continue from the last completed
    round if interrupted. It uses the previously saved state to resume training without starting over.
    If ensemble is set to True in the configuration file, multiple models (an ensemble) that differ only through
    their initialization are trained and their predictions are combined to ensure conservative coverages. Each of
    the neural networks will be trained on the same training dataset.

    Note that the inference object should be different for each component of the ensemble to ensure independent weights
    for each component. Moreover, if `config['trainer']['type']` is `'snle'` or `'snre'`, then an MCMC sampler is
    needed to sample from the posterior distribution.

    Args:
        config (configuration_parser.ConfigurationParser): Configuration object specifying the model settings.
        save_dir_round (pathlib.Path): Directory where the trained model will be saved or is saved already.
        logger (Logger): Logger object.
        inference_list (Union[List[SNPE], List[SNLE]]): List of inference sbi objects.
        parameter_round (torch.Tensor): Tensor containing the parameters for the current round.
        matrix_round (torch.Tensor): Tensor containing the matrices for the current round.
        device (torch.device): Device used to run the script.
        round_current (int): Current round number. This parameter starts at zero.
        prof_json_path (str): The profile.json path.
        prof_log_path (str): The profile.log path.
        retrain_from_scratch (bool): Whether to retrain the conditional density estimator for the posterior from
            scratch each round. Default value is False.
        proposal (DirectPosterior): The proposal prior used in the current round.

    Returns:
        (Union[DirectPosterior, NeuralPosteriorEnsemble]): Trained density estimator or ensemble of estimators.
    """
    ensemble = config["trainer"]["ensemble"]
    ensemble_size = config["trainer"]["size_ensemble"] if ensemble else 1
    resume = config["resume_training"]["resume"]
    last_round = config["resume_training"]["last_round"]
    model_type = config["trainer"]["type"]

    # Determining the number of the effective inference round of the sequential SBI approach. Note that the
    # round_current number is not the effective_round when resume mode is enabled, as we did not start from 0.
    effective_round = (
        round_current + int(last_round) if resume else round_current
    )

    posteriors_list = []

    for index in range(ensemble_size):
        if ensemble:
            trained_model_path = os.path.join(
                save_dir_round, f"trained_model_ensemble_{index}.pickle"
            )
            inference_model_path = os.path.join(
                save_dir_round, f"inference_ensemble_{index}.pickle"
            )
        else:
            trained_model_path = os.path.join(
                save_dir_round, "trained_model.pickle"
            )
            inference_model_path = os.path.join(
                save_dir_round, "inference.pickle"
            )

        inference = inference_list[index]

        # If we are in round 0 of training and resume mode is enabled, the model is loaded from the last
        # completed round instead of being trained again. This is needed to compute the proposal prior for the next
        # round.
        if resume and round_current == 0:
            if not os.path.exists(trained_model_path):
                raise FileNotFoundError(
                    "The folder specified in the config file at cfg['resume_training']['save_dir'] does not contain a trained_model.pkl file.\n"
                    "To use the resume mode, you need to specify the correct path."
                )

            with open(trained_model_path, "rb") as f:
                density_estimator = pickle.load(f)
            logger.info(
                f"Loaded pre-trained model for round {effective_round}, ensemble index {index}."
            )
        else:
            with timewith.TimeWith(
                f"[TrainingRound{effective_round}Ensemble{index}]",
                prof_log_path,
                prof_json_path,
                config["show_profiling"],
            ):
                train_args = {
                    "learning_rate": config["trainer"]["lr"],
                    "training_batch_size": config["trainer"]["batch_size"],
                    "validation_fraction": config["trainer"][
                        "validation_fraction"
                    ],
                    "show_train_summary": True,
                    "retrain_from_scratch": retrain_from_scratch,
                }

                # When using SNPE with a truncated prior, we set `force_first_round_loss = True` in the following to
                # disable the loss-function correction. Otherwise, the loss would be corrected using the proposal prior
                # during training. In the case where we do not truncate the prior and account for the correction, we
                # then pass the proposal prior to the append_simulations function.

                if (
                    config["trainer"]["truncated_prior"]
                    and model_type == "snpe"
                ):
                    train_args["force_first_round_loss"] = True

                kwargs = {}
                if (
                    model_type == "snpe"
                    and not config["trainer"]["truncated_prior"]
                ):
                    kwargs["proposal"] = proposal

                density_estimator = inference.append_simulations(
                    parameter_round.to(device),
                    matrix_round.to(device),
                    **kwargs,
                ).train(**train_args)

            logger.info(
                f"Trained density estimator for round {effective_round}, ensemble index {index}."
            )

            with open(trained_model_path, "wb") as output_file:
                pickle.dump(density_estimator.cpu(), output_file)
            logger.info(
                f"Saved trained model for round {effective_round}, ensemble index {index}."
            )

        if model_type == "snpe":
            posterior = inference.build_posterior(density_estimator.to(device))

        elif model_type == "snle" or "snre":
            posterior = inference.build_posterior(
                density_estimator=density_estimator.to(device),
                mcmc_method=config["mcmc_sampler"]["type"],
                mcmc_parameters={
                    "num_chains": config["mcmc_sampler"]["num_chains"],
                    "thin": config["mcmc_sampler"]["thin"],
                },
            )
        else:
            logger.error(
                "The model type '{}' is not supported.".format(model_type)
            )
            sys.exit(1)

        posteriors_list.append(posterior)

        with open(inference_model_path, "wb") as inference_file:
            pickle.dump(inference, inference_file)
        logger.info(
            f"Saved inference for round {effective_round}, ensemble index {index}."
        )

        # Saving the training statistics. If resuming in round 0, no training is performed, i.e., nothing is
        # saved.
        if not resume or round_current != 0:
            ut.save_training_statistics(
                config, inference, index, effective_round
            )

    if ensemble:
        # Giving each network in the ensemble an equal weight.
        weights_ensemble = torch.ones(ensemble_size) / ensemble_size
        final_posterior = NeuralPosteriorEnsemble(
            posteriors_list, weights=weights_ensemble.to(device)
        )
    else:
        final_posterior = posteriors_list[0]

    return final_posterior
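
A minimal usage sketch of the returned posterior (the observation name and shape below are placeholders, not part of the library): draw samples from final_posterior given an observed density map and evaluate their log-probabilities.

import torch

# Placeholder observation with the shape the density estimator was trained on,
# including the batch dimension (e.g., [1, 3, 32, 32]).
x_observed = torch.zeros(1, 3, 32, 32)

samples = final_posterior.sample((10_000,), x=x_observed)
log_probs = final_posterior.log_prob(samples, x=x_observed)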

mlpoppyns.learning.utils.sbi_utils

sbi utilities.

Display the help message for running the code:

python sbi_utils.py --help

This lists all the relevant arguments that can be used.

Authors:

Celsa Pardo Araujo (pardo@ice.csic.es)

calculate_smallest_hdr(posterior, theta, matrix, n_samples, logger, device)

Calculate the smallest highest density region (HDR) of the posterior distribution that contains the true value, for the test dataset produced from the ground truths and simulation outputs stored in the arguments theta and matrix, respectively. Additionally, this function returns the posterior samples for each test case for further analysis.

Parameters:

Name Type Description Default
posterior DirectPosterior

Posterior distribution.

required
theta tensor

Tensor containing the values of the parameters used to generate the simulated population in matrix.

required
matrix tensor

Tensor containing the maps of the simulated population.

required
n_samples int

The number of samples to draw from the posterior distribution.

required
logger Logger

Logger object.

required
device device

Device used to run the script.

required

Returns:

Type Description
Tuple[ndarray, ndarray]

Smallest highest density region of the posterior that contains the true value, posterior samples for all test simulations.

Source code in mlpoppyns/learning/utils/sbi_utils.py
def calculate_smallest_hdr(
    posterior: DirectPosterior,
    theta: torch.tensor,
    matrix: torch.tensor,
    n_samples: int,
    logger: Logger,
    device: torch.device,
) -> Tuple[np.ndarray, np.ndarray]:
    """
    Calculate the smallest highest density region (HDR) of the posterior distribution that contains the true value,
    for the test dataset produced from the ground truths and simulation outputs stored in the arguments theta and
    matrix, respectively. Additionally, this function returns the posterior samples for each test case for further
    analysis.

    Args:
        posterior (DirectPosterior): Posterior distribution.
        theta (torch.tensor): Tensor containing the values of the parameters used to generate the simulated
            population in matrix.
        matrix (torch.tensor): Tensor containing the maps of the simulated population.
        n_samples (int): The number of samples to draw from the posterior distribution.
        logger (Logger): Logger object.
        device (torch.device): Device used to run the script.

    Returns:
        (Tuple[np.ndarray, np.ndarray]): Smallest highest density region of the posterior that contains the true value,
            posterior samples for all test simulations.
    """
    hdr = []

    # Counter for successful samples.
    successful_samples = 0
    posterior_samples_array = np.zeros(
        (theta.size(0), n_samples, theta.size(1))
    )

    for index in tqdm(
        range(theta.size(0)), desc="Computing Coverage Probability"
    ):
        simulation_output = matrix[index]

        # Adding batch dimension (e.g., converting the shape from [3,32,32] to [1,3,32,32]).
        # This is needed for sbi version 0.22.0.
        simulation_output = simulation_output.unsqueeze(0)
        true_value = theta[index]

        # Perform sampling with a timeout of 6000 seconds.
        posterior_samples, success = sampler.sample_with_timeout(
            posterior, simulation_output, n_samples, timeout=6000
        )

        if not success:
            # Skip this test sample if the sampling times out.
            logger.info(
                f"Skipping test sample with index {index} due to timeout."
            )
            continue

        posterior_samples_array[index] = posterior_samples.cpu().numpy()

        # Increment the successful samples counter.
        successful_samples += 1

        # Evaluating the PDF value of the ground truth.
        log_p_true = posterior.log_prob(
            true_value.to(device), simulation_output.to(device)
        )
        # Evaluating the PDF values of the posterior samples.
        log_p_samples = posterior.log_prob(
            posterior_samples.to(device), simulation_output.to(device)
        )
        # Determining the fraction of PDF values that are larger than that of the ground truth.
        hdr_value = (log_p_samples > log_p_true).float().mean()

        # .item() copies the scalar to host memory, so this works on both CPU and GPU.
        hdr_value = hdr_value.item()

        hdr.append(hdr_value)

    percentage_successful = (successful_samples / theta.size(0)) * 100

    # Log the number of successful test samples used to compute the coverage.
    logger.info(
        f"Percentage of successful samples used to compute coverage: {percentage_successful:.1f}%"
    )

    return np.array(hdr), posterior_samples_array
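
The hdr values computed above are the credibility levels of the smallest highest density regions containing each ground truth. A minimal sketch of how empirical coverage can be read off from them (this mirrors the assumed behavior of coverage_prob, defined elsewhere): for a calibrated posterior, the fraction of test cases with hdr <= beta should match beta.

import numpy as np

def empirical_coverage(hdr: np.ndarray, n_betas: int = 12):
    """Fraction of test cases whose smallest HDR level lies below each beta."""
    betas = np.linspace(0.0, 1.0, n_betas)
    coverage = np.array([(hdr <= beta).mean() for beta in betas])
    return betas, coverage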

cnn_compression(n_samples, parameter, config, dataset, input_shape, logger, normalize, standardize)

Compress input samples with a previously trained CNN embedding network.

Parameters:

Name Type Description Default
n_samples int

Number of samples in the dataset.

required
parameter ndarray

Array to store physical parameters (theta).

required
config ConfigurationParser

Configuration including the CNN compression model path.

required
dataset DatasetMultichannelArray

Dataset object.

required
input_shape Tuple[int, int, int]

Expected input shape.

required
logger Logger

Logger for reporting.

required
normalize bool

Whether to normalize embedded values.

required
standardize bool

Whether to standardize embedded values.

required

Returns:

Type Description
Tuple[ndarray, ndarray]

Parameter array (theta) of shape (n_samples, n_parameters), CNN-compressed input array of shape (n_samples, len_output_layer).

Source code in mlpoppyns/learning/utils/sbi_utils.py
def cnn_compression(
    n_samples: int,
    parameter: np.ndarray,
    config: configuration_parser.ConfigurationParser,
    dataset: dl.DatasetMultichannelArray,
    input_shape: Tuple[int, int, int],
    logger: Logger,
    normalize: bool,
    standardize: bool,
) -> Tuple[np.ndarray, np.ndarray]:
    """
    Compress input samples with a previously trained CNN embedding network.

    Args:
        n_samples (int): Number of samples in the dataset.
        parameter (np.ndarray): Array to store physical parameters (theta).
        config (ConfigurationParser): Configuration including the CNN compression model path.
        dataset (DatasetMultichannelArray): Dataset object.
        input_shape (Tuple[int, int, int]): Expected input shape.
        logger (Logger): Logger for reporting.
        normalize (bool): Whether to normalize embedded values.
        standardize (bool): Whether to standardize embedded values.

    Returns:
        (Tuple[np.ndarray, np.ndarray]): Parameter array (theta) of shape (n_samples, n_parameters), CNN-compressed
            input array of shape (n_samples, len_output_layer).
    """

    matrix_embedded = np.zeros(
        (n_samples, config["arch"]["args"]["len_output_layer"])
    )
    embedding_model_path = config["compression_input"]["cnn_model_path"]

    with open(embedding_model_path, "rb") as f:
        emb_neural_net = pickle.load(f)

    for i, (x, theta) in enumerate(dataset):
        x = np.moveaxis(x, -1, 0)
        if list(x.shape) != list(input_shape):
            logger.error(
                f"Input shape mismatch: got {x.shape}, expected {input_shape}"
            )
            sys.exit(1)

        x_embedded = (
            emb_neural_net._embedding_net(torch.tensor(x)).detach().numpy()
        )

        # Normalize or standardize the CNN-compressed vectors before passing them to the density estimator.
        if normalize:
            min_val = x_embedded.min()
            max_val = x_embedded.max()
            denom = max_val - min_val
            denom = denom if denom != 0 else 1e-8
            matrix_embedded[i] = (x_embedded - min_val) / denom

        elif standardize:
            mean_val = x_embedded.mean()
            std_val = x_embedded.std()
            std_val = std_val if std_val != 0 else 1e-8
            matrix_embedded[i] = (x_embedded - mean_val) / std_val

        else:
            matrix_embedded[i] = x_embedded

        parameter[i] = theta

    return parameter, matrix_embedded

compute_rank_coverage(save_dir, parameter, matrix, posterior, device, parameter_labels, logger, effective_round)

Compute and visualize ranks and coverage probability for a test dataset.

Parameters:

Name Type Description Default
save_dir Path

Directory to save the computed results and plots.

required
parameter Tensor

Tensor containing the parameters for the test dataset in the current round.

required
matrix Tensor

Tensor containing the matrices for the test dataset in the current round.

required
posterior DirectPosterior

Approximated posterior distribution.

required
device device

Device used to run the script.

required
parameter_labels List[str]

Labels for the parameters in the test dataset.

required
logger Logger

Logger object.

required
effective_round int

Number of the effective round during the sequential inference approach. Note that when resume is False, the effective round is the same as the actual round. On the other hand, if resume mode is enabled, effective_round = last_completed_round + actual_round.

required
Source code in mlpoppyns/learning/utils/sbi_utils.py
def compute_rank_coverage(
    save_dir: pathlib.Path,
    parameter: torch.tensor,
    matrix: torch.tensor,
    posterior: DirectPosterior,
    device: torch.device,
    parameter_labels: List[str],
    logger: Logger,
    effective_round: int,
) -> None:
    """
    Compute and visualize ranks and coverage probability for a test dataset.

    Args:
        save_dir (pathlib.Path): Directory to save the computed results and plots.
        parameter (torch.Tensor): Tensor containing the parameters for the test dataset in the current round.
        matrix (torch.Tensor): Tensor containing the matrices for the test dataset in the current round.
        posterior (DirectPosterior): Approximated posterior distribution.
        device (torch.device): Device used to run the script.
        parameter_labels (List[str]): Labels for the parameters in the test dataset.
        logger (Logger): Logger object.
        effective_round (int): Number of the effective round during the sequential inference approach. Note that when
            resume is False, the effective round is the same as the actual round. On the other hand, if resume mode is
            enabled, effective_round = last_completed_round + actual_round.
    """
    logger.info(
        f"Computing coverage probability for the test dataset for round {effective_round}..."
    )
    num_posterior_samples = 10000
    hdr, posterior_samples_test_dataset = calculate_smallest_hdr(
        posterior,
        parameter.to(device),
        matrix.to(device),
        num_posterior_samples,
        logger=logger,
        device=device,
    )

    coverage_prob(hdr, n_betas=12, save_dir=save_dir)

    np.savez(
        f"{save_dir}/posterior_samples_test_data.npz",
        true_values=parameter.cpu().numpy(),
        posterior_samples=posterior_samples_test_dataset,
    )

    ranks, dap_samples = run_sbc(
        parameter.to(device),
        matrix.to(device),
        posterior,
        num_posterior_samples=num_posterior_samples,
    )

    if len(parameter) > 100:
        # Saving the ranks and the number of posterior samples to reproduce the plot.
        torch.save(ranks, f"{save_dir}/ranks.pt")
        logger.info(
            f"The number of posterior samples used to generate the rank plot is {num_posterior_samples}"
        )

        logger.info("Check the rank statistics...")
        # Check if the rank distributions follow a uniform distribution with three different tests
        # (see [here](https://www.mackelab.org/sbi/tutorial/13_diagnostics_simulation_based_calibration/)
        # for more details on these tests).
        check_stats = check_sbc(
            ranks,
            parameter.to(device),
            dap_samples.to(device),
            num_posterior_samples=num_posterior_samples,
            num_c2st_repetitions=5,
        )

        logger.info(
            f"Kolmogorov-Smirnov p-values \n - check_stats['ks_pvals'] = {check_stats['ks_pvals'].numpy()}"
        )

        logger.info(
            f"c2st accuracies \n - check_stats['c2st_ranks'] = {check_stats['c2st_ranks'].numpy()} "
            f"\n - check_stats['c2st_dap'] = {check_stats['c2st_dap'].numpy()}"
        )

        # Visually check if the ranks follow a uniform distribution.
        # The gray band represents the 99% credibility interval around the mean for a uniform distribution.
        f, ax = sbc_rank_plot(
            ranks=ranks,
            num_posterior_samples=num_posterior_samples,
            plot_type="hist",
            num_bins=30,  # When passing None the default is len(dataset_test) / 20.
            parameter_labels=parameter_labels,
        )

        f.savefig(
            f"{save_dir}/ranks_histograms.pdf",
            bbox_inches="tight",
        )
        plt.close()
        f, ax = sbc_rank_plot(
            ranks=ranks,
            num_posterior_samples=num_posterior_samples,
            plot_type="cdf",
            parameter_labels=parameter_labels,
        )

        f.savefig(
            f"{save_dir}/ranks_cumulative.pdf",
            bbox_inches="tight",
        )
        plt.close()

    else:
        logger.warning(
            "Simulation-based Calibration cannot be performed due to the limited number of test samples. "
            "For SBC, the number of test samples should be on the order of 1000s to give reliable results. "
            "We recommend using 10000."
        )
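
For intuition, the rank statistic that run_sbc computes per test case is essentially the position of the true parameter among the posterior samples; for a calibrated posterior these ranks are uniformly distributed. A minimal sketch of the idea (not the sbi implementation):

import torch

def sbc_rank(posterior_samples: torch.Tensor, true_value: torch.Tensor) -> torch.Tensor:
    # posterior_samples: [n_samples, n_parameters]; true_value: [n_parameters].
    # The rank of the truth per parameter is the number of samples below it.
    return (posterior_samples < true_value).sum(dim=0)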

corner_plot(observed_samples, dataset, save_dir)

Plot a corner plot of the posterior distribution.

Parameters:

Name Type Description Default
observed_samples tensor

Samples of the distribution to plot.

required
dataset DatasetMultichannelArray

Dataset where the statistics are saved.

required
save_dir str

Directory to save the corner plot.

required
Source code in mlpoppyns/learning/utils/sbi_utils.py
def corner_plot(
    observed_samples: torch.tensor,
    dataset: dl.DatasetMultichannelArray,
    save_dir: str,
) -> None:
    """
    Plot a corner plot of the posterior distribution.

    Args:
        observed_samples (torch.tensor): Samples of the distribution to plot.
        dataset (DatasetMultichannelArray): Dataset where the statistics are saved.
        save_dir (str): Directory to save the corner plot.
    """

    # Save the statistics for the filtered labels.
    par_max = torch.tensor(dataset.target_max)
    par_min = torch.tensor(dataset.target_min)
    par_std = torch.tensor(dataset.target_std)
    par_mean = torch.tensor(dataset.target_mean)

    # If the parameters were normalized or standardized rescale quantities to their physical ranges.
    if dataset.normalize:
        observed_samples = observed_samples * (par_max - par_min) + par_min

    elif dataset.standardize:
        observed_samples = observed_samples * par_std + par_mean

    # Saving the best estimated parameters and the 95% CI into the log.txt file.
    quantile = np.quantile(
        observed_samples.detach().cpu().numpy(), [0.025, 0.5, 0.975], axis=0
    )

    range_param = [[par_min[v], par_max[v]] for v in range(len(par_max))]

    param_median = quantile[1, :]

    # Corner plot of the inferred posterior distributions for each parameter.
    figure = corner.corner(
        observed_samples.detach().cpu().numpy(),
        bins=32,
        labels=dataset.target_names,
        range=range_param,
        quantiles=[0.025, 0.5, 0.975],
        levels=(
            1 - np.exp(-0.5),
            1 - np.exp(-2),
            1 - np.exp(-9.0 / 2.0),
        ),  # 1, 2 and 3 sigma levels
        show_titles=True,
        title_kwargs={"fontsize": 12},
    )
    corner.overplot_lines(figure, param_median, color="tab:red")

    corner.overplot_points(
        figure,
        param_median[None],
        marker="s",
        color="tab:red",
    )
    plt.savefig(save_dir)
    plt.close()
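
The levels passed to corner.corner above are the probability masses enclosed by the k-sigma contours of a 2D Gaussian, 1 - exp(-k**2 / 2) for k = 1, 2, 3. A quick check:

import numpy as np

for k in (1, 2, 3):
    print(k, 1 - np.exp(-k**2 / 2))  # ~0.3935, ~0.8647, ~0.9889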

initialize_environment(config, logger)

Set device, profiling paths, and optionally Dask.

Parameters:

Name Type Description Default
config ConfigurationParser

Configuration object specifying the model settings.

required
logger Logger

Logger object.

required

Returns:

Type Description
Tuple[device, Optional[HTCondorCluster], str, str]

A tuple containing the selected device, the optional Dask cluster, the profiling log path, and the profiling JSON path.

Source code in mlpoppyns/learning/utils/sbi_utils.py
def initialize_environment(
    config: configuration_parser.ConfigurationParser, logger: Logger
) -> Tuple[torch.device, Optional[HTCondorCluster], str, str]:
    """Set device, profiling paths, and optionally Dask.

    Args:
        config (configuration_parser.ConfigurationParser): Configuration object specifying the model settings.
        logger (Logger): Logger object.

    Returns:
        (Tuple[torch.device, Optional[HTCondorCluster], str, str]): A tuple containing the selected device, the
            optional Dask cluster, the profiling log path, and the profiling JSON path.
    """
    prof_log_path = str(pathlib.Path(config.log_dir) / config["profile_log"])
    prof_json_path = str(pathlib.Path(config.log_dir) / config["profile_json"])

    logger.info("Requesting %s GPUs...", config["n_gpu"])
    device, device_ids = request_device(logger, config["n_gpu"])
    logger.info("Devices obtained: %s", device_ids)

    cluster = None
    if config["enable_dask"]:
        with timewith.TimeWith(
            "[InitializingDask]",
            prof_log_path,
            prof_json_path,
            config["show_profiling"],
        ):
            logger.info("Initializing dask cluster...")
            cluster = initialize_dask_cluster(logger, config)

    return device, cluster, prof_log_path, prof_json_path

merge_all_rounds_dataset(base_path, last_completed_round)

Merge all dataset_full.csv files from each round into a single DataFrame. This is necessary in resume mode because, during the first round of resuming the training, we need to load all the previous training datasets from the earlier rounds.

Parameters:

Name Type Description Default
base_path Path

The base path where the generated datasets are stored.

required
last_completed_round int

Last completed round number.

required

Returns:

Type Description
Path

The path to the directory containing the merged dataset.

Source code in mlpoppyns/learning/utils/sbi_utils.py
def merge_all_rounds_dataset(
    base_path: pathlib.Path, last_completed_round: int
) -> pathlib.Path:
    """
    Merge all dataset_full.csv files from each round into a single DataFrame. This is necessary in resume mode because,
    during the first round of resuming the training, we need to load all the previous training datasets from the earlier
    rounds.

    Args:
        base_path (Path): The base path where the generated datasets are stored.
        last_completed_round (int): Last completed round number.

    Returns:
        (pathlib.Path): The path to the directory containing the merged dataset.
    """
    dataframes = []

    # Iterate through the directories to find all the training datasets for each round.
    for i in range(last_completed_round + 1):
        round_path = os.path.join(base_path, f"round_{i}")
        dataset_path = os.path.join(round_path, "dataset_full.csv")

        df = pd.read_csv(dataset_path)
        dataframes.append(df)

    # Concatenate all the training datasets into one to use in the first round of the resumed inference.
    merged_df = pd.concat(dataframes, ignore_index=True)

    # Define the path for the merged dataset.
    output_path = os.path.join(
        base_path, f"combine_round_{last_completed_round + 1}"
    )
    os.makedirs(output_path, exist_ok=True)
    merged_dataset_path = os.path.join(output_path, "dataset_full.csv")

    # Save the merged dataframe to a CSV file.
    merged_df.to_csv(merged_dataset_path, index=False)

    return pathlib.Path(output_path)
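
A minimal usage sketch (the base path is a placeholder): merging the datasets of rounds 0 to 2 produces a combine_round_3 directory whose dataset_full.csv holds all previous training data.

import pathlib

merged_dir = merge_all_rounds_dataset(
    base_path=pathlib.Path("generated_dataset"),  # placeholder path
    last_completed_round=2,
)
print(merged_dir)  # generated_dataset/combine_round_3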

pca_compression(n_samples, input_shape, dataset, logger, config, parameter, normalize, standardize)

Compress input samples using a pre-trained PCA model.

Parameters:

Name Type Description Default
n_samples int

Number of samples in the dataset.

required
input_shape Tuple[int, int, int]

Expected shape of each input sample.

required
dataset DatasetMultichannelArray

Dataset object.

required
logger Logger

Logger object for error reporting.

required
config ConfigurationParser

Configuration containing the PCA model path.

required
parameter ndarray

Array to store extracted physical parameters.

required
normalize bool

Whether to normalize compressed values between 0 and 1.

required
standardize bool

Whether to standardize compressed values to zero mean and unit variance.

required

Returns:

Type Description
Tuple[ndarray, ndarray]

Parameter array (theta) of shape (n_samples, n_parameters), PCA-compressed input array of shape (n_samples, n_pca_components).

Source code in mlpoppyns/learning/utils/sbi_utils.py
def pca_compression(
    n_samples: int,
    input_shape: Tuple[int, int, int],
    dataset: dl.DatasetMultichannelArray,
    logger: Logger,
    config: configuration_parser.ConfigurationParser,
    parameter: np.ndarray,
    normalize: bool,
    standardize: bool,
) -> Tuple[np.ndarray, np.ndarray]:
    """
    Compress input samples using a pre-trained PCA model.

    Args:
        n_samples (int): Number of samples in the dataset.
        input_shape (Tuple[int, int, int]): Expected shape of each input sample.
        dataset (DatasetMultichannelArray): Dataset object.
        logger (Logger): Logger object for error reporting.
        config (ConfigurationParser): Configuration containing the PCA model path.
        parameter (np.ndarray): Array to store extracted physical parameters.
        normalize (bool): Whether to normalize compressed values between 0 and 1.
        standardize (bool): Whether to standardize compressed values to zero mean and unit variance.

    Returns:
        (Tuple[np.ndarray, np.ndarray]): Parameter array (theta) of shape (n_samples, n_parameters), PCA-compressed
            input array of shape (n_samples, n_pca_components).
    """
    matrix_raw = np.zeros(
        (n_samples, input_shape[0] * input_shape[1] * input_shape[2])
    )

    for i, (x, theta) in enumerate(dataset):
        x = np.moveaxis(x, -1, 0)
        if list(x.shape) != list(input_shape):
            logger.error(
                f"Input shape mismatch: got {x.shape}, expected {input_shape}"
            )
            sys.exit(1)

        matrix_raw[i] = x.reshape(-1)
        parameter[i] = theta

    pca_model_path = config["compression_input"]["pca_model_path"]
    if pca_model_path is None:
        logger.error(
            "PCA model path not provided in configuration under cfg['compression_input']['pca_model_path']."
        )
        sys.exit(1)

    pca_model = joblib.load(pca_model_path)
    matrix_compressed = pca_model.transform(matrix_raw)

    # Normalize or standardize the PCA-compressed vectors before passing them to the density estimator.
    if normalize:
        min_vals = matrix_compressed.min(axis=1, keepdims=True)
        max_vals = matrix_compressed.max(axis=1, keepdims=True)
        denom = max_vals - min_vals
        denom[denom == 0] = 1e-8
        matrix_compressed = (matrix_compressed - min_vals) / denom

    elif standardize:
        mean_vals = matrix_compressed.mean(axis=1, keepdims=True)
        std_vals = matrix_compressed.std(axis=1, keepdims=True)
        std_vals[std_vals == 0] = 1e-8
        matrix_compressed = (matrix_compressed - mean_vals) / std_vals

    return parameter, matrix_compressed
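
The function assumes a pre-trained model at pca_model_path that joblib can load and that exposes a transform method. A minimal sketch of how such a model could be produced (assuming scikit-learn; the training array below is a placeholder):

import joblib
import numpy as np
from sklearn.decomposition import PCA

flattened_maps = np.random.rand(1000, 3 * 32 * 32)  # placeholder training maps
pca_model = PCA(n_components=64)
pca_model.fit(flattened_maps)
joblib.dump(pca_model, "pca_model.joblib")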

prepare_dataset_sbi(dataset_folder, config, logger, atnf=False)

Prepare dataset for use in sbi. If compression is enabled, either PCA or CNN compression will be applied to the input matrix tensor.

Parameters:

Name Type Description Default
dataset_folder str

Path to the folder where the dataset is saved.

required
config ConfigurationParser

Configuration object specifying the model settings.

required
logger Logger

Logger object.

required
atnf bool

Indicates whether the PPdot density maps in the 'train_data_set' folder correspond to the observed population or to a simulated population. If set to True, the simulations correspond to the observed ATNF population. The default is False.

False

Returns:

Type Description
Tuple[DatasetMultichannelArray, tensor, tensor]

A tuple containing the dataset (which holds the statistics), the parameter tensor, and the input matrix tensor.

Source code in mlpoppyns/learning/utils/sbi_utils.py
def prepare_dataset_sbi(
    dataset_folder: str,
    config: configuration_parser.ConfigurationParser,
    logger: Logger,
    atnf: Optional[bool] = False,
) -> Tuple[dl.DatasetMultichannelArray, torch.tensor, torch.tensor]:
    """
    Prepare dataset for use in sbi. If compression is enabled, either PCA or CNN compression will be applied to
    the input matrix tensor.

    Args:
        dataset_folder (str): Path to the folder where the dataset is saved.
        config (configuration_parser.ConfigurationParser): Configuration object specifying the model settings.
        logger (Logger): Logger object.
        atnf (bool, optional): Indicates whether the PPdot density maps in the 'train_data_set' folder correspond to
            the observed population or to a simulated population. If set to True, the simulations correspond to the
            observed ATNF population. The default is False.

    Returns:
        (Tuple[dl.DatasetMultichannelArray, torch.tensor, torch.tensor]): A tuple containing the dataset (which holds
            the statistics), the parameter tensor, and the input matrix tensor.
    """

    # Adjusting the dataset_path based on whether the dataset is the observed one or a simulated population.
    dataset_path = (
        dataset_folder + "/dataset_atnf.csv"
        if atnf
        else dataset_folder + "/dataset_full.csv"
    )
    dataset_stat_path = config["training_data_loader"]["statistic_path"]

    if atnf:
        filter_inputs = config["observed_sample"]["filter_inputs"]
        filter_labels = []
    else:
        filter_inputs = config["training_data_loader"]["filter_inputs"]
        filter_labels = config["training_data_loader"]["filter_labels"]

    normalize = config["training_data_loader"]["normalize"]
    standardize = config["training_data_loader"]["standardize"]
    input_shape = config["arch"]["args"]["input_shape"]
    model_type = config["trainer"]["type"]

    n_parameters = len(filter_labels)
    # Load the density maps and parameters, and normalize or standardize them depending on the configuration file.
    try:
        dataset = dl.DatasetMultichannelArray(
            dataset_path=dataset_path,
            statistic_path=dataset_stat_path,
            filter_channels=filter_inputs,
            filter_labels=filter_labels,
            normalize=normalize,
            standardize=standardize,
        )
    except Exception:
        logger.exception("Error: an error occurred when loading the dataset.")
        sys.exit(1)

    n_samples = len(dataset)
    parameter = np.zeros((n_samples, n_parameters))

    use_compression_input = config["compression_input"]["use_compression"]
    compression_type = config["compression_input"]["compression_type"]

    # Preprocess the input data depending on whether we want to compress it before passing it to the density estimator.
    # For SNLE and SNRE, explicit compression is required. Here, we offer the option to use either PCA or a pre-trained
    # CNN for this purpose. In contrast, SNPE supports joint training of an embedding network with the density estimator,
    # allowing compressed representations to be extracted on-the-fly during training.

    if not use_compression_input and model_type == "snpe":
        parameter, matrix = raw_vector(
            n_samples, input_shape, dataset, logger, parameter
        )

    elif not use_compression_input and model_type != "snpe":
        logger.error(
            f"Model type '{model_type}' requires compressed input. "
            "Set 'use_compression' to True and specify a valid 'compression_type'."
        )
        sys.exit(1)

    elif use_compression_input and model_type == "snpe":
        logger.error(
            "Currently, the option of using SNPE with a compression input is not available. Change the `use_compression` flag in the configuration file to false."
        )
        sys.exit(1)

    elif use_compression_input and compression_type == "cnn":
        parameter, matrix = cnn_compression(
            n_samples,
            parameter,
            config,
            dataset,
            input_shape,
            logger,
            normalize,
            standardize,
        )

    elif use_compression_input and compression_type == "pca":
        parameter, matrix = pca_compression(
            n_samples,
            input_shape,
            dataset,
            logger,
            config,
            parameter,
            normalize,
            standardize,
        )

    else:
        logger.error(
            "Invalid compression configuration: 'use_compression' enabled but 'compression_type' not recognized."
        )
        sys.exit(1)

    parameter = torch.from_numpy(parameter).float()
    matrix = torch.from_numpy(matrix).float()

    return dataset, parameter, matrix
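
A minimal usage sketch (the config and logger objects are assumed to be initialized elsewhere, and the dataset folder is a placeholder): load the tensors that are later passed to append_simulations.

dataset, parameter, matrix = prepare_dataset_sbi(
    dataset_folder="generated_dataset/round_0",  # placeholder path
    config=config,
    logger=logger,
)
# parameter: [n_samples, n_parameters]; matrix: [n_samples, ...] model inputs.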

raw_vector(n_samples, input_shape, dataset, logger, parameter)

Load raw data vectors without compression for use as input to the model.

Parameters:

Name Type Description Default
n_samples int

Number of samples in the dataset.

required
input_shape Tuple[int, int, int]

Expected shape of each input sample.

required
dataset DatasetMultichannelArray

Dataset to extract raw vectors from.

required
logger Logger

Logger for error reporting.

required
parameter ndarray

Array with the physical parameters.

required

Returns:

Type Description
Tuple[ndarray, ndarray]

Array of physical parameters (theta), array of raw input vectors with shape (n_samples, *input_shape).

Source code in mlpoppyns/learning/utils/sbi_utils.py
def raw_vector(
    n_samples: int,
    input_shape: Tuple[int, int, int],
    dataset: dl.DatasetMultichannelArray,
    logger: Logger,
    parameter: np.ndarray,
) -> Tuple[np.ndarray, np.ndarray]:
    """
    Load raw data vectors without compression for use as input to the model.

    Args:
        n_samples (int): Number of samples in the dataset.
        input_shape (Tuple[int, int, int]): Expected shape of each input sample.
        dataset (DatasetMultichannelArray): Dataset to extract raw vectors from.
        logger (Logger): Logger for error reporting.
        parameter (np.ndarray): Array with the physical parameters.

    Returns:
        (Tuple[np.ndarray, np.ndarray]): Array of physical parameters (theta), array of raw input vectors with
            shape (n_samples, *input_shape).
    """
    matrix_raw = np.zeros((n_samples, *input_shape))

    # Rearranging the vectors to have the proper structure required by SBI, i.e., channels first.
    for i, (x, theta) in enumerate(dataset):
        x = np.moveaxis(x, -1, 0)  # channel-first
        if list(x.shape) != list(input_shape):
            logger.error(
                f"Input shape mismatch: got {x.shape}, expected {input_shape}"
            )
            sys.exit(1)
        matrix_raw[i] = x
        parameter[i] = theta

    return parameter, matrix_raw
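
A quick illustration of the channel reordering performed above: np.moveaxis(x, -1, 0) turns a channels-last map of shape (H, W, C) into the channels-first layout (C, H, W) expected by sbi.

import numpy as np

x = np.zeros((32, 32, 3))        # channels last
x_first = np.moveaxis(x, -1, 0)  # channels first
print(x_first.shape)             # (3, 32, 32)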

save_training_statistics(config, inference, index, effective_round)

Save training statistics including scalars and training/validation loss plots.

Parameters:

Name Type Description Default
config ConfigurationParser

Configuration object specifying the model settings.

required
inference Union[SNPE_C, SNLE_A]

sbi inference object.

required
index int

The ensemble index; if ensemble is set to False, the index is equal to 0.

required
effective_round int

Number of the effective round during the sequential inference approach. Note that when resume is False, the effective round is the same as the actual round. On the other hand, if resume mode is enabled, effective_round = last_completed_round + actual_round.

required
Source code in mlpoppyns/learning/utils/sbi_utils.py
def save_training_statistics(
    config: configuration_parser.ConfigurationParser,
    inference: Union[SNPE_C, SNLE_A],
    index: int,
    effective_round: int,
) -> None:
    """
    Save training statistics including scalars and training/validation loss plots.

    Args:
        config (configuration_parser.ConfigurationParser): Configuration object specifying the model settings.
        inference (Union[SNPE_C, SNLE_A]): sbi inference object.
        index (int): The ensemble index; if ensemble is set to False, the index is equal to 0.
        effective_round (int): Number of the effective round during the sequential inference approach. Note that when
            resume is False, the effective round is the same as the actual round. On the other hand, if resume mode is
            enabled, effective_round = last_completed_round + actual_round.
    """
    all_event_data = tbo._get_event_data_from_log_dir(
        inference._summary_writer.log_dir
    )
    training_statistics = all_event_data["scalars"]

    log_dir_round_path = os.path.join(
        config.log_dir, f"round_{effective_round}"
    )
    os.makedirs(log_dir_round_path, exist_ok=True)

    training_statistics_json_path = (
        os.path.join(log_dir_round_path, f"training_statistics_{index}.json")
        if config["trainer"]["ensemble"]
        else os.path.join(log_dir_round_path, "training_statistics.json")
    )
    training_statistics_plot_path = (
        os.path.join(log_dir_round_path, f"training_stats_{index}.png")
        if config["trainer"]["ensemble"]
        else os.path.join(log_dir_round_path, "training_stats.png")
    )

    with open(training_statistics_json_path, "w") as f:
        json.dump(training_statistics, f, indent=4, sort_keys=True)

    # Save the plot showing the evolution of the training and validation losses.
    f, ax = plt.subplots(figsize=(8, 6))
    ax.set_xlabel(r"Epoch")
    ax.set_ylabel(r"Accuracy")
    ax.plot(
        training_statistics["training_log_probs"]["step"],
        training_statistics["training_log_probs"]["value"],
        linestyle="-",
        linewidth=4,
        color="tab:blue",
        rasterized=True,
        label="training",
    )
    ax.plot(
        training_statistics["validation_log_probs"]["step"],
        training_statistics["validation_log_probs"]["value"],
        linestyle="-",
        linewidth=4,
        color="tab:orange",
        rasterized=True,
        label="validation",
    )
    plt.legend(bbox_to_anchor=(1.05, 1), frameon=False, loc=0, fontsize=10)

    f.savefig(training_statistics_plot_path, bbox_inches="tight")

wrapper_mlpoppyns(proposal, num_sim, config, effective_round, test, dataset, device)

Simulate num_sim mock neutron star populations drawn from the proposal distribution.

After simulation, generate density maps from the resulting populations.

Parameters:

Name Type Description Default
proposal Union[DirectPosterior, RestrictedPrior]

Proposal distribution used for sampling the parameters.

required
num_sim int

Number of simulations to perform.

required
config ConfigurationParser

Configuration object specifying the model settings.

required
effective_round int

Number of the effective round during the sequential inference approach. Note that when resume is False, the effective round is the same as the actual round. On the other hand, if resume mode is enabled, effective_round = last_completed_round + actual_round.

required
test bool

Flag indicating whether the simulations are for testing or training. If set to True, the simulations are for testing purposes.

required
dataset DatasetMultichannelArray

Dataset where the statistics are saved.

required
device device

Device used to run the script.

required

Returns:

Type Description
str

Path to the generated dataset.

Source code in mlpoppyns/learning/utils/sbi_utils.py
def wrapper_mlpoppyns(
    proposal: Union[DirectPosterior, utils.RestrictedPrior],
    num_sim: int,
    config: configuration_parser.ConfigurationParser,
    effective_round: int,
    test: bool,
    dataset: dl.DatasetMultichannelArray,
    device: torch.device,
) -> str:
    """
    Simulate `num_sim` mock neutron star populations drawn from the `proposal` distribution.

    After simulation, generate density maps from the resulting populations.

    Args:
        proposal (Union[DirectPosterior,utils.RestrictedPrior]): Proposal distribution used for sampling the parameters.
        num_sim (int): Number of simulations to perform.
        config (configuration_parser.ConfigurationParser): Configuration object specifying the model settings.
        effective_round (int): Number of the effective round during the sequential inference approach. Note that when
            resume is False, the effective round is the same as the actual round. On the other hand, if resume mode is
            enabled, effective_round = last_completed_round + actual_round.
        test (bool): Flag indicating whether the simulations are for testing or training. If set to True, the
            simulations are for testing purposes.
        dataset (DatasetMultichannelArray): Dataset where the statistics are saved.
        device (torch.device): Device used to run the script.

    Returns:
        (str): Path to the generated dataset.
    """

    # Setting paths.
    # If 'test' is True, simulations are saved in the folder specified for the testing dataset in the config file.
    # Otherwise, simulations are saved in the folder specified for the training dataset in the config file.
    if test:
        sim_dir_path = (
            config["test_data_loader"]["dataset_path"]
            + f"/simulations/round_{effective_round}"
        )
        dataset_path = (
            config["test_data_loader"]["dataset_path"]
            + f"/generated_dataset/round_{effective_round}"
        )
    else:
        sim_dir_path = (
            config["training_data_loader"]["dataset_path"]
            + f"/simulations/round_{effective_round}"
        )
        dataset_path = (
            config["training_data_loader"]["dataset_path"]
            + f"/generated_dataset/round_{effective_round}"
        )

    # Extracting simulation parameters from the configuration file.
    dyn_data_path = config["dyn_data_loader"]["dataset_path"]
    args_dict = {
        "dyn_data": dyn_data_path,
        "save_dir": sim_dir_path,
        "simulator_type": "simulate_population_magrot_det",
        "sampling_size": num_sim,
        "processes": config["n_processes"],
    }

    args_gen = argparse.Namespace(
        data=str(sim_dir_path),
        save_dir=str(dataset_path),
        resolution_ppdot=config["arch"]["args"]["input_shape"][1],
        resolution_dyn=32,
        data_type="array",
    )

    # Running the simulations and generating the corresponding density maps for each simulation. The simulations are run
    # in a multithreaded manner. If config["enable_dask"] is equal to True, then multithreading will be performed with
    # the Dask package. Otherwise, it will be performed with the multiprocessing package.

    if config["enable_dask"]:
        simulator_dask(args_dict, proposal, dataset, device)
    else:
        simulator_multiprocess(args_dict, proposal, dataset, device)

    generate_dataset_surveys.generate_dataset(args_gen)

    return dataset_path