-
-
Notifications
You must be signed in to change notification settings - Fork 2
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #39 from Sayvai-io/dinesh
Created a tool for google drive in utils
- Loading branch information
Showing
2 changed files
with
197 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,138 @@ | ||
from service import Create_Service | ||
from googleapiclient.http import MediaFileUpload, MediaIoBaseDownload | ||
import io | ||
from pprint import pprint | ||
import pandas as pd | ||
import os | ||
|
||
|
||
class Drive: | ||
"""Tool that allows you to work with Google Drive.""" | ||
|
||
name = "drive" | ||
description = ( | ||
"Useful for creating, update, delete and other changes in Google Drive." | ||
) | ||
|
||
def __init__(self, SERVICE_ACCOUNT_FILE: str, API_NAME: str = 'drive', API_VERSION: str = 'v3', | ||
SCOPES: list[str] = ['https://www.googleapis.com/auth/drive']): | ||
self.service = Create_Service(SERVICE_ACCOUNT_FILE, API_NAME, API_VERSION, SCOPES) | ||
|
||
"""Function to create a new folder in drive.""" | ||
|
||
def create_folder(self, folder_name: str, parent_id: str | None = None): | ||
if parent_id is not None: | ||
"""Create folder in mentioned directory.""" | ||
file_metadata = { | ||
'name': folder_name, | ||
'mimeType': 'application/vnd.google-apps.folder', | ||
'parents': [parent_id] | ||
} | ||
else: | ||
"""Creates folder in the outer directory.""" | ||
file_metadata = { | ||
'name': folder_name, | ||
'mimeType': 'application/vnd.google-apps.folder' | ||
} | ||
response = self.service.files().create(body=file_metadata).execute() | ||
print(f"Folder '{response['name']}' with id '{response['id']}' has been successfully created.") | ||
return "Folder creation is successful." | ||
|
||
"""Function to upload a file in drive.""" | ||
|
||
def upload_file(self, file_path: str, file_name, mime_type: str, parent_id: str | None = None): | ||
if parent_id is not None: | ||
"""Upload file in mentioned directory.""" | ||
file_metadata = { | ||
'name': file_name, | ||
'parents': [parent_id], | ||
'mimeType': mime_type | ||
} | ||
else: | ||
"""Upload file in the outer directory.""" | ||
file_metadata = { | ||
'name': file_name, | ||
'mimeType': mime_type | ||
} | ||
media = MediaFileUpload(file_path, mimetype=mime_type) | ||
response = self.service.files().create(body=file_metadata, media_body=media, fields='id').execute() | ||
print(f"File '{response['name']}' with id '{response['id']}' has been successfully uploaded.") | ||
return "File upload is successful." | ||
|
||
"""Function to download a file from drive.""" | ||
|
||
def download_file(self, file_id: str, file_name: str): | ||
request = self.service.files().get_media(fileId=file_id) | ||
fh = io.FileIO(file_name, 'wb') | ||
downloader = MediaIoBaseDownload(fd=fh, request=request) | ||
done = False | ||
while done is False: | ||
status, done = downloader.next_chunk() | ||
print("Download %d%%." % int(status.progress() * 100)) | ||
fh.seek(0) | ||
print(f"File '{file_name}' with id '{file_id}' has been successfully downloaded.") | ||
return "File download is successful." | ||
|
||
"""Function to copy a file in drive.""" | ||
|
||
def copy_file(self, source_file_id: str, parent_id: str): | ||
file_metadata = { | ||
'name': "sayvai_logo.jpg", | ||
'parents': [parent_id], | ||
'starred': True, | ||
'description': "This is a logo of SayvAI" | ||
} | ||
response = self.service.files().copy(fileId=source_file_id, body=file_metadata).execute() | ||
print(f"File '{response['name']}' with id '{response['id']}' has been successfully copied.") | ||
return "File copy is successful." | ||
|
||
"""Function to get drive account information.""" | ||
|
||
def get_account_info(self): | ||
response = self.service.about().get(fields="*").execute() | ||
pprint(response) | ||
for k, v in response.get('storageQuota').items(): | ||
print('{0}: {1:.2f}MB'.format(k, int(v) / 1024 ** 2)) | ||
print('{0}: {1:.2f}GB'.format(k, int(v) / 1024 ** 3)) | ||
return "Account information is successful." | ||
|
||
"""Function to get list of files in drive'}""" | ||
|
||
def get_list_of_files(self, parent_id: str): | ||
query = f"parents = '{parent_id}'" | ||
results = self.service.files().list(pageSize=1000, | ||
fields="nextPageToken, files(id, name, mimeType, size, modifiedTime)", | ||
q=query).execute() | ||
print(results) | ||
items = results.get('files', []) | ||
data = [] | ||
for row in items: | ||
if row["mimeType"] != "application/vnd.google-apps.folder": | ||
row_data = [] | ||
try: | ||
row_data.append(round(int(row["size"]) / 1000000, 2)) | ||
except KeyError: | ||
row_data.append(0.00) | ||
row_data.append(row["id"]) | ||
row_data.append(row["name"]) | ||
row_data.append(row["modifiedTime"]) | ||
row_data.append(row["mimeType"]) | ||
data.append(row_data) | ||
cleared_df = pd.DataFrame(data, columns=['size_in_MB', 'id', 'name', 'last_modification', 'type_of_file']) | ||
print(cleared_df) | ||
return "List of files is successful." | ||
|
||
"""Function to delete a file or folder in drive.""" | ||
|
||
def delete_file(self, file_id: str): | ||
self.service.files().delete(fileId=file_id).execute() | ||
print(f"File/Folder with id '{file_id}' has been successfully deleted.") | ||
return "File/Folder deletion is successful." | ||
|
||
"""Function to update a file in drive.""" | ||
|
||
def update_file(self, file_id: str, file_path: str, mime_type: str): | ||
media = MediaFileUpload(file_path, mimetype=mime_type) | ||
response = self.service.files().update(fileId=file_id, media_body=media).execute() | ||
print(f"File '{response['name']}' with id '{response['id']}' has been successfully updated.") | ||
return "File update is successful." |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,59 @@ | ||
# from google.oauth2 import service_account | ||
# from googleapiclient.discovery import build | ||
# | ||
# | ||
# def Create_Service(service_account_file, api_name, api_version, scopes): | ||
# credentials = service_account.Credentials.from_service_account_file( | ||
# service_account_file, scopes=scopes) | ||
# | ||
# service = build(api_name, api_version, credentials=credentials) | ||
# return service | ||
|
||
import pickle | ||
import os | ||
from google_auth_oauthlib.flow import Flow, InstalledAppFlow | ||
from googleapiclient.discovery import build | ||
from googleapiclient.http import MediaFileUpload, MediaIoBaseDownload | ||
from google.auth.transport.requests import Request | ||
import datetime | ||
|
||
|
||
def Create_Service(client_secret_file, api_name, api_version, *scopes): | ||
print(client_secret_file, api_name, api_version, scopes, sep='-') | ||
CLIENT_SECRET_FILE = client_secret_file | ||
API_SERVICE_NAME = api_name | ||
API_VERSION = api_version | ||
SCOPES = [scope for scope in scopes[0]] | ||
print(SCOPES) | ||
|
||
cred = None | ||
|
||
pickle_file = f'token_{API_SERVICE_NAME}_{API_VERSION}.pickle' | ||
# print(pickle_file) | ||
|
||
if os.path.exists(pickle_file): | ||
with open(pickle_file, 'rb') as token: | ||
cred = pickle.load(token) | ||
|
||
if not cred or not cred.valid: | ||
if cred and cred.expired and cred.refresh_token: | ||
cred.refresh(Request()) | ||
else: | ||
flow = InstalledAppFlow.from_client_secrets_file(CLIENT_SECRET_FILE, SCOPES) | ||
cred = flow.run_local_server() | ||
|
||
with open(pickle_file, 'wb') as token: | ||
pickle.dump(cred, token) | ||
|
||
try: | ||
service = build(API_SERVICE_NAME, API_VERSION, credentials=cred) | ||
print(API_SERVICE_NAME, 'service created successfully') | ||
return service | ||
except Exception as e: | ||
print('Unable to connect.') | ||
print(e) | ||
return None | ||
|
||
def convert_to_RFC_datetime(year=1900, month=1, day=1, hour=0, minute=0): | ||
dt = datetime.datetime(year, month, day, hour, minute, 0).isoformat() + 'Z' | ||
return dt |