# Files
# HSBCConverter/converter.py
#
# 194 lines
# 6.2 KiB
# Python
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from csv import DictWriter
from glob import glob
from ofxparse import OfxParser
from pathlib import Path
from decimal import Decimal
from bson.decimal128 import Decimal128
import logging
import os
import pymongo
import time
import watchdog.events
import watchdog.observers
# Date format used for transaction dates in the generated CSV.
DATE_FORMAT = "%d/%m/%Y"

# Directory watched for incoming .qfx files; overridable via WATCH_DIR env var.
WATCH_DIR = os.environ.get('WATCH_DIR', '/mnt/data/')

PATTERN = '*.qfx'            # files the handler reacts to
BACKUP_DIR = 'Imported'      # processed originals are archived here
CONVERTED_DIR = 'Converted'  # generated CSVs are written here

# MongoDB connection settings. DB_HOST / DB_PORT / DB_NAME are required
# environment variables: a missing one fails fast with KeyError at startup.
MONGO_URL = os.environ['DB_HOST']
MONGO_PORT = os.environ['DB_PORT']
MONGO_DB = os.environ['DB_NAME']
MONGO_COL = 'hsbc_converter'   # collection of already-imported transactions
ACCOUNT_COL = 'accounts'       # collection mapping account numbers to names
MONGO_URL = "mongodb://{}:{}".format(MONGO_URL, MONGO_PORT)

myclient = pymongo.MongoClient(MONGO_URL)
mydb = myclient[MONGO_DB]
mongo_col = mydb[MONGO_COL]
account_col = mydb[ACCOUNT_COL]

