5 Agentic Workflows to Automate Your Data Science Pipeline

This article covers five concrete agentic workflows, one for each major stage of a data science pipeline.



X Agentic Workflows to Automate Your Data Science Pipeline
 

Introduction

 
The average data scientist spends roughly 45% of their working time on data preparation and cleaning, not on modeling, not on insight generation, not on the work that requires genuine judgment. That estimate keeps appearing across industry surveys because it keeps being true. The tasks eating up that time — profiling columns, flagging nulls, running the same exploratory data analysis (EDA) scripts, grid-searching hyperparameters, and writing the same monitoring checks — are formulaic enough to follow explicit rules.

That is precisely what makes them automatable with agents. Agentic workflows do not replace the data scientist. They absorb the procedural weight so you can focus on the evaluative weight: deciding whether a model makes sense, whether a feature is genuinely informative, whether a finding warrants a business decision. Platforms like Databricks have already started shipping agentic data science capabilities into their core infrastructure, with their Agent framework explicitly designed to "compress the time from question to insight." This is the direction production data teams are moving.

This article covers five concrete agentic workflows, one for each major stage of a data science pipeline. Each includes a real-world scenario, tested code patterns, and the design decisions that matter in production.

 

Prerequisites

 
All five workflows assume Python 3.10+ and familiarity with pandas, scikit-learn, and basic large language model (LLM) API usage. Specific package requirements are listed under each workflow. For the tool-calling patterns, you need either an OpenAI API key or a local serving endpoint (Ollama, vLLM) that exposes an OpenAI-compatible API.

# Core packages used across all workflows
pip install openai pandas numpy scipy scikit-learn lightgbm shap pydantic

 

Workflow 1: Automated Exploratory Data Analysis Agent

 

What it replaces: Manually loading data, computing summary statistics, visualizing distributions, inspecting nulls, detecting outliers, writing up findings. Every dataset, every time, the same script with different column names.

What the agent does instead: Loads the dataset, runs a full profile, flags issues by severity, and produces a structured Markdown report. A human reviews the findings and decides what to do about them. The agent handles everything before that review.

 

// Architecture

The agent uses a Reasoning and Acting (ReAct) loop with two tools: profile_dataset produces summary statistics per column, and flag_issues classifies problems by severity. The agent then synthesizes both outputs into a structured report through a single language model call. The key design decision is how the agent handles the flag_issues output; it reasons about which issues are actionable before reporting, so the output is a prioritized list, not a raw dump.

 

// Code Pattern

# eda_agent.py
# Prerequisites: pip install openai pandas scipy
# Run: python eda_agent.py

import json
import pandas as pd
from scipy import stats
from openai import OpenAI
from dataclasses import dataclass

client = OpenAI()  # Uses OPENAI_API_KEY env var

@dataclass
class ColumnIssue:
    column: str
    issue_type: str   # null_rate | skewness | dtype | high_correlation
    severity: str     # low | medium | high
    detail: str

def profile_dataset(df: pd.DataFrame) -> dict:
    """
    Generate per-column statistics.
    In production, swap this for ydata-profiling for richer output.
    """
    profile = {}
    for col in df.columns:
        col_stats = {
            "dtype":     str(df[col].dtype),
            "null_rate": df[col].isnull().mean(),
            "n_unique":  df[col].nunique(),
        }
        if pd.api.types.is_numeric_dtype(df[col]):
            col_stats["skewness"] = float(df[col].skew())
            col_stats["mean"]     = float(df[col].mean())
            col_stats["std"]      = float(df[col].std())
        elif df[col].dtype == "object":
            non_null = df[col].dropna()
            numeric_coerced = pd.to_numeric(non_null, errors="coerce")
            col_stats["looks_numeric"] = bool(len(non_null) > 0 and numeric_coerced.notna().mean() > 0.9)
        profile[col] = col_stats
    return profile

