Analytics Engineering with Data Build Tool Dbt
Analytics Engineering with Data Build Tool Dbt is where machine learning grows up. A model that only runs on a data scientist's laptop isn't a product; MLOps turns experimental code into reliable, observable, updatable systems that serve users in production.
Why Analytics Engineering Data Matters
The hardest bugs in ML systems are almost never in the model — they live in data pipelines, feature stores, deployments and monitors. Strong MLOps practice is what keeps real-world systems honest.
- Version data, features, code and models together.
- Automate the path from commit to production with CI/CD.
- Monitor data drift as aggressively as you monitor model accuracy.
- Design for rollback — every deployment is a hypothesis.
How Analytics Engineering Data Shows Up in Practice
In a typical project, analytics engineering with data build tool dbt is combined with the rest of the MLOps & Deployment toolkit. You rarely use any one technique in isolation; the real skill is knowing which combination fits the problem you are trying to solve, and being able to explain that choice to a non-technical stakeholder.
Non-negotiable once a model leaves the notebook and influences real users, money or safety-critical processes.
- Privacy-preserving Machine Learning and Federated Learning
- Design Implementation ETL ELT Data Pipelines
- A Formal Treatment of Lambda and
- Workflow Orchestration and Management with Apache
Back to the Data Science curriculum →
Code Examples: Analytics Engineering Data Build Tool Dbt (5 runnable snippets)
Copy any block into a file or notebook and run it end-to-end — each example stands alone.
Example 1: FastAPI model-serving microservice
# Example 1: FastAPI model-serving microservice -- Analytics Engineering Data Build Tool Dbt
from fastapi import FastAPI
from pydantic import BaseModel
import joblib, numpy as np
model = joblib.load("model.joblib") # sklearn Pipeline persisted at build time
app = FastAPI(title="Credit risk API", version="1.0")
class LoanRequest(BaseModel):
income: float
debt: float
age: int
history_score: float
@app.post("/predict")
def predict(req: LoanRequest) -> dict:
x = np.array([[req.income, req.debt, req.age, req.history_score]])
proba = float(model.predict_proba(x)[0, 1])
return {"default_probability": round(proba, 4),
"decision": "decline" if proba > 0.3 else "approve"}
# run with: uvicorn service:app --host 0.0.0.0 --port 8080
Example 2: MLflow experiment tracking across a sweep
# Example 2: MLflow experiment tracking across a sweep -- Analytics Engineering Data Build Tool Dbt
import mlflow
from sklearn.datasets import load_iris
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import cross_val_score
X, y = load_iris(return_X_y=True)
mlflow.set_experiment("iris-rf-sweep")
for n in [50, 100, 200]:
for depth in [None, 4, 8]:
with mlflow.start_run(run_name=f"rf-{n}-{depth}"):
mlflow.log_params({"n_estimators": n, "max_depth": depth})
clf = RandomForestClassifier(n_estimators=n, max_depth=depth,
random_state=0)
score = cross_val_score(clf, X, y, cv=5).mean()
mlflow.log_metric("cv_accuracy", score)
clf.fit(X, y)
mlflow.sklearn.log_model(clf, artifact_path="model")
Example 3: Population stability and drift monitor
# Example 3: Population stability and drift monitor -- Analytics Engineering Data Build Tool Dbt
import numpy as np
from scipy.stats import ks_2samp
def psi(expected, actual, bins: int = 10) -> float:
breaks = np.quantile(expected, np.linspace(0, 1, bins + 1))
e, _ = np.histogram(expected, breaks)
a, _ = np.histogram(actual, breaks)
e, a = e / e.sum(), a / a.sum()
mask = (e > 0) & (a > 0)
return float(((a[mask] - e[mask]) * np.log(a[mask] / e[mask])).sum())
rng = np.random.default_rng(0)
baseline = rng.normal(0.0, 1.0, 5_000)
production = rng.normal(0.3, 1.1, 5_000)
print(f"PSI = {psi(baseline, production):.3f} (>0.25 = significant drift)")
ks = ks_2samp(baseline, production)
print(f"KS statistic = {ks.statistic:.3f}")
print(f"KS p-value = {ks.pvalue:.4f}")
Example 4: Airflow DAG for a batch feature pipeline
# Example 4: Airflow DAG for a batch feature pipeline -- Analytics Engineering Data Build Tool Dbt
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
def extract(**ctx): ...
def transform(**ctx): ...
def load(**ctx): ...
default_args = {
"owner": "data-platform",
"retries": 2,
"retry_delay": timedelta(minutes=5),
"email_on_failure": True,
}
with DAG(
dag_id="daily_customer_features",
start_date=datetime(2026, 1, 1),
schedule_interval="0 3 * * *",
catchup=False,
default_args=default_args,
tags=["features", "batch"],
) as dag:
t1 = PythonOperator(task_id="extract", python_callable=extract)
t2 = PythonOperator(task_id="transform", python_callable=transform)
t3 = PythonOperator(task_id="load", python_callable=load)
t1 >> t2 >> t3
Example 5: Shadow-deployment traffic splitter
# Example 5: Shadow-deployment traffic splitter -- Analytics Engineering Data Build Tool Dbt
import random, time, statistics
def call_champion(x): time.sleep(0.002); return x * 1.03
def call_challenger(x):time.sleep(0.003); return x * 1.05
def route(x, shadow_pct: float = 0.10):
prod = call_champion(x)
if random.random() < shadow_pct:
t0 = time.perf_counter()
shadow = call_challenger(x)
shadow_latency = (time.perf_counter() - t0) * 1_000
diffs.append(shadow - prod)
latencies.append(shadow_latency)
return prod
diffs, latencies = [], []
for _ in range(1_000):
route(random.uniform(10, 100))
print(f"shadow calls : {len(diffs)}")
print(f"mean output delta : {statistics.mean(diffs):+.3f}")
print(f"p95 shadow latency : {sorted(latencies)[int(len(latencies)*0.95)]:.2f} ms")