Spaces:
Sleeping
Sleeping
| import pdfplumber | |
| import re | |
| import os | |
# Extract text as newline-delimited paragraphs, without tables and graphs
def extract_and_format_paragraphs(pdf_path, *, header_fraction=0.075, footer_fraction=0.15):
    """Extract the body text of a PDF as newline-separated paragraphs.

    Lines that look like running headers/footers, footnotes, or chart
    sections (from a line starting with "Chart" up to the next line starting
    with "Source:" or "Note:") are dropped. Consecutive lines are glued
    together until the accumulated text ends with sentence punctuation, and
    text that does not end a sentence is carried over into the next column
    or page.

    Args:
        pdf_path: Path of the PDF file. If the file name contains "minutes"
            (case-insensitive), each page is cropped and read as two
            half-width columns, left then right; otherwise each page is
            read as a whole.
        header_fraction: Share of the page height cropped off the top in the
            two-column mode.
        footer_fraction: Share of the page height cropped off the bottom in
            the two-column mode.

    Returns:
        The cleaned text with surrounding whitespace stripped.
    """
    # Lines starting with one of these tokens are treated as headers/footers.
    header_pattern = re.compile(r"^(ECB-PUBLIC|Title|Document|Header)", re.IGNORECASE)
    footer_pattern = re.compile(
        r"^(Page \d+ of \d+|Footer|Document|Note:|Source:|the 75th and 25th percentiles|\|\d+)",
        re.IGNORECASE,
    )
    # A footnote line starts with "<digits> <single digit> ".
    footnote_pattern = re.compile(r"^\d+ \d{1} ", re.IGNORECASE)
    start_marker_pattern = re.compile(r"^Chart", re.IGNORECASE)
    end_marker_pattern = re.compile(r"^(Source:|Note:)", re.IGNORECASE)
    # A period directly after one of these abbreviations does not end a sentence.
    abbreviation_period_re = re.compile(
        r'(\b(?:e\.g|i\.e|a\.m|p\.m|U\.S|J\.R\.R|Dr|Mr|Ms|Mrs|Jr|Sr)\b)\.'
    )
    sentence_end_re = re.compile(r'[\.\!\?]\s*$')

    def remove_abbreviation_periods(text):
        """Drop the period that directly follows a known abbreviation."""
        return abbreviation_period_re.sub(r'\1', text)

    def is_end_of_sentence(text):
        """Return True if *text* ends with '.', '!' or '?' that is not an abbreviation period."""
        return bool(sentence_end_re.search(remove_abbreviation_periods(text.strip())))

    def clean_text(text):
        """Filter unwanted lines out of *text* and join the rest into paragraphs."""
        paragraph_lines = []
        in_removal_section = False
        skipping_footnote = False

        def append_line_to_paragraph(line):
            """Glue *line* onto the open paragraph, or start a new one."""
            stripped = line.strip()
            if not paragraph_lines or is_end_of_sentence(paragraph_lines[-1]):
                paragraph_lines.append(stripped)
            elif paragraph_lines[-1].endswith("-"):
                # Word hyphenated across a line break: drop the hyphen and
                # join without a space.
                paragraph_lines[-1] = paragraph_lines[-1][:-1] + stripped
            else:
                paragraph_lines[-1] += ' ' + stripped

        for line in text.split('\n'):
            # A chart section runs from a "Chart" line to the next
            # "Source:"/"Note:" line, both included.
            if start_marker_pattern.match(line):
                in_removal_section = True
            if in_removal_section and end_marker_pattern.match(line):
                in_removal_section = False
                continue
            # A footnote is skipped from its first line through the next
            # line that ends a sentence.
            if footnote_pattern.match(line):
                skipping_footnote = True
                continue
            if skipping_footnote:
                if is_end_of_sentence(line):
                    skipping_footnote = False
                continue
            if in_removal_section or header_pattern.match(line) or footer_pattern.match(line):
                continue
            if line.strip():
                append_line_to_paragraph(line)
        return "\n".join(paragraph_lines)

    finished_chunks = []  # text segments that ended on a sentence boundary
    pending_text = ""     # text still waiting for its sentence to be completed

    def add_segment(segment):
        """Merge *segment* into the pending text, or flush the pending text first."""
        nonlocal pending_text
        if not pending_text:
            pending_text = segment
        elif is_end_of_sentence(pending_text):
            finished_chunks.append(pending_text)
            pending_text = segment
        else:
            # The sentence continues across the column/page break.
            pending_text += " " + segment

    two_column_layout = "minutes" in os.path.basename(pdf_path).lower()
    # Open the document once; the two layouts are mutually exclusive so a
    # "minutes" file is not extracted a second time as full pages.
    with pdfplumber.open(pdf_path) as pdf:
        for page in pdf.pages:
            if two_column_layout:
                top = page.height * header_fraction
                bottom = page.height - page.height * footer_fraction
                middle = page.width / 2
                column_boxes = (
                    (0, top, middle, bottom),           # left column
                    (middle, top, page.width, bottom),  # right column
                )
                for bbox in column_boxes:
                    column_text = page.within_bbox(bbox).extract_text() or ""
                    add_segment(clean_text(column_text))
            else:
                page_text = page.extract_text()
                if page_text:
                    add_segment(clean_text(page_text))

    # Each flushed chunk is newline-terminated; the remaining pending text
    # (if any) goes last.
    full_text = "".join(chunk + "\n" for chunk in finished_chunks) + pending_text
    return full_text.strip()
