Spaces:
Running
Running
| import os | |
| import logging | |
| import re | |
| import shutil | |
| import tempfile | |
| import time | |
| from typing import Optional | |
| import zipfile | |
| import gdown | |
| from pypdf import PdfReader | |
| import docx as python_docx | |
| logger = logging.getLogger(__name__) | |
def extract_text_from_file(file_path: str, file_type: str) -> Optional[str]:
    """Extract plain text from a PDF, DOCX, or TXT file.

    Args:
        file_path: Path of the file to read.
        file_type: ``'pdf'``, ``'docx'`` or ``'txt'``. Matching ignores case,
            surrounding whitespace and a leading dot, so ``'.PDF'`` is
            treated like ``'pdf'``.

    Returns:
        The extracted text with leading/trailing whitespace stripped, or
        ``None`` when the type is unsupported, when no non-blank text was
        found, or when extraction raised (the error is logged, not re-raised).
    """
    logger.info(f"[TEXT_EXTRACTION] Starting extraction from {file_type.upper()} file: {file_path}")
    normalized_type = file_type.strip().lower().lstrip(".")
    text_content = None
    try:
        if normalized_type == 'pdf':
            reader = PdfReader(file_path)
            # Run the extraction once per page; the result is reused for both
            # the emptiness filter and the joined output.
            page_texts = (page.extract_text() for page in reader.pages)
            text_content = "".join(f"{page_text}\n" for page_text in page_texts if page_text)
            logger.info(f"[TEXT_EXTRACTION] PDF extracted {len(reader.pages)} pages, {len(text_content)} characters")
        elif normalized_type == 'docx':
            doc = python_docx.Document(file_path)
            # Only the paragraphs exposed by doc.paragraphs are read; empty
            # paragraphs are skipped.
            text_content = "\n".join(para.text for para in doc.paragraphs if para.text)
            logger.info(f"[TEXT_EXTRACTION] DOCX extracted {len(doc.paragraphs)} paragraphs, {len(text_content)} characters")
        elif normalized_type == 'txt':
            # Undecodable bytes are dropped rather than failing the whole file.
            with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
                text_content = f.read()
            logger.info(f"[TEXT_EXTRACTION] TXT extracted {len(text_content)} characters")
        else:
            logger.warning(f"[TEXT_EXTRACTION] Unsupported file type: {file_type} for file {file_path}")
            return None

        if not text_content or not text_content.strip():
            logger.warning(f"[TEXT_EXTRACTION] No text content extracted from {file_path}")
            return None

        logger.info(f"[TEXT_EXTRACTION] Successfully extracted text from {file_path}")
        return text_content.strip()
    except Exception as e:
        # Best-effort contract: callers get None instead of an exception.
        logger.error(f"[TEXT_EXTRACTION] Error extracting text from {file_path} ({file_type.upper()}): {e}", exc_info=True)
        return None
def _make_text_extractor(file_type):
    """Build a one-argument callable that extracts text for ``file_type``."""
    def _extract(path):
        return extract_text_from_file(path, file_type)
    return _extract


