In this tutorial, we implement an advanced data pipeline using Dagster. We build a CSV-based IOManager to store assets, define daily data partitions, generate and clean synthetic sales data, engineer features, and train a small model. Along the way, we implement a data-quality check that validates nulls, categories, and ranges, and we store metadata and outputs in a structured manner. The focus is on practical implementation: integrating raw data ingestion with transformations, quality checking, and machine learning in one reproducible workflow.
import subprocess, sys, json
subprocess.check_call([sys.executable, "-m", "pip", "install", "-q", "dagster", "pandas", "scikit-learn"])
import os
import numpy as np, pandas as pd
from pathlib import Path
from dagster import (
    asset, AssetCheckResult, asset_check, Definitions, materialize, Output,
    DailyPartitionsDefinition, IOManager, io_manager
)
from sklearn.linear_model import LinearRegression
BASE = Path("/content/dagstore"); BASE.mkdir(parents=True, exist_ok=True)
START = "2025-08-01"
Installing Dagster, Pandas, and scikit-learn is the first step, giving us the complete toolset in Colab. We then import the essential modules and set up NumPy, Pandas, a base output directory, and a starting date for our partitions.
class CSVIOManager(IOManager):
    def __init__(self, base: Path): self.base = base
    def _path(self, key, ext): return self.base / f"{'_'.join(key.path)}.{ext}"
    def handle_output(self, context, obj):
        if isinstance(obj, pd.DataFrame):
            p = self._path(context.asset_key, "csv"); obj.to_csv(p, index=False)
            context.log.info(f"Saved {context.asset_key} -> {p}")
        else:
            p = self._path(context.asset_key, "json"); p.write_text(json.dumps(obj, indent=2))
            context.log.info(f"Saved {context.asset_key} -> {p}")
    def load_input(self, context):
        k = context.upstream_output.asset_key; p = self._path(k, "csv")
        df = pd.read_csv(p); context.log.info(f"Loaded {k} <- {p}"); return df

@io_manager
def csv_io_manager(_): return CSVIOManager(BASE)

daily = DailyPartitionsDefinition(start_date=START)
We create a custom CSVIOManager that saves outputs as CSV (for DataFrames) or JSON (for other objects) and loads them back when needed. Registering it as csv_io_manager with Dagster and defining a daily partitioning scheme allows our pipeline to handle data from each date separately.
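Outside Dagster, the round trip this IO manager performs is plain pandas CSV serialization. A minimal sketch, using a temp directory and illustrative column names in place of BASE and a real asset:

```python
import tempfile
from pathlib import Path

import pandas as pd

# Toy frame standing in for an asset output; columns are illustrative.
df = pd.DataFrame({"units": [100.0, 120.0], "promo": [0, 1]})

base = Path(tempfile.mkdtemp())
path = base / "raw_sales.csv"   # mirrors "_".join(key.path) + ".csv"
df.to_csv(path, index=False)    # what handle_output does for DataFrames
loaded = pd.read_csv(path)      # what load_input does for downstream assets

print(loaded.equals(df))  # True: the frame survives the round trip
```

Writing with index=False matters here: otherwise the reload would gain a spurious index column and the round trip would no longer be lossless.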
@asset(partitions_def=daily, description="Synthetic raw sales with noise & occasional nulls.")
def raw_sales(context) -> Output[pd.DataFrame]:
    rng = np.random.default_rng(42)
    n = 200; day = context.partition_key
    x = rng.normal(100, 20, n); promo = rng.integers(0, 2, n); noise = rng.normal(0, 10, n)
    sales = 2.5 * x + 30 * promo + noise + 50
    x[rng.choice(n, size=max(1, n // 50), replace=False)] = np.nan
    df = pd.DataFrame({"date": day, "units": x, "promo": promo, "sales": sales})
    meta = {"rows": n, "null_units": int(df["units"].isna().sum()), "head": df.head().to_markdown()}
    return Output(df, metadata=meta)
@asset(description="Clean nulls, clip outliers for robust downstream modeling.")
def clean_sales(context, raw_sales: pd.DataFrame) -> Output[pd.DataFrame]:
    df = raw_sales.dropna(subset=["units"]).copy()
    lo, hi = df["units"].quantile([0.01, 0.99]); df["units"] = df["units"].clip(lo, hi)
    meta = {"rows": len(df), "units_min": float(df.units.min()), "units_max": float(df.units.max())}
    return Output(df, metadata=meta)
@asset(description="Feature engineering: interactions & standardized columns.")
def features(context, clean_sales: pd.DataFrame) -> Output[pd.DataFrame]:
    df = clean_sales.copy()
    df["units_sq"] = df["units"] ** 2; df["units_promo"] = df["units"] * df["promo"]
    for c in ["units", "units_sq", "units_promo"]:
        mu = df[c].mean(); sigma = df[c].std(ddof=0) or 1.0
        df[f"z_{c}"] = (df[c] - mu) / sigma
    return Output(df, metadata={"rows": len(df), "cols": list(df.columns)})
Three core assets form the pipeline. raw_sales generates daily synthetic sales data that simulates real-world imperfections. clean_sales then removes nulls, clips outliers, and stabilizes the dataset while recording metadata. Finally, features adds interaction and standardized variables to prepare the data for downstream modeling.
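The standardization inside features is a plain z-score using the population standard deviation (ddof=0), with `or 1.0` guarding against division by zero on constant columns. In isolation, on toy data:

```python
import pandas as pd

s = pd.Series([10.0, 20.0, 30.0])
mu = s.mean()                  # 20.0
sigma = s.std(ddof=0) or 1.0   # population std; truthiness guards zero
z = (s - mu) / sigma           # standardized values, mean 0

# A constant column has zero std, so the guard falls back to 1.0
const = pd.Series([5.0, 5.0, 5.0])
sigma_c = const.std(ddof=0) or 1.0

print(round(float(z.iloc[-1]), 4))  # 1.2247
print(sigma_c)                      # 1.0
```

Without the `or 1.0` fallback, a constant column would produce division by zero and fill the z-column with NaN/inf, which would silently poison the model inputs downstream.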
@asset_check(asset=clean_sales, description="No nulls; promo in {0,1}; units within clipped bounds.")
def clean_sales_quality(clean_sales: pd.DataFrame) -> AssetCheckResult:
    nulls = int(clean_sales.isna().sum().sum())
    promo_ok = bool(set(clean_sales["promo"].unique()).issubset({0, 1}))
    units_ok = bool(clean_sales["units"].between(clean_sales["units"].min(), clean_sales["units"].max()).all())
    passed = bool((nulls == 0) and promo_ok and units_ok)
    return AssetCheckResult(
        passed=passed,
        metadata={"nulls": nulls, "promo_ok": promo_ok, "units_ok": units_ok},
    )
@asset(description="Train a tiny linear regressor; emit R^2 and coefficients.")
def tiny_model_metrics(context, features: pd.DataFrame) -> dict:
    X = features[["z_units", "z_units_sq", "z_units_promo", "promo"]].values
    y = features["sales"].values
    model = LinearRegression().fit(X, y)
    return {"r2_train": float(model.score(X, y)),
            **{n: float(c) for n, c in zip(["z_units", "z_units_sq", "z_units_promo", "promo"], model.coef_)}}
Validation and modeling strengthen the pipeline. clean_sales_quality enforces data quality by ensuring the cleaned data contains no nulls, that promo holds only 0/1 values, and that units stay within the clipped bounds. tiny_model_metrics fits a simple linear regression on the engineered features and returns key metrics such as the training R² and the learned coefficients, completing the modeling step within Dagster.
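As a sanity check on the modeling step, LinearRegression recovers the generating coefficients when fit on noiseless data of the same form as raw_sales. This sketch uses toy arrays, not the pipeline's partitioned assets:

```python
import numpy as np
from sklearn.linear_model import LinearRegression

rng = np.random.default_rng(0)
units = rng.normal(100, 20, 50)
promo = rng.integers(0, 2, 50).astype(float)

X = np.column_stack([units, promo])
y = 2.5 * units + 30 * promo + 50   # same generating form, no noise term

model = LinearRegression().fit(X, y)
print(np.round(model.coef_, 6))     # recovers 2.5 and 30.0
print(round(float(model.intercept_), 6))  # recovers 50.0
```

With the noise term added back (as in raw_sales), the coefficients stay close to 2.5 and 30 but the training R² drops below 1, which is what tiny_model_metrics reports.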
defs = Definitions(
    assets=[raw_sales, clean_sales, features, tiny_model_metrics],
    asset_checks=[clean_sales_quality],
    resources={"io_manager": csv_io_manager},
)

if __name__ == "__main__":
    run_day = os.environ.get("RUN_DATE", START)
    print("Materializing everything for:", run_day)
    result = materialize(
        [raw_sales, clean_sales, features, tiny_model_metrics, clean_sales_quality],
        partition_key=run_day,
        resources={"io_manager": csv_io_manager},
    )
    print("Run success:", result.success)
    for fname in ["raw_sales.csv", "clean_sales.csv", "features.csv", "tiny_model_metrics.json"]:
        f = BASE / fname
        if f.exists():
            print(fname, "->", f.stat().st_size, "bytes")
            if fname.endswith(".json"):
                print("Metrics:", json.loads(f.read_text()))
After registering our assets and the IO manager in Definitions, we materialize the whole DAG for a chosen partition key in a single run. The CSV/JSON files land in /content/dagstore, and we print the success flag, the saved file sizes, and the model metrics for immediate verification.
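The verification loop at the end is just stdlib file inspection. The same pattern in isolation, with a temp directory standing in for /content/dagstore and an illustrative metrics dict shaped like the tiny_model_metrics output:

```python
import json
import tempfile
from pathlib import Path

base = Path(tempfile.mkdtemp())

# Illustrative metrics, shaped like the persisted tiny_model_metrics JSON.
metrics = {"r2_train": 0.97, "z_units": 48.1, "promo": 29.6}
(base / "tiny_model_metrics.json").write_text(json.dumps(metrics, indent=2))

f = base / "tiny_model_metrics.json"
if f.exists():
    print(f.name, "->", f.stat().st_size, "bytes")
    print("Metrics:", json.loads(f.read_text()))
```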
We materialize assets and run their checks in one Dagster run, confirming data quality and producing a regression model whose metrics we can inspect. The pipeline is modular: each asset persists its output as CSV or JSON, and metadata values are explicitly converted to supported types for compatibility. This tutorial shows how to combine asset definitions, asset checks, and partitioning into a reproducible, technically robust workflow, and it provides a framework for building more complex pipelines.

