pypamguard.load_pamguard_multi_file

 1import os, glob
 2from pathlib import Path
 3from pypamguard.core.filters import Filters, WhitelistFilter
 4from .load_pamguard_binary_file import load_pamguard_binary_file
 5from .logger import logger, Verbosity
 6from pypamguard.chunks.generics import GenericModule
 7from pypamguard.core.readers import Report
 8from pypamguard.core.exceptions import CriticalException, MultiFileException
 9
10_last_root = None
11_last_mask = None
12_master_list = []
13_master_dict = {}
14_MAX_NAME_LEN = 80
15
16def find_binary_file(root, mask, file):
17    global _last_mask, _last_root, _master_list, _master_dict
18    if (not _last_root or not _last_mask) or (_last_root != root or _last_mask != mask):
19        _master_list = glob.glob(pathname=mask, root_dir=root, recursive=True)
20        _master_dict = {}
21        for reldir in _master_list:
22            path = os.path.join(root, reldir)
23            fname = os.path.basename(path)
24            short_name = fname[len(fname)-_MAX_NAME_LEN:] if len(fname) > _MAX_NAME_LEN else fname
25            if short_name not in _master_dict:
26                _master_dict[short_name] = path
27        _last_root = root
28        _last_mask = mask
29    if file in _master_dict:
30        return _master_dict[file]
31    else:
32        return None
33
def load_pamguard_multi_file(data_dir: str | Path, file_names: list[str], item_uids: list[int]) -> tuple[list[GenericModule], Report]:
    """
    Load selected PAMGuard data chunks, picked by UID, from several binary files in one call.
    Returns a tuple of a list of `pypamguard.chunks.generics.GenericModule` objects (event data)
    and a `core.readers.Report` object (with errors/warnings).

    `file_names` and `item_uids` are parallel lists: entry `i` of `item_uids` is a UID wanted
    from the file named by entry `i` of `file_names`. For example, the call below looks for
    `file1.pgdf`, `file2.pgdf` and `file3.pgdf` under `./data` and requests the paired UIDs.
    ```python
    file_names=["file1.pgdf", "file1.pgdf", "file2.pgdf", "file3.pgdf", "file3.pgdf"]
    item_uids=[7000001, 7000199, 10000001, 10002893, 6000001]
    event_data, report = load_pamguard_multi_file("./data", file_names, item_uids)
    ```

    - A `FileNotFoundError` is raised if `data_dir` does not exist.
    - A `ValueError` is raised if `file_names` and `item_uids` are not the same length.
    - A `FileNotFoundError` is added to the report for each file that is not found.
    - A `pypamguard.core.exceptions.MultiFileException` is added to the report for each file
        where the number of items loaded differs from the number of UIDs requested.
    - If any warnings/errors occur when reading a file, they are added to the report.
    """
    if not os.path.exists(data_dir):
        raise FileNotFoundError(f"Data directory {data_dir} does not exist.")
    if len(file_names) != len(item_uids):
        raise ValueError("file_names and item_uids must be the same length.")

    report = Report()
    logger.set_verbosity(verbosity=Verbosity.ERROR)

    # Group the requested UIDs per file so that each file is read only once.
    uids_by_file = {}
    for name, uid in zip(file_names, item_uids):
        uids_by_file.setdefault(name, []).append(uid)

    loaded = []
    for name, wanted_uids in uids_by_file.items():
        logger.info(f"Loading {name}")
        uid_filter = Filters({"uidlist": WhitelistFilter(wanted_uids)})
        located = find_binary_file(data_dir, "**/*.pgdf", name)
        if located is None:
            # A missing file is recorded in the report; the remaining files are still loaded.
            report.add_error(FileNotFoundError(f"File {name} not found in {data_dir}."))
            continue
        contents = load_pamguard_binary_file(located, filters=uid_filter, report=report)
        contents.add_file_info()
        n_found = len(contents.data)
        n_wanted = len(wanted_uids)
        if n_found != n_wanted:
            report.add_error(MultiFileException(name, f"Expected {n_wanted} items in {name}, found {n_found}."))
        loaded.extend(contents.data)

    return loaded, report
