Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor postbox #69

Merged
merged 4 commits into from
Dec 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
204 changes: 0 additions & 204 deletions dkb_robo/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -680,147 +680,6 @@ def _do_sso_redirect(self):
self.dkb_br = legacywrappper._new_instance(clientcookies)
self.logger.debug('api.Wrapper._do_sso_redirect() ended.\n')

def _download_document(self, path: str, document: Dict[str, str]) -> str:
""" filter standing orders """
self.logger.debug('api.Wrapper._download_document()\n')

rcode = 'unknown'
# create directory if not existing
directories = [path, f'{path}/{document["document_type"]}']
for directory in directories:
if not os.path.exists(directory):
self.logger.debug('api.Wrapper._download_document(): Create directory %s\n', directory)
os.makedirs(directory)

if 'filename' in document and 'link' in document:

# modify accept attribute in
dlc = self.client
dlc.headers['Accept'] = document['contenttype']
response = dlc.get(document['link'])
rcode = response.status_code
if document['contenttype'] == 'application/pdf' and not document['filename'].endswith('pdf'):
self.logger.info('api.Wrapper._download_document(): renaming %s', document['filename'])
document['filename'] = f'{document["filename"]}.pdf'

if response.status_code == 200:
self.logger.info('Saving %s/%s...', directories[1], document['filename'])
with open(f'{directories[1]}/{get_valid_filename(document["filename"])}', 'wb') as file:
file.write(response.content)

if not document['read']:
# set document status to "read"
self.logger.debug('api.Wrapper._download_document() set docment to "read"\n')
data_dic = {"data": {"attributes": {"read": True}, "type": "message"}}
dlc.headers['Accept'] = JSON_CONTENT_TYPE
dlc.headers['Content-type'] = JSON_CONTENT_TYPE
_response = self.client.patch(document['link'].replace('/documents/', '/messages/'), json=data_dic)

time.sleep(2)
else:
self.logger.error('api.Wrapper._download_document(): RC is not 200 but %s', response.status_code)

self.logger.debug('api.Wrapper._download_document() ended with: %s.\n', rcode)
return rcode

def _docdate_lookup(self, document: Dict[str, str]) -> str:
""" lookup document date """
self.logger.debug('api.Wrapper._docdate_lookup()\n')

doc_date = 'unknown'
if 'statementDate' in document['attributes']['metadata']:
doc_date = document['attributes']['metadata']['statementDate']
elif 'creationDate' in document['attributes']['metadata']:
doc_date = document['attributes']['metadata']['creationDate']

self.logger.debug('api.Wrapper._docdate_lookup() ended\n')
return doc_date

def _docfilename_lookup(self, document: Dict[str, str]) -> str:
""" lookup document filename """
self.logger.debug('api.Wrapper._docdfilename_lookup()\n')
doc_filename = document['attributes']['fileName']
# Depot related files don't have meaningful filenames but only contain the document id. Hence, we use subject
# instead and rely on the filename sanitization.
if 'dwpDocumentId' in document['attributes']['metadata'] and 'subject' in document['attributes']['metadata']:
doc_filename = document['attributes']['metadata']['subject']
self.logger.debug('api.Wrapper._docdate_lookup() ended with %s.\n', doc_filename)
return doc_filename

def _merge_postbox(self, msg_dic: Dict[str, str], pb_dic: Dict[str, str]) -> Dict[str, str]:
""" reformat postbox dictionary from DKB """
self.logger.debug('api.Wrapper._merge_postbox()\n')

message_dic = {}
if 'data' in pb_dic:
for document in pb_dic['data']:
message_dic[document['id']] = {
'filename': self._docfilename_lookup(document),
'contenttype': document['attributes']['contentType'],
'date': self._docdate_lookup(document),
'name': self._objectname_lookup(document)
}

if 'data' in msg_dic:
for message in msg_dic['data']:
if message['id'] in message_dic:
message_dic[message['id']]['document_type'] = self._get_document_type(message['attributes']['documentType'])
if 'read' in message['attributes']:
message_dic[message['id']]['read'] = message['attributes']['read']
message_dic[message['id']]['archived'] = message['attributes']['archived']
message_dic[message['id']]['link'] = self.base_url + self.api_prefix + '/documentstorage/documents/' + message['id']

self.logger.debug('api.Wrapper._merge_postbox() ended\n')
return message_dic

def _process_document(self, path: str, prepend_date: bool, document: Dict[str, str], documentname_list: Dict[str, str]) -> Tuple[List[str], str, str]:
""" check for duplicaton and download """
self.logger.debug('api.Wrapper._process_document()\n')

