Data-Clean-PlayGround / helper_functions.py
shivamsharma120120's picture
Initial commit
136b539
import gradio as gr
import pandas as pd
from sklearn.preprocessing import LabelEncoder, OrdinalEncoder
import io
import numpy as np
import tempfile
import os
# ===========================================================
# Helper Functions
# ===========================================================
def file_summary(df):
if df is None:
return pd.DataFrame(), "โš ๏ธ No data loaded."
memory_usage = df.memory_usage(deep=True)
column_types = []
for col in df.columns:
dtype = df[col].dtype
if pd.api.types.is_numeric_dtype(dtype):
unique_ratio = df[col].nunique() / len(df) if len(df) > 0 else 0
if unique_ratio < 0.05 or df[col].nunique() < 20:
column_types.append("Categorical (Numerical)")
else:
column_types.append("Continuous")
elif pd.api.types.is_object_dtype(dtype) or pd.api.types.is_categorical_dtype(dtype):
column_types.append("Categorical (String/Object)")
elif pd.api.types.is_bool_dtype(dtype):
column_types.append("Categorical (Boolean)")
else:
column_types.append("Other")
mem_vals = [round(df[c].memory_usage(deep=True) / 1024, 2) for c in df.columns]
summary_df = pd.DataFrame({
"Column": df.columns,
"Data Type": df.dtypes.values,
"Column Type": column_types,
"NULL Values": df.isnull().sum().values,
"Memory Size (KB)": mem_vals
})
return summary_df, f"๐Ÿ“Š Summary Generated: {df.shape[1]} columns, {df.shape[0]} rows"
# ===========================================================
# Loading CSV + UI helpers
# ===========================================================
def load_csv(file):
if file is None:
return None, None, pd.DataFrame(), gr.update(choices=[]), gr.update(choices=[]), "โš ๏ธ Please upload a CSV file."
try:
df = pd.read_csv(file.name)
cols = df.columns.tolist()
# Detect only encodable columns
encodable_cols = df.select_dtypes(include=["object", "category", "bool"]).columns.tolist()
summary, _ = file_summary(df)
return df, df.copy(), summary, gr.update(choices=cols), gr.update(choices=encodable_cols), f"โœ… File loaded successfully! Shape: {df.shape}"
except Exception as e:
return None, None, pd.DataFrame(), gr.update(choices=[]), gr.update(choices=[]), f"โŒ Error: {e}"
# ===========================================================
# Duplicate, Missing & Deletion
# ===========================================================
def check_duplicate_columns(df):
if df is None:
return "โš ๏ธ Please load a dataset first."
dup_cols = df.columns[df.columns.duplicated()]
if len(dup_cols) == 0:
return "โœ… No duplicate columns found."
return f"โš ๏ธ Found duplicate columns: {', '.join(dup_cols)}"
def remove_duplicate_columns(df):
if df is None:
return df, "โš ๏ธ Please load a dataset first."
dup_cols = df.columns[df.columns.duplicated()]
if len(dup_cols) == 0:
return df, "โœ… No duplicate columns to remove."
df = df.loc[:, ~df.columns.duplicated()]
return df, f"โœ… Removed duplicate columns: {', '.join(dup_cols)}"
def check_duplicate_rows(df):
if df is None:
return "โš ๏ธ Please load a dataset first."
dup_rows = df.duplicated().sum()
if dup_rows == 0:
return "โœ… No duplicate rows found."
return f"โš ๏ธ Found {dup_rows} duplicate rows."
def remove_duplicate_rows(df):
if df is None:
return df, "โš ๏ธ Please load a dataset first."
dup_rows = df.duplicated().sum()
if dup_rows == 0:
return df, "โœ… No duplicate rows to remove."
df = df.drop_duplicates()
return df, f"โœ… Removed {dup_rows} duplicate rows successfully."
def check_missing_columns(df):
if df is None:
return "โš ๏ธ Please load a dataset first."
missing = df.isnull().sum()
cols_with_missing = missing[missing > 0]
if cols_with_missing.empty:
return "โœ… No missing values found."
return f"โš ๏ธ Columns with missing values: {', '.join(cols_with_missing.index)}"
def drop_high_missing(df):
if df is None:
return df, "โš ๏ธ No data loaded."
missing_pct = df.isnull().mean() * 100
to_drop = missing_pct[missing_pct > 50].index.tolist()
if not to_drop:
return df, "โœ… No columns with >50% missing values."
df = df.drop(columns=to_drop)
return df, f"โœ… Dropped columns with >50% missing values: {', '.join(to_drop)}"
def delete_column(df, col):
if df is None:
return df, "โš ๏ธ Please load a dataset first."
if col not in df.columns:
return df, f"โš ๏ธ Column '{col}' not found."
df = df.drop(columns=[col])
return df, f"โœ… Column '{col}' deleted."
# ===========================================================
# Missing Value Handler (Column-Type Based Logic)
# ===========================================================
def get_missing_columns(df):
if df is None:
return gr.update(choices=[]), "โš ๏ธ Please load a dataset first."
cols = df.columns[df.isnull().any()].tolist()
if not cols:
return gr.update(choices=[]), "โœ… No columns with missing values."
return gr.update(choices=cols), f"โš ๏ธ Columns with missing values: {', '.join(cols)}"
def detect_column_type(df, column):
if df is None or column not in df.columns:
return "โš ๏ธ Invalid column.", gr.update(choices=[])
dtype = df[column].dtype
if pd.api.types.is_numeric_dtype(dtype):
unique_ratio = df[column].nunique() / len(df)
if unique_ratio < 0.05 or df[column].nunique() < 20:
col_type = "Categorical (Numerical)"
options = ["Mode"]
else:
col_type = "Continuous (Numerical)"
options = ["Mean", "Median", "Mode"]
else:
col_type = "Categorical (String/Object)"
options = ["Mode"]
return f"๐Ÿงฉ Column Type: {col_type}", gr.update(choices=options, value=options[0])
def apply_missing_value(df, column, method):
if df is None:
return df, "โš ๏ธ Please load a dataset first."
if column not in df.columns:
return df, f"โš ๏ธ Column '{column}' not found."
if df[column].isnull().sum() == 0:
return df, f"โœ… Column '{column}' has no missing values."
if pd.api.types.is_numeric_dtype(df[column]):
if method == "Mean":
df[column].fillna(df[column].mean(), inplace=True)
elif method == "Median":
df[column].fillna(df[column].median(), inplace=True)
elif method == "Mode":
df[column].fillna(df[column].mode().iloc[0], inplace=True)
else:
df[column].fillna(df[column].mode().iloc[0], inplace=True)
return df, f"โœ… Missing values in '{column}' filled using {method}."
# ===========================================================
# Encoding + Download Functions
# ===========================================================
def show_value_counts(df, col, method):
"""Show value counts only if Ordinal Encoding is selected."""
if df is None or col not in df.columns:
return gr.DataFrame(value="โš ๏ธ Please select a valid column.")
if method != "Ordinal Encoding":
return gr.DataFrame(value="โ„น๏ธ Value counts visible only for Ordinal Encoding.")
counts = df[col].value_counts(dropna=False).reset_index()
counts.columns = [col, "Count"]
return counts
def encode_column(df, col, method, order):
if df is None:
return df, "โš ๏ธ Please load a dataset first."
if col not in df.columns:
return df, "โš ๏ธ Column not found."
if method == "Label Encoding":
le = LabelEncoder()
df[col] = le.fit_transform(df[col].astype(str))
return df, f"โœ… Label Encoding applied on '{col}'."
elif method == "Ordinal Encoding":
if not order:
return df, "โš ๏ธ Please provide order for Ordinal Encoding."
# Normalize both the column values and user-provided order for comparison
df[col] = df[col].astype(str).str.strip()
user_order = [x.strip() for x in order if x.strip()]
col_values = sorted(df[col].dropna().unique().tolist())
# Check if user provided valid categories
missing_from_col = [x for x in user_order if x not in col_values]
extra_in_col = [x for x in col_values if x not in user_order]
if missing_from_col:
return df, f"โŒ Invalid category(s): {missing_from_col}. Please check spelling/case. Existing values: {col_values}"
if extra_in_col:
msg = f"โš ๏ธ Warning: Some values in column were not in the provided order and will be encoded as NaN: {extra_in_col}"
else:
msg = ""
try:
oe = OrdinalEncoder(categories=[user_order])
df[col] = oe.fit_transform(df[[col]])
return df, f"โœ… Ordinal Encoding applied on '{col}' with order {user_order}. {msg}"
except Exception as e:
return df, f"โŒ Error during encoding: {e}"
return df, "โš ๏ธ Invalid encoding method."
# ===========================================================
# Column Normalization & Renaming Functions
# ===========================================================
def normalize_column_names(df):
"""Convert all column names to lowercase, strip spaces, and replace internal spaces with underscores."""
if df is None:
return df, "โš ๏ธ Please load a dataset first."
original_cols = df.columns.tolist()
new_cols = [col.strip().lower().replace(" ", "_") for col in original_cols]
rename_map = {old: new for old, new in zip(original_cols, new_cols) if old != new}
df.columns = new_cols
if not rename_map:
return df, "โœ… All column names were already normalized."
return df, f"โœ… Column names normalized: {rename_map}"
def rename_single_column(df, old_col, new_col):
"""Rename one specific column."""
if df is None:
return df, "โš ๏ธ Please load a dataset first."
if old_col not in df.columns:
return df, f"โš ๏ธ Column '{old_col}' not found."
if not new_col.strip():
return df, "โš ๏ธ Please enter a valid new column name."
df = df.rename(columns={old_col: new_col.strip()})
return df, f"โœ… Column '{old_col}' renamed to '{new_col.strip()}'."
# ===========================================================
# Data Type Conversion (Numerical Columns)
# ===========================================================
def get_numeric_columns(df):
"""Return a list of numeric columns for dtype conversion."""
if df is None:
return gr.update(choices=[]), "โš ๏ธ Please load a dataset first."
num_cols = df.select_dtypes(include=["int", "float", "complex"]).columns.tolist()
if not num_cols:
return gr.update(choices=[]), "โœ… No numeric columns available for conversion."
return gr.update(choices=num_cols), f"๐Ÿ”ข Numeric columns available: {', '.join(num_cols)}"
def show_current_dtype(df, col):
"""Display the current dtype of the selected numeric column."""
if df is None or col not in df.columns:
return "โš ๏ธ Please select a valid column."
dtype = str(df[col].dtype)
return f"๐Ÿ“˜ Current Data Type: {dtype}"
def change_column_dtype(df, col, new_dtype):
"""Change the data type of a numeric column using pandas .astype()."""
if df is None:
return df, "โš ๏ธ Please load a dataset first."
if col not in df.columns:
return df, f"โš ๏ธ Column '{col}' not found."
if not new_dtype:
return df, "โš ๏ธ Please select a new data type."
try:
df[col] = df[col].astype(new_dtype)
return df, f"โœ… Column '{col}' converted to type '{new_dtype}'."
except Exception as e:
return df, f"โŒ Conversion failed: {e}"
# ===========================================================
# Outlier Detection & Handling Functions
# ===========================================================
def get_continuous_columns(df):
"""Detect all numerical columns (int and float) for outlier handling."""
if df is None:
return gr.update(choices=[]), "โš ๏ธ Please load a dataset first."
numeric_cols = df.select_dtypes(include=["int", "float"]).columns.tolist()
if not numeric_cols:
return gr.update(choices=[]), "โœ… No numerical columns found."
return gr.update(choices=numeric_cols), f"๐Ÿ“Š Numerical columns detected: {', '.join(numeric_cols)}"
def show_column_stats(df, col):
"""Display basic stats for selected continuous column."""
if df is None or col not in df.columns:
return "โš ๏ธ Please select a valid column."
stats = df[col].describe().to_dict()
return (
f"๐Ÿ“ˆ Column: {col}\n"
f"Mean: {stats['mean']:.3f}, Std: {stats['std']:.3f}, Min: {stats['min']:.3f}, Max: {stats['max']:.3f}"
)
def handle_outliers(df, col, method, threshold):
"""Apply chosen outlier handling technique."""
if df is None:
return df, "โš ๏ธ Please load a dataset first."
if col not in df.columns:
return df, f"โš ๏ธ Column '{col}' not found."
if not pd.api.types.is_numeric_dtype(df[col]):
return df, f"โš ๏ธ Column '{col}' is not numeric."
if threshold is None or str(threshold).strip() == "":
return df, "โš ๏ธ Please enter a valid threshold value."
try:
threshold = float(threshold)
except:
return df, "โš ๏ธ Threshold value must be numeric."
series = df[col]
# IQR method
if method == "IQR":
Q1, Q3 = series.quantile(0.25), series.quantile(0.75)
IQR = Q3 - Q1
lower = Q1 - threshold * IQR
upper = Q3 + threshold * IQR
before = series.copy()
df[col] = np.clip(series, lower, upper)
return df, f"โœ… IQR method applied with threshold={threshold}. Clipped {sum(before != df[col])} outliers."
# Z-score method
elif method == "Z-score":
mean, std = series.mean(), series.std()
z_scores = (series - mean) / std
mask = np.abs(z_scores) > threshold
before = series.copy()
df.loc[mask, col] = mean # replace with mean
return df, f"โœ… Z-score method applied (|Z| > {threshold}). Replaced {mask.sum()} outliers with mean."
# Winsorization
elif method == "Winsorization":
lower = series.quantile(threshold / 100)
upper = series.quantile(1 - threshold / 100)
before = series.copy()
df[col] = np.clip(series, lower, upper)
return df, f"โœ… Winsorization applied with {threshold}% tails capped."
# Min-Max clipping
elif method == "MinMax":
min_val = series.min()
max_val = series.max()
lower = min_val + threshold * (max_val - min_val)
upper = max_val - threshold * (max_val - min_val)
before = series.copy()
df[col] = np.clip(series, lower, upper)
return df, f"โœ… Min-Max clipping applied with threshold={threshold}. Clipped {sum(before != df[col])} values."
else:
return df, "โš ๏ธ Invalid outlier handling method selected."
# ===========================================================
# Downloading the Cleaned CSV File
# ===========================================================
def make_csv_download(df):
if df is None or df.empty:
return None
# Create a temporary file
temp_dir = tempfile.gettempdir()
temp_path = os.path.join(temp_dir, "cleaned_data.csv")
df.to_csv(temp_path, index=False)
return temp_path