# Maps each supported extension (without the dot) to a callable that takes a
# file path and returns the result of extract_text_from_file for that type.
FAISS_RAG_SUPPORTED_EXTENSIONS = {
    file_type: _make_text_extractor(file_type)
    for file_type in ('pdf', 'docx', 'txt')
}
def get_id_from_gdrive_input(url_or_id: str) -> Optional[str]:
    """Extract a Google Drive file or folder ID from a URL or a raw ID.

    URL shapes are tried in this order: ``/folders/<id>``, ``/d/<id>`` and
    ``id=<id>``. If none matches, the input itself is accepted as a raw ID
    when it is longer than 10 characters and consists only of the characters
    the URL patterns allow (letters, digits, ``_`` and ``-``).

    Args:
        url_or_id: A Drive URL or a bare ID. Surrounding whitespace is
            ignored.

    Returns:
        The extracted ID, or ``None`` when the input is empty, not a string,
        or no ID could be recognised (a warning is logged in the last case).
    """
    if not url_or_id or not isinstance(url_or_id, str):
        return None

    # Pasted values often carry stray spaces or a trailing newline.
    candidate = url_or_id.strip()
    if not candidate:
        return None

    url_patterns = (
        r"/folders/([a-zA-Z0-9_-]+)",
        r"/d/([a-zA-Z0-9_-]+)",
        r"id=([a-zA-Z0-9_-]+)",
    )
    for pattern in url_patterns:
        match = re.search(pattern, candidate)
        if match:
            return match.group(1)

    # Raw-ID fallback: same character class as the URL patterns, so text
    # containing spaces or other punctuation is not mistaken for an ID.
    if re.fullmatch(r"[a-zA-Z0-9_-]{11,}", candidate):
        return candidate

    logger.warning(f"Could not reliably extract Google Drive ID from input: {url_or_id}")
    return None
def download_gdrive_file(file_id_or_url: str, target_path: str) -> bool:
    """Download a single file from Google Drive to a specific path.

    Args:
        file_id_or_url: Drive file ID or URL; resolved with
            ``get_id_from_gdrive_input``.
        target_path: Destination file path. Its parent directory is created
            when missing; a bare filename (no directory part) is allowed.

    Returns:
        ``True`` when a non-empty file exists at ``target_path`` after the
        download, ``False`` otherwise. Errors are logged, not raised.
    """
    logger.info(f"[GDRIVE_SINGLE_FILE] Attempting to download file. Input: {file_id_or_url}")
    file_id = get_id_from_gdrive_input(file_id_or_url)
    if not file_id:
        logger.error(f"[GDRIVE_SINGLE_FILE] Invalid Google Drive File ID or URL provided: {file_id_or_url}")
        return False

    try:
        # dirname() is '' for a bare filename and os.makedirs('') raises, so
        # only create the directory when there is one to create.
        target_dir = os.path.dirname(target_path)
        if target_dir:
            os.makedirs(target_dir, exist_ok=True)

        logger.info(f"[GDRIVE_SINGLE_FILE] Downloading file ID: {file_id} to path: {target_path}")
        # NOTE(review): fuzzy=True is kept from the original call; gdown
        # documents it as URL-based ID extraction, so it may have no effect
        # when id= is passed - confirm.
        downloaded_path = gdown.download(id=file_id, output=target_path, quiet=False, fuzzy=True)

        # A falsy return value is treated as failure so that a stale file
        # already present at target_path cannot pass for a fresh download.
        if not downloaded_path or not os.path.exists(target_path) or os.path.getsize(target_path) == 0:
            logger.error("[GDRIVE_SINGLE_FILE] Download failed or the resulting file is empty.")
            return False

        logger.info("[GDRIVE_SINGLE_FILE] Download successful.")
        return True
    except Exception as e:
        logger.error(f"[GDRIVE_SINGLE_FILE] An error occurred during download: {e}", exc_info=True)
        return False
