import os
import random
import re
import threading
import time

from openai import OpenAI, RateLimitError
from lxml import etree
from lxml import html as lhtml
from lxml.etree import tostring

from db import cache_key, get_translation, set_translation, prune_translations

CACHE_TTL = int(os.getenv("CACHE_TTL_SECONDS", str(7 * 24 * 3600)))  # default 7 days
OPENAI_MODEL = os.getenv("OPENAI_MODEL", "gpt-4o-mini")
# Clamp to >= 1: Semaphore(0) would block every request forever and a
# negative value makes threading.Semaphore raise at import time.
_CONCURRENCY = max(1, int(os.getenv("OPENAI_MAX_CONCURRENT", "3")))

# Direct children of each feed item whose text gets translated.
TRANSLATE_TAGS = [
    "title",
    "description",
    "{http://purl.org/rss/1.0/modules/content/}encoded",
]

# Matches a complete <img ...> tag so images can be set aside before
# translation and re-appended untouched afterwards.
IMG_RE = re.compile(r'<img[^>]*>', re.IGNORECASE)

# Language code -> English language name used in the prompt; unknown codes
# are passed to the model verbatim.
_LANG_NAMES = {
    "sk": "Slovak",
    "cs": "Czech",
    "en": "English",
    "de": "German",
    "fr": "French",
    "hu": "Hungarian",
    "pl": "Polish",
    "uk": "Ukrainian",
    "ru": "Russian",
}

_STRIP_ATTRS = frozenset(['class', 'dir', 'style', 'id'])
_UNWRAP_TAGS = frozenset(['div', 'span'])


def simplify_html(html_str: str) -> str:
    """Strip Telegram boilerplate to reduce tokens sent to OpenAI.

    Unwraps <div>/<span> wrappers, removes empty <p></p> tags, clears every
    attribute on non-link elements, and on <a> tags removes the non-content
    attributes (class, dir, style, id) while leaving the others (such as
    href) in place.

    Args:
        html_str: HTML fragment to simplify.

    Returns:
        The simplified fragment, stripped of surrounding whitespace.  If the
        fragment cannot be parsed, ``html_str`` is returned unchanged.
    """
    try:
        root = lhtml.fragment_fromstring(html_str, create_parent='div')
    except Exception:
        # Best effort: an unparseable fragment is sent through as-is.
        return html_str

    for el in root.iter():
        if not isinstance(el.tag, str):
            # Comments / processing instructions carry no attributes to strip.
            continue
        if el.tag == 'a':
            for attr in _STRIP_ATTRS:
                el.attrib.pop(attr, None)
        else:
            el.attrib.clear()

    # Walk a reversed snapshot because drop_tag() mutates the tree.
    for el in reversed(list(root.iter())):
        # The synthetic root <div> has no parent and must stay in place.
        if el.tag in _UNWRAP_TAGS and el.getparent() is not None:
            el.drop_tag()

    for el in list(root.iter('p')):
        if not (el.text or '').strip() and not len(el):
            # drop_tree() keeps the text that follows the element (its tail);
            # parent.remove(el) would silently discard that text as well.
            el.drop_tree()

    inner = (root.text or '') + ''.join(
        tostring(child, encoding='unicode') for child in root
    )
    return inner.strip()


_client = OpenAI(max_retries=3)  # reads OPENAI_API_KEY from env
_sem = threading.Semaphore(_CONCURRENCY)  # caps simultaneous OpenAI requests


def _call_openai(text: str, lang: str, preserve_html: bool = False) -> str:
    """Translate ``text`` into ``lang`` with one chat-completion request.

    Rate-limit errors are retried up to four times with exponential backoff
    (1s, 2s, 4s, 8s; capped at 30s).  The concurrency semaphore is held only
    for the duration of the request, not while sleeping between retries.

    Args:
        text: Source text, or HTML when ``preserve_html`` is true.
        lang: Target language code; mapped through ``_LANG_NAMES`` when known.
        preserve_html: Use the prompt that asks the model to keep HTML tags
            and attributes intact.

    Returns:
        The translated text with surrounding whitespace removed.

    Raises:
        RateLimitError: If the fifth attempt is still rate limited.
        RuntimeError: If the model returns no content or only whitespace.
    """
    lang_name = _LANG_NAMES.get(lang, lang)
    if preserve_html:
        system = (
            f"Translate ALL text content to {lang_name}. "
            f"Every word must be in {lang_name} — do not leave any text in the source language or any other language. "
            "Preserve all HTML tags and attributes exactly. "
            "Return only the translated HTML."
        )
    else:
        system = f"Translate ALL text to {lang_name}. Return only the {lang_name} translation. Do not mix in any other language."

    preview = text[:120].replace("\n", " ")
    print(f"[openai] {lang} | {preview!r}")

    delay = 1.0
    for attempt in range(5):
        try:
            with _sem:
                resp = _client.chat.completions.create(
                    model=OPENAI_MODEL,
                    messages=[
                        {"role": "system", "content": system},
                        {"role": "user", "content": text},
                    ],
                    temperature=0.1,
                )
            content = resp.choices[0].message.content
            # The caller caches whatever is returned here, so an empty
            # completion must surface as an error instead of being stored
            # as the "translation" of a non-empty text.
            if content is None or not content.strip():
                raise RuntimeError("OpenAI returned an empty completion")
            result = content.strip()
            print(f"[openai] → {result[:120].replace(chr(10), ' ')!r} ({resp.usage.total_tokens} tokens)")
            return result
        except RateLimitError:
            if attempt == 4:
                raise
            print(f"[openai] rate limited, retry {attempt + 1}/4 in {delay:.0f}s")
            time.sleep(delay)
            delay = min(delay * 2, 30)


