# Source: AIEXTRACT1/backend/app/openrouter_client.py
# Author: Seth0330
# Commit: "Update backend/app/openrouter_client.py" (88c325b, verified)
import os
import base64
import json
import re
from io import BytesIO
from typing import Any, Dict, List
import httpx
try:
import fitz # PyMuPDF
from PIL import Image
PDF_SUPPORT = True
except ImportError as e:
PDF_SUPPORT = False
print(f"[WARNING] PDF support libraries not available: {e}. PDF conversion will not work.")
# --- OpenRouter -------------------------------------------------------------
# API key is read from the environment (e.g. a Hugging Face Space secret).
OPENROUTER_API_KEY = os.getenv("OPENROUTER_API_KEY")
OPENROUTER_BASE_URL = "https://openrouter.ai/api/v1/chat/completions"
MODEL_NAME = "qwen/qwen3-vl-235b-a22b-instruct"

# --- HuggingFace Inference API ----------------------------------------------
HF_TOKEN = os.getenv("HF_TOKEN")
HF_INFERENCE_API_URL = "https://api-inference.huggingface.co/models"
# Model can be overridden through the HF_MODEL_NAME environment variable.
HF_MODEL_NAME = os.getenv("HF_MODEL_NAME", "Qwen/Qwen3-VL-235B-A22B-Instruct")

# --- OpenAI -----------------------------------------------------------------
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
OPENAI_BASE_URL = "https://api.openai.com/v1/chat/completions"
# Model can be overridden through the OPENAI_MODEL_NAME environment variable.
OPENAI_MODEL_NAME = os.getenv("OPENAI_MODEL_NAME", "gpt-4o")

# --- Backend selection ------------------------------------------------------
# One of "openrouter", "huggingface" or "openai"; compared in lower case.
EXTRACTION_BACKEND = os.getenv("EXTRACTION_BACKEND", "openrouter").lower()
def _pdf_to_images(pdf_bytes: bytes) -> List[bytes]:
    """
    Render every page of a PDF to a JPEG image.

    Args:
        pdf_bytes: Raw bytes of the PDF document.

    Returns:
        A list of JPEG-encoded image bytes, one entry per page, in page order.

    Raises:
        RuntimeError: If the PDF support libraries could not be imported.
    """
    if not PDF_SUPPORT:
        raise RuntimeError("PyMuPDF not installed. Cannot convert PDF to images.")

    # 2x zoom for better quality; the matrix is the same for every page, so
    # build it once instead of per iteration.
    zoom_matrix = fitz.Matrix(2.0, 2.0)
    images: List[bytes] = []

    pdf_doc = fitz.open(stream=pdf_bytes, filetype="pdf")
    try:
        page_count = len(pdf_doc)
        print(f"[INFO] PDF has {page_count} page(s)")
        for page_num in range(page_count):
            page = pdf_doc[page_num]
            # alpha=False keeps the pixmap at 3 channels so that the sample
            # buffer matches the "RGB" mode passed to Pillow below.
            pix = page.get_pixmap(matrix=zoom_matrix, alpha=False)
            # Go through Pillow to get JPEG bytes (better compression).
            img = Image.frombytes("RGB", [pix.width, pix.height], pix.samples)
            img_bytes = BytesIO()
            img.save(img_bytes, format="JPEG", quality=95)
            images.append(img_bytes.getvalue())
            print(f"[INFO] Converted page {page_num + 1} to image ({pix.width}x{pix.height})")
    finally:
        # Release the document even when rendering a page fails.
        pdf_doc.close()
    return images