def download_and_unzip_gdrive_file(file_id_or_url: str, target_extraction_dir: str) -> bool:
    """Download a single ZIP file from Google Drive and extract its contents.

    The archive is downloaded into a temporary directory, checked to be a ZIP
    file, extracted into ``target_extraction_dir`` (created when missing), and
    the temporary directory is removed afterwards in every case.

    Args:
        file_id_or_url: Drive file ID or URL; resolved with
            ``get_id_from_gdrive_input``.
        target_extraction_dir: Directory that receives the archive contents.

    Returns:
        ``True`` when the archive was downloaded and extracted, ``False``
        otherwise. Errors are logged, not raised.
    """
    logger.info(f"[GDRIVE_FILE] Attempting to download and extract ZIP from Google Drive. Input: {file_id_or_url}")
    file_id = get_id_from_gdrive_input(file_id_or_url)
    if not file_id:
        logger.error(f"[GDRIVE_FILE] Invalid Google Drive File ID or URL provided: {file_id_or_url}")
        return False

    temp_download_dir = tempfile.mkdtemp(prefix="gdrive_zip_")
    temp_zip_path = os.path.join(temp_download_dir, "downloaded_file.zip")
    try:
        logger.info(f"[GDRIVE_FILE] Downloading file ID: {file_id} to temporary path: {temp_zip_path}")
        downloaded_path = gdown.download(id=file_id, output=temp_zip_path, quiet=False)

        if not downloaded_path or not os.path.exists(temp_zip_path) or os.path.getsize(temp_zip_path) == 0:
            logger.error("[GDRIVE_FILE] Download failed or the resulting file is empty.")
            return False

        # Fail with a clear message when the payload is not an archive,
        # instead of surfacing a BadZipFile traceback from extractall().
        if not zipfile.is_zipfile(temp_zip_path):
            logger.error("[GDRIVE_FILE] Downloaded file is not a valid ZIP archive.")
            return False

        logger.info(f"[GDRIVE_FILE] Download successful. Extracting ZIP to: {target_extraction_dir}")
        os.makedirs(target_extraction_dir, exist_ok=True)
        with zipfile.ZipFile(temp_zip_path, 'r') as zip_ref:
            zip_ref.extractall(target_extraction_dir)

        logger.info("[GDRIVE_FILE] Successfully extracted ZIP archive.")
        return True
    except Exception as e:
        logger.error(f"[GDRIVE_FILE] An error occurred during download or extraction: {e}", exc_info=True)
        return False
    finally:
        # Cleanup is best-effort: a leftover temp directory must not turn a
        # successful extraction into a failure.
        if os.path.exists(temp_download_dir):
            try:
                shutil.rmtree(temp_download_dir)
                logger.debug(f"[GDRIVE_FILE] Cleaned up temporary directory: {temp_download_dir}")
            except Exception as e_del:
                logger.warning(f"[GDRIVE_FILE] Could not remove temporary directory '{temp_download_dir}': {e_del}")
def _reset_directory(path: str) -> None:
    """Delete ``path`` if it exists and recreate it as an empty directory."""
    if os.path.exists(path):
        shutil.rmtree(path)
    os.makedirs(path)


def _resolve_content_root(download_parent_dir: str, download_path) -> str:
    """Pick the directory whose entries should be moved to the target.

    When the download directory holds exactly one entry and that entry is a
    directory, that nested directory is used, unless gdown reported a
    different existing directory as the download location.
    """
    entries = os.listdir(download_parent_dir)
    if len(entries) != 1:
        return download_parent_dir

    nested_dir = os.path.join(download_parent_dir, entries[0])
    if not os.path.isdir(nested_dir):
        return download_parent_dir

    # gdown.download_folder returns a list of downloaded files in current
    # releases; os.path.isdir() raises TypeError for a list, so only path-like
    # values are probed.
    reported_dir = None
    if isinstance(download_path, (str, os.PathLike)) and os.path.isdir(download_path):
        reported_dir = download_path

    if reported_dir is None:
        logger.info(f"[GDRIVE] Using nested directory (heuristic): {entries[0]}")
        return nested_dir
    if os.path.normpath(reported_dir) == os.path.normpath(nested_dir):
        logger.info(f"[GDRIVE] Using nested directory: {entries[0]}")
        return nested_dir
    return download_parent_dir


def _move_directory_contents(source_dir: str, target_dir: str) -> int:
    """Move every entry of ``source_dir`` into ``target_dir``.

    An existing entry with the same name in ``target_dir`` is deleted first.
    Returns the number of entries moved.
    """
    moved = 0
    for item_name in os.listdir(source_dir):
        source_item = os.path.join(source_dir, item_name)
        target_item = os.path.join(target_dir, item_name)
        if os.path.exists(target_item):
            if os.path.isdir(target_item):
                shutil.rmtree(target_item)
            else:
                os.remove(target_item)
        shutil.move(source_item, target_item)
        moved += 1
    return moved


