Coverage for src/cell_abm_pipeline/flows/parse_arcade_simulations.py: 0%

43 statements  

« prev     ^ index     » next       coverage.py v7.1.0, created at 2024-06-05 19:14 +0000

1""" 

2Workflow for parsing ARCADE simulations into tidy data. 

3 

4Working location structure: 

5 

6.. code-block:: bash 

7 

8 (name) 

9 ├── data 

10 │ ├── data.CELLS 

11 │ │ └── (name)_(key)_(seed).CELLS.tar.xz 

12 │ └── data.LOCATIONS 

13 │ └── (name)_(key)_(seed).LOCATIONS.tar.xz 

14 └── results 

15 └── (name)_(key)_(seed).csv 

16 

17Data from **data.CELLS** and **data.LOCATIONS** are parsed into **results**. If 

18the results file already exists, parsing for that simulation is skipped and the 

19existing results file is left unchanged. 

20""" 

21 

22from dataclasses import dataclass, field 

23 

24from arcade_collection.output import merge_parsed_results, parse_cells_file, parse_locations_file 

25from container_collection.manifest import filter_manifest_files 

26from io_collection.keys import check_key, make_key 

27from io_collection.load import load_dataframe, load_tar 

28from io_collection.save import save_dataframe 

29from prefect import flow 

30 

31 

@dataclass
class ParametersConfig:
    """
    Parameter configuration for parse arcade simulations flow.

    Every field has a default, so the configuration can be constructed with no
    arguments. Each instance receives its own list objects (defaults are built
    by factories, never shared between instances).
    """

    regions: list[str] = field(default_factory=list)
    """List of subcellular regions to parse (empty by default)."""

    include_filters: list[str] = field(default_factory=lambda: ["*"])
    """List of Unix filename patterns for files to include in parsing."""

    exclude_filters: list[str] = field(default_factory=list)
    """List of Unix filename patterns for files to exclude from parsing."""

44 

45 

@dataclass
class ContextConfig:
    """
    Context configuration for parse arcade simulations flow.

    Both fields are required and carry no defaults.
    """

    working_location: str
    """Where input and output files live (local path or S3 bucket)."""

    manifest_location: str
    """Where the manifest file lives (local path or S3 bucket)."""

55 

56 

@dataclass
class SeriesConfig:
    """
    Series configuration for parse arcade simulations flow.

    All three fields are required and carry no defaults.
    """

    name: str
    """Simulation series name."""

    manifest_key: str
    """Key of the manifest file."""

    extensions: list[str]
    """File extensions that together make up a complete run."""

69 

70 

@flow(name="parse-arcade-simulations")
def run_flow(context: ContextConfig, series: SeriesConfig, parameters: ParametersConfig) -> None:
    """
    Main parse arcade simulations flow.

    Loads the manifest from the manifest location, filters its files using the
    series extensions and the include/exclude filters, and then, for each
    remaining key, parses the CELLS and LOCATIONS archives and saves the merged
    result as ``(name)/results/(key).csv`` in the working location. Keys whose
    results file is already present are skipped.

    Parameters
    ----------
    context
        Locations of the working directory/bucket and of the manifest.
    series
        Series name, manifest key, and the extensions of a complete run.
    parameters
        Regions to parse and the include/exclude filename filters.
    """

    manifest = load_dataframe(context.manifest_location, series.manifest_key)
    filtered_files = filter_manifest_files(
        manifest, series.extensions, parameters.include_filters, parameters.exclude_filters
    )

    for key, files in filtered_files.items():
        # Results live directly under (name)/results. An extra "{{timestamp}}"
        # path segment previously sat between the series name and "results",
        # which both wrote output outside the documented layout and made the
        # existence check below miss results stored at the documented path.
        results_key = make_key(series.name, "results", f"{key}.csv")

        # Do not re-parse simulations that already have a results file.
        if check_key(context.working_location, results_key):
            continue

        cells_tar = load_tar(**files["CELLS.tar.xz"])
        cells = parse_cells_file(cells_tar, parameters.regions)

        locs_tar = load_tar(**files["LOCATIONS.tar.xz"])
        locs = parse_locations_file(locs_tar, parameters.regions)

        results = merge_parsed_results(cells, locs)
        save_dataframe(context.working_location, results_key, results, index=False)