Raagsan / app.py
iamismail's picture
Initial clean commit for Raagsan Space
439e1dd
#!/usr/bin/env python3
"""
Uses unified pipeline for both text and document processing
"""
import gradio as gr
import pandas as pd
from datetime import datetime
import os
from typing import List, Dict, Any, Tuple, Optional
import tempfile
import logging
import sys
import subprocess
import platform
from unified_pipeline import process_text_content, process_document_content
from scraper_common import scrape_news_async, set_scraping_cancelled, force_close_browser, scraping_cancelled
from auth import auth_manager
# --- Playwright bootstrap: install Chromium at runtime if missing ---
import os, glob, subprocess, pathlib
# Use the canonical path HF Spaces expect in root containers
import os, glob, subprocess
# Ensure path Playwright expects
import os
import subprocess
# Make sure Playwright knows where to install browsers (HF standard).
# This is set before ensure_chromium() is called below, so the
# `playwright install` child process inherits it through the environment.
# NOTE(review): this overwrites any value already present in the environment.
os.environ["PLAYWRIGHT_BROWSERS_PATH"] = "/root/.cache/ms-playwright"
# Ensure Chromium is installed at runtime
def ensure_chromium():
    """
    Best-effort install of Playwright's Chromium build via the CLI.

    Runs ``playwright install --with-deps chromium`` and waits for it to
    finish. Any failure (missing CLI, non-zero exit status, ...) is printed
    and swallowed so that the application can still start.
    """
    install_command = ["playwright", "install", "--with-deps", "chromium"]
    try:
        subprocess.run(install_command, check=True)
    except Exception as install_error:
        # Deliberately non-fatal: report the problem and carry on.
        print("Playwright install failed:", install_error)
# Run the Chromium bootstrap once, at import time, before any UI is built.
ensure_chromium()
# Configure detailed logging for the app
# (DEBUG level, single handler writing to stdout, records include file:line).
logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s - %(name)s - %(levelname)s - [%(filename)s:%(lineno)d] - %(message)s',
    handlers=[
        logging.StreamHandler(sys.stdout)
    ]
)
logger = logging.getLogger(__name__)
# Global variables for cancellation
# Polled by the document tab's progress loop; reset by clear_memory_state().
document_processing_cancelled = False
# Global variables for authentication
# Written by login_user()/logout_user() and cleared by is_authenticated()
# when the session no longer validates.
# NOTE(review): these are module-level, so they are shared by everything
# running in this process rather than being per-visitor — confirm intended.
current_user = None
current_session = None
def clear_memory_state():
    """
    Clear all memory state and global variables to free up memory.

    This function should be called before starting new processing operations.
    It performs, in order:

    * resets ``document_processing_cancelled`` and the scraper's cancellation
      flag to ``False``;
    * resets the scraper's global PDF counter and clears ``TIMEOUT_URLS``;
    * starts a background thread that awaits ``force_close_browser()``;
    * resets the unified pipeline's statistics when a pipeline exists;
    * forces a garbage-collection pass.

    Browser-closing and pipeline-reset failures are logged at DEBUG level and
    otherwise ignored (best effort).
    """
    global document_processing_cancelled
    logger.info("🧹 Clearing memory state...")
    # Reset cancellation flags
    document_processing_cancelled = False
    set_scraping_cancelled(False)
    # Reset global PDF counter
    from scraper_common import reset_global_pdf_count
    reset_global_pdf_count()
    # Clear timeout URLs set
    from scraper_common import TIMEOUT_URLS
    TIMEOUT_URLS.clear()
    # Force close any open browser instances
    try:
        import asyncio
        import threading

        def close_browser_async():
            loop = None
            try:
                # Each thread needs its own event loop to await the coroutine.
                loop = asyncio.new_event_loop()
                asyncio.set_event_loop(loop)
                loop.run_until_complete(force_close_browser())
            except Exception as e:
                logger.debug(f"Browser already closed or error closing: {e}")
            finally:
                # Always release the loop, even when closing the browser failed.
                if loop is not None:
                    loop.close()

        # Close browser in background thread to avoid blocking
        browser_close_thread = threading.Thread(target=close_browser_async)
        browser_close_thread.start()
    except Exception as e:
        logger.debug(f"Error closing browser during memory clear: {e}")
    # Reset pipeline statistics if pipeline exists
    try:
        from unified_pipeline import get_pipeline
        pipeline = get_pipeline()
        if pipeline:
            pipeline.reset_stats()
            logger.debug("Pipeline statistics reset")
    except Exception as e:
        logger.debug(f"Error resetting pipeline stats: {e}")
    # Force garbage collection
    import gc
    gc.collect()
    logger.info("✅ Memory state cleared successfully")
# Authentication functions
def login_user(username: str, password: str) -> Tuple[bool, str]:
    """
    Authenticate *username* / *password* through ``auth_manager``.

    On success the module-level ``current_user`` and ``current_session`` are
    updated. Returns a ``(success, message)`` pair.
    """
    global current_user, current_session
    authenticated, token = auth_manager.authenticate_user(username, password)
    if not authenticated:
        return False, "Invalid username or password"
    current_user, current_session = username, token
    return True, f"Welcome, {username}!"
def logout_user() -> str:
    """
    End the current session, if there is one, and forget the current user.

    Returns a confirmation message for display.
    """
    global current_user, current_session
    active_session = current_session
    if active_session:
        # Tell the auth manager first; local state is dropped afterwards.
        auth_manager.logout_user(active_session)
    current_user = None
    current_session = None
    return "Logged out successfully"
def is_authenticated() -> bool:
    """
    Report whether a user is logged in with a session that still validates.

    When ``auth_manager`` rejects the stored session, the module-level user
    and session are cleared as a side effect.
    """
    global current_user, current_session
    if not (current_user and current_session):
        return False
    # Validate session
    session_ok, _session_user = auth_manager.validate_session(current_session)
    if session_ok:
        return True
    current_user = None
    current_session = None
    return False
def get_current_user() -> Optional[str]:
    """Return the logged-in user's name, or ``None`` when not authenticated."""
    return current_user if is_authenticated() else None
def require_auth(func):
    """
    Decorator to require authentication for functions.

    The wrapped function is only called when ``is_authenticated()`` is true;
    otherwise ``(None, "Please login to access this feature")`` is returned.
    The wrapper keeps the decorated function's name, docstring and other
    metadata.
    """
    import functools

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        if not is_authenticated():
            return None, "Please login to access this feature"
        return func(*args, **kwargs)

    return wrapper
# Ensure archive directory exists
def ensure_archive_directory():
    """
    Ensure the ``archive`` directory exists and return its (relative) path.

    Creation uses ``exist_ok=True`` so that two callers racing to create the
    directory cannot make one of them fail. The creation is logged only when
    the directory was not there beforehand.
    """
    archive_dir = "archive"
    already_present = os.path.isdir(archive_dir)
    os.makedirs(archive_dir, exist_ok=True)
    if not already_present:
        logger.info(f"📁 Created archive directory: {archive_dir}")
    return archive_dir
def create_csv_download(df: pd.DataFrame, filename_prefix: str = "data") -> str:
    """
    Write *df* to a temporary UTF-8 CSV file and return the file path.

    Args:
        df: Data to export. A frame with no rows keeps its own column headers;
            only a frame with no columns at all falls back to the default
            article headers.
        filename_prefix: Leading part of the generated file name, followed by
            a ``YYYYmmdd_HHMMSS`` timestamp.

    Returns:
        Path of the created ``.csv`` file. The file is not deleted
        automatically (``delete=False``).
    """
    if df.empty and len(df.columns) == 0:
        # Nothing to describe the data with: emit the default headers only.
        df = pd.DataFrame(columns=['#', 'title', 'content', 'summary', 'summary_somali', 'date', 'url'])
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    # newline="" lets the CSV writer control line endings (no doubled "\r" on
    # platforms that translate newlines in text mode).
    with tempfile.NamedTemporaryFile(
        mode='w',
        prefix=f"{filename_prefix}_{timestamp}_",
        suffix='.csv',
        delete=False,
        encoding='utf-8',
        newline='',
    ) as f:
        df.to_csv(f, index=False)
        temp_path = f.name
    return temp_path
def save_csv_to_archive(df: pd.DataFrame, source: str, filename_prefix: str = "data") -> str:
    """
    Save *df* as a CSV file under ``archive/<source>/<YYYY-MM-DD>/``.

    Args:
        df: Data to archive. A frame with no rows keeps its own column
            headers; only a frame with no columns at all falls back to the
            default article headers.
        source: Name of the sub-directory the file is grouped under.
        filename_prefix: Leading part of the file name, followed by a
            ``YYYYmmdd_HHMMSS`` timestamp.

    Returns:
        Path of the written CSV file.
    """
    # Take the clock once so the date folder and the file timestamp agree
    # even when the call happens right at midnight.
    now = datetime.now()
    # Create archive directory structure
    archive_dir = os.path.join("archive", source, now.strftime("%Y-%m-%d"))
    os.makedirs(archive_dir, exist_ok=True)
    # Create filename with timestamp
    filename = f"{filename_prefix}_{now.strftime('%Y%m%d_%H%M%S')}.csv"
    csv_path = os.path.join(archive_dir, filename)
    if df.empty and len(df.columns) == 0:
        # Nothing to describe the data with: emit the default headers only.
        df = pd.DataFrame(columns=['#', 'title', 'content', 'summary', 'summary_somali', 'date', 'url'])
    df.to_csv(csv_path, index=False)
    return csv_path
