|
|
import gradio as gr |
|
|
import pandas as pd |
|
|
from sklearn.preprocessing import LabelEncoder, OrdinalEncoder |
|
|
import io |
|
|
import numpy as np |
|
|
import tempfile |
|
|
import os |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def file_summary(df):
    """Build a per-column summary (dtype, inferred column type, nulls, memory).

    Parameters
    ----------
    df : pd.DataFrame or None
        The loaded dataset.

    Returns
    -------
    tuple[pd.DataFrame, str]
        The summary table and a status message.
    """
    if df is None:
        return pd.DataFrame(), "โ ๏ธ No data loaded."

    n_rows = len(df)
    column_types = []
    for col in df.columns:
        dtype = df[col].dtype
        # Check bool BEFORE numeric: pandas reports bool dtypes as numeric,
        # so the original's boolean branch was unreachable.
        if pd.api.types.is_bool_dtype(dtype):
            column_types.append("Categorical (Boolean)")
        elif pd.api.types.is_numeric_dtype(dtype):
            # Low-cardinality numeric columns are treated as categorical.
            n_unique = df[col].nunique()
            unique_ratio = n_unique / n_rows if n_rows > 0 else 0
            if unique_ratio < 0.05 or n_unique < 20:
                column_types.append("Categorical (Numerical)")
            else:
                column_types.append("Continuous")
        elif pd.api.types.is_object_dtype(dtype) or isinstance(dtype, pd.CategoricalDtype):
            # isinstance(..., pd.CategoricalDtype) replaces the deprecated
            # pd.api.types.is_categorical_dtype().
            column_types.append("Categorical (String/Object)")
        else:
            column_types.append("Other")

    # index=False so the shared index memory is not re-counted per column.
    mem_vals = [round(df[c].memory_usage(deep=True, index=False) / 1024, 2) for c in df.columns]

    summary_df = pd.DataFrame({
        "Column": df.columns,
        "Data Type": df.dtypes.values,
        "Column Type": column_types,
        "NULL Values": df.isnull().sum().values,
        "Memory Size (KB)": mem_vals,
    })
    return summary_df, f"๐ Summary Generated: {df.shape[1]} columns, {df.shape[0]} rows"
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def load_csv(file):
    """Load an uploaded CSV into the two DataFrame states (original plus a
    working copy), build its summary table, and refresh the column dropdowns."""
    if file is None:
        return (
            None,
            None,
            pd.DataFrame(),
            gr.update(choices=[]),
            gr.update(choices=[]),
            "โ ๏ธ Please upload a CSV file.",
        )
    try:
        frame = pd.read_csv(file.name)
        all_columns = frame.columns.tolist()
        # Only object/category/bool columns make sense for encoding.
        categorical = frame.select_dtypes(include=["object", "category", "bool"]).columns.tolist()
        summary, _ = file_summary(frame)
        return (
            frame,
            frame.copy(),
            summary,
            gr.update(choices=all_columns),
            gr.update(choices=categorical),
            f"โ File loaded successfully! Shape: {frame.shape}",
        )
    except Exception as err:
        return (
            None,
            None,
            pd.DataFrame(),
            gr.update(choices=[]),
            gr.update(choices=[]),
            f"โ Error: {err}",
        )
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def check_duplicate_columns(df):
    """Report whether the dataset contains duplicated column names."""
    if df is None:
        return "โ ๏ธ Please load a dataset first."
    duplicated = list(df.columns[df.columns.duplicated()])
    if not duplicated:
        return "โ No duplicate columns found."
    return f"โ ๏ธ Found duplicate columns: {', '.join(duplicated)}"
|
|
|
|
|
def remove_duplicate_columns(df):
    """Drop columns whose names are duplicated, keeping the first occurrence."""
    if df is None:
        return df, "โ ๏ธ Please load a dataset first."
    duplicated_names = df.columns[df.columns.duplicated()]
    if duplicated_names.empty:
        return df, "โ No duplicate columns to remove."
    cleaned = df.loc[:, ~df.columns.duplicated()]
    return cleaned, f"โ Removed duplicate columns: {', '.join(duplicated_names)}"
