Coverage for src/cell_abm_pipeline/flows/summarize_manifest.py: 0%
53 statements
« prev ^ index » next coverage.py v7.1.0, created at 2024-06-05 19:14 +0000
"""
Workflow for summarizing files in the manifest.

Working location structure:

.. code-block:: bash

    (name)
    └── YYYY-MM-DD
        └── (name).SUMMARY.txt

For each search location, the flow attempts to find all files matching the
specified series name. After applying the include and exclude filters, the
manifest is updated and a summary of files, grouped by extension, is printed
and saved to a dated directory.
"""
16from dataclasses import dataclass, field
17from fnmatch import fnmatch
19from container_collection.manifest import summarize_manifest_files, update_manifest_contents
20from io_collection.keys import get_keys, make_key
21from io_collection.load import load_dataframe
22from io_collection.save import save_dataframe, save_text
23from prefect import flow
@dataclass
class ParametersConfig:
    """Parameters controlling the summarize manifest flow."""

    update_manifest: bool = True
    """Whether to rebuild and save the manifest (True) or load the existing one (False)."""

    search_locations: list[str] = field(default_factory=list)
    """Locations (local paths or S3 buckets) that are searched for files."""

    include_filters: list[str] = field(default_factory=lambda: ["*"])
    """Unix filename patterns; files matching any of them are included in the summary."""

    exclude_filters: list[str] = field(default_factory=list)
    """Unix filename patterns; files matching any of them are excluded from the summary."""
@dataclass
class ContextConfig:
    """Locations used by the summarize manifest flow."""

    working_location: str
    """Local path or S3 bucket holding the flow's input and output files."""

    manifest_location: str
    """Local path or S3 bucket holding the manifest file."""
@dataclass
class SeriesConfig:
    """Description of the simulation series handled by the summarize manifest flow."""

    name: str
    """Simulation series name."""

    manifest_key: str
    """Key of the manifest file."""

    seeds: list[int]
    """Random seeds belonging to the series."""

    conditions: list[dict]
    """Condition dictionaries of the series (each must include a unique condition "key")."""
@flow(name="summarize-manifest")
def run_flow(context: ContextConfig, series: SeriesConfig, parameters: ParametersConfig) -> None:
    """
    Main summarize manifest flow.

    If ``parameters.update_manifest`` is set, each search location is queried
    for keys matching the series name, the include and exclude filters are
    applied, and the resulting manifest is saved to the manifest location.
    Otherwise, the existing manifest is loaded from the manifest location. In
    both cases, a summary of the manifest is saved to the working location and
    printed.

    Parameters
    ----------
    context
        Working and manifest locations.
    series
        Series name, manifest key, seeds, and conditions.
    parameters
        Update flag, search locations, and include / exclude filters.
    """

    if parameters.update_manifest:
        location_keys = {}

        for location in parameters.search_locations:
            all_keys = get_keys(location, series.name)

            # Keep each distinct key that matches at least one include filter
            # and no exclude filter. The result is sorted so the key order
            # handed to the manifest is reproducible; iterating a set of
            # strings does not give a stable order across interpreter runs.
            location_keys[location] = sorted(
                key
                for key in set(all_keys)
                if any(fnmatch(key, include) for include in parameters.include_filters)
                and not any(fnmatch(key, exclude) for exclude in parameters.exclude_filters)
            )

        manifest = update_manifest_contents(location_keys)
        save_dataframe(context.manifest_location, series.manifest_key, manifest, index=False)
    else:
        manifest = load_dataframe(context.manifest_location, series.manifest_key)

    summary = summarize_manifest_files(manifest, series.name, series.conditions, series.seeds)

    # "{{timestamp}}" is passed literally (this is not an f-string); presumably
    # make_key substitutes it to produce the dated directory -- TODO confirm.
    summary_key = make_key(series.name, "{{timestamp}}", f"{series.name}.SUMMARY.txt")
    save_text(context.working_location, summary_key, summary)

    print("\n" + summary)