rcode = 'unknown'
if path:
if prepend_date or document['filename'] in documentname_list:
if document['filename'] in documentname_list:
self.logger.debug('api.Wrapper._filter_postbox(): duplicate document name. Renaming %s', document['filename'])
document['filename'] = f'{document["date"]}_{document["filename"]}'
if document['filename'] in documentname_list:
raise DKBRoboError(f"Duplicate document name and date: {document['filename']}")
rcode = self._download_document(path, document)
documentname_list.append(document['filename'])

self.logger.debug('api.Wrapper._process_document() ended\n')
return documentname_list, f'{path}/{document["document_type"]}/{get_valid_filename(document["filename"])}', rcode

def _filter_postbox(self, msg_dic: Dict[str, str], pb_dic: Dict[str, str], path: bool = None, download_all: bool = False, _archive: bool = False, prepend_date: bool = None) -> Dict[str, str]:
""" filter postbox """
self.logger.debug('api.Wrapper._filter_postbox()\n')

# merge message dictionaries
message_dic = self._merge_postbox(msg_dic, pb_dic)

# list to store filenames to check for duplicates
documentname_list = []

documents_dic = {}
for document in message_dic.values():
if 'read' in document:
if download_all or not document['read']:

# check filenames and download
documentname_list, document_name, rcode = self._process_document(path, prepend_date, document, documentname_list)

# store entry in dictionary
document_type = document.pop('document_type')
if document_type not in documents_dic:
documents_dic[document_type] = {}
documents_dic[document_type]['documents'] = {}
documents_dic[document_type]['documents'][document['name']] = {'link': document['link'], 'fname': document_name, 'date': document['date'], 'rcode': rcode}
else:
self.logger.error('api.Wrapper._filter_postbox(): document_dic incomplete: %s', document)

self.logger.debug('api.Wrapper._filter_postbox() ended.\n')
return documents_dic

