rsshubtrans/translator/translate.py

160 lines
5.1 KiB
Python

import os
import random
import re
import threading
import time
from openai import OpenAI, RateLimitError
from lxml import etree
from lxml import html as lhtml
from lxml.etree import tostring
from db import cache_key, get_translation, set_translation, prune_translations
# Lifetime of a cached translation in seconds; defaults to one week.
CACHE_TTL = int(os.environ.get("CACHE_TTL_SECONDS", str(7 * 24 * 60 * 60)))

# Chat model used for translation requests.
OPENAI_MODEL = os.environ.get("OPENAI_MODEL", "gpt-4o-mini")

# Size of the semaphore that limits simultaneous OpenAI requests.
_CONCURRENCY = int(os.environ.get("OPENAI_MAX_CONCURRENT", "3"))

# Child elements of each feed item whose text is translated.
TRANSLATE_TAGS = [
    "title",
    "description",
    "{http://purl.org/rss/1.0/modules/content/}encoded",
]

# Matches a complete <img ...> tag (case-insensitive) so images can be
# pulled out of the markup before translation.
IMG_RE = re.compile(r'<img\s[^>]*>', re.IGNORECASE)

# Language code -> English language name used in the prompt.
_LANG_NAMES = dict(
    sk="Slovak",
    cs="Czech",
    en="English",
    de="German",
    fr="French",
    hu="Hungarian",
    pl="Polish",
    uk="Ukrainian",
    ru="Russian",
)

# Attributes removed from <a> elements by simplify_html.
_STRIP_ATTRS = frozenset(("class", "dir", "style", "id"))

# Wrapper tags that simplify_html unwraps, keeping their content.
_UNWRAP_TAGS = frozenset(("div", "span"))
def simplify_html(html_str: str) -> str:
    """Strip Telegram boilerplate to reduce tokens sent to OpenAI.

    The fragment is parsed inside a synthetic ``<div>`` parent, then:

    * every element except ``<a>`` loses all of its attributes;
    * ``<a>`` elements lose only ``class``, ``dir``, ``style`` and ``id``
      (``href`` and any other attribute are kept);
    * ``<div>`` and ``<span>`` wrappers are unwrapped, keeping their content;
    * ``<p>`` elements with no text and no children are removed, while any
      text that follows them is preserved.

    Args:
        html_str: HTML fragment to simplify.

    Returns:
        The simplified markup, stripped of surrounding whitespace. If the
        fragment cannot be parsed, ``html_str`` is returned unchanged.
    """
    try:
        root = lhtml.fragment_fromstring(html_str, create_parent='div')
    except Exception:
        return html_str

    # Comments and processing instructions are yielded by iter() with a
    # non-string tag and have no attributes to strip, so only real elements
    # are processed.
    elements = [el for el in root.iter() if isinstance(el.tag, str)]

    for el in elements:
        if el.tag == 'a':
            for attr in _STRIP_ATTRS:
                el.attrib.pop(attr, None)
        else:
            el.attrib.clear()

    # Unwrap innermost wrappers first; the synthetic root has no parent and
    # is therefore left in place.
    for el in reversed(elements):
        if el.tag in _UNWRAP_TAGS and el.getparent() is not None:
            el.drop_tag()

    for el in list(root.iter('p')):
        if not (el.text or '').strip() and not len(el):
            # drop_tree() re-attaches the tail text to the previous sibling
            # or parent; a plain remove() would silently discard it.
            el.drop_tree()

    inner = (root.text or '') + ''.join(
        tostring(child, encoding='unicode') for child in root
    )
    return inner.strip()
# Shared OpenAI client; reads OPENAI_API_KEY from env.
_client = OpenAI(max_retries=3)
# Limits the number of requests in flight at once to _CONCURRENCY.
_sem = threading.Semaphore(value=_CONCURRENCY)
def _call_openai(text: str, lang: str, preserve_html: bool = False) -> str:
    """Ask the OpenAI chat model to translate ``text`` into ``lang``.

    Args:
        text: Text (or HTML when ``preserve_html`` is true) to translate.
        lang: Target language code; mapped to a language name through
            ``_LANG_NAMES`` and used verbatim when it is not listed there.
        preserve_html: When true, the prompt instructs the model to keep
            HTML tags and attributes intact and return only translated HTML.

    Returns:
        The model's reply with surrounding whitespace stripped.

    Raises:
        RateLimitError: When the request is still rate limited on the fifth
            attempt.
        ValueError: When the response carries no text content.
    """
    lang_name = _LANG_NAMES.get(lang, lang)
    if preserve_html:
        system = (
            f"Translate ALL text content to {lang_name}. "
            f"Every word must be in {lang_name} — do not leave any text in the source language or any other language. "
            "Preserve all HTML tags and attributes exactly. "
            "Return only the translated HTML."
        )
    else:
        system = f"Translate ALL text to {lang_name}. Return only the {lang_name} translation. Do not mix in any other language."

    preview = text[:120].replace("\n", " ")
    print(f"[openai] {lang} | {preview!r}")

    max_attempts = 5
    delay = 1.0
    for attempt in range(1, max_attempts + 1):
        try:
            # Hold the semaphore only for the request itself so that backoff
            # sleeps do not block other callers.
            with _sem:
                resp = _client.chat.completions.create(
                    model=OPENAI_MODEL,
                    messages=[
                        {"role": "system", "content": system},
                        {"role": "user", "content": text},
                    ],
                    temperature=0.1,
                )
        except RateLimitError:
            if attempt == max_attempts:
                raise
            print(f"[openai] rate limited, retry {attempt}/{max_attempts - 1} in {delay:.0f}s")
            time.sleep(delay)
            # Exponential backoff, capped at 30 seconds.
            delay = min(delay * 2, 30)
            continue

        content = resp.choices[0].message.content
        if content is None:
            # The content field is optional; fail with a clear message
            # instead of an AttributeError on .strip().
            raise ValueError("OpenAI response contained no text content")
        result = content.strip()
        # Usage data is optional too; a missing value must not discard an
        # otherwise valid translation.
        tokens = resp.usage.total_tokens if resp.usage is not None else "?"
        print(f"[openai] → {result[:120].replace(chr(10), ' ')!r} ({tokens} tokens)")
        return result