# BUG FIX: logging.basicConfig() is a no-op once the root logger has a
# handler, so the original second call (the "ERROR: ..." format) never took
# effect. Configure logging exactly once; ERROR records flow through the
# same handler at their own level.
logging.basicConfig(format='%(asctime)s - %(message)s', level=logging.INFO)
class Handler(watchdog.events.PatternMatchingEventHandler):
    """Watchdog handler that converts newly created HSBC .qfx files to CSV.

    For each new .qfx file: parse it with ofxparse, de-duplicate every
    transaction against MongoDB, write the new transactions as a CSV into
    the Converted/ sub-directory, then archive the original into Imported/.
    """

    def __init__(self):
        # React only to *.qfx files; ignore directory events. Matching is
        # case-insensitive so e.g. .QFX is picked up too.
        watchdog.events.PatternMatchingEventHandler.__init__(self, patterns=['*.qfx'],
                                                             ignore_directories=True, case_sensitive=False)

    @staticmethod
    def write_csv(statement, out_file):
        """Write a list of transaction dicts to *out_file* as CSV.

        Does nothing (beyond logging) when *statement* is empty.
        """
        logging.info("Writing: " + out_file)
        if len(statement) == 0:
            logging.info("No transactions to write.")
            return
        fields = ['date', 'memo', 'category', 'amount', 'name']
        # BUG FIX: open with newline='' as the csv module requires; the
        # DictWriter emits its own '\r\n' terminators and, without this,
        # text-mode translation on Windows would produce '\r\r\n'.
        with open(out_file, 'w', newline='') as f:
            # Custom header row (Mint-style column names), distinct from the
            # dict keys used as DictWriter fieldnames below.
            f.write("Date,Original Description,Category,Amount,Account Name")
            f.write("\r\n")
            writer = DictWriter(f, fieldnames=fields)
            for line in statement:
                writer.writerow(line)

    @staticmethod
    def transaction_exists(line):
        """Return True if an identical transaction document is already stored."""
        existing_trans = mongo_col.find_one(line)
        return existing_trans is not None

    @staticmethod
    def convert_decimal(dict_item):
        """Recursively convert all Decimal values in *dict_item* to Decimal128.

        Mongo cannot store decimal.Decimal directly; Decimal128 is its native
        decimal type. Mutates *dict_item* in place (descending into nested
        dicts and lists) and also returns it for convenience.
        """
        if dict_item is None:
            return None
        for k, v in list(dict_item.items()):
            if isinstance(v, dict):
                Handler.convert_decimal(v)
            elif isinstance(v, list):
                for item in v:
                    Handler.convert_decimal(item)
            elif isinstance(v, Decimal):
                dict_item[k] = Decimal128(str(v))
        return dict_item

    @staticmethod
    def get_statement_from_qfx(qfx):
        """Extract new (not-yet-imported) transactions from a parsed QFX.

        Skips pending transactions, records each new transaction in MongoDB,
        and returns the list of transaction dicts to be written to CSV.
        Returns an empty list when no account record exists for the file's
        account number.
        """
        account = account_col.find_one({"number": qfx.account.number})
        if account is None:
            logging.error("No account for account number {} exists. Create one and re-process the file".format(qfx.account.number))
            # BUG FIX: previously execution fell through and crashed with a
            # TypeError on account['name'] below; bail out instead.
            return []
        statement = []
        for transaction in qfx.account.statement.transactions:
            # Pending transactions are not final; skip them.
            if transaction.payee.startswith("PENDING:"):
                continue
            line = {
                'date': transaction.date.strftime(DATE_FORMAT),
                'memo': transaction.memo,
                'category': 'Uncategorised',
                'amount': transaction.amount,
                'name': account['name']
            }
            # Mongo needs decimal values as Decimal128, so convert a copy;
            # the CSV keeps the original Decimal for formatting.
            line_d128 = Handler.convert_decimal(line.copy())
            if Handler.transaction_exists(line_d128):
                continue
            statement.append(line)
            result = mongo_col.insert_one(line_d128)
            logging.info("New db entry stored: {}".format(result.inserted_id))
        return statement

    @staticmethod
    def unique_path(directory, name_pattern):
        """Return the first non-existing path directory/name_pattern.format(n).

        *name_pattern* must contain a format placeholder (e.g. '{:04d}');
        counters start at 1.
        """
        counter = 0
        while True:
            counter += 1
            path = directory / name_pattern.format(counter)
            if not path.exists():
                return path

    def on_created(self, event):
        """Process a newly created .qfx file: convert, store, archive."""
        logging.info('File found: {}'.format(event.src_path))
        # Wait (up to ~10s) for the file to actually appear; creation events
        # can fire before the file is visible on some mounts.
        fileExists = False
        timeout = 0
        while not fileExists:
            fileExists = os.path.isfile(event.src_path)
            time.sleep(1)
            timeout += 1
            if timeout > 10:
                logging.error('Timeout waiting for file {} to exist. Aborting processing.'.format(event.src_path))
                return
        # Wait until the size stops changing, i.e. the copy has finished.
        historicalSize = -1
        while (historicalSize != os.path.getsize(event.src_path)):
            historicalSize = os.path.getsize(event.src_path)
            logging.info('waiting for copy to finish....')
            time.sleep(1)
        logging.info("file copy has now finished")
        with open(event.src_path, 'r') as file:
            # fail_fast=False: tolerate the minor spec violations HSBC emits.
            qfx = OfxParser.parse(file, fail_fast=False)
        statement = Handler.get_statement_from_qfx(qfx)
        path = Path(event.src_path)
        path.resolve()
        converted_dir = path.parent / CONVERTED_DIR
        if not converted_dir.exists():
            converted_dir.mkdir()
        out_file = str(path.parent / CONVERTED_DIR / ('converted' + path.stem + '.csv'))
        Handler.write_csv(statement, out_file)
        # Now move the input file to backup, under a unique numbered name so
        # repeated imports of the same filename never overwrite each other.
        archive_file_dir = path.parent / BACKUP_DIR
        archive_file = (path.stem + '{:04d}' + path.suffix)
        destination = Handler.unique_path(archive_file_dir, archive_file)
        if not archive_file_dir.exists():
            archive_file_dir.mkdir()
        if not destination.exists():
            path.replace(destination)
        logging.info("Processing successfully finished for {}".format(event.src_path))
if __name__ == "__main__":
    # Watch WATCH_DIR (non-recursive) and hand every new .qfx file to
    # Handler until the process is interrupted with Ctrl-C.
    observer = watchdog.observers.Observer()
    observer.schedule(Handler(), path=WATCH_DIR, recursive=False)
    observer.start()
    logging.info('Converter running, waiting for files in {}'.format(WATCH_DIR))
    try:
        # The observer runs in its own thread; just idle here.
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        observer.stop()
    observer.join()