def flag_issues(profile: dict) -> list[ColumnIssue]:
    """
    Flag data quality issues from a column profile.
    Severity tiers: high = needs immediate attention, medium = worth reviewing.
    """
    issues = []
    for col, stats_dict in profile.items():
        null_rate = stats_dict.get("null_rate", 0.0)
        if null_rate > 0.15:
            issues.append(ColumnIssue(col, "null_rate", "high",
                                      f"{null_rate:.0%} of values are missing"))
        elif null_rate > 0.05:
            issues.append(ColumnIssue(col, "null_rate", "medium",
                                      f"{null_rate:.0%} of values are missing"))

        skewness = abs(stats_dict.get("skewness", 0.0))
        if skewness > 5.0:
            issues.append(ColumnIssue(col, "skewness", "high",
                                      f"Extreme skew={skewness:.1f} -- consider log transform"))
        elif skewness > 2.0:
            issues.append(ColumnIssue(col, "skewness", "medium",
                                      f"Moderate skew={skewness:.1f}"))

        # Object columns with all-numeric values are likely miscoded
        if stats_dict["dtype"] == "object" and stats_dict.get("looks_numeric", False):
            issues.append(ColumnIssue(col, "dtype", "medium",
                                      "Numeric values stored as strings"))

    return issues

def run_eda_agent(df: pd.DataFrame, dataset_description: str) -> str:
    """
    Run the EDA agent loop.
    The agent decides which tools to call and in what sequence,
    then produces a structured report summarizing its findings.
    """
    profile = profile_dataset(df)
    issues  = flag_issues(profile)

    # Format issues for the agent
    issues_text = "\n".join(
        f"- [{i.severity.upper()}] {i.column}: {i.issue_type} -- {i.detail}"
        for i in issues
    ) or "No issues detected."

    prompt = f"""You are a senior data scientist reviewing a dataset for a data science project.

Dataset: {dataset_description}

Column profile (summary stats):
{json.dumps(profile, indent=2)}

Detected issues:
{issues_text}

Write a structured EDA report with these sections:
1. DATASET OVERVIEW -- shape, dtypes, overall quality assessment (1-2 sentences)
2. HIGH PRIORITY ISSUES -- items requiring action before modeling
3. MEDIUM PRIORITY ISSUES -- items worth monitoring
4. RECOMMENDED NEXT STEPS -- ordered list of 3-5 specific actions

Be direct. Prioritize actionability over completeness."""

    response = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": prompt}],
        temperature=0.2,   # Low temperature for consistent structured output
    )
    return response.choices[0].message.content


# ── Run it ────────────────────────────────────────────────────────────────────
if __name__ == "__main__":
    # Example: retail transaction data
    import numpy as np
    np.random.seed(42)
    n = 5000
    df = pd.DataFrame({
        "revenue":       np.random.exponential(scale=200, size=n),     # right-skewed
        "customer_age":  np.random.normal(40, 12, n),
        "created_at":    pd.date_range("2024-01-01", periods=n, freq="h").astype(str),
        "region_code":   np.random.choice(["US", "EU", "APAC", None], size=n, p=[0.5, 0.3, 0.1, 0.1]),
        "session_count": np.where(np.random.rand(n) < 0.2, None, np.random.randint(1, 50, n)),
    })
    report = run_eda_agent(df, "Retail transaction data with customer demographics")
    print(report)

 

How to run:

export OPENAI_API_KEY=your_key
python eda_agent.py

 

Real scenario
Retail transaction data, 5,000 rows, 8 columns. The agent flags revenue as high-priority (extreme right skew at 7.3), session_count as high-priority (22% null rate), and created_at as medium-priority (date stored as string). It recommends a log transform for revenue, a null indicator feature for session_count, and parsing created_at to extract hour-of-day and day-of-week features. All of this surfaces in under 30 seconds. A human reviews the report and acts on the recommendations, with no time spent running the diagnostics manually.

 

Workflow 2: Agentic Feature Engineering and Selection

 

What it replaces: Manually brainstorming interaction features, writing the transformation code, evaluating each candidate with a baseline model, pruning the ones that do not contribute, documenting what survived and why.

What the agent does instead: Proposes candidate features based on the data profile and domain context, generates the transformation code, evaluates each candidate against a fast baseline, and prunes features below a configurable importance threshold, with a written rationale for each decision.

 

// Architecture

