-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathsyncer.py
141 lines (126 loc) · 5.48 KB
/
syncer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
import os
import psycopg2
import pandas as pd
import hashlib
# Connection settings for psycopg2.connect(), read from the environment.
# Each entry is (connect keyword, environment variable, fallback value); the
# password has no fallback and is None when the variable is unset.
POSTGRES_CONF = {
    keyword: os.getenv(env_var, fallback)
    for keyword, env_var, fallback in (
        ('dbname', "POSTGRES_DB", "postgres"),
        ('user', "POSTGRES_USER", "postgres"),
        ('password', "POSTGRES_PASSWORD", None),
        ('host', "POSTGRES_HOST", "postgres"),
        ('port', "POSTGRES_PORT", "5432"),
    )
}
# Check if the DB is up by running a trivial query. If the connection cannot
# be established, print a notice and abort with exit code 1.
try:
    connection = psycopg2.connect(**POSTGRES_CONF)
    try:
        with connection.cursor() as cursor:
            cursor.execute("SELECT 1")
    finally:
        # Using a psycopg2 connection as a context manager only ends the
        # transaction; it never closes the connection, so close it explicitly.
        connection.close()
except psycopg2.OperationalError:
    print("Waiting for the DB to be ready...")
    # SystemExit instead of exit(): exit() is a helper injected by the `site`
    # module and is not guaranteed to exist (e.g. under `python -S`).
    raise SystemExit(1)
# The plugins will be used to fetch bank transactions and return them in a
# format that can be processed by the syncer. Each plugin should have a function
# called `fetch_transactions` that returns a list of transactions.
IMPORTER_PLUGINS = os.getenv("IMPORTER_PLUGINS", None)
# Separate multiple plugins by comma. Whitespace around the names and empty
# entries (e.g. "a, b," or an empty variable) are dropped, because
# `__import__` would otherwise be handed names like " b" or "" and fail.
IMPORTER_PLUGINS = [
    plugin.strip()
    for plugin in (IMPORTER_PLUGINS or "").split(',')
    if plugin.strip()
]
if not IMPORTER_PLUGINS:
    print("No importer plugin specified.")
    raise SystemExit(1)
# Try to import the plugins. A non-empty `fromlist` makes `__import__` return
# the named (sub)module itself rather than its top-level package.
try:
    importers = [
        __import__(plugin, fromlist=['fetch_transactions'])
        for plugin in IMPORTER_PLUGINS
    ]
except ImportError as e:
    print(f"Failed to import plugin: {e}")
    raise SystemExit(1)
print(f"Successfully imported plugins {IMPORTER_PLUGINS}")
# The plugin will be used to classify the bank transactions. The plugin should
# have a function called `classify_transaction`. The syncer applies it to every
# transaction row (DataFrame.apply with axis=1) and stores the return value as
# that transaction's classification.
CLASSIFIER_PLUGIN = os.getenv("CLASSIFIER_PLUGIN", None)
# An empty or whitespace-only value is treated like an unset variable; passing
# it to `__import__` would raise a ValueError that nothing here handles.
CLASSIFIER_PLUGIN = (CLASSIFIER_PLUGIN or "").strip() or None
if CLASSIFIER_PLUGIN is None:
    print("No classifier plugin specified.")
    raise SystemExit(1)
# Try to import the plugin. A non-empty `fromlist` makes `__import__` return
# the named (sub)module itself rather than its top-level package.
try:
    classifier = __import__(CLASSIFIER_PLUGIN, fromlist=['classify_transaction'])
except ImportError:
    print(f"Failed to import plugin: {CLASSIFIER_PLUGIN}")
    raise SystemExit(1)
# Fail early with a clear message instead of an AttributeError later, after
# all transactions have already been fetched.
if not callable(getattr(classifier, 'classify_transaction', None)):
    print(f"Plugin {CLASSIFIER_PLUGIN} does not provide a classify_transaction function.")
    raise SystemExit(1)
print(f"Successfully imported plugin: {CLASSIFIER_PLUGIN}")
# Create tables if they don't exist.
connection = psycopg2.connect(**POSTGRES_CONF)
try:
    # `with connection` commits on success and rolls back on an exception. It
    # does not close the connection, hence the explicit close() below.
    with connection, connection.cursor() as cursor:
        cursor.execute("""CREATE TABLE IF NOT EXISTS transactions (
            iban TEXT NOT NULL,
            internal BOOLEAN DEFAULT FALSE,
            date DATE NOT NULL,
            description TEXT NOT NULL,
            amount DECIMAL NOT NULL,
            balance DECIMAL NOT NULL,
            currency TEXT NOT NULL,
            classification TEXT,
            hash VARCHAR(255) NOT NULL,
            PRIMARY KEY (iban, date, description, amount, balance, currency)
        )""")
