Coverage for src/cell_abm_pipeline/flows/group_resource_usage.py: 0% (84 statements)
1"""
2Workflow for grouping resource usage.
4Working location structure:
6.. code-block:: bash
8 (name)
9 ├── data
10 │ └── data.(category)
11 │ └── (name)_(key)_(seed).(category).tar.xz
12 ├── groups
13 │ └── groups.RESOURCE_USAGE
14 │ ├── (name).object_storage.csv
15 │ └── (name).wall_clock.csv
16 └── logs
17 └── (job_id).log
19Different groups use inputs from **data** and **logs**. Grouped data are saved
20to **groups.RESOURCE_USAGE**.
21"""

import os
import re
from dataclasses import dataclass, field

import boto3
import pandas as pd
from io_collection.keys import get_keys, make_key
from io_collection.load import load_text
from io_collection.save import save_dataframe
from prefect import flow

GROUPS: list[str] = [
    "object_storage",
    "wall_clock",
]

OBJECT_CATEGORIES = ["CELLS", "LOCATIONS"]

OBJECT_PATTERN = r"[_]*([A-Za-z0-9\s_]*)_([0-9]{4})\."

LOG_PATTERN = r"simulation \[ ([A-Za-z0-9\s_]+) \| ([0-9]{4}) \] finished in ([0-9\.]+) minutes"
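
# Illustrative matches for the patterns above (both inputs are hypothetical;
# the object pattern is applied to the part of a file key that follows the
# series name):
#
#     >>> re.findall(OBJECT_PATTERN, "_tissue_0001.CELLS.tar.xz")
#     [('tissue', '0001')]
#     >>> re.findall(LOG_PATTERN, "simulation [ tissue | 0001 ] finished in 12.5 minutes")
#     [('tissue', '0001', '12.5')]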


@dataclass
class ParametersConfigObjectStorage:
    """Parameter configuration for group resource usage subflow - object storage."""

    search_locations: list[str] = field(default_factory=list)
    """List of locations (local paths or S3 buckets) to search for files."""

    categories: list[str] = field(default_factory=lambda: OBJECT_CATEGORIES)
    """List of object storage categories."""

    pattern: str = OBJECT_PATTERN
    """Pattern to match for object key and seed."""


@dataclass
class ParametersConfigWallClock:
    """Parameter configuration for group resource usage subflow - wall clock."""

    search_locations: list[str] = field(default_factory=list)
    """List of locations (local paths or S3 buckets) to search for files."""

    pattern: str = LOG_PATTERN
    """Pattern to match for object key, seed, and time."""

    exceptions: list[str] = field(default_factory=list)
    """List of exception strings used to filter log files."""


@dataclass
class ParametersConfig:
    """Parameter configuration for group resource usage flow."""

    groups: list[str] = field(default_factory=lambda: GROUPS)
    """List of resource usage groups."""

    object_storage: ParametersConfigObjectStorage = field(
        default_factory=ParametersConfigObjectStorage
    )
    """Parameters for group object storage subflow."""

    wall_clock: ParametersConfigWallClock = field(default_factory=ParametersConfigWallClock)
    """Parameters for group wall clock subflow."""


@dataclass
class ContextConfig:
    """Context configuration for group resource usage flow."""

    working_location: str
    """Location for input and output files (local path or S3 bucket)."""


@dataclass
class SeriesConfig:
    """Series configuration for group resource usage flow."""

    name: str
    """Name of the simulation series."""


@flow(name="group-resource-usage")
def run_flow(context: ContextConfig, series: SeriesConfig, parameters: ParametersConfig) -> None:
    """
    Main group resource usage flow.

    Calls the following subflows, if the group is specified:

    - :py:func:`run_flow_group_object_storage`
    - :py:func:`run_flow_group_wall_clock`
    """

    if "object_storage" in parameters.groups:
        run_flow_group_object_storage(context, series, parameters.object_storage)

    if "wall_clock" in parameters.groups:
        run_flow_group_wall_clock(context, series, parameters.wall_clock)
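
# A minimal invocation sketch (the working location and series name are
# placeholders, not values from a real deployment):
#
#     run_flow(
#         ContextConfig(working_location="s3://example-bucket"),
#         SeriesConfig(name="EXAMPLE_SERIES"),
#         ParametersConfig(
#             object_storage=ParametersConfigObjectStorage(
#                 search_locations=["s3://example-bucket"]
#             ),
#             wall_clock=ParametersConfigWallClock(
#                 search_locations=["s3://example-bucket"]
#             ),
#         ),
#     )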


@flow(name="group-resource-usage_group-object-storage")
def run_flow_group_object_storage(
    context: ContextConfig, series: SeriesConfig, parameters: ParametersConfigObjectStorage
) -> None:
    """Group resource usage subflow for object storage size."""

    group_key = make_key(series.name, "groups", "groups.RESOURCE_USAGE")

    all_sizes = []

    for category in parameters.categories:
        for location in parameters.search_locations:
            file_keys = get_keys(location, make_key(series.name, "data", f"data.{category}"))

            for file_key in file_keys:
                # Extract object key and seed from the portion of the file key
                # that follows the series name.
                key, seed = re.findall(parameters.pattern, file_key.split(series.name)[-1])[0]

                # Use S3 object metadata for bucket locations; otherwise, get
                # the size from the local file system.
                if location.startswith("s3://"):
                    summary = boto3.resource("s3").ObjectSummary(location[5:], file_key)
                    size = summary.size
                else:
                    size = os.path.getsize(f"{location}{file_key}")

                all_sizes.append({"key": key, "seed": seed, "category": category, "size": size})

    sizes_df = pd.DataFrame(all_sizes)
    sizes_df.sort_values(by=["key", "category", "seed"], ignore_index=True, inplace=True)

    save_dataframe(
        context.working_location,
        make_key(group_key, f"{series.name}.object_storage.csv"),
        sizes_df,
        index=False,
    )
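
# The saved (name).object_storage.csv has one row per key, seed, and category
# (values shown are illustrative):
#
#     key,seed,category,size
#     tissue,0001,CELLS,1048576
#     tissue,0001,LOCATIONS,2097152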


@flow(name="group-resource-usage_group-wall-clock")
def run_flow_group_wall_clock(
    context: ContextConfig, series: SeriesConfig, parameters: ParametersConfigWallClock
) -> None:
    """Group resource usage subflow for wall clock time."""

    group_key = make_key(series.name, "groups", "groups.RESOURCE_USAGE")

    all_times = []

    for location in parameters.search_locations:
        file_keys = get_keys(location, make_key(series.name, "logs"))

        for file_key in file_keys:
            contents = load_text(location, file_key)

            # Skip log files that contain any of the exception strings.
            if any(exception in contents for exception in parameters.exceptions):
                continue

            # Each match yields the object key, seed, and wall clock time.
            matches = re.findall(parameters.pattern, contents)

            for key, seed, time in matches:
                all_times.append({"key": key, "seed": seed, "time": time})

    times_df = pd.DataFrame(all_times)
    times_df.sort_values(by=["key", "seed"], ignore_index=True, inplace=True)

    save_dataframe(
        context.working_location,
        make_key(group_key, f"{series.name}.wall_clock.csv"),
        times_df,
        index=False,
    )
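
# The saved (name).wall_clock.csv has one row per matched log entry, with time
# in minutes (values shown are illustrative):
#
#     key,seed,time
#     tissue,0001,12.5
#     tissue,0002,13.1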