-
Notifications
You must be signed in to change notification settings - Fork 25
/
google_drive_downloader.py
104 lines (88 loc) · 3.78 KB
/
google_drive_downloader.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
from __future__ import print_function
import requests
import zipfile
import warnings
from sys import stdout
from os import makedirs
from os.path import dirname
from os.path import exists
class GoogleDriveDownloader:
    """
    Minimal class to download shared files from Google Drive.

    Every method is a static method, so the class is never instantiated;
    it only groups the download helpers and their constants.
    """

    # Chunk size (in bytes) requested from the streamed HTTP response.
    CHUNK_SIZE = 32768
    # Endpoint queried for the download; the file id is sent as the `id` parameter.
    DOWNLOAD_URL = 'https://docs.google.com/uc?export=download'

    @staticmethod
    def download_file_from_google_drive(file_id, dest_path, overwrite=False, unzip=False, showsize=False):
        """
        Downloads a shared file from google drive into a given folder.
        Optionally unzips it.

        Nothing is downloaded (and nothing is unzipped) when `dest_path`
        already exists and `overwrite` is False.

        Parameters
        ----------
        file_id: str
            the file identifier.
            You can obtain it from the sharable link.
        dest_path: str
            the destination where to save the downloaded file.
            Must be a path (for example: './downloaded_file.txt').
            A bare file name (for example: 'downloaded_file.txt') is
            saved in the current working directory.
        overwrite: bool
            optional, if True forces re-download and overwrite.
        unzip: bool
            optional, if True unzips a file.
            If the file is not a zip file, ignores it.
        showsize: bool
            optional, if True print the current download size.

        Returns
        -------
        None
        """
        destination_directory = dirname(dest_path)
        # dirname() returns '' for a bare file name; makedirs('') would raise,
        # and the current directory needs no creation anyway.
        if destination_directory and not exists(destination_directory):
            makedirs(destination_directory)

        if exists(dest_path) and not overwrite:
            return

        # The context manager closes the session (and its pooled connections)
        # even if the download fails half-way.
        with requests.Session() as session:
            print('Downloading {} into {}... '.format(file_id, dest_path), end='')
            stdout.flush()

            response = session.get(GoogleDriveDownloader.DOWNLOAD_URL, params={'id': file_id}, stream=True)

            # When the first response carries a 'download_warning*' cookie, the
            # request is repeated with that cookie's value as `confirm`.
            # NOTE(review): presumably this bypasses Google's confirmation
            # page for large files - verify against current Drive behaviour.
            token = GoogleDriveDownloader._get_confirm_token(response)
            if token:
                params = {'id': file_id, 'confirm': token}
                response = session.get(GoogleDriveDownloader.DOWNLOAD_URL, params=params, stream=True)

            if showsize:
                print()  # Skip to the next line

            # One-element list so the helper can update the counter in place.
            current_download_size = [0]
            GoogleDriveDownloader._save_response_content(response, dest_path, showsize, current_download_size)
        print('Done.')

        if unzip:
            try:
                print('Unzipping...', end='')
                stdout.flush()
                with zipfile.ZipFile(dest_path, 'r') as z:
                    # Fall back to the current directory for bare file names.
                    z.extractall(destination_directory or '.')
                print('Done.')
            except zipfile.BadZipfile:
                warnings.warn('Ignoring `unzip` since "{}" does not look like a valid zip file'.format(file_id))

    @staticmethod
    def _get_confirm_token(response):
        """
        Returns the value of the first response cookie whose name starts
        with 'download_warning', or None when there is no such cookie.
        """
        for key, value in response.cookies.items():
            if key.startswith('download_warning'):
                return value
        return None

    @staticmethod
    def _save_response_content(response, destination, showsize, current_size):
        """
        Streams the body of `response` into the file `destination`.

        Parameters
        ----------
        response:
            object exposing `iter_content(chunk_size)`, yielding byte chunks.
        destination: str
            path of the file to write (opened in binary mode, truncated).
        showsize: bool
            if True prints the running download size on a single line.
        current_size: list
            one-element list holding the number of bytes written so far;
            it is updated in place with the real length of each chunk.

        Returns
        -------
        None
        """
        with open(destination, 'wb') as f:
            for chunk in response.iter_content(GoogleDriveDownloader.CHUNK_SIZE):
                if not chunk:  # filter out keep-alive new chunks
                    continue
                f.write(chunk)
                # Count the bytes actually received: a chunk may be shorter
                # than CHUNK_SIZE (the last one almost always is).
                current_size[0] += len(chunk)
                if showsize:
                    # '\r' rewinds the cursor so the size is updated in place.
                    print('\r' + GoogleDriveDownloader.sizeof_fmt(current_size[0]), end=' ')
                    stdout.flush()

    # From https://stackoverflow.com/questions/1094841/reusable-library-to-get-human-readable-version-of-file-size
    @staticmethod
    def sizeof_fmt(num, suffix='B'):
        """
        Formats `num` as a human readable size using binary (1024) prefixes,
        with one decimal digit (for example: 1536 -> '1.5 KiB').

        Parameters
        ----------
        num: int or float
            the quantity to format.
        suffix: str
            optional, unit appended after the prefix.

        Returns
        -------
        str
        """
        for unit in ['', 'Ki', 'Mi', 'Gi', 'Ti', 'Pi', 'Ei', 'Zi']:
            if abs(num) < 1024.0:
                return '{:.1f} {}{}'.format(num, unit, suffix)
            num /= 1024.0
        # Anything that survived all the divisions is expressed in 'Yi'.
        return '{:.1f} {}{}'.format(num, 'Yi', suffix)