def download_and_unzip_gdrive_folder(
    folder_id_or_url: str,
    target_dir_for_contents: str,
    *,
    max_retries: int = 3,
    retry_delay_seconds: float = 10,
) -> bool:
    """Download a Google Drive folder and move its contents into a directory.

    The folder is downloaded into a temporary directory (retrying on failure
    or on an empty result), then its entries are moved into
    ``target_dir_for_contents``, replacing entries with the same name. Despite
    the name, this function performs no archive extraction.

    Args:
        folder_id_or_url: Drive folder ID or URL; resolved with
            ``get_id_from_gdrive_input``.
        target_dir_for_contents: Destination directory (created when missing).
        max_retries: Total number of download attempts; values below 1 are
            treated as 1.
        retry_delay_seconds: Pause between attempts.

    Returns:
        ``True`` when the contents were downloaded and moved, ``False``
        otherwise. Errors are logged, not raised.
    """
    logger.info(f"[GDRIVE] Attempting to download sources from Google Drive. Input: {folder_id_or_url}")
    folder_id = get_id_from_gdrive_input(folder_id_or_url)
    if not folder_id:
        logger.error(f"[GDRIVE] Invalid Google Drive Folder ID or URL provided: {folder_id_or_url}")
        return False

    temp_download_parent_dir = tempfile.mkdtemp(prefix="gdrive_parent_")
    try:
        total_attempts = max(1, max_retries)
        download_path = None
        for attempt in range(1, total_attempts + 1):
            logger.info(f"[GDRIVE] Attempt {attempt} of {total_attempts} to download folder ID: {folder_id}")
            try:
                start_time = time.monotonic()
                download_path = gdown.download_folder(
                    id=folder_id,
                    output=temp_download_parent_dir,
                    quiet=False,
                    use_cookies=False,
                )
                download_time = time.monotonic() - start_time

                if download_path and os.path.exists(temp_download_parent_dir) and os.listdir(temp_download_parent_dir):
                    logger.info(f"[GDRIVE] Successfully downloaded in {download_time:.2f}s. Path: {download_path}")
                    break
                # An empty result is handled like any other failed attempt.
                raise RuntimeError("gdown completed but the download directory is empty.")
            except Exception as e:
                logger.warning(f"[GDRIVE] Attempt {attempt} failed: {e}")
                if attempt == total_attempts:
                    logger.error(f"[GDRIVE] Failed after {total_attempts} attempts. Last error: {e}", exc_info=True)
                    return False
                logger.info(f"[GDRIVE] Retrying in {retry_delay_seconds} seconds...")
                time.sleep(retry_delay_seconds)
                # Start the next attempt from an empty directory so partial
                # downloads cannot be mistaken for a successful result.
                _reset_directory(temp_download_parent_dir)

        os.makedirs(target_dir_for_contents, exist_ok=True)
        source_content_root = _resolve_content_root(temp_download_parent_dir, download_path)

        logger.info(f"[GDRIVE] Moving contents from {source_content_root} to {target_dir_for_contents}")
        files_moved = _move_directory_contents(source_content_root, target_dir_for_contents)
        logger.info(f"[GDRIVE] Successfully moved {files_moved} items to {target_dir_for_contents}")
        return True
    except Exception as e:
        logger.error(f"[GDRIVE] Unexpected error during download/processing: {e}", exc_info=True)
        return False
    finally:
        # Cleanup is best-effort and runs on every exit path, including the
        # early return after the final failed attempt.
        if os.path.exists(temp_download_parent_dir):
            try:
                shutil.rmtree(temp_download_parent_dir)
                logger.debug("[GDRIVE] Cleaned up temporary directory")
            except Exception as e_del:
                logger.warning(f"[GDRIVE] Could not remove temporary directory: {e_del}")