Coverage for src/io_collection/keys/get_keys.py: 100%

27 statements  

« prev     ^ index     » next       coverage.py v7.5.1, created at 2024-09-25 19:09 +0000

1from pathlib import Path 

2 

3import boto3 

4 

5 

def get_keys(location: str, prefix: str) -> list[str]:
    """
    List the object keys found under a prefix at the given location.

    A location that starts with the **s3://** protocol is looked up in the
    corresponding S3 bucket; any other location is treated as a local path.

    Parameters
    ----------
    location
        Object location (local path or S3 bucket).
    prefix
        Object key prefix.

    Returns
    -------
    :
        List of all object keys at location.
    """

    s3_protocol = "s3://"

    # Anything without the S3 protocol marker is handled as a local path.
    if not location.startswith(s3_protocol):
        return _get_keys_on_fs(location, prefix)

    # Drop the protocol marker so only the bucket portion is passed along.
    bucket_name = location[len(s3_protocol) :]
    return _get_keys_on_s3(bucket_name, prefix)

29 

30 

31def _get_keys_on_fs(path: str, prefix: str) -> list[str]: 

32 """ 

33 Get list of objects on local file system with given prefix. 

34 

35 Parameters 

36 ---------- 

37 path 

38 Local object path. 

39 prefix 

40 Object key prefix. 

41 

42 Returns 

43 ------- 

44 : 

45 List of all object keys on the local file system. 

46 """ 

47 

48 glob_pattern = f"{prefix}/**/*".replace("//", "/") 

49 all_files = Path(path).rglob(glob_pattern) 

50 regular_files = [str(file) for file in all_files if not file.is_dir()] 

51 return [file.replace(path, "").strip("/") for file in regular_files] 

52 

53 

def _get_keys_on_s3(bucket: str, prefix: str) -> list[str]:
    """
    Get list of objects in AWS S3 bucket with given prefix.

    Listing results are requested page by page; each truncated response is
    followed up using its continuation token until no further page remains.

    Parameters
    ----------
    bucket
        AWS S3 bucket name.
    prefix
        Object key prefix.

    Returns
    -------
    :
        List of all object keys in the AWS bucket.
    """

    s3_client = boto3.client("s3")

    bucket = bucket.replace("s3://", "")
    # A trailing slash is always appended, so only keys below the prefix
    # "directory" are requested.
    prefix = f"{prefix}/"
    response = s3_client.list_objects_v2(Bucket=bucket, Prefix=prefix)

    keys: list[str] = []

    # A response without "Contents" carries no keys, which ends the listing.
    while "Contents" in response:
        # Extend in place: rebuilding the list for every page would copy all
        # previously collected keys each time.
        keys.extend(content["Key"] for content in response["Contents"])

        if not response["IsTruncated"]:
            break

        response = s3_client.list_objects_v2(
            Bucket=bucket,
            Prefix=prefix,
            ContinuationToken=response["NextContinuationToken"],
        )

    return keys