This tutorial shows how to build a production-style, end-to-end analytics and modeling pipeline with Vaex. It works efficiently on millions of rows without materializing intermediate data. We build a realistic large dataset, engineer detailed behavioral features and city-level information using lazily evaluated statistics, and then integrate Vaex with scikit-learn to train and evaluate a prediction model. This shows how Vaex serves as the foundation for both exploratory and machine-learning workflows.
!pip -q install "vaex==4.19.0" "vaex-core==4.19.0" "vaex-ml==0.19.0" "vaex-viz==0.6.0" "vaex-hdf5==0.15.0" "pyarrow>=14" "scikit-learn>=1.3"
import os, time, json, numpy as np, pandas as pd
import vaex
import vaex.ml
from vaex.ml.sklearn import Predictor
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import roc_auc_score, average_precision_score
print("Python:", __import__("sys").version.split()[0])
print("vaex:", vaex.__version__)
print("numpy:", np.__version__)
print("pandas:", pd.__version__)
rng = np.random.default_rng(7)
n = 2_000_000
cities = np.array(["Montreal","Toronto","Vancouver","Calgary","Ottawa","Edmonton","Quebec City","Winnipeg"], dtype=object)
city = rng.choice(cities, size=n, replace=True, p=np.array([0.16,0.18,0.12,0.10,0.10,0.10,0.10,0.14]))
age = rng.integers(18, 75, size=n, endpoint=False).astype("int32")
tenure_m = rng.integers(0, 180, size=n, endpoint=False).astype("int32")
tx = rng.poisson(lam=22, size=n).astype("int32")
base_income = rng.lognormal(mean=10.6, sigma=0.45, size=n).astype("float64")
city_mult = pd.Series({"Montreal":0.92,"Toronto":1.05,"Vancouver":1.10,"Calgary":1.02,"Ottawa":1.00,"Edmonton":0.98,"Quebec City":0.88,"Winnipeg":0.90}).reindex(city).to_numpy()
income = (base_income * city_mult * (1.0 + 0.004*(age-35)) * (1.0 + 0.0025*np.minimum(tenure_m,120))).astype("float64")
income = np.clip(income, 18_000, None)
noise = rng.normal(0, 1, size=n).astype("float64")
score_latent = (
0.55*np.log1p(income/1000.0)
+ 0.28*np.log1p(tx)
+ 0.18*np.sqrt(np.maximum(tenure_m,0)/12.0 + 1e-9)
- 0.012*(age-40)
+ 0.22*(city == "Vancouver").astype("float64")
+ 0.15*(city == "Toronto").astype("float64")
+ 0.10*(city == "Ottawa").astype("float64")
+ 0.65*noise
)
p = 1.0/(1.0 + np.exp(-(score_latent - np.quantile(score_latent, 0.70))))
target = (rng.random(n) < p).astype("int8")
df = vaex.from_arrays(city=city, age=age, tenure_m=tenure_m, tx=tx, income=income, target=target)
df["income_k"] = df.income / 1000.0
df["tenure_y"] = df.tenure_m / 12.0
df["log_income"] = df.income.log1p()
df["tx_per_year"] = df.tx / (df.tenure_y + 0.25)
df["value_score"] = (0.35*df.log_income + 0.20*df.tx_per_year + 0.10*df.tenure_y - 0.015*df.age)
print("\nRows:", len(df), "Columns:", len(df.get_column_names()))
print(df[["city","age","tenure_m","tx","income","income_k","value_score","target"]].head(5))
We generate a realistic dataset of millions of rows and set it up as a lazily evaluated Vaex DataFrame. The core numeric features are engineered as virtual-column expressions, so no intermediate data is materialized. We validate the setup by inspecting the row and column counts, the schema, and a small sample of values.
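As a sanity check, the virtual-column formulas used above can be reproduced eagerly with NumPy on a tiny array (a minimal sketch; the input values are illustrative, not taken from the dataset):

```python
import numpy as np

# Tiny stand-ins for the columns built above (illustrative values).
income = np.array([40_000.0, 90_000.0])
tenure_m = np.array([24, 120])
tx = np.array([10, 40])
age = np.array([30, 55])

# Same formulas as the Vaex virtual columns; Vaex evaluates these lazily,
# while NumPy materializes them immediately.
income_k = income / 1000.0
tenure_y = tenure_m / 12.0
log_income = np.log1p(income)
tx_per_year = tx / (tenure_y + 0.25)
value_score = 0.35*log_income + 0.20*tx_per_year + 0.10*tenure_y - 0.015*age

print(value_score.round(3))
```

In Vaex, assigning such an expression to `df["..."]` stores only the expression graph; the arithmetic runs in chunks when a result is actually needed.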
encoder = vaex.ml.LabelEncoder(features=["city"])
df = encoder.fit_transform(df)
city_map = encoder.labels_["city"]
inv_city_map = {v: k for k, v in city_map.items()}
n_cities = len(city_map)
p95_income_k_by_city = df.percentile_approx("income_k", 95, binby="label_encoded_city", shape=n_cities, limits=[-0.5, n_cities-0.5])
p50_value_by_city = df.percentile_approx("value_score", 50, binby="label_encoded_city", shape=n_cities, limits=[-0.5, n_cities-0.5])
avg_income_k_by_city = df.mean("income_k", binby="label_encoded_city", shape=n_cities, limits=[-0.5, n_cities-0.5])
target_rate_by_city = df.mean("target", binby="label_encoded_city", shape=n_cities, limits=[-0.5, n_cities-0.5])
n_by_city = df.count(binby="label_encoded_city", shape=n_cities, limits=[-0.5, n_cities-0.5])
p95_income_k_by_city = np.asarray(p95_income_k_by_city).reshape(-1)
p50_value_by_city = np.asarray(p50_value_by_city).reshape(-1)
avg_income_k_by_city = np.asarray(avg_income_k_by_city).reshape(-1)
target_rate_by_city = np.asarray(target_rate_by_city).reshape(-1)
n_by_city = np.asarray(n_by_city).reshape(-1)
city_table = pd.DataFrame({
"city_id": np.arange(n_cities),
"city": [inv_city_map[i] for i in range(n_cities)],
"n": n_by_city.astype("int64"),
"avg_income_k": avg_income_k_by_city,
"p95_income_k": p95_income_k_by_city,
"median_value_score": p50_value_by_city,
"target_rate": target_rate_by_city
}).sort_values(["target_rate","p95_income_k"], ascending=False)
print("\nCity summary:")
print(city_table.to_string(index=False))
df_city_features = vaex.from_pandas(city_table[["city","p95_income_k","avg_income_k","median_value_score","target_rate"]], copy_index=False)
df = df.join(df_city_features, on="city", rsuffix="_city")
df["income_vs_city_p95"] = df.income_k / (df.p95_income_k + 1e-9)
df["value_vs_city_median"] = df.value_score - df.median_value_score
We label-encode the city column and compute scalable, approximate per-city statistics using binned aggregations. These aggregates are assembled into a city-level table and joined back to the main dataset, from which we derive features that compare each record with its city-level context.
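Conceptually, a binned aggregation like `df.mean("income_k", binby="label_encoded_city", ...)` computes one statistic per integer bin. A minimal eager sketch of that semantics with NumPy (the arrays here are made up for illustration):

```python
import numpy as np

# Integer city ids in [0, n_cities) and a value column (illustrative data).
city_id = np.array([0, 1, 1, 2, 0, 2, 2])
income_k = np.array([40.0, 55.0, 65.0, 30.0, 50.0, 34.0, 38.0])
n_cities = 3

# Per-bin count and sum, then the per-bin mean -- conceptually what the
# binby-based df.mean(...) call above returns, one value per city id.
n_by_city = np.bincount(city_id, minlength=n_cities)
sum_by_city = np.bincount(city_id, weights=income_k, minlength=n_cities)
avg_by_city = sum_by_city / n_by_city

print(n_by_city, avg_by_city.round(2))
```

The `limits=[-0.5, n_cities-0.5]` argument in the Vaex calls centers each unit-wide bin on an integer id, so bin `i` collects exactly the rows with `label_encoded_city == i`.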
features_num = [
"age","tenure_y","tx","income_k","log_income","tx_per_year","value_score",
"p95_income_k","avg_income_k","median_value_score","target_rate",
"income_vs_city_p95","value_vs_city_median"
]
scaler = vaex.ml.StandardScaler(features=features_num, with_mean=True, with_std=True, prefix="z_")
df = scaler.fit_transform(df)
features = ["z_"+f for f in features_num] + ["label_encoded_city"]
df_train, df_test = df.split_random([0.80, 0.20], random_state=42)
model = LogisticRegression(max_iter=250, solver="lbfgs", n_jobs=None)
vaex_model = Predictor(model=model, features=features, target="target", prediction_name="pred")
t0 = time.time()
vaex_model.fit(df=df_train)
fit_s = time.time() - t0
df_test = vaex_model.transform(df_test)
y_true = df_test["target"].to_numpy()
y_pred = df_test["pred"].to_numpy()
auc = roc_auc_score(y_true, y_pred)
ap = average_precision_score(y_true, y_pred)
print("\nModel:")
print("fit_seconds:", round(fit_s, 3))
print("test_auc:", round(float(auc), 4))
print("test_avg_precision:", round(float(ap), 4))
We use Vaex's ML tools to standardize the numeric features and assemble a consistent feature vector for modeling. The dataset is split into train and test sets without loading everything into memory, and a logistic regression model is trained and evaluated through Vaex's scikit-learn wrapper.
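The standardize-then-fit flow can be illustrated on a small synthetic problem (a minimal sketch with made-up data; only the pattern of scaling with training statistics and evaluating on a holdout mirrors the pipeline above):

```python
import numpy as np
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import roc_auc_score

rng = np.random.default_rng(0)
X = rng.normal(size=(400, 3))
# Synthetic binary target driven mostly by the first feature.
y = (X[:, 0] + 0.5*rng.normal(size=400) > 0).astype(int)

# Standardize using statistics from the training slice only, then apply
# the same transform everywhere -- the role StandardScaler plays above.
mu, sd = X[:300].mean(axis=0), X[:300].std(axis=0)
Xz = (X - mu) / sd

model = LogisticRegression(max_iter=250).fit(Xz[:300], y[:300])
proba = model.predict_proba(Xz[300:])[:, 1]
auc = roc_auc_score(y[300:], proba)
print("holdout AUC:", round(auc, 3))
```

The key point is that the scaler's mean and std come from the training data alone, which is also why the tutorial later persists `scaler_mean` and `scaler_std` for inference.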
deciles = np.linspace(0, 1, 11)
cuts = np.quantile(y_pred, deciles)
cuts[0] = -np.inf
cuts[-1] = np.inf
bucket = np.digitize(y_pred, cuts[1:-1], right=True).astype("int32")
df_test_local = vaex.from_arrays(y_true=y_true.astype("int8"), y_pred=y_pred.astype("float64"), bucket=bucket)
lift = df_test_local.groupby(by="bucket", agg={"n": vaex.agg.count(), "rate": vaex.agg.mean("y_true"), "avg_pred": vaex.agg.mean("y_pred")}).sort("bucket")
lift_pd = lift.to_pandas_df()
baseline = float(df_test_local["y_true"].mean())
lift_pd["lift"] = lift_pd["rate"] / (baseline + 1e-12)
print("\nDecile lift table:")
print(lift_pd.to_string(index=False))
We analyze model behavior by bucketing the predictions into deciles and computing lift metrics. Comparing the observed positive rate in each score bucket against the baseline rate assesses ranking quality, and the results are compiled into a compact lift table of the kind used for real-world model diagnostics.
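The decile-lift logic above can be checked end to end on synthetic scores where higher predictions really do mean higher outcome rates (a minimal sketch with fabricated data):

```python
import numpy as np

rng = np.random.default_rng(1)
y_pred = rng.random(10_000)
# Outcomes drawn so that the true positive rate equals the score itself.
y_true = (rng.random(10_000) < y_pred).astype(int)

# Same bucketing recipe as the tutorial: decile cut points with open ends.
cuts = np.quantile(y_pred, np.linspace(0, 1, 11))
cuts[0], cuts[-1] = -np.inf, np.inf
bucket = np.digitize(y_pred, cuts[1:-1], right=True)

baseline = y_true.mean()
rate_by_bucket = np.array([y_true[bucket == b].mean() for b in range(10)])
lift = rate_by_bucket / baseline
print("top-decile lift:", round(lift[-1], 2))
```

For a well-ranked model the lift should rise monotonically across buckets, with the top decile well above 1.0 and the bottom decile well below it.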
out_dir = "/content/vaex_artifacts"
os.makedirs(out_dir, exist_ok=True)
parquet_path = os.path.join(out_dir, "customers_vaex.parquet")
state_path = os.path.join(out_dir, "vaex_pipeline.json")
base_cols = ["city","label_encoded_city","age","tenure_m","tenure_y","tx","income","income_k","value_score","target"]
export_cols = base_cols + ["z_"+f for f in features_num]
df_export = df[export_cols].sample(n=500_000, random_state=1)
if os.path.exists(parquet_path):
    os.remove(parquet_path)
df_export.export_parquet(parquet_path)
pipeline_state = {
"vaex_version": vaex.__version__,
"encoder_labels": {k: {str(kk): int(vv) for kk, vv in v.items()} for k, v in encoder.labels_.items()},
"scaler_mean": [float(x) for x in scaler.mean_],
"scaler_std": [float(x) for x in scaler.std_],
"features_num": features_num,
"export_cols": export_cols,
}
with open(state_path, "w") as f:
    json.dump(pipeline_state, f)
df_reopen = vaex.open(parquet_path)
df_reopen["income_k"] = df_reopen.income / 1000.0
df_reopen["tenure_y"] = df_reopen.tenure_m / 12.0
df_reopen["log_income"] = df_reopen.income.log1p()
df_reopen["tx_per_year"] = df_reopen.tx / (df_reopen.tenure_y + 0.25)
df_reopen["value_score"] = (0.35*df_reopen.log_income + 0.20*df_reopen.tx_per_year + 0.10*df_reopen.tenure_y - 0.015*df_reopen.age)
df_city_features = vaex.from_pandas(city_table[["city","p95_income_k","avg_income_k","median_value_score","target_rate"]], copy_index=False)
df_reopen = df_reopen.join(df_city_features, on="city", rsuffix="_city")
df_reopen["income_vs_city_p95"] = df_reopen.income_k / (df_reopen.p95_income_k + 1e-9)
df_reopen["value_vs_city_median"] = df_reopen.value_score - df_reopen.median_value_score
with open(state_path, "r") as f:
    st = json.load(f)
labels_city = {k: int(v) for k, v in st["encoder_labels"]["city"].items()}
df_reopen["label_encoded_city"] = df_reopen.city.map(labels_city, default_value=-1)
for i, feat in enumerate(st["features_num"]):
    mean_i = st["scaler_mean"][i]
    std_i = st["scaler_std"][i] if st["scaler_std"][i] != 0 else 1.0
    df_reopen["z_"+feat] = (df_reopen[feat] - mean_i) / std_i
df_reopen = vaex_model.transform(df_reopen)
print("\nArtifacts written:")
print(parquet_path)
print(state_path)
print("\nReopened parquet predictions (head):")
print(df_reopen[["city","income_k","value_score","pred","target"]].head(8))
print("nDone.")
For reproducibility, we export a large, feature-complete sample to Parquet and persist the entire preprocessing state. The data is then reloaded, every engineered feature is rebuilt deterministically from the saved metadata, and inference is run on the reloaded data to confirm the pipeline works end to end.
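The persist-and-rebuild round trip can be demonstrated in isolation with the standard library (a minimal sketch; the state values and column here are illustrative, not the tutorial's fitted statistics):

```python
import json, os, tempfile
import numpy as np

# A toy pipeline state in the same shape the tutorial serializes.
state = {"features_num": ["income_k"], "scaler_mean": [52.0], "scaler_std": [11.5]}

# Write the state to JSON and read it back, as done around inference above.
path = os.path.join(tempfile.mkdtemp(), "state.json")
with open(path, "w") as f:
    json.dump(state, f)
with open(path) as f:
    st = json.load(f)

# Rebuild the z-feature deterministically from the reloaded metadata.
income_k = np.array([40.5, 63.5, 52.0])
z = (income_k - st["scaler_mean"][0]) / st["scaler_std"][0]
print(z.round(3))
```

Because the standardization parameters travel with the artifact rather than being re-estimated, the reloaded data produces exactly the same features the model was trained on.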
We concluded by showing how Vaex enables fast, memory-efficient data processing while supporting feature engineering, aggregation, and model integration. Approximate statistics, lazy computation, and out-of-core execution let us scale smoothly from analysis to deployment-ready artifacts. By exporting reproducible feature sets, persisting the pipeline state, and re-running inference on reloaded data, we closed the loop between raw data and inference and showed how naturally Vaex fits into large-data Python workflows.