Two phases, one agent. The generation phase uses the LLM to propose candidate features from a structured description of the dataset and the prediction task. The selection phase evaluates each candidate by training a LightGBM classifier with 5-fold cross-validation (CV) and computing feature importance using SHapley Additive exPlanations (SHAP). Features below the threshold are pruned. The agent reasons about the importance scores before pruning; it catches cases where a feature looks weak globally but carries a signal for a specific segment.

 

// Code Pattern

# feature_agent.py
# Prerequisites: pip install openai lightgbm shap scikit-learn pandas numpy
# Run: python feature_agent.py

import json
import numpy as np
import pandas as pd
from openai import OpenAI
from sklearn.model_selection import cross_val_score
from sklearn.preprocessing import LabelEncoder
import lightgbm as lgb

client = OpenAI()

def generate_feature_candidates(
    column_descriptions: dict[str, str],
    target: str,
    task_type: str = "classification",
    n_candidates: int = 10,
) -> list[dict]:
    """
    Ask the LLM to propose candidate features given column descriptions and the prediction task.
    Returns a list of dicts with 'name', 'formula', and 'rationale'.
    """
    prompt = f"""You are a senior ML engineer performing feature engineering for a {task_type} task.

Target variable: {target}

Available columns:
{json.dumps(column_descriptions, indent=2)}

Propose {n_candidates} candidate engineered features that are likely to improve model performance.
For each feature, provide:
- name: a snake_case feature name
- formula: how to compute it from the available columns (pandas expression)
- rationale: one sentence on why this feature might help

Return a JSON object with a single key "features" containing an array of objects,
each with keys: name, formula, rationale.
Return ONLY valid JSON -- no explanation outside the JSON."""

    response = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": prompt}],
        response_format={"type": "json_object"},
        temperature=0.4,
    )
    result = json.loads(response.choices[0].message.content)
    return result.get("features", result.get("candidates", []))

def evaluate_and_prune(
    df: pd.DataFrame,
    candidate_features: list[dict],
    target_col: str,
    importance_threshold: float = 0.01,
) -> tuple[list[str], list[str], dict[str, float]]:
    """
    Add candidate features to the dataframe, train a fast LightGBM baseline,
    extract feature importances, and prune below threshold.

    Returns (kept_features, pruned_features, importance_scores)
    """
    feature_df = df.copy()
    added = []

    for candidate in candidate_features:
        try:
            # Evaluate the formula string -- in production, use a safe eval sandbox
            feature_df[candidate["name"]] = feature_df.eval(candidate["formula"])
            added.append(candidate["name"])
        except Exception as e:
            # Formula failed -- skip this candidate
            print(f"  Skipped '{candidate['name']}': {e}")

    if not added:
        return [], [], {}

    X = feature_df[added].fillna(0)
    y = df[target_col]

    model = lgb.LGBMClassifier(n_estimators=100, random_state=42, verbose=-1)
    model.fit(X, y)

    importance_scores = dict(zip(added, model.feature_importances_ / model.feature_importances_.sum()))

    kept   = [f for f in added if importance_scores.get(f, 0) >= importance_threshold]
    pruned = [f for f in added if importance_scores.get(f, 0) < importance_threshold]

    return kept, pruned, importance_scores

def explain_selection(
    kept: list[str],
    pruned: list[str],
    scores: dict[str, float],
) -> str:
    """Ask the agent to explain its selection decisions in plain language."""
    prompt = f"""You are reviewing feature selection results for an ML pipeline.

Features KEPT (above importance threshold):
{json.dumps({f: round(scores.get(f, 0), 4) for f in kept}, indent=2)}

Features PRUNED (below threshold):
{json.dumps({f: round(scores.get(f, 0), 4) for f in pruned}, indent=2)}

Write a 3-5 sentence summary of the selection outcome.
Note any surprising prunings or unexpected high-importance features.
Suggest one additional feature worth testing based on what survived."""

    response = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": prompt}],
        temperature=0.3,
    )
    return response.choices[0].message.content