def translate(text: str, lang: str, db, preserve_html: bool = False) -> str:
    """Return ``text`` translated into ``lang``, using the translation cache.

    Empty or whitespace-only input is returned unchanged.  A non-None cache
    entry for ``cache_key(text, lang)`` is returned directly; otherwise the
    text is sent to OpenAI and the result is stored.  When the OpenAI call
    fails, the error is printed and the original text is returned without
    being cached.

    Args:
        text: Text (or HTML) to translate.
        lang: Target language code.
        db: Handle passed through to the ``db`` module helpers.
        preserve_html: Forwarded to ``_call_openai``.

    Returns:
        The translated text, or ``text`` itself on empty input or failure.
    """
    if not text or not text.strip():
        return text

    k = cache_key(text, lang)
    cached = get_translation(db, k, CACHE_TTL)
    if cached is not None:
        return cached

    try:
        result = _call_openai(text, lang, preserve_html)
    except Exception as e:
        print(f"Translation error: {e}")
        return text  # don't cache failures

    set_translation(db, k, result)
    if random.random() < 0.01:  # ~1% of writes: prune old entries
        prune_translations(db, CACHE_TTL * 2)
    return result


def translate_html(html: str, lang: str, db) -> str:
    """Translate HTML while preserving all tags.

    <img> tags are cut out before translation, so they are neither sent to
    the model nor altered by it, and are appended unchanged after the
    translated markup.  The remaining HTML is simplified with
    ``simplify_html`` before being translated.

    Args:
        html: HTML fragment to translate.
        lang: Target language code.
        db: Handle passed through to ``translate``.

    Returns:
        The translated HTML followed by the original <img> tags.
    """
    imgs = IMG_RE.findall(html)
    clean = IMG_RE.sub('', html)
    clean = simplify_html(clean)
    translated = translate(clean, lang, db, preserve_html=True)
    return translated + ''.join(imgs)


def translate_feed(xml_bytes: bytes, lang: str, db) -> bytes:
    """Translate the text fields of every item in a feed document.

    For each ``item`` element (or, when there are none, each Atom ``entry``),
    the direct children named in ``TRANSLATE_TAGS`` have their text replaced
    by its translation.  ``description`` and ``content:encoded`` are treated
    as HTML; other tags are treated as plain text.

    Args:
        xml_bytes: Raw feed document.
        lang: Target language code.
        db: Handle passed through to the translation cache helpers.

    Returns:
        The modified document serialized as UTF-8 with an XML declaration.

    Raises:
        ValueError: If ``xml_bytes`` is not well-formed XML.
    """
    # NOTE(review): the feed comes from upstream and is parsed with lxml's
    # default parser settings — confirm entity handling is acceptable here.
    try:
        root = etree.fromstring(xml_bytes)
    except etree.XMLSyntaxError as e:
        raise ValueError(f"Invalid XML from upstream: {e}") from e

    items = root.findall(".//item") or root.findall(
        ".//{http://www.w3.org/2005/Atom}entry"
    )
    # NOTE(review): "title" and "description" are looked up without a
    # namespace, so children of namespaced Atom entries presumably never
    # match — confirm whether Atom feeds are expected to be translated.
    for item in items:
        for tag in TRANSLATE_TAGS:
            el = item.find(tag)
            if el is None or not el.text:
                continue
            if tag == "description" or tag.endswith("}encoded"):
                el.text = translate_html(el.text, lang, db)
            else:
                el.text = translate(el.text, lang, db)

    return etree.tostring(root, xml_declaration=True, encoding="UTF-8")