
Samplers

utilities.samplers.memory_efficient_sampling

Sample a random subset from a CSV file without loading the full dataset into memory.

Authors:

Vanessa Graber (graber@ice.csic.es)
Celsa Pardo Araujo (pardo@ice.csic.es)

choose_rows(number_of_rows_to_select, total_number_of_rows, previously_chosen_rows=None)

Choose a subset of random indices from all the indices of a dataframe.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `number_of_rows_to_select` | `int` | Number of rows to randomly select from the full dataset, not counting the header rows. | *required* |
| `total_number_of_rows` | `int` | Number of rows in the full dataset, not counting the header rows. | *required* |
| `previously_chosen_rows` | `list` | Rows chosen in a previous subset, to be excluded from this one. | `None` |

Returns:

| Type | Description |
| --- | --- |
| `list` | A sorted list of the randomly chosen indices. |

Source code in utilities/samplers/memory_efficient_sampling.py
def choose_rows(
    number_of_rows_to_select: int,
    total_number_of_rows: int,
    previously_chosen_rows: Optional[List[int]] = None,
) -> List[int]:
    """
    Choose a subset of random indices from all the indices of a dataframe.

    Args:
        number_of_rows_to_select (int): Number of rows to randomly select from the full dataset without taking into
            account the headers.
        total_number_of_rows (int): Number of rows in the full dataset without taking into account the headers.
        previously_chosen_rows (list): Rows previously chosen from previous subset.

    Returns:
        (list): A sorted list of the randomly chosen indices.
    """

    if previously_chosen_rows is None:
        previously_chosen_rows = []

    # We remove from the list of indices those that were previously chosen as we want to obtain a unique sample
    # of indices.
    data_set = np.setdiff1d(
        np.arange(total_number_of_rows), np.array(previously_chosen_rows)
    )

    # Select the desired number of indices and sample randomly.
    sample = random.sample(data_set.tolist(), number_of_rows_to_select)

    sample_sorted = sorted(sample)

    return sample_sorted
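
As a minimal, self-contained sketch of the same logic (using a hypothetical dataset size rather than importing the module itself), a second batch of indices can be drawn that avoids a first one:

```python
import random

import numpy as np

# Hypothetical dataset size and batch sizes, for illustration only.
total_number_of_rows = 100
first_batch = sorted(random.sample(range(total_number_of_rows), 10))

# Exclude previously chosen indices so the two samples are disjoint,
# exactly as choose_rows does with np.setdiff1d.
remaining = np.setdiff1d(np.arange(total_number_of_rows), np.array(first_batch))
second_batch = sorted(random.sample(remaining.tolist(), 10))
```

Because the returned indices are sorted, `select` can later walk the file forward in a single pass to extract the corresponding rows.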

select(file_path, size_subset, size_full_dataset, previously_chosen_rows=None)

Select a random subset from a dataset without loading the full file into memory. This implementation only works when the full dataset has two header rows, like our final_pop_dyn.csv; it will not work otherwise.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `file_path` | `Path` | Path to the full dataset. | *required* |
| `size_subset` | `int` | Number of rows of the desired random subset, not counting the header rows. | *required* |
| `size_full_dataset` | `int` | Number of rows in the full dataset, not counting the header rows. | *required* |
| `previously_chosen_rows` | `list` | Rows chosen in a previous subset, to be excluded from this one. | `None` |

Returns:

| Type | Description |
| --- | --- |
| `DataFrame` | DataFrame of the random subset. |

Source code in utilities/samplers/memory_efficient_sampling.py
def select(
    file_path: pathlib.Path,
    size_subset: int,
    size_full_dataset: int,
    previously_chosen_rows: Optional[List[int]] = None,
) -> pd.DataFrame:
    """
    Select a random subset from a dataset without loading the full file into memory.
    The following implementation only works when the full dataset has two header rows, like our
    `final_pop_dyn.csv`; it will not work otherwise.

    Args:
        file_path (pathlib.Path): Path to the full dataset.
        size_subset (int): Number of rows of the desired random subset without taking into account the headers.
        size_full_dataset (int): Number of rows in the full dataset without taking into account the headers.
        previously_chosen_rows (list): Rows previously chosen from previous subset.

    Returns:
        (pd.DataFrame): DataFrame of the random subset.
    """

    selected_rows = choose_rows(
        size_subset, size_full_dataset, previously_chosen_rows
    )

    # Creating an empty list where the chosen rows will be appended.
    data = []

    # Reading the file using an iterator tool to allow for step by step iteration through the data.
    with file_path.open("r") as f:
        # We separately read our two header lines to only iterate through data.
        header_1 = f.readline()
        header_2 = f.readline()
        iterator = iter(f)

        for i, value in enumerate(selected_rows):
            # Iterating through our iterable data file to reach the rows selected previously.
            # We use the islice function to extract the respective data rows individually.
            if i == 0:
                data += list(islice(iterator, value, value + 1))
            else:
                loc = value - selected_rows[i - 1] - 1

                data += list(islice(iterator, loc, loc + 1))

        result = [header_1, header_2] + data

    # Saving the sampled data rows as a pandas DataFrame.
    df = pd.read_csv(StringIO("".join(result)), header=[0, 1])

    # Redefining the data header to allow compatibility with the original format.
    df = df.set_index(("Unnamed: 0_level_0", "Unnamed: 0_level_1"))
    df.index.name = ""

    return df
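
The `islice` bookkeeping can be exercised on a small in-memory CSV with two header rows (hypothetical column names, mimicking the `final_pop_dyn.csv` layout):

```python
from io import StringIO
from itertools import islice

import pandas as pd

# Hypothetical two-header CSV standing in for final_pop_dyn.csv.
csv_text = ",d,p\n,[kpc],[s]\n" + "".join(
    f"{i},{i / 10},{i / 100}\n" for i in range(10)
)

selected_rows = [2, 5, 7]  # sorted, as guaranteed by choose_rows

f = StringIO(csv_text)
header_1 = f.readline()
header_2 = f.readline()
iterator = iter(f)

data = []
previous = -1
for value in selected_rows:
    # Skip the gap since the last selected row, then take exactly one line.
    skip = value - previous - 1
    data += list(islice(iterator, skip, skip + 1))
    previous = value

df = pd.read_csv(StringIO("".join([header_1, header_2] + data)), header=[0, 1])
```

Because each `islice` call consumes the iterator up to the wanted row, the file is traversed once and only the selected lines are ever held in memory.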

utilities.samplers.pop_sampler

Population sampler script.

This script randomly samples a population with a reduced number of stars from every population simulated by initialize_evolve_population.py with different initial parameters.

It expects that a set of populations has been generated, either directly with that script or with the helper and its particular directory tree.

The user can choose the total number of stars to randomly select from the original simulated population, apply a weighted selection according to the stars' distances from the Sun, and set a distance cut-off to select only stars nearer to the Sun.

The resampled population is saved in a .pkl.gz file, alongside the .json file containing the related labels.

To display the help message:

python pop_sampler.py --help

This lists all the relevant arguments that can be used.

Authors:

Michele Ronchi (ronchi@ice.csic.es)

calculate_selection_weights(d)

Calculate the weights to assign to every star for selection. Weights are evaluated as a function of distance from the Sun; nearer stars are easier to detect and therefore more likely to be selected.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `d` | `ndarray` | Array of distances from the Sun [kpc]. | *required* |

Returns:

| Type | Description |
| --- | --- |
| `ndarray` | Array of selection weights. |

Source code in utilities/samplers/pop_sampler.py
def calculate_selection_weights(d: np.ndarray) -> np.ndarray:
    """
    Calculate the weights to assign to every star for selection.
    Weights are evaluated as a function of distance from the Sun; nearer stars are
    easier to detect and therefore more likely to be selected.

    Args:
        d (np.ndarray): Array of distances from the Sun [kpc].

    Returns:
        (np.ndarray): Array of selection weights.
    """

    # This function has been fine-tuned to match the distribution of distances from the Sun of
    # the 224 neutron stars with observed proper motion. In this sample we selected neutron stars
    # that are likely to be not recycled and isolated (i.e., with a spin period derivative Pdot>10^(-17)
    # and with no association to globular clusters or binary systems).
    weights = np.exp(-0.5 * d) / d

    # Normalize the weights to their sum.
    w = weights / np.sum(weights)

    return w
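
A self-contained sketch of the weighting (redefining the function inline, with hypothetical distances) confirms that the normalized weights sum to one and fall off with distance:

```python
import numpy as np

def calculate_selection_weights(d: np.ndarray) -> np.ndarray:
    # Exponential fall-off combined with a 1/d factor, normalized to sum to 1.
    weights = np.exp(-0.5 * d) / d
    return weights / np.sum(weights)

d = np.array([0.5, 1.0, 2.0, 4.0])  # hypothetical distances [kpc]
w = calculate_selection_weights(d)
```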

data_sampler(args)

This function reads the folder of simulated population files (usually generated by the simulation helper) and creates simulated population files with a reduced number of stars by randomly sampling each original evolved population file.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `args` | `Namespace` | An `argparse.Namespace` object containing the following attributes: `data` (str), path to where the simulated populations are located; `save_dir` (str), path to where to save the resampled population files; `size` (int), number of stars to randomly sample from the population files; `distance_cut` (float), maximum distance from the Sun cut-off; `uniform` (bool), if True, stars are selected uniformly in distance from the simulated population. | *required* |
Source code in utilities/samplers/pop_sampler.py
def data_sampler(args: argparse.Namespace) -> None:
    """
    This function reads the folder of simulated population files (usually generated
    by the simulation helper) and creates simulated population files with a reduced
    number of stars by randomly sampling each original evolved population file.

    Args:
        args (argparse.Namespace): An argparse.Namespace object containing the following attributes:

            - data (str): Path to where the simulated populations are located.
            - save_dir (str): Path to where to save the resampled population files.
            - size (int): Number of stars to randomly sample from the population files.
            - distance_cut (float): Maximum distance from the Sun cut-off.
            - uniform (bool): If True stars are selected uniformly in distance from the simulated population.
    """

    # Check if the parsed simulated populations' directory exists.
    root_path = pathlib.Path(args.data)
    if not root_path.exists():
        log.error(f"Directory {root_path} not found...")
        sys.exit()

    # Number of samples in the parsed directory.
    sample_number = len(os.listdir(root_path))

    for s in range(sample_number):
        # Create the resampled data directory path.
        data_path = f"{args.save_dir}/{s:06}"
        pathlib.Path(data_path).mkdir(parents=True, exist_ok=True)

        log.info(f"Resampling sample {s:06}")

        # Check if the simulated population file exists as a precondition.
        pop_path = pathlib.Path(f"{root_path}/{s:06}/final_population.pkl.gz")

        if not pop_path.exists():
            log.error(f"Population file not found in {pop_path}")
            sys.exit()

        # Check if the file containing labels exists as a precondition.
        label_path = pathlib.Path(f"{root_path}/{s:06}/override.json")

        if not label_path.exists():
            log.error(f"File containing labels not found in {label_path}")
            sys.exit()

        with open(label_path) as f:
            override = json.load(f)

        # Save the file containing labels into the new directory path.
        override_dump_path = pathlib.Path().joinpath(
            f"{data_path}/", "override.json"
        )
        with open(override_dump_path, "w") as f:
            json.dump(override, f, indent=4, sort_keys=True)

        # Create a data frame object of the population file.
        df_pop = pd.read_pickle(pop_path, compression="gzip")

        # If a distance cut-off is provided, select only stars in the solar neighborhood.
        if args.distance_cut is not None:
            df_pop = df_pop[df_pop["d"]["[kpc]"] < args.distance_cut]

            if args.uniform:
                # Select stars randomly from the simulated population.
                df_select = df_pop.sample(int(np.floor(args.size)))

            else:
                # Select stars according to some weights that are function of the distance from the Sun.
                w = calculate_selection_weights(
                    df_pop["d"]["[kpc]"].to_numpy()
                )
                df_select = df_pop.sample(args.size, replace=False, weights=w)

        else:
            log.error("Please define a distance cut.")
            break

        # Save the resampled data frame as compressed binary file.
        output_path = f"{data_path}/final_population.pkl.gz"
        df_select.to_pickle(output_path, compression="gzip")
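
The core selection step can be reproduced on synthetic data (hypothetical distances; the two-level column mimics the `("d", "[kpc]")` layout of the population files):

```python
import numpy as np
import pandas as pd

rng = np.random.default_rng(0)
df_pop = pd.DataFrame({("d", "[kpc]"): rng.uniform(0.1, 10.0, 1000)})

# Distance cut-off followed by weighted sampling without replacement,
# mirroring data_sampler's non-uniform branch.
distance_cut = 5.0
df_pop = df_pop[df_pop["d"]["[kpc]"] < distance_cut]

w = np.exp(-0.5 * df_pop["d"]["[kpc]"]) / df_pop["d"]["[kpc]"]
df_select = df_pop.sample(50, replace=False, weights=w, random_state=0)
```

`DataFrame.sample` normalizes the weights internally, so the raw `exp(-0.5 d)/d` values can be passed without dividing by their sum first.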

utilities.samplers.random_sampler

Calculate the cumulative distribution function for a given probability density function using the trapezoidal rule, and draw random values from the cumulative distribution function or the probability density function.

Authors:

Vanessa Graber (graber@ice.csic.es)
Michele Ronchi (ronchi@ice.csic.es)

cdf_calculator(x, pdf)

Calculating the cumulative distribution function for any given probability density function evaluated at the points x using the trapezoidal rule.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `x` | `ndarray` | Discrete set of values at which the pdf is evaluated. | *required* |
| `pdf` | `Callable` | Probability density function. | *required* |

Returns:

| Type | Description |
| --- | --- |
| `ndarray` | Normalized cumulative distribution function. |

Source code in utilities/samplers/random_sampler.py
def cdf_calculator(
    x: np.ndarray, pdf: Callable[[np.ndarray], np.ndarray]
) -> np.ndarray:
    """
    Calculating the cumulative distribution function for any given probability density
    function evaluated at the points x using the trapezoidal rule.

    Args:
        x (np.ndarray): Discrete set of values at which the pdf is evaluated.
        pdf (Callable): Probability density function.

    Returns:
        (np.ndarray): Normalized cumulative distribution function.
    """

    cdf = integrate.cumulative_trapezoid(pdf(x), x, initial=0)
    cdf = cdf / np.max(cdf)

    return cdf
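
For instance, with a standard normal pdf (an assumed example), the trapezoidal CDF starts at 0, ends at 1, and passes through roughly 0.5 at the symmetry point:

```python
import numpy as np
from scipy import integrate

def pdf(x: np.ndarray) -> np.ndarray:
    # Standard normal density, used here purely for illustration.
    return np.exp(-0.5 * x**2) / np.sqrt(2 * np.pi)

x = np.linspace(-5, 5, 1001)
cdf = integrate.cumulative_trapezoid(pdf(x), x, initial=0)
cdf = cdf / np.max(cdf)
```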

random_from_cdf(x, cdf, num_draw)

Drawing random values from a given normalized cumulative distribution function.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `x` | `ndarray` | Discrete set of values at which the cdf is evaluated. | *required* |
| `cdf` | `ndarray` | Normalized cumulative distribution function. | *required* |
| `num_draw` | `int` | Number of values to draw. | *required* |

Returns:

| Type | Description |
| --- | --- |
| `ndarray` | Random values drawn from the cdf. |

Source code in utilities/samplers/random_sampler.py
def random_from_cdf(
    x: np.ndarray, cdf: np.ndarray, num_draw: int
) -> np.ndarray:
    """
    Drawing random values from a given normalized cumulative distribution function.

    Args:
        x (np.ndarray): Discrete set of values at which the cdf is evaluated.
        cdf (np.ndarray): Normalized cumulative distribution function.
        num_draw (int): Number of values to draw.

    Returns:
        (np.ndarray): Random values drawn from the cdf.
    """

    cdf_rand = np.random.uniform(0, 1, num_draw)
    x_rand = np.interp(cdf_rand, cdf, x)

    return x_rand
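
This is inverse-transform sampling: uniform draws in [0, 1] are mapped through the inverse CDF by interpolation. A sketch with an assumed Exp(1) distribution, truncated to a finite grid:

```python
import numpy as np

# CDF of Exp(1) on a truncated grid, renormalized so it reaches exactly 1.
x = np.linspace(0.0, 20.0, 2001)
cdf = 1.0 - np.exp(-x)
cdf = cdf / cdf[-1]

# Map uniform draws through the inverse CDF via interpolation.
rng = np.random.default_rng(42)
x_rand = np.interp(rng.uniform(0, 1, 100_000), cdf, x)
```

The sample mean should land close to the Exp(1) mean of 1.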

random_from_pdf(x, pdf, num_draw)

Drawing random values from a given probability density function.

Parameters:

Name Type Description Default
x ndarray

Discrete set of values at which the pdf is evaluated.

required
pdf Callable

Probability density function.

required
num_draw int

Number of values to draw.

required

Returns:

| Type | Description |
| --- | --- |
| `ndarray` | Random values drawn from the pdf. |

Source code in utilities/samplers/random_sampler.py
def random_from_pdf(
    x: np.ndarray,
    pdf: Callable[[np.ndarray], np.ndarray],
    num_draw: int,
) -> np.ndarray:
    """
    Drawing random values from a given probability density function.

    Args:
        x (np.ndarray): Discrete set of values at which the pdf is evaluated.
        pdf (Callable): Probability density function.
        num_draw (int): Number of values to draw.

    Returns:
        (np.ndarray): Random values drawn from the pdf.
    """

    cdf = cdf_calculator(x, pdf)
    x_rand = random_from_cdf(x, cdf, num_draw)

    return x_rand
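
The full pdf-to-samples pipeline can be sketched end to end with an assumed symmetric triangular pdf on [0, 2]:

```python
import numpy as np
from scipy import integrate

def pdf(x: np.ndarray) -> np.ndarray:
    # Symmetric triangular pdf on [0, 2], an assumed example distribution.
    return np.where(x <= 1.0, x, 2.0 - x)

# CDF via the trapezoidal rule, as in cdf_calculator.
x = np.linspace(0.0, 2.0, 2001)
cdf = integrate.cumulative_trapezoid(pdf(x), x, initial=0)
cdf = cdf / np.max(cdf)

# Inverse-transform sampling, as in random_from_cdf.
rng = np.random.default_rng(0)
x_rand = np.interp(rng.uniform(0, 1, 200_000), cdf, x)
```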

random_from_pdf_2d(x1, x2, pdf_2d, num_draw)

Drawing random values from a given 2D probability density function. x1 is the variable running along the rows (axis=0), x2 is the variable running along the columns (axis=1) of the 2D array defining the pdf.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `x1` | `ndarray` | Discrete set of values for coordinate x1 at which the pdf is evaluated. | *required* |
| `x2` | `ndarray` | Discrete set of values for coordinate x2 at which the pdf is evaluated. | *required* |
| `pdf_2d` | `ndarray` | 2D probability density function. | *required* |
| `num_draw` | `int` | Number of values to draw. | *required* |

Returns:

| Type | Description |
| --- | --- |
| `Tuple[ndarray, ndarray]` | Random points of coordinates (x1, x2) drawn from the pdf. |

Source code in utilities/samplers/random_sampler.py
def random_from_pdf_2d(
    x1: np.ndarray,
    x2: np.ndarray,
    pdf_2d: np.ndarray,
    num_draw: int,
) -> Tuple[np.ndarray, np.ndarray]:
    """
    Drawing random values from a given 2D probability density function.
    x1 is the variable running along the rows (axis=0), x2 is the variable running along
    the columns (axis=1) of the 2D array defining the pdf.

    Args:
        x1 (np.ndarray): Discrete set of values for coordinate x1 at which the pdf is evaluated.
        x2 (np.ndarray): Discrete set of values for coordinate x2 at which the pdf is evaluated.
        pdf_2d (np.ndarray): 2D probability density function.
        num_draw (int): Number of values to draw.

    Returns:
        (Tuple[np.ndarray, np.ndarray]): Random points of coordinates (x1, x2) drawn from the pdf.
    """

    # Build the cumulative function grid by computing a cumulative function
    # for each column (i.e., for each value of x2) over axis=0.
    cum_func_grid = integrate.cumulative_trapezoid(
        pdf_2d, x1, axis=0, initial=0
    )

    # Compute the cumulative density function of the last row of the cumulative function grid
    # to find the total cdf for variable x2.
    cdf_x2 = integrate.cumulative_trapezoid(
        cum_func_grid[-1, :], x2, initial=0
    )
    cdf_x2 = cdf_x2 / np.max(cdf_x2)

    # Normalize the cumulative function grid, column-wise, to find the cdf for x1 given a value of x2.
    cdf_x1x2 = cum_func_grid / cum_func_grid.max(axis=0)

    # Draw a random x2 value.
    x2_rand = random_from_cdf(x2, cdf_x2, num_draw)

    # Find the indices of the cdfs for x1 corresponding to the values of x2 just drawn.
    idx = np.floor(
        (x2_rand - np.min(x2)) / (np.max(x2) - np.min(x2)) * len(x2)
    )
    idx = np.array(idx, dtype=int)

    # Draw a random x1 from the cdfs corresponding to the given x2 values.
    x1_rand = np.zeros_like(x2_rand)
    for i in range(len(x2_rand)):
        x1_rand[i] = random_from_cdf(x1, cdf_x1x2[:, idx[i]], 1)

    return x1_rand, x2_rand
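
The conditional-sampling scheme (marginal over x2, then x1 conditional on x2) can be checked end to end on a separable 2D Gaussian. This is an assumed test density, and `np.searchsorted` stands in for the floor-based index lookup as an equivalent way to locate the grid column of each drawn x2:

```python
import numpy as np
from scipy import integrate

# Separable 2D Gaussian on a grid; x1 runs along axis=0, x2 along axis=1.
x1 = np.linspace(-5.0, 5.0, 201)
x2 = np.linspace(-5.0, 5.0, 201)
X1, X2 = np.meshgrid(x1, x2, indexing="ij")
pdf_2d = np.exp(-0.5 * (X1**2 + X2**2))

# Cumulative integral over x1 in each column (each fixed x2).
cum = integrate.cumulative_trapezoid(pdf_2d, x1, axis=0, initial=0)

# Marginal CDF of x2 from the fully integrated last row.
cdf_x2 = integrate.cumulative_trapezoid(cum[-1, :], x2, initial=0)
cdf_x2 = cdf_x2 / cdf_x2[-1]

# Column-wise normalization gives the conditional CDF of x1 given x2.
cdf_x1x2 = cum / cum.max(axis=0)

# Draw x2 from its marginal, then x1 from the matching conditional CDF.
rng = np.random.default_rng(1)
num_draw = 20_000
x2_rand = np.interp(rng.uniform(0, 1, num_draw), cdf_x2, x2)

idx = np.clip(np.searchsorted(x2, x2_rand), 0, len(x2) - 1)
x1_rand = np.array(
    [np.interp(rng.uniform(), cdf_x1x2[:, j], x1) for j in idx]
)
```

Both marginals of the drawn points should be roughly zero-mean, as expected for a centred Gaussian.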