Spaces:
Sleeping
Sleeping
| from fastapi import FastAPI, Request , BackgroundTasks | |
| from supabase import create_client, Client | |
| import json | |
| import uvicorn | |
| from typing import Dict, List, Optional | |
| import os | |
| from openai import Client | |
# Initialize FastAPI app
app = FastAPI()

# Initialize Supabase client.
# Deliberately left without an explicit annotation: the name `Client` is
# imported from both `supabase` and `openai` at the top of this file, and the
# later `openai` import wins, so annotating this variable as `Client` would
# label it with the OpenAI client class.
supabase = create_client(
    supabase_url=os.getenv("SUPABASE_URL"),
    supabase_key=os.getenv("SUPABASE_KEY"),
)

# Initialize the OpenAI client (`Client` here is the one imported from `openai`).
client = Client(
    api_key=os.getenv('OPENAI_API_KEY'),
    organization=os.getenv('ORG_ID'),
)
async def batch_processing_result(request: Request, background_tasks: BackgroundTasks):
    """Handle a batch-status notification and schedule result processing.

    Reads ``batch_job_id`` from the JSON request body and retrieves that batch
    through the OpenAI client. When the batch status is ``'completed'`` it
    schedules ``process_batch_data`` as a background task and sets
    ``batch_job_status`` to ``True`` on the matching row of the
    ``batch_processing_details`` table.

    Args:
        request: Incoming request whose JSON body carries ``batch_job_id``.
        background_tasks: Task queue that ``process_batch_data`` is added to.

    Returns:
        ``{"batch_job_status": <status>}`` where ``<status>`` is the status
        reported for the batch; ``'completed'`` means processing was scheduled.

    Raises:
        HTTPException: 400 when the body does not contain a ``batch_job_id``.
    """
    # NOTE(review): no route decorator is visible on this handler in this
    # view of the file -- confirm it is registered on `app`.

    # Imported locally so this handler stays self-contained with respect to
    # the file-level import block.
    from fastapi import HTTPException

    body = await request.json()
    batch_id = body.get('batch_job_id') if isinstance(body, dict) else None
    if not batch_id:
        # Fail with an explicit client error instead of handing None to the
        # OpenAI client.
        raise HTTPException(status_code=400, detail="batch_job_id is required")
    print(batch_id)

    # NOTE(review): `client` is the synchronous OpenAI client, so this network
    # call runs inline in the async handler -- confirm that is acceptable.
    batch_job = client.batches.retrieve(batch_id)
    if batch_job.status != 'completed':
        # Report the real status; previously this path also answered
        # 'completed', which hid in-progress and failed batches from callers.
        return {'batch_job_status': batch_job.status}

    # Add the processing task to background tasks
    background_tasks.add_task(process_batch_data, batch_id)

    # NOTE(review): the row is flagged here, before the background task has
    # run, so the flag records "batch finished", not "results stored".
    (
        supabase.table("batch_processing_details")
        .update({"batch_job_status": True})
        .eq('batch_job_id', batch_id)
        .execute()
    )
    return {"batch_job_status": 'completed'}
def _parse_batch_output_lines(text: str) -> list:
    """Decode newline-delimited JSON, skipping blank and undecodable lines."""
    records = []
    for line in text.splitlines():
        if not line.strip():
            continue
        try:
            records.append(json.loads(line))
        except json.JSONDecodeError as e:
            print(f"Error decoding JSON on line: {line}\nError: {e}")
    return records


def _build_receipt_row(record: dict) -> dict:
    """Convert one batch output record into a row for the structured-data table.

    The record's ``custom_id`` is split on ``'*'`` into message id, user id and
    email (in that order), and the first choice's message content is parsed as
    JSON to form the row.

    Raises:
        ValueError: if ``custom_id`` is missing or has fewer than three parts,
            or if the message content is absent or is not a JSON object.
    """
    custom_id = record.get('custom_id') if isinstance(record, dict) else None
    if not isinstance(custom_id, str):
        raise ValueError(f"record has no usable custom_id: {record!r}")

    parts = custom_id.split('*')
    if len(parts) < 3:
        raise ValueError(f"custom_id {custom_id!r} does not have 3 '*'-separated parts")
    message_id, user_id, email = parts[:3]

    try:
        content = record['response']['body']['choices'][0]['message']['content']
        output = json.loads(content)
    except (KeyError, IndexError, TypeError, json.JSONDecodeError) as e:
        # Covers a null/absent response, an empty choices list, and content
        # that is missing or not valid JSON.
        raise ValueError(f"no parsable message content for {custom_id!r}: {e}") from e

    if not isinstance(output, dict):
        raise ValueError(f"message content for {custom_id!r} is not a JSON object")

    output['message_id'] = message_id
    output['user_id'] = user_id
    output['email'] = email
    return output


async def process_batch_data(batch_id: str):
    """Download a completed batch's output and store it in Supabase.

    Retrieves the batch through the OpenAI client. If it is completed, the
    output file is downloaded, each JSON line is turned into a row (see
    ``_build_receipt_row``) and all rows are inserted into the
    ``receipt_radar_structured_data_duplicate`` table in a single call.

    Records that cannot be converted are reported and skipped so one bad line
    does not discard the rest of the batch. Any other failure is caught and
    printed; nothing is raised to the caller.

    Args:
        batch_id: Identifier of the batch to process.
    """
    try:
        # NOTE(review): `client` is the synchronous OpenAI client; these calls
        # run inline inside this coroutine -- confirm that is acceptable for
        # the way this task is scheduled.
        batch_job = client.batches.retrieve(batch_id)
        if batch_job.status != 'completed':
            print(f"Batch {batch_id} is not completed (status: {batch_job.status}); nothing to process")
            return

        result_file_id = batch_job.output_file_id
        if not result_file_id:
            print(f"Batch {batch_id} has no output file; nothing to process")
            return

        raw = client.files.content(result_file_id).content
        records = _parse_batch_output_lines(raw.decode('utf-8'))

        supa_data = []
        for record in records:
            try:
                supa_data.append(_build_receipt_row(record))
            except ValueError as e:
                print(f"Skipping malformed batch record: {e}")

        # TODO: a per-record status update on `receipt_ocr_data`
        # (status -> 'processed', matched on email / user_id / message_id)
        # was left disabled here; re-enable or drop it.

        print("Printing the the data")
        print(supa_data)

        if not supa_data:
            print(f"Batch {batch_id} produced no valid rows; skipping insert")
            return

        (
            supabase.table("receipt_radar_structured_data_duplicate")
            .insert(supa_data)
            .execute()
        )
        print("Completed processing")
    except Exception as e:
        # Top-level boundary of the background task: report and swallow so a
        # failure here never propagates out of the task runner.
        print(f"Error in background processing: {str(e)}")
        # You might want to log this error or handle it in some way