Spaces:
Sleeping
Sleeping
| ############################################################################################## | |
| ### Script de création de la base de données FAISS des articles | |
| ### | |
| ### Ce script | |
| ### - charge la table articles depuis le dataset HF Loren/articles_database | |
| ### - la traite par batch : | |
| ### - création de chunks de texte | |
| ### - création des embeddings avec le modèle SentenceTransformer "intfloat/e5-small" | |
| ### - ajout des embeddings dans un index FAISS | |
| ### - sauvegarde des métadonnées des chunks dans un fichier parquet | |
| ### - sauvegarde de l'index FAISS dans un fichier faiss_index.bin | |
| ### - upload dans le dataset HF Loren/articles_faiss | |
| ### | |
| ### 👉 L'index Faiss peut alors être utilisé par un space Hugging Face | |
| ############################################################################################## | |
| import os | |
| import torch | |
| import duckdb | |
| from huggingface_hub import hf_hub_download, upload_file | |
| from huggingface_hub import HfApi, HfFolder, CommitOperationAdd | |
| import faiss | |
| from sentence_transformers import SentenceTransformer | |
| from langchain.text_splitter import RecursiveCharacterTextSplitter | |
| from functools import partial | |
| import pyarrow as pa | |
| import pyarrow.parquet as pq | |
| from pathlib import Path | |
| from dotenv import load_dotenv | |
| # Fonctions | |
| # Batch processing function | |
| def batch_process(list_articles: list, faiss_id_start: int) -> int: | |
| """ | |
| Traite un batch d'articles pour générer des embeddings et des métadonnées, | |
| puis les sauvegarde de manière sécurisée pour garantir la persistance en cas de problème. | |
| Étapes réalisées : | |
| 1. Découpage de chaque article en chunks via le splitter. | |
| 2. Création d'un dictionnaire de métadonnées pour chaque chunk contenant : | |
| - faiss_id : identifiant unique aligné avec l'index FAISS | |
| - document_id : identifiant de l'article | |
| - chunk_text : texte du chunk | |
| 3. Calcul des embeddings pour tous les chunks du batch. | |
| 4. Ajout des embeddings au FAISS index existant (append). | |
| 5. Écriture immédiate de l'index FAISS sur disque pour assurer la persistance. | |
| 6. Sauvegarde des métadonnées batch dans un fichier Parquet distinct. | |
| Args: | |
| list_articles (list): Liste de tuples (document_id, document_text) représentant les articles du batch. | |
| faiss_id_start (int): Identifiant de départ pour le premier chunk du batch, | |
| utilisé pour aligner FAISS et les métadonnées. | |
| Returns: | |
| int: Identifiant FAISS suivant, à utiliser pour le batch suivant afin de maintenir l'alignement. | |
| Notes : | |
| - Cette fonction est conçue pour être utilisée batch par batch. | |
| - Les fichiers Parquet et le fichier FAISS sont mis à jour à chaque batch pour éviter toute perte de données. | |
| """ | |
| global faiss_index | |
| try: | |
| list_chunks = [] | |
| list_metadata = [] | |
| for doc_id, doc_content in list_articles: | |
| chunks = splitter.split_text(doc_content) | |
| for chunk_text in chunks: | |
| list_chunks.append(chunk_text) | |
| list_metadata.append({ | |
| "faiss_id": faiss_id_start, | |
| "document_id": doc_id, | |
| "chunk_text": chunk_text | |
| }) | |
| faiss_id_start += 1 | |
| # Embeddings | |
| if list_chunks: | |
| passage_texts = [f"passage: {p}" for p in list_chunks] | |
| embeddings = model.encode(passage_texts, convert_to_numpy=True, | |
| normalize_embeddings=True) | |
| faiss_index.add(embeddings) | |
| faiss.write_index(faiss_index, str(FAISS_INDEX_FILE)) | |
| # Sauvegarde batch métadonnées en Parquet | |
| if list_metadata: | |
| table = pa.Table.from_pylist(list_metadata) | |
| batch_file = PARQUET_DIR / f"metadata_batch_{faiss_id_start}.parquet" | |
| pq.write_table(table, batch_file) | |
| return faiss_id_start | |
| except Exception as e: | |
| print(f"ERROR in batch_process function : {e}") | |
| return None | |
| ## | |
| # Initialisations | |
| global faiss_index | |
| print("Initialisations ...") | |
| load_dotenv() | |
| HF_TOKEN = os.getenv('API_HF_TOKEN') | |
| REPO_ID = "Loren/articles_database" | |
| DATA_DIR = Path("../../Data") # dossier parent du script | |
| CHUNK_SIZE = 250 | |
| CHUNK_OVERLAP = 50 | |
| BATCH_SIZE = 1000 | |
| MODEL_NAME = "intfloat/multilingual-e5-small" | |
| FAISS_INDEX_FILE = DATA_DIR / "faiss_index.bin" | |
| PARQUET_DIR = DATA_DIR / "parquet_metadata" | |
| CACHE_DIR = "/tmp" | |
| os.makedirs(CACHE_DIR, exist_ok=True) | |
| # Rediriger le cache HF globalement | |
| os.environ["HF_HOME"] = CACHE_DIR | |
| os.environ["HF_DATASETS_CACHE"] = CACHE_DIR | |
| os.environ["TRANSFORMERS_CACHE"] = CACHE_DIR | |
| # Téléchargement des fichiers Parquet depuis Hugging Face | |
| print("Téléchargement des fichiers Parquet depuis Hugging Face ...") | |
| articles_parquet = hf_hub_download( | |
| repo_id=REPO_ID, | |
| filename="articles.parquet", | |
| repo_type="dataset", | |
| cache_dir=CACHE_DIR) | |
| # Connexion DuckDB en mémoire | |
| con = duckdb.connect() | |
| # Créer des tables DuckDB directement à partir des fichiers Parquet | |
| print("Création des vues DuckDB à partir des fichiers Parquet ...") | |
| con.execute(f"CREATE VIEW articles AS SELECT * FROM parquet_scan('{articles_parquet}')") | |
| # Creating the plitter for chunking document | |
| print("Initialisation du text splitter ...") | |
| splitter = RecursiveCharacterTextSplitter( | |
| chunk_size=CHUNK_SIZE, | |
| chunk_overlap=CHUNK_OVERLAP, | |
| keep_separator='end', | |
| separators=["\n\n", "\n", "."] | |
| ) | |
| # Creating the Sentence transformer model | |
| print("Initialisation du modèle de Sentence Transformer ...") | |
| device = "cuda" if torch.cuda.is_available() else "cpu" | |
| print(f"*** Device: {device}") | |
| model = SentenceTransformer(MODEL_NAME, device=device) | |
| # Creating the Faiss index | |
| embedding_dim = model.get_sentence_embedding_dimension() | |
| faiss_index = faiss.IndexFlatIP(embedding_dim) | |
| faiss_id_counter = 0 # compteur global pour lier faiss_id et métadonnées | |
| # Traitement par batchs | |
| print("Création des batches et traitement ...") | |
| cursor = con.execute(""" | |
| SELECT article_id, article_text | |
| FROM articles | |
| WHERE (LENGTH(article_text) - LENGTH(REPLACE(article_text, ' ', '')) + 1) >= 100""") | |
| # Création d'un itérateur de batches | |
| fetch_batch = partial(cursor.fetchmany, BATCH_SIZE) | |
| for batch_num, batch in enumerate(iter(fetch_batch, []), start=1): | |
| print("Traitement batch no ", batch_num, " ...") | |
| faiss_id_counter = batch_process(batch, faiss_id_counter) | |
| if not faiss_id_counter: | |
| print("*** Erreur traitement batch no ", batch_num) | |
| print("\n✅ Traitement terminé") | |
| # Upload des fichiers vers HF | |
| # Création du dataset HF | |
| REPO_ID = "Loren/articles_faiss" | |
| api = HfApi() | |
| HfFolder.save_token(HF_TOKEN) | |
| # Vérifier si le dataset existe | |
| try: | |
| repo_info = api.dataset_info(REPO_ID, token=HF_TOKEN) | |
| print(f"Dataset {REPO_ID} existe déjà, suppression en cours...") | |
| api.delete_repo(repo_id=REPO_ID, repo_type="dataset", token=HF_TOKEN) | |
| except Exception as e: | |
| print(f"Dataset n'existait pas : {e}") | |
| # Créer le dataset (privé) | |
| api.create_repo(repo_id=REPO_ID, repo_type="dataset", exist_ok=True, private=True, token=HF_TOKEN) | |
| print(f"Dataset {REPO_ID} créé avec succès.") | |
| # Récupérer la liste de fichiers parquet | |
| print("Upload des fichiers metadatas dans le dataset hugging face ", REPO_ID, " ...") | |
| parquet_files = [ | |
| os.path.join(PARQUET_DIR, f) | |
| for f in os.listdir(PARQUET_DIR) | |
| if f.endswith(".parquet") | |
| ] | |
| # Ajouter tous les fichiers | |
| operations = [ | |
| CommitOperationAdd( | |
| path_in_repo=f"data/{os.path.basename(f)}", | |
| path_or_fileobj=f | |
| ) | |
| for f in parquet_files | |
| ] | |
| api.create_commit( | |
| repo_id=REPO_ID, | |
| repo_type="dataset", | |
| operations=operations, | |
| commit_message="Upload batch metadata parquet files" | |
| ) | |
| print("✅ Upload metadatas terminé !") | |
| print("Upload de l'index Faiss dans le dataset hugging face ", REPO_ID, " ...") | |
| upload_file( | |
| path_or_fileobj=FAISS_INDEX_FILE, | |
| path_in_repo=FAISS_INDEX_FILE.name, | |
| repo_id=REPO_ID, | |
| repo_type="dataset", | |
| token=HF_TOKEN | |
| ) | |
| print("✅ Upload faiss index terminé") | |
| con.close() | |
| print("✅ Traitement terminé") |