Skip to content

pytorch_dedispersion.dedisperse_candidates

dedisperse_and_find_candidates

dedisperse_and_find_candidates(
    config,
    verbose=False,
    remove_trend=False,
    window_size=None,
    gpu_index=0,
)

Perform dedispersion and find candidates.

Parameters:

Name Type Description Default
config dict

Configuration parameters.

required
verbose bool

Enable verbose output. Defaults to False.

False
remove_trend bool

Remove trend from the data. Defaults to False.

False
window_size int

Window size for trend removal. Required if remove_trend is True.

None
gpu_index int

GPU index to use. Defaults to 0.

0
Source code in pytorch_dedispersion/dedisperse_candidates.py
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
def dedisperse_and_find_candidates(
        config: Dict[str, Any],
        verbose: bool = False,
        remove_trend: bool = False,
        window_size: Optional[int] = None,
        gpu_index: int = 0,
    ) -> None:
    """Perform dedispersion and find candidates.

    Runs the whole pipeline: load the input file, drop bad channels,
    dedisperse over every trial DM (in frequency batches when the estimated
    memory need exceeds the budget), sum over frequency, boxcar-filter,
    search for candidates and write them to ``candidates_<timestamp>.csv``
    (a relative path, so it lands in the current working directory).

    Args:
        config (dict): Configuration parameters. Keys read here are
            ``"SOURCE"``, ``"DM_RANGES"``, ``"BOXCAR_WIDTHS"``,
            ``"SNR_THRESHOLD"`` and, optionally, ``"BAD_CHANNEL_FILE"``.
        verbose (bool, optional): Enable verbose output (timings, device,
            GPU memory reports). Defaults to False.
        remove_trend (bool, optional): Remove trend from the data. Defaults to False.
        window_size (int, optional): Window size for trend removal. Required if remove_trend is True.
        gpu_index (int, optional): GPU index to use. Ignored when CUDA is
            not available. Defaults to 0.

    Raises:
        MemoryError: If the memory budget cannot hold even one frequency
            channel for the requested number of DM trials.
    """
    # start_time is only defined (and only read) on the verbose path.
    if verbose:
        start_time = time()

    # Load file
    handler = FileHandler(config["SOURCE"])
    file_path = handler.load_file()
    if verbose:
        print(f"File loaded from: {file_path}")

    # Process data
    processor = DataProcessor(file_path)
    processor.load_data()
    data = processor.data
    frequencies = processor.get_frequencies()
    time_resolution = processor.header.tsamp
    if verbose:
        data_prep_time = time() - start_time
        print(f"Data prepared in {data_prep_time:.2f} seconds")
        start_time = time()

    # Load bad channels if specified
    bad_channels = []
    if "BAD_CHANNEL_FILE" in config:
        bad_channels = load_bad_channels(config["BAD_CHANNEL_FILE"])
        if verbose:
            print(f"Loaded bad channels: {bad_channels}")

    # Mask bad channels: rows of `data` (axis 0) and the matching frequency
    # entries are removed together so the two stay aligned.
    if bad_channels:
        data = np.delete(data, bad_channels, axis=0)
        frequencies = np.delete(frequencies, bad_channels)

    # Dedispersion parameters
    dm_range = generate_dm_range(config["DM_RANGES"])

    # Select the compute device
    if torch.cuda.is_available():
        device = torch.device(f'cuda:{gpu_index}')
    else:
        device = torch.device('cpu')

    if verbose:
        print(f"Using device: {device}")
    frequencies_tensor = torch.tensor(frequencies, dtype=torch.float32).to(device)
    # The first remaining channel is the reference frequency handed to Dedispersion.
    freq_start = frequencies_tensor[0]
    dm_range = dm_range.to(device)

    # Print initial GPU memory usage
    if verbose and device.type == 'cuda':
        print_gpu_memory_usage(device, "Initial GPU memory usage:")

    # Memory budget: 90% of the device's *total* memory (not the currently
    # free amount). There is no budget on CPU.
    if device.type == "cuda":
        total_gpu_memory = torch.cuda.get_device_properties(device).total_memory
        available_memory = total_gpu_memory * 0.90
    else:
        total_gpu_memory = None  # Not relevant for CPU execution
        available_memory = None

    # Estimated bytes needed per frequency channel.
    # NOTE(review): presumably 4 bytes per float32 sample times a 4x safety
    # margin for intermediates -- confirm against Dedispersion.
    required_memory_per_frequency = data.shape[1] * 4 * len(dm_range) * 4
    if available_memory is not None:
        batch_size = int(available_memory / required_memory_per_frequency)
    else:
        batch_size = 100  # CPU fallback: fixed batches of 100 frequency channels

    if batch_size < 1:
        raise MemoryError("Batch size is less than 1 frequency channel. Reduce the number of DM trials to fit within available GPU memory.")

    num_channels = len(frequencies)
    if batch_size < num_channels:
        # Perform dedispersion in batches
        print("Performing dedispersion in batches due to memory constraints...")

        # The accumulator is only needed on this path; allocating it up front
        # would waste device memory in the single-pass case below.
        summed_data = torch.zeros((len(dm_range), data.shape[1]), device=device)

        batch_starts = range(0, num_channels, batch_size)
        for start_idx in tqdm(batch_starts, total=len(batch_starts), desc="Processing batches"):
            end_idx = min(start_idx + batch_size, num_channels)
            freq_batch = frequencies_tensor[start_idx:end_idx]
            data_batch = torch.tensor(data[start_idx:end_idx, :], dtype=torch.float32).to(device)

            dedisperse = Dedispersion(data_batch, freq_batch, dm_range, freq_start, time_resolution)
            dedispersed_data = dedisperse.perform_dedispersion()
            batch_summed_data = dedispersed_data.sum(dim=1)

            summed_data += batch_summed_data

            # Free up memory. `dedisperse` is dropped as well: it was built
            # from the batch tensors and may still reference them, which
            # would stop the cache from handing that memory back.
            del dedisperse, freq_batch, data_batch, dedispersed_data, batch_summed_data
            if device.type == 'cuda':
                torch.cuda.empty_cache()

    else:
        # Perform dedispersion without batching
        data_tensor = torch.tensor(data, dtype=torch.float32).to(device)
        dedisperse = Dedispersion(data_tensor, frequencies_tensor, dm_range, freq_start, time_resolution)
        dedispersed_data = dedisperse.perform_dedispersion()
        summed_data = dedispersed_data.sum(dim=1)

    if verbose:
        dedispersion_time = time() - start_time
        print(f"Dedispersion and summing completed in {dedispersion_time:.2f} seconds")
        start_time = time()

    # Print GPU memory usage after dedispersion
    if verbose and device.type == 'cuda':
        print_gpu_memory_usage(device, "GPU memory usage after dedispersion:")

    # Boxcar filtering
    boxcar = BoxcarFilter(summed_data)
    boxcar_data = boxcar.apply_boxcar(config["BOXCAR_WIDTHS"])
    if verbose:
        boxcar_time = time() - start_time
        print(f"Boxcar filtering completed in {boxcar_time:.2f} seconds")
        start_time = time()

    # Find candidates
    finder = CandidateFinder(boxcar_data, window_size)
    candidates = finder.find_candidates(config["SNR_THRESHOLD"], config["BOXCAR_WIDTHS"], remove_trend)
    if verbose:
        candidate_finding_time = time() - start_time
        print(f"Candidate finding completed in {candidate_finding_time:.2f} seconds")

    # Enhance candidate data with actual DM values and time in seconds
    enhanced_candidates = []
    for candidate in candidates:
        dm_index = candidate['DM Index']
        dm_value = dm_range[dm_index].item()
        sample_number = candidate['Time Sample']
        time_in_sec = sample_number * time_resolution
        enhanced_candidates.append({
            'SNR': candidate['SNR'],
            'Time Sample': sample_number,
            'Time (sec)': time_in_sec,
            'Boxcar Width': candidate['Boxcar Width'],
            'DM Value': dm_value
        })

    # Generate CSV file with candidate information
    timestamp = strftime("%Y%m%d-%H%M%S")
    filename = f"candidates_{timestamp}.csv"
    save_candidates_to_csv(enhanced_candidates, filename)
    if verbose:
        print(f"Candidates saved to {filename}")

