# Omkar008's picture
# Update main.py
# 2f4763f verified
import json
import os
from typing import Dict, List, Optional

import uvicorn
from fastapi import FastAPI, Request , BackgroundTasks
from fastapi import HTTPException
from supabase import create_client, Client
from supabase import Client as SupabaseClient
# NOTE: keep this import after the supabase one. Both packages export a class
# named `Client`; the bare name must keep resolving to the OpenAI class.
from openai import Client
from openai import Client as OpenAIClient
# FastAPI application object; routes are registered on it via decorators.
app = FastAPI()

# Supabase client, configured from the SUPABASE_URL / SUPABASE_KEY environment
# variables. It is annotated with the aliased supabase class because the bare
# name `Client` is rebound by `from openai import Client` and would otherwise
# point at the OpenAI class.
supabase: SupabaseClient = create_client(
    supabase_url=os.getenv("SUPABASE_URL"),
    supabase_key=os.getenv("SUPABASE_KEY"),
)

# OpenAI client, configured from the OPENAI_API_KEY / ORG_ID environment
# variables.
client = OpenAIClient(
    api_key=os.getenv("OPENAI_API_KEY"),
    organization=os.getenv("ORG_ID"),
)
@app.post("/batch_processing_result")
async def batch_processing_result(request: Request, background_tasks: BackgroundTasks):
    """Check an OpenAI batch job and, once it has completed, schedule its ingestion.

    The request body must be a JSON object containing ``batch_job_id``. When
    the batch status is ``'completed'``, ``process_batch_data`` is queued as a
    background task and the matching row in ``batch_processing_details`` gets
    ``batch_job_status`` set to ``True``.

    Args:
        request: Incoming request whose JSON body carries ``batch_job_id``.
        background_tasks: FastAPI task queue executed after the response.

    Returns:
        ``{"batch_job_status": <status>}`` with the status reported for the
        batch, so callers can tell a completed batch from a pending one.

    Raises:
        HTTPException: 400 if the body is not a JSON object or has no
            ``batch_job_id``.
    """
    body = await request.json()
    batch_id = body.get('batch_job_id') if isinstance(body, dict) else None
    if not batch_id:
        # Fail fast with a client error instead of passing None to the API.
        raise HTTPException(status_code=400, detail="batch_job_id is required")
    print(batch_id)

    # NOTE(review): this call and the Supabase update below are made without
    # `await`, so they run synchronously inside this coroutine -- confirm that
    # blocking here is acceptable for the expected load.
    batch_job = client.batches.retrieve(batch_id)
    if batch_job.status != 'completed':
        # Report the real status rather than claiming completion.
        return {"batch_job_status": batch_job.status}

    # Add the processing task to background tasks
    background_tasks.add_task(process_batch_data, batch_id)
    (
        supabase.table("batch_processing_details")
        .update({"batch_job_status": True})
        .eq('batch_job_id', batch_id)
        .execute()
    )
    return {"batch_job_status": 'completed'}
def _parse_jsonl(text: str) -> List[dict]:
    """Decode newline-delimited JSON, skipping blank and undecodable lines."""
    records = []
    for line in text.splitlines():
        if not line.strip():
            continue
        try:
            records.append(json.loads(line))
        except json.JSONDecodeError as e:
            print(f"Error decoding JSON on line: {line}\nError: {e}")
    return records


def _row_from_batch_record(record: dict) -> Optional[dict]:
    """Build one insertable row from a batch output record, or None if malformed."""
    try:
        # custom_id is "<message_id>*<user_id>*<email>"; extra parts are ignored.
        message_id, user_id, email = record['custom_id'].split('*')[:3]
        content = record['response']['body']['choices'][0]['message']['content']
        row = json.loads(content)
    except (AttributeError, KeyError, IndexError, TypeError, ValueError) as e:
        print(f"Skipping malformed batch record: {record}\nError: {e!r}")
        return None
    if not isinstance(row, dict):
        print(f"Skipping batch record whose content is not a JSON object: {record}")
        return None
    row['message_id'] = message_id
    row['user_id'] = user_id
    row['email'] = email
    return row


async def process_batch_data(batch_id: str):
    """Download a completed batch's output and insert the parsed rows into Supabase.

    Each line of the output file is decoded as JSON. For every record the
    ``custom_id`` is split on ``*`` into message id, user id and email, the
    message content of the first choice is decoded as a JSON object, and the
    three identifiers are added to it. The resulting rows are inserted into
    ``receipt_radar_structured_data_duplicate``. Undecodable lines and
    malformed records are reported and skipped so that one bad record does not
    discard the rest of the batch.

    Args:
        batch_id: Identifier of the batch job to process.

    Returns:
        None. Nothing is done unless the batch status is ``'completed'``.
        Errors are printed rather than raised, since this runs as a background
        task with no caller to handle them.
    """
    try:
        batch_job = client.batches.retrieve(batch_id)
        if batch_job.status != 'completed':
            return

        result_file_id = batch_job.output_file_id
        if not result_file_id:
            print(f"Batch {batch_id} has no output file; nothing to insert")
            return

        raw = client.files.content(result_file_id).content
        records = _parse_jsonl(raw.decode('utf-8'))
        supa_data = [
            row
            for row in map(_row_from_batch_record, records)
            if row is not None
        ]
        # TODO(review): the original carried a disabled update that set
        # status='processed' on the matching receipt_ocr_data row (by email,
        # user_id, message_id); restore it if that tracking is needed.
        print("Printing the the data")
        print(supa_data)
        if supa_data:
            # Skip the round trip when every record was rejected.
            (
                supabase.table("receipt_radar_structured_data_duplicate")
                .insert(supa_data)
                .execute()
            )
        print("Completed processing")
    except Exception as e:
        # Top-level boundary of the background task: report and swallow.
        print(f"Error in background processing: {str(e)}")