def find_binary_file(root, mask, file):
def find_binary_file(root, mask, file):
    """
    Look up a file by name anywhere under `root` and return its path.

    The first call (and any call with a different `root` or `mask`) globs
    `mask` recursively beneath `root` and indexes every hit by its base name,
    truncated to its last `_MAX_NAME_LEN` characters. Later calls with the same
    `root` and `mask` reuse that index, so files created after the scan are not
    seen until `root` or `mask` changes.

    - `root`: directory the glob is run relative to.
    - `mask`: glob pattern, e.g. `"**/*.pgdf"`.
    - `file`: file name to look up. Names longer than `_MAX_NAME_LEN` are
      truncated the same way as the index keys before the lookup.

    Returns `root` joined with the matching relative path, or `None` if no
    indexed file has that name. If several files share a (truncated) name, the
    first one returned by the glob is used.
    """
    global _last_mask, _last_root, _master_list, _master_dict
    if not _last_root or not _last_mask or _last_root != root or _last_mask != mask:
        _master_list = glob.glob(pathname=mask, root_dir=root, recursive=True)
        _master_dict = {}
        for rel_path in _master_list:
            path = os.path.join(root, rel_path)
            # A negative slice keeps short names whole and long names' tail.
            key = os.path.basename(path)[-_MAX_NAME_LEN:]
            # setdefault keeps the first path seen for a given key.
            _master_dict.setdefault(key, path)
        _last_root = root
        _last_mask = mask
    # Truncate the requested name exactly like the index keys; without this a
    # name longer than _MAX_NAME_LEN could never match any key.
    lookup = file[-_MAX_NAME_LEN:] if isinstance(file, str) else file
    return _master_dict.get(lookup)
def load_pamguard_multi_file(data_dir: str | pathlib.Path, file_names: list[str], item_uids: list[int]) -> tuple[list[pypamguard.chunks.generics.genmodule.GenericModule], pypamguard.core.readers.Report]:
def load_pamguard_multi_file(data_dir: str | Path, file_names: list[str], item_uids: list[int]) -> tuple[list[GenericModule], Report]:
    """
    Load selected PAMGuard data chunks, picked by UID, from several binary files in one call.
    Returns a tuple of a list of `pypamguard.chunks.generics.GenericModule` objects (event data)
    and a `core.readers.Report` object (with errors/warnings).

    `file_names` and `item_uids` are parallel lists: entry `i` of `item_uids` is a UID wanted
    from the file named by entry `i` of `file_names`. For example, the call below looks for
    `file1.pgdf`, `file2.pgdf` and `file3.pgdf` under `./data` and requests the paired UIDs.
    ```python
    file_names=["file1.pgdf", "file1.pgdf", "file2.pgdf", "file3.pgdf", "file3.pgdf"]
    item_uids=[7000001, 7000199, 10000001, 10002893, 6000001]
    event_data, report = load_pamguard_multi_file("./data", file_names, item_uids)
    ```

    - A `FileNotFoundError` is raised if `data_dir` does not exist.
    - A `ValueError` is raised if `file_names` and `item_uids` are not the same length.
    - A `FileNotFoundError` is added to the report for each file that is not found.
    - A `pypamguard.core.exceptions.MultiFileException` is added to the report for each file
        where the number of items loaded differs from the number of UIDs requested.
    - If any warnings/errors occur when reading a file, they are added to the report.
    """
    if not os.path.exists(data_dir):
        raise FileNotFoundError(f"Data directory {data_dir} does not exist.")
    if len(file_names) != len(item_uids):
        raise ValueError("file_names and item_uids must be the same length.")

    report = Report()
    logger.set_verbosity(verbosity=Verbosity.ERROR)

    # Group the requested UIDs per file so that each file is read only once.
    uids_by_file = {}
    for name, uid in zip(file_names, item_uids):
        uids_by_file.setdefault(name, []).append(uid)

    loaded = []
    for name, wanted_uids in uids_by_file.items():
        logger.info(f"Loading {name}")
        uid_filter = Filters({"uidlist": WhitelistFilter(wanted_uids)})
        located = find_binary_file(data_dir, "**/*.pgdf", name)
        if located is None:
            # A missing file is recorded in the report; the remaining files are still loaded.
            report.add_error(FileNotFoundError(f"File {name} not found in {data_dir}."))
            continue
        contents = load_pamguard_binary_file(located, filters=uid_filter, report=report)
        contents.add_file_info()
        n_found = len(contents.data)
        n_wanted = len(wanted_uids)
        if n_found != n_wanted:
            report.add_error(MultiFileException(name, f"Expected {n_wanted} items in {name}, found {n_found}."))
        loaded.extend(contents.data)

    return loaded, report

A function to load a number of PAMGuard data chunks at once from various binary files, filtering by UID. Will return a tuple containing a list of pypamguard.chunks.generics.GenericModule objects (event data) and a core.readers.Report object (with errors/warnings).

For example, the following code will expect three files, file1.pgdf, file2.pgdf and file3.pgdf in the directory ./data with the respective UIDs.

file_names=["file1.pgdf", "file1.pgdf", "file2.pgdf", "file3.pgdf", "file3.pgdf"]
item_uids=[7000001, 7000199, 10000001, 10002893, 6000001]
event_data, report = load_pamguard_multi_file("./data", file_names, item_uids)
  • A FileNotFoundError is raised if data_dir does not exist.
  • A ValueError is raised if file_names and item_uids are not the same length.
  • A FileNotFoundError is added to the report for each file that is not found.
  • A pypamguard.core.exceptions.MultiFileException is added to the report for each file that requires one or more UIDs that aren't found.
  • If any warnings/errors occur when reading a file, they are added to the report.