def _filter_standing_orders(self, full_list: Dict[str, str]) -> List[Dict[str, str]]:
""" filter standing orders """
self.logger.debug('api.Wrapper._filter_standing_orders()\n')
Expand Down Expand Up @@ -1021,27 +880,6 @@ def _get_card_details(self, cid: str, cards_dic: Dict[str, str]) -> Dict[str, st
self.logger.debug('api.Wrapper._get_card_details() ended\n')
return output_dic

def _get_document_name(self, doc_name: str) -> str:
self.logger.debug('api.Wrapper._get_document_name()\n')

return ' '.join(doc_name.split())

def _get_document_type(self, doc_type: str) -> str:
self.logger.debug('api.Wrapper._get_document_type()\n')
mapping_dic = {
'bankAccountStatement': 'Kontoauszüge',
'creditCardStatement': 'Kreditkartenabrechnungen',
'dwpRevenueStatement': 'Ertragsabrechnungen',
'dwpOrderStatement': 'Depotabrechnungen',
'dwpDepotStatement': 'Depotauszüge',
'exAnteCostInformation': 'Kosteninformationen'
}

result = mapping_dic.get(doc_type, doc_type)

self.logger.debug('api.Wrapper._get_document_type() ended\n')
return result

def _get_loans(self) -> Dict[str, str]:
""" get loands via API """
self.logger.debug('api.Wrapper._get_loans()\n')
Expand Down Expand Up @@ -1194,26 +1032,6 @@ def _new_session(self):
self.logger.debug('api.Wrapper._new_session()\n ended')
return client

def _objectname_lookup(self, document: Dict[str, str]) -> str:
""" lookup object name """
self.logger.debug('api.Wrapper._objectname_lookup()\n')

object_name = None

if 'cardId' in document['attributes']['metadata']:
for _acc_id, acc_data in self.account_dic.items():
if acc_data['id'] == document['attributes']['metadata']['cardId']:
object_name = f"{self._get_document_name(document['attributes']['metadata']['subject'])} {acc_data['account']}"
break
if not object_name:
_sinin, cardnr, _sinin = document['attributes']['fileName'].split('_', 2)
object_name = f"{self._get_document_name(document['attributes']['metadata']['subject'])} {cardnr}"
else:
object_name = self._get_document_name(document['attributes']['metadata']['subject'])

self.logger.debug('api.Wrapper._objectname_lookup() ended with: %s\n', object_name)
return object_name

def _print_app_2fa_confirmation(self, devicename: str):
""" 2fa confirmation message """
self.logger.debug('api.Wrapper._print_app_2fa_confirmation()\n')
Expand Down Expand Up @@ -1473,25 +1291,3 @@ def get_exemption_order(self) -> Dict[str, str]:
def logout(self):
""" logout function """
self.logger.debug('api.Wrapper.logout()\n')

def scan_postbox(self, path: str, download_all: bool, archive: bool, prepend_date: bool) -> Dict[str, str]:
""" scans the DKB postbox and creates a dictionary """
self.logger.debug('api.Wrapper.scan_postbox() path: %s, download_all: %s, archive: %s, prepend_date: %s\n', path, download_all, archive, prepend_date)

documents_dic = {}

msg_dic = pb_dic = {}

response = self.client.get(self.base_url + self.api_prefix + '/documentstorage/messages')
if response.status_code == 200:
msg_dic = response.json()

response = self.client.get(self.base_url + self.api_prefix + '/documentstorage/documents?page%5Blimit%5D=1000')
if response.status_code == 200:
pb_dic = response.json()

if msg_dic and pb_dic:
documents_dic = self._filter_postbox(msg_dic, pb_dic, path, download_all, archive, prepend_date)

self.logger.debug('api.Wrapper.scan_postbox() ended.\n')
return documents_dic
27 changes: 27 additions & 0 deletions dkb_robo/cli.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
# pylint: disable=c3001, e1101, r0913, w0108, w0622
""" dkb_robo cli """
from datetime import date
from pathlib import Path
import pathlib
from pprint import pprint
import sys
import csv
Expand Down Expand Up @@ -221,6 +223,31 @@ def scan_postbox(ctx, path, download_all, archive, prepend_date):
click.echo(_err.args[0], err=True)


@main.command()
@click.pass_context
@click.option(
"--path",
"-p",
type=click.Path(writable=True, path_type=pathlib.Path),
help="Path to save the documents to",
envvar="DKB_DOC_PATH",
)
@click.option("--all", "-A", is_flag=True, show_default=True, default=False, help="Download all documents", envvar="DKB_DOWNLOAD_ALL")
@click.option("--prepend-date", is_flag=True, show_default=True, default=False, help="Prepend date to filename", envvar="DKB_PREPEND_DATE")
@click.option("--mark-read", is_flag=True, show_default=True, default=True, help="Mark downloaded files read", envvar="DKB_MARK_READ")
@click.option("--use-account-folders", is_flag=True, show_default=True, default=False, help="Store files in separate folders per account/depot", envvar="DKB_ACCOUNT_FOLDERS")
@click.option("--list-only", is_flag=True, show_default=True, default=False, help="Only list documents, do not download", envvar="DKB_LIST_ONLY")
def download(ctx, path: Path, all: bool, prepend_date: bool, mark_read: bool, use_account_folders: bool, list_only: bool):
""" download document """
if path is None:
list_only = True
try:
with _login(ctx) as dkb:
ctx.obj["FORMAT"](dkb.download(path=path, download_all=all, prepend_date=prepend_date, mark_read=mark_read, use_account_folders=use_account_folders, list_only=list_only))
except dkb_robo.DKBRoboError as _err:
click.echo(_err.args[0], err=True)


def _load_format(output_format):
""" select output format based on cli option """
if output_format == "pprint":
Expand Down
36 changes: 35 additions & 1 deletion dkb_robo/dkb_robo.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
# pylint: disable=c0415, r0913
""" dkb internet banking automation library """
# -*- coding: utf-8 -*-
from pathlib import Path
import time
from dkb_robo.postbox import PostBox
from dkb_robo.utilities import logger_setup, validate_dates, get_dateformat
from dkb_robo.api import Wrapper

Expand Down Expand Up @@ -93,4 +96,35 @@ def get_transactions(self, transaction_url, atype, date_from, date_to, transacti
def scan_postbox(self, path=None, download_all=False, archive=False, prepend_date=False):
""" scan posbox and return document dictionary """
self.logger.debug('DKBRobo.scan_postbox()\n')
return self.wrapper.scan_postbox(path, download_all, archive, prepend_date)
return self.download(Path(path) if path is not None else None, download_all, prepend_date)

def download(self, path: Path, download_all: bool, prepend_date: bool = False, mark_read: bool = True, use_account_folders: bool = False, list_only: bool = False):
""" download postbox documents """
if path is None:
list_only = True
postbox = PostBox(self.wrapper.client)
documents = postbox.fetch_items()

if not download_all:
# only unread documents
documents = {id: item for id, item in documents.items()
if item.message and item.message.read is False}

accounts_by_id = {acc['id']: acc['account'] for acc in self.wrapper.account_dic.values()}
for doc in documents.values():
target = path / doc.category()

if use_account_folders:
target = target / doc.account(card_lookup=accounts_by_id)

filename = f"{doc.date()}_{doc.filename()}" if prepend_date else doc.filename()

if not list_only:
self.logger.info("Downloading %s to %s...", doc.subject(), target)
if doc.download(self.wrapper.client, target/filename):
if mark_read:
doc.mark_read(self.wrapper.client, True)
time.sleep(.5)
else:
self.logger.info("File already exists. Skipping %s.", filename)
return documents
Loading
Loading