246 lines
9.4 KiB
Python
246 lines
9.4 KiB
Python
|
|
"""
|
||
|
|
Import customers and contacts from OmniSync database with better error handling
|
||
|
|
"""
|
||
|
|
|
||
|
|
import psycopg2
|
||
|
|
from psycopg2.extras import RealDictCursor
|
||
|
|
import sqlite3
|
||
|
|
import os
|
||
|
|
import logging
|
||
|
|
|
||
|
|
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
|
||
|
|
logger = logging.getLogger(__name__)
|
||
|
|
|
||
|
|
OMNISYNC_DB = '/omnisync_data/fakturering.db'
|
||
|
|
|
||
|
|
|
||
|
|
def get_postgres_connection():
|
||
|
|
"""Get PostgreSQL connection"""
|
||
|
|
database_url = os.getenv('DATABASE_URL', 'postgresql://bmc_hub:bmc_hub@postgres:5432/bmc_hub')
|
||
|
|
return psycopg2.connect(database_url, cursor_factory=RealDictCursor)
|
||
|
|
|
||
|
|
|
||
|
|
def get_sqlite_connection():
|
||
|
|
"""Get SQLite connection to OmniSync database"""
|
||
|
|
return sqlite3.connect(OMNISYNC_DB)
|
||
|
|
|
||
|
|
|
||
|
|
def import_customers():
|
||
|
|
"""Import customers from OmniSync"""
|
||
|
|
sqlite_conn = get_sqlite_connection()
|
||
|
|
sqlite_cursor = sqlite_conn.cursor()
|
||
|
|
|
||
|
|
imported = 0
|
||
|
|
skipped = 0
|
||
|
|
errors = []
|
||
|
|
|
||
|
|
try:
|
||
|
|
# Get active customers from OmniSync
|
||
|
|
sqlite_cursor.execute("""
|
||
|
|
SELECT
|
||
|
|
name, cvr_number, email, phone, address, city, postal_code,
|
||
|
|
country, website, vtiger_id, economic_customer_number,
|
||
|
|
email_domain, created_at
|
||
|
|
FROM customers
|
||
|
|
WHERE active = 1 AND deleted_at IS NULL
|
||
|
|
ORDER BY name
|
||
|
|
""")
|
||
|
|
|
||
|
|
customers = sqlite_cursor.fetchall()
|
||
|
|
logger.info(f"📥 Found {len(customers)} active customers in OmniSync")
|
||
|
|
|
||
|
|
for row in customers:
|
||
|
|
name, cvr, email, phone, address, city, postal_code, country, website, vtiger_id, economic_no, email_domain, created_at = row
|
||
|
|
|
||
|
|
# Skip if no name
|
||
|
|
if not name or name.strip() == '':
|
||
|
|
skipped += 1
|
||
|
|
continue
|
||
|
|
|
||
|
|
# Insert each customer individually
|
||
|
|
individual_conn = get_postgres_connection()
|
||
|
|
individual_cursor = individual_conn.cursor()
|
||
|
|
|
||
|
|
try:
|
||
|
|
individual_cursor.execute("""
|
||
|
|
INSERT INTO customers (
|
||
|
|
name, cvr_number, email, phone, address, postal_code, city,
|
||
|
|
country, website, vtiger_id, economic_customer_number,
|
||
|
|
email_domain, is_active, created_at
|
||
|
|
) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
|
||
|
|
ON CONFLICT (cvr_number) DO UPDATE SET
|
||
|
|
name = EXCLUDED.name,
|
||
|
|
email = EXCLUDED.email,
|
||
|
|
phone = EXCLUDED.phone,
|
||
|
|
address = EXCLUDED.address,
|
||
|
|
postal_code = EXCLUDED.postal_code,
|
||
|
|
city = EXCLUDED.city,
|
||
|
|
website = EXCLUDED.website,
|
||
|
|
vtiger_id = EXCLUDED.vtiger_id,
|
||
|
|
economic_customer_number = EXCLUDED.economic_customer_number
|
||
|
|
RETURNING id
|
||
|
|
""", (
|
||
|
|
name, cvr, email, phone, address, postal_code, city,
|
||
|
|
country or 'Danmark', website, vtiger_id, economic_no,
|
||
|
|
email_domain, True, created_at
|
||
|
|
))
|
||
|
|
|
||
|
|
result = individual_cursor.fetchone()
|
||
|
|
individual_conn.commit()
|
||
|
|
|
||
|
|
if result:
|
||
|
|
imported += 1
|
||
|
|
if imported % 50 == 0:
|
||
|
|
logger.info(f" Imported {imported} customers...")
|
||
|
|
|
||
|
|
except Exception as e:
|
||
|
|
error_msg = str(e)[:100]
|
||
|
|
if imported + skipped < 10: # Only log first 10 errors in detail
|
||
|
|
logger.warning(f" ⚠️ Could not import '{name}': {error_msg}")
|
||
|
|
errors.append((name, error_msg))
|
||
|
|
skipped += 1
|
||
|
|
finally:
|
||
|
|
individual_cursor.close()
|
||
|
|
individual_conn.close()
|
||
|
|
|
||
|
|
logger.info(f"✅ Customers: {imported} imported, {skipped} skipped")
|
||
|
|
if len(errors) > 10:
|
||
|
|
logger.info(f" (Suppressed {len(errors)-10} error messages)")
|
||
|
|
|
||
|
|
return imported
|
||
|
|
|
||
|
|
except Exception as e:
|
||
|
|
logger.error(f"❌ Customer import failed: {e}")
|
||
|
|
raise
|
||
|
|
finally:
|
||
|
|
sqlite_cursor.close()
|
||
|
|
sqlite_conn.close()
|
||
|
|
|
||
|
|
|
||
|
|
def import_contacts():
|
||
|
|
"""Import contacts from OmniSync"""
|
||
|
|
sqlite_conn = get_sqlite_connection()
|
||
|
|
sqlite_cursor = sqlite_conn.cursor()
|
||
|
|
|
||
|
|
imported = 0
|
||
|
|
skipped = 0
|
||
|
|
errors = []
|
||
|
|
|
||
|
|
try:
|
||
|
|
# Get contacts
|
||
|
|
sqlite_cursor.execute("""
|
||
|
|
SELECT
|
||
|
|
c.id, c.first_name, c.last_name, c.email, c.phone, c.mobile,
|
||
|
|
c.title, c.department, c.active, c.vtiger_id, c.created_at
|
||
|
|
FROM contacts c
|
||
|
|
WHERE c.active = 1 AND c.deleted_at IS NULL
|
||
|
|
ORDER BY c.last_name, c.first_name
|
||
|
|
""")
|
||
|
|
|
||
|
|
contacts = sqlite_cursor.fetchall()
|
||
|
|
logger.info(f"📥 Found {len(contacts)} active contacts in OmniSync")
|
||
|
|
|
||
|
|
for row in contacts:
|
||
|
|
omnisync_id, first_name, last_name, email, phone, mobile, title, department, active, vtiger_id, created_at = row
|
||
|
|
|
||
|
|
# Skip if no name
|
||
|
|
if not first_name and not last_name:
|
||
|
|
skipped += 1
|
||
|
|
continue
|
||
|
|
|
||
|
|
# Individual connection per contact
|
||
|
|
postgres_conn = get_postgres_connection()
|
||
|
|
postgres_cursor = postgres_conn.cursor()
|
||
|
|
|
||
|
|
try:
|
||
|
|
# Insert contact
|
||
|
|
postgres_cursor.execute("""
|
||
|
|
INSERT INTO contacts (
|
||
|
|
first_name, last_name, email, phone, mobile,
|
||
|
|
title, department, is_active, vtiger_id, created_at
|
||
|
|
) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
|
||
|
|
RETURNING id
|
||
|
|
""", (
|
||
|
|
first_name or '', last_name or '', email, phone, mobile,
|
||
|
|
title, department, bool(active), vtiger_id, created_at
|
||
|
|
))
|
||
|
|
|
||
|
|
result = postgres_cursor.fetchone()
|
||
|
|
contact_id = result['id']
|
||
|
|
|
||
|
|
# Get company relationships from OmniSync
|
||
|
|
sqlite_cursor.execute("""
|
||
|
|
SELECT customer_id, is_primary, role, notes
|
||
|
|
FROM contact_companies
|
||
|
|
WHERE contact_id = ?
|
||
|
|
""", (omnisync_id,))
|
||
|
|
|
||
|
|
companies = sqlite_cursor.fetchall()
|
||
|
|
|
||
|
|
# Link to companies in BMC Hub
|
||
|
|
for company_row in companies:
|
||
|
|
customer_id_omnisync, is_primary, role, notes = company_row
|
||
|
|
|
||
|
|
# Find matching customer in BMC Hub by vtiger_id or name
|
||
|
|
# First get the OmniSync customer
|
||
|
|
sqlite_cursor.execute("SELECT vtiger_id, name FROM customers WHERE id = ?", (customer_id_omnisync,))
|
||
|
|
customer_data = sqlite_cursor.fetchone()
|
||
|
|
|
||
|
|
if customer_data:
|
||
|
|
vtiger_id_customer, customer_name = customer_data
|
||
|
|
|
||
|
|
# Find in BMC Hub
|
||
|
|
if vtiger_id_customer:
|
||
|
|
postgres_cursor.execute("SELECT id FROM customers WHERE vtiger_id = %s", (vtiger_id_customer,))
|
||
|
|
else:
|
||
|
|
postgres_cursor.execute("SELECT id FROM customers WHERE name = %s LIMIT 1", (customer_name,))
|
||
|
|
|
||
|
|
bmc_customer = postgres_cursor.fetchone()
|
||
|
|
|
||
|
|
if bmc_customer:
|
||
|
|
postgres_cursor.execute("""
|
||
|
|
INSERT INTO contact_companies (contact_id, customer_id, is_primary, role, notes)
|
||
|
|
VALUES (%s, %s, %s, %s, %s)
|
||
|
|
ON CONFLICT DO NOTHING
|
||
|
|
""", (contact_id, bmc_customer['id'], bool(is_primary), role, notes))
|
||
|
|
|
||
|
|
postgres_conn.commit()
|
||
|
|
imported += 1
|
||
|
|
if imported % 50 == 0:
|
||
|
|
logger.info(f" Imported {imported} contacts...")
|
||
|
|
|
||
|
|
except Exception as e:
|
||
|
|
error_msg = str(e)[:100]
|
||
|
|
if imported + skipped < 10:
|
||
|
|
logger.warning(f" ⚠️ Could not import '{first_name} {last_name}': {error_msg}")
|
||
|
|
errors.append((f"{first_name} {last_name}", error_msg))
|
||
|
|
skipped += 1
|
||
|
|
finally:
|
||
|
|
postgres_cursor.close()
|
||
|
|
postgres_conn.close()
|
||
|
|
|
||
|
|
logger.info(f"✅ Contacts: {imported} imported, {skipped} skipped")
|
||
|
|
if len(errors) > 10:
|
||
|
|
logger.info(f" (Suppressed {len(errors)-10} error messages)")
|
||
|
|
|
||
|
|
return imported
|
||
|
|
|
||
|
|
except Exception as e:
|
||
|
|
logger.error(f"❌ Contact import failed: {e}")
|
||
|
|
raise
|
||
|
|
finally:
|
||
|
|
sqlite_cursor.close()
|
||
|
|
sqlite_conn.close()
|
||
|
|
|
||
|
|
|
||
|
|
if __name__ == "__main__":
|
||
|
|
logger.info("🚀 Starting OmniSync import...")
|
||
|
|
logger.info(f"📂 Source: {OMNISYNC_DB}")
|
||
|
|
|
||
|
|
customer_count = import_customers()
|
||
|
|
contact_count = import_contacts()
|
||
|
|
|
||
|
|
logger.info(f"\n🎉 Import completed!")
|
||
|
|
logger.info(f" Customers: {customer_count}")
|
||
|
|
logger.info(f" Contacts: {contact_count}")
|