|
|
|
|
|
def check_duplicate_rows(df):
    """Report how many fully-duplicated rows the dataset contains."""
    if df is None:
        return "โ ๏ธ Please load a dataset first."
    n_dupes = df.duplicated().sum()
    if not n_dupes:
        return "โ No duplicate rows found."
    return f"โ ๏ธ Found {n_dupes} duplicate rows."
|
|
|
|
|
def remove_duplicate_rows(df):
    """Drop fully-duplicated rows, keeping the first occurrence of each."""
    if df is None:
        return df, "โ ๏ธ Please load a dataset first."
    n_dupes = df.duplicated().sum()
    if not n_dupes:
        return df, "โ No duplicate rows to remove."
    deduped = df.drop_duplicates()
    return deduped, f"โ Removed {n_dupes} duplicate rows successfully."
|
|
|
|
|
def check_missing_columns(df):
    """List the columns that contain at least one missing value."""
    if df is None:
        return "โ ๏ธ Please load a dataset first."
    affected = [col for col, count in df.isnull().sum().items() if count > 0]
    if not affected:
        return "โ No missing values found."
    return f"โ ๏ธ Columns with missing values: {', '.join(affected)}"
|
|
|
|
|
def drop_high_missing(df, threshold=50.0):
    """Drop every column whose percentage of missing values exceeds *threshold*.

    Parameters
    ----------
    df : pd.DataFrame or None
        The loaded dataset.
    threshold : float, optional
        Missing-value percentage (0-100) above which a column is dropped.
        Defaults to 50, preserving the original hard-coded behavior.

    Returns
    -------
    tuple[pd.DataFrame or None, str]
        The (possibly reduced) DataFrame and a status message.
    """
    if df is None:
        return df, "โ ๏ธ No data loaded."
    missing_pct = df.isnull().mean() * 100
    to_drop = missing_pct[missing_pct > threshold].index.tolist()
    if not to_drop:
        # :g renders the default as "50", matching the original message text.
        return df, f"โ No columns with >{threshold:g}% missing values."
    df = df.drop(columns=to_drop)
    return df, f"โ Dropped columns with >{threshold:g}% missing values: {', '.join(to_drop)}"
|
|
|
|
|
def delete_column(df, col):
    """Remove a single column from the dataset by name."""
    if df is None:
        return df, "โ ๏ธ Please load a dataset first."
    if col in df.columns:
        return df.drop(columns=[col]), f"โ Column '{col}' deleted."
    return df, f"โ ๏ธ Column '{col}' not found."
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def get_missing_columns(df):
    """Populate a dropdown with the columns that still have missing values."""
    if df is None:
        return gr.update(choices=[]), "โ ๏ธ Please load a dataset first."
    affected = df.columns[df.isnull().any()].tolist()
    if affected:
        return gr.update(choices=affected), f"โ ๏ธ Columns with missing values: {', '.join(affected)}"
    return gr.update(choices=[]), "โ No columns with missing values."
|
|
|
|
|
def detect_column_type(df, column):
    """Classify the selected column and offer the matching imputation methods.

    Parameters
    ----------
    df : pd.DataFrame or None
    column : str
        Column whose type should be inferred.

    Returns
    -------
    tuple[str, dict]
        A human-readable type label and a gr.update whose choices are the
        fill strategies that make sense for that type.
    """
    if df is None or column not in df.columns:
        return "โ ๏ธ Invalid column.", gr.update(choices=[])
    dtype = df[column].dtype
    if pd.api.types.is_numeric_dtype(dtype):
        n_unique = df[column].nunique()
        # Guard against an empty frame: the original divided by len(df)
        # unconditionally and raised ZeroDivisionError (file_summary already
        # guards this same computation).
        unique_ratio = n_unique / len(df) if len(df) > 0 else 0
        if unique_ratio < 0.05 or n_unique < 20:
            col_type = "Categorical (Numerical)"
            options = ["Mode"]
        else:
            col_type = "Continuous (Numerical)"
            options = ["Mean", "Median", "Mode"]
    else:
        col_type = "Categorical (String/Object)"
        options = ["Mode"]
    return f"๐งฉ Column Type: {col_type}", gr.update(choices=options, value=options[0])
