93 lines
3.0 KiB
Python
93 lines
3.0 KiB
Python
|
|
"""One-off patch script: make ``manual_index`` in the manual views use ``manual_cache``.

The script reads ``app/modules/manual/frontend/views.py``, makes sure
``manual_cache`` is imported, replaces the existing ``manual_index`` coroutine
with the cached implementation in ``NEW_FUNC`` and writes the file back.

The file is only rewritten when both steps succeed; if the import anchor or
the function to replace cannot be found, a ``ValueError`` is raised and the
target file is left untouched.
"""
import re

# File that gets patched in place (path is relative to the working directory).
VIEWS_PATH = "app/modules/manual/frontend/views.py"

# Import line that is inserted directly above IMPORT_ANCHOR when missing.
CACHE_IMPORT = "from app.modules.manual.backend.cache import manual_cache\n"
IMPORT_ANCHOR = "from app.core.database import"

# Replacement source for ``manual_index``. It starts at the ``async def`` line,
# so any decorator above the original function is kept as-is.
# NOTE(review): the tag filter builds its JSON parameter by string
# interpolation (f'["{tag}"]'); a tag containing a double quote would yield
# invalid JSON. Left unchanged here so the emitted code stays as authored.
NEW_FUNC = """async def manual_index(
    request: Request,
    module: Optional[str] = Query(default=None),
    difficulty: Optional[str] = Query(default=None),
    tag: Optional[str] = Query(default=None),
    search: Optional[str] = Query(default=None),
):
    cache_key = f"views:list:{module}:{difficulty}:{tag}:{search}"
    cached = manual_cache.get(cache_key)
    if cached:
        rows, modules, unique_tags = cached
    else:
        filters = ["deleted_at IS NULL", "status = %s"]
        params = ["published"]
        # ... logic mapping ...
        if module:
            filters.append("module = %s")
            params.append(module)
        if difficulty:
            filters.append("difficulty = %s")
            params.append(difficulty)
        if tag:
            filters.append("tags @> %s")
            params.append(f'["{tag}"]')
        if search:
            filters.append("(title ILIKE %s OR content ILIKE %s)")
            params.extend([f"%{search}%", f"%{search}%"])

        where_clause = " AND ".join(filters)

        rows = execute_query(
            f"SELECT id, slug, title, content, module, tags, difficulty, use_count, updated_at "
            f"FROM manual_articles WHERE {where_clause} ORDER BY updated_at DESC",
            tuple(params)
        ) or []

        modules = execute_query(
            "SELECT DISTINCT module FROM manual_articles WHERE deleted_at IS NULL ORDER BY module ASC"
        ) or []

        all_tags: List[str] = []
        for row in rows:
            if "tags" in row and row["tags"]:
                try:
                    import json
                    if isinstance(row["tags"], str):
                        t = json.loads(row["tags"])
                        if isinstance(t, list):
                            all_tags.extend(t)
                    elif isinstance(row["tags"], list):
                        all_tags.extend(row["tags"])
                except Exception:
                    pass
        unique_tags = sorted(list(set(all_tags)))

        manual_cache.set(cache_key, (rows, modules, unique_tags))

    return templates.TemplateResponse(
        "modules/manual/templates/list.html",
        {
            "request": request,
            "articles": rows,
            "available_modules": modules,
            "available_tags": unique_tags,
            "filters": {
                "module": module or "",
                "difficulty": difficulty or "",
                "tag": tag or "",
                "search": search or "",
            },
        },
    )
"""

# Matches from ``async def manual_index(`` up to the first line that consists
# of a closing parenthesis at function-body indentation after
# ``return templates.TemplateResponse(``.
# Assumes the target file indents function bodies with 4 spaces (the same
# layout NEW_FUNC uses) -- TODO confirm against views.py.
MANUAL_INDEX_RE = re.compile(
    r'async def manual_index\(.*?return templates\.TemplateResponse\([\s\S]*?\n {4}\)\n',
    flags=re.DOTALL,
)


def ensure_cache_import(text: str) -> str:
    """Return *text* with the ``manual_cache`` import present.

    If the name ``manual_cache`` already occurs anywhere in *text* the input
    is returned unchanged. Otherwise the import is inserted directly above the
    first ``from app.core.database import`` line.

    Raises:
        ValueError: the import is missing and the anchor line is not found,
            so there is no known place to insert it.
    """
    if "manual_cache" in text:
        return text
    if IMPORT_ANCHOR not in text:
        raise ValueError(
            f"cannot add manual_cache import: anchor {IMPORT_ANCHOR!r} not found"
        )
    # count=1: insert the import once even if the anchor occurs repeatedly.
    return text.replace(IMPORT_ANCHOR, CACHE_IMPORT + IMPORT_ANCHOR, 1)


def replace_manual_index(text: str) -> str:
    """Return *text* with the ``manual_index`` coroutine replaced by ``NEW_FUNC``.

    Raises:
        ValueError: no ``manual_index`` definition matching
            ``MANUAL_INDEX_RE`` was found.
    """
    # A callable replacement inserts NEW_FUNC verbatim; a plain string would
    # be processed for backslash escapes and group references by ``re``.
    patched, count = MANUAL_INDEX_RE.subn(lambda _match: NEW_FUNC, text, count=1)
    if count != 1:
        raise ValueError("manual_index definition not found; nothing was replaced")
    return patched


def patch_source(text: str) -> str:
    """Apply both patch steps (import, then function body) to *text*."""
    return replace_manual_index(ensure_cache_import(text))


def main(path: str = VIEWS_PATH) -> None:
    """Patch the views file at *path* in place.

    The file is written only after patching succeeded, so a failed patch
    leaves it unmodified.
    """
    with open(path, "r", encoding="utf-8") as f:
        text = f.read()

    patched = patch_source(text)

    with open(path, "w", encoding="utf-8") as f:
        f.write(patched)


if __name__ == "__main__":
    main()
|