- Created migration scripts for AnyDesk sessions and hardware assets. - Implemented apply_migration_115.py to execute migration for AnyDesk sessions. - Added set_customer_wiki_slugs.py script to update customer wiki slugs based on a predefined folder list. - Developed run_migration.py to apply AnyDesk migration schema. - Added tests for Service Contract Wizard to ensure functionality and dry-run mode.
273 lines
7.8 KiB
Python
273 lines
7.8 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Set customers.wiki_slug based on a provided folder list.

Usage:
    python scripts/set_customer_wiki_slugs.py                 # dry run
    python scripts/set_customer_wiki_slugs.py --apply         # write changes
    python scripts/set_customer_wiki_slugs.py --interactive   # dry run, prompt for unmatched folders
|
|
"""
|
|
|
|
import os
|
|
import sys
|
|
import argparse
|
|
from typing import Dict, List, Tuple
|
|
|
|
import psycopg2
|
|
from psycopg2.extras import RealDictCursor
|
|
|
|
# Folder names to assign as customers.wiki_slug. plan_updates() matches each
# entry against customer names via normalize_name(); the entry itself (with
# its original spelling and casing) is the value written as the slug.
FOLDERS = [
    "Alcare",
    "Arbodania",
    "Bellevue",
    "DreamHack",
    "Glarmester-Svensson",
    "GODT_Media",
    "Grønnegaards",
    "HarbourHouse",
    "hedegaardsvej88",
    "highwire",
    "ImpactTV",
    "Kjæden",
    "K-pro",
    "Laudpeople",
    "Maskinsikkerhed",
    "Nordisk Film TV A/S",
    "Norva24",
    "PFA diverse info",
    "PFA-The-Union",
    "Portalen",
    "SamNetworks",
    "skuespillerforeningen",
    "Snowman",
    "Stena",
    "Sydkysten",
    "TMNconsult",
    "TrinityHr",
    "Zantay",
    "NEMB",
]
|
|
|
|
|
|
# Company-form suffixes that are ignored at the end of a name when matching.
SUFFIX_TOKENS = {
    "a/s",
    "as",
    "aps",
    "ab",
    "ltd",
    "gmbh",
    "inc",
    "llc",
}

# Characters treated as word separators when a name is split into tokens.
_SEPARATORS = "/_-.,"


def _tokenize(text: str) -> List[str]:
    """Split *text* into tokens on whitespace and the separator characters."""
    for ch in _SEPARATORS:
        text = text.replace(ch, " ")
    return text.split()


def normalize_name(value: str) -> str:
    """Return a comparison key for a customer or folder name.

    The name is case-folded, "&" becomes "and", the Danish letters
    æ/ø/å are transliterated to ae/oe/aa, separator characters are treated
    as spaces, trailing company-form suffixes listed in SUFFIX_TOKENS are
    dropped, and every remaining non-alphanumeric character is removed.

    Args:
        value: The raw name; may be empty or None.

    Returns:
        The normalized key, or "" when *value* is falsy or nothing remains.
    """
    if not value:
        return ""
    text = (
        value.casefold()
        .replace("&", " and ")
        .replace("æ", "ae")
        .replace("ø", "oe")
        .replace("å", "aa")
    )
    tokens = _tokenize(text)

    # Suffixes are tokenised exactly like names, so "a/s" becomes the token
    # sequence ("a", "s"). Comparing it as a single token could never match,
    # because "/" has already been turned into a space at this point.
    suffixes = [tuple(_tokenize(suffix)) for suffix in SUFFIX_TOKENS]

    stripped = True
    while tokens and stripped:
        stripped = False
        for suffix in suffixes:
            if suffix and tuple(tokens[-len(suffix):]) == suffix:
                del tokens[-len(suffix):]
                stripped = True
                break

    return "".join(ch for ch in "".join(tokens) if ch.isalnum())
|
|
|
|
|
|
def fetch_customers(conn) -> List[Dict[str, str]]:
    """Load id, name and wiki_slug for every customer that is not soft-deleted.

    Args:
        conn: An open database connection whose cursor() accepts
            ``cursor_factory``.

    Returns:
        All rows of the query, as produced by a RealDictCursor.
    """
    query = "SELECT id, name, wiki_slug FROM customers WHERE deleted_at IS NULL"
    with conn.cursor(cursor_factory=RealDictCursor) as cursor:
        cursor.execute(query)
        rows = cursor.fetchall()
    return rows
|
|
|
|
|
|
def build_lookup(customers: List[Dict[str, str]]) -> Dict[str, List[Dict[str, str]]]:
    """Group customers by the normalized form of their name.

    Customers whose name normalizes to an empty string are left out.

    Args:
        customers: Customer rows, each providing a "name" entry.

    Returns:
        A mapping from normalized name to the customers sharing that key,
        in their original order.
    """
    by_key: Dict[str, List[Dict[str, str]]] = {}
    for customer in customers:
        normalized = normalize_name(customer.get("name", ""))
        if normalized:
            bucket = by_key.setdefault(normalized, [])
            bucket.append(customer)
    return by_key
|
|
|
|
|
|
def plan_updates(
    lookup: Dict[str, List[Dict[str, str]]],
    customers: List[Dict[str, str]],
) -> Tuple[List[Tuple[int, str, str]], List[str], List[str], Dict[str, List[str]]]:
    """Work out which customers should receive which folder as wiki_slug.

    Every entry of FOLDERS is normalized and looked up in *lookup*:

    * no match        -> the folder is reported as missing, together with
                         up to three suggested customer names;
    * several matches -> the folder is reported as ambiguous;
    * one match       -> an update is planned unless the customer's current
                         wiki_slug (stripped) already equals the folder.

    Args:
        lookup: Mapping from normalized customer name to customers.
        customers: All customer rows, used to rank suggestions.

    Returns:
        A tuple ``(updates, missing, ambiguous, suggestions)`` where
        ``updates`` holds ``(customer id, customer name, folder)`` triples
        and ``suggestions`` maps a missing folder to suggested names.
    """
    updates: List[Tuple[int, str, str]] = []
    missing: List[str] = []
    ambiguous: List[str] = []
    suggestions: Dict[str, List[str]] = {}

    for folder in FOLDERS:
        key = normalize_name(folder)
        matches = lookup.get(key, [])

        if not matches:
            missing.append(folder)
            if key:
                # Reuse the shared ranking so the hints agree with the
                # candidates offered by the interactive prompt.
                nearest = rank_candidates(folder, customers, limit=3)
                names = [cust.get("name", "") for cust in nearest]
                suggestions[folder] = [name for name in names if name]
            continue

        if len(matches) > 1:
            ambiguous.append(folder)
            continue

        cust = matches[0]
        current_slug = cust.get("wiki_slug") or ""
        if current_slug.strip() != folder:
            updates.append((cust.get("id"), cust.get("name", ""), folder))

    return updates, missing, ambiguous, suggestions
|
|
|
|
|
|
def rank_candidates(
    folder: str,
    customers: List[Dict[str, str]],
    limit: int = 5,
) -> List[Dict[str, str]]:
    """Return the customers whose normalized name looks most like *folder*.

    Each candidate's score is the sum of a length-similarity term and the
    fraction of the folder key's characters that occur anywhere in the
    candidate key. Customers whose name normalizes to "" are ignored.

    Args:
        folder: The folder name to find candidates for.
        customers: Customer rows, each providing a "name" entry.
        limit: Maximum number of customers to return.

    Returns:
        Up to *limit* customers, best score first; equal scores keep their
        original relative order. Empty when *folder* normalizes to "".
    """
    target = normalize_name(folder)
    if not target:
        return []

    target_len = len(target)
    scored = []
    for customer in customers:
        candidate = normalize_name(customer.get("name", ""))
        if candidate:
            longest = max(target_len, len(candidate), 1)
            length_score = 1.0 - (abs(target_len - len(candidate)) / longest)
            shared = sum(1 for ch in target if ch in candidate)
            overlap = shared / max(target_len, 1)
            scored.append((length_score + overlap, customer))

    best_first = sorted(scored, key=lambda pair: pair[0], reverse=True)
    return [customer for _, customer in best_first[:limit]]
|
|
|
|
|
|
def prompt_for_missing(
    missing: List[str],
    customers: List[Dict[str, str]],
) -> List[Tuple[int, str, str]]:
    """Ask the user on stdin to pick a customer for each unmatched folder.

    For every folder the ranked candidates are listed and a selection is
    read until it is valid. An empty answer or "0" skips the folder. A
    chosen customer whose wiki_slug (stripped) already equals the folder is
    reported and not added.

    Args:
        missing: Folder names that had no automatic match.
        customers: All customer rows to rank candidates from.

    Returns:
        ``(customer id, customer name, folder)`` triples for the choices made.
    """
    selections: List[Tuple[int, str, str]] = []
    if not missing:
        return selections

    print("\nInteractive matching for missing folders:")
    for folder in missing:
        print(f"\nFolder: {folder}")
        candidates = rank_candidates(folder, customers)
        if not candidates:
            print(" No candidates.")
            continue
        print(" Choose a customer (0 to skip):")
        for idx, cust in enumerate(candidates, start=1):
            print(f" {idx}) {cust.get('id')}: {cust.get('name')}")
        while True:
            choice = input(" Selection: ").strip()
            if choice in ("", "0"):
                break
            # isdecimal() rather than isdigit(): isdigit() also accepts
            # characters such as "²", which int() rejects with ValueError.
            if not choice.isdecimal():
                print(" Please enter a number from the list.")
                continue
            index = int(choice)
            if not 1 <= index <= len(candidates):
                print(" Please enter a number from the list.")
                continue
            cust = candidates[index - 1]
            current_slug = cust.get("wiki_slug") or ""
            if current_slug.strip() == folder:
                print(" Already set. Skipping.")
                break
            selections.append((cust.get("id"), cust.get("name", ""), folder))
            break

    return selections
|
|
|
|
|
|
def apply_updates(conn, updates: List[Tuple[int, str, str]]) -> int:
    """Write the planned wiki_slug values and commit them.

    Args:
        conn: An open database connection.
        updates: ``(customer id, customer name, slug)`` triples; the name
            is not used for the write.

    Returns:
        The number of entries in *updates* (0 when there are none, in which
        case the database is not touched).
    """
    if not updates:
        return 0

    statement = "UPDATE customers SET wiki_slug = %s WHERE id = %s"
    with conn.cursor() as cursor:
        for entry in updates:
            customer_id, slug = entry[0], entry[2]
            cursor.execute(statement, (slug, customer_id))
    conn.commit()
    return len(updates)
|
|
|
|
|
|
def main() -> int:
    """Command-line entry point: plan, report and optionally apply updates.

    Reads the connection string from the DATABASE_URL environment variable.
    With ``--interactive`` the user is prompted for folders without a match;
    with ``--apply`` the planned updates are written, otherwise the run
    only reports.

    Returns:
        1 when DATABASE_URL is not set, otherwise 0.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--apply", action="store_true", help="Write updates to database")
    parser.add_argument(
        "--interactive",
        action="store_true",
        help="Prompt to match missing folders",
    )
    args = parser.parse_args()

    db_url = os.environ.get("DATABASE_URL")
    if not db_url:
        print("DATABASE_URL is not set", file=sys.stderr)
        return 1

    conn = psycopg2.connect(db_url)
    try:
        customers = fetch_customers(conn)
        updates, missing, ambiguous, suggestions = plan_updates(
            build_lookup(customers), customers
        )
        if args.interactive and missing:
            updates.extend(prompt_for_missing(missing, customers))

        print("Planned updates:")
        for cust_id, name, slug in updates:
            print(f" - {cust_id}: {name} -> {slug}")

        if missing:
            print("\nNo match for:")
            for folder in missing:
                hint = suggestions.get(folder, [])
                extra = f" (suggestions: {', '.join(hint)})" if hint else ""
                print(f" - {folder}{extra}")

        if ambiguous:
            print("\nMultiple matches for:")
            for folder in ambiguous:
                print(f" - {folder}")

        if not args.apply:
            print("\nDry run only. Use --apply to write changes.")
        else:
            count = apply_updates(conn, updates)
            print(f"\nApplied updates: {count}")
    finally:
        # Always release the connection, also when planning or applying fails.
        conn.close()

    return 0
|
|
|
|
|
|
# Run main() only when executed as a script and use its return value as the
# process exit status.
if __name__ == "__main__":
    sys.exit(main())
|