#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Watch a directory for Quicken/OFX (.qfx) files.

For each new file: parse it, store unseen transactions in MongoDB,
write a CSV copy (Mint-style header) and move the source file into a
backup archive under a unique name.
"""

from csv import DictWriter
from glob import glob
from ofxparse import OfxParser
from pathlib import Path
from decimal import Decimal
from bson.decimal128 import Decimal128
import logging
import os
import pymongo
import time
import watchdog.events
import watchdog.observers

DATE_FORMAT = "%d/%m/%Y"

# Directory to watch for incoming files (overridable via environment).
WATCH_DIR = os.environ.get('WATCH_DIR', '/mnt/data/')

PATTERN = '*.qfx'
BACKUP_DIR = 'Imported'
CONVERTED_DIR = 'Converted'

# Mongo connection details come from required environment variables.
MONGO_URL = os.environ['DB_HOST']
MONGO_PORT = os.environ['DB_PORT']
MONGO_DB = os.environ['DB_NAME']
MONGO_COL = 'imported_transactions'
ACCOUNT_COL = 'accounts'
MONGO_URL = "mongodb://{}:{}".format(MONGO_URL, MONGO_PORT)

myclient = pymongo.MongoClient(MONGO_URL)
mydb = myclient[MONGO_DB]
mongo_col = mydb[MONGO_COL]
account_col = mydb[ACCOUNT_COL]

# NOTE: logging.basicConfig only takes effect on the first call; the
# original second call (with an "ERROR:" format) was a silent no-op and
# has been removed.
logging.basicConfig(format='%(asctime)s - %(message)s', level=logging.INFO)


class Handler(watchdog.events.PatternMatchingEventHandler):
    """Reacts to newly created *.qfx files in the watched directory."""

    def __init__(self):
        # Reuse the module-level PATTERN so the watched glob is defined
        # in exactly one place.
        watchdog.events.PatternMatchingEventHandler.__init__(
            self,
            patterns=[PATTERN],
            ignore_directories=True,
            case_sensitive=False)

    @staticmethod
    def write_csv(statement, out_file):
        """Write *statement* (a list of transaction dicts) to *out_file*.

        Emits a fixed Mint-style header line followed by one CSV row per
        transaction. Does nothing beyond a log message when the statement
        is empty.
        """
        logging.info("Writing: " + out_file)
        if not statement:
            logging.info("No transactions to write.")
            return
        fields = ['date', 'memo', 'category', 'amount', 'name']
        # newline='' lets the csv module control line endings (\r\n)
        # itself; without it the text layer re-translates them on
        # Windows, producing blank rows.
        with open(out_file, 'w', newline='') as f:
            f.write("Date,Original Description,Category,Amount,Account Name")
            f.write("\r\n")
            writer = DictWriter(f, fieldnames=fields)
            for line in statement:
                writer.writerow(line)

    @staticmethod
    def transaction_exists(line):
        """Return True if an identical transaction document is already stored."""
        return mongo_col.find_one(line) is not None

    @staticmethod
    def convert_decimal(dict_item):
        """Recursively convert ``Decimal`` values in *dict_item* to ``Decimal128``.

        Mutates *dict_item* in place and also returns it. Embedded dicts are
        recursed into; for lists, only dict elements are recursed (the
        original recursed into every element, which would crash with an
        AttributeError on a non-dict such as a bare Decimal).
        """
        if dict_item is None:
            return None
        for key, value in list(dict_item.items()):
            if isinstance(value, dict):
                Handler.convert_decimal(value)
            elif isinstance(value, list):
                for element in value:
                    if isinstance(element, dict):
                        Handler.convert_decimal(element)
            elif isinstance(value, Decimal):
                dict_item[key] = Decimal128(str(value))
        return dict_item

    @staticmethod
    def get_statement_from_qfx(qfx):
        """Extract transactions from a parsed QFX document.

        Stores each new (not yet seen) transaction in Mongo and returns
        ``(statement, account_name)`` where *statement* is a list of
        transaction dicts. Returns ``([], None)`` when no account record
        matches the file's account number (the original fell through and
        crashed with a TypeError on ``account['name']``).
        """
        account = account_col.find_one({"number": qfx.account.number})
        if account is None:
            logging.error(
                "No account for account number {} exists. Create one and "
                "re-process the file".format(qfx.account.number))
            return [], None
        statement = []
        for transaction in qfx.account.statement.transactions:
            # Skip provisional entries; they re-appear once settled.
            if transaction.payee.startswith("PENDING:"):
                continue
            line = {
                'date': transaction.date.strftime(DATE_FORMAT),
                'memo': transaction.memo,
                'category': 'Uncategorised',
                'amount': transaction.amount,
                'name': account['name'],
            }
            # Mongo needs the decimal values as Decimal128, so convert a
            # copy for storage while keeping Decimal in the CSV version.
            line_d128 = Handler.convert_decimal(line.copy())
            if Handler.transaction_exists(line_d128):
                continue
            statement.append(line)
            result = mongo_col.insert_one(line_d128)
            logging.info("New db entry stored: {}".format(result.inserted_id))
        return statement, account['name']

    @staticmethod
    def unique_path(directory, name_pattern):
        """Return the first ``directory / name_pattern.format(i)`` (i >= 1)
        that does not already exist."""
        counter = 0
        while True:
            counter += 1
            path = directory / name_pattern.format(counter)
            if not path.exists():
                return path

    def on_created(self, event):
        """Handle a new .qfx file: wait for it to finish copying, parse it,
        store/export its transactions, then archive the source file."""
        logging.info('File found: {}'.format(event.src_path))
        # Wait (up to ~60s) for the file to actually appear on disk.
        timeout = 0
        while not os.path.isfile(event.src_path):
            time.sleep(1)
            timeout += 1
            if timeout > 60:
                logging.error('Timeout waiting for file {} to exist. Aborting processing.'.format(event.src_path))
                return
        # Wait until the size stops changing so we don't read a
        # half-copied file.
        historical_size = -1
        while historical_size != os.path.getsize(event.src_path):
            logging.info('waiting for copy to finish....')
            historical_size = os.path.getsize(event.src_path)
            time.sleep(1)
        logging.info("file copy has now finished")
        with open(event.src_path, 'r') as file:
            qfx = OfxParser.parse(file, fail_fast=False)
        statement, acct_name = Handler.get_statement_from_qfx(qfx)
        if acct_name is None:
            # No matching account record: leave the file in place so it
            # can be re-processed once the account has been created.
            return
        # BUGFIX: the original called path.resolve() and discarded the
        # result (resolve() returns a new Path, it does not mutate).
        path = Path(event.src_path).resolve()
        converted_dir = path.parent / CONVERTED_DIR
        converted_dir.mkdir(exist_ok=True)
        out_file = str(converted_dir / (acct_name + '-' + qfx.signon.dtserver + '.csv'))
        Handler.write_csv(statement, out_file)
        # Now move the input file into the backup archive under a name
        # made unique by a zero-padded counter suffix.
        archive_file_dir = path.parent / BACKUP_DIR
        archive_file_dir.mkdir(exist_ok=True)
        archive_file = path.stem + '{:04d}' + path.suffix
        destination = Handler.unique_path(archive_file_dir, archive_file)
        if not destination.exists():
            path.replace(destination)
        logging.info("Processing successfully finished for {}".format(event.src_path))


if __name__ == "__main__":
    event_handler = Handler()
    observer = watchdog.observers.Observer()
    observer.schedule(event_handler, path=WATCH_DIR, recursive=False)
    observer.start()
    logging.info('Converter running, waiting for files in {}'.format(WATCH_DIR))
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        observer.stop()
    observer.join()