def _image_bytes_to_base64(image_bytes: bytes) -> str:
"""Convert image bytes to base64 data URL (JPEG format)."""
b64 = base64.b64encode(image_bytes).decode("utf-8")
data_url = f"data:image/jpeg;base64,{b64}"
print(f"[DEBUG] Base64 encoded image: {len(image_bytes)} bytes -> {len(data_url)} chars")
return data_url
def _file_to_image_blocks(file_bytes: bytes, content_type: str) -> List[Dict[str, Any]]:
"""
Convert file to image blocks for the vision model.
- For images: Returns single image block
- For PDFs: Converts each page to an image and returns multiple blocks
"""
# Handle PDF files
if content_type == "application/pdf" or content_type.endswith("/pdf"):
if not PDF_SUPPORT:
raise RuntimeError("PDF support requires PyMuPDF. Please install it.")
print(f"[INFO] Converting PDF to images...")
pdf_images = _pdf_to_images(file_bytes)
# Create image blocks for each page
# OpenRouter format: {"type": "image_url", "image_url": {"url": "data:..."}}
image_blocks = []
for i, img_bytes in enumerate(pdf_images):
data_url = _image_bytes_to_base64(img_bytes)
image_blocks.append({
"type": "image_url",
"image_url": {"url": data_url}
})
print(f"[INFO] Created image block for page {i + 1} ({len(img_bytes)} bytes)")
return image_blocks
# Handle regular image files
else:
# Convert to JPEG for consistency (better compression)
try:
img = Image.open(BytesIO(file_bytes))
if img.mode != "RGB":
img = img.convert("RGB")
# Resize if too large (max 1920px on longest side) - matches your working code
max_size = 1920
w, h = img.size
if w > max_size or h > max_size:
if w > h:
new_w = max_size
new_h = int(h * (max_size / w))
else:
new_h = max_size
new_w = int(w * (max_size / h))
img = img.resize((new_w, new_h), Image.LANCZOS)
print(f"[INFO] Resized image from {w}x{h} to {new_w}x{new_h}")
# Convert to JPEG bytes
img_bytes = BytesIO()
img.save(img_bytes, format="JPEG", quality=95)
img_bytes = img_bytes.getvalue()
data_url = _image_bytes_to_base64(img_bytes)
except Exception as e:
# Fallback: use original file bytes
print(f"[WARNING] Could not process image with PIL: {e}. Using original bytes.")
b64 = base64.b64encode(file_bytes).decode("utf-8")
data_url = f"data:{content_type};base64,{b64}"
print(f"[DEBUG] Encoding image file. Content type: {content_type}, Size: {len(file_bytes)} bytes")
return [{
"type": "image_url",
"image_url": {"url": data_url}
}]
async def _extract_single_page(image_bytes: bytes, page_num: int, total_pages: int, backend: str = None) -> Dict[str, Any]:
    """
    Run extraction for one page image on the selected backend.

    Pages are sent one at a time to keep request payloads small. When
    ``backend`` is empty the module-level EXTRACTION_BACKEND is used; any
    value other than "huggingface" or "openai" goes to OpenRouter.
    """
    selected = backend if backend else EXTRACTION_BACKEND

    if selected == "huggingface":
        extractor = _extract_with_hf
    elif selected == "openai":
        extractor = _extract_with_openai_single
    else:
        extractor = _extract_with_openrouter_single

    return await extractor(image_bytes, page_num, total_pages)