generate_dm_range

generate_dm_range(dm_ranges)

Generate a tensor of DM values based on specified ranges and steps.

Parameters:

Name Type Description Default
dm_ranges list[dict]

List of DM range dictionaries.

required

Returns:

Type Description
Tensor

torch.Tensor: Tensor of DM values.

Source code in pytorch_dedispersion/dedisperse_candidates.py
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
def generate_dm_range(dm_ranges: List[Dict[str, float]]) -> torch.Tensor:
    """Build one flat tensor of trial DM values from a list of range specs.

    Each spec is expanded with ``torch.arange(start, stop, step)``, so
    ``stop`` itself is excluded; the expansions are concatenated in the
    order the specs are given.

    Args:
        dm_ranges (list[dict]): Range specs, each providing the keys
            ``"start"``, ``"stop"`` and ``"step"``.

    Returns:
        torch.Tensor: Tensor of DM values.
    """
    trial_dms = [
        dm
        for spec in dm_ranges
        for dm in torch.arange(spec["start"], spec["stop"], spec["step"]).tolist()
    ]
    return torch.tensor(trial_dms)

get_total_gpu_memory

get_total_gpu_memory()

Get the total GPU memory available.

Returns:

Name Type Description
int int

Total GPU memory in bytes.

Source code in pytorch_dedispersion/dedisperse_candidates.py
74
75
76
77
78
79
80
81
def get_total_gpu_memory(gpu_index: int = 0) -> int:
    """Get the total memory of a CUDA device.

    This reports the device's total capacity, not the amount that is
    currently free.

    Args:
        gpu_index (int, optional): Index of the CUDA device to query.
            Defaults to 0, which is the device the function always queried
            before this parameter existed.

    Returns:
        int: Total GPU memory in bytes.
    """
    return torch.cuda.get_device_properties(gpu_index).total_memory

