AI-trends.today

Vaex: A Guide for Building a Scalable Machine Learning and Analytics Pipeline that can Process Millions of Records

Tech | By Gavin Wallace | 03/03/2026 | 6 Mins Read

This tutorial shows how to build a production-style, end-to-end analytics and modeling pipeline with Vaex. It works efficiently on millions of rows without materializing the data in memory. We build a realistic large dataset, engineer detailed behavioral features and city-level statistics as lazily evaluated expressions, then integrate Vaex with scikit-learn to train and evaluate a predictive model. This demonstrates how Vaex serves as the foundation for both exploratory and machine learning workflows.

!pip -q install "vaex==4.19.0" "vaex-core==4.19.0" "vaex-ml==0.19.0" "vaex-viz==0.6.0" "vaex-hdf5==0.15.0" "pyarrow>=14" "scikit-learn>=1.3"


import os, time, json, numpy as np, pandas as pd
import vaex
import vaex.ml
from vaex.ml.sklearn import Predictor
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import roc_auc_score, average_precision_score


print("Python:", __import__("sys").version.split()[0])
print("vaex:", vaex.__version__)
print("numpy:", np.__version__)
print("pandas:", pd.__version__)


rng = np.random.default_rng(7)


n = 2_000_000
cities = np.array(["Montreal","Toronto","Vancouver","Calgary","Ottawa","Edmonton","Quebec City","Winnipeg"], dtype=object)
city = rng.choice(cities, size=n, replace=True, p=np.array([0.16,0.18,0.12,0.10,0.10,0.10,0.10,0.14]))
age = rng.integers(18, 75, size=n, endpoint=False).astype("int32")
tenure_m = rng.integers(0, 180, size=n, endpoint=False).astype("int32")
tx = rng.poisson(lam=22, size=n).astype("int32")
base_income = rng.lognormal(mean=10.6, sigma=0.45, size=n).astype("float64")
city_mult = pd.Series({"Montreal":0.92,"Toronto":1.05,"Vancouver":1.10,"Calgary":1.02,"Ottawa":1.00,"Edmonton":0.98,"Quebec City":0.88,"Winnipeg":0.90}).reindex(city).to_numpy()
income = (base_income * city_mult * (1.0 + 0.004*(age-35)) * (1.0 + 0.0025*np.minimum(tenure_m,120))).astype("float64")
income = np.clip(income, 18_000, None)


noise = rng.normal(0, 1, size=n).astype("float64")
score_latent = (
   0.55*np.log1p(income/1000.0)
   + 0.28*np.log1p(tx)
   + 0.18*np.sqrt(np.maximum(tenure_m,0)/12.0 + 1e-9)
   - 0.012*(age-40)
   + 0.22*(city == "Vancouver").astype("float64")
   + 0.15*(city == "Toronto").astype("float64")
   + 0.10*(city == "Ottawa").astype("float64")
   + 0.65*noise
)
p = 1.0/(1.0 + np.exp(-(score_latent - np.quantile(score_latent, 0.70))))
target = (rng.random(n) < p).astype("int8")


df = vaex.from_arrays(city=city, age=age, tenure_m=tenure_m, tx=tx, income=income, target=target)
df["income_k"] = df.income / 1000.0
df["tenure_y"] = df.tenure_m / 12.0
df["log_income"] = df.income.log1p()
df["tx_per_year"] = df.tx / (df.tenure_y + 0.25)
df["value_score"] = (0.35*df.log_income + 0.20*df.tx_per_year + 0.10*df.tenure_y - 0.015*df.age)


print("\nRows:", len(df), "Columns:", len(df.get_column_names()))
print(df[["city","age","tenure_m","tx","income","income_k","value_score","target"]].head(5))

We generate a realistic dataset of two million rows and set up a Vaex DataFrame that evaluates lazily. The core numeric features are engineered as virtual expressions so that no intermediate data is materialized, and we validate the setup by inspecting the row count, the schema, and a small sample of values.
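The labeling step deserves a closer look: shifting the sigmoid by the 70th percentile of the latent score controls the positive rate. A minimal sketch with toy latent scores (not the tutorial's data) makes the effect visible:

```python
import numpy as np

rng = np.random.default_rng(0)
latent = rng.normal(size=100_000)

# Centering the sigmoid at the 70th percentile skews the class balance:
# only the upper tail of latent scores gets probabilities above 0.5.
p = 1.0 / (1.0 + np.exp(-(latent - np.quantile(latent, 0.70))))
target = (rng.random(latent.size) < p).astype("int8")

print(round(float(target.mean()), 2))  # well below 0.5
```

Because the labels are then sampled with `rng.random(n) < p`, the final positive rate sits between the sigmoid's midpoint share and 50%, giving a moderately imbalanced target.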

encoder = vaex.ml.LabelEncoder(features=["city"])
df = encoder.fit_transform(df)
city_map = encoder.labels_["city"]
inv_city_map = {v: k for k, v in city_map.items()}
n_cities = len(city_map)


p95_income_k_by_city = df.percentile_approx("income_k", 95, binby="label_encoded_city", shape=n_cities, limits=[-0.5, n_cities-0.5])
p50_value_by_city = df.percentile_approx("value_score", 50, binby="label_encoded_city", shape=n_cities, limits=[-0.5, n_cities-0.5])
avg_income_k_by_city = df.mean("income_k", binby="label_encoded_city", shape=n_cities, limits=[-0.5, n_cities-0.5])
target_rate_by_city = df.mean("target", binby="label_encoded_city", shape=n_cities, limits=[-0.5, n_cities-0.5])
n_by_city = df.count(binby="label_encoded_city", shape=n_cities, limits=[-0.5, n_cities-0.5])


p95_income_k_by_city = np.asarray(p95_income_k_by_city).reshape(-1)
p50_value_by_city = np.asarray(p50_value_by_city).reshape(-1)
avg_income_k_by_city = np.asarray(avg_income_k_by_city).reshape(-1)
target_rate_by_city = np.asarray(target_rate_by_city).reshape(-1)
n_by_city = np.asarray(n_by_city).reshape(-1)


city_table = pd.DataFrame({
   "city_id": np.arange(n_cities),
   "city": [inv_city_map[i] for i in range(n_cities)],
   "n": n_by_city.astype("int64"),
   "avg_income_k": avg_income_k_by_city,
   "p95_income_k": p95_income_k_by_city,
   "median_value_score": p50_value_by_city,
   "target_rate": target_rate_by_city
}).sort_values(["target_rate","p95_income_k"], ascending=False)


print("\nCity summary:")
print(city_table.to_string(index=False))


df_city_features = vaex.from_pandas(city_table[["city","p95_income_k","avg_income_k","median_value_score","target_rate"]], copy_index=False)
df = df.join(df_city_features, on="city", rsuffix="_city")


df["income_vs_city_p95"] = df.income_k / (df.p95_income_k + 1e-9)
df["value_vs_city_median"] = df.value_score - df.median_value_score

We label-encode the city column and compute scalable, approximate per-city statistics using binned aggregations. These aggregates are assembled into a city-level table and joined back onto the main dataset, from which we derive features that compare each record against its city context.
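Conceptually, a `binby` aggregation is a vectorized group-by over integer bins. A minimal numpy sketch (hypothetical city ids, not Vaex itself) of the per-city mean that `df.mean(..., binby="label_encoded_city")` computes:

```python
import numpy as np

# Hypothetical toy data: 3 city ids and one value per record.
city_id = np.array([0, 1, 1, 2, 0, 2, 2])
value   = np.array([10., 20., 30., 5., 14., 7., 9.])

# Per-bin mean via two bincounts: per-bin sums divided by per-bin counts.
counts = np.bincount(city_id, minlength=3)
sums   = np.bincount(city_id, weights=value, minlength=3)
means  = sums / counts

print(means.tolist())  # [12.0, 25.0, 7.0]
```

Vaex does the same in chunks over the full column, which is why the aggregates scale to millions of rows without loading them all at once.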

features_num = [
   "age","tenure_y","tx","income_k","log_income","tx_per_year","value_score",
   "p95_income_k","avg_income_k","median_value_score","target_rate",
   "income_vs_city_p95","value_vs_city_median"
]


scaler = vaex.ml.StandardScaler(features=features_num, with_mean=True, with_std=True, prefix="z_")
df = scaler.fit_transform(df)


features = ["z_"+f for f in features_num] + ["label_encoded_city"]


df_train, df_test = df.split_random([0.80, 0.20], random_state=42)


model = LogisticRegression(max_iter=250, solver="lbfgs", n_jobs=None)
vaex_model = Predictor(model=model, features=features, target="target", prediction_name="pred")


t0 = time.time()
vaex_model.fit(df=df_train)
fit_s = time.time() - t0


df_test = vaex_model.transform(df_test)


y_true = df_test["target"].to_numpy()
y_pred = df_test["pred"].to_numpy()


auc = roc_auc_score(y_true, y_pred)
ap = average_precision_score(y_true, y_pred)


print("\nModel:")
print("fit_seconds:", round(fit_s, 3))
print("test_auc:", round(float(auc), 4))
print("test_avg_precision:", round(float(ap), 4))

Vaex's ML tools standardize the numeric features and assemble a consistent feature vector for modeling. The dataset is split without ever holding it whole in memory, and Vaex's scikit-learn wrapper is used to train and evaluate a logistic regression model.
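The standardization that `StandardScaler` applies reduces to a per-column z-score. A minimal numpy sketch, including the zero-variance guard that also matters later when the persisted state is replayed:

```python
import numpy as np

x = np.array([12.0, 18.0, 24.0, 30.0])
mean, std = float(x.mean()), float(x.std())

# Divide by 1.0 when std is 0 so constant columns don't produce NaN/inf.
z = (x - mean) / (std if std != 0 else 1.0)

print(round(float(z.mean()), 6), round(float(z.std()), 6))  # 0.0 1.0
```

Persisting `mean` and `std` is what lets the exact same transform be reapplied to new data at inference time.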

deciles = np.linspace(0, 1, 11)
cuts = np.quantile(y_pred, deciles)
cuts[0] = -np.inf
cuts[-1] = np.inf
bucket = np.digitize(y_pred, cuts[1:-1], right=True).astype("int32")
df_test_local = vaex.from_arrays(y_true=y_true.astype("int8"), y_pred=y_pred.astype("float64"), bucket=bucket)
lift = df_test_local.groupby(by="bucket", agg={"n": vaex.agg.count(), "rate": vaex.agg.mean("y_true"), "avg_pred": vaex.agg.mean("y_pred")}).sort("bucket")
lift_pd = lift.to_pandas_df()
baseline = float(df_test_local["y_true"].mean())
lift_pd["lift"] = lift_pd["rate"] / (baseline + 1e-12)
print("\nDecile lift table:")
print(lift_pd.to_string(index=False))

We analyze model behavior by bucketing the predictions into deciles and computing lift metrics. Comparing the positive rate in each score bucket against the overall baseline assesses ranking quality, and the results are compiled into a compact lift table of the kind used in practical model diagnostics.
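The bucketing logic above is general. A minimal numpy sketch with synthetic, well-calibrated scores (hypothetical data, not the tutorial's model) shows how a decile lift curve is read:

```python
import numpy as np

rng = np.random.default_rng(1)
y_pred = rng.random(10_000)
y_true = (rng.random(10_000) < y_pred).astype(int)  # calibrated toy labels

# Decile edges, opened at both ends so every score falls into a bucket.
cuts = np.quantile(y_pred, np.linspace(0, 1, 11))
cuts[0], cuts[-1] = -np.inf, np.inf
bucket = np.digitize(y_pred, cuts[1:-1], right=True)

# Lift = per-bucket positive rate divided by the overall baseline rate.
baseline = y_true.mean()
lift = np.array([y_true[bucket == b].mean() for b in range(10)]) / baseline

print(lift.round(2))  # roughly increasing from ~0.1x to ~1.9x
```

A well-ranked model shows monotonically increasing lift, with the top decile concentrating far more positives than the baseline.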

out_dir = "/content/vaex_artifacts"
os.makedirs(out_dir, exist_ok=True)


parquet_path = os.path.join(out_dir, "customers_vaex.parquet")
state_path = os.path.join(out_dir, "vaex_pipeline.json")


base_cols = ["city","label_encoded_city","age","tenure_m","tenure_y","tx","income","income_k","value_score","target"]
export_cols = base_cols + ["z_"+f for f in features_num]
df_export = df[export_cols].sample(n=500_000, random_state=1)


if os.path.exists(parquet_path):
   os.remove(parquet_path)
df_export.export_parquet(parquet_path)


pipeline_state = {
   "vaex_version": vaex.__version__,
    "encoder_labels": {k: {str(kk): int(vv) for kk, vv in v.items()} for k, v in encoder.labels_.items()},
   "scaler_mean": [float(x) for x in scaler.mean_],
   "scaler_std": [float(x) for x in scaler.std_],
   "features_num": features_num,
   "export_cols": export_cols,
}
with open(state_path, "w") as f:
   json.dump(pipeline_state, f)


df_reopen = vaex.open(parquet_path)


df_reopen["income_k"] = df_reopen.income / 1000.0
df_reopen["tenure_y"] = df_reopen.tenure_m / 12.0
df_reopen["log_income"] = df_reopen.income.log1p()
df_reopen["tx_per_year"] = df_reopen.tx / (df_reopen.tenure_y + 0.25)
df_reopen["value_score"] = (0.35*df_reopen.log_income + 0.20*df_reopen.tx_per_year + 0.10*df_reopen.tenure_y - 0.015*df_reopen.age)


df_city_features = vaex.from_pandas(city_table[["city","p95_income_k","avg_income_k","median_value_score","target_rate"]], copy_index=False)
df_reopen = df_reopen.join(df_city_features, on="city", rsuffix="_city")
df_reopen["income_vs_city_p95"] = df_reopen.income_k / (df_reopen.p95_income_k + 1e-9)
df_reopen["value_vs_city_median"] = df_reopen.value_score - df_reopen.median_value_score


with open(state_path, "r") as f:
    st = json.load(f)


labels_city = {k: int(v) for k, v in st["encoder_labels"]["city"].items()}
df_reopen["label_encoded_city"] = df_reopen.city.map(labels_city, default_value=-1)


for i, feat in enumerate(st["features_num"]):
    mean_i = st["scaler_mean"][i]
    std_i = st["scaler_std"][i] if st["scaler_std"][i] != 0 else 1.0
    df_reopen["z_"+feat] = (df_reopen[feat] - mean_i) / std_i


df_reopen = vaex_model.transform(df_reopen)


print("\nArtifacts written:")
print(parquet_path)
print(state_path)
print("\nReopened parquet predictions (head):")
print(df_reopen[["city","income_k","value_score","pred","target"]].head(8))


print("\nDone.")

For reproducibility, we export a large, feature-complete sample to Parquet and persist the entire preprocessing state. The data is then reloaded, all engineered features are deterministically rebuilt from the persisted metadata, and inference is run on the reloaded data to confirm the pipeline works end to end.
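The replay step can be isolated from Vaex entirely. A minimal sketch with hypothetical persisted values (the key names mirror `pipeline_state`, but the numbers are made up) shows how z-features are rebuilt from JSON alone:

```python
import json
import numpy as np

# Hypothetical persisted state, shaped like pipeline_state in the tutorial.
state = json.loads(json.dumps({
    "features_num": ["income_k", "tenure_y"],
    "scaler_mean": [55.0, 5.0],
    "scaler_std": [20.0, 3.0],
}))

# "Reloaded" raw columns; any name -> array mapping works for the sketch.
cols = {"income_k": np.array([35.0, 75.0]), "tenure_y": np.array([2.0, 8.0])}

# Rebuild each z-feature from the persisted means and stds.
for i, feat in enumerate(state["features_num"]):
    std = state["scaler_std"][i] or 1.0  # guard against zero std
    cols["z_" + feat] = (cols[feat] - state["scaler_mean"][i]) / std

print(cols["z_income_k"].tolist())  # [-1.0, 1.0]
```

Because the state round-trips through JSON, the same transform can be reproduced in any process that can read the file, which is the whole point of persisting it.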

We concluded by showing how Vaex enables fast, memory-efficient data processing while supporting feature engineering, aggregation, and model integration. Approximate statistics, lazy computation, and out-of-core execution let us scale smoothly from exploratory analysis to deployment-ready artifacts. By exporting reproducible feature sets, persisting the pipeline state, and rerunning inference on reloaded data, we closed the loop between raw data and inference and showed how naturally Vaex fits into large-data Python workflows.



