Coverage for src/io_collection/load/load_buffer.py: 100%

22 statements  

« prev     ^ index     » next       coverage.py v7.5.1, created at 2024-09-25 19:09 +0000

1import io 

2from pathlib import Path 

3 

4import boto3 

5 

# Size threshold in bytes (2**31 - 1, the largest signed 32-bit integer).
# S3 objects larger than this are read in chunks of at most this many bytes.
# NOTE(review): presumably chosen to keep a single read under the signed
# 32-bit limit — confirm the original motivation.
MAX_CONTENT_LENGTH = 2**31 - 1

7 

8 

def load_buffer(location: str, key: str) -> io.BytesIO:
    """
    Load key into in-memory bytes buffer from specified location.

    The object is read from an S3 bucket when the location starts with the
    **s3://** protocol; any other location is treated as a local path.

    Parameters
    ----------
    location
        Object location (local path or S3 bucket).
    key
        Object key.

    Returns
    -------
    :
        Loaded object buffer.
    """

    s3_prefix = "s3://"

    if location.startswith(s3_prefix):
        # Hand only the bucket name (protocol stripped) to the S3 loader.
        return _load_buffer_from_s3(location[len(s3_prefix) :], key)

    return _load_buffer_from_fs(location, key)

32 

33 

def _load_buffer_from_fs(path: str, key: str) -> io.BytesIO:
    """
    Load key into in-memory bytes buffer from local file system.

    Parameters
    ----------
    path
        Local object path.
    key
        Object key.

    Returns
    -------
    :
        Loaded object buffer.
    """

    # The file at path/key is read fully into memory; the file handle is
    # closed before the buffer is returned.
    return io.BytesIO(Path(path, key).read_bytes())

54 

55 

def _load_buffer_from_s3(bucket: str, key: str) -> io.BytesIO:
    """
    Load key into in-memory bytes buffer from AWS S3 bucket.

    Objects larger than `MAX_CONTENT_LENGTH` are loaded in chunks.

    Parameters
    ----------
    bucket
        AWS S3 bucket name.
    key
        Object key.

    Returns
    -------
    :
        Loaded object buffer.
    """

    s3_client = boto3.client("s3")
    response = s3_client.get_object(Bucket=bucket, Key=key)
    stream = response["Body"]

    # Small enough to fetch with a single read.
    if response["ContentLength"] <= MAX_CONTENT_LENGTH:
        return io.BytesIO(stream.read())

    # Oversized body: write each chunk straight into the result buffer so
    # that no second full-size copy of the object is held while assembling.
    buffer = io.BytesIO()
    for chunk in stream.iter_chunks(chunk_size=MAX_CONTENT_LENGTH):
        buffer.write(chunk)

    # Rewind so callers receive the buffer positioned at the start.
    buffer.seek(0)
    return buffer