def create_text_content_tab():
    """
    Create the text content tab interface.

    Builds the "Text Content" tab (URL / keyword / date inputs, action
    buttons, status box, results table, download button) and wires the
    scrape, cancel and clear handlers to those components.
    """
    with gr.Tab("Text Content"):
        gr.Markdown("## Website Content Scraper")
        gr.Markdown("Extract and analyze content from websites with AI-powered summarization.")
        with gr.Group():
            gr.Markdown("### Configuration")
            with gr.Row():
                url_input = gr.Textbox(
                    label="Website URL",
                    placeholder="https://example.com/article",
                    interactive=True,
                    scale=2
                )
                keywords_input = gr.Textbox(
                    label="Filter Keywords (optional)",
                    placeholder="e.g., flood, drought, conflict (comma-separated)",
                    interactive=True,
                    scale=2
                )
            with gr.Row():
                start_date_input = gr.Textbox(
                    label="Start Date (optional)",
                    placeholder="YYYY-MM-DD (e.g., 2024-01-01)",
                    interactive=True,
                    scale=1,
                    info="Filter articles from this date onwards"
                )
                end_date_input = gr.Textbox(
                    label="End Date (optional)",
                    placeholder="YYYY-MM-DD (e.g., 2024-12-31)",
                    interactive=True,
                    scale=1,
                    info="Filter articles up to this date"
                )
            with gr.Row():
                scrape_btn = gr.Button("Scrape Content", variant="primary")
                # The label is the positional `value`; passing value= as well
                # would supply the same argument twice.
                cancel_btn = gr.Button("Cancel", variant="stop", interactive=True)
                clear_btn = gr.Button("Clear", variant="secondary")
        # Status text
        status_text = gr.Textbox(
            label="Status",
            value="Ready to scrape content...",
            interactive=False,
            visible=True
        )
        # Display area for scraped content
        content_df = gr.Dataframe(
            label="Scraped Content",
            headers=["#", "Title", "Category", "Content", "Summary", "Summary (Somali)", "Date", "URL"],
            datatype=["str", "str", "str", "str", "str", "str", "str", "str"],
            interactive=True,
            wrap=True
        )
        # Action buttons
        with gr.Row():
            download_btn = gr.DownloadButton(
                label="📥 Download CSV",
                variant="secondary",
                visible=False
            )
        # Store full content data globally for modal access
        full_content_store = gr.State([])

        def process_and_display(url, custom_keywords="", start_date="", end_date=""):
            """
            Process URL and display results with progress updates.

            This is a generator: each yielded 4-tuple is
            ``(table, csv_file_path_or_None, status_message, full_content)``.
            Terminal outcomes (errors, cancellation) are yielded too, because
            a value passed to ``return`` inside a generator never reaches the
            UI.
            """
            def cancelled_update():
                # Fresh objects each time so no update shares mutable state.
                return pd.DataFrame(), None, "🛑 Operation cancelled by user", []

            # Clear memory state before starting new processing
            clear_memory_state()
            # Clear captcha status
            from scraper_common import clear_captcha_status
            clear_captcha_status()
            logger.info(f"🚀 Starting text content processing for URL: {url}")
            logger.info(f"🔑 Custom keywords provided: {custom_keywords}")
            logger.debug(f"📋 Processing parameters: URL={url.strip()}")
            if not url.strip():
                logger.warning("⚠️ Empty URL provided")
                yield pd.DataFrame(), None, "❌ Error: Please enter a valid URL", []
                return
            try:
                import asyncio
                import threading
                import time
                from scraper_common import get_captcha_status
                # Detect website type
                from unified_pipeline import determine_website_type
                website_type = determine_website_type(url.strip())
                # Check cancellation
                if scraping_cancelled():
                    logger.warning("⚠️ Operation cancelled before starting")
                    yield cancelled_update()
                    return
                # Step 1: Start scraping
                status_msg = f"📡 Step 1/4: Starting content extraction from {website_type}..."
                yield pd.DataFrame(), None, status_msg, []
                if scraping_cancelled():
                    logger.warning("⚠️ Operation cancelled by user before content extraction")
                    yield cancelled_update()
                    return
                # Create a result container and status tracker
                result_container = {
                    'df': None,
                    'full_content_data': None,
                    'error': None,
                    'completed': False,
                    'status': 'processing'
                }

                def run_async_processing():
                    """Run the async processing in a separate thread"""
                    loop = None
                    try:
                        result_container['status'] = 'scraping'
                        loop = asyncio.new_event_loop()
                        asyncio.set_event_loop(loop)
                        df, full_content_data = loop.run_until_complete(process_text_content(url.strip(), custom_keywords, start_date.strip() if start_date else None, end_date.strip() if end_date else None))
                        result_container['df'] = df
                        result_container['full_content_data'] = full_content_data
                        result_container['status'] = 'completed'
                        result_container['completed'] = True
                    except Exception as e:
                        result_container['error'] = str(e)
                        result_container['status'] = 'error'
                        result_container['completed'] = True
                    finally:
                        # The loop may not exist if its creation failed.
                        if loop is not None:
                            loop.close()

                # Start processing in a separate thread
                processing_thread = threading.Thread(target=run_async_processing)
                processing_thread.start()
                # Monitor the processing and update status
                status_step = 1
                last_status_time = time.time()
                while processing_thread.is_alive():
                    if scraping_cancelled():
                        logger.warning("⚠️ Operation cancelled during processing")
                        try:
                            loop = asyncio.new_event_loop()
                            asyncio.set_event_loop(loop)
                            loop.run_until_complete(force_close_browser())
                            loop.close()
                        except Exception as e:
                            logger.error(f"Error closing browser: {e}")
                        yield cancelled_update()
                        return
                    # Check for captcha status and update UI
                    captcha_status = get_captcha_status()
                    if captcha_status:
                        yield pd.DataFrame(), None, captcha_status, []
                        time.sleep(0.5)  # Check every 500ms
                        continue
                    # Update status periodically during processing
                    current_time = time.time()
                    if current_time - last_status_time >= 2.0:  # Update every 2 seconds
                        if status_step == 1:
                            status_msg = "🔄 Step 2/4: Extracting content from website..."
                            yield pd.DataFrame(), None, status_msg, []
                            status_step = 2
                            last_status_time = current_time
                        elif status_step == 2:
                            status_msg = "🤖 Step 3/4: Processing content with AI models..."
                            yield pd.DataFrame(), None, status_msg, []
                            status_step = 3
                            last_status_time = current_time
                    time.sleep(0.5)  # Check every 500ms
                # Get the result
                if result_container['error']:
                    logger.error(f"❌ Error during processing: {result_container['error']}")
                    yield pd.DataFrame(), None, f"❌ Error: {result_container['error']}", []
                    return
                df = result_container['df']
                if df is None:
                    # Treat "no result" as an empty table instead of crashing
                    # on the `.empty` checks below.
                    df = pd.DataFrame()
                full_content_data = result_container['full_content_data']
                if full_content_data is None:
                    full_content_data = []
                # Check cancellation after pipeline processing
                if scraping_cancelled():
                    logger.warning("⚠️ Operation cancelled by user after content extraction")
                    yield cancelled_update()
                    return
                # Step 4: Saving to archive
                num_articles = len(df) if not df.empty else 0
                status_msg = f"💾 Step 4/4: Saving to archive... Found {num_articles} articles"
                yield pd.DataFrame(), None, status_msg, []
                if scraping_cancelled():
                    logger.warning("⚠️ Operation cancelled by user during archiving")
                    yield cancelled_update()
                    return
                # Actually save to archive
                if not df.empty:
                    try:
                        # Host part of the URL (without "www.") names the archive folder.
                        source = url.split('/')[2].replace('www.', '') if '://' in url else 'unknown'
                        archive_path = save_csv_to_archive(df, source, "scraped_content")
                        logger.info(f"📁 Saved to archive: {archive_path}")
                    except Exception as e:
                        logger.error(f"❌ Error saving to archive: {str(e)}")
                csv_file = create_csv_download(df, "scraped_content") if not df.empty else None
                # Final cancellation check
                if scraping_cancelled():
                    logger.warning("⚠️ Operation cancelled by user before finalizing results")
                    yield cancelled_update()
                    return
                # Processing complete
                logger.info(f"✅ Processing complete! Found {len(df)} articles.")
                final_status = f"✅ Processing complete! Found {len(df)} articles."
                yield df, csv_file, final_status, full_content_data
            except Exception as e:
                logger.error(f"❌ Error during text content processing: {str(e)}")
                logger.debug(f"🔍 Error details: {type(e).__name__}: {str(e)}")
                yield pd.DataFrame(), None, f"Error: {str(e)}", []

        def cancel_scraping():
            """Cancel the scraping operation"""
            logger.warning("⚠️ User requested cancellation of scraping operation")
            # Clear memory state when cancelling
            clear_memory_state()
            # clear_memory_state() resets the cancellation flag to False, so it
            # has to be raised afterwards for the progress loop to notice.
            set_scraping_cancelled(True)
            logger.info("🛑 Set cancellation flags")
            # Force close browser asynchronously in a separate thread to avoid blocking
            import threading

            def close_browser_async():
                import asyncio
                try:
                    logger.info("🔧 Attempting to close browser...")
                    loop = asyncio.new_event_loop()
                    asyncio.set_event_loop(loop)
                    loop.run_until_complete(force_close_browser())
                    loop.close()
                    logger.info("✅ Browser closed successfully")
                except Exception as e:
                    logger.error(f"❌ Error closing browser: {e}")

            # Start browser closing in background
            browser_close_thread = threading.Thread(target=close_browser_async)
            browser_close_thread.start()
            return "🛑 Cancellation requested - stopping operation..."

        def clear_all():
            """Clear URL input, keywords input, date inputs, DataFrame, and download button"""
            logger.info("🧹 User requested to clear all data")
            # Clear memory state when manually clearing
            clear_memory_state()
            return "", "", "", "", pd.DataFrame(), None, "Ready to scrape content...", []

        def update_download_visibility(df):
            """Show the download button only while the table holds data."""
            return gr.DownloadButton(visible=not df.empty)

        scrape_btn.click(
            fn=process_and_display,
            inputs=[url_input, keywords_input, start_date_input, end_date_input],
            outputs=[content_df, download_btn, status_text, full_content_store],
            show_progress=True
        )
        cancel_btn.click(
            fn=cancel_scraping,
            outputs=[status_text]
        )
        clear_btn.click(
            fn=clear_all,
            outputs=[url_input, keywords_input, start_date_input, end_date_input, content_df, download_btn, status_text, full_content_store]
        )
        content_df.change(
            fn=update_download_visibility,
            inputs=[content_df],
            outputs=[download_btn]
        )
