160 lines
5.1 KiB
Python
160 lines
5.1 KiB
Python
import os
|
|
import random
|
|
import re
|
|
import threading
|
|
import time
|
|
|
|
from openai import OpenAI, RateLimitError
|
|
from lxml import etree
|
|
from lxml import html as lhtml
|
|
from lxml.etree import tostring
|
|
|
|
from db import cache_key, get_translation, set_translation, prune_translations
|
|
|
|
# How long a cached translation stays valid, in seconds.
CACHE_TTL = int(os.getenv("CACHE_TTL_SECONDS", str(7 * 24 * 3600))) # default 7 days

# Chat model used for all translation calls.
OPENAI_MODEL = os.getenv("OPENAI_MODEL", "gpt-4o-mini")

# Maximum number of simultaneous OpenAI requests (enforced via _sem below).
_CONCURRENCY = int(os.getenv("OPENAI_MAX_CONCURRENT", "3"))

# Feed-item child elements whose text is translated. The last entry is the
# RSS <content:encoded> element in Clark ({namespace}localname) notation.
TRANSLATE_TAGS = [
    "title",
    "description",
    "{http://purl.org/rss/1.0/modules/content/}encoded",
]

# Matches a single <img ...> tag; used to pull images out of HTML before
# translation (they carry no translatable text) and re-append them after.
IMG_RE = re.compile(r'<img\s[^>]*>', re.IGNORECASE)

# ISO-639-1 code -> English language name, interpolated into the OpenAI
# prompt. Unknown codes fall back to the raw code in _call_openai.
_LANG_NAMES = {
    "sk": "Slovak",
    "cs": "Czech",
    "en": "English",
    "de": "German",
    "fr": "French",
    "hu": "Hungarian",
    "pl": "Polish",
    "uk": "Ukrainian",
    "ru": "Russian",
}

# Attributes considered presentation-only noise (no translatable content).
_STRIP_ATTRS = frozenset(['class', 'dir', 'style', 'id'])
# Wrapper tags that are dropped while keeping their text/children in place.
_UNWRAP_TAGS = frozenset(['div', 'span'])
|
|
|
|
|
|
def simplify_html(html_str: str) -> str:
    """Strip Telegram boilerplate to reduce tokens sent to OpenAI.

    Unwraps <div>/<span> wrappers, removes empty <p> tags, strips non-content
    attributes, and keeps only href on <a> tags.

    Returns the input unchanged if it cannot be parsed as an HTML fragment.
    """
    try:
        # Synthetic <div> parent lets multi-root fragments parse cleanly.
        root = lhtml.fragment_fromstring(html_str, create_parent='div')
    except Exception:
        return html_str  # best-effort: never fail the caller over bad markup

    for el in root.iter():
        if el.tag == 'a':
            # Keep ONLY href, as the docstring promises. The previous code
            # popped just class/dir/style/id, leaving rel/target/onclick etc.
            href = el.get('href')
            el.attrib.clear()
            if href is not None:
                el.set('href', href)
        else:
            el.attrib.clear()

    # Bottom-up so unwrapping a nested wrapper never touches an element we
    # have yet to visit; the synthetic root has no parent and is skipped.
    for el in reversed(list(root.iter())):
        if el.tag in _UNWRAP_TAGS and el.getparent() is not None:
            el.drop_tag()  # splice the element's text/children into the parent

    # Drop <p> elements with neither text nor child elements.
    for el in list(root.iter('p')):
        if not (el.text or '').strip() and not len(el):
            el.getparent().remove(el)

    # Serialize the children of the synthetic wrapper, not the wrapper itself.
    inner = (root.text or '') + ''.join(
        tostring(child, encoding='unicode') for child in root
    )
    return inner.strip()
