Coverage for src/cell_abm_pipeline/flows/summarize_manifest.py: 0%

53 statements  

« prev     ^ index     » next       coverage.py v7.1.0, created at 2024-06-05 19:14 +0000

1""" 

2Workflow for summarizing files in the manifest. 

3 

4.. code-block:: bash 

5 

6 (name) 

7 └── YYYY-MM-DD 

8 └── (name).SUMMARY.txt 

9 

10For each search location, the flow will attempt to find all files matching the 

11specified series name. After applying include and exclude filters, the manifest 

12is updated and a summary of files, grouped by extension, is printed and saved to 

13a dated directory. 

14""" 

15 

16from dataclasses import dataclass, field 

17from fnmatch import fnmatch 

18 

19from container_collection.manifest import summarize_manifest_files, update_manifest_contents 

20from io_collection.keys import get_keys, make_key 

21from io_collection.load import load_dataframe 

22from io_collection.save import save_dataframe, save_text 

23from prefect import flow 

24 

25 

@dataclass
class ParametersConfig:
    """Parameters controlling how the summarize manifest flow runs."""

    update_manifest: bool = True
    """Whether to rebuild and save the manifest (True) or load the existing one (False)."""

    search_locations: list[str] = field(default_factory=list)
    """Locations (local path or S3 bucket) searched for files; empty by default."""

    include_filters: list[str] = field(default_factory=lambda: ["*"])
    """Unix filename patterns a file must match to be included; defaults to all files."""

    exclude_filters: list[str] = field(default_factory=list)
    """Unix filename patterns that remove matching files; empty by default."""

41 

42 

@dataclass
class ContextConfig:
    """Locations used by the summarize manifest flow."""

    working_location: str
    """Local path or S3 bucket holding the flow's input and output files."""

    manifest_location: str
    """Local path or S3 bucket holding the manifest file."""

52 

53 

@dataclass
class SeriesConfig:
    """Description of the simulation series used by the summarize manifest flow."""

    name: str
    """Simulation series name."""

    manifest_key: str
    """Key of the manifest file."""

    seeds: list[int]
    """Random seeds belonging to the series."""

    conditions: list[dict]
    """Condition dictionaries for the series (each must include a unique condition "key")."""

69 

70 

@flow(name="summarize-manifest")
def run_flow(context: ContextConfig, series: SeriesConfig, parameters: ParametersConfig) -> None:
    """
    Summarize the manifest files of a series, optionally rebuilding the manifest.

    When ``parameters.update_manifest`` is set, the keys found at each search
    location for the series name are matched against the include and exclude
    patterns. The keys that remain are used to rebuild the manifest, which is
    then saved at the manifest location. Otherwise, the existing manifest is
    loaded. In both cases, the summary is saved as text at the working location
    and printed.
    """

    if not parameters.update_manifest:
        manifest = load_dataframe(context.manifest_location, series.manifest_key)
    else:
        keys_by_location = {}

        for search_location in parameters.search_locations:
            candidates = get_keys(search_location, series.name)

            # Keys matching at least one include pattern.
            included = {
                key
                for pattern in parameters.include_filters
                for key in candidates
                if fnmatch(key, pattern)
            }

            # Keys matching at least one exclude pattern.
            excluded = {
                key
                for pattern in parameters.exclude_filters
                for key in candidates
                if fnmatch(key, pattern)
            }

            keys_by_location[search_location] = list(included - excluded)

        manifest = update_manifest_contents(keys_by_location)
        save_dataframe(context.manifest_location, series.manifest_key, manifest, index=False)

    report = summarize_manifest_files(manifest, series.name, series.conditions, series.seeds)
    report_key = make_key(series.name, "{{timestamp}}", f"{series.name}.SUMMARY.txt")
    save_text(context.working_location, report_key, report)

    print("\n" + report)