def create_document_content_tab():
    """
    Create the document content tab interface.

    Builds the "Document Content" tab (URL / date inputs, action buttons,
    status box, results table, download button) and wires the process,
    cancel and clear handlers to those components.
    """
    with gr.Tab("Document Content"):
        gr.Markdown("## Document Content Processor")
        gr.Markdown("Extract and analyze content from PDF, DOC, and CSV documents with AI-powered processing.")
        with gr.Group():
            gr.Markdown("### Document Source")
            with gr.Row():
                doc_url_input = gr.Textbox(
                    label="Document URL",
                    placeholder="https://example.com/documents/",
                    interactive=True,
                    scale=2
                )
            with gr.Row():
                doc_start_date_input = gr.Textbox(
                    label="Start Date (optional)",
                    placeholder="YYYY-MM-DD (e.g., 2024-01-01)",
                    interactive=True,
                    scale=1,
                    info="Filter documents from this date onwards"
                )
                doc_end_date_input = gr.Textbox(
                    label="End Date (optional)",
                    placeholder="YYYY-MM-DD (e.g., 2024-12-31)",
                    interactive=True,
                    scale=1,
                    info="Filter documents up to this date"
                )
            with gr.Row():
                process_btn = gr.Button("Process Documents", variant="primary")
                # The label is the positional `value`; passing value= as well
                # would supply the same argument twice.
                doc_cancel_btn = gr.Button("Cancel", variant="stop", interactive=True)
                doc_clear_btn = gr.Button("Clear", variant="secondary")
        # Status text for documents
        doc_status_text = gr.Textbox(
            label="Status",
            value="Ready to process documents...",
            interactive=False,
            visible=True
        )
        # Display area for document content
        doc_df = gr.Dataframe(
            label="Document Content",
            headers=["Title", "Date", "Source", "File Path", "Extracted Text", "Summary", "Summary (Somali)", "File Type"],
            datatype=["str", "str", "str", "str", "str", "str", "str", "str"],
            interactive=True,
            wrap=True
        )
        # Action buttons
        with gr.Row():
            doc_download_btn = gr.DownloadButton(
                label="📥 Download CSV",
                variant="secondary",
                visible=False
            )

        def process_and_display_docs(url, start_date="", end_date=""):
            """
            Process documents and display results with progress updates.

            This is a generator: each yielded 3-tuple is
            ``(table, csv_file_path_or_None, status_message)``. Terminal
            outcomes (errors, cancellation) are yielded too, because a value
            passed to ``return`` inside a generator never reaches the UI.
            """
            def cancelled_update():
                return pd.DataFrame(), None, "🛑 Operation cancelled by user"

            # Clear memory state before starting new processing
            clear_memory_state()
            # Clear captcha status
            from scraper_common import clear_captcha_status
            clear_captcha_status()
            if not url.strip():
                yield pd.DataFrame(), None, "❌ Error: Please enter a valid URL"
                return
            try:
                import asyncio
                import threading
                import time
                from scraper_common import get_captcha_status
                # Detect website type
                from unified_pipeline import determine_website_type
                website_type = determine_website_type(url.strip())
                # Check cancellation
                if document_processing_cancelled:
                    yield cancelled_update()
                    return
                # Step 1: Start document extraction
                status_msg = f"📄 Step 1/4: Starting document extraction from {website_type}..."
                yield pd.DataFrame(), None, status_msg
                if document_processing_cancelled:
                    yield cancelled_update()
                    return
                # Create a result container and status tracker
                result_container = {
                    'df': None,
                    'error': None,
                    'completed': False,
                    'status': 'processing'
                }

                def run_async_processing():
                    """Run the async processing in a separate thread"""
                    loop = None
                    try:
                        result_container['status'] = 'extracting'
                        loop = asyncio.new_event_loop()
                        asyncio.set_event_loop(loop)
                        df = loop.run_until_complete(process_document_content(url.strip(), start_date.strip() if start_date else None, end_date.strip() if end_date else None))
                        result_container['df'] = df
                        result_container['status'] = 'completed'
                        result_container['completed'] = True
                    except Exception as e:
                        result_container['error'] = str(e)
                        result_container['status'] = 'error'
                        result_container['completed'] = True
                    finally:
                        # The loop may not exist if its creation failed.
                        if loop is not None:
                            loop.close()

                # Start processing in a separate thread
                processing_thread = threading.Thread(target=run_async_processing)
                processing_thread.start()
                # Monitor the processing and update status
                status_step = 1
                last_status_time = time.time()
                while processing_thread.is_alive():
                    if document_processing_cancelled:
                        logger.warning("⚠️ Document processing cancelled during processing")
                        try:
                            loop = asyncio.new_event_loop()
                            asyncio.set_event_loop(loop)
                            loop.run_until_complete(force_close_browser())
                            loop.close()
                        except Exception as e:
                            logger.error(f"Error closing browser: {e}")
                        yield cancelled_update()
                        return
                    # Check for captcha status and update UI
                    captcha_status = get_captcha_status()
                    if captcha_status:
                        yield pd.DataFrame(), None, captcha_status
                        time.sleep(0.5)  # Check every 500ms
                        continue
                    # Update status periodically during processing
                    current_time = time.time()
                    if current_time - last_status_time >= 2.0:  # Update every 2 seconds
                        if status_step == 1:
                            status_msg = "🔄 Step 2/4: Extracting documents from website..."
                            yield pd.DataFrame(), None, status_msg
                            status_step = 2
                            last_status_time = current_time
                        elif status_step == 2:
                            status_msg = "🤖 Step 3/4: Processing documents with AI models..."
                            yield pd.DataFrame(), None, status_msg
                            status_step = 3
                            last_status_time = current_time
                    time.sleep(0.5)  # Check every 500ms
                # Get the result
                if result_container['error']:
                    logger.error(f"❌ Error during document processing: {result_container['error']}")
                    yield pd.DataFrame(), None, f"❌ Error: {result_container['error']}"
                    return
                df = result_container['df']
                if df is None:
                    # Treat "no result" as an empty table instead of crashing
                    # on the `.empty` checks below.
                    df = pd.DataFrame()
                # Check cancellation after pipeline processing
                if document_processing_cancelled:
                    yield cancelled_update()
                    return
                # Step 4: Saving to archive
                num_docs = len(df) if not df.empty else 0
                status_msg = f"💾 Step 4/4: Saving to archive... Found {num_docs} documents"
                yield pd.DataFrame(), None, status_msg
                if document_processing_cancelled:
                    yield cancelled_update()
                    return
                # Actually save to archive
                if not df.empty:
                    try:
                        # Host part of the URL (without "www.") names the archive folder.
                        source = url.split('/')[2].replace('www.', '') if '://' in url else 'unknown'
                        archive_path = save_csv_to_archive(df, source, "document_content")
                        logger.info(f"📁 Saved to archive: {archive_path}")
                    except Exception as e:
                        logger.error(f"❌ Error saving to archive: {str(e)}")
                csv_file = create_csv_download(df, "document_content") if not df.empty else None
                # Final cancellation check
                if document_processing_cancelled:
                    yield cancelled_update()
                    return
                # Processing complete
                logger.info(f"✅ Document processing complete! Found {len(df)} documents.")
                final_status = f"✅ Processing complete! Found {len(df)} documents."
                yield df, csv_file, final_status
            except Exception as e:
                logger.error(f"❌ Error during document processing: {str(e)}")
                yield pd.DataFrame(), None, f"Error: {str(e)}"

        def cancel_document_processing():
            """Cancel the document processing operation"""
            global document_processing_cancelled
            logger.warning("⚠️ User requested cancellation of document processing")
            # Clear memory state when cancelling
            clear_memory_state()
            # clear_memory_state() resets the flag to False, so it has to be
            # raised afterwards for the progress loop to notice.
            document_processing_cancelled = True
            # Force close browser asynchronously in a separate thread to avoid blocking
            import threading

            def close_browser_async():
                import asyncio
                try:
                    logger.info("🔧 Attempting to close browser...")
                    loop = asyncio.new_event_loop()
                    asyncio.set_event_loop(loop)
                    loop.run_until_complete(force_close_browser())
                    loop.close()
                    logger.info("✅ Browser closed successfully")
                except Exception as e:
                    logger.error(f"❌ Error closing browser: {e}")

            # Start browser closing in background
            browser_close_thread = threading.Thread(target=close_browser_async)
            browser_close_thread.start()
            return "🛑 Document processing cancelled - stopping operation..."

        def clear_doc_all():
            """Clear URL input, date inputs, DataFrame, and download button for document content"""
            # Clear memory state when manually clearing
            clear_memory_state()
            return "", "", "", pd.DataFrame(), None, "Ready to process documents..."

        process_btn.click(
            fn=process_and_display_docs,
            inputs=[doc_url_input, doc_start_date_input, doc_end_date_input],
            outputs=[doc_df, doc_download_btn, doc_status_text],
            show_progress=True
        )
        doc_cancel_btn.click(
            fn=cancel_document_processing,
            outputs=[doc_status_text]
        )
        doc_clear_btn.click(
            fn=clear_doc_all,
            outputs=[doc_url_input, doc_start_date_input, doc_end_date_input, doc_df, doc_download_btn, doc_status_text]
        )
        # Show the download button only while the table holds data.
        doc_df.change(
            fn=lambda df: gr.DownloadButton(visible=not df.empty),
            inputs=[doc_df],
            outputs=[doc_download_btn]
        )