load_bad_channels

load_bad_channels(file_path)

Load bad channel indices from a file.

Parameters:

Name Type Description Default
file_path str

Path to the bad channel file.

required

Returns:

Type Description
List[int]

list[int]: List of bad channel indices.

Source code in pytorch_dedispersion/dedisperse_candidates.py
83
84
85
86
87
88
89
90
91
92
93
94
95
def load_bad_channels(file_path: str) -> List[int]:
    """Read bad channel indices from a text file.

    The file is expected to hold integers separated by any whitespace
    (spaces, tabs or newlines). An empty file yields an empty list.

    Args:
        file_path (str): Path to the bad channel file.

    Returns:
        list[int]: Bad channel indices, in file order.
    """
    with open(file_path, 'r') as handle:
        # split() without arguments already ignores leading/trailing whitespace.
        tokens = handle.read().split()
    return [int(token) for token in tokens]

load_config

load_config(config_file)

Load configuration from a JSON file.

Parameters:

Name Type Description Default
config_file str

Path to the configuration file.

required

Returns:

Name Type Description
dict Dict[str, Any]

Configuration parameters.

Source code in pytorch_dedispersion/dedisperse_candidates.py
15
16
17
18
19
20
21
22
23
24
25
26
def load_config(config_file: str) -> Dict[str, Any]:
    """Parse a JSON configuration file.

    Args:
        config_file (str): Path to the configuration file.

    Returns:
        dict: The decoded configuration parameters.
    """
    with open(config_file, 'r') as handle:
        return json.load(handle)