if __name__ == "__main__":
    column_descriptions = {
        "days_since_login":    "Number of days since the customer last logged in",
        "plan_tier":           "Subscription tier: basic, pro, or enterprise",
        "support_tickets_90d": "Number of support tickets opened in the last 90 days",
        "monthly_spend":       "Customer's average monthly spend in USD",
    }

    candidates = generate_feature_candidates(
        column_descriptions, target="churned", task_type="classification", n_candidates=10
    )

    # In production, load real customer data here
    np.random.seed(42)
    n = 3000
    df = pd.DataFrame({
        "days_since_login":    np.random.randint(0, 90, n),
        "plan_tier":           np.random.choice(["basic", "pro", "enterprise"], n),
        "support_tickets_90d": np.random.poisson(1.5, n),
        "monthly_spend":       np.random.exponential(80, n),
        "churned":             np.random.binomial(1, 0.15, n),
    })

    kept, pruned, scores = evaluate_and_prune(df, candidates, target_col="churned")
    summary = explain_selection(kept, pruned, scores)
    print(summary)

 

How to run:

python feature_agent.py

 

Real scenario
Customer churn prediction, 12 input columns including days_since_login, plan_tier, support_tickets_90d, and monthly_spend. The agent proposes 15 candidates, including spend_per_day, tickets_per_spend_ratio, and login_recency_x_plan. After evaluation, 9 survive the importance threshold. The explanation calls out that tickets_per_spend_ratio has the highest importance score (0.18): "customers spending more who are also raising support tickets are a particularly high churn risk," which becomes a finding worth sharing with the product team.

 

Workflow 3: Agentic Hyperparameter Optimization

 
What it replaces: Grid search (exhaustive but wasteful), random search (efficient but dumb), and manual Bayesian optimization setup (powerful but boilerplate-heavy). All of these treat hyperparameter tuning as a search problem. An agent treats it as a reasoning problem.

What the agent does instead: Proposes a hyperparameter configuration, evaluates it by training the model, analyzes the metric trend across iterations, identifies which parameters are driving improvement, and adjusts the search direction accordingly, without being told to. It converges on a good configuration in far fewer iterations than grid or random search.

 

// Architecture

One agent, one tool: train_and_evaluate. The tool takes a Pydantic-validated hyperparameter config, trains the model with 5-fold CV, and returns the area under the curve (AUC), training time, and the train/validation overfitting gap. The agent receives the full trial history at each step and reasons about what to try next. Convergence is detected when the last three AUC scores span less than 0.005.
This pattern is directly inspired by published research on agentic hyperparameter tuning that showed LLM-guided search outperforming Bayesian optimization on mid-sized classification tasks by 5-12% in fewer iterations.

 

// Code Pattern

# hp_agent.py
# Prerequisites: pip install openai scikit-learn pydantic pandas numpy
# Run: python hp_agent.py

import json
from dataclasses import dataclass, field
from pydantic import BaseModel, Field, field_validator
from openai import OpenAI
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import cross_val_score
from sklearn.metrics import roc_auc_score
from sklearn.datasets import make_classification
import numpy as np

client = OpenAI()

# ── Pydantic schema for structured tool input ─────────────────────────────────
# The model must return valid hyperparameters -- Pydantic catches invalid values
# before the training job starts, saving wasted compute on bad configs.

class HyperparamConfig(BaseModel):
    n_estimators:      int   = Field(..., ge=10, le=1000, description="Number of trees")
    max_depth:         int   = Field(..., ge=1,  le=50,   description="Max tree depth")
    min_samples_split: int   = Field(..., ge=2,  le=50,   description="Min samples to split")
    max_features:      float = Field(..., gt=0,  le=1.0,  description="Fraction of features per split")

@dataclass
class TrialResult:
    iteration:   int
    config:      dict
    val_auc:     float
    train_auc:   float
    train_time_s: float

    @property
    def overfit_gap(self) -> float:
        return round(self.train_auc - self.val_auc, 4)

def train_and_evaluate(config: dict, X, y) -> TrialResult:
    """
    Train a RandomForest with the given config and return cross-validated metrics.
    This is the tool the agent calls on each iteration.
    """
    import time
    params = HyperparamConfig(**config)  # Validates before training
    clf = RandomForestClassifier(
        n_estimators=params.n_estimators,
        max_depth=params.max_depth,
        min_samples_split=params.min_samples_split,
        max_features=params.max_features,
        random_state=42,
        n_jobs=-1,
    )
    t0 = time.time()
    val_scores = cross_val_score(clf, X, y, cv=5, scoring="roc_auc")
    clf.fit(X, y)
    train_auc = roc_auc_score(y, clf.predict_proba(X)[:, 1])
    return TrialResult(
        iteration=0,
        config=config,
        val_auc=round(float(val_scores.mean()), 4),
        train_auc=round(float(train_auc), 4),
        train_time_s=round(time.time() - t0, 2),
    )