def create_archive_tab():
"""
Create the archive access tab interface
"""
with gr.Tab("Archive Access"):
gr.Markdown("## Archived Files Access")
gr.Markdown("Browse, download, and manage previously processed files from the archive.")
# File Management Section
with gr.Row():
# CSV Files Column
with gr.Column(scale=1, elem_classes="admin-section"):
gr.Markdown("### CSV Files")
gr.Markdown("*Processed data files*")
with gr.Row():
refresh_csv_btn = gr.Button("Refresh CSV", variant="secondary", size="sm")
gr.Markdown("*Update CSV file list*")
csv_df = gr.Dataframe(
label="",
headers=["Source", "Date", "Filename", "Path"],
datatype=["str", "str", "str", "str"],
interactive=True,
wrap=False,
elem_id="csv_dataframe"
)
csv_selection = gr.Dropdown(
label="Select CSV File",
choices=[],
value=None,
interactive=True
)
# CSV Action buttons
with gr.Row():
open_csv_btn = gr.Button("Open", variant="secondary", size="sm")
delete_csv_btn = gr.Button("Delete", variant="stop", size="sm")
open_csv_folder_btn = gr.Button("Folder", variant="secondary", size="sm")
# PDF Files Column
with gr.Column(scale=1, elem_classes="admin-section"):
gr.Markdown("### PDF Files")
gr.Markdown("*Downloaded documents*")
with gr.Row():
refresh_pdf_btn = gr.Button("Refresh PDF", variant="secondary", size="sm")
gr.Markdown("*Update PDF file list*")
pdf_df = gr.Dataframe(
label="",
headers=["Source", "Date", "Filename", "Size", "Path"],
datatype=["str", "str", "str", "str", "str"],
interactive=True,
wrap=False,
elem_id="pdf_dataframe"
)
pdf_selection = gr.Dropdown(
label="Select PDF File",
choices=[],
value=None,
interactive=True
)
# PDF Action buttons
with gr.Row():
open_pdf_btn = gr.Button("Open", variant="secondary", size="sm")
delete_pdf_btn = gr.Button("Delete", variant="stop", size="sm")
open_pdf_folder_btn = gr.Button("Folder", variant="secondary", size="sm")
# Status section
with gr.Column(elem_classes="admin-section"):
status_text = gr.Textbox(
label="Status",
interactive=False,
value="Ready to access archived files...",
lines=2
)
# Archive functions
def get_archived_csv_files():
"""Get list of archived CSV files"""
archive_dir = ensure_archive_directory()
csv_files = []
if os.path.exists(archive_dir):
for source in os.listdir(archive_dir):
source_path = os.path.join(archive_dir, source)
if os.path.isdir(source_path):
for date in os.listdir(source_path):
date_path = os.path.join(source_path, date)
if os.path.isdir(date_path):
for file in os.listdir(date_path):
if file.endswith('.csv'):
file_path = os.path.join(date_path, file)
file_size = os.path.getsize(file_path)
csv_files.append({
'source': source,
'date': date,
'filename': file,
'path': file_path,
'size': f"{file_size / 1024:.2f} KB"
})
return sorted(csv_files, key=lambda x: (x['source'], x['date'], x['filename']), reverse=True)
def get_archived_pdf_files():
"""Get list of archived PDF files"""
archive_dir = ensure_archive_directory()
pdf_files = []
if os.path.exists(archive_dir):
for source in os.listdir(archive_dir):
source_path = os.path.join(archive_dir, source)
if os.path.isdir(source_path):
for date in os.listdir(source_path):
date_path = os.path.join(source_path, date)
if os.path.isdir(date_path):
# Check main date folder
for file in os.listdir(date_path):
if file.endswith('.pdf'):
file_path = os.path.join(date_path, file)
file_size = os.path.getsize(file_path)
pdf_files.append({
'source': source,
'date': date,
'filename': file,
'path': file_path,
'size': f"{file_size / 1024 / 1024:.2f} MB"
})
# Check pdf subfolder
pdf_folder = os.path.join(date_path, "pdf")
if os.path.exists(pdf_folder):
for file in os.listdir(pdf_folder):
if file.endswith('.pdf'):
file_path = os.path.join(pdf_folder, file)
file_size = os.path.getsize(file_path)
pdf_files.append({
'source': source,
'date': date,
'filename': file,
'path': file_path,
'size': f"{file_size / 1024 / 1024:.2f} MB"
})
return sorted(pdf_files, key=lambda x: (x['source'], x['date'], x['filename']), reverse=True)
def refresh_csv_files():
"""Refresh CSV files list"""
csv_files = get_archived_csv_files()
if csv_files:
display_data = [
{
'Source': item['source'],
'Date': item['date'],
'Filename': item['filename'],
'Path': item['path']
}
for item in csv_files
]
df = pd.DataFrame(display_data)
choices = [f"{item['source']} | {item['date']} | {item['filename']}" for item in csv_files]
default_choice = choices[0] if choices else None
return df, f"Found {len(csv_files)} CSV files. Select a file below and click 'Open Selected CSV'.", gr.update(choices=choices, value=default_choice)
else:
return pd.DataFrame(), "No CSV files found in the archive.", gr.update(choices=[], value=None)
def refresh_pdf_files():
"""Refresh PDF files list"""
pdf_files = get_archived_pdf_files()
if pdf_files:
display_data = [
{
'Source': item['source'],
'Date': item['date'],
'Filename': item['filename'],
'Size': item['size'],
'Path': item['path']
}
for item in pdf_files
]
df = pd.DataFrame(display_data)
choices = [f"{item['source']} | {item['date']} | {item['filename']}" for item in pdf_files]
default_choice = choices[0] if choices else None
return df, f"Found {len(pdf_files)} PDF files. Select a file below and click 'Open Selected PDF'.", gr.update(choices=choices, value=default_choice)
else:
return pd.DataFrame(), "No PDF files found in the archive.", gr.update(choices=[], value=None)
def open_selected_csv(selected_option):
"""Open the selected CSV file"""
try:
if not selected_option:
return "Please choose a CSV from the dropdown before clicking 'Open'."
try:
source, date, filename = [part.strip() for part in selected_option.split("|")]
except ValueError:
return "Invalid selection format. Please refresh the list and try again."
for item in get_archived_csv_files():
if item['source'] == source and item['date'] == date and item['filename'] == filename:
file_path = item['path']
if os.path.exists(file_path):
return open_csv_file(file_path)
return f"Cannot open file: {file_path}. File does not exist."
return "Selected file not found. Please refresh the list."
except Exception as e:
return f"Error opening CSV file: {str(e)}"
def open_selected_pdf(selected_option):
"""Open the selected PDF file"""
try:
if not selected_option:
return "Please choose a PDF from the dropdown before clicking 'Open'."
try:
source, date, filename = [part.strip() for part in selected_option.split("|")]
except ValueError:
return "Invalid selection format. Please refresh the list and try again."
for item in get_archived_pdf_files():
if item['source'] == source and item['date'] == date and item['filename'] == filename:
file_path = item['path']
if os.path.exists(file_path):
return open_pdf_file(file_path)
return f"Cannot open file: {file_path}. File does not exist."
return "Selected file not found. Please refresh the list."
except Exception as e:
return f"Error opening PDF file: {str(e)}"
def open_csv_file(file_path: str):
"""Open a CSV file with the default application"""
try:
abs_path = os.path.abspath(file_path)
# Open file based on operating system
if platform.system() == "Windows":
subprocess.run(["start", "", abs_path], check=True, shell=True)
elif platform.system() == "Darwin": # macOS
subprocess.run(["open", abs_path], check=True)
else: # Linux
subprocess.run(["xdg-open", abs_path], check=True)
return f"Opened CSV file: {abs_path}"
except Exception as e:
return f"Error opening CSV file: {str(e)}"
def open_pdf_file(file_path: str):
"""Open a PDF file with the default application"""
try:
abs_path = os.path.abspath(file_path)
# Open file based on operating system
if platform.system() == "Windows":
subprocess.run(["start", "", abs_path], check=True, shell=True)
elif platform.system() == "Darwin": # macOS
subprocess.run(["open", abs_path], check=True)
else: # Linux
subprocess.run(["xdg-open", abs_path], check=True)
return f"Opened PDF file: {abs_path}"
except Exception as e:
return f"Error opening PDF file: {str(e)}"
def delete_selected_csv(selected_option):
"""Delete the selected CSV file"""
try:
if not selected_option:
return "Please choose a CSV from the dropdown before clicking 'Delete'."
try:
source, date, filename = [part.strip() for part in selected_option.split("|")]
except ValueError:
return "Invalid selection format. Please refresh the list and try again."
for item in get_archived_csv_files():
if item['source'] == source and item['date'] == date and item['filename'] == filename:
file_path = item['path']
if os.path.exists(file_path):
os.remove(file_path)
return f"Successfully deleted CSV file: {filename}"
return f"Cannot delete file: {file_path}. File does not exist."
return "Selected file not found. Please refresh the list."
except Exception as e:
return f"Error deleting CSV file: {str(e)}"
def delete_selected_pdf(selected_option):
"""Delete the selected PDF file"""
try:
if not selected_option:
return "Please choose a PDF from the dropdown before clicking 'Delete'."
try:
source, date, filename = [part.strip() for part in selected_option.split("|")]
except ValueError:
return "Invalid selection format. Please refresh the list and try again."
for item in get_archived_pdf_files():
if item['source'] == source and item['date'] == date and item['filename'] == filename:
file_path = item['path']
if os.path.exists(file_path):
os.remove(file_path)
return f"Successfully deleted PDF file: {filename}"
return f"Cannot delete file: {file_path}. File does not exist."
return "Selected file not found. Please refresh the list."
except Exception as e:
return f"Error deleting PDF file: {str(e)}"
def open_csv_folder():
    """Open the CSV archive folder in the platform's file manager.

    The folder is the ``archive`` directory resolved against the current
    working directory.

    Returns:
        A status message: the opened path on success, or the error text if
        the file manager could not be launched.
    """
    archive_dir = os.path.abspath("archive")
    try:
        system_name = platform.system()
        if system_name == "Windows":
            # explorer.exe commonly exits with status 1 even after it has
            # opened the window, so running it with check=True reported a
            # failure for a successful open. os.startfile avoids that.
            os.startfile(archive_dir)
        elif system_name == "Darwin":  # macOS
            subprocess.run(["open", archive_dir], check=True)
        else:  # Linux
            subprocess.run(["xdg-open", archive_dir], check=True)
        return f"Opened archive folder: {archive_dir}"
    except Exception as e:
        return f"Error opening folder: {str(e)}"
def open_pdf_folder():
    """Open the PDF archive folder in the platform's file manager.

    The folder is the ``archive`` directory resolved against the current
    working directory.

    Returns:
        A status message: the opened path on success, or the error text if
        the file manager could not be launched.
    """
    archive_dir = os.path.abspath("archive")
    try:
        system_name = platform.system()
        if system_name == "Windows":
            # explorer.exe commonly exits with status 1 even after it has
            # opened the window, so running it with check=True reported a
            # failure for a successful open. os.startfile avoids that.
            os.startfile(archive_dir)
        elif system_name == "Darwin":  # macOS
            subprocess.run(["open", archive_dir], check=True)
        else:  # Linux
            subprocess.run(["xdg-open", archive_dir], check=True)
        return f"Opened archive folder: {archive_dir}"
    except Exception as e:
        return f"Error opening folder: {str(e)}"
# --- Archive tab event wiring ---
# Every handler below writes its outcome to the shared status_text component.