main

main()

Dedisperse data and find candidates.

Source code in pytorch_dedispersion/dedisperse_candidates.py
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
def main() -> None:
    """Command-line entry point: parse options, load the config, run the search."""
    cli = argparse.ArgumentParser(description="Dedisperse data and find candidates.")

    # (flags, keyword settings) for every supported option, registered in order.
    option_table = (
        (("-c", "--config"), {"type": str, "required": True, "help": "Path to the configuration file."}),
        (("-v", "--verbose"), {"action": "store_true", "help": "Enable verbose output."}),
        (("--remove-trend",), {"action": "store_true", "help": "Remove trend from the data."}),
        (("--window-size",), {"type": int, "help": "Window size for trend removal (required if --remove-trend is specified)."}),
        (("--gpu",), {"type": int, "default": 0, "help": "GPU index to use (default: 0)."}),
    )
    for flags, settings in option_table:
        cli.add_argument(*flags, **settings)

    options = cli.parse_args()

    # Trend removal is rejected when the window size is missing (or zero).
    trend_without_window = options.remove_trend and not options.window_size
    if trend_without_window:
        cli.error("--window-size is required when --remove-trend is specified.")

    settings_from_file = load_config(options.config)

    dedisperse_and_find_candidates(
        config=settings_from_file,
        verbose=options.verbose,
        remove_trend=options.remove_trend,
        window_size=options.window_size,
        gpu_index=options.gpu,
    )

print_gpu_memory_usage

print_gpu_memory_usage(device, label='')

Print the current GPU memory usage.

Source code in pytorch_dedispersion/dedisperse_candidates.py
64
65
66
67
68
69
70
71
def print_gpu_memory_usage(device: torch.device, label: str = "") -> None:
    """Print allocated and reserved CUDA memory plus the full memory summary.

    Args:
        device (torch.device): CUDA device to report on.
        label (str, optional): Heading printed on the line above the
            figures. Defaults to an empty string.
    """
    bytes_per_mb = 1024 ** 2
    allocated_mb = torch.cuda.memory_allocated(device) / bytes_per_mb
    reserved_mb = torch.cuda.memory_reserved(device) / bytes_per_mb
    # Collected before anything is printed so all three readings are taken together.
    report = torch.cuda.memory_summary(device=device)

    print(f"{label}\nMemory Allocated: {allocated_mb:.2f} MB")
    print(f"Memory Reserved: {reserved_mb:.2f} MB")
    print(report)

save_candidates_to_csv

save_candidates_to_csv(candidates, filename)

Save candidate information to a CSV file.

Parameters:

Name Type Description Default
candidates list[dict]

List of detected candidates.

required
filename str

Path to the output CSV file.

required
Source code in pytorch_dedispersion/dedisperse_candidates.py
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
def save_candidates_to_csv(candidates: List[Dict[str, Any]], filename: str) -> None:
    """Write the detected candidates to a CSV file, one row per candidate.

    The file is overwritten if it exists. A header row is always written,
    even when there are no candidates.

    Args:
        candidates (list[dict]): Detected candidates; each must provide the
            keys ``'SNR'``, ``'Time Sample'``, ``'Time (sec)'``,
            ``'Boxcar Width'`` and ``'DM Value'``.
        filename (str): Path to the output CSV file.
    """
    header = ["SNR", "Sample Number", "Time (sec)", "Boxcar Width", "DM Value"]
    # Candidate keys in column order; note 'Time Sample' is written under
    # the "Sample Number" heading.
    column_keys = ('SNR', 'Time Sample', 'Time (sec)', 'Boxcar Width', 'DM Value')

    with open(filename, mode='w', newline='') as out:
        sheet = csv.writer(out)
        sheet.writerow(header)
        # Lazy generator: rows are written one at a time, as in a plain loop.
        sheet.writerows([entry[key] for key in column_keys] for entry in candidates)