async def extract_fields_from_document(
    file_bytes: bytes,
    content_type: str,
    filename: str,
) -> Dict[str, Any]:
    """
    Extract text and fields from a document, one page at a time.

    The upload is converted to image blocks once (PDF pages are rendered,
    plain images are normalised), each page is sent to the configured backend
    separately, and the per-page results are merged.

    Args:
        file_bytes: Raw bytes of the uploaded file.
        content_type: MIME type of the upload; PDFs are detected from it.
        filename: Original file name. Currently unused by this function.

    Returns:
        A dict with ``doc_type``, ``confidence`` (average of the positive
        per-page confidences, 0 if none), ``full_text``, ``fields`` and
        ``pages`` (the per-page results, including failed pages).

    Raises:
        ValueError: If no image could be produced from the file.
    """
    # Convert file to image blocks (handles PDF conversion)
    image_blocks_data = _file_to_image_blocks(file_bytes, content_type)
    if not image_blocks_data:
        raise ValueError("No images generated from file")

    # Recover the image bytes from the data URLs that were just built instead
    # of rendering the PDF a second time. For plain images this also means the
    # backend gets the normalised/resized JPEG rather than the raw upload.
    image_bytes_list = [
        base64.b64decode(block["image_url"]["url"].partition("base64,")[2])
        for block in image_blocks_data
    ]

    total_pages = len(image_bytes_list)
    print(f"[INFO] Processing {total_pages} page(s) separately for better reliability...")

    # Process each page separately; one failing page must not abort the rest.
    page_results = []
    for page_num, img_bytes in enumerate(image_bytes_list, start=1):
        print(f"[INFO] Processing page {page_num}/{total_pages}...")
        try:
            page_result = await _extract_single_page(img_bytes, page_num, total_pages)
            fields = page_result.get("fields")
            page_results.append({
                "page_number": page_num,
                "text": page_result.get("full_text", ""),
                # Models occasionally return null or a list here; only a dict
                # can be merged below.
                "fields": fields if isinstance(fields, dict) else {},
                "confidence": page_result.get("confidence", 0),
                "doc_type": page_result.get("doc_type", "other"),
            })
            print(f"[INFO] Page {page_num} processed successfully")
        except Exception as e:
            print(f"[ERROR] Failed to process page {page_num}: {e}")
            page_results.append({
                "page_number": page_num,
                "text": "",
                "fields": {},
                "confidence": 0,
                "doc_type": "other",
                "error": str(e)
            })

    # Combine results from all pages
    combined_full_text = "\n\n".join(
        f"=== PAGE {p['page_number']} ===\n\n{p['text']}"
        for p in page_results
        if p.get("text")
    )

    # Merge fields from all pages (first non-empty value wins)
    combined_fields = {}
    for page_result in page_results:
        for key, value in page_result.get("fields", {}).items():
            if value and not combined_fields.get(key):
                combined_fields[key] = value

    # Average the positive confidences. Values are coerced first because a
    # model may return the number as a string, which cannot be compared to 0.
    confidences = [
        value
        for value in (_coerce_confidence(p.get("confidence", 0)) for p in page_results)
        if value > 0
    ]
    avg_confidence = sum(confidences) / len(confidences) if confidences else 0

    # Use the first page that reported a specific document type
    doc_type = "other"
    for page_result in page_results:
        if page_result.get("doc_type") and page_result["doc_type"] != "other":
            doc_type = page_result["doc_type"]
            break

    return {
        "doc_type": doc_type,
        "confidence": avg_confidence,
        "full_text": combined_full_text,
        "fields": combined_fields,
        "pages": page_results
    }


def _coerce_confidence(value: Any) -> float:
    """Return ``value`` as a float, or 0.0 when it is not numeric."""
    try:
        return float(value)
    except (TypeError, ValueError):
        return 0.0