def translate(text: str, lang: str, db, preserve_html: bool = False) -> str:
    """Translate ``text`` into ``lang``, going through the translation cache.

    Args:
        text: Text to translate. Empty or whitespace-only input is returned
            unchanged without touching the cache or the API.
        lang: Target language code.
        db: Handle passed through to the cache helpers from the ``db`` module.
        preserve_html: Forwarded to ``_call_openai``; set when ``text`` is
            HTML whose markup must survive translation.

    Returns:
        The cached or freshly produced translation. When the API call fails
        or yields an empty result, the original ``text`` is returned and
        nothing is cached.
    """
    if not text or not text.strip():
        return text

    k = cache_key(text, lang)
    cached = get_translation(db, k, CACHE_TTL)
    if cached is not None:
        return cached

    try:
        result = _call_openai(text, lang, preserve_html)
    except Exception as e:
        # Best effort: a failed translation must not break the whole feed.
        print(f"Translation error: {e}")
        return text  # don't cache failures

    if not result:
        # An empty reply for non-empty input is a failed translation; caching
        # it would blank this text for the whole cache lifetime.
        print("Translation error: empty response from model")
        return text

    set_translation(db, k, result)
    if random.random() < 0.01:  # ~1% of writes: prune old entries
        prune_translations(db, CACHE_TTL * 2)
    return result
def translate_html(html: str, lang: str, db) -> str:
    """Translate an HTML fragment, keeping its markup.

    ``<img ...>`` tags are taken out before translation and appended, in
    their original order, after the translated markup. The remaining HTML is
    passed through ``simplify_html`` and then translated with
    ``preserve_html=True``.
    """
    image_tags = [match.group(0) for match in IMG_RE.finditer(html)]
    without_images, _ = IMG_RE.subn('', html)
    simplified = simplify_html(without_images)
    body = translate(simplified, lang, db, preserve_html=True)
    trailing_images = ''.join(image_tags)
    return body + trailing_images
_ATOM_NS = "{http://www.w3.org/2005/Atom}"


def _translate_atom_entry(entry, lang: str, db) -> None:
    """Translate the title, summary and content of one Atom entry in place."""
    for name in ("title", "summary", "content"):
        el = entry.find(f"{_ATOM_NS}{name}")
        if el is None or not el.text:
            continue
        # RFC 4287: a missing ``type`` attribute means plain text.
        kind = (el.get("type") or "text").strip().lower()
        if kind == "html":
            el.text = translate_html(el.text, lang, db)
        elif kind == "text":
            el.text = translate(el.text, lang, db)
        # "xhtml" and media-type content keep their payload in child markup
        # or a non-text form, so they are left untouched.


def translate_feed(xml_bytes: bytes, lang: str, db) -> bytes:
    """Translate the items of an RSS or Atom feed.

    RSS ``<item>`` elements are used when present; otherwise Atom
    ``<entry>`` elements are used. For every item the elements listed in
    ``TRANSLATE_TAGS`` are translated (``description`` and
    ``content:encoded`` as HTML, the rest as plain text). Atom entries
    additionally get their namespaced ``title``, ``summary`` and ``content``
    translated according to each element's ``type`` attribute.

    Args:
        xml_bytes: Raw feed document.
        lang: Target language code.
        db: Handle passed through to the translation cache helpers.

    Returns:
        The modified document serialized as UTF-8 with an XML declaration.

    Raises:
        ValueError: When ``xml_bytes`` is not well-formed XML.
    """
    try:
        root = etree.fromstring(xml_bytes)
    except etree.XMLSyntaxError as e:
        raise ValueError(f"Invalid XML from upstream: {e}") from e

    items = root.findall(".//item")
    is_atom = not items
    if is_atom:
        items = root.findall(f".//{_ATOM_NS}entry")

    for item in items:
        for tag in TRANSLATE_TAGS:
            el = item.find(tag)
            if el is None or not el.text:
                continue
            if tag == "description" or tag.endswith("}encoded"):
                el.text = translate_html(el.text, lang, db)
            else:
                el.text = translate(el.text, lang, db)
        if is_atom:
            # The RSS tag names above never match Atom's namespaced children,
            # so Atom entries need their own pass.
            _translate_atom_entry(item, lang, db)

    return etree.tostring(root, xml_declaration=True, encoding="UTF-8")