# Refresh: the handler supplies three values - the file table, the status
# message and the selection component for the same file type.
refresh_csv_btn.click(
fn=refresh_csv_files,
outputs=[csv_df, status_text, csv_selection]
)
refresh_pdf_btn.click(
fn=refresh_pdf_files,
outputs=[pdf_df, status_text, pdf_selection]
)
# Open: receives the current selection value and returns only a status message.
open_csv_btn.click(
fn=open_selected_csv,
inputs=[csv_selection],
outputs=[status_text]
)
open_pdf_btn.click(
fn=open_selected_pdf,
inputs=[pdf_selection],
outputs=[status_text]
)
# Delete: receives the current selection value and returns only a status
# message; the table and selection are not refreshed by this event.
delete_csv_btn.click(
fn=delete_selected_csv,
inputs=[csv_selection],
outputs=[status_text]
)
delete_pdf_btn.click(
fn=delete_selected_pdf,
inputs=[pdf_selection],
outputs=[status_text]
)
# Open folder: takes no inputs; both handlers open the same "archive" directory.
open_csv_folder_btn.click(
fn=open_csv_folder,
outputs=[status_text]
)
open_pdf_folder_btn.click(
fn=open_pdf_folder,
outputs=[status_text]
)
def create_keywords_management_tab():
    """
    Create the keywords management tab interface.

    Builds a "Keywords Management" tab with two panels: one for adding a new
    keyword category and one for editing or deleting an existing category.
    Categories are read and written through ``keyword_filter``'s
    ``load_keywords_config`` / ``save_keywords_config``.

    Must be called inside an active ``gr.Blocks`` context. Returns nothing;
    the components and their event handlers are registered as a side effect.
    """
    with gr.Tab("Keywords Management"):
        # Header section
        gr.Markdown("## Keywords Configuration")
        gr.Markdown("Manage keyword categories for intelligent article filtering and categorization.")

        def load_keywords_config():
            """Load the current categories; return {} if loading fails or is empty."""
            try:
                from keyword_filter import load_keywords_config as load_from_file
                categories = load_from_file()
                return categories if categories else {}
            except Exception as e:
                logger.error(f"Error loading keywords config: {str(e)}")
                return {}

        def get_category_list():
            """Return the category names for the dropdown."""
            categories = load_keywords_config()
            return list(categories.keys()) if categories else []

        def get_keywords_for_category(category):
            """Return the category's keywords as a comma-separated string ('' if unknown)."""
            categories = load_keywords_config()
            if category and category in categories:
                return ", ".join(categories[category])
            return ""

        def parse_keywords(keywords_text):
            """Split comma-separated text into a list of non-empty, stripped keywords."""
            return [kw.strip() for kw in keywords_text.split(",") if kw.strip()]

        def add_new_category(category_name, keywords_text):
            """Add a category; returns (status, dropdown update, keywords update)."""
            try:
                # The raw loader is used here on purpose: a load failure must
                # abort the operation instead of saving over the file with a
                # config that only contains the new category.
                from keyword_filter import load_keywords_config, save_keywords_config
                if not category_name.strip():
                    return "❌ Category name cannot be empty", gr.update(), gr.update()
                categories = load_keywords_config()
                if not categories:
                    categories = {}
                keywords = parse_keywords(keywords_text)
                if not keywords:
                    return "❌ Please provide at least one keyword", gr.update(), gr.update()
                # NOTE(review): an existing category with the same name is
                # replaced without warning - confirm this is intended.
                categories[category_name.strip()] = keywords
                success, message = save_keywords_config({"categories": categories})
                if success:
                    return (
                        f"✅ {message}",
                        gr.update(choices=get_category_list(), value=category_name.strip()),
                        gr.update(),
                    )
                return f"❌ {message}", gr.update(), gr.update()
            except Exception as e:
                logger.error(f"Error adding category: {str(e)}")
                return f"❌ Error adding category: {str(e)}", gr.update(), gr.update()

        def update_category_keywords(category, keywords_text):
            """Replace a category's keywords; returns the status message only.

            This handler is wired to a single output (the status box), so
            every path returns exactly one value. Earlier validation paths
            returned a 2-tuple, which does not match the single output.
            """
            try:
                from keyword_filter import load_keywords_config, save_keywords_config
                if not category:
                    return "❌ Please select a category"
                categories = load_keywords_config()
                if not categories:
                    return "❌ No categories found"
                keywords = parse_keywords(keywords_text)
                if not keywords:
                    return "❌ Please provide at least one keyword"
                categories[category] = keywords
                success, message = save_keywords_config({"categories": categories})
                return f"✅ {message}" if success else f"❌ {message}"
            except Exception as e:
                logger.error(f"Error updating category: {str(e)}")
                return f"❌ Error updating category: {str(e)}"

        def delete_category(category):
            """Delete a category; returns (status, dropdown update, keywords update)."""
            try:
                from keyword_filter import load_keywords_config, save_keywords_config
                if not category:
                    return "❌ Please select a category to delete", gr.update(), gr.update()
                categories = load_keywords_config()
                if not categories:
                    return "❌ No categories found", gr.update(), gr.update()
                if category not in categories:
                    return f"❌ Category '{category}' not found", gr.update(), gr.update()
                del categories[category]
                success, message = save_keywords_config({"categories": categories})
                if success:
                    new_choices = get_category_list()
                    return (
                        f"✅ Category '{category}' deleted successfully",
                        gr.update(choices=new_choices, value=None),
                        gr.update(),
                    )
                return f"❌ {message}", gr.update(), gr.update()
            except Exception as e:
                logger.error(f"Error deleting category: {str(e)}")
                return f"❌ Error deleting category: {str(e)}", gr.update(), gr.update()

        # Initialize with current categories
        initial_categories = get_category_list()
        initial_category = initial_categories[0] if initial_categories else None

        # Create two-column layout
        with gr.Row():
            # Left column - Add new category
            with gr.Column(scale=1):
                with gr.Group():
                    gr.Markdown("### Add New Category")
                    gr.Markdown("*Create a new keyword category for article filtering*")
                    new_category_name = gr.Textbox(
                        label="Category Name",
                        placeholder="e.g., Health / Epidemics",
                        interactive=True,
                        info="Enter a descriptive name for the category"
                    )
                    new_category_keywords = gr.Textbox(
                        label="Keywords (comma-separated)",
                        placeholder="e.g., cholera, malaria, covid, outbreak",
                        lines=4,
                        interactive=True,
                        info="Enter keywords separated by commas."
                    )
                    add_category_btn = gr.Button("Add Category", variant="primary", size="lg")
            # Right column - Edit existing category
            with gr.Column(scale=1):
                with gr.Group():
                    gr.Markdown("### Edit Existing Category")
                    gr.Markdown("*Modify or delete existing keyword categories*")
                    category_dropdown = gr.Dropdown(
                        label="Select Category",
                        choices=initial_categories,
                        interactive=True,
                        value=initial_category,
                        info="Choose a category to edit or delete"
                    )
                    category_keywords = gr.Textbox(
                        label="Keywords (comma-separated)",
                        placeholder="Enter keywords separated by commas",
                        lines=4,
                        interactive=True,
                        # Show the preselected category's keywords right away;
                        # the change event only fires once the selection changes.
                        value=get_keywords_for_category(initial_category),
                        info="Edit the keywords for the selected category"
                    )
                    with gr.Row():
                        update_btn = gr.Button("Update Keywords", variant="primary")
                        delete_btn = gr.Button("Delete Category", variant="stop")

        # Status section
        gr.Markdown("---")
        status_display = gr.Textbox(
            label="Status",
            value="Ready to manage keywords...",
            interactive=False,
            visible=True,
            info="Status messages will appear here"
        )

        # Event handlers
        add_category_btn.click(
            fn=add_new_category,
            inputs=[new_category_name, new_category_keywords],
            outputs=[status_display, category_dropdown, category_keywords]
        )
        category_dropdown.change(
            fn=get_keywords_for_category,
            inputs=[category_dropdown],
            outputs=[category_keywords]
        )
        update_btn.click(
            fn=update_category_keywords,
            inputs=[category_dropdown, category_keywords],
            outputs=[status_display]
        )
        delete_btn.click(
            fn=delete_category,
            inputs=[category_dropdown],
            outputs=[status_display, category_dropdown, category_keywords]
        )
def create_admin_tab():
    """
    Create the admin panel tab interface.

    Builds an "Admin Panel" tab containing an add-user form, a
    change-password form, and a table of registered users. Handlers that
    expose or modify the user list first check that the current user is
    authenticated and is an admin; the add-user section is hidden whenever
    that check fails.

    Must be called inside an active ``gr.Blocks`` context. Returns nothing;
    the components and their event handlers are registered as a side effect.
    """
    user_columns = ["Username", "Admin", "Created", "Last Login"]
    access_denied = "❌ Access denied - Admin privileges required"

    with gr.Tab("Admin Panel") as admin_tab:
        gr.Markdown("## Admin Panel")
        gr.Markdown("Manage user accounts, permissions, and system settings.")

        # Create two main columns for better organization
        with gr.Row():
            # Left column - User Management (hidden for non-admins by the handlers below)
            with gr.Column(scale=1, elem_classes="admin-section", visible=True) as admin_user_section:
                with gr.Group(elem_classes="admin-group"):
                    gr.Markdown("### Add New User")
                    gr.Markdown("*Create new user accounts*")
                    new_username = gr.Textbox(
                        label="Username",
                        placeholder="Enter username",
                        interactive=True
                    )
                    new_password = gr.Textbox(
                        label="Password",
                        placeholder="Enter password",
                        type="password",
                        interactive=True
                    )
                    is_admin = gr.Checkbox(
                        label="Grant admin privileges",
                        value=False,
                        interactive=True
                    )
                    add_user_btn = gr.Button("Add User", variant="primary", size="sm", elem_classes="admin-button")
            # Right column - Password Management
            with gr.Column(scale=1, elem_classes="admin-section"):
                with gr.Group(elem_classes="admin-group"):
                    gr.Markdown("### Change Password")
                    gr.Markdown("*Update your account password*")
                    change_old_password = gr.Textbox(
                        label="Current Password",
                        placeholder="Enter current password",
                        type="password",
                        interactive=True
                    )
                    change_new_password = gr.Textbox(
                        label="New Password",
                        placeholder="Enter new password",
                        type="password",
                        interactive=True
                    )
                    change_password_btn = gr.Button("Change Password", variant="secondary", size="sm", elem_classes="admin-button")

        # User List Section - Full width
        with gr.Column(elem_classes="admin-section"):
            with gr.Group(elem_classes="admin-group"):
                gr.Markdown("### System Users")
                gr.Markdown("*View all registered users*")
                with gr.Row():
                    refresh_users_btn = gr.Button("Refresh", variant="secondary", size="sm", elem_classes="admin-button")
                users_df = gr.Dataframe(
                    label="",
                    headers=user_columns,
                    datatype=["str", "str", "str", "str"],
                    interactive=False,
                    wrap=True
                )

        # Status messages - Compact
        admin_status = gr.Textbox(
            label="Status",
            value="Ready - Use the controls above to manage users",
            interactive=False,
            lines=2
        )

        def current_user_is_admin():
            """Return a truthy value only for an authenticated admin user."""
            return is_authenticated() and auth_manager.is_admin(get_current_user())

        def build_users_table():
            """Return (DataFrame of all users, number of users)."""
            users = auth_manager.list_users()
            rows = [
                {
                    'Username': user,
                    'Admin': 'Yes' if info.get('is_admin', False) else 'No',
                    'Created': info.get('created_at', 'Unknown'),
                    'Last Login': info.get('last_login', 'Never')
                }
                for user, info in users.items()
            ]
            # Explicit columns keep the headers in place when there are no users.
            return pd.DataFrame(rows, columns=user_columns), len(users)

        def load_users_for_admin(success_prefix):
            """Return (table, status, section visibility) for refresh/tab-load events."""
            if not current_user_is_admin():
                return pd.DataFrame(), access_denied, gr.update(visible=False)
            table, count = build_users_table()
            return table, f"✅ {success_prefix} - {count} users found", gr.update(visible=True)

        def handle_add_user(username, password, admin_check):
            """Handle adding new user (admin only).

            Returns values for: status, users table, username box, password
            box, admin checkbox, add-user section visibility.
            """
            clear_form = (gr.update(value=""), gr.update(value=""), gr.update(value=False))
            if not current_user_is_admin():
                # Non-admins must not keep seeing the user list, so blank it.
                return (access_denied, pd.DataFrame(), *clear_form, gr.update(visible=False))
            if not username or not password:
                # gr.update() leaves the table as-is; returning an empty frame
                # here used to wipe the displayed list on a simple input error.
                return ("❌ Please enter both username and password", gr.update(),
                        *clear_form, gr.update(visible=True))
            if not auth_manager.add_user(username, password, admin_check):
                return (f"❌ Failed to add user '{username}' (user may already exist)", gr.update(),
                        *clear_form, gr.update(visible=True))
            table, _ = build_users_table()
            return (f"✅ User '{username}' added successfully", table,
                    *clear_form, gr.update(visible=True))

        def handle_change_password(old_password, new_password):
            """Handle password change; both password boxes are cleared on every path."""
            cleared = (gr.update(value=""), gr.update(value=""))
            if not is_authenticated():
                return ("❌ Please login first", *cleared)
            if not old_password or not new_password:
                return ("❌ Please enter both current and new password", *cleared)
            if auth_manager.change_password(get_current_user(), old_password, new_password):
                return ("✅ Password changed successfully", *cleared)
            return ("❌ Failed to change password (check current password)", *cleared)

        def refresh_users_and_check_admin():
            """Refresh the user list and re-evaluate add-user section visibility."""
            return load_users_for_admin("User list refreshed")

        def initialize_admin_panel():
            """Populate the panel when the tab is selected."""
            return load_users_for_admin("Admin panel loaded")

        # Event handlers
        add_user_btn.click(
            fn=handle_add_user,
            inputs=[new_username, new_password, is_admin],
            outputs=[admin_status, users_df, new_username, new_password, is_admin, admin_user_section]
        )
        change_password_btn.click(
            fn=handle_change_password,
            inputs=[change_old_password, change_new_password],
            outputs=[admin_status, change_old_password, change_new_password]
        )
        refresh_users_btn.click(
            fn=refresh_users_and_check_admin,
            outputs=[users_df, admin_status, admin_user_section]
        )
        # Initialize admin panel when tab loads
        admin_tab.select(
            fn=initialize_admin_panel,
            outputs=[users_df, admin_status, admin_user_section]
        )