|
|
|
|
# Shared OpenAI client; the SDK's built-in retry layer sits beneath the
# explicit rate-limit backoff in _call_openai.
_client = OpenAI(max_retries=3)  # reads OPENAI_API_KEY from env
# Caps concurrent in-flight OpenAI requests across worker threads.
_sem = threading.Semaphore(_CONCURRENCY)
|
|
|
|
|
|
def _call_openai(text: str, lang: str, preserve_html: bool = False) -> str:
    """Translate *text* to *lang* via the OpenAI chat-completions API.

    Retries up to 4 extra times with exponential backoff (capped at 30s) on
    rate limits, on top of the SDK's own ``max_retries``. Re-raises
    ``RateLimitError`` if the final attempt is still rate-limited; other API
    errors propagate to the caller (handled best-effort in ``translate``).
    """
    lang_name = _LANG_NAMES.get(lang, lang)  # unknown codes: use the raw code
    if preserve_html:
        system = (
            f"Translate ALL text content to {lang_name}. "
            f"Every word must be in {lang_name} — do not leave any text in the source language or any other language. "
            "Preserve all HTML tags and attributes exactly. "
            "Return only the translated HTML."
        )
    else:
        system = f"Translate ALL text to {lang_name}. Return only the {lang_name} translation. Do not mix in any other language."

    preview = text[:120].replace("\n", " ")
    print(f"[openai] {lang} | {preview!r}")

    delay = 1.0
    for attempt in range(5):
        try:
            # Hold the semaphore only for the API call itself so a slot is
            # freed before local post-processing.
            with _sem:
                resp = _client.chat.completions.create(
                    model=OPENAI_MODEL,
                    messages=[
                        {"role": "system", "content": system},
                        {"role": "user", "content": text},
                    ],
                    temperature=0.1,  # near-deterministic output
                )
            # content may be None (e.g. refusal / filtered output) — guard
            # before .strip() to avoid AttributeError.
            result = (resp.choices[0].message.content or "").strip()
            print(f"[openai] → {result[:120].replace(chr(10), ' ')!r} ({resp.usage.total_tokens} tokens)")
            return result
        except RateLimitError:
            if attempt == 4:
                raise  # out of retries
            print(f"[openai] rate limited, retry {attempt + 1}/4 in {delay:.0f}s")
            time.sleep(delay)
            delay = min(delay * 2, 30)  # exponential backoff, capped at 30s
|
|
|
|
|
|
def translate(text: str, lang: str, db, preserve_html: bool = False) -> str:
    """Translate *text* to *lang*, going through the DB cache.

    Returns the input unchanged when it is empty/whitespace-only or when the
    OpenAI call fails (failures are logged but never cached, so the next
    request retries the translation).
    """
    if not text or not text.strip():
        return text

    key = cache_key(text, lang)
    hit = get_translation(db, key, CACHE_TTL)
    if hit is not None:
        return hit

    try:
        translated = _call_openai(text, lang, preserve_html)
    except Exception as exc:
        print(f"Translation error: {exc}")
        return text  # don't cache failures

    set_translation(db, key, translated)
    # Amortized maintenance: roughly one write in a hundred also prunes
    # entries older than twice the TTL.
    if random.random() < 0.01:
        prune_translations(db, CACHE_TTL * 2)
    return translated
|
|
|
|
|
|
def translate_html(html: str, lang: str, db) -> str:
    """Translate HTML while preserving all tags.

    <img> tags are pulled out before translation (they hold no translatable
    text and would waste tokens) and re-appended after the translated body.
    """
    extracted_imgs = IMG_RE.findall(html)
    without_imgs = IMG_RE.sub('', html)
    simplified = simplify_html(without_imgs)
    body = translate(simplified, lang, db, preserve_html=True)
    return body + ''.join(extracted_imgs)
|
|
|
|
|
|
def translate_feed(xml_bytes: bytes, lang: str, db) -> bytes:
    """Translate title/description/content of every item in an RSS or Atom feed.

    Returns the serialized feed as UTF-8 bytes with an XML declaration.
    Raises ValueError if the upstream bytes are not well-formed XML.
    """
    try:
        root = etree.fromstring(xml_bytes)
    except etree.XMLSyntaxError as e:
        raise ValueError(f"Invalid XML from upstream: {e}")

    atom_ns = "{http://www.w3.org/2005/Atom}"
    items = root.findall(".//item")  # RSS
    is_atom = False
    if not items:
        items = root.findall(".//" + atom_ns + "entry")  # Atom
        is_atom = True

    # Atom children are namespaced and use different local names for the
    # RSS equivalents; without this mapping Atom entries were found but
    # none of their elements ever matched, so nothing got translated.
    atom_equiv = {"description": "summary", "encoded": "content"}

    for item in items:
        for tag in TRANSLATE_TAGS:
            el = item.find(tag)
            if el is None and is_atom:
                local = tag.rsplit("}", 1)[-1]
                el = item.find(atom_ns + atom_equiv.get(local, local))
            if el is not None and el.text:
                # description / content:encoded (and their Atom counterparts,
                # reached via the same loop tag) may contain markup.
                if tag == "description" or tag.endswith("}encoded"):
                    el.text = translate_html(el.text, lang, db)
                else:
                    el.text = translate(el.text, lang, db)
    return etree.tostring(root, xml_declaration=True, encoding="UTF-8")
|