|
|
|
|
|
def apply_missing_value(df, column, method):
    """Fill the missing values of one column using the chosen strategy.

    Parameters
    ----------
    df : pd.DataFrame or None
    column : str
        Target column name.
    method : str
        "Mean", "Median" or "Mode". Non-numeric columns always use the mode,
        as does any unrecognized method name.

    Returns
    -------
    tuple[pd.DataFrame or None, str]
        The updated DataFrame and a status message.
    """
    if df is None:
        return df, "โ ๏ธ Please load a dataset first."
    if column not in df.columns:
        return df, f"โ ๏ธ Column '{column}' not found."
    series = df[column]
    if series.isnull().sum() == 0:
        return df, f"โ Column '{column}' has no missing values."

    if pd.api.types.is_numeric_dtype(series) and method == "Mean":
        fill_value = series.mean()
    elif pd.api.types.is_numeric_dtype(series) and method == "Median":
        fill_value = series.median()
    else:
        # Mode path (also the fallback for unknown methods, which the
        # original silently skipped on numeric columns while still
        # reporting success).
        modes = series.mode()
        if modes.empty:
            # Every value is missing: there is nothing to impute from.
            return df, f"โ ๏ธ Column '{column}' has no observed values to fill from."
        fill_value = modes.iloc[0]

    # Assign back instead of `inplace=True` on a column selection: chained
    # in-place fillna is deprecated in pandas 2.x and may operate on a copy.
    df[column] = series.fillna(fill_value)
    return df, f"โ Missing values in '{column}' filled using {method}."
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def show_value_counts(df, col, method):
    """Show value counts only if Ordinal Encoding is selected."""
    if df is None or col not in df.columns:
        return gr.DataFrame(value="โ ๏ธ Please select a valid column.")
    if method != "Ordinal Encoding":
        return gr.DataFrame(value="โน๏ธ Value counts visible only for Ordinal Encoding.")
    # NaN is kept as its own row so the user sees missing values too.
    table = df[col].value_counts(dropna=False).reset_index()
    table.columns = [col, "Count"]
    return table
|
|
|
|
|
def encode_column(df, col, method, order):
    """Encode a categorical column with label or ordinal encoding.

    Parameters
    ----------
    df : pd.DataFrame or None
    col : str
        Column to encode (coerced to string before encoding).
    method : str
        "Label Encoding" or "Ordinal Encoding".
    order : list[str]
        Category order for ordinal encoding (lowest first); ignored for
        label encoding.

    Returns
    -------
    tuple[pd.DataFrame or None, str]
        The updated DataFrame and a status message.
    """
    if df is None:
        return df, "โ ๏ธ Please load a dataset first."
    if col not in df.columns:
        return df, "โ ๏ธ Column not found."

    if method == "Label Encoding":
        le = LabelEncoder()
        df[col] = le.fit_transform(df[col].astype(str))
        return df, f"โ Label Encoding applied on '{col}'."

    elif method == "Ordinal Encoding":
        if not order:
            return df, "โ ๏ธ Please provide order for Ordinal Encoding."

        # NOTE(review): astype(str) turns NaN into the literal string "nan",
        # so the dropna() below cannot remove originally-missing values —
        # they surface as a "nan" category. Confirm this is intended.
        df[col] = df[col].astype(str).str.strip()
        user_order = [x.strip() for x in order if x.strip()]
        col_values = sorted(df[col].dropna().unique().tolist())

        missing_from_col = [x for x in user_order if x not in col_values]
        extra_in_col = [x for x in col_values if x not in user_order]

        if missing_from_col:
            return df, f"โ Invalid category(s): {missing_from_col}. Please check spelling/case. Existing values: {col_values}"

        msg = ""
        if extra_in_col:
            msg = f"โ ๏ธ Warning: Some values in column were not in the provided order and will be encoded as NaN: {extra_in_col}"

        try:
            # handle_unknown/unknown_value make the encoder emit NaN for
            # categories outside `user_order`, matching the warning above.
            # The original used the default handle_unknown="error", which
            # raises ValueError instead of encoding unseen values as NaN.
            oe = OrdinalEncoder(
                categories=[user_order],
                handle_unknown="use_encoded_value",
                unknown_value=np.nan,
            )
            df[col] = oe.fit_transform(df[[col]])
            return df, f"โ Ordinal Encoding applied on '{col}' with order {user_order}. {msg}"
        except Exception as e:
            return df, f"โ Error during encoding: {e}"

    return df, "โ ๏ธ Invalid encoding method."
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def normalize_column_names(df):
    """Convert all column names to lowercase, strip spaces, and replace internal spaces with underscores.

    Parameters
    ----------
    df : pd.DataFrame or None

    Returns
    -------
    tuple[pd.DataFrame or None, str]
        The DataFrame with normalized column names and a status message.
    """
    if df is None:
        return df, "โ ๏ธ Please load a dataset first."

    original_cols = df.columns.tolist()
    # str() guards against non-string labels (e.g. integer columns from a
    # headerless CSV), which would otherwise raise AttributeError on .strip().
    new_cols = [str(col).strip().lower().replace(" ", "_") for col in original_cols]
    rename_map = {old: new for old, new in zip(original_cols, new_cols) if old != new}
    df.columns = new_cols

    if not rename_map:
        return df, "โ All column names were already normalized."
    return df, f"โ Column names normalized: {rename_map}"
