import logging
import os
import re
import shutil
import tempfile
import time
import zipfile
from typing import Optional

import docx as python_docx
import gdown
from pypdf import PdfReader

logger = logging.getLogger(__name__)


def extract_text_from_file(file_path: str, file_type: str) -> Optional[str]:
    """Extract plain text from a PDF, DOCX, or TXT file; return None on failure."""
    logger.info(f"[TEXT_EXTRACTION] Starting extraction from {file_type.upper()} file: {file_path}")
    text_content = None
    try:
        if file_type == 'pdf':
            reader = PdfReader(file_path)
            # Extract each page once and keep only non-empty pages.
            page_texts = [page.extract_text() or "" for page in reader.pages]
            text_content = "".join(text + "\n" for text in page_texts if text)
            logger.info(f"[TEXT_EXTRACTION] PDF extracted {len(reader.pages)} pages, {len(text_content)} characters")
        elif file_type == 'docx':
            doc = python_docx.Document(file_path)
            text_content = "\n".join(para.text for para in doc.paragraphs if para.text)
            logger.info(f"[TEXT_EXTRACTION] DOCX extracted {len(doc.paragraphs)} paragraphs, {len(text_content)} characters")
        elif file_type == 'txt':
            with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
                text_content = f.read()
            logger.info(f"[TEXT_EXTRACTION] TXT extracted {len(text_content)} characters")
        else:
            logger.warning(f"[TEXT_EXTRACTION] Unsupported file type: {file_type} for file {file_path}")
            return None
        if not text_content or not text_content.strip():
            logger.warning(f"[TEXT_EXTRACTION] No text content extracted from {file_path}")
            return None
        logger.info(f"[TEXT_EXTRACTION] Successfully extracted text from {file_path}")
        return text_content.strip()
    except Exception as e:
        logger.error(f"[TEXT_EXTRACTION] Error extracting text from {file_path} ({file_type.upper()}): {e}", exc_info=True)
        return None


FAISS_RAG_SUPPORTED_EXTENSIONS = {
    'pdf': lambda path: extract_text_from_file(path, 'pdf'),
    'docx': lambda path: extract_text_from_file(path, 'docx'),
    'txt': lambda path: extract_text_from_file(path, 'txt'),
}
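
# A minimal sketch of how this dispatch table might feed a RAG corpus; the
# directory name and corpus structure below are illustrative assumptions, not
# part of this module:
#
#     corpus = {}
#     for name in os.listdir("./knowledge_base"):
#         ext = os.path.splitext(name)[1].lstrip(".").lower()
#         extractor = FAISS_RAG_SUPPORTED_EXTENSIONS.get(ext)
#         if extractor:
#             text = extractor(os.path.join("./knowledge_base", name))
#             if text:
#                 corpus[name] = text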


def get_id_from_gdrive_input(url_or_id: str) -> Optional[str]:
    """Extract a Google Drive file/folder ID from a URL, or pass through a bare ID."""
    if not url_or_id:
        return None
    match_folder = re.search(r"/folders/([a-zA-Z0-9_-]+)", url_or_id)
    if match_folder:
        return match_folder.group(1)
    match_file_d = re.search(r"/d/([a-zA-Z0-9_-]+)", url_or_id)
    if match_file_d:
        return match_file_d.group(1)
    match_uc = re.search(r"id=([a-zA-Z0-9_-]+)", url_or_id)
    if match_uc:
        return match_uc.group(1)
    # Treat the input as a bare ID if it contains no URL-like characters and is long enough.
    if "/" not in url_or_id and "=" not in url_or_id and "." not in url_or_id and len(url_or_id) > 10:
        return url_or_id
    logger.warning(f"Could not reliably extract Google Drive ID from input: {url_or_id}")
    return None
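
# Illustrative inputs (the ID below is a made-up placeholder, not a real file):
#
#     get_id_from_gdrive_input("https://drive.google.com/drive/folders/1AbCdEfGhIjKl")  # -> "1AbCdEfGhIjKl"
#     get_id_from_gdrive_input("https://drive.google.com/file/d/1AbCdEfGhIjKl/view")    # -> "1AbCdEfGhIjKl"
#     get_id_from_gdrive_input("https://drive.google.com/uc?id=1AbCdEfGhIjKl")          # -> "1AbCdEfGhIjKl"
#     get_id_from_gdrive_input("1AbCdEfGhIjKl")                                          # -> "1AbCdEfGhIjKl"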


def download_gdrive_file(file_id_or_url: str, target_path: str) -> bool:
    """
    Downloads a single file from Google Drive to a specific path.
    """
    logger.info(f"[GDRIVE_SINGLE_FILE] Attempting to download file. Input: {file_id_or_url}")
    file_id = get_id_from_gdrive_input(file_id_or_url)
    if not file_id:
        logger.error(f"[GDRIVE_SINGLE_FILE] Invalid Google Drive File ID or URL provided: {file_id_or_url}")
        return False
    try:
        # Ensure the target directory exists before downloading (skip if target_path is bare filename).
        target_dir = os.path.dirname(target_path)
        if target_dir:
            os.makedirs(target_dir, exist_ok=True)
        logger.info(f"[GDRIVE_SINGLE_FILE] Downloading file ID: {file_id} to path: {target_path}")
        # Download directly to the target file path by ID (fuzzy=True only affects URL parsing).
        gdown.download(id=file_id, output=target_path, quiet=False, fuzzy=True)
        if not os.path.exists(target_path) or os.path.getsize(target_path) == 0:
            logger.error("[GDRIVE_SINGLE_FILE] Download failed or the resulting file is empty.")
            return False
        logger.info("[GDRIVE_SINGLE_FILE] Download successful.")
        return True
    except Exception as e:
        logger.error(f"[GDRIVE_SINGLE_FILE] An error occurred during download: {e}", exc_info=True)
        return False
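
# Example call (hypothetical ID and destination path, shown only for illustration):
#
#     download_gdrive_file("https://drive.google.com/file/d/1AbCdEfGhIjKl/view", "./models/weights.bin")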


def download_and_unzip_gdrive_file(file_id_or_url: str, target_extraction_dir: str) -> bool:
    """
    Downloads a single ZIP file from Google Drive and extracts its contents.
    """
    logger.info(f"[GDRIVE_FILE] Attempting to download and extract ZIP from Google Drive. Input: {file_id_or_url}")
    file_id = get_id_from_gdrive_input(file_id_or_url)
    if not file_id:
        logger.error(f"[GDRIVE_FILE] Invalid Google Drive File ID or URL provided: {file_id_or_url}")
        return False
    temp_download_dir = tempfile.mkdtemp(prefix="gdrive_zip_")
    temp_zip_path = os.path.join(temp_download_dir, "downloaded_file.zip")
    try:
        logger.info(f"[GDRIVE_FILE] Downloading file ID: {file_id} to temporary path: {temp_zip_path}")
        gdown.download(id=file_id, output=temp_zip_path, quiet=False)
        if not os.path.exists(temp_zip_path) or os.path.getsize(temp_zip_path) == 0:
            logger.error("[GDRIVE_FILE] Download failed or the resulting file is empty.")
            return False
        logger.info(f"[GDRIVE_FILE] Download successful. Extracting ZIP to: {target_extraction_dir}")
        os.makedirs(target_extraction_dir, exist_ok=True)
        with zipfile.ZipFile(temp_zip_path, 'r') as zip_ref:
            zip_ref.extractall(target_extraction_dir)
        logger.info("[GDRIVE_FILE] Successfully extracted ZIP archive.")
        return True
    except Exception as e:
        logger.error(f"[GDRIVE_FILE] An error occurred during download or extraction: {e}", exc_info=True)
        return False
    finally:
        # Always clean up the temporary download directory, even on failure.
        if os.path.exists(temp_download_dir):
            try:
                shutil.rmtree(temp_download_dir)
                logger.debug(f"[GDRIVE_FILE] Cleaned up temporary directory: {temp_download_dir}")
            except Exception as e_del:
                logger.warning(f"[GDRIVE_FILE] Could not remove temporary directory '{temp_download_dir}': {e_del}")


def download_and_unzip_gdrive_folder(folder_id_or_url: str, target_dir_for_contents: str) -> bool:
    """
    Downloads the contents of a Google Drive folder into target_dir_for_contents,
    retrying a few times on transient failures. (No ZIP extraction is involved,
    despite the function name.)
    """
    logger.info(f"[GDRIVE] Attempting to download sources from Google Drive. Input: {folder_id_or_url}")
    folder_id = get_id_from_gdrive_input(folder_id_or_url)
    if not folder_id:
        logger.error(f"[GDRIVE] Invalid Google Drive Folder ID or URL provided: {folder_id_or_url}")
        return False
    temp_download_parent_dir = tempfile.mkdtemp(prefix="gdrive_parent_")
    download_path = None
    try:
        max_retries = 3
        retry_delay_seconds = 10
        last_gdown_exception = None
        for attempt in range(max_retries):
            logger.info(f"[GDRIVE] Attempt {attempt + 1} of {max_retries} to download folder ID: {folder_id}")
            try:
                start_time = time.time()
                # gdown.download_folder returns a list of downloaded file paths (or None on failure).
                download_path = gdown.download_folder(id=folder_id, output=temp_download_parent_dir, quiet=False, use_cookies=False)
                download_time = time.time() - start_time
                if download_path and os.path.exists(temp_download_parent_dir) and os.listdir(temp_download_parent_dir):
                    logger.info(f"[GDRIVE] Successfully downloaded in {download_time:.2f}s. Path: {download_path}")
                    last_gdown_exception = None
                    break
                else:
                    logger.warning(f"[GDRIVE] Attempt {attempt + 1} completed but directory is empty")
                    if attempt < max_retries - 1:
                        logger.info(f"[GDRIVE] Retrying in {retry_delay_seconds} seconds...")
                        time.sleep(retry_delay_seconds)
                        if os.path.exists(temp_download_parent_dir):
                            shutil.rmtree(temp_download_parent_dir)
                        os.makedirs(temp_download_parent_dir)
                    else:
                        # Caught by the except below, which logs the final failure and returns False.
                        raise Exception("gdown failed to populate the directory after multiple attempts.")
            except Exception as e:
                last_gdown_exception = e
                logger.warning(f"[GDRIVE] Attempt {attempt + 1} failed: {e}")
                if attempt < max_retries - 1:
                    logger.info(f"[GDRIVE] Retrying in {retry_delay_seconds} seconds...")
                    time.sleep(retry_delay_seconds)
                    if os.path.exists(temp_download_parent_dir):
                        shutil.rmtree(temp_download_parent_dir)
                    os.makedirs(temp_download_parent_dir)
                else:
                    logger.error(f"[GDRIVE] Failed after {max_retries} attempts. Last error: {e}", exc_info=True)
                    return False
        if last_gdown_exception:
            logger.error(f"[GDRIVE] Failed after all retries. Last error: {last_gdown_exception}", exc_info=True)
            return False
        os.makedirs(target_dir_for_contents, exist_ok=True)
        items_in_temp_parent = os.listdir(temp_download_parent_dir)
        source_content_root = temp_download_parent_dir
        # gdown usually recreates the Drive folder as a single nested directory inside the
        # temporary parent; if so, move the contents of that nested directory instead.
        if len(items_in_temp_parent) == 1 and os.path.isdir(os.path.join(temp_download_parent_dir, items_in_temp_parent[0])):
            source_content_root = os.path.join(temp_download_parent_dir, items_in_temp_parent[0])
            logger.info(f"[GDRIVE] Using nested directory: {items_in_temp_parent[0]}")
        logger.info(f"[GDRIVE] Moving contents from {source_content_root} to {target_dir_for_contents}")
        files_moved = 0
        for item_name in os.listdir(source_content_root):
            s_item = os.path.join(source_content_root, item_name)
            d_item = os.path.join(target_dir_for_contents, item_name)
            # Replace any pre-existing item at the destination.
            if os.path.exists(d_item):
                if os.path.isdir(d_item):
                    shutil.rmtree(d_item)
                else:
                    os.remove(d_item)
            shutil.move(s_item, d_item)
            files_moved += 1
        logger.info(f"[GDRIVE] Successfully moved {files_moved} items to {target_dir_for_contents}")
        return True
    except Exception as e:
        logger.error(f"[GDRIVE] Unexpected error during download/processing: {e}", exc_info=True)
        return False
    finally:
        if os.path.exists(temp_download_parent_dir):
            try:
                shutil.rmtree(temp_download_parent_dir)
                logger.debug("[GDRIVE] Cleaned up temporary directory")
            except Exception as e_del:
                logger.warning(f"[GDRIVE] Could not remove temporary directory: {e_del}")