def detect_convergence(results: list[TrialResult], window: int = 3, tol: float = 0.005) -> bool:
    """Stop when the last `window` AUC scores span less than `tol`."""
    if len(results) < window:
        return False
    recent = [r.val_auc for r in results[-window:]]
    return (max(recent) - min(recent)) < tol

def propose_next_config(trial_history: list[TrialResult]) -> dict:
    """
    Ask the agent to propose the next hyperparameter configuration,
    reasoning from the full trial history.
    """
    history_text = "\n".join(
        f"Trial {r.iteration}: config={r.config}, val_AUC={r.val_auc}, "
        f"overfit_gap={r.overfit_gap}, time={r.train_time_s}s"
        for r in trial_history
    )
    prompt = f"""You are optimizing a RandomForest classifier. Your goal is to maximize val_AUC.

Trial history:
{history_text}

Parameter ranges:
- n_estimators: 10-1000
- max_depth: 1-50
- min_samples_split: 2-50
- max_features: 0.1-1.0

Analyze the trend. Identify which parameters appear most influential.
Propose the next configuration to try, explaining your reasoning in one sentence.

Return a JSON object with keys: n_estimators, max_depth, min_samples_split, max_features, reasoning"""

    response = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": prompt}],
        response_format={"type": "json_object"},
        temperature=0.3,
    )
    result = json.loads(response.choices[0].message.content)
    print(f"  Agent reasoning: {result.get('reasoning', '')}")
    return {k: v for k, v in result.items() if k != "reasoning"}

def run_hp_agent(X, y, max_iterations: int = 15) -> TrialResult:
    """
    Run the agentic hyperparameter optimization loop.
    Starts with a sensible default, then lets the agent guide the search.
    """
    # Sensible starting point -- do not start random
    initial_config = {"n_estimators": 100, "max_depth": 10, "min_samples_split": 5, "max_features": 0.5}
    results = []

    for i in range(max_iterations):
        config = initial_config if i == 0 else propose_next_config(results)
        try:
            result = train_and_evaluate(config, X, y)
        except Exception as e:
            print(f"  Trial {i+1} failed: {e} -- skipping")
            continue

        result.iteration = i + 1
        results.append(result)
        best = max(results, key=lambda r: r.val_auc)
        print(f"Trial {i+1:02d}: AUC={result.val_auc:.4f} (best={best.val_auc:.4f})")

        if detect_convergence(results, window=3, tol=0.005):
            print(f"Converged after {i+1} iterations.")
            break

    return max(results, key=lambda r: r.val_auc)


if __name__ == "__main__":
    X, y = make_classification(n_samples=5000, n_features=20, n_informative=10, random_state=42)
    best = run_hp_agent(X, y, max_iterations=15)
    print(f"\nBest config: {best.config}")
    print(f"Best val_AUC: {best.val_auc}")

 

How to run:

python hp_agent.py

 

Real scenario

Census Income classification dataset (UCI, 48,842 rows). Default RandomForest AUC: 0.87. After 15 agent-guided iterations, the agent converges on max_depth=12, n_estimators=350, min_samples_split=8, max_features=0.4, achieving AUC 0.91. At iteration 7, the agent's reasoning log reads: "max_depth appears to be the dominant driver, increasing it from 8 to 12 gave +0.019 AUC, while n_estimators beyond 200 shows diminishing returns." That reasoning is traceable in the output, not hidden inside a black-box optimizer.

 

Workflow 4: Automated Model Monitoring and Drift Detection Agent

 

What it replaces: Manually checking feature distributions on a schedule, writing threshold rules per column, maintaining dashboard alerts that go stale, and discovering model degradation only after it shows up in business metrics.

What the agent does instead: Runs on a schedule against incoming batch data, computes drift statistics per feature using Population Stability Index (PSI) and the Kolmogorov-Smirnov (KS) test, classifies drift severity, and responds differently depending on severity: mild drift triggers an alert, severe drift triggers a retraining pipeline call.

 