async def _extract_with_openrouter_single(image_bytes: bytes, page_num: int, total_pages: int) -> Dict[str, Any]:
    """
    Extract text and fields from a single page image using OpenRouter.

    Args:
        image_bytes: Image bytes for the page (sent as an image/jpeg data URL).
        page_num: 1-based page number, used in the prompt and in messages.
        total_pages: Total number of pages, used in the prompt.

    Returns:
        The dict produced by ``_parse_model_response`` for the model output.

    Raises:
        RuntimeError: If OPENROUTER_API_KEY is unset, the request times out,
            or the request fails (the original exception is chained).
        ValueError: If the response has no choices or no message content.
    """
    if not OPENROUTER_API_KEY:
        raise RuntimeError("OPENROUTER_API_KEY environment variable is not set")

    # Create single image block
    data_url = _image_bytes_to_base64(image_bytes)
    image_block = {
        "type": "image_url",
        "image_url": {"url": data_url}
    }

    system_prompt = (
        "You are a document extraction engine with vision capabilities. "
        "You read and extract text from documents in any language, preserving structure, formatting, and all content. "
        "You output structured JSON with both the full extracted text and key-value pairs."
    )
    user_prompt = (
        f"Read this document page ({page_num} of {total_pages}) using your vision capability and extract ALL text content. "
        "I want the complete end-to-end text, preserving structure, headings, formatting, and content in all languages.\n\n"
        "Extract every word, number, and piece of information, including any non-English text (Punjabi, Hindi, etc.).\n\n"
        "Respond with JSON in this format:\n"
        "{\n"
        ' "doc_type": "invoice | receipt | contract | report | notice | other",\n'
        ' "confidence": number between 0 and 100,\n'
        ' "full_text": "Complete extracted text from this page, preserving structure and formatting. Include all languages.",\n'
        ' "fields": {\n'
        ' "invoice_number": "...",\n'
        ' "date": "...",\n'
        ' "company_name": "...",\n'
        ' "address": "...",\n'
        ' "other_field": "..."\n'
        " }\n"
        "}\n\n"
        "IMPORTANT:\n"
        "- Extract ALL text from this page, including non-English languages\n"
        "- Preserve structure, headings, and formatting\n"
        "- Fill in fields with relevant extracted information\n"
        "- If a field is not found, use empty string or omit it"
    )

    payload: Dict[str, Any] = {
        "model": MODEL_NAME,
        "messages": [
            {
                "role": "system",
                "content": [{"type": "text", "text": system_prompt}],
            },
            {
                "role": "user",
                "content": [
                    {"type": "text", "text": user_prompt},
                    image_block
                ],
            },
        ],
        "max_tokens": 4096,  # Smaller for single page
    }
    headers = {
        "Authorization": f"Bearer {OPENROUTER_API_KEY}",
        "Content-Type": "application/json",
        "HTTP-Referer": os.environ.get("APP_URL", "https://huggingface.co/spaces/your-space"),
        "X-Title": "Document Capture Demo",
    }

    payload_size_mb = len(json.dumps(payload).encode('utf-8')) / 1024 / 1024
    print(f"[INFO] OpenRouter: Processing page {page_num}, payload: {payload_size_mb:.2f} MB")

    try:
        timeout = httpx.Timeout(180.0, connect=30.0)  # 3 min per page
        async with httpx.AsyncClient(timeout=timeout) as client:
            resp = await client.post(OPENROUTER_BASE_URL, headers=headers, json=payload)
            resp.raise_for_status()
            data = resp.json()
    except httpx.TimeoutException as e:
        raise RuntimeError(f"OpenRouter API timed out for page {page_num}") from e
    except httpx.HTTPStatusError as e:
        # The response body usually carries the provider's explanation
        # (bad key, quota, unsupported model), so surface a snippet of it.
        body_preview = e.response.text[:500]
        raise RuntimeError(
            f"OpenRouter API error for page {page_num}: {str(e)} | response body: {body_preview}"
        ) from e
    except Exception as e:
        raise RuntimeError(f"OpenRouter API error for page {page_num}: {str(e)}") from e

    choices = data.get("choices") if isinstance(data, dict) else None
    if not choices:
        raise ValueError(f"No choices in OpenRouter response for page {page_num}")

    try:
        content = choices[0]["message"]["content"]
    except (KeyError, IndexError, TypeError) as e:
        raise ValueError(f"Malformed OpenRouter response for page {page_num}") from e

    # Content may be a plain string or a list of typed parts; keep text parts.
    if isinstance(content, list):
        text = "".join(part.get("text", "") for part in content if part.get("type") == "text")
    else:
        text = content

    # Parse JSON response
    return _parse_model_response(text, page_num)