# Cleaning: cut unnecessary information such as annex and intro
def find_text_range(text, start_keywords, end_keywords):
    """Locate the slice of *text* between the start and end keywords.

    Matching is case-insensitive. Empty keywords are ignored.

    Args:
        text: The text to search.
        start_keywords: Candidate start markers. The first occurrence of each
            is located and the largest of those positions is used; 0 if none
            is found.
        end_keywords: Candidate end markers. The earliest occurrence of any
            of them at or after the start position is used; ``len(text)`` if
            none is found.

    Returns:
        A ``(start_index, end_index)`` tuple with
        ``start_index <= end_index``.
    """
    # Lower-case once instead of once per keyword.
    # NOTE: positions are measured on the lower-cased copy; str.lower() can
    # change the length of a few non-ASCII characters.
    lowered = text.lower()

    start_index = 0
    for keyword in start_keywords:
        if not keyword:
            continue
        position = lowered.find(keyword.lower())
        # find() returns -1 on a miss, which never exceeds start_index.
        if position > start_index:
            start_index = position

    end_index = len(text)  # Default to end of text
    for keyword in end_keywords:
        if not keyword:
            continue
        # Search from the start marker onwards so that an end keyword that
        # also appears earlier in the text cannot produce end < start.
        position = lowered.find(keyword.lower(), start_index)
        if 0 <= position < end_index:
            end_index = position

    return start_index, end_index
def extract_relevant_text(text, start_index, end_index):
    """Return ``text[start_index:end_index]`` with surrounding whitespace removed."""
    selected = text[slice(start_index, end_index)]
    return selected.strip()
# Split paragraphs into list of paragraphs
def split_text_into_paragraphs(extracted_text, min_length):
    """Split text into paragraphs and merge single-sentence paragraphs forward.

    The text is split on runs of newlines. Paragraphs whose stripped length
    is not strictly greater than *min_length* are discarded. Each remaining
    paragraph that consists of a single sentence is then joined (with one
    space) to the paragraph that follows it; the joined result is not
    examined again.

    Args:
        extracted_text: Text in which paragraphs are separated by newlines.
        min_length: Paragraphs must be longer than this many characters
            (after stripping) to be kept.

    Returns:
        A list of paragraph strings.
    """
    # A sentence boundary is whitespace preceded by '.', '!' or '?'.
    sentence_boundary = re.compile(r'(?<=[.!?])\s+')

    def is_single_sentence(paragraph):
        """Return True if *paragraph* contains no internal sentence boundary."""
        return len(sentence_boundary.split(paragraph)) == 1

    candidates = [
        stripped
        for stripped in (raw.strip() for raw in re.split(r'\n+', extracted_text.strip()))
        if len(stripped) > min_length
    ]

    merged = []
    index = 0
    total = len(candidates)
    while index < total:
        current = candidates[index]
        index += 1
        if not current:
            # Blank paragraphs can only survive the filter when min_length
            # is negative; they are never emitted.
            continue
        if index < total and candidates[index] and is_single_sentence(current):
            # Attach the following paragraph and consume it.
            current = current + ' ' + candidates[index]
            index += 1
        merged.append(current)
    return merged