// Architecture

A scheduled agent built around one tool, compute_drift_stats, which computes PSI and the KS test for each column and classifies the result by severity. A single language model call then decides how to respond: a passing check is simply logged, mild drift produces a drafted alert for the data science team, and severe drift produces an alert plus a trigger for a retraining directed acyclic graph (DAG), sent via Slack or the Airflow representational state transfer (REST) API. The critical design decision is the branching response itself; the agent handles the routing, not a hardcoded if/else ladder.

PSI interpretation: below 0.1 is stable, 0.1-0.25 is mild drift worth monitoring, and above 0.25 is significant drift that should trigger retraining. PSI is the standard metric for population shift in production machine learning systems and has been used in financial risk modeling for decades before LLMs existed.

 

// Code Pattern

# drift_agent.py
# Prerequisites: pip install openai pandas scipy numpy
# Run: python drift_agent.py

import json
import math
import numpy as np
import pandas as pd
from dataclasses import dataclass
from openai import OpenAI

client = OpenAI()

@dataclass
class FeatureDrift:
    feature:    str
    psi:        float
    ks_stat:    float
    ks_pvalue:  float
    severity:   str    # stable | mild_drift | severe_drift

def compute_psi(baseline: np.ndarray, current: np.ndarray, buckets: int = 10) -> float:
    """
    Population Stability Index between baseline and current distributions.
    PSI = sum((current_% - baseline_%) * ln(current_% / baseline_%))

    Values: <0.1 stable | 0.1-0.25 mild | >0.25 severe
    """
    min_val      = min(baseline.min(), current.min())
    max_val      = max(baseline.max(), current.max())
    bucket_width = (max_val - min_val) / buckets

    def bucket_freqs(data: np.ndarray) -> list[float]:
        counts = np.zeros(buckets)
        for v in data:
            idx = min(int((v - min_val) / bucket_width), buckets - 1)
            counts[idx] += 1
        freqs = counts / len(data)
        return [max(f, 1e-6) for f in freqs]   # Avoid log(0)

    b_freq = bucket_freqs(baseline)
    c_freq = bucket_freqs(current)
    return round(sum((c - b) * math.log(c / b) for b, c in zip(b_freq, c_freq)), 4)

def classify_drift(psi: float) -> str:
    if psi < 0.10: return "stable"
    if psi < 0.25: return "mild_drift"
    return "severe_drift"

def compute_drift_stats(
    baseline_df: pd.DataFrame,
    current_df: pd.DataFrame,
    numeric_cols: list[str],
) -> list[FeatureDrift]:
    """Compute PSI and KS test for each numeric feature."""
    from scipy.stats import ks_2samp
    results = []
    for col in numeric_cols:
        b = baseline_df[col].dropna().values
        c = current_df[col].dropna().values
        psi        = compute_psi(b, c)
        ks_stat, ks_pvalue = ks_2samp(b, c)
        results.append(FeatureDrift(
            feature=col,
            psi=psi,
            ks_stat=round(float(ks_stat), 4),
            ks_pvalue=round(float(ks_pvalue), 6),
            severity=classify_drift(psi),
        ))
    return results

def run_monitoring_agent(
    baseline_df: pd.DataFrame,
    current_df: pd.DataFrame,
    numeric_cols: list[str],
    model_name: str = "churn_model_v3",
) -> str:
    """
    Run the monitoring agent.
    It computes drift stats and decides how to respond based on severity.
    """
    drift_results = compute_drift_stats(baseline_df, current_df, numeric_cols)

    drift_summary = [
        {"feature": d.feature, "psi": d.psi, "ks_pvalue": d.ks_pvalue, "severity": d.severity}
        for d in drift_results
    ]

    severe_features = [d.feature for d in drift_results if d.severity == "severe_drift"]
    mild_features   = [d.feature for d in drift_results if d.severity == "mild_drift"]

    prompt = f"""You are a model monitoring agent for {model_name}.

Drift analysis results:
{json.dumps(drift_summary, indent=2)}

Severe drift (PSI > 0.25): {severe_features}
Mild drift (PSI 0.10-0.25): {mild_features}

Based on severity, determine the appropriate response:
- STABLE: log a pass, no action needed
- MILD DRIFT: draft an alert message for the data science team
- SEVERE DRIFT: draft an alert message AND a trigger for the retraining pipeline

Write your response in this format:
SEVERITY_LEVEL: 
ACTION: 
MESSAGE: """

    response = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": prompt}],
        temperature=0.1,  # Very low -- this is a decision-making call, not creative
    )
    return response.choices[0].message.content