|
|
|
|
|
|
|
|
def rename_single_column(df, old_col, new_col):
    """Rename one specific column.

    Parameters
    ----------
    df : pd.DataFrame or None
    old_col : str
        Existing column name.
    new_col : str or None
        Desired new name; surrounding whitespace is stripped.

    Returns
    -------
    tuple[pd.DataFrame or None, str]
        The (renamed) DataFrame and a status message.
    """
    if df is None:
        return df, "โ ๏ธ Please load a dataset first."
    if old_col not in df.columns:
        return df, f"โ ๏ธ Column '{old_col}' not found."
    # Also guard against None: the original called new_col.strip() directly
    # and crashed with AttributeError when the textbox was cleared.
    if not new_col or not new_col.strip():
        return df, "โ ๏ธ Please enter a valid new column name."

    new_name = new_col.strip()
    df = df.rename(columns={old_col: new_name})
    return df, f"โ Column '{old_col}' renamed to '{new_name}'."
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def get_numeric_columns(df):
    """Return a list of numeric columns for dtype conversion."""
    if df is None:
        return gr.update(choices=[]), "โ ๏ธ Please load a dataset first."
    numeric = df.select_dtypes(include=["int", "float", "complex"]).columns.tolist()
    if numeric:
        return gr.update(choices=numeric), f"๐ข Numeric columns available: {', '.join(numeric)}"
    return gr.update(choices=[]), "โ No numeric columns available for conversion."
|
|
|
|
|
|
|
|
def show_current_dtype(df, col):
    """Display the current dtype of the selected numeric column."""
    if df is not None and col in df.columns:
        return f"๐ Current Data Type: {df[col].dtype}"
    return "โ ๏ธ Please select a valid column."
|
|
|
|
|
|
|
|
def change_column_dtype(df, col, new_dtype):
    """Change the data type of a numeric column using pandas .astype()."""
    if df is None:
        return df, "โ ๏ธ Please load a dataset first."
    if col not in df.columns:
        return df, f"โ ๏ธ Column '{col}' not found."
    if not new_dtype:
        return df, "โ ๏ธ Please select a new data type."

    try:
        converted = df[col].astype(new_dtype)
    except Exception as e:
        return df, f"โ Conversion failed: {e}"
    df[col] = converted
    return df, f"โ Column '{col}' converted to type '{new_dtype}'."
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def get_continuous_columns(df):
    """Detect all numerical columns (int and float) for outlier handling."""
    if df is None:
        return gr.update(choices=[]), "โ ๏ธ Please load a dataset first."

    numeric = df.select_dtypes(include=["int", "float"]).columns.tolist()
    if numeric:
        return gr.update(choices=numeric), f"๐ Numerical columns detected: {', '.join(numeric)}"
    return gr.update(choices=[]), "โ No numerical columns found."