async def _extract_with_openai_single(image_bytes: bytes, page_num: int, total_pages: int) -> Dict[str, Any]:
    """
    Extract text and fields from a single page image using the OpenAI chat API.

    Args:
        image_bytes: Image bytes for the page (sent as an image/jpeg data URL).
        page_num: 1-based page number, used in the prompt and in messages.
        total_pages: Total number of pages, used in the prompt.

    Returns:
        The dict produced by ``_parse_model_response`` for the model output.

    Raises:
        RuntimeError: If OPENAI_API_KEY is unset, the request times out, or
            the request fails (the original exception is chained).
        ValueError: If the response has no choices or no text content.
    """
    if not OPENAI_API_KEY:
        raise RuntimeError("OPENAI_API_KEY environment variable is not set")

    # Create single image block
    data_url = _image_bytes_to_base64(image_bytes)
    image_block = {
        "type": "image_url",
        "image_url": {"url": data_url}
    }

    system_prompt = (
        "You are a document extraction engine with vision capabilities. "
        "You read and extract text from documents in any language, preserving structure, formatting, and all content. "
        "You output structured JSON with both the full extracted text and key-value pairs."
    )
    user_prompt = (
        f"Read this document page ({page_num} of {total_pages}) using your vision capability and extract ALL text content. "
        "I want the complete end-to-end text, preserving structure, headings, formatting, and content in all languages.\n\n"
        "Extract every word, number, and piece of information, including any non-English text (Punjabi, Hindi, etc.).\n\n"
        "Respond with JSON in this format:\n"
        "{\n"
        ' "doc_type": "invoice | receipt | contract | report | notice | other",\n'
        ' "confidence": number between 0 and 100,\n'
        ' "full_text": "Complete extracted text from this page, preserving structure and formatting. Include all languages.",\n'
        ' "fields": {\n'
        ' "invoice_number": "...",\n'
        ' "date": "...",\n'
        ' "company_name": "...",\n'
        ' "address": "...",\n'
        ' "other_field": "..."\n'
        " }\n"
        "}\n\n"
        "IMPORTANT:\n"
        "- Extract ALL text from this page, including non-English languages\n"
        "- Preserve structure, headings, and formatting\n"
        "- Fill in fields with relevant extracted information\n"
        "- If a field is not found, use empty string or omit it"
    )

    payload: Dict[str, Any] = {
        "model": OPENAI_MODEL_NAME,
        "messages": [
            {
                "role": "system",
                "content": system_prompt,
            },
            {
                "role": "user",
                "content": [
                    {"type": "text", "text": user_prompt},
                    image_block
                ],
            },
        ],
        "max_tokens": 4096,  # Similar to OpenRouter
        "temperature": 0.1,  # Lower temperature for more consistent extraction
    }
    headers = {
        "Authorization": f"Bearer {OPENAI_API_KEY}",
        "Content-Type": "application/json",
    }

    payload_size_mb = len(json.dumps(payload).encode('utf-8')) / 1024 / 1024
    print(f"[INFO] OpenAI: Processing page {page_num} with model {OPENAI_MODEL_NAME}, payload: {payload_size_mb:.2f} MB")

    try:
        timeout = httpx.Timeout(180.0, connect=30.0)  # 3 min per page
        async with httpx.AsyncClient(timeout=timeout) as client:
            resp = await client.post(OPENAI_BASE_URL, headers=headers, json=payload)
            resp.raise_for_status()
            data = resp.json()
    except httpx.TimeoutException as e:
        raise RuntimeError(f"OpenAI API timed out for page {page_num}") from e
    except httpx.HTTPStatusError as e:
        # The response body usually carries the API's explanation, so surface
        # a snippet of it alongside the status line.
        error_msg = f"{str(e)} | response body: {e.response.text[:500]}"
        print(f"[ERROR] OpenAI API error details: {type(e).__name__}: {error_msg}")
        raise RuntimeError(f"OpenAI API error for page {page_num}: {error_msg}") from e
    except Exception as e:
        error_msg = str(e)
        print(f"[ERROR] OpenAI API error details: {type(e).__name__}: {error_msg}")
        raise RuntimeError(f"OpenAI API error for page {page_num}: {error_msg}") from e

    choices = data.get("choices") if isinstance(data, dict) else None
    if not choices:
        raise ValueError(f"No choices in OpenAI response for page {page_num}")

    try:
        response_text = choices[0]["message"]["content"]
    except (KeyError, IndexError, TypeError) as e:
        raise ValueError(f"Malformed OpenAI response for page {page_num}") from e

    # Content can be null; slicing None for the debug preview below would
    # raise an unrelated TypeError, so fail with a clear message instead.
    if not isinstance(response_text, str) or not response_text.strip():
        raise ValueError(f"Empty response from OpenAI for page {page_num}")

    print(f"[DEBUG] OpenAI response preview: {response_text[:500]}")
    return _parse_model_response(response_text, page_num)
