#!/usr/bin/env python3
# -*- coding: utf-8 -*-
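
"""Watch a directory for newly downloaded .qfx (OFX) files, convert each
file's transactions into a CSV statement, record them in MongoDB so that
duplicates are skipped on re-import, and archive the processed input file."""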

import logging
import os
import time
from csv import DictWriter
from decimal import Decimal
from pathlib import Path

import pymongo
import watchdog.events
import watchdog.observers
from bson.decimal128 import Decimal128
from ofxparse import OfxParser

DATE_FORMAT = "%d/%m/%Y"

# Directory to watch for incoming files; can be overridden via the WATCH_DIR
# environment variable.
WATCH_DIR = os.environ.get('WATCH_DIR', '/mnt/data/')

PATTERN = '*.qfx'
BACKUP_DIR = 'Imported'
CONVERTED_DIR = 'Converted'

MONGO_HOST = os.environ['DB_HOST']
MONGO_PORT = os.environ['DB_PORT']
MONGO_DB = os.environ['DB_NAME']
MONGO_COL = 'imported_transactions'
ACCOUNT_COL = 'accounts'

MONGO_URL = "mongodb://{}:{}".format(MONGO_HOST, MONGO_PORT)
myclient = pymongo.MongoClient(MONGO_URL)
mydb = myclient[MONGO_DB]
mongo_col = mydb[MONGO_COL]
account_col = mydb[ACCOUNT_COL]

# basicConfig only applies once per process; include the level name in the
# format so errors stand out.
logging.basicConfig(format='%(asctime)s - %(levelname)s - %(message)s', level=logging.INFO)


class Handler(watchdog.events.PatternMatchingEventHandler):

    def __init__(self):
        # Set the patterns for PatternMatchingEventHandler: only react to
        # .qfx files, and ignore directory events.
        super().__init__(patterns=[PATTERN],
                         ignore_directories=True, case_sensitive=False)

    @staticmethod
    def write_csv(statement, out_file):
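        """Write the statement rows to out_file as CSV under a fixed header."""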
logging.info("Writing: " + out_file)
|
|
|
|
if len(statement) == 0:
|
|
logging.info("No transactions to write.")
|
|
return
|
|
|
|
fields = ['date', 'memo', 'category', 'amount', 'name']
|
|
with open(out_file, 'w') as f:
|
|
f.write("Date,Original Description,Category,Amount,Account Name")
|
|
f.write("\r\n")
|
|
writer = DictWriter(f, fieldnames=fields)
|
|
for line in statement:
|
|
writer.writerow(line)
|
|
|
|
    @staticmethod
    def transaction_exists(line):
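        """Return True if an identical transaction document is already stored.

        find_one(line) uses the whole document as the filter, so every field
        must match exactly for a transaction to count as a duplicate.
        """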
        existing_trans = mongo_col.find_one(line)
        return existing_trans is not None

    @staticmethod
    def convert_decimal(dict_item):
        # This function iterates a dictionary looking for values of type
        # Decimal and converts them to Decimal128. Embedded dictionaries and
        # lists are handled recursively.
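        # BSON has no encoding for decimal.Decimal, so pymongo would reject
        # such values; Decimal128 is the BSON-native decimal type.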
        if dict_item is None:
            return None

        for k, v in list(dict_item.items()):
            if isinstance(v, dict):
                Handler.convert_decimal(v)
            elif isinstance(v, list):
                for item in v:
                    Handler.convert_decimal(item)
            elif isinstance(v, Decimal):
                dict_item[k] = Decimal128(str(v))

        return dict_item

    @staticmethod
    def get_statement_from_qfx(qfx):
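        """Build CSV-ready rows from a parsed qfx object and store each new
        transaction in MongoDB.

        Assumes the accounts collection holds one document per account with at
        least a "number" and a "name" field, e.g. (illustrative values only):
        {"number": "12345678", "name": "Everyday Checking"}.
        """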
        account = account_col.find_one({"number": qfx.account.number})
        if account is None:
            logging.error("No account for account number {} exists. Create one and re-process the file.".format(qfx.account.number))
            return [], None

        statement = []
        for transaction in qfx.account.statement.transactions:

            # Pending transactions are not final, so skip them.
            if transaction.payee.startswith("PENDING:"):
                continue

            line = {
                'date': transaction.date.strftime(DATE_FORMAT),
                'memo': transaction.memo,
                'category': 'Uncategorised',
                'amount': transaction.amount,
                'name': account['name']
            }

            # Mongo needs the decimal values in Decimal128, so create a
            # converted copy for it.
            line_d128 = Handler.convert_decimal(line.copy())

            if Handler.transaction_exists(line_d128):
                continue

            statement.append(line)
            result = mongo_col.insert_one(line_d128)
            logging.info("New db entry stored: {}".format(result.inserted_id))

        return statement, account['name']

    @staticmethod
    def unique_path(directory, name_pattern):
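        """Return the first path under directory, formatted from name_pattern
        with an incrementing counter, that does not exist yet."""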
        counter = 0
        while True:
            counter += 1
            path = directory / name_pattern.format(counter)
            if not path.exists():
                return path

    def on_created(self, event):
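        """Handle a newly created .qfx file: wait for the copy to finish,
        convert it to CSV, and move the original into the backup directory."""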
        logging.info('File found: {}'.format(event.src_path))

        # Wait for the file to appear on disk, giving up after ~5 minutes.
        timeout = 0
        while not os.path.isfile(event.src_path):
            time.sleep(5)
            timeout += 1
            if timeout > 60:
                logging.error('Timeout waiting for file {} to exist. Aborting processing.'.format(event.src_path))
                return

        # The create event fires before the copy completes, so poll until the
        # file size stops changing.
        historicalSize = -1
        while historicalSize != os.path.getsize(event.src_path):
            logging.info('Waiting for copy to finish...')
            historicalSize = os.path.getsize(event.src_path)
            time.sleep(1)

        logging.info("File copy has now finished.")

        with open(event.src_path, 'r') as file:
            qfx = OfxParser.parse(file, fail_fast=False)
            statement, acct_name = Handler.get_statement_from_qfx(qfx)

        # No matching account: leave the file in place for re-processing.
        if acct_name is None:
            return

        path = Path(event.src_path).resolve()

        converted_dir = path.parent / CONVERTED_DIR
        if not converted_dir.exists():
            converted_dir.mkdir()

        out_file = str(converted_dir / (acct_name + '-' + qfx.signon.dtserver + '.csv'))
        Handler.write_csv(statement, out_file)

        # Now move the input file to backup, under a unique numbered name.
        archive_file_dir = path.parent / BACKUP_DIR
        if not archive_file_dir.exists():
            archive_file_dir.mkdir()

        archive_file = path.stem + '{:04d}' + path.suffix
        destination = Handler.unique_path(archive_file_dir, archive_file)
        path.replace(destination)

        logging.info("Processing successfully finished for {}".format(event.src_path))

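
# Example invocation (environment values and the script name are illustrative
# assumptions, not defaults shipped with this script):
#   DB_HOST=mongo DB_PORT=27017 DB_NAME=finance WATCH_DIR=/mnt/data/ python3 converter.py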

if __name__ == "__main__":
    event_handler = Handler()
    observer = watchdog.observers.Observer()
    observer.schedule(event_handler, path=WATCH_DIR, recursive=False)

    observer.start()
    logging.info('Converter running, waiting for files in {}'.format(WATCH_DIR))
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        observer.stop()
    observer.join()