finally:
    connection.close()
# Fetch transactions from all importers.
transactions = []
for importer in importers:
    transactions.extend(importer.fetch_transactions())
if not transactions:
    # A DataFrame built from an empty list has no columns, so the
    # de-duplication below would fail with a KeyError. There is nothing to
    # sync in that case, so stop cleanly.
    print("No transactions fetched, nothing to sync.")
    raise SystemExit(0)
df = pd.DataFrame(transactions)
# Drop duplicate transactions to avoid normal transactions being marked as internal.
# Note: We make the duplicate removal solely based on the date and amount. This is
# because different importers may provide different encodings and metadata for the
# same transaction. E.g. when importing from CSV and PDF at the same time.
# The assumption here is that the same amount will only occur once on the same day.
# If you want to modify the behavior, set the TRANSACTION_DEDUPLICATION_COLUMNS env
# variable to a comma-separated list of columns to deduplicate on.
cols = [
    col.strip()  # tolerate "date, amount" style values
    for col in os.getenv("TRANSACTION_DEDUPLICATION_COLUMNS", "date,amount").split(',')
]
df_wo_dupes = df.drop_duplicates(subset=cols, inplace=False)
# Print out which transactions were removed.
print(f"Removed {len(df) - len(df_wo_dupes)} duplicate transactions:")
print(df[~df.index.isin(df_wo_dupes.index)])
# Continue with an independent copy so that later column assignments do not
# operate on a slice of the original frame.
df = df_wo_dupes.copy()
# Strip all whitespace from ibans (e.g. " DE 1234 ..." -> "DE1234...")
def strip(x):
    """Return *x* with every whitespace character removed."""
    # split() without arguments splits on any whitespace (spaces, tabs,
    # non-breaking spaces, ...), not only on the plain space character.
    return ''.join(x.split())


df['iban'] = df['iban'].apply(strip)
# Mark transactions seen on two accounts as internal in the column 'internal'.
# These aren't any expenses or income since they are between our own accounts.
df['amount_abs'] = df['amount'].abs()
# The 'internal' column only exists if at least one importer supplied it;
# default it to False so the lookup below cannot raise a KeyError.
if 'internal' not in df.columns:
    df['internal'] = False
# Only override None values in the internal column, in case the importer has
# already marked some transactions as internal.
seen_twice = df.duplicated(subset=['date', 'amount_abs'], keep=False)
df['internal'] = seen_twice | df['internal'].fillna(False).astype(bool)
# Classify transactions
df['classification'] = df.apply(classifier.classify_transaction, axis=1)
# Calculate a hash for each transaction to create links in Grafana.
def sha256(t):
    """Return the hex SHA-256 digest identifying transaction *t*.

    *t* must support lookup by the keys 'iban', 'date', 'description',
    'amount', 'balance' and 'currency'. The fields are fed to the hash in
    that order without any separator. 'date', 'amount' and 'balance' are
    converted with str() first; the remaining fields must already be strings.
    """
    # (field name, whether the value needs str() before encoding)
    layout = (
        ('iban', False),
        ('date', True),
        ('description', False),
        ('amount', True),
        ('balance', True),
        ('currency', False),
    )
    digest = hashlib.sha256()
    for field, needs_str in layout:
        value = t[field]
        if needs_str:
            value = str(value)
        digest.update(value.encode())
    return digest.hexdigest()
df['hash'] = df.apply(sha256, axis=1)
# Insert transactions into the database. Rows that already exist (same primary
# key) are skipped by the ON CONFLICT clause.
inserted = 0
connection = psycopg2.connect(**POSTGRES_CONF)
try:
    # `with connection` commits on success and rolls back on an exception. It
    # does not close the connection, hence the explicit close() below.
    with connection, connection.cursor() as cursor:
        # The DataFrame index label is deliberately ignored: after duplicate
        # removal the labels have gaps and say nothing about the row count.
        for _, transaction in df.iterrows():
            cursor.execute("""
                INSERT INTO transactions (iban, internal, date, description, amount, balance, currency, classification, hash)
                VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
                ON CONFLICT (iban, date, description, amount, balance, currency)
                DO NOTHING
                """, (
                transaction['iban'],
                transaction['internal'],  # Whether the transaction is between our own accounts
                transaction['date'],
                transaction['description'],
                transaction['amount'],
                transaction['balance'],
                transaction['currency'],
                transaction['classification'],
                transaction['hash'],
            ))
            # rowcount is the number of rows affected by the last statement:
            # 1 for a new row, 0 when the conflict clause skipped it. Guard
            # against the driver's -1 ("unknown") value.
            inserted += max(cursor.rowcount, 0)
finally:
    connection.close()
print(f"Successfully inserted {inserted} transactions into the database.")