-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy paths3grep.py
117 lines (88 loc) · 3.04 KB
/
s3grep.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
"""
The MIT License (MIT)
Copyright (c) 2016 Kar Lun (Daniel) Ng
S3Grep - regex grep on multiple (gzipped) text files on S3
"""
import argparse
import gzip
import io
import logging
import os
import re
import sys
import urllib.parse
import boto3
import boto_stream
# Module-level logger for this tool (not referenced elsewhere in this file;
# root-logger configuration happens in _setup_logging()).
logger = logging.getLogger('s3grep')
def _parse_url(url: str) -> (str, str):
'''
get the bucket and path
:param url: full s3 path
:return: bucket, path
'''
splitresult = urllib.parse.urlsplit(url, allow_fragments=False)
# remove the first / from the path
return splitresult.netloc, splitresult.path[1:]
def _parse_args(argv):
parser = argparse.ArgumentParser(
prog=argv[0], description='s3grep to grep multiple (gzipped) text'
' files on s3 specified in the url '
'argument')
parser.add_argument('-u', '--url',
dest='url',
type=str,
help='the S3 location of file(s), '
'eg. s3://bucket/some-prefix',
required=True)
parser.add_argument('-d', '--debug',
dest='debug',
help='turn on the debug mode with high verbosity',
action='store_true',
default=False)
parser.add_argument('-r', '--regex',
dest='regex',
type=str,
help='the regex pattern used for line matching',
required=True)
return parser.parse_args(argv[1:])
def _setup_logging(verbose: bool):
logging.basicConfig(
format=
'%(asctime)s-%(levelname)s-%(name)s:%(lineno)d: %(message)s')
l = logging.getLogger()
if verbose is True:
l.setLevel(logging.DEBUG)
else:
l.setLevel(logging.INFO)
def _grep_a_file(bucketstr: str, key: str, regex: str,
                 output: io.TextIOWrapper):
    '''
    Stream every S3 object under the given key prefix and write each line
    matching the regex to the output buffer, prefixed with the object key.
    :param bucketstr: S3 bucket name
    :param key: key prefix selecting the object(s) to grep
    :param regex: regex pattern applied to each decoded text line
    :param output: the output buffer matching lines are written to
    :return: None
    '''
    s3 = boto3.resource('s3')
    bucket = s3.Bucket(bucketstr)
    # Compile once instead of re-parsing the pattern for every line.
    pattern = re.compile(regex)
    for obj in bucket.objects.filter(Prefix=key):
        datadict = obj.get()
        instream = boto_stream.BotoStreamBody(datadict['Body'])
        # 1 MiB read buffer. BUG FIX: the original "1 * 2 ^ 20" used the
        # XOR operator and requested a 22-byte buffer, not 2**20.
        instream = io.BufferedReader(instream, buffer_size=2 ** 20)
        # BUG FIX: decide gzip-ness from the object actually being read;
        # "key" is only a prefix and may not name a file at all.
        _, file_extension = os.path.splitext(obj.key)
        if file_extension == '.gz':
            instream = gzip.GzipFile(fileobj=instream, mode='rb')
        for line in io.TextIOWrapper(instream):
            if pattern.search(line) is not None:
                output.write(obj.key + ":" + line)
def main():
    '''Entry point: parse options, configure logging, and run the grep.'''
    options = _parse_args(sys.argv)
    _setup_logging(options.debug)
    bucket, prefix = _parse_url(options.url)
    _grep_a_file(bucketstr=bucket, key=prefix, regex=options.regex,
                 output=sys.stdout)


if __name__ == '__main__':
    main()