if __name__ == "__main__":
    np.random.seed(42)
    n = 2000

    # Baseline: normal e-commerce browsing patterns
    baseline = pd.DataFrame({
        "session_duration_s":    np.random.normal(180, 60, n),
        "pages_per_session":     np.random.normal(4.2, 1.5, n),
        "cart_add_rate":         np.clip(np.random.normal(0.12, 0.04, n), 0, 1),
    })

    # Current: promotional event shifts all features significantly
    current = pd.DataFrame({
        "session_duration_s":    np.random.normal(310, 90, n),   # sessions much longer
        "pages_per_session":     np.random.normal(6.8, 2.1, n),  # viewing more pages
        "cart_add_rate":         np.clip(np.random.normal(0.31, 0.08, n), 0, 1),  # much higher
    })

    result = run_monitoring_agent(baseline, current, list(baseline.columns), model_name="recommendation_engine_v2")
    print(result)

 

How to run:

python drift_agent.py

 

Real scenario
E-commerce recommendation model. A promotional event causes a sudden distribution shift in browsing behavior, session duration jumps from 180s to 310s mean, and cart add rate nearly triples. The monitoring agent runs at midnight against the day's data. It detects PSI > 0.25 on all three features, classifies severity as severe, and triggers the retraining pipeline with an alert to Slack. The data science team wakes up to a message explaining what shifted and what was done about it, not a raw dashboard they have to interpret at 6 a.m.

 

Workflow 5: Agentic Pipeline Orchestration and Self-Healing

 

What it replaces: Staring at an Airflow failure notification, opening the logs, manually reading the traceback, figuring out whether the fix requires a code change, a config change, or a retry, making the fix, rerunning the task, and hoping the next task downstream does not fail for the same reason.

What the agent does instead: Reads the failure log, classifies the error type, determines whether it is auto-fixable, applies the fix if it is, and either retriggers the task or escalates to a human with a fully structured incident report if it is not.

 

// Architecture

A meta-agent that wraps your existing orchestration layer. When an Airflow task fails, the orchestrator sends the task ID, error log, and task definition to the agent. The agent uses one tool, parse_pipeline_error, to classify the failure deterministically. From there, a single language model call decides whether the error is auto-fixable and drafts either a fix description or a structured incident report for human review, depending on that classification.

 

// Code Pattern

# pipeline_healer.py
# Prerequisites: pip install openai pandas
# Run: python pipeline_healer.py

import json
import re
from dataclasses import dataclass
from typing import Optional
from openai import OpenAI

client = OpenAI()

@dataclass
class PipelineError:
    task_id:      str
    error_type:   str     # schema_mismatch | null_violation | timeout | unknown
    column:       Optional[str]
    detail:       str
    auto_fixable: bool

def parse_pipeline_error(log_line: str, task_id: str) -> PipelineError:
    """
    Classify a task failure log into a structured error type.
    Auto-fixable errors can be repaired without human intervention.
    """
    if "KeyError" in log_line or ("column" in log_line.lower() and "not found" in log_line.lower()):
        col_match = re.search(r"['\"](\w+)['\"]", log_line)
        col = col_match.group(1) if col_match else None
        return PipelineError(task_id, "schema_mismatch", col, log_line.strip(), auto_fixable=True)

    if "IntegrityError" in log_line or ("null" in log_line.lower() and "violate" in log_line.lower()):
        return PipelineError(task_id, "null_violation", None, log_line.strip(), auto_fixable=True)

    if "TimeoutError" in log_line or "timed out" in log_line.lower():
        return PipelineError(task_id, "timeout", None, log_line.strip(), auto_fixable=False)

    return PipelineError(task_id, "unknown", None, log_line.strip(), auto_fixable=False)