def create_website_config_tab():
    """
    Create the website configuration management tab interface.

    Builds a "Website Config" tab for editing per-website scraping settings
    (CSS selectors, pagination, link selectors). Add / update / delete only
    modify an in-memory copy held in a ``gr.State``; nothing is written until
    "Save All Changes" calls ``scraper_common.save_website_config``. "Cancel"
    discards the in-memory copy and reloads the form from file.

    Must be called inside an active ``gr.Blocks`` context. Returns nothing;
    the components and their event handlers are registered as a side effect.
    """
    # Form values for "no website selected", in form-field order:
    # base_url, article_links, page_links, title, content, date,
    # navigation_selector, navigation_url_addition, start_page,
    # pdf_links, file_links, recaptcha_text
    empty_form = ('', '', '', '', '', '', '', '', '0', '', '', '')

    with gr.Tab("Website Config"):
        # Header section
        gr.Markdown("## Website Configuration Management")
        gr.Markdown("Configure and manage CSS selectors for website scraping. Customize how content is extracted.")

        def load_website_config():
            """Load the saved configuration; return {} if loading fails or is empty."""
            try:
                from scraper_common import load_website_config as load_from_file
                config = load_from_file()
                return config if config else {}
            except Exception as e:
                logger.error(f"Error loading website config: {str(e)}")
                return {}

        def get_website_list():
            """Return the website type names for the dropdown."""
            config = load_website_config()
            return list(config.keys()) if config else []

        def as_display_text(value):
            """Render a stored string-or-list setting as comma-separated text."""
            if isinstance(value, list):
                return ", ".join(value) if value else ""
            return str(value) if value else ""

        def get_config_for_website(website_type, current_state=None):
            """Return the 12 form values for a website (``empty_form`` if unknown).

            Unsaved state takes precedence over the file when it is non-empty.
            """
            config = current_state if current_state else load_website_config()
            if not (website_type and website_type in config):
                return empty_form
            site = config[website_type]
            start_page = site.get('start_page')
            return (
                site.get('base_url', '') or '',
                site.get('article_links', '') or '',
                site.get('page_links', '') or '',
                site.get('title', '') or '',
                as_display_text(site.get('content', '')),
                site.get('date', '') or '',
                site.get('navigation_selector', '') or '',
                site.get('navigation_url_addition', '') or '',
                str(start_page) if start_page is not None else '0',
                as_display_text(site.get('pdf_links', [])),
                as_display_text(site.get('file_links', [])),
                site.get('recaptcha_text', '') or ''
            )

        # Initialize with current websites; prefill the form for the
        # preselected entry because the dropdown's change event does not fire
        # until the selection changes.
        initial_websites = get_website_list()
        initial_website = initial_websites[0] if initial_websites else None
        (init_base_url, init_article_links, init_page_links, init_title, init_content,
         init_date, init_nav_selector, init_nav_url_addition, init_start_page,
         init_pdf_links, init_file_links, init_recaptcha_text) = (
            get_config_for_website(initial_website) if initial_website else empty_form
        )

        # Create layout with three sections
        with gr.Row():
            # Left column - Website selection and actions
            with gr.Column(scale=1):
                with gr.Group():
                    gr.Markdown("### Select Website")
                    gr.Markdown("*Choose a website to edit or delete*")
                    website_dropdown = gr.Dropdown(
                        label="Website Type",
                        choices=initial_websites,
                        interactive=True,
                        value=initial_website,
                        info="Select a website configuration to edit"
                    )
                    with gr.Row():
                        delete_website_btn = gr.Button("Delete Website", variant="stop")
                with gr.Group():
                    gr.Markdown("### Add New Website")
                    gr.Markdown("*Create a new website configuration*")
                    new_website_type = gr.Textbox(
                        label="Website Type Name",
                        placeholder="e.g., newsite",
                        interactive=True,
                        info="Enter a unique identifier (no spaces)"
                    )
                    add_website_btn = gr.Button("Add New Website", variant="primary")
            # Right column - Configuration form
            with gr.Column(scale=2):
                gr.Markdown("### Configuration Fields")
                gr.Markdown("*Edit the configuration fields below*")
                # Required fields
                with gr.Group():
                    gr.Markdown("**Required Fields**")
                    base_url_field = gr.Textbox(
                        label="Base URL",
                        placeholder="e.g., https://example.com",
                        interactive=True,
                        value=init_base_url,
                        info="Base URL of the website (required)"
                    )
                    title_field = gr.Textbox(
                        label="Title Selector",
                        placeholder="e.g., h1, .title, #article-title",
                        interactive=True,
                        value=init_title,
                        info="CSS selector for article title (required)"
                    )
                    content_field = gr.Textbox(
                        label="Content Selector",
                        placeholder="e.g., .content, p, #main-body",
                        interactive=True,
                        lines=2,
                        value=init_content,
                        info="CSS selector for article content (required). For multiple selectors, use comma-separated values."
                    )
                # Optional fields
                with gr.Group():
                    gr.Markdown("**Optional Fields**")
                    article_links_field = gr.Textbox(
                        label="Article Links Selector",
                        placeholder="e.g., .article-link a, h2 a",
                        interactive=True,
                        value=init_article_links,
                        info="CSS selector for article links on listing pages"
                    )
                    page_links_field = gr.Textbox(
                        label="Page Links Selector",
                        placeholder="e.g., .page-link a",
                        interactive=True,
                        value=init_page_links,
                        info="CSS selector for page links (for document sites)"
                    )
                    date_field = gr.Textbox(
                        label="Date Selector",
                        placeholder="e.g., .date, time, .published",
                        interactive=True,
                        value=init_date,
                        info="CSS selector for publication date"
                    )
                    navigation_selector_field = gr.Textbox(
                        label="Navigation Selector",
                        placeholder="e.g., .pagination, .nav-links",
                        interactive=True,
                        value=init_nav_selector,
                        info="CSS selector for pagination navigation"
                    )
                    navigation_url_addition_field = gr.Textbox(
                        label="Navigation URL Addition",
                        placeholder="e.g., ?page={page_no}, /page/{page_no}/",
                        interactive=True,
                        value=init_nav_url_addition,
                        info="URL pattern for pagination (use {page_no} as placeholder)"
                    )
                    start_page_field = gr.Textbox(
                        label="Start Page",
                        placeholder="0 or 1",
                        interactive=True,
                        value=init_start_page,
                        info="Starting page number (0 or 1)"
                    )
                    pdf_links_field = gr.Textbox(
                        label="PDF Links Selectors",
                        placeholder="e.g., a[href$='.pdf'], .pdf-link",
                        interactive=True,
                        lines=2,
                        value=init_pdf_links,
                        info="CSS selectors for PDF links (comma-separated for multiple)"
                    )
                    file_links_field = gr.Textbox(
                        label="File Links Selectors",
                        placeholder="e.g., a[href$='.csv'], .file-link",
                        interactive=True,
                        lines=2,
                        value=init_file_links,
                        info="CSS selectors for file links (comma-separated for multiple)"
                    )
                    recaptcha_text_field = gr.Textbox(
                        label="Recaptcha Text",
                        placeholder="e.g., Let's confirm you are human",
                        interactive=True,
                        value=init_recaptcha_text,
                        info="Text to look for when recaptcha is present"
                    )
                with gr.Row():
                    update_website_btn = gr.Button("Update Website", variant="primary")
                    save_all_btn = gr.Button("Save All Changes", variant="primary")
                    cancel_btn = gr.Button("Cancel", variant="secondary")

        # State to track unsaved changes ({} means "nothing pending")
        unsaved_config_state = gr.State(value={})

        def build_site_config(existing, base_url, article_links, page_links, title, content, date,
                              navigation_selector, navigation_url_addition, start_page,
                              pdf_links, file_links, recaptcha_text):
            """Apply the form values on top of ``existing`` and return the new dict.

            Shared by add (``existing`` is {}) and update (``existing`` is the
            stored entry, so keys not shown in the form are preserved).
            Blank optional fields remove their key; blank title/content leave
            any stored value untouched. Returns None when ``start_page`` is
            not an integer.
            """
            site = dict(existing)

            def put_or_drop(key, text):
                cleaned = text.strip()
                if cleaned:
                    site[key] = cleaned
                else:
                    site.pop(key, None)

            def put_list_or_drop(key, text):
                if text.strip():
                    site[key] = [part.strip() for part in text.split(',') if part.strip()]
                else:
                    site.pop(key, None)

            site['base_url'] = base_url.strip()
            put_or_drop('article_links', article_links)
            put_or_drop('page_links', page_links)
            if title.strip():
                site['title'] = title.strip()
            if content.strip():
                # Several comma-separated selectors are stored as a list,
                # a single selector as a plain string.
                selectors = [c.strip() for c in content.split(',') if c.strip()]
                site['content'] = selectors if len(selectors) > 1 else content.strip()
            put_or_drop('date', date)
            # Navigation keys are always present; None marks "not configured".
            site['navigation_selector'] = navigation_selector.strip() or None
            site['navigation_url_addition'] = navigation_url_addition.strip() or None
            if start_page.strip():
                try:
                    site['start_page'] = int(start_page.strip())
                except ValueError:
                    logger.warning(f"Website config rejected: start page {start_page!r} is not an integer")
                    return None
            else:
                site['start_page'] = 0
            put_list_or_drop('pdf_links', pdf_links)
            put_list_or_drop('file_links', file_links)
            put_or_drop('recaptcha_text', recaptcha_text)
            return site

        def required_fields_present(base_url, title, content):
            """Check the required fields; log the reason when the check fails."""
            # NOTE(review): the form labels both title and content as required,
            # but only one of them has to be filled in - confirm the intent.
            if not title and not content:
                logger.warning("Website config rejected: title and content selectors are both empty")
                return False
            if not base_url or not base_url.strip():
                logger.warning("Website config rejected: base URL is empty")
                return False
            return True

        def add_new_website(website_type, base_url, article_links, page_links, title, content, date,
                            navigation_selector, navigation_url_addition, start_page,
                            pdf_links, file_links, recaptcha_text, current_state):
            """Add a website to the unsaved state; returns (dropdown update, state).

            On any validation failure both outputs are left unchanged and the
            reason is logged.
            """
            unchanged = (gr.update(), gr.update())
            try:
                if not website_type or not website_type.strip():
                    logger.warning("Website not added: type name is empty")
                    return unchanged
                website_type = website_type.strip()
                if ' ' in website_type:
                    logger.warning(f"Website not added: type name {website_type!r} contains spaces")
                    return unchanged
                config = current_state if current_state else load_website_config()
                if not config:
                    config = {}
                if website_type in config:
                    logger.warning(f"Website not added: {website_type!r} already exists")
                    return unchanged
                if not required_fields_present(base_url, title, content):
                    return unchanged
                new_config = build_site_config(
                    {}, base_url, article_links, page_links, title, content, date,
                    navigation_selector, navigation_url_addition, start_page,
                    pdf_links, file_links, recaptcha_text)
                if new_config is None:
                    return unchanged
                config[website_type] = new_config
                # Store in state (not saved yet)
                return (gr.update(choices=list(config.keys()), value=website_type), config)
            except Exception as e:
                logger.error(f"Error adding website: {str(e)}")
                return unchanged

        def update_website(website_type, base_url, article_links, page_links, title, content, date,
                           navigation_selector, navigation_url_addition, start_page,
                           pdf_links, file_links, recaptcha_text, current_state):
            """Update a website in the unsaved state; returns the new state.

            On any validation failure the state is left unchanged and the
            reason is logged.
            """
            try:
                if not website_type:
                    logger.warning("Website not updated: no website selected")
                    return gr.update()
                config = current_state if current_state else load_website_config()
                if not config:
                    config = {}
                if website_type not in config:
                    logger.warning(f"Website not updated: {website_type!r} not found")
                    return gr.update()
                if not required_fields_present(base_url, title, content):
                    return gr.update()
                updated_config = build_site_config(
                    config.get(website_type, {}), base_url, article_links, page_links, title, content,
                    date, navigation_selector, navigation_url_addition, start_page,
                    pdf_links, file_links, recaptcha_text)
                if updated_config is None:
                    return gr.update()
                config[website_type] = updated_config
                return config
            except Exception as e:
                logger.error(f"Error updating website: {str(e)}")
                return gr.update()

        def delete_website(website_type, current_state):
            """Remove a website from the unsaved state; returns (dropdown update, state)."""
            unchanged = (gr.update(), gr.update())
            try:
                if not website_type:
                    return unchanged
                config = current_state if current_state else load_website_config()
                if not config:
                    return unchanged
                if website_type not in config:
                    return unchanged
                del config[website_type]
                # NOTE(review): deleting the last website leaves an empty
                # state, which the other handlers treat as "nothing pending"
                # and reload from file - confirm whether that is acceptable.
                website_list = list(config.keys())
                return (gr.update(choices=website_list, value=website_list[0] if website_list else None),
                        config)
            except Exception as e:
                logger.error(f"Error deleting website: {str(e)}")
                return unchanged

        def save_all_changes(current_state):
            """Write the pending configuration to file; returns (dropdown update, state).

            The state is reset to {} after a successful save and kept as-is
            when saving fails.
            """
            try:
                from scraper_common import save_website_config
                config = current_state if current_state else load_website_config()
                if not config:
                    return gr.update(), {}
                success, message = save_website_config(config)
                if not success:
                    logger.warning(f"Website config not saved: {message}")
                    return (gr.update(), current_state)
                # Reload to get updated list
                updated_config = load_website_config()
                return (gr.update(choices=list(updated_config.keys())), {})
            except Exception as e:
                logger.error(f"Error saving configuration: {str(e)}")
                return gr.update(), current_state

        def cancel_changes():
            """Discard pending changes; returns (dropdown update, 12 form values, {})."""
            try:
                website_list = list(load_website_config().keys())
                if not website_list:
                    return (gr.update(choices=[]), *empty_form, {})
                # Reset the form to the first website as stored on file.
                first = website_list[0]
                return (gr.update(choices=website_list, value=first),
                        *get_config_for_website(first),
                        {})
            except Exception as e:
                logger.error(f"Error cancelling changes: {str(e)}")
                return (gr.update(), *empty_form, {})

        # The 12 form components, in the order every handler expects them.
        form_fields = [base_url_field, article_links_field, page_links_field, title_field,
                       content_field, date_field, navigation_selector_field,
                       navigation_url_addition_field, start_page_field,
                       pdf_links_field, file_links_field, recaptcha_text_field]

        # Event handlers
        website_dropdown.change(
            fn=get_config_for_website,
            inputs=[website_dropdown, unsaved_config_state],
            outputs=form_fields
        )
        add_website_btn.click(
            fn=add_new_website,
            inputs=[new_website_type, *form_fields, unsaved_config_state],
            outputs=[website_dropdown, unsaved_config_state]
        )
        update_website_btn.click(
            fn=update_website,
            inputs=[website_dropdown, *form_fields, unsaved_config_state],
            outputs=[unsaved_config_state]
        )
        delete_website_btn.click(
            fn=delete_website,
            inputs=[website_dropdown, unsaved_config_state],
            outputs=[website_dropdown, unsaved_config_state]
        )
        save_all_btn.click(
            fn=save_all_changes,
            inputs=[unsaved_config_state],
            outputs=[website_dropdown, unsaved_config_state]
        )
        cancel_btn.click(
            fn=cancel_changes,
            outputs=[website_dropdown, *form_fields, unsaved_config_state]
        )