async def _extract_with_hf(image_bytes: bytes, page_num: int, total_pages: int) -> Dict[str, Any]:
    """
    Extract text and fields from a single page image using huggingface_hub.

    The synchronous ``InferenceClient`` call is run in the default executor so
    that it does not block the event loop.

    Args:
        image_bytes: Image bytes for the page (sent as an image/jpeg data URL).
        page_num: 1-based page number, used in the prompt and in messages.
        total_pages: Total number of pages, used in the prompt.

    Returns:
        The dict produced by ``_parse_model_response`` for the model output.

    Raises:
        RuntimeError: If HF_TOKEN is unset, huggingface_hub is not installed,
            or the call or its parsing fails (the original exception is chained).
    """
    if not HF_TOKEN:
        raise RuntimeError("HF_TOKEN environment variable is not set")

    try:
        from huggingface_hub import InferenceClient
    except ImportError as e:
        raise RuntimeError("huggingface_hub not installed. Add it to requirements.txt") from e

    # Use InferenceClient with router endpoint (required for newer models)
    client = InferenceClient(
        api_key=HF_TOKEN,
        timeout=180.0
    )

    prompt = (
        f"Read this document page ({page_num} of {total_pages}) and extract ALL text content. "
        "Extract every word, number, and piece of information, including any non-English text. "
        "Return JSON with 'full_text', 'doc_type', 'confidence', and 'fields'."
    )

    print(f"[INFO] HuggingFace: Processing page {page_num} with model {HF_MODEL_NAME}")

    try:
        # Convert image bytes to base64 data URL
        image_base64 = base64.b64encode(image_bytes).decode('utf-8')
        image_data_url = f"data:image/jpeg;base64,{image_base64}"

        messages = [
            {
                "role": "user",
                "content": [
                    {"type": "text", "text": prompt},
                    {"type": "image_url", "image_url": {"url": image_data_url}},
                ],
            }
        ]

        # chat.completions.create() is a blocking call, so hand it to the
        # executor. This code always runs inside a coroutine, where
        # get_running_loop() is the supported way to obtain the loop.
        import asyncio
        loop = asyncio.get_running_loop()
        completion = await loop.run_in_executor(
            None,
            lambda: client.chat.completions.create(
                model=HF_MODEL_NAME,
                messages=messages,
                max_tokens=2048,
                temperature=0.1
            )
        )

        # Extract response text, falling back to the string form of whatever
        # object came back when it lacks the expected attributes.
        if hasattr(completion, 'choices') and len(completion.choices) > 0:
            message = completion.choices[0].message
            if hasattr(message, 'content'):
                response_text = message.content
            else:
                response_text = str(message)
        else:
            response_text = str(completion)

        if not response_text:
            raise ValueError("Empty response from HuggingFace API")

        print(f"[DEBUG] HuggingFace response preview: {response_text[:500]}")
        return _parse_model_response(response_text, page_num)
    except Exception as e:
        error_msg = str(e)
        print(f"[ERROR] HuggingFace API error details: {type(e).__name__}: {error_msg}")
        # Give a more actionable message when the failure looks like a
        # permissions problem.
        if "403" in error_msg or "permissions" in error_msg.lower() or "Forbidden" in error_msg:
            raise RuntimeError(
                f"HuggingFace API error for page {page_num}: Insufficient permissions. "
                "Your HF_TOKEN may need to be a token with 'read' access to Inference API. "
                "Check your HuggingFace account settings and token permissions."
            ) from e
        raise RuntimeError(f"HuggingFace API error for page {page_num}: {error_msg}") from e
