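"""Download the latest COVID-19 vaccination microdata from OpenDataSUS.

Scrapes the dataset page for the newest "Dados Completos" CSV, downloads it
with curl (compressing on the fly with xz) and converts it in a single pass
into two gzipped CSVs: an uncensored one and a censored one (rows passed
through `covid19br.vacinacao.censor`).

Usage:
    python microdados_vacinacao.py [--connections N] [--preserve-raw] [--buffering BYTES]
"""
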
import argparse
import csv
import re
import shlex
import subprocess
import sys
from pathlib import Path
from urllib.request import urlopen

from lxml.html import document_fromstring
from rows.utils import CsvLazyDictWriter, open_compressed
from tqdm import tqdm

from covid19br.vacinacao import censor, convert_row_uncensored
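
# The download URL embeds a YYYY-MM-DD date, used to name the raw output file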
REGEXP_DATE = re.compile("([0-9]{4}-[0-9]{2}-[0-9]{2})")


def get_latest_url_and_date():
    """Scrape the CKAN dataset page to find the latest available CSV for Brazil"""
    repository_url = "https://opendatasus.saude.gov.br/dataset/covid-19-vacinacao/resource/ef3bd0b8-b605-474b-9ae5-c97390c197a8"
    response = urlopen(repository_url)
    html = response.read()
    tree = document_fromstring(html)
    # Case-insensitive match for the "Dados Completos" download link
    download_url = tree.xpath(
        "//a[contains(translate(text(), 'ABCDEFGHIJKLMNOPQRSTUVWXYZ', 'abcdefghijklmnopqrstuvwxyz'), 'dados completos')]/@href"
    )[0]
    date = REGEXP_DATE.findall(download_url)[0]
    return download_url, date
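

# Alternative downloader: fetches the file over multiple simultaneous
# connections via aria2c. Not called by main(), which uses curl + xz instead.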
def download_file_aria2c(url, filename, connections=4):
    # Build the command as an argument list to avoid shell-quoting issues
    command = [
        "aria2c",
        "--dir", str(filename.parent.absolute()),
        "-s", str(connections),
        "-x", str(connections),
        "-o", filename.name,
        url,
    ]
    subprocess.run(command)


def download_file_curl(url, filename):
    """Stream the download through `xz -0` so the raw CSV is compressed on the fly"""
    with open(filename, mode="wb") as fobj:
        # curl writes the body to its stdout; its progress meter (stderr) is
        # redirected to this script's stdout
        p1 = subprocess.Popen(
            shlex.split(f'curl "{url}"'),
            stdout=subprocess.PIPE,
            stderr=sys.stdout,
        )
        # xz reads from curl's stdout and writes the compressed stream to `filename`
        p2 = subprocess.Popen(
            shlex.split("xz -0 -"),
            stdin=p1.stdout,
            stdout=fobj,
        )
        # Close our copy of the pipe so curl receives SIGPIPE if xz exits early
        p1.stdout.close()
        stdout, stderr = p2.communicate()
        p1.wait()
    return stdout, stderr


def main():
    parser = argparse.ArgumentParser()
    # --connections only affects download_file_aria2c (currently unused by main)
    parser.add_argument("--connections", type=int, default=8)
    parser.add_argument("--preserve-raw", action="store_true")
    parser.add_argument("--buffering", type=int, default=8 * 1024 * 1024)
    args = parser.parse_args()

    # TODO: add an option to select which of the 3 possible CSVs the script
    # will generate
    # TODO: send logger output to a file instead of stdout/stderr
    # TODO: add an option to keep or discard the original (compressed) CSV

    url, date = get_latest_url_and_date()
    output_path = Path(__file__).parent / "data" / "output"
    filename_raw = output_path / f"microdados_vacinacao-raw-{date}.csv.xz"
    filename_censored = output_path / "microdados_vacinacao.csv.gz"
    filename_uncensored = output_path / "microdados_vacinacao-uncensored.csv.gz"
    if not output_path.exists():
        output_path.mkdir(parents=True)

    download_file_curl(url, filename_raw)
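
    # Single pass over the raw CSV: each row is converted, written to the
    # uncensored output, then censored in place and written to the censored one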
    with open_compressed(filename_raw) as fobj:
        fobj_censored = open_compressed(
            filename_censored, mode="w", buffering=args.buffering
        )
        writer_censored = CsvLazyDictWriter(fobj_censored)
        censored_writerow = writer_censored.writerow

        fobj_uncensored = open_compressed(
            filename_uncensored, mode="w", buffering=args.buffering
        )
        writer_uncensored = CsvLazyDictWriter(fobj_uncensored)
        uncensored_writerow = writer_uncensored.writerow

        reader = csv.DictReader(fobj, delimiter=";")
        for row in tqdm(reader, unit_scale=True, unit="row"):
            row = convert_row_uncensored(row)
            uncensored_writerow(row)
            censor(row)
            censored_writerow(row)
        writer_censored.close()
        writer_uncensored.close()

    if not args.preserve_raw:
        filename_raw.unlink()


if __name__ == "__main__":
    main()