Source code for cell_abm_pipeline.flows.group_resource_usage

"""
Workflow for grouping resource usage.

Working location structure:

.. code-block:: bash

    (name)
    ├── data
    │   └── data.(category)
    │       └── (name)_(key)_(seed).(category).tar.xz
    ├── groups
    │   └── groups.RESOURCE_USAGE
    │       ├── (name).object_storage.csv
    │       └── (name).wall_clock.csv
    └── logs
        └── (job_id).log

Different groups use inputs from **data** and **logs**. Grouped data are saved
to **groups.RESOURCE_USAGE**.
"""

import os
import re
from dataclasses import dataclass, field

import boto3
import pandas as pd
from io_collection.keys import get_keys, make_key
from io_collection.load import load_text
from io_collection.save import save_dataframe
from prefect import flow

GROUPS: list[str] = [
    "object_storage",
    "wall_clock",
]

OBJECT_CATEGORIES = ["CELLS", "LOCATIONS"]

OBJECT_PATTERN = r"[_]*([A-z0-9\s\_]*)_([0-9]{4})\."

LOG_PATTERN = r"simulation \[ ([A-z0-9\s\_]+) \| ([0-9]{4}) \] finished in ([0-9\.]+) minutes"


[docs]@dataclass class ParametersConfigObjectStorage: """Parameter configuration for group resource usage subflow - object storage.""" search_locations: list[str] = field(default_factory=lambda: []) """List of locations (local paths or S3 buckets) to search for files.""" categories: list[str] = field(default_factory=lambda: OBJECT_CATEGORIES) """List of object storage categories.""" pattern: str = OBJECT_PATTERN """Pattern to match for object key and seed."""
[docs]@dataclass class ParametersConfigWallClock: """Parameter configuration for group resource usage subflow - wall clock.""" search_locations: list[str] = field(default_factory=lambda: []) """List of locations (local paths or S3 buckets) to search for files.""" pattern: str = LOG_PATTERN """Pattern to match for object key, seed, and time.""" exceptions: list[str] = field(default_factory=lambda: []) """List of exception strings used to filter log files."""
[docs]@dataclass class ParametersConfig: """Parameter configuration for group resource usage flow.""" groups: list[str] = field(default_factory=lambda: GROUPS) """List of resource usages groups.""" object_storage: ParametersConfigObjectStorage = ParametersConfigObjectStorage() """Parameters for group object storage subflow.""" wall_clock: ParametersConfigWallClock = ParametersConfigWallClock() """Parameters for group wall clock subflow."""
[docs]@dataclass class ContextConfig: """Context configuration for group resource usage flow.""" working_location: str """Location for input and output files (local path or S3 bucket)."""
[docs]@dataclass class SeriesConfig: """Series configuration for group resource usage flow.""" name: str """Name of the simulation series."""
[docs]@flow(name="group-resource-usage") def run_flow(context: ContextConfig, series: SeriesConfig, parameters: ParametersConfig) -> None: """ Main group resource usage flow. Calls the following subflows, if the group is specified: - :py:func:`run_flow_group_object_storage` - :py:func:`run_flow_group_wall_clock` """ if "object_storage" in parameters.groups: run_flow_group_object_storage(context, series, parameters.object_storage) if "wall_clock" in parameters.groups: run_flow_group_wall_clock(context, series, parameters.wall_clock)
[docs]@flow(name="group-resource-usage_group-object-storage") def run_flow_group_object_storage( context: ContextConfig, series: SeriesConfig, parameters: ParametersConfigObjectStorage ) -> None: """Group resource usage subflow for object storage size.""" group_key = make_key(series.name, "groups", "groups.RESOURCE_USAGE") all_sizes = [] for category in parameters.categories: for location in parameters.search_locations: file_keys = get_keys(location, make_key(series.name, "data", f"data.{category}")) for file_key in file_keys: key, seed = re.findall(parameters.pattern, file_key.split(series.name)[-1])[0] if location.startswith("s3://"): summary = boto3.resource("s3").ObjectSummary(location[5:], file_key) size = summary.size else: size = os.path.getsize(f"{location}{file_key}") all_sizes.append({"key": key, "seed": seed, "category": category, "size": size}) sizes_df = pd.DataFrame(all_sizes) sizes_df.sort_values(by=["key", "category", "seed"], ignore_index=True, inplace=True) save_dataframe( context.working_location, make_key(group_key, f"{series.name}.object_storage.csv"), sizes_df, index=False, )
[docs]@flow(name="group-resource-usage_group-wall-clock") def run_flow_group_wall_clock( context: ContextConfig, series: SeriesConfig, parameters: ParametersConfigWallClock ) -> None: """Group resource usage subflow for wall clock time.""" group_key = make_key(series.name, "groups", "groups.RESOURCE_USAGE") all_times = [] for location in parameters.search_locations: file_keys = get_keys(location, make_key(series.name, "logs")) for file_key in file_keys: contents = load_text(location, file_key) if any(exception in contents for exception in parameters.exceptions): continue matches = re.findall(parameters.pattern, contents) for key, seed, time in matches: all_times.append({"key": key, "seed": seed, "time": time}) times_df = pd.DataFrame(all_times) times_df.sort_values(by=["key", "seed"], ignore_index=True, inplace=True) save_dataframe( context.working_location, make_key(group_key, f"{series.name}.wall_clock.csv"), times_df, index=False, )