def _parse_model_response(text: str, page_num: int = None) -> Dict[str, Any]:
"""Parse JSON response from model, handling truncation and errors."""
if not text or not text.strip():
raise ValueError("Empty response from model")
# Try to parse JSON
try:
parsed = json.loads(text)
print(f"[DEBUG] Successfully parsed JSON for page {page_num or 'single'}")
return parsed
except json.JSONDecodeError as e:
print(f"[DEBUG] Direct JSON parse failed: {e}")
# Try to extract JSON from markdown code blocks
json_match = re.search(r'```(?:json)?\s*(\{.*?\})\s*```', text, re.DOTALL)
if json_match:
try:
return json.loads(json_match.group(1))
except json.JSONDecodeError:
pass
# Try to find JSON object
json_match = re.search(r'\{.*\}', text, re.DOTALL)
if json_match:
try:
fixed_json = _fix_truncated_json(json_match.group(0))
return json.loads(fixed_json)
except Exception:
pass
# Extract full_text even from truncated JSON
full_text_match = re.search(r'"full_text"\s*:\s*"(.*?)(?:"\s*[,}]|$)', text, re.DOTALL)
if full_text_match:
full_text = (full_text_match.group(1)
.replace('\\n', '\n')
.replace('\\"', '"')
.replace('\\\\', '\\'))
return {
"doc_type": "other",
"confidence": 90.0,
"full_text": full_text,
"fields": {"full_text": full_text}
}
# Last resort: return raw text
return {
"doc_type": "other",
"confidence": 50.0,
"full_text": text[:2000],
"fields": {"raw_text": text[:2000]}
}
def _fix_truncated_json(json_str: str) -> str:
"""Attempt to fix truncated JSON by closing unclosed strings and objects."""
# Count open braces
open_braces = json_str.count('{') - json_str.count('}')
open_brackets = json_str.count('[') - json_str.count(']')
# Check if we're in the middle of a string
in_string = False
escape_next = False
for i, char in enumerate(json_str):
if escape_next:
escape_next = False
continue
if char == '\\':
escape_next = True
continue
if char == '"':
in_string = not in_string
# If we're in a string, close it
if in_string:
json_str = json_str.rstrip() + '"'
# Close any open brackets
json_str += ']' * open_brackets
# Close any open braces
json_str += '}' * open_braces
return json_str
def _extract_partial_json(text: str) -> Dict[str, Any]:
"""Extract what we can from a partial JSON response."""
result = {
"doc_type": "other",
"confidence": 0.0,
"fields": {}
}
# Try to extract doc_type
doc_type_match = re.search(r'"doc_type"\s*:\s*"([^"]+)"', text)
if doc_type_match:
result["doc_type"] = doc_type_match.group(1)
# Try to extract confidence
confidence_match = re.search(r'"confidence"\s*:\s*(\d+(?:\.\d+)?)', text)
if confidence_match:
result["confidence"] = float(confidence_match.group(1))
# Try to extract full_text (even if truncated)
full_text_match = re.search(r'"full_text"\s*:\s*"([^"]*(?:\\.[^"]*)*)', text, re.DOTALL)
if full_text_match:
try:
full_text = full_text_match.group(1)
# Unescape common sequences
full_text = full_text.replace('\\n', '\n').replace('\\"', '"').replace('\\\\', '\\')
result["full_text"] = full_text
result["fields"]["full_text"] = full_text
except Exception:
pass
return result