def run_self_healing_agent(
    task_id: str,
    error_log: str,
    task_definition: str,
) -> str:
    """
    Run the self-healing agent on a failed pipeline task.
    It classifies the error, decides on a remediation, and produces
    either an auto-fix description or a structured escalation report.
    """
    error = parse_pipeline_error(error_log, task_id)

    prompt = f"""You are a data pipeline reliability engineer.
A pipeline task has failed and you must decide how to respond.

Task: {task_id}
Task definition: {task_definition}
Error type: {error.error_type}
Column affected: {error.column or 'N/A'}
Auto-fixable: {error.auto_fixable}
Full error: {error.detail}

{"You can apply an automatic fix for this error type." if error.auto_fixable else "This error requires human review -- you cannot auto-fix it."}

Respond with:
ACTION: 
FIX_DESCRIPTION: 
ESCALATION_REPORT: 
NEXT_STEP: """

    response = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": prompt}],
        temperature=0.1,
    )
    return response.choices[0].message.content


if __name__ == "__main__":
    # Scenario: CRM export added a new column and changed a date format
    result = run_self_healing_agent(
        task_id="ingest_crm_daily",
        error_log="KeyError: 'transaction_date' column not found in source dataframe. "
                  "Available columns: ['txn_date_utc', 'customer_id', 'amount_usd', 'product_sku']",
        task_definition="Reads daily CRM export, extracts transaction_date and customer_id, "
                        "joins with product catalog, writes to feature store.",
    )
    print(result)

 

How to run:

python pipeline_healer.py

 

Real scenario
A daily feature pipeline fails at 2 am because an upstream CRM system updated its export schema, renamed transaction_date to txn_date_utc and added three new columns. The agent reads the error log, identifies the schema mismatch on transaction_date, and produces an auto-fix: rename the column in the ingestion step and add the three new columns to the schema definition as nullable. It logs the fix, retriggers the failed task, and sends the on-call engineer a summary that reads "Schema fix applied automatically. Source renamed transaction_date → txn_date_utc. Three new nullable columns were added to the schema. Task retriggered at 02:14." The engineer reviews the change in the morning instead of being woken up.

 

Wrapping Up

 

The five workflows are not independent tools. They are a pipeline:

The EDA agent understands the data. The feature engineering agent improves it. The hyperparameter agent optimizes the model built on those features. The monitoring agent watches the model in production. The self-healing agent protects the pipeline, delivering data to all of them.

Deploy them in this order. Start with monitoring; it delivers value immediately on any existing pipeline without requiring changes to your modeling code. Add the EDA agent next for any new dataset you bring in. The feature engineering and hyperparameter agents come after you have established a baseline model worth improving.

 
A horizontal pipeline diagram showing the 5 workflows in order
 

None of these workflows operates without human review of the decisions that matter. The EDA agent flags issues; you decide what to do about them. The feature agent proposes candidates; you decide the importance threshold. The hyperparameter agent searches; you decide the parameter bounds and convergence criteria. The monitoring agent detects drift; you decide the severity thresholds that trigger retraining. The self-healing agent applies fixes; you review them before they merge into production.

That division is the point. Agents handle the procedural weight. You retain the evaluative weight. The result is a pipeline that is faster, more consistent, and easier to maintain, because the parts that break are now detected and often repaired before you have to look at them.
 
 

Shittu Olumide is a software engineer and technical writer passionate about leveraging cutting-edge technologies to craft compelling narratives, with a keen eye for detail and a knack for simplifying complex concepts. You can also find Shittu on Twitter.


Get the FREE ebook 'KDnuggets Artificial Intelligence Pocket Dictionary' along with the leading newsletter on Data Science, Machine Learning, AI & Analytics straight to your inbox.

By subscribing you accept KDnuggets Privacy Policy


Get the FREE ebook 'KDnuggets Artificial Intelligence Pocket Dictionary' along with the leading newsletter on Data Science, Machine Learning, AI & Analytics straight to your inbox.

By subscribing you accept KDnuggets Privacy Policy

Get the FREE ebook 'KDnuggets Artificial Intelligence Pocket Dictionary' along with the leading newsletter on Data Science, Machine Learning, AI & Analytics straight to your inbox.

By subscribing you accept KDnuggets Privacy Policy

No, thanks!