def create_main_app():
"""
Create the main application with authentication flow.

Builds a gr.Blocks app containing a login section (visible at start) and a
dashboard section (hidden at start) that holds the header, the logout button
and the feature tabs. The login and logout buttons swap which of the two
sections is visible.

Returns:
The assembled gr.Blocks application (`main_app`).
"""
with gr.Blocks(
title="Raagsan Dashboard Web Scrapping",
theme=gr.themes.Soft(),
# Custom stylesheet for the whole app, supplied through the `css` argument.
# The text inside the literal is runtime data, not Python comments.
css="""
/* Global Container Styles */
.gradio-container {
max-width: 1400px !important;
margin: 0 auto !important;
width: 100% !important;
padding: 20px !important;
min-height: 100vh !important;
}
/* Ensure all tabs use full width */
.tabs > .tab-nav, .tabs > .tabitem {
max-width: 1400px !important;
width: 100% !important;
}
/* Tab Navigation Styling */
.tab-nav button {
border: 2px solid var(--border-color-primary) !important;
border-radius: 10px 10px 0 0 !important;
margin-right: 5px !important;
padding: 12px 24px !important;
font-weight: 600 !important;
transition: all 0.3s ease !important;
box-shadow: 0 2px 4px rgba(0, 0, 0, 0.1) !important;
}
.tab-nav button[aria-selected="true"] {
background: linear-gradient(135deg, #667eea 0%, #764ba2 100%) !important;
color: white !important;
border-color: #667eea !important;
box-shadow: 0 4px 8px rgba(102, 126, 234, 0.3) !important;
}
.tab-nav button:hover {
transform: translateY(-2px) !important;
box-shadow: 0 4px 8px rgba(0, 0, 0, 0.15) !important;
}
/* Tab Content Container */
.tabitem {
border: 2px solid var(--border-color-primary) !important;
border-radius: 0 10px 10px 10px !important;
padding: 30px !important;
box-shadow: 0 10px 30px rgba(0, 0, 0, 0.15) !important;
margin-top: 0 !important;
}
/* Ensure rows and columns in all tabs expand to full width */
.gradio-row {
width: 100% !important;
gap: 20px !important;
margin-bottom: 15px !important;
}
.gradio-column {
width: 100% !important;
}
/* Card Style for Sections */
.gradio-group {
border: 2px solid var(--border-color-primary) !important;
border-radius: 12px !important;
padding: 25px !important;
margin: 15px 0 !important;
box-shadow: 0 2px 8px rgba(0, 0, 0, 0.1) !important;
transition: all 0.3s ease !important;
}
.gradio-group:hover {
box-shadow: 0 4px 12px rgba(0, 0, 0, 0.2) !important;
border-color: #667eea !important;
}
/* Input Fields Styling */
.gradio-textbox input, .gradio-textbox textarea {
border: 2px solid var(--border-color-primary) !important;
border-radius: 8px !important;
padding: 12px !important;
font-size: 14px !important;
transition: all 0.3s ease !important;
}
.gradio-textbox input:focus, .gradio-textbox textarea:focus {
border-color: #667eea !important;
box-shadow: 0 0 0 3px rgba(102, 126, 234, 0.2) !important;
outline: none !important;
}
/* Dropdown Styling */
.gradio-dropdown {
border-radius: 8px !important;
}
.gradio-dropdown > div {
border: 2px solid var(--border-color-primary) !important;
border-radius: 8px !important;
transition: all 0.3s ease !important;
}
.gradio-dropdown > div:focus-within {
border-color: #667eea !important;
box-shadow: 0 0 0 3px rgba(102, 126, 234, 0.2) !important;
}
/* Button Styling */
button {
border-radius: 8px !important;
padding: 10px 24px !important;
font-weight: 600 !important;
transition: all 0.3s ease !important;
border: none !important;
}
button:hover {
transform: translateY(-2px) !important;
box-shadow: 0 6px 12px rgba(0, 0, 0, 0.15) !important;
}
button:active {
transform: translateY(0) !important;
}
/* Primary Button */
button[variant="primary"] {
background: linear-gradient(135deg, #667eea 0%, #764ba2 100%) !important;
color: white !important;
box-shadow: 0 4px 8px rgba(102, 126, 234, 0.3) !important;
}
/* Secondary Button */
button[variant="secondary"] {
background: linear-gradient(135deg, #f093fb 0%, #f5576c 100%) !important;
color: white !important;
box-shadow: 0 4px 8px rgba(245, 87, 108, 0.3) !important;
}
/* Stop/Danger Button */
button[variant="stop"] {
background: linear-gradient(135deg, #fa709a 0%, #fee140 100%) !important;
color: #333 !important;
box-shadow: 0 4px 8px rgba(250, 112, 154, 0.3) !important;
}
/* Dataframe Styling */
.gradio-dataframe {
border: 2px solid var(--border-color-primary) !important;
border-radius: 12px !important;
overflow: hidden !important;
box-shadow: 0 4px 12px rgba(0, 0, 0, 0.15) !important;
}
.gradio-dataframe table {
border-collapse: separate !important;
border-spacing: 0 !important;
}
.gradio-dataframe th {
background: linear-gradient(135deg, #667eea 0%, #764ba2 100%) !important;
color: white !important;
padding: 15px !important;
font-weight: 600 !important;
text-transform: uppercase !important;
font-size: 12px !important;
letter-spacing: 0.5px !important;
border: 1px solid #667eea !important;
}
.gradio-dataframe td {
padding: 12px 15px !important;
border: 1px solid var(--border-color-primary) !important;
}
.gradio-dataframe tr:hover {
background-color: rgba(102, 126, 234, 0.1) !important;
}
/* Markdown Headings */
h2 {
font-weight: 700 !important;
margin-bottom: 10px !important;
font-size: 24px !important;
}
h3 {
font-weight: 600 !important;
margin-bottom: 8px !important;
font-size: 18px !important;
}
/* Login Container */
.login-container {
max-width: 500px !important;
margin: 50px auto !important;
padding: 40px !important;
border-radius: 20px !important;
border: 2px solid var(--border-color-primary) !important;
box-shadow: 0 20px 60px rgba(0, 0, 0, 0.2) !important;
}
/* Dashboard Header */
.dashboard-header {
background: linear-gradient(135deg, #667eea 0%, #764ba2 100%) !important;
color: white !important;
padding: 25px !important;
border-radius: 15px !important;
margin-bottom: 25px !important;
box-shadow: 0 10px 30px rgba(102, 126, 234, 0.3) !important;
}
.dashboard-header * {
color: white !important;
}
.dashboard-header h1,
.dashboard-header h2,
.dashboard-header h3,
.dashboard-header p,
.dashboard-header span,
.dashboard-header div {
color: white !important;
}
.header-row {
display: flex !important;
align-items: center !important;
justify-content: space-between !important;
gap: 20px !important;
}
.header-left {
flex: 1 !important;
}
.header-left * {
color: white !important;
}
.header-right {
display: flex !important;
flex-direction: column !important;
align-items: flex-end !important;
gap: 10px !important;
}
.header-right * {
color: white !important;
}
.user-welcome {
margin: 0 !important;
font-size: 16px !important;
font-weight: 500 !important;
color: white !important;
}
.user-welcome * {
color: white !important;
}
.logout-btn {
min-width: 100px !important;
background: rgba(255, 255, 255, 0.2) !important;
backdrop-filter: blur(10px) !important;
border: 2px solid white !important;
color: white !important;
}
.logout-btn:hover {
background: white !important;
color: #667eea !important;
}
/* Status Messages */
.status-success {
color: #28a745 !important;
font-weight: bold !important;
}
.status-error {
color: #dc3545 !important;
font-weight: bold !important;
}
/* Admin Panel Specific */
.admin-panel {
border: 2px solid var(--border-color-primary) !important;
padding: 20px !important;
border-radius: 15px !important;
margin-top: 20px !important;
box-shadow: 0 4px 12px rgba(0, 0, 0, 0.15) !important;
}
.admin-group {
border: 2px solid var(--border-color-primary) !important;
border-radius: 12px !important;
padding: 25px !important;
margin: 15px 0 !important;
box-shadow: 0 2px 8px rgba(0, 0, 0, 0.1) !important;
}
.admin-section {
margin-bottom: 30px !important;
border: 2px solid var(--border-color-primary) !important;
border-radius: 15px !important;
padding: 20px !important;
box-shadow: 0 4px 12px rgba(0, 0, 0, 0.1) !important;
}
.admin-button {
margin: 5px !important;
}
/* Label Styling */
label {
font-weight: 600 !important;
font-size: 14px !important;
margin-bottom: 8px !important;
}
/* Info Text */
.gradio-info {
font-size: 12px !important;
font-style: italic !important;
opacity: 0.8 !important;
}
/* Download Button */
.download-button {
background: linear-gradient(135deg, #4facfe 0%, #00f2fe 100%) !important;
color: white !important;
box-shadow: 0 4px 8px rgba(79, 172, 254, 0.3) !important;
}
/* Scrollbar Styling */
::-webkit-scrollbar {
width: 10px !important;
height: 10px !important;
}
::-webkit-scrollbar-track {
background: var(--background-fill-secondary) !important;
border-radius: 10px !important;
}
::-webkit-scrollbar-thumb {
background: linear-gradient(135deg, #667eea 0%, #764ba2 100%) !important;
border-radius: 10px !important;
}
::-webkit-scrollbar-thumb:hover {
background: linear-gradient(135deg, #764ba2 0%, #667eea 100%) !important;
}
/* Status Textbox Styling */
.gradio-textbox[label="Status"] {
border: 2px solid var(--border-color-primary) !important;
border-radius: 10px !important;
padding: 15px !important;
}
/* Checkbox Styling */
input[type="checkbox"] {
width: 20px !important;
height: 20px !important;
accent-color: #667eea !important;
}
/* Markdown Paragraphs */
p {
line-height: 1.6 !important;
}
/* Section Dividers */
hr {
border: none !important;
height: 2px !important;
background: var(--border-color-primary) !important;
margin: 30px 0 !important;
opacity: 0.3 !important;
}
/* Better spacing for form elements */
.gradio-form {
gap: 15px !important;
}
/* Hover effects for cards */
.admin-section:hover {
transform: translateY(-2px) !important;
transition: all 0.3s ease !important;
}
/* Loading Animation Enhancement */
@keyframes pulse {
0%, 100% { opacity: 1; }
50% { opacity: 0.5; }
}
.loading {
animation: pulse 2s cubic-bezier(0.4, 0, 0.6, 1) infinite !important;
}
/* Improve link styling in markdown */
a {
color: #667eea !important;
text-decoration: none !important;
font-weight: 600 !important;
transition: all 0.3s ease !important;
}
a:hover {
color: #764ba2 !important;
text-decoration: underline !important;
}
/* Better spacing for rows within groups */
.gradio-group .gradio-row {
margin-bottom: 10px !important;
}
"""
) as main_app:
# State to track authentication
# NOTE(review): auth_state is not passed to or read by any handler in this
# function; the handlers toggle section visibility directly. Confirm whether
# it is still needed.
auth_state = gr.State({"authenticated": False, "user": None})
# Main content area
with gr.Column() as main_content:
# Login section
# Visible at start; handle_login hides it on success, handle_logout shows it again.
with gr.Row(visible=True) as login_section:
with gr.Column(elem_classes="login-container"):
gr.Markdown("# Dashboard Login")
gr.Markdown("Please login to access Dashboard")
with gr.Row():
username_input = gr.Textbox(
label="Username",
placeholder="Enter your username",
interactive=True,
scale=2
)
with gr.Row():
password_input = gr.Textbox(
label="Password",
placeholder="Enter your password",
type="password",
interactive=True,
scale=2
)
with gr.Row():
login_btn = gr.Button("Login", variant="primary", scale=1)
# Read-only status line; both handlers write their message here.
login_status = gr.Textbox(
label="Status",
value="Ready to login - Enter your credentials above",
interactive=False,
elem_classes="status-success"
)
# Dashboard section (initially hidden)
with gr.Column(visible=False) as dashboard_section:
# Header with user info and logout
with gr.Column(elem_classes="dashboard-header"):
with gr.Row(elem_classes="header-row"):
# Left side - Title and description
with gr.Column(scale=3, elem_classes="header-left"):
gr.Markdown("# Raagsan Dashboard")
gr.Markdown("Extract and analyze content from websites and documents (PDF, DOC, CSV).")
# Right side - User info and logout
with gr.Column(scale=1, elem_classes="header-right"):
# Placeholder greeting; handle_login replaces it with the username.
user_info = gr.Markdown("Welcome, Guest", elem_classes="user-welcome")
logout_btn = gr.Button("Logout", variant="stop", size="sm", elem_classes="logout-btn")
# Create tabs
# Each create_*_tab helper is called in the order the tabs are built.
with gr.Tabs():
create_text_content_tab()
create_document_content_tab()
create_archive_tab()
create_keywords_management_tab()
create_admin_tab()
create_website_config_tab()
def handle_login(username, password):
"""
Handle login attempt.

Args:
username: text from the username box.
password: text from the password box.

Returns:
A 4-tuple in the order of the login button's outputs: status message,
login_section update, dashboard_section update, user_info update.
On success the login section is hidden and the dashboard shown with a
"Welcome, <username>" greeting. If either field is empty or login_user
reports failure, the login section stays visible and the dashboard hidden.
"""
# Empty credentials are rejected here, before login_user is called.
if not username or not password:
return "Please enter both username and password", gr.update(visible=True), gr.update(visible=False), gr.update(visible=False, value="Welcome, Guest")
# login_user returns a (success flag, message) pair.
success, message = login_user(username, password)
if success:
return f"{message}", gr.update(visible=False), gr.update(visible=True), gr.update(visible=True, value=f"Welcome, {username}")
else:
return f"{message}", gr.update(visible=True), gr.update(visible=False), gr.update(visible=False, value="Welcome, Guest")
def handle_logout():
"""
Handle logout.

Returns:
A 6-tuple in the order of the logout button's outputs: the message from
logout_user, an update showing the login section, an update hiding the
dashboard, the greeting reset to "Welcome, Guest", and empty values for
the username and password boxes.
"""
message = logout_user()
return f"{message}", gr.update(visible=True), gr.update(visible=False), gr.update(visible=False, value="Welcome, Guest"), gr.update(value=""), gr.update(value="")
# Event handlers
# The outputs lists must stay in the same order as the tuples returned above.
login_btn.click(
fn=handle_login,
inputs=[username_input, password_input],
outputs=[login_status, login_section, dashboard_section, user_info]
)
# Logout takes no inputs; it additionally clears both credential boxes.
logout_btn.click(
fn=handle_logout,
outputs=[login_status, login_section, dashboard_section, user_info, username_input, password_input]
)
return main_app
# Build the application object. This runs outside the __main__ guard, so
# `demo` exists even when the module is imported rather than executed.
demo = create_main_app()

if __name__ == "__main__":
    # Drop expired sessions before the server starts
    auth_manager.cleanup_expired_sessions()

    # Server settings, collected in one place and unpacked into launch()
    launch_options = {
        "server_name": "0.0.0.0",
        "server_port": 7860,
        "share": False,
        "debug": True,
    }
    demo.launch(**launch_options)