|
|
|
|
|
|
|
|
|
|
|
def show_column_stats(df, col):
    """Display basic stats for selected continuous column.

    Parameters
    ----------
    df : pd.DataFrame or None
    col : str
        Column to summarize; must be numeric.

    Returns
    -------
    str
        A formatted mean/std/min/max line, or a warning message.
    """
    if df is None or col not in df.columns:
        return "โ ๏ธ Please select a valid column."
    # describe() on a non-numeric column has no mean/std/min/max keys, so
    # the original crashed with KeyError; fail gracefully instead.
    if not pd.api.types.is_numeric_dtype(df[col]):
        return f"โ ๏ธ Column '{col}' is not numeric."
    if df[col].dropna().empty:
        return f"โ ๏ธ Column '{col}' has no numeric values to summarize."
    stats = df[col].describe().to_dict()
    return (
        f"๐ Column: {col}\n"
        f"Mean: {stats['mean']:.3f}, Std: {stats['std']:.3f}, Min: {stats['min']:.3f}, Max: {stats['max']:.3f}"
    )
|
|
|
|
|
|
|
|
def handle_outliers(df, col, method, threshold):
    """Apply chosen outlier handling technique.

    Parameters
    ----------
    df : pd.DataFrame or None
    col : str
        Numeric column to treat.
    method : str
        One of "IQR", "Z-score", "Winsorization", "MinMax".
    threshold : str or float
        Method-specific threshold: IQR multiplier, |Z| cutoff,
        tail percentage (Winsorization), or fraction of the value
        range (MinMax).

    Returns
    -------
    tuple[pd.DataFrame or None, str]
        The updated DataFrame and a status message.
    """
    if df is None:
        return df, "โ ๏ธ Please load a dataset first."
    if col not in df.columns:
        return df, f"โ ๏ธ Column '{col}' not found."
    if not pd.api.types.is_numeric_dtype(df[col]):
        return df, f"โ ๏ธ Column '{col}' is not numeric."
    if threshold is None or str(threshold).strip() == "":
        return df, "โ ๏ธ Please enter a valid threshold value."

    try:
        threshold = float(threshold)
    except (TypeError, ValueError):
        # Narrowed from a bare `except:`, which also swallowed
        # KeyboardInterrupt/SystemExit.
        return df, "โ ๏ธ Threshold value must be numeric."

    series = df[col]

    if method == "IQR":
        q1, q3 = series.quantile(0.25), series.quantile(0.75)
        iqr = q3 - q1
        lower = q1 - threshold * iqr
        upper = q3 + threshold * iqr
        before = series.copy()
        df[col] = np.clip(series, lower, upper)
        return df, f"โ IQR method applied with threshold={threshold}. Clipped {sum(before != df[col])} outliers."

    elif method == "Z-score":
        mean, std = series.mean(), series.std()
        if std == 0 or pd.isna(std):
            # Constant or single-value column: every z-score would be
            # NaN/inf (the original divided by zero); nothing is an outlier.
            return df, f"โ ๏ธ Column '{col}' has zero variance; no outliers to handle."
        z_scores = (series - mean) / std
        mask = np.abs(z_scores) > threshold
        df.loc[mask, col] = mean
        return df, f"โ Z-score method applied (|Z| > {threshold}). Replaced {mask.sum()} outliers with mean."

    elif method == "Winsorization":
        # threshold is interpreted as the percentage of each tail to cap.
        lower = series.quantile(threshold / 100)
        upper = series.quantile(1 - threshold / 100)
        df[col] = np.clip(series, lower, upper)
        return df, f"โ Winsorization applied with {threshold}% tails capped."

    elif method == "MinMax":
        # threshold is a fraction of the value range trimmed from each end.
        min_val, max_val = series.min(), series.max()
        span = max_val - min_val
        lower = min_val + threshold * span
        upper = max_val - threshold * span
        before = series.copy()
        df[col] = np.clip(series, lower, upper)
        return df, f"โ Min-Max clipping applied with threshold={threshold}. Clipped {sum(before != df[col])} values."

    else:
        return df, "โ ๏ธ Invalid outlier handling method selected."
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def make_csv_download(df):
    """Write the cleaned DataFrame to a temp CSV and return its path.

    Returns None when there is nothing to export. A unique file is created
    per call (NamedTemporaryFile, delete=False so Gradio can serve it) —
    the previous fixed "cleaned_data.csv" path was shared across concurrent
    sessions, letting one user's export overwrite another's.
    """
    if df is None or df.empty:
        return None

    tmp = tempfile.NamedTemporaryFile(
        mode="w",
        suffix=".csv",
        prefix="cleaned_data_",
        delete=False,
        newline="",
    )
    with tmp:
        df.to_csv(tmp, index=False)
    return tmp.name
|
|
|