In this tutorial, we explore how federated learning behaves when the traditional centralized aggregation server is removed and replaced by a fully decentralized, peer-to-peer gossip mechanism. We implement both centralized FedAvg and decentralized Gossip Federated Learning from scratch, and we add client-side differential privacy by clipping local model updates and injecting calibrated noise into them. Using non-IID partitions of MNIST, we run controlled experiments that measure how privacy strength, set by the privacy budget epsilon (smaller epsilon means stronger privacy), affects convergence speed, model stability, and final accuracy. Finally, we examine the trade-offs between privacy and learning efficiency in real-world decentralized systems.
```python
import os, math, random, time
from dataclasses import dataclass
from typing import Dict, List, Tuple
import subprocess, sys

def pip_install(pkgs):
    # Quietly install packages into the current interpreter's environment.
    subprocess.check_call([sys.executable, "-m", "pip", "install", "-q"] + pkgs)

pip_install(["torch", "torchvision", "numpy", "matplotlib", "networkx", "tqdm"])

import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader, Subset
from torchvision import datasets, transforms
import matplotlib.pyplot as plt
import networkx as nx
from tqdm import trange

SEED = 7
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)
torch.cuda.manual_seed_all(SEED)
# cuDNN autotuning favors speed; use deterministic=True, benchmark=False for bitwise-reproducible GPU runs.
torch.backends.cudnn.deterministic = False
torch.backends.cudnn.benchmark = True

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

transform = transforms.Compose([transforms.ToTensor()])
train_ds = datasets.MNIST(root="/content/data", train=True, download=True, transform=transform)
test_ds = datasets.MNIST(root="/content/data", train=False, download=True, transform=transform)
```
We install all dependencies and set up the execution environment. To keep runs reproducible, we configure the compute device and fix the random seeds. We also load the MNIST dataset, which serves as a lightweight yet effective benchmark for federated-learning experiments.
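The global `random.seed(SEED)` call fixes client sampling only as long as no other code consumes the global generator in between. A minimal sketch, assuming you prefer an isolated generator: the hypothetical helper `client_schedule` below draws every round's client subset from a dedicated `random.Random(seed)` instance, so the schedule depends on the seed alone.

```python
import random

def client_schedule(seed, rounds, num_clients, clients_per_round):
    """Hypothetical helper: precompute which clients participate in each round."""
    rng = random.Random(seed)  # private generator, unaffected by other users of `random`
    return [rng.sample(range(num_clients), k=clients_per_round) for _ in range(rounds)]

sched_a = client_schedule(seed=7, rounds=3, num_clients=20, clients_per_round=10)
sched_b = client_schedule(seed=7, rounds=3, num_clients=20, clients_per_round=10)
print(sched_a == sched_b)  # True: same seed, same schedule
```

Passing such a precomputed schedule into a training loop, instead of calling `random.sample` inside it, would keep client selection stable even if other code draws from the global generator.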
```python
def make_noniid_clients(dataset, num_clients=20, shards_per_client=2, seed=SEED):
    """Sort samples by label, cut them into equal shards, and give each client a few shards."""
    rng = np.random.default_rng(seed)
    y = np.array([dataset[i][1] for i in range(len(dataset))])
    idx = np.arange(len(dataset))
    idx_sorted = idx[np.argsort(y)]
    num_shards = num_clients * shards_per_client
    shard_size = len(dataset) // num_shards
    shards = [idx_sorted[i * shard_size:(i + 1) * shard_size] for i in range(num_shards)]
    rng.shuffle(shards)  # random shard-to-client assignment
    client_indices = []
    for c in range(num_clients):
        take = shards[c * shards_per_client:(c + 1) * shards_per_client]
        client_indices.append(np.concatenate(take))
    return client_indices

NUM_CLIENTS = 20
client_indices = make_noniid_clients(train_ds, num_clients=NUM_CLIENTS, shards_per_client=2)
test_loader = DataLoader(test_ds, batch_size=1024, shuffle=False, num_workers=2, pin_memory=True)

class MLP(nn.Module):
    def __init__(self):
        super().__init__()
        self.fc1 = nn.Linear(28 * 28, 256)
        self.fc2 = nn.Linear(256, 128)
        self.fc3 = nn.Linear(128, 10)

    def forward(self, x):
        x = x.view(x.size(0), -1)  # flatten 1x28x28 images into 784-dim vectors
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        return self.fc3(x)  # raw logits for the 10 digit classes
```
We sort the training set by label, cut it into shards, and distribute two shards to each client, so every client sees only a few digit classes. This simulates data heterogeneity (non-IID data), a key challenge for federated systems. The MLP is compact, balancing the network's expressiveness against computational efficiency.
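To see why label-sorted sharding produces non-IID clients, here is a minimal pure-Python sketch of the same procedure as `make_noniid_clients`, run on synthetic labels (1,000 samples, 10 balanced classes) instead of MNIST. The function name `shard_noniid` and the toy labels are illustrative assumptions.

```python
import random

def shard_noniid(labels, num_clients=20, shards_per_client=2, seed=7):
    """Pure-Python label-sorted sharding: returns one list of sample indices per client."""
    rng = random.Random(seed)
    # Sort sample indices by label so each contiguous shard is (mostly) a single class.
    idx_sorted = sorted(range(len(labels)), key=lambda i: labels[i])
    num_shards = num_clients * shards_per_client
    shard_size = len(labels) // num_shards
    shards = [idx_sorted[s * shard_size:(s + 1) * shard_size] for s in range(num_shards)]
    rng.shuffle(shards)  # random shard-to-client assignment
    clients = []
    for c in range(num_clients):
        take = shards[c * shards_per_client:(c + 1) * shards_per_client]
        clients.append([i for shard in take for i in shard])
    return clients

# Toy labels: 1,000 samples, 10 balanced classes (100 per class).
labels = [i % 10 for i in range(1000)]
clients = shard_noniid(labels)
for c in range(3):
    print(f"client {c}: classes {sorted({labels[i] for i in clients[c]})}")
```

With 100 samples per class and shards of 25, every shard lies inside one class, so each client holds at most two classes. On MNIST the shard size is 60,000 / 40 = 1,500 and class sizes are not multiples of it (digit 0 has 5,923 training images), so a few shards straddle two classes and some clients see slightly more than two labels.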
```python
def get_model_params(model):
    return {k: v.detach().clone() for k, v in model.state_dict().items()}

def set_model_params(model, params):
    model.load_state_dict(params, strict=True)

def add_params(a, b):
    return {k: a[k] + b[k] for k in a.keys()}

def sub_params(a, b):
    return {k: a[k] - b[k] for k in a.keys()}

def scale_params(a, s):
    return {k: a[k] * s for k in a.keys()}

def mean_params(params_list):
    # Element-wise average of a list of parameter dicts.
    out = {k: torch.zeros_like(params_list[0][k]) for k in params_list[0].keys()}
    for p in params_list:
        for k in out.keys():
            out[k] += p[k]
    for k in out.keys():
        out[k] /= len(params_list)
    return out

def l2_norm_params(delta):
    # Global L2 norm across all tensors in the update.
    sq = 0.0
    for v in delta.values():
        sq += float(torch.sum(v.float() * v.float()).item())
    return math.sqrt(sq)

def dp_sanitize_update(delta, clip_norm, epsilon, delta_dp, rng):
    # 1) Clip the whole update to L2 norm <= clip_norm (bounds each client's sensitivity).
    norm = l2_norm_params(delta)
    scale = min(1.0, clip_norm / (norm + 1e-12))
    clipped = scale_params(delta, scale)
    # epsilon = inf (or None) means "no privacy": return the clipped update without noise.
    if epsilon is None or math.isinf(epsilon):
        return clipped
    if epsilon <= 0:
        raise ValueError("epsilon must be positive")
    # 2) The original listing is truncated here; this completion assumes the classic
    #    Gaussian mechanism: sigma = clip_norm * sqrt(2 ln(1.25 / delta_dp)) / epsilon.
    sigma = clip_norm * math.sqrt(2.0 * math.log(1.25 / delta_dp)) / epsilon
    return {
        k: v + sigma * torch.randn(v.shape, generator=rng, device=v.device, dtype=v.dtype)
        for k, v in clipped.items()
    }
```
Our parameter-manipulation helpers add, subtract, scale, and average model weights across clients. We introduce differential privacy by clipping each local update to a maximum L2 norm and then injecting Gaussian noise. This is the core privacy mechanism that lets us study the privacy–utility trade-off in both centralized and decentralized settings.
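The clip-then-noise step can be sketched in pure Python on a toy update stored as a dict of float lists. This sketch assumes the classic Gaussian-mechanism calibration sigma = C * sqrt(2 ln(1.25 / delta)) / epsilon, whose standard proof covers 0 < epsilon < 1; for the larger epsilon values in this sweep it is a common heuristic rather than a formal guarantee. The helper names are hypothetical, and the tutorial's own noise calibration is not fully shown in its listing.

```python
import math
import random

def l2_norm(update):
    """Global L2 norm over every parameter vector in the update."""
    return math.sqrt(sum(x * x for vec in update.values() for x in vec))

def clip_update(update, clip_norm):
    """Scale the whole update down so its global L2 norm is at most clip_norm."""
    scale = min(1.0, clip_norm / (l2_norm(update) + 1e-12))
    return {k: [x * scale for x in vec] for k, vec in update.items()}

def gaussian_sigma(clip_norm, epsilon, delta):
    """Classic Gaussian-mechanism noise scale (formally proven for 0 < epsilon < 1)."""
    return clip_norm * math.sqrt(2.0 * math.log(1.25 / delta)) / epsilon

def privatize(update, clip_norm, epsilon, delta, rng):
    """Clip, then add i.i.d. N(0, sigma^2) noise to each coordinate; epsilon=inf means no noise."""
    clipped = clip_update(update, clip_norm)
    if math.isinf(epsilon):
        return clipped
    sigma = gaussian_sigma(clip_norm, epsilon, delta)
    return {k: [x + rng.gauss(0.0, sigma) for x in vec] for k, vec in clipped.items()}

update = {"w": [3.0, 4.0]}  # toy update with L2 norm 5.0
clipped = clip_update(update, clip_norm=2.0)
print(clipped, l2_norm(clipped))
for eps in [8.0, 4.0, 2.0, 1.0]:
    print(f"eps={eps}: sigma={gaussian_sigma(2.0, eps, 1e-5):.3f}")
noisy = privatize(update, 2.0, 1.0, 1e-5, random.Random(0))
```

Under this calibration, clip_norm = 2.0 and epsilon = 1.0 give a per-coordinate noise standard deviation of roughly 9.7, several times the norm of the entire clipped update, which suggests why the small-epsilon runs learn much more slowly.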
```python
def local_train_one_client(base_params, client_id, epochs, lr, batch_size, weight_decay=0.0):
    # Start from the given parameters and run local SGD on this client's shard only.
    model = MLP().to(device)
    set_model_params(model, base_params)
    model.train()
    indices = client_indices[client_id]
    loader = DataLoader(
        Subset(train_ds, indices.tolist() if hasattr(indices, "tolist") else indices),
        batch_size=batch_size,
        shuffle=True,
        num_workers=2,
        pin_memory=True,
    )
    opt = torch.optim.SGD(model.parameters(), lr=lr, momentum=0.9, weight_decay=weight_decay)
    for _ in range(epochs):
        for xb, yb in loader:
            xb, yb = xb.to(device), yb.to(device)
            opt.zero_grad(set_to_none=True)
            logits = model(xb)
            loss = F.cross_entropy(logits, yb)
            loss.backward()
            opt.step()
    return get_model_params(model)

@torch.no_grad()
def evaluate(params):
    # Test loss and accuracy of a given parameter dict on the full MNIST test set.
    model = MLP().to(device)
    set_model_params(model, params)
    model.eval()
    total, correct = 0, 0
    loss_sum = 0.0
    for xb, yb in test_loader:
        xb, yb = xb.to(device), yb.to(device)
        logits = model(xb)
        loss = F.cross_entropy(logits, yb, reduction="sum")
        loss_sum += float(loss.item())
        pred = torch.argmax(logits, dim=1)
        correct += int((pred == yb).sum().item())
        total += int(yb.numel())
    return loss_sum / total, correct / total
```
We define a local training loop that each client runs independently on its own data. We also implement a unified evaluation procedure that computes test loss and accuracy for a given model state. Together, these mirror real-world federated learning, where the training and evaluation logic is decoupled from which client owns which data.
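`evaluate` sums per-sample losses (`reduction="sum"`) and divides by the sample count once at the end. The minimal sketch below, using made-up per-sample losses and hypothetical helper names, shows why: averaging per-batch means would over-weight a small final batch. With 10,000 MNIST test images and `batch_size=1024`, the last batch holds only 784 images.

```python
def evaluate_batches(batches):
    """Accumulate per-sample sums across batches, then divide once (as `evaluate` does)."""
    total, correct, loss_sum = 0, 0, 0.0
    for losses, hits in batches:
        loss_sum += sum(losses)
        correct += sum(hits)
        total += len(losses)
    return loss_sum / total, correct / total

def mean_of_batch_means(batches):
    """The tempting but biased alternative: average the per-batch mean losses."""
    return sum(sum(losses) / len(losses) for losses, _ in batches) / len(batches)

# Toy data: a full batch of 4 samples and a final partial batch of 1 sample.
batches = [([1.0, 1.0, 1.0, 1.0], [1, 1, 1, 0]), ([6.0], [0])]
print(evaluate_batches(batches))     # (2.0, 0.6)
print(mean_of_batch_means(batches))  # 3.5
```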
```python
@dataclass
class FedAvgConfig:
    rounds: int = 25
    clients_per_round: int = 10
    local_epochs: int = 1
    lr: float = 0.06
    batch_size: int = 64
    clip_norm: float = 2.0
    epsilon: float = math.inf  # math.inf disables DP noise
    delta_dp: float = 1e-5

def run_fedavg(cfg):
    global_params = get_model_params(MLP().to(device))
    history = {"test_loss": [], "test_acc": []}
    for r in trange(cfg.rounds):
        # Sample a subset of clients for this round.
        chosen = random.sample(range(NUM_CLIENTS), k=cfg.clients_per_round)
        start_params = global_params
        updates = []
        for cid in chosen:
            local_params = local_train_one_client(start_params, cid, cfg.local_epochs, cfg.lr, cfg.batch_size)
            delta = sub_params(local_params, start_params)
            # Per-(round, client) generator so the DP noise is reproducible.
            rng = torch.Generator(device=device)
            rng.manual_seed(SEED * 10000 + r * 100 + cid)
            delta_dp = dp_sanitize_update(delta, cfg.clip_norm, cfg.epsilon, cfg.delta_dp, rng)
            updates.append(delta_dp)
        # Server step: apply the average of the privatized client updates.
        avg_update = mean_params(updates)
        global_params = add_params(start_params, avg_update)
        tl, ta = evaluate(global_params)
        history["test_loss"].append(tl)
        history["test_acc"].append(ta)
    return history, global_params
```
We implement centralized FedAvg, in which a sampled subset of clients trains locally and sends differentially private updates back to a central aggregator. We track the global model's test performance across communication rounds to characterize convergence under different privacy budgets. This serves as the baseline against which we compare gossip-based decentralized learning.
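The server step in `run_fedavg` adds the mean client delta to the starting global parameters. The pure-Python sketch below (parameters as dicts of floats; `fedavg_step` is a hypothetical name) shows that, without noise, this equals averaging the clients' local models. The tutorial averages with equal weights via `mean_params`; that matches sample-size-weighted FedAvg here because every client receives two shards of 1,500 images.

```python
def fedavg_step(global_params, local_params_list):
    """One FedAvg round: global + mean(local - global), with equal client weights."""
    deltas = [{k: lp[k] - global_params[k] for k in global_params} for lp in local_params_list]
    avg_delta = {k: sum(d[k] for d in deltas) / len(deltas) for k in global_params}
    return {k: global_params[k] + avg_delta[k] for k in global_params}

global_params = {"w": 1.0, "b": 0.0}
locals_ = [{"w": 2.0, "b": 1.0}, {"w": 4.0, "b": -1.0}]
print(fedavg_step(global_params, locals_))  # {'w': 3.0, 'b': 0.0}
```

Averaging also helps under DP: if each of m clients adds independent noise with standard deviation sigma, the averaged update carries noise with standard deviation sigma / sqrt(m), so the 10 clients sampled per round dilute the noise by roughly a factor of 3.2.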
```python
@dataclass
class GossipConfig:
    rounds: int = 25
    local_epochs: int = 1
    lr: float = 0.06
    batch_size: int = 64
    clip_norm: float = 2.0
    epsilon: float = math.inf  # math.inf disables DP noise
    delta_dp: float = 1e-5
    topology: str = "ring"
    p: float = 0.2  # edge probability for the Erdos-Renyi topology
    gossip_pairs_per_round: int = 10

def build_topology(cfg):
    if cfg.topology == "ring":
        G = nx.cycle_graph(NUM_CLIENTS)
    elif cfg.topology == "erdos_renyi":
        G = nx.erdos_renyi_graph(NUM_CLIENTS, cfg.p, seed=SEED)
        if not nx.is_connected(G):
            # Chain the connected components together so gossip can reach every node.
            comps = list(nx.connected_components(G))
            for i in range(len(comps) - 1):
                a = next(iter(comps[i]))
                b = next(iter(comps[i + 1]))
                G.add_edge(a, b)
    else:
        raise ValueError(f"Unknown topology: {cfg.topology}")
    return G

def run_gossip(cfg):
    # Every node keeps its own model; there is no global model.
    node_params = [get_model_params(MLP().to(device)) for _ in range(NUM_CLIENTS)]
    G = build_topology(cfg)
    history = {"avg_test_loss": [], "avg_test_acc": []}
    for r in trange(cfg.rounds):
        # 1) Every node trains locally and privatizes its own update.
        new_params = []
        for cid in range(NUM_CLIENTS):
            p0 = node_params[cid]
            p_local = local_train_one_client(p0, cid, cfg.local_epochs, cfg.lr, cfg.batch_size)
            delta = sub_params(p_local, p0)
            rng = torch.Generator(device=device)
            rng.manual_seed(SEED * 10000 + r * 100 + cid)
            delta_dp = dp_sanitize_update(delta, cfg.clip_norm, cfg.epsilon, cfg.delta_dp, rng)
            p_local_dp = add_params(p0, delta_dp)
            new_params.append(p_local_dp)
        node_params = new_params
        # 2) Random neighboring pairs average their parameters.
        edges = list(G.edges())
        for _ in range(cfg.gossip_pairs_per_round):
            i, j = random.choice(edges)
            avg = mean_params([node_params[i], node_params[j]])
            node_params[i] = avg
            node_params[j] = avg
        # 3) Report the average test performance across all nodes.
        losses, accs = [], []
        for cid in range(NUM_CLIENTS):
            tl, ta = evaluate(node_params[cid])
            losses.append(tl)
            accs.append(ta)
        history["avg_test_loss"].append(float(np.mean(losses)))
        history["avg_test_acc"].append(float(np.mean(accs)))
    return history, node_params
```
We implement Gossip Federated Learning with a peer-to-peer exchange model. In each round, every node trains locally and privatizes its update, and then randomly chosen neighboring pairs average their parameters, with no central server involved. This lets us examine how privacy noise propagates through decentralized communication and how it affects convergence.
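The pairwise averaging step can be sketched with one scalar per node standing in for a full parameter dict. This minimal pure-Python sketch (hypothetical helper names) uses a 20-node ring and 10 random edge averages per round, matching `gossip_pairs_per_round=10`; it shows that gossip preserves the network-wide mean while gradually shrinking the disagreement between nodes.

```python
import random

def ring_edges(n):
    """Edges of a ring (cycle) graph on n nodes, like nx.cycle_graph(n)."""
    return [(i, (i + 1) % n) for i in range(n)]

def gossip_round(values, edges, pairs_per_round, rng):
    """Pick random edges and replace both endpoints with their average (in place)."""
    for _ in range(pairs_per_round):
        i, j = rng.choice(edges)
        avg = (values[i] + values[j]) / 2.0
        values[i] = avg
        values[j] = avg
    return values

rng = random.Random(7)
values = [float(i) for i in range(20)]  # each node starts with a different scalar "model"
edges = ring_edges(20)
initial_mean = sum(values) / len(values)
spreads = []
for r in range(50):
    gossip_round(values, edges, pairs_per_round=10, rng=rng)
    spreads.append(max(values) - min(values))
print(f"mean {sum(values) / len(values):.4f} (initial {initial_mean:.4f})")
print(f"spread after 1/10/50 rounds: {spreads[0]:.3f} / {spreads[9]:.3f} / {spreads[49]:.3f}")
```

Because each exchange only touches neighbors, information (including each node's DP noise) travels around a ring one hop at a time; a denser graph such as the `erdos_renyi` option would typically mix faster.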
```python
eps_sweep = [math.inf, 8.0, 4.0, 2.0, 1.0]  # math.inf = no DP noise
ROUNDS = 20
fedavg_results = {}
gossip_results = {}
common_local_epochs = 1
common_lr = 0.06
common_bs = 64
common_clip = 2.0
common_delta = 1e-5

for eps in eps_sweep:
    fcfg = FedAvgConfig(
        rounds=ROUNDS,
        clients_per_round=10,
        local_epochs=common_local_epochs,
        lr=common_lr,
        batch_size=common_bs,
        clip_norm=common_clip,
        epsilon=eps,
        delta_dp=common_delta,
    )
    hist_f, _ = run_fedavg(fcfg)
    fedavg_results[eps] = hist_f

    gcfg = GossipConfig(
        rounds=ROUNDS,
        local_epochs=common_local_epochs,
        lr=common_lr,
        batch_size=common_bs,
        clip_norm=common_clip,
        epsilon=eps,
        delta_dp=common_delta,
        topology="ring",
        gossip_pairs_per_round=10,
    )
    hist_g, _ = run_gossip(gcfg)
    gossip_results[eps] = hist_g

plt.figure(figsize=(10, 5))
for eps in eps_sweep:
    plt.plot(fedavg_results[eps]["test_acc"], label=f"FedAvg eps={eps}")
plt.xlabel("Round")
plt.ylabel("Accuracy")
plt.legend()
plt.grid(True)
plt.show()

plt.figure(figsize=(10, 5))
for eps in eps_sweep:
    plt.plot(gossip_results[eps]["avg_test_acc"], label=f"Gossip eps={eps}")
plt.xlabel("Round")
plt.ylabel("Avg Accuracy")
plt.legend()
plt.grid(True)
plt.show()

final_fed = [fedavg_results[eps]["test_acc"][-1] for eps in eps_sweep]
final_gos = [gossip_results[eps]["avg_test_acc"][-1] for eps in eps_sweep]
# math.inf cannot be placed on the x-axis, so the no-DP run is plotted at epsilon = 100.
x = [100.0 if math.isinf(eps) else eps for eps in eps_sweep]
plt.figure(figsize=(8, 5))
plt.plot(x, final_fed, marker="o", label="FedAvg")
plt.plot(x, final_gos, marker="o", label="Gossip")
plt.xlabel("Epsilon")
plt.ylabel("Final Accuracy")
plt.legend()
plt.grid(True)
plt.show()

def rounds_to_threshold(acc_curve, threshold):
    """Return the 1-based round at which accuracy first reaches threshold, or None."""
    for i, a in enumerate(acc_curve):
        if a >= threshold:
            return i + 1
    return None

# Threshold = 90% of each method's own non-private (eps = inf) final accuracy.
best_f = fedavg_results[math.inf]["test_acc"][-1]
best_g = gossip_results[math.inf]["avg_test_acc"][-1]
th_f = 0.9 * best_f
th_g = 0.9 * best_g
for eps in eps_sweep:
    rf = rounds_to_threshold(fedavg_results[eps]["test_acc"], th_f)
    rg = rounds_to_threshold(gossip_results[eps]["avg_test_acc"], th_g)
    print(eps, rf, rg)
```
We run both the centralized and decentralized training strategies across the epsilon sweep and collect their results. We visualize convergence trends and final accuracy to expose the privacy–utility trade-off, and we compute a convergence-speed metric (the number of rounds needed to reach 90% of the non-private final accuracy) to compare quantitatively how each aggregation strategy responds to privacy constraints.
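The convergence metric can be paired with a simple stability measure. A minimal sketch, assuming a hypothetical `tail_stats` helper that reports the mean and population standard deviation of the last k rounds' accuracy; the two curves below are illustrative toy numbers, not experimental results.

```python
import statistics

def rounds_to_threshold(acc_curve, threshold):
    """1-based round at which accuracy first reaches threshold, or None if it never does."""
    for i, a in enumerate(acc_curve):
        if a >= threshold:
            return i + 1
    return None

def tail_stats(acc_curve, k=5):
    """Hypothetical stability metric: mean and std of accuracy over the last k rounds."""
    tail = acc_curve[-k:]
    return statistics.mean(tail), statistics.pstdev(tail)

# Illustrative toy curves (not experimental results).
reference = [0.30, 0.60, 0.75, 0.85, 0.88, 0.90, 0.90]
noisy = [0.20, 0.40, 0.55, 0.65, 0.72, 0.70, 0.74]
th = 0.9 * reference[-1]  # 90% of the non-private final accuracy
print(rounds_to_threshold(reference, th), rounds_to_threshold(noisy, th))  # 4 None
print(tail_stats(noisy, k=3))
```

A curve that never reaches the threshold returns `None`, so a strongly privatized run shows up as "did not converge within the budget" rather than as a misleading round count; the tail statistics separate a run that plateaus smoothly from one whose accuracy oscillates from round to round.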
We conclude that decentralization fundamentally changes how differential-privacy noise propagates through a system. Centralized FedAvg tends to converge faster when privacy is weak, whereas gossip-based federated systems are more resilient to noisy updates. In both settings, our experiments show that stronger privacy (smaller epsilon) leads to significantly slower learning, and the effect is more pronounced in the decentralized setting because information takes several gossip rounds to propagate between nodes. Designing privacy-preserving systems therefore requires reasoning jointly about aggregation, communication, and privacy budgets rather than treating them independently.

