Domain Adaptation Benchmark
Systematically implement and compare domain adaptation methods -- Fourier Domain Adaptation, CyCADA-style adversarial training, and mixed data strategies -- to bridge the sim-to-real gap for autonomous driving perception, producing a rigorous benchmark with statistical evaluation on real-world detection tasks.
Track: B -- Synthetic Data & Sensor Sim | Level: Advanced | Total Time: ~25-35 hours
Overview
The sim-to-real gap is the central challenge of synthetic data for autonomous driving. A perception model trained exclusively on rendered images typically suffers a 15--40 percentage point drop in mAP when deployed on real camera data. The gap arises from systematic differences in visual appearance (lighting, texture fidelity, color distribution), physics fidelity (motion blur, lens distortion, sensor noise), and content distribution (object frequency, scene layout, weather mix) between the simulation domain and the real world.
Domain adaptation is the family of techniques designed to close this gap without requiring labels in the target (real) domain. The field has produced dozens of methods, but three broad strategies dominate in practice:
- Input-space adaptation -- transform source images so they "look" more like target images before feeding them to a model. Fourier Domain Adaptation (FDA) is the simplest representative: swap the low-frequency amplitude spectrum of a synthetic image with that of a real image, transferring global color and illumination statistics with a handful of FFT operations and no training.
- Adversarial domain adaptation -- train a generator to translate images between domains while a discriminator tries to distinguish them. CyCADA (Cycle-Consistent Adversarial Domain Adaptation) combines pixel-level and feature-level adversarial losses with cycle consistency to produce translations that preserve semantic content.
- Mixed / blended training -- skip image translation entirely and instead mix synthetic and real data during training, using curriculum schedules, sampling ratios, or progressive fine-tuning to help the model generalize across both domains.
Each approach trades off complexity, compute cost, and effectiveness differently. A fair comparison requires controlling for every variable: same base detector, same training budget, same evaluation protocol. That is what this project builds -- a reproducible benchmark that implements all three strategies, trains identical downstream models, and evaluates them on a common real-world test set with proper statistical rigor (confidence intervals, significance tests, per-class breakdowns).
By the end of this project you will have a working benchmark codebase that can be extended with new adaptation methods, new datasets, and new downstream tasks. You will understand not just how each method works, but when and why to choose one over another.
Learning Objectives
After completing this project, you will be able to:
- Quantify the sim-to-real domain gap -- measure distributional distance between synthetic and real driving images using FID, MMD, and per-channel histogram statistics.
- Implement Fourier Domain Adaptation from scratch -- apply 2D FFT, swap low-frequency components between domains, and control the adaptation strength via a bandwidth parameter.
- Build a CyCADA-style adversarial adaptation pipeline -- train generators and discriminators for pixel-level translation with cycle consistency, semantic consistency, and feature-level domain alignment losses.
- Design mixed-training strategies -- experiment with naive mixing, curriculum learning (synthetic-first then real), and optimal sampling ratios to find the best blend for downstream performance.
- Train and evaluate an object detector end-to-end -- use a standard single-stage detector (SSD-lite or FCOS) as the downstream task, training from adapted data and evaluating with mAP, per-class AP, and precision-recall curves.
- Conduct statistically rigorous comparisons -- compute bootstrap confidence intervals, run paired significance tests, and present results in publication-quality benchmark tables and plots.
- Analyze failure modes -- identify which object classes, distances, and scene conditions each adaptation method handles well or poorly, building intuition for method selection in production.
- Build reproducible ML experiments -- use fixed seeds, config-driven training, deterministic data loading, and systematic logging to ensure every result can be reproduced.
Prerequisites
Required
- PyTorch proficiency -- comfortable building custom datasets, training loops, loss functions, and using torchvision transforms. You should be able to write a training loop from scratch without copying boilerplate.
- GAN fundamentals -- understanding of generator/discriminator architecture, adversarial loss (min-max and non-saturating variants), mode collapse, and training stability techniques.
- Transfer learning -- familiarity with fine-tuning pretrained models, feature extraction, and the concept of domain shift in deep learning.
Recommended
- FFT / signal processing basics -- understanding of the Fourier transform, frequency domain, and the relationship between low-frequency components and global image appearance.
- Object detection concepts -- familiarity with anchor-based and anchor-free detectors, intersection-over-union (IoU), non-maximum suppression, and mean average precision (mAP).
- Statistical testing -- basic knowledge of hypothesis testing, confidence intervals, and the bootstrap method.
Deep Dive Reading
Before starting, read the companion deep dives for theoretical background:
- Synthetic Data for AD Perception Training -- covers domain randomization, adaptation techniques, mixed training, and cost-benefit analysis of synthetic data.
- Sim-to-Real Gap Analysis (forthcoming) -- discusses the taxonomy of domain gaps, measurement techniques, and mitigation strategies across the AD stack.
Key Concepts
Domain Shift in Autonomous Driving
Domain shift occurs when the joint distribution P(X, Y) differs between training (source) and deployment (target) data. In autonomous driving, the shift between simulation and reality manifests along three axes:
| Gap Type | Examples | Impact |
|---|---|---|
| Visual / appearance | Unrealistic lighting, flat textures, missing reflections, incorrect color balance | Texture-dependent features fail; model confuses rendered surfaces with real ones |
| Physics / sensor | Missing motion blur, no lens flare, incorrect noise patterns, wrong dynamic range | Model overconfident on clean inputs; degrades on real sensor artifacts |
| Content / distribution | Different actor frequencies, scene layouts, weather conditions, geographic locations | Model has wrong priors; poor calibration on rare classes |
A visual gap alone can drop detector mAP by 20+ points. Physics and content gaps compound the effect. Effective adaptation must address at least the visual gap; state-of-the-art approaches also tackle the others.
Domain Gap Taxonomy
====================
Source Domain (Simulation)        Target Domain (Real World)
+-----------------------+         +-----------------------+
| Clean renders         |         | Sensor noise          |
| Perfect lighting      |  ---->  | Variable weather      |
| Uniform textures      |   GAP   | Complex textures      |
| Controlled actors     |         | Diverse actors        |
+-----------------------+         +-----------------------+
Adaptation Strategies:
[1] Input-space: Transform source images to look like target
[2] Feature-space: Align intermediate representations
[3] Output-space: Use target structure for self-training
[4] Mixed training: Blend source + target data directly
Measuring the Domain Gap
Before adapting, you need to measure how large the gap is. Three complementary metrics:
Frechet Inception Distance (FID) measures the distance between two distributions of images by comparing their statistics in the Inception-v3 feature space:
$$ \text{FID} = \lVert \mu_s - \mu_t \rVert_2^2 + \text{Tr}\left(\Sigma_s + \Sigma_t - 2(\Sigma_s \Sigma_t)^{1/2}\right) $$
where $(\mu_s, \Sigma_s)$ and $(\mu_t, \Sigma_t)$ are the mean and covariance of Inception features for source and target images respectively. Lower FID means the distributions are more similar. Typical values: sim-to-real FID ranges from 50--200; same-domain FID is 5--20.
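The formula can be evaluated directly once features are available. The following is a minimal NumPy sketch, assuming the features have already been extracted into `(N, D)` arrays; the function name `frechet_distance` and the random stand-in features are illustrative, not part of the benchmark code. It computes the trace of the matrix square root from the eigenvalues of $\Sigma_s \Sigma_t$ rather than forming the square root explicitly.

```python
import numpy as np


def frechet_distance(feats_s, feats_t):
    """Frechet distance between Gaussians fitted to two feature sets.

    feats_s, feats_t: (N, D) and (M, D) feature arrays. For FID these would
    be Inception-v3 features; here any fixed-length features work.
    """
    mu_s, mu_t = feats_s.mean(axis=0), feats_t.mean(axis=0)
    sigma_s = np.cov(feats_s, rowvar=False)
    sigma_t = np.cov(feats_t, rowvar=False)
    # Tr((Sigma_s Sigma_t)^{1/2}) is the sum of the square roots of the
    # eigenvalues of Sigma_s Sigma_t. For PSD covariances these are real and
    # non-negative; small negative or imaginary parts are numerical noise.
    eigvals = np.linalg.eigvals(sigma_s @ sigma_t)
    tr_sqrt = np.sqrt(np.clip(eigvals.real, 0.0, None)).sum()
    diff = mu_s - mu_t
    return float(diff @ diff + np.trace(sigma_s) + np.trace(sigma_t) - 2.0 * tr_sqrt)


# Stand-in features (assumption: random Gaussians instead of Inception outputs)
rng = np.random.default_rng(0)
feats_a = rng.normal(0.0, 1.0, size=(2000, 8))
feats_b = rng.normal(0.0, 1.0, size=(2000, 8))  # same distribution as feats_a
feats_c = rng.normal(1.0, 1.0, size=(2000, 8))  # mean shifted by 1 per dimension

fid_same = frechet_distance(feats_a, feats_b)
fid_shift = frechet_distance(feats_a, feats_c)
print(f"same distribution: {fid_same:.3f}, shifted mean: {fid_shift:.3f}")
```

With a mean shift of 1 in each of 8 dimensions, the first term alone contributes about 8, so the shifted pair scores far higher than the matched pair.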
Maximum Mean Discrepancy (MMD) is a kernel-based distance between distributions:
$$ \text{MMD}^2(P, Q) = \mathbb{E}[k(x, x')] - 2\mathbb{E}[k(x, y)] + \mathbb{E}[k(y, y')] $$
where $k$ is a kernel function (typically RBF/Gaussian), $x, x' \sim P$ (source), and $y, y' \sim Q$ (target). MMD can be computed on raw pixels, features, or any intermediate representation.
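As a rough illustration, the three expectations can be replaced by sample means over kernel matrices. This sketch uses the biased (V-statistic) estimator with a single RBF bandwidth `sigma`; both choices, and the function names, are assumptions made for brevity rather than the benchmark's prescribed settings.

```python
import numpy as np


def rbf_kernel(a, b, sigma):
    """Gaussian kernel matrix k(a_i, b_j) = exp(-||a_i - b_j||^2 / (2 sigma^2))."""
    sq_dists = (
        np.sum(a ** 2, axis=1)[:, None]
        + np.sum(b ** 2, axis=1)[None, :]
        - 2.0 * a @ b.T
    )
    # Clamp tiny negative values caused by floating-point cancellation
    return np.exp(-np.maximum(sq_dists, 0.0) / (2.0 * sigma ** 2))


def mmd2_biased(x, y, sigma=1.0):
    """Biased estimate of MMD^2 between samples x (N, D) and y (M, D)."""
    k_xx = rbf_kernel(x, x, sigma).mean()
    k_yy = rbf_kernel(y, y, sigma).mean()
    k_xy = rbf_kernel(x, y, sigma).mean()
    return float(k_xx - 2.0 * k_xy + k_yy)


rng = np.random.default_rng(0)
x = rng.normal(0.0, 1.0, size=(500, 4))
y_same = rng.normal(0.0, 1.0, size=(500, 4))
y_shift = rng.normal(1.0, 1.0, size=(500, 4))

mmd_same = mmd2_biased(x, y_same, sigma=2.0)
mmd_shift = mmd2_biased(x, y_shift, sigma=2.0)
print(f"same: {mmd_same:.4f}, shifted: {mmd_shift:.4f}")
```

The result depends strongly on the kernel bandwidth, so the same `sigma` should be used for every comparison that will be reported side by side.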
Per-channel histogram distance is the simplest measure: compute the histogram of each RGB channel for source and target, then measure Earth Mover's Distance (Wasserstein-1) between them. This captures gross color/brightness differences that dominate the visual gap.
import numpy as np
from scipy.stats import wasserstein_distance


def channel_histogram_distance(images_source, images_target, n_bins=256):
    """Compute per-channel Wasserstein distance between image sets.

    Args:
        images_source: (N, H, W, 3) uint8 array of source images
        images_target: (M, H, W, 3) uint8 array of target images

    Returns:
        distances: dict with keys 'R', 'G', 'B' and float values
            (in intensity levels)
    """
    distances = {}
    for c, name in enumerate(['R', 'G', 'B']):
        src_pixels = images_source[..., c].ravel()
        tgt_pixels = images_target[..., c].ravel()
        # range=(0, 256) gives unit-width bins for uint8 data when n_bins=256
        src_hist, edges = np.histogram(src_pixels, bins=n_bins, range=(0, 256))
        tgt_hist, _ = np.histogram(tgt_pixels, bins=n_bins, range=(0, 256))
        # Use bin centers as support so the distance stays in intensity units
        centers = 0.5 * (edges[:-1] + edges[1:])
        distances[name] = wasserstein_distance(
            centers, centers,
            u_weights=src_hist, v_weights=tgt_hist,
        )
    return distances
Fourier Domain Adaptation (FDA)
FDA (Yang & Soatto, 2020) is based on a key insight: the low-frequency components of an image's Fourier spectrum encode global appearance (color palette, brightness, contrast), while high-frequency components encode local structure (edges, textures, shapes). By replacing the low-frequency amplitude of a synthetic image with that of a real image, while keeping the synthetic image's phase, you transfer the "style" without altering the "content."
Algorithm:
1. Compute the 2D FFT of both the source image $x_s$ and the target image $x_t$ (per channel): $$F_s = \mathcal{F}(x_s), \quad F_t = \mathcal{F}(x_t)$$
2. Define a binary mask $M_\beta$ that selects the low-frequency center region of the (shifted) spectrum. The mask extends $\beta H$ rows and $\beta W$ columns on each side of the DC component, so it covers roughly $2\beta H \times 2\beta W$ frequency bins, where $\beta \in (0, 1)$ is the bandwidth parameter.
3. Replace the low-frequency amplitude of the source with that of the target, keeping the source phase: $$|F_{\text{adapted}}| = M_\beta \odot |F_t| + (1 - M_\beta) \odot |F_s|, \quad \angle F_{\text{adapted}} = \angle F_s$$
4. Invert the FFT to get the adapted image: $$x_{\text{adapted}} = \mathcal{F}^{-1}\left(|F_{\text{adapted}}| \, e^{i \angle F_s}\right)$$
The bandwidth parameter $\beta$ controls the tradeoff: small $\beta$ (0.01--0.05) transfers only color/brightness; large $\beta$ (0.1--0.3) also transfers coarser texture patterns but risks introducing artifacts.
import numpy as np


def fda_transfer(source_img, target_img, beta=0.05):
    """Apply Fourier Domain Adaptation to a single image pair.

    Args:
        source_img: (H, W, 3) float32 array in [0, 1]
        target_img: (H, W, 3) float32 array in [0, 1]
        beta: bandwidth parameter (fraction of spectrum to swap)

    Returns:
        adapted: (H, W, 3) float32 array in [0, 1]
    """
    assert source_img.shape == target_img.shape
    h, w, c = source_img.shape

    # Build low-frequency mask, symmetric around the DC bin at (cy, cx).
    # max(1, ...) keeps the mask non-empty for small images or small beta.
    cy, cx = h // 2, w // 2
    bh, bw = max(1, int(h * beta)), max(1, int(w * beta))
    mask = np.zeros((h, w), dtype=np.float32)
    mask[cy - bh:cy + bh + 1, cx - bw:cx + bw + 1] = 1.0

    adapted = np.zeros_like(source_img)
    for ch in range(c):
        # FFT (shift so DC is at center)
        F_src = np.fft.fftshift(np.fft.fft2(source_img[:, :, ch]))
        F_tgt = np.fft.fftshift(np.fft.fft2(target_img[:, :, ch]))

        # Amplitude swap in low-frequency region
        amp_src = np.abs(F_src)
        amp_tgt = np.abs(F_tgt)
        phase_src = np.angle(F_src)
        amp_mixed = amp_src * (1 - mask) + amp_tgt * mask

        # Reconstruct with mixed amplitude and source phase
        F_adapted = amp_mixed * np.exp(1j * phase_src)
        adapted[:, :, ch] = np.real(np.fft.ifft2(np.fft.ifftshift(F_adapted)))

    return np.clip(adapted, 0, 1)
Key properties of FDA:
- No training required -- pure signal processing
- Very fast -- two forward FFTs and one inverse FFT per channel, typically milliseconds per image
- Preserves spatial layout (the source phase is untouched), although a large $\beta$ can introduce visible artifacts
- Limited to global appearance transfer (cannot fix local texture issues)
- Requires choosing $\beta$ (typically via grid search on validation set)
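Two of these properties can be checked numerically on a single channel. The sketch below is a sanity check under stated assumptions: random arrays stand in for a synthetic and a real image channel, the mask is symmetric around the DC bin, and the final clipping to [0, 1] is skipped so that the identities hold exactly.

```python
import numpy as np

rng = np.random.default_rng(0)
h, w, beta = 64, 64, 0.05
source = rng.uniform(0.2, 0.6, size=(h, w))  # stand-in "synthetic" channel (darker)
target = rng.uniform(0.4, 0.9, size=(h, w))  # stand-in "real" channel (brighter)

# Symmetric low-frequency mask; after fftshift the DC bin sits at (h//2, w//2)
cy, cx = h // 2, w // 2
bh, bw = max(1, int(h * beta)), max(1, int(w * beta))
mask = np.zeros((h, w))
mask[cy - bh:cy + bh + 1, cx - bw:cx + bw + 1] = 1.0

F_src = np.fft.fftshift(np.fft.fft2(source))
F_tgt = np.fft.fftshift(np.fft.fft2(target))
amp_mixed = np.abs(F_src) * (1 - mask) + np.abs(F_tgt) * mask
F_adapted = amp_mixed * np.exp(1j * np.angle(F_src))
adapted_complex = np.fft.ifft2(np.fft.ifftshift(F_adapted))
adapted = adapted_complex.real

# A symmetric mask keeps the spectrum conjugate-symmetric, so the inverse
# FFT is real up to rounding error.
max_imag = np.abs(adapted_complex.imag).max()

# The DC amplitude now comes from the target, so mean brightness matches it.
mean_gap = abs(adapted.mean() - target.mean())

# Re-transforming the adapted image recovers the source phase. Phases are
# compared as unit phasors to avoid wrap-around at +/- pi.
F_check = np.fft.fftshift(np.fft.fft2(adapted))
phase_err = np.abs(
    np.exp(1j * np.angle(F_check)) - np.exp(1j * np.angle(F_src))
).max()

print(f"max imaginary part: {max_imag:.2e}")
print(f"mean brightness gap to target: {mean_gap:.2e}")
print(f"max phase deviation from source: {phase_err:.2e}")
```

The brightness match follows from the DC bin being inside the mask: the mean of an image is its DC coefficient divided by the number of pixels.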
CyCADA: Cycle-Consistent Adversarial Domain Adaptation
CyCADA (Hoffman et al., 2018) uses adversarial training to learn an image-to-image translation function that maps source images to the target domain while preserving semantic content. It combines four loss terms:
1. Adversarial loss -- A discriminator $D_T$ tries to distinguish real target images from translated source images. The generator $G_{S \to T}$ learns to fool it:
$$ \mathcal{L}_{\text{GAN}}(G_{S \to T}, D_T) = \mathbb{E}_{x_t}[\log D_T(x_t)] + \mathbb{E}_{x_s}[\log(1 - D_T(G_{S \to T}(x_s)))] $$
2. Cycle consistency loss -- A reverse generator $G_{T \to S}$ maps translated images back to the source domain. The round-trip should be identity:
$$ \mathcal{L}_{\text{cyc}} = \mathbb{E}_{x_s}\left[\lVert G_{T \to S}(G_{S \to T}(x_s)) - x_s \rVert_1\right] + \mathbb{E}_{x_t}\left[\lVert G_{S \to T}(G_{T \to S}(x_t)) - x_t \rVert_1\right] $$
This prevents the generator from hallucinating content or removing important objects.
3. Semantic consistency loss -- A pretrained task network $f$ (e.g., a classifier or segmentation model) should produce the same output for $x_s$ and $G_{S \to T}(x_s)$:
$$ \mathcal{L}_{\text{sem}} = \mathbb{E}_{x_s}\left[\mathcal{L}_{\text{CE}}(f(G_{S \to T}(x_s)), f(x_s))\right] $$
This ensures the translation preserves semantic meaning (a car stays a car).
4. Feature-level adversarial loss -- A second discriminator $D_{\text{feat}}$ operates on intermediate features of the task network, aligning source and target feature distributions:
$$ \mathcal{L}_{\text{feat}} = \mathbb{E}_{x_t}[\log D_{\text{feat}}(\phi(x_t))] + \mathbb{E}_{x_s}[\log(1 - D_{\text{feat}}(\phi(G_{S \to T}(x_s))))] $$
The total loss is:
$$ \mathcal{L}_{\text{total}} = \mathcal{L}_{\text{GAN}} + \lambda_{\text{cyc}} \mathcal{L}_{\text{cyc}} + \lambda_{\text{sem}} \mathcal{L}_{\text{sem}} + \lambda_{\text{feat}} \mathcal{L}_{\text{feat}} $$
Typical hyperparameters: $\lambda_{\text{cyc}} = 10$, $\lambda_{\text{sem}} = 1$, $\lambda_{\text{feat}} = 1$.
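To make the weighting concrete, here is a small NumPy worked example that evaluates each term on stand-in arrays and combines them with the weights above. Everything in it is an assumption for illustration: the arrays replace real network outputs, the helper names are hypothetical, the L1 term is averaged per element (a common implementation choice) rather than summed, and `l_cyc` stands for the sum of both round-trip terms. In actual training the generators and discriminators are updated in alternation on their respective parts of this objective.

```python
import numpy as np

EPS = 1e-8  # avoids log(0)


def gan_loss(d_real, d_fake):
    """E[log D(real)] + E[log(1 - D(fake))] for discriminator probabilities."""
    return float(np.mean(np.log(d_real + EPS)) + np.mean(np.log(1.0 - d_fake + EPS)))


def l1_loss(x, x_rec):
    """Mean absolute reconstruction error (per-element average)."""
    return float(np.mean(np.abs(x_rec - x)))


def cross_entropy(probs, labels):
    """Mean cross-entropy of class probabilities (N, C) against integer labels (N,)."""
    picked = probs[np.arange(len(labels)), labels]
    return float(-np.mean(np.log(picked + EPS)))


def cycada_total(l_gan, l_cyc, l_sem, l_feat, lam_cyc=10.0, lam_sem=1.0, lam_feat=1.0):
    """Weighted sum of the four loss terms."""
    return l_gan + lam_cyc * l_cyc + lam_sem * l_sem + lam_feat * l_feat


# Stand-in values: an undecided discriminator (0.5 everywhere), a round trip
# that is off by 0.1 per pixel, and a task network that is maximally unsure.
d_real = np.array([0.5, 0.5])
d_fake = np.array([0.5, 0.5])
x = np.zeros((2, 4, 4, 3))
x_rec = x + 0.1
probs = np.array([[0.5, 0.5], [0.5, 0.5]])
labels = np.array([0, 1])

l_gan = gan_loss(d_real, d_fake)      # 2 * log(0.5)
l_cyc = l1_loss(x, x_rec)             # 0.1
l_sem = cross_entropy(probs, labels)  # log(2)
l_feat = gan_loss(d_real, d_fake)     # 2 * log(0.5)
total = cycada_total(l_gan, l_cyc, l_sem, l_feat)
print(f"GAN={l_gan:.4f} cyc={l_cyc:.4f} sem={l_sem:.4f} feat={l_feat:.4f} total={total:.4f}")
```

With $\lambda_{\text{cyc}} = 10$, a per-pixel cycle error of 0.1 contributes 1.0 to the total, which is comparable in magnitude to the adversarial terms; this is why the cycle weight has such a strong influence on how conservative the translation is.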
CyCADA Architecture
====================
Source Image x_s ----> [G_S->T] ----> Adapted Image x_s' ----> [D_T] (real or fake?)
      |                                      |
      |                                      +----> [G_T->S] ----> Reconstructed x_s''
      |                                      |
      +--- Cycle loss: ||x_s'' - x_s||       |
                                             |
                                             +----> [Task Network f] ----> Semantic loss
                                             |
                                             +----> [Feature Extractor phi] ----> [D_feat] (domain?)

Target Image x_t ----> [D_T] (real)
                 ----> [G_T->S] ----> x_t' ----> [G_S->T] ----> x_t'' (cycle)
Generator architecture: Typically a ResNet-based encoder-decoder with 9 residual blocks (for 256x256 inputs). Input: 3-channel image. Output: 3-channel image. Instance normalization is preferred over batch normalization.
Discriminator architecture: PatchGAN discriminator that classifies overlapping 70x70 patches as real or fake. This encourages high-frequency crispness and allows the discriminator to work on images of any size.
import torch
import torch.nn as nn


class ResidualBlock(nn.Module):
    """Residual block with instance normalization."""

    def __init__(self, channels):
        super().__init__()
        self.block = nn.Sequential(
            nn.ReflectionPad2d(1),
            nn.Conv2d(channels, channels, 3),
            nn.InstanceNorm2d(channels),
            nn.ReLU(inplace=True),
            nn.ReflectionPad2d(1),
            nn.Conv2d(channels, channels, 3),
            nn.InstanceNorm2d(channels),
        )

    def forward(self, x):
        return x + self.block(x)


class CyCADAGenerator(nn.Module):
    """ResNet-based generator for image-to-image translation."""

    def __init__(self, in_channels=3, out_channels=3, n_residual=9, n_features=64):
        super().__init__()
        # Initial convolution
        layers = [
            nn.ReflectionPad2d(3),
            nn.Conv2d(in_channels, n_features, 7),
            nn.InstanceNorm2d(n_features),
            nn.ReLU(inplace=True),
        ]
        # Downsampling
        for i in range(2):
            mult = 2 ** i
            layers += [
                nn.Conv2d(n_features * mult, n_features * mult * 2, 3, stride=2, padding=1),
                nn.InstanceNorm2d(n_features * mult * 2),
                nn.ReLU(inplace=True),
            ]
        # Residual blocks
        mult = 2 ** 2
        for _ in range(n_residual):
            layers.append(ResidualBlock(n_features * mult))
        # Upsampling
        for i in range(2):
            mult = 2 ** (2 - i)
            layers += [
                nn.ConvTranspose2d(n_features * mult, n_features * mult // 2,
                                   3, stride=2, padding=1, output_padding=1),
                nn.InstanceNorm2d(n_features * mult // 2),
                nn.ReLU(inplace=True),
            ]
        # Output convolution (Tanh maps outputs to [-1, 1])
        layers += [
            nn.ReflectionPad2d(3),
            nn.Conv2d(n_features, out_channels, 7),
            nn.Tanh(),
        ]
        self.model = nn.Sequential(*layers)

    def forward(self, x):
        return self.model(x)


class PatchDiscriminator(nn.Module):
    """PatchGAN discriminator (70x70 receptive field)."""

    def __init__(self, in_channels=3, n_features=64, n_layers=3):
        super().__init__()
        layers = [
            nn.Conv2d(in_channels, n_features, 4, stride=2, padding=1),
            nn.LeakyReLU(0.2, inplace=True),
        ]
        mult = 1
        for i in range(1, n_layers):
            mult_prev = mult
            mult = min(2 ** i, 8)
            layers += [
                nn.Conv2d(n_features * mult_prev, n_features * mult, 4, stride=2, padding=1),
                nn.InstanceNorm2d(n_features * mult),
                nn.LeakyReLU(0.2, inplace=True),
            ]
        mult_prev = mult
        mult = min(2 ** n_layers, 8)
        layers += [
            nn.Conv2d(n_features * mult_prev, n_features * mult, 4, stride=1, padding=1),
            nn.InstanceNorm2d(n_features * mult),
            nn.LeakyReLU(0.2, inplace=True),
            # One logit per patch, not one per image
            nn.Conv2d(n_features * mult, 1, 4, stride=1, padding=1),
        ]
        self.model = nn.Sequential(*layers)

    def forward(self, x):
        return self.model(x)
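The 70x70 receptive field quoted for the PatchGAN discriminator follows from its five 4x4 convolutions (three with stride 2, two with stride 1, as in the default `n_layers=3` configuration). The short calculation below reproduces that number and the size of the output logit map; the helper names are illustrative.

```python
def receptive_field(layers):
    """Receptive field of stacked convs given as (kernel, stride), input to output."""
    rf = 1
    for kernel, stride in reversed(layers):
        rf = (rf - 1) * stride + kernel
    return rf


def output_size(n, layers, padding=1):
    """Spatial output size after the stacked convs for an n x n input."""
    for kernel, stride in layers:
        n = (n + 2 * padding - kernel) // stride + 1
    return n


# (kernel, stride) for each conv in the discriminator with n_layers=3
patchgan_layers = [(4, 2), (4, 2), (4, 2), (4, 1), (4, 1)]

rf = receptive_field(patchgan_layers)
out = output_size(256, patchgan_layers)
print(rf)   # 70
print(out)  # 30
```

For a 256x256 input the discriminator therefore emits a 30x30 grid of logits, each judging one 70x70 patch of the image.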
Mixed / Blended Training Strategies
Instead of translating images, mixed training simply combines synthetic and real data during model training. The key design decisions:
Naive mixing: Concatenate all source and target data, shuffle, train normally. Simple but often suboptimal because the model may overfit to the larger domain (usually synthetic).
Ratio-controlled mixing: In each mini-batch, include a fixed ratio of synthetic-to-real images. Common ratios: 50/50, 70/30 (syn/real), 30/70. The optimal ratio depends on dataset sizes and domain gap magnitude.
Curriculum learning: Train in phases:
- Phase 1: Train on synthetic data only (builds feature representations)
- Phase 2: Fine-tune on real data only (adapts to target distribution)
- Optional Phase 3: Fine-tune on mixed data (maintains both)
Progressive blending: Start with 100% synthetic, gradually increase real data proportion over training:
$$ r(t) = \min\left(1, \frac{t}{T_{\text{blend}}}\right) $$
where $r(t)$ is the fraction of real data at step $t$, and $T_{\text{blend}}$ is the blending horizon.
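As a small worked example, the schedule can be turned into per-batch sample counts. The function names and the batch size of 32 are assumptions for illustration.

```python
def real_fraction(step, blend_steps):
    """r(t) = min(1, t / T_blend): fraction of real data at a training step."""
    if blend_steps <= 0:
        raise ValueError("blend_steps must be positive")
    return min(1.0, step / blend_steps)


def batch_composition(step, blend_steps, batch_size):
    """Return (n_synthetic, n_real) for one mini-batch at the given step."""
    n_real = round(real_fraction(step, blend_steps) * batch_size)
    return batch_size - n_real, n_real


for step in (0, 250, 500, 1000, 2000):
    print(step, batch_composition(step, blend_steps=1000, batch_size=32))
```

With a blending horizon of 1000 steps, the batch is all synthetic at step 0, evenly split at step 500, and all real from step 1000 onward.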
import numpy as np
import torch


class MixedDataLoader:
    """Data loader that blends source and target data at a given ratio."""

    def __init__(self, source_loader, target_loader, source_ratio=0.5):
        self.source_loader = source_loader
        self.target_loader = target_loader
        self.source_ratio = source_ratio

    def __iter__(self):
        # zip stops as soon as the shorter loader is exhausted
        for src_batch, tgt_batch in zip(self.source_loader, self.target_loader):
            # Size of the mixed batch. Each loader yields a full batch, so any
            # ratio in [0, 1] can be served without running short of samples.
            batch_size = min(src_batch[0].shape[0], tgt_batch[0].shape[0])
            n_source = int(round(batch_size * self.source_ratio))
            n_target = batch_size - n_source
            images = torch.cat([src_batch[0][:n_source], tgt_batch[0][:n_target]])
            labels = torch.cat([src_batch[1][:n_source], tgt_batch[1][:n_target]])
            # Shuffle within batch
            perm = torch.randperm(images.shape[0])
            yield images[perm], labels[perm]


class CurriculumScheduler:
    """Controls the source/target ratio over training epochs."""

    def __init__(self, total_epochs, warmup_epochs=5, strategy='linear'):
        self.total_epochs = total_epochs
        self.warmup_epochs = warmup_epochs
        self.strategy = strategy

    def get_source_ratio(self, epoch):
        """Returns fraction of source (synthetic) data to use."""
        if self.strategy == 'linear':
            # Linear decay from 1.0 to 0.0
            return max(0.0, 1.0 - epoch / self.total_epochs)
        elif self.strategy == 'step':
            # Phase 1: all synthetic, Phase 2: all real
            return 1.0 if epoch < self.warmup_epochs else 0.0
        elif self.strategy == 'cosine':
            # Smooth cosine decay
            return 0.5 * (1 + np.cos(np.pi * epoch / self.total_epochs))
        else:
            return 0.5  # any other strategy name: constant 50/50
Evaluation Methodology
A fair benchmark requires a systematic evaluation protocol. The standard approach:
Evaluation Protocol
====================
Training Conditions:
[A] Source-only: Train on synthetic data, evaluate on real
[B] FDA-adapted: Train on FDA-transformed synthetic, evaluate on real
[C] CyCADA-adapted: Train on CyCADA-translated synthetic, evaluate on real
[D] Mixed training: Train on synthetic + real (various ratios), evaluate on real
[E] Target-only: Train on real data only (upper bound), evaluate on real
All conditions use:
- Same base detector architecture (e.g., SSD-lite with MobileNetV2)
- Same training hyperparameters (lr, epochs, batch size, augmentation)
- Same evaluation set (held-out real images, never used for training)
- Same evaluation metric (COCO-style mAP at IoU thresholds 0.5 and 0.5:0.95)
Statistical Rigor:
- 3-5 runs per condition with different random seeds
- Bootstrap 95% confidence intervals on mAP
- Paired t-test or Wilcoxon signed-rank test between conditions
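The statistical steps can be sketched with NumPy alone. This is a minimal illustration under several assumptions: the mAP values are made-up placeholders (not benchmark results), the bootstrap resamples whatever array it is given (in a real benchmark you would resample test images and recompute mAP on each resample), and an exact sign-flip permutation test is used as a simple stand-in for the paired t-test or Wilcoxon test named above.

```python
import itertools

import numpy as np


def bootstrap_ci(values, stat=np.mean, n_boot=2000, alpha=0.05, seed=0):
    """Percentile bootstrap CI for stat(values), resampling with replacement."""
    rng = np.random.default_rng(seed)
    values = np.asarray(values, dtype=float)
    n = len(values)
    stats = np.array([stat(values[rng.integers(0, n, size=n)]) for _ in range(n_boot)])
    lo, hi = np.quantile(stats, [alpha / 2, 1 - alpha / 2])
    return float(lo), float(hi)


def paired_sign_flip_test(a, b):
    """Exact two-sided sign-flip permutation test on paired differences a - b."""
    diffs = np.asarray(a, dtype=float) - np.asarray(b, dtype=float)
    observed = abs(diffs.mean())
    count, total = 0, 0
    for signs in itertools.product([1.0, -1.0], repeat=len(diffs)):
        total += 1
        # Small tolerance so the observed assignment always counts itself
        if abs((diffs * np.array(signs)).mean()) >= observed - 1e-12:
            count += 1
    return count / total


# Placeholder mAP values for 5 seeds per condition (illustrative only);
# entry i of both lists is assumed to come from the same seed.
map_fda = [0.412, 0.405, 0.421, 0.398, 0.415]
map_src = [0.351, 0.362, 0.344, 0.358, 0.349]

ci = bootstrap_ci(map_fda)
p_value = paired_sign_flip_test(map_fda, map_src)
print(f"FDA mAP 95% CI: [{ci[0]:.3f}, {ci[1]:.3f}]")
print(f"paired sign-flip p-value: {p_value:.4f}")
```

Note the limitation this exposes: with 5 paired runs there are only 32 sign assignments, so the smallest achievable two-sided p-value is 2/32 = 0.0625, even when one method wins on every seed. Seed-level tests with 3-5 runs are therefore best read alongside the confidence intervals rather than as a pass/fail threshold at 0.05.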
Mean Average Precision (mAP): The standard detection metric. For each class, compute precision-recall curve, then area under curve (AP). mAP is the mean AP across classes.
$$ \text{AP}_c = \int_0^1 p_c(r) \, dr $$
$$ \text{mAP} = \frac{1}{|C|} \sum_{c \in C} \text{AP}_c $$
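In practice the integral is approximated from a ranked list of detections. The sketch below uses all-point interpolation with a monotone precision envelope (the Pascal VOC style); COCO-style evaluation instead samples precision at 101 fixed recall levels, so exact values differ slightly. It assumes detections have already been matched to ground truth at a chosen IoU threshold; the function name and inputs are illustrative.

```python
import numpy as np


def average_precision(scores, is_true_positive, n_ground_truth):
    """All-point interpolated AP for one class.

    scores: confidence of each detection
    is_true_positive: 1 if the detection matched an unclaimed ground-truth box
    n_ground_truth: number of ground-truth boxes of this class
    """
    order = np.argsort(-np.asarray(scores, dtype=float))
    tp = np.asarray(is_true_positive, dtype=float)[order]
    cum_tp = np.cumsum(tp)
    cum_fp = np.cumsum(1.0 - tp)
    recall = cum_tp / n_ground_truth
    precision = cum_tp / (cum_tp + cum_fp)

    # Pad the curve, then replace precision by its running maximum from the right
    recall = np.concatenate([[0.0], recall, [1.0]])
    precision = np.concatenate([[0.0], precision, [0.0]])
    precision = np.maximum.accumulate(precision[::-1])[::-1]

    # Sum rectangle areas at the points where recall increases
    idx = np.nonzero(recall[1:] != recall[:-1])[0]
    return float(np.sum((recall[idx + 1] - recall[idx]) * precision[idx + 1]))


# Four detections, three correct, four ground-truth objects
ap = average_precision(
    scores=[0.9, 0.8, 0.7, 0.6],
    is_true_positive=[1, 0, 1, 1],
    n_ground_truth=4,
)
print(ap)  # 0.625

# mAP is the plain mean over per-class AP values
per_class_ap = {"car": ap, "pedestrian": average_precision([0.9, 0.8], [1, 1], 2)}
m_ap = float(np.mean(list(per_class_ap.values())))
print(m_ap)  # 0.8125
```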
Per-class AP breakdown reveals which object classes benefit most from adaptation. Typically, large objects (cars, trucks) are easier to adapt than small objects (pedestrians, traffic cones).
Distance-stratified evaluation bins detections by range (0--30m, 30--50m, 50--80m) to reveal whether adaptation helps more at close or far range.
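A simple way to start is to bin ground-truth objects by range and report recall per bin. The sketch below assumes each ground-truth object comes with a distance in meters and a flag saying whether it was detected; a full per-bin AP would additionally require assigning false positives to bins, which is not shown. The function name and sample values are illustrative.

```python
import numpy as np


def recall_by_range(gt_distances_m, gt_detected, edges=(0, 30, 50, 80)):
    """Recall of ground-truth objects per distance bin [edges[i], edges[i+1])."""
    d = np.asarray(gt_distances_m, dtype=float)
    hit = np.asarray(gt_detected, dtype=bool)
    bins = np.digitize(d, edges) - 1  # objects beyond the last edge fall outside all bins
    out = {}
    for i in range(len(edges) - 1):
        in_bin = bins == i
        label = f"{edges[i]}-{edges[i + 1]}m"
        out[label] = float(hit[in_bin].mean()) if in_bin.any() else float("nan")
    return out


recalls = recall_by_range(
    gt_distances_m=[5, 12, 35, 45, 60, 75],
    gt_detected=[1, 1, 1, 0, 0, 0],
)
print(recalls)  # {'0-30m': 1.0, '30-50m': 0.5, '50-80m': 0.0}
```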
Distribution Metrics for Monitoring Adaptation Quality
Beyond FID, track these during adaptation:
| Metric | What it measures | Computation cost | Sensitivity |
|---|---|---|---|
| FID | Feature-space distribution distance | Medium (needs Inception forward passes) | Global appearance |
| KID | Kernel Inception Distance (MMD on Inception features with an unbiased estimator) | Medium | Same as FID, less biased for small N |
| MMD | Kernel-based distribution distance | Low-Medium | Flexible (works on any features) |
| SSIM | Structural similarity (per-image) | Low | Local structure preservation |
| Color histogram distance | Per-channel color distribution gap | Very low | Gross color shifts |
Monitoring these during CyCADA training helps detect mode collapse (FID suddenly increases), semantic drift (SSIM drops), or color artifacts (histogram distance increases).
Step-by-Step Implementation Guide
Step 1: Environment Setup and Dataset Preparation (3 hours)
Goal: Set up the project, create synthetic and real image datasets for benchmarking, and establish the evaluation framework.
1.1 Create the project
mkdir -p domain-adaptation-benchmark/{data,models,configs,outputs,notebooks}
cd domain-adaptation-benchmark
python -m venv .venv
source .venv/bin/activate
1.2 Install dependencies
pip install torch torchvision numpy matplotlib scipy opencv-python tqdm \
scikit-learn Pillow pandas seaborn ipykernel jupyter
| Package | Purpose |
|---|---|
| torch, torchvision | Deep learning framework, pretrained models, transforms |
| numpy, scipy | Numerical computing, FFT, statistics |
| opencv-python | Image processing, color space conversion |
| scikit-learn | Evaluation metrics, data splitting |
| Pillow | Image I/O |
| matplotlib, seaborn | Visualization, publication-quality plots |
| pandas | Tabular results management |
| tqdm | Progress bars |
1.3 Dataset strategy
For this benchmark we create a controlled setup using STL-10 (a subset of a driving dataset can be substituted in the same way). The key requirement is having paired "synthetic-like" and "real-like" domains. We achieve this by:
- Source domain (synthetic proxy): Apply strong color/style transformations to real images to simulate the appearance gap. This gives us ground-truth labels in the source domain.
- Target domain (real proxy): Use the original images as the target domain.
This proxy setup lets us run the full benchmark on a laptop without needing actual simulation data. It reproduces the visual/appearance component of the domain gap; the physics and content gaps described earlier are not modeled.
import numpy as np
import torch
import torchvision
import torchvision.transforms as T
from scipy.ndimage import gaussian_filter


class SyntheticStyleTransform:
    """Apply transformations that mimic common sim-to-real visual gaps.

    Combines color shifts, reduced texture detail, and altered contrast
    to create images that look 'synthetic'.
    """

    def __init__(self, severity=1.0):
        self.severity = severity

    def __call__(self, img):
        """Transform a PIL image to look synthetic (returns an HxWx3 uint8 array)."""
        img_np = np.array(img).astype(np.float32) / 255.0

        # 1. Color shift (simulates incorrect white balance)
        color_shift = np.array([0.1, -0.05, 0.15]) * self.severity
        img_np = img_np + color_shift[None, None, :]

        # 2. Reduced contrast (simulates flat rendering)
        mean = img_np.mean()
        img_np = mean + (img_np - mean) * (1.0 - 0.3 * self.severity)

        # 3. Slight blur (simulates lower texture detail)
        if self.severity > 0.5:
            for c in range(3):
                img_np[:, :, c] = gaussian_filter(img_np[:, :, c], sigma=0.5)

        # 4. Increased brightness (simulates overbright rendering)
        img_np = img_np + 0.05 * self.severity

        img_np = np.clip(img_np, 0, 1)
        return (img_np * 255).astype(np.uint8)


def create_benchmark_datasets(data_root='./data', image_size=128):
    """Create source (synthetic-proxy) and target (real) datasets."""
    real_transform = T.Compose([
        T.Resize(image_size),
        T.CenterCrop(image_size),
        T.ToTensor(),
        T.Normalize(mean=[0.485, 0.456, 0.406],
                    std=[0.229, 0.224, 0.225]),
    ])
    synth_transform = T.Compose([
        T.Resize(image_size),
        T.CenterCrop(image_size),
        SyntheticStyleTransform(severity=1.0),
        T.ToTensor(),  # accepts the uint8 array returned above
        T.Normalize(mean=[0.485, 0.456, 0.406],
                    std=[0.229, 0.224, 0.225]),
    ])

    # Use STL-10 as proxy (96x96 images with 10 classes)
    target_train = torchvision.datasets.STL10(
        data_root, split='train', download=True, transform=real_transform)
    target_test = torchvision.datasets.STL10(
        data_root, split='test', download=True, transform=real_transform)
    # Same underlying images as target_train, rendered through the synthetic transform
    source_train = torchvision.datasets.STL10(
        data_root, split='train', download=True, transform=synth_transform)
    return source_train, target_train, target_test
1.4 Visualize the domain gap
import matplotlib.pyplot as plt
import torch


def denormalize(tensor, mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225)):
    """Reverse ImageNet normalization for display."""
    mean = torch.tensor(mean).view(3, 1, 1)
    std = torch.tensor(std).view(3, 1, 1)
    return torch.clamp(tensor * std + mean, 0, 1)


def visualize_domain_gap(source_dataset, target_dataset, n_samples=8):
    """Side-by-side comparison of source and target images."""
    fig, axes = plt.subplots(2, n_samples, figsize=(2.5 * n_samples, 5), squeeze=False)
    rows = [(source_dataset, 'Source'), (target_dataset, 'Target')]
    for row, (dataset, name) in enumerate(rows):
        for i in range(n_samples):
            img = denormalize(dataset[i][0])  # undo normalization for display
            axes[row, i].imshow(img.permute(1, 2, 0).numpy())
            axes[row, i].set_title(f'{name} {i}', fontsize=8)
            # Hide ticks only: axis('off') would also hide the row labels below
            axes[row, i].set_xticks([])
            axes[row, i].set_yticks([])
    axes[0, 0].set_ylabel('Source\n(Synthetic)', fontsize=10)
    axes[1, 0].set_ylabel('Target\n(Real)', fontsize=10)
    plt.suptitle('Domain Gap Visualization', fontsize=13)
    plt.tight_layout()
    plt.show()
1.5 Compute baseline gap metrics
def compute_gap_metrics(source_images, target_images):
    """Compute FID, MMD, and histogram distances between domains.

    Note: compute_fid and compute_mmd are not defined in this section and
    must be provided separately; channel_histogram_distance is defined in
    "Measuring the Domain Gap" above.
    """
    results = {}
    # 1. Per-channel histogram distance
    results['histogram'] = channel_histogram_distance(source_images, target_images)
    # 2. FID (using Inception features)
    results['fid'] = compute_fid(source_images, target_images)
    # 3. MMD with RBF kernel
    results['mmd'] = compute_mmd(source_images, target_images)
    return results
Step 2: Fourier Domain Adaptation and Style Transfer (Notebook 01, ~75 min)
Goal: Implement FDA from scratch, visualize its effect, tune the bandwidth parameter, and measure how much it reduces the domain gap.
2.1 FFT fundamentals for images
Understanding the 2D Fourier transform is essential for FDA. An image $f(x, y)$ can be decomposed into a sum of 2D sinusoidal basis functions at different frequencies:
$$ F(u, v) = \sum_{x=0}^{H-1} \sum_{y=0}^{W-1} f(x, y) \cdot e^{-2\pi i \left(\frac{ux}{H} + \frac{vy}{W}\right)} $$
The resulting complex-valued spectrum $F(u, v)$ has two components:
- Amplitude $|F(u, v)|$: encodes how much of each frequency is present (related to texture and contrast)
- Phase $\angle F(u, v)$: encodes where patterns are located (related to structure and edges)
Key insight for FDA: amplitude controls appearance, phase controls structure. By swapping only the amplitude of low frequencies, we change how the image "looks" without changing "what" is in it.
import matplotlib.pyplot as plt
import numpy as np


def visualize_fft(image, title="FFT Analysis"):
    """Visualize the amplitude spectrum and phase of an image."""
    fig, axes = plt.subplots(1, 4, figsize=(16, 4))

    # Original image
    axes[0].imshow(image)
    axes[0].set_title("Original")

    # Compute FFT (average across channels for visualization)
    gray = np.mean(image, axis=2) if image.ndim == 3 else image
    F = np.fft.fftshift(np.fft.fft2(gray))

    # Amplitude spectrum (log scale)
    amplitude = np.log(np.abs(F) + 1)
    axes[1].imshow(amplitude, cmap='hot')
    axes[1].set_title("Amplitude (log)")

    # Phase
    phase = np.angle(F)
    axes[2].imshow(phase, cmap='twilight')
    axes[2].set_title("Phase")

    # Reconstruct from amplitude only (random phase): spatial structure is lost
    random_phase = np.random.uniform(-np.pi, np.pi, F.shape)
    F_amp_only = np.abs(F) * np.exp(1j * random_phase)
    recon_amp = np.real(np.fft.ifft2(np.fft.ifftshift(F_amp_only)))
    # imshow rescales to the data range, so this works for [0, 1] and [0, 255] inputs
    axes[3].imshow(recon_amp, cmap='gray')
    axes[3].set_title("Amplitude only\n(random phase)")

    for ax in axes:
        ax.axis('off')
    plt.suptitle(title, fontsize=13)
    plt.tight_layout()
    plt.show()
2.2 FDA implementation
The full FDA implementation with configurable bandwidth:
import numpy as np


def create_low_freq_mask(h, w, beta):
    """Create a binary mask selecting a centered low-frequency window.

    The window extends int(h * beta) rows and int(w * beta) columns on each
    side of the spectrum center, so it spans roughly 2 * beta of each axis.

    Args:
        h, w: image dimensions
        beta: bandwidth parameter in (0, 0.5)
    Returns:
        mask: (h, w) float32 array with 1s in the low-frequency center
    """
    cy, cx = h // 2, w // 2
    bh = max(1, int(h * beta))
    bw = max(1, int(w * beta))
    mask = np.zeros((h, w), dtype=np.float32)
    # Clamp the start index: a negative slice start would wrap around
    mask[max(0, cy - bh):cy + bh + 1, max(0, cx - bw):cx + bw + 1] = 1.0
    return mask


def fda_transfer(source_img, target_img, beta=0.05):
    """Apply Fourier Domain Adaptation.

    Swaps low-frequency amplitude of source with that of target,
    keeping source phase intact.

    Args:
        source_img: (H, W, 3) float32 in [0, 1]
        target_img: (H, W, 3) float32 in [0, 1], same shape as source_img
        beta: window half-width as a fraction of each axis (0.01 - 0.3 typical)
    Returns:
        adapted: (H, W, 3) float32 in [0, 1]
    """
    if source_img.shape != target_img.shape:
        raise ValueError("source_img and target_img must have the same shape")
    h, w, c = source_img.shape
    mask = create_low_freq_mask(h, w, beta)
    adapted = np.zeros_like(source_img)
    for ch in range(c):
        # Compute centered FFT
        F_src = np.fft.fftshift(np.fft.fft2(source_img[:, :, ch]))
        F_tgt = np.fft.fftshift(np.fft.fft2(target_img[:, :, ch]))
        # Decompose into amplitude and phase
        amp_src = np.abs(F_src)
        amp_tgt = np.abs(F_tgt)
        phase_src = np.angle(F_src)
        # Blend amplitudes: use target's low freq, source's high freq
        amp_adapted = amp_src * (1 - mask) + amp_tgt * mask
        # Reconstruct with adapted amplitude and original phase
        F_adapted = amp_adapted * np.exp(1j * phase_src)
        adapted[:, :, ch] = np.real(np.fft.ifft2(np.fft.ifftshift(F_adapted)))
    return np.clip(adapted, 0, 1)


def fda_transfer_batch(source_batch, target_batch, beta=0.05):
    """Apply FDA to a batch of images.

    Each source image is paired with a random target image for style transfer.

    Args:
        source_batch: (N, H, W, 3) float32 in [0, 1]
        target_batch: (M, H, W, 3) float32 in [0, 1]
        beta: bandwidth parameter
    Returns:
        adapted_batch: (N, H, W, 3) float32 in [0, 1]
    """
    n = source_batch.shape[0]
    m = target_batch.shape[0]
    adapted = np.zeros_like(source_batch)
    for i in range(n):
        # Random target for each source
        j = np.random.randint(m)
        adapted[i] = fda_transfer(source_batch[i], target_batch[j], beta)
    return adapted
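A quick way to check the amplitude swap is to look at the DC bin. The low-frequency window always contains it, so the adapted image should take on the target's mean intensity while keeping the source's phase. The following is a minimal single-channel sketch under that assumption; the helper name `swap_low_freq_amplitude` is hypothetical and the final clipping step is omitted so the means can be compared exactly.

```python
import numpy as np

def swap_low_freq_amplitude(src, tgt, beta=0.05):
    """Single-channel FDA sketch: target low-frequency amplitude, source phase."""
    h, w = src.shape
    cy, cx = h // 2, w // 2
    bh, bw = max(1, int(h * beta)), max(1, int(w * beta))

    F_src = np.fft.fftshift(np.fft.fft2(src))
    F_tgt = np.fft.fftshift(np.fft.fft2(tgt))

    # Start from the source amplitude and overwrite the central window
    amp = np.abs(F_src)
    rows = slice(cy - bh, cy + bh + 1)
    cols = slice(cx - bw, cx + bw + 1)
    amp[rows, cols] = np.abs(F_tgt)[rows, cols]

    # Recombine with the untouched source phase
    F_new = amp * np.exp(1j * np.angle(F_src))
    return np.real(np.fft.ifft2(np.fft.ifftshift(F_new)))

rng = np.random.default_rng(0)
src = rng.uniform(0.0, 0.4, size=(64, 64))   # dark stand-in for a synthetic image
tgt = rng.uniform(0.5, 1.0, size=(64, 64))   # bright stand-in for a real image
adapted = swap_low_freq_amplitude(src, tgt, beta=0.05)
print(src.mean(), tgt.mean(), adapted.mean())
```

The same check can be run per channel on the output of fda_transfer, provided the result is not saturated by the clip to [0, 1].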
2.3 Visualizing FDA at different bandwidths
def visualize_fda_beta_sweep(source_img, target_img, betas=(0.01, 0.03, 0.05, 0.1, 0.2)):
    """Show how different beta values affect the adaptation."""
    n_cols = len(betas) + 2
    fig, axes = plt.subplots(2, n_cols, figsize=(3 * n_cols, 6))

    # Adapt once per beta and reuse the result for both rows
    adapted_imgs = [fda_transfer(source_img, target_img, beta=b) for b in betas]
    images = [source_img] + adapted_imgs + [target_img]
    titles = ['Source'] + [f'beta={b}' for b in betas] + ['Target']

    for col, (img, title) in enumerate(zip(images, titles)):
        # Top row: source, adapted images, target
        axes[0, col].imshow(img)
        axes[0, col].set_title(title, fontsize=9)
        # Bottom row: log-amplitude spectrum of the grayscale image
        gray = np.mean(img, axis=2)
        F = np.fft.fftshift(np.fft.fft2(gray))
        axes[1, col].imshow(np.log(np.abs(F) + 1), cmap='hot')
        axes[1, col].set_title('Spectrum', fontsize=8)

    for ax in axes.flat:
        ax.axis('off')
    plt.suptitle('FDA Beta Sweep: Source -> Target', fontsize=13)
    plt.tight_layout()
    plt.show()
2.4 Measuring FDA effectiveness
After adapting, measure the domain gap reduction:
def evaluate_fda_effectiveness(source_images, target_images, betas):
    """Compute per-channel histogram distances for each beta value.

    Relies on `channel_histogram_distance`, assumed to be defined earlier
    in the notebook. FID is not computed here.

    Returns a DataFrame with columns:
        method, beta, hist_R, hist_G, hist_B, hist_mean
    """
    import pandas as pd

    results = []
    # Baseline: unadapted source vs target
    baseline_hist = channel_histogram_distance(source_images, target_images)
    results.append({
        'method': 'unadapted',
        'beta': 0.0,
        'hist_R': baseline_hist['R'],
        'hist_G': baseline_hist['G'],
        'hist_B': baseline_hist['B'],
        'hist_mean': np.mean(list(baseline_hist.values())),
    })
    for beta in betas:
        adapted = fda_transfer_batch(source_images, target_images, beta=beta)
        hist = channel_histogram_distance(adapted, target_images)
        results.append({
            'method': f'FDA(beta={beta})',
            'beta': beta,
            'hist_R': hist['R'],
            'hist_G': hist['G'],
            'hist_B': hist['B'],
            'hist_mean': np.mean(list(hist.values())),
        })
    return pd.DataFrame(results)
2.5 Exercise: soft-edge FDA mask
The hard rectangular mask in standard FDA creates ringing artifacts at the boundary. Implement a soft-edge variant using a Gaussian falloff:
$$ M_{\sigma}(u, v) = \exp\left(-\frac{(u - c_y)^2 + (v - c_x)^2}{2\sigma^2}\right) $$
where $\sigma$ controls the transition width. Compare hard vs. soft masks visually and quantitatively.
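As a starting point for the exercise, the Gaussian mask above could be built as follows. This is a sketch, not a reference solution: the function name `create_gaussian_mask` is hypothetical, and tying sigma to the image height via `sigma = beta * h` is an assumption made only to keep the two masks roughly comparable.

```python
import numpy as np

def create_gaussian_mask(h, w, sigma):
    """Soft low-frequency mask for a centered (fftshift-ed) spectrum."""
    cy, cx = h // 2, w // 2
    u = np.arange(h).reshape(-1, 1)   # row (vertical frequency) index
    v = np.arange(w).reshape(1, -1)   # column (horizontal frequency) index
    dist_sq = (u - cy) ** 2 + (v - cx) ** 2
    return np.exp(-dist_sq / (2.0 * sigma ** 2)).astype(np.float32)

# Assumed convention: sigma expressed as a fraction of the image height
mask = create_gaussian_mask(128, 128, sigma=0.05 * 128)
print(mask[64, 64], mask[64, 70], mask[0, 0])
```

Because the mask takes values in [0, 1], it can replace the binary mask in the amplitude blend `amp_src * (1 - mask) + amp_tgt * mask` without other changes.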
Step 3: Adversarial Domain Adaptation (Notebook 02, ~90 min)
Goal: Build a simplified CyCADA-style pipeline with generators, discriminators, cycle consistency, and feature-level alignment.
3.1 GAN building blocks
We reuse the ResNet generator and PatchGAN discriminator designs introduced earlier, redefined below in a lighter form. For this notebook we work at 128x128 resolution with a smaller generator (6 residual blocks instead of 9) to keep training feasible on a single GPU.
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
class ResidualBlock(nn.Module):
def __init__(self, channels):
super().__init__()
self.block = nn.Sequential(
nn.ReflectionPad2d(1),
nn.Conv2d(channels, channels, 3),
nn.InstanceNorm2d(channels),
nn.ReLU(inplace=True),
nn.ReflectionPad2d(1),
nn.Conv2d(channels, channels, 3),
nn.InstanceNorm2d(channels),
)
def forward(self, x):
return x + self.block(x)
class Generator(nn.Module):
"""Lightweight ResNet generator for 128x128 images."""
def __init__(self, in_ch=3, out_ch=3, n_res=6, nf=64):
super().__init__()
# Encoder
self.encoder = nn.Sequential(
nn.ReflectionPad2d(3),
nn.Conv2d(in_ch, nf, 7), nn.InstanceNorm2d(nf), nn.ReLU(True),
nn.Conv2d(nf, nf*2, 3, stride=2, padding=1), nn.InstanceNorm2d(nf*2), nn.ReLU(True),
nn.Conv2d(nf*2, nf*4, 3, stride=2, padding=1), nn.InstanceNorm2d(nf*4), nn.ReLU(True),
)
# Residual blocks
res_blocks = [ResidualBlock(nf*4) for _ in range(n_res)]
self.res_blocks = nn.Sequential(*res_blocks)
# Decoder
self.decoder = nn.Sequential(
nn.ConvTranspose2d(nf*4, nf*2, 3, stride=2, padding=1, output_padding=1),
nn.InstanceNorm2d(nf*2), nn.ReLU(True),
nn.ConvTranspose2d(nf*2, nf, 3, stride=2, padding=1, output_padding=1),
nn.InstanceNorm2d(nf), nn.ReLU(True),
nn.ReflectionPad2d(3),
nn.Conv2d(nf, out_ch, 7),
nn.Tanh(),
)
def forward(self, x):
return self.decoder(self.res_blocks(self.encoder(x)))
class Discriminator(nn.Module):
"""PatchGAN discriminator."""
def __init__(self, in_ch=3, nf=64):
super().__init__()
self.model = nn.Sequential(
nn.Conv2d(in_ch, nf, 4, 2, 1), nn.LeakyReLU(0.2, True),
nn.Conv2d(nf, nf*2, 4, 2, 1), nn.InstanceNorm2d(nf*2), nn.LeakyReLU(0.2, True),
nn.Conv2d(nf*2, nf*4, 4, 2, 1), nn.InstanceNorm2d(nf*4), nn.LeakyReLU(0.2, True),
nn.Conv2d(nf*4, nf*8, 4, 1, 1), nn.InstanceNorm2d(nf*8), nn.LeakyReLU(0.2, True),
nn.Conv2d(nf*8, 1, 4, 1, 1),
)
def forward(self, x):
return self.model(x)
3.2 Loss functions
class GANLoss(nn.Module):
"""Least-squares GAN loss (more stable than BCE)."""
def __init__(self):
super().__init__()
def forward(self, pred, target_is_real):
if target_is_real:
target = torch.ones_like(pred)
else:
target = torch.zeros_like(pred)
return nn.functional.mse_loss(pred, target)
def cycle_consistency_loss(real, reconstructed):
"""L1 loss between original and cycle-reconstructed image."""
return nn.functional.l1_loss(reconstructed, real)
def identity_loss(real, same_domain_output):
"""L1 loss encouraging generator to be identity for target-domain inputs."""
return nn.functional.l1_loss(same_domain_output, real)
3.3 CyCADA training loop
The training loop alternates between updating discriminators and generators:
class CyCADATrainer:
"""Simplified CyCADA training manager."""
def __init__(self, device='cuda', lr=2e-4, lambda_cyc=10.0,
lambda_idt=5.0, lambda_feat=1.0):
self.device = device
# Networks
self.G_S2T = Generator().to(device) # Source -> Target
self.G_T2S = Generator().to(device) # Target -> Source
self.D_T = Discriminator().to(device) # Target discriminator
self.D_S = Discriminator().to(device) # Source discriminator
# Losses
self.criterion_GAN = GANLoss()
self.lambda_cyc = lambda_cyc
self.lambda_idt = lambda_idt
self.lambda_feat = lambda_feat
# Optimizers
self.opt_G = optim.Adam(
list(self.G_S2T.parameters()) + list(self.G_T2S.parameters()),
lr=lr, betas=(0.5, 0.999))
self.opt_D = optim.Adam(
list(self.D_T.parameters()) + list(self.D_S.parameters()),
lr=lr, betas=(0.5, 0.999))
# Image replay buffer (stabilizes training)
self.fake_T_buffer = ReplayBuffer()
self.fake_S_buffer = ReplayBuffer()
def train_step(self, real_S, real_T):
"""Single training step. Returns dict of loss values."""
real_S = real_S.to(self.device)
real_T = real_T.to(self.device)
# ---- Generator forward pass ----
fake_T = self.G_S2T(real_S) # S -> T
fake_S = self.G_T2S(real_T) # T -> S
rec_S = self.G_T2S(fake_T) # S -> T -> S (cycle)
rec_T = self.G_S2T(fake_S) # T -> S -> T (cycle)
idt_T = self.G_S2T(real_T) # T -> T (identity)
idt_S = self.G_T2S(real_S) # S -> S (identity)
# ---- Update Generators ----
self.opt_G.zero_grad()
# Adversarial losses
loss_G_S2T = self.criterion_GAN(self.D_T(fake_T), target_is_real=True)
loss_G_T2S = self.criterion_GAN(self.D_S(fake_S), target_is_real=True)
# Cycle consistency losses
loss_cyc_S = cycle_consistency_loss(real_S, rec_S)
loss_cyc_T = cycle_consistency_loss(real_T, rec_T)
# Identity losses
loss_idt_S = identity_loss(real_S, idt_S)
loss_idt_T = identity_loss(real_T, idt_T)
# Total generator loss
loss_G = (loss_G_S2T + loss_G_T2S
+ self.lambda_cyc * (loss_cyc_S + loss_cyc_T)
+ self.lambda_idt * (loss_idt_S + loss_idt_T))
loss_G.backward()
self.opt_G.step()
# ---- Update Discriminators ----
self.opt_D.zero_grad()
# Discriminator T
fake_T_buf = self.fake_T_buffer.push_and_pop(fake_T.detach())
loss_D_T = (self.criterion_GAN(self.D_T(real_T), True) +
self.criterion_GAN(self.D_T(fake_T_buf), False)) * 0.5
# Discriminator S
fake_S_buf = self.fake_S_buffer.push_and_pop(fake_S.detach())
loss_D_S = (self.criterion_GAN(self.D_S(real_S), True) +
self.criterion_GAN(self.D_S(fake_S_buf), False)) * 0.5
loss_D = loss_D_T + loss_D_S
loss_D.backward()
self.opt_D.step()
return {
'G_total': loss_G.item(),
'G_adv': (loss_G_S2T + loss_G_T2S).item(),
'G_cyc': (loss_cyc_S + loss_cyc_T).item(),
'G_idt': (loss_idt_S + loss_idt_T).item(),
'D_total': loss_D.item(),
}
class ReplayBuffer:
"""Buffer of previously generated images to stabilize GAN training.
With 50% probability, returns a previously stored image instead of
the most recent one. This prevents the discriminator from overfitting
to the generator's current mode.
"""
def __init__(self, max_size=50):
self.max_size = max_size
self.data = []
def push_and_pop(self, images):
result = []
for img in images:
img = img.unsqueeze(0)
if len(self.data) < self.max_size:
self.data.append(img)
result.append(img)
elif torch.rand(1).item() > 0.5:
idx = torch.randint(0, len(self.data), (1,)).item()
result.append(self.data[idx].clone())
self.data[idx] = img
else:
result.append(img)
return torch.cat(result, dim=0)
3.4 Feature-level domain discriminator
Add a lightweight discriminator that operates on intermediate features of a pretrained classifier:
class FeatureDiscriminator(nn.Module):
"""Domain discriminator for feature-level alignment.
Operates on features extracted from an intermediate layer of
the task network. Binary classification: source (0) or target (1).
"""
def __init__(self, in_features=512):
super().__init__()
self.classifier = nn.Sequential(
nn.Linear(in_features, 256),
nn.ReLU(True),
nn.Dropout(0.5),
nn.Linear(256, 128),
nn.ReLU(True),
nn.Dropout(0.5),
nn.Linear(128, 1),
)
def forward(self, x):
# Global average pooling if input is spatial
if x.dim() == 4:
x = x.mean(dim=[2, 3])
return self.classifier(x)
class GradientReversalLayer(torch.autograd.Function):
    """Gradient Reversal Layer for domain-adversarial training.

    During forward pass: identity.
    During backward pass: negate gradients (scaled by lambda).

    This is the key trick in DANN (Ganin et al.): the feature extractor learns
    domain-invariant features by trying to CONFUSE the domain classifier.
    The CyCADA paper instead trains its feature discriminator with alternating
    GAN-style updates; gradient reversal is used here as a simpler,
    single-pass substitute.
    """

    @staticmethod
    def forward(ctx, x, lambda_val):
        ctx.lambda_val = lambda_val
        return x.clone()

    @staticmethod
    def backward(ctx, grad_output):
        # One gradient per forward input; lambda_val itself gets none
        return -ctx.lambda_val * grad_output, None


def gradient_reversal(x, lambda_val=1.0):
    return GradientReversalLayer.apply(x, lambda_val)
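The reversal strength does not have to stay fixed. Ganin et al. ramp it from 0 to 1 over training so that the noisy early domain classifier does not dominate the feature extractor. A minimal sketch of that schedule follows; the function name `grl_lambda` is hypothetical, and gamma = 10 is the value used in the DANN paper.

```python
import math

def grl_lambda(progress, gamma=10.0):
    """Ramp the reversal strength from 0 toward 1 as training progresses.

    progress: fraction of training completed, in [0, 1].
    """
    return 2.0 / (1.0 + math.exp(-gamma * progress)) - 1.0

for p in (0.0, 0.25, 0.5, 1.0):
    print(f"progress={p:.2f}  lambda={grl_lambda(p):.3f}")
```

The returned value can be passed as `lambda_val` to `gradient_reversal` at each step, with `progress` computed as the current iteration divided by the total number of iterations.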
3.5 Training monitoring
Track key metrics during training to detect problems early:
def monitor_cycada_training(trainer, source_loader, target_loader, epoch,
                            n_vis=4):
    """Visualize source, adapted, and target images during training.

    Assumes `denormalize` (maps normalized tensors back to [0, 1]) and
    `plt` are defined earlier in the notebook. FID is not computed here.
    """
    # Switch both generators to eval mode for visualization
    trainer.G_S2T.eval()
    trainer.G_T2S.eval()
    with torch.no_grad():
        src_batch = next(iter(source_loader))[0][:n_vis].to(trainer.device)
        tgt_batch = next(iter(target_loader))[0][:n_vis].to(trainer.device)
        fake_T = trainer.G_S2T(src_batch)
        fake_S = trainer.G_T2S(tgt_batch)

    fig, axes = plt.subplots(4, n_vis, figsize=(3 * n_vis, 12))
    row_labels = ['Source', 'S->T (adapted)', 'Target', 'T->S']
    batches = [src_batch, fake_T, tgt_batch, fake_S]
    for row, (label, batch) in enumerate(zip(row_labels, batches)):
        for col in range(n_vis):
            img = denormalize(batch[col].cpu()).permute(1, 2, 0).numpy()
            axes[row, col].imshow(np.clip(img, 0, 1))
            axes[row, col].axis('off')
        # axis('off') hides y-labels, so label each row on its first panel
        axes[row, 0].set_title(label, fontsize=10, loc='left')
    plt.suptitle(f'CyCADA Training -- Epoch {epoch}', fontsize=13)
    plt.tight_layout()
    plt.show()
    # Restore training mode on both generators
    trainer.G_S2T.train()
    trainer.G_T2S.train()
3.6 Exercise: semantic consistency loss
Implement the semantic consistency loss from the full CyCADA paper. Load a pretrained classifier and add a loss term that penalizes changes in predicted class probabilities between source images and their adapted versions:
$$ \mathcal{L}_{\text{sem}} = \text{CrossEntropy}\Big(f\big(G_{S \to T}(x_s)\big),\; \operatorname{argmax}\, f(x_s)\Big) $$
Integrate this into the training loop and compare results with and without semantic consistency.
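To make the formula concrete before wiring it into the trainer, here is a framework-free sketch that evaluates it on raw logits with NumPy. The function names are hypothetical, and the toy logits stand in for the outputs of the frozen classifier f on source images and on their adapted versions.

```python
import numpy as np

def log_softmax(logits):
    """Numerically stable log-softmax over the class axis."""
    shifted = logits - logits.max(axis=1, keepdims=True)
    return shifted - np.log(np.exp(shifted).sum(axis=1, keepdims=True))

def semantic_consistency_loss(logits_adapted, logits_source):
    """Cross-entropy of f(G(x_s)) against pseudo-labels argmax f(x_s)."""
    pseudo_labels = logits_source.argmax(axis=1)
    log_probs = log_softmax(logits_adapted)
    picked = log_probs[np.arange(len(pseudo_labels)), pseudo_labels]
    return -picked.mean()

logits_src = np.array([[4.0, 0.0, 0.0],
                       [0.0, 3.0, 0.0]])
# Adaptation that leaves predictions unchanged vs. one that flips the first sample
loss_same = semantic_consistency_loss(logits_src, logits_src)
loss_flipped = semantic_consistency_loss(logits_src[:, ::-1], logits_src)
print(loss_same, loss_flipped)
```

In PyTorch the same quantity would typically be written with `nn.functional.cross_entropy`, passing the classifier's logits on the adapted images and the argmax of its logits on the source images, with the classifier's weights frozen.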
Step 4: Benchmark Evaluation (Notebook 03, ~90 min)
Goal: Train an identical downstream classifier under each adaptation condition, evaluate with proper statistics, and produce a publication-quality benchmark report.
4.1 Downstream task: image classification
We use a lightweight classifier (MobileNetV2 or ResNet-18) as the downstream task. The same architecture and hyperparameters are used for every condition -- only the training data changes.
import torchvision.models as models


def create_classifier(num_classes=10, pretrained=True):
    """Create a MobileNetV2 classifier for the benchmark.

    Uses pretrained ImageNet features with a fresh classification head.
    """
    # torchvision >= 0.13 replaces the deprecated `pretrained=` flag with `weights=`
    weights = models.MobileNet_V2_Weights.DEFAULT if pretrained else None
    model = models.mobilenet_v2(weights=weights)
    model.classifier[1] = nn.Linear(model.last_channel, num_classes)
    return model
def train_classifier(model, train_loader, val_loader, epochs=20,
lr=1e-3, device='cuda'):
"""Train the classifier and return training history.
Args:
model: the classifier network
train_loader: training data loader
val_loader: validation data loader (target domain)
epochs: number of training epochs
lr: learning rate
device: cuda or cpu
Returns:
history: dict with 'train_loss', 'train_acc', 'val_loss', 'val_acc'
"""
model = model.to(device)
optimizer = optim.Adam(model.parameters(), lr=lr)
scheduler = optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=epochs)
criterion = nn.CrossEntropyLoss()
history = {'train_loss': [], 'train_acc': [], 'val_loss': [], 'val_acc': []}
for epoch in range(epochs):
# Training
model.train()
total_loss, correct, total = 0, 0, 0
for images, labels in train_loader:
images, labels = images.to(device), labels.to(device)
optimizer.zero_grad()
outputs = model(images)
loss = criterion(outputs, labels)
loss.backward()
optimizer.step()
total_loss += loss.item() * images.size(0)
correct += (outputs.argmax(1) == labels).sum().item()
total += images.size(0)
history['train_loss'].append(total_loss / total)
history['train_acc'].append(correct / total)
# Validation (on target domain)
model.eval()
val_loss, val_correct, val_total = 0, 0, 0
with torch.no_grad():
for images, labels in val_loader:
images, labels = images.to(device), labels.to(device)
outputs = model(images)
loss = criterion(outputs, labels)
val_loss += loss.item() * images.size(0)
val_correct += (outputs.argmax(1) == labels).sum().item()
val_total += images.size(0)
history['val_loss'].append(val_loss / val_total)
history['val_acc'].append(val_correct / val_total)
scheduler.step()
if (epoch + 1) % 5 == 0:
print(f"Epoch {epoch+1}/{epochs} -- "
f"Train acc: {history['train_acc'][-1]:.3f}, "
f"Val acc: {history['val_acc'][-1]:.3f}")
return history
4.2 Running all benchmark conditions
def run_benchmark(source_train, target_train, target_test,
fda_betas=[0.01, 0.05, 0.1],
n_runs=3, epochs=20, device='cuda'):
"""Run the full domain adaptation benchmark.
Trains classifiers under each condition with multiple seeds.
"""
conditions = {}
# Condition A: Source-only (lower bound)
conditions['source_only'] = source_train
# Condition B: FDA-adapted (for each beta)
for beta in fda_betas:
adapted_data = apply_fda_to_dataset(source_train, target_train, beta=beta)
conditions[f'fda_beta_{beta}'] = adapted_data
# Condition C: CyCADA-adapted
cycada_data = apply_cycada_to_dataset(source_train, target_train)
conditions['cycada'] = cycada_data
# Condition D: Mixed training (various ratios)
for ratio in [0.3, 0.5, 0.7]:
mixed_data = create_mixed_dataset(source_train, target_train, source_ratio=ratio)
conditions[f'mixed_{ratio}'] = mixed_data
# Condition E: Target-only (upper bound)
conditions['target_only'] = target_train
# Run experiments
all_results = {}
for name, train_data in conditions.items():
print(f"\n{'='*60}")
print(f"Condition: {name}")
print(f"{'='*60}")
run_results = []
for run in range(n_runs):
torch.manual_seed(42 + run)
np.random.seed(42 + run)
model = create_classifier(num_classes=10)
train_loader = DataLoader(train_data, batch_size=64, shuffle=True)
test_loader = DataLoader(target_test, batch_size=64)
history = train_classifier(model, train_loader, test_loader,
epochs=epochs, device=device)
# Final evaluation
metrics = evaluate_classifier(model, test_loader, device)
metrics['run'] = run
metrics['history'] = history
run_results.append(metrics)
print(f" Run {run+1}: acc={metrics['accuracy']:.3f}")
all_results[name] = run_results
return all_results
4.3 Detailed evaluation metrics
from sklearn.metrics import (classification_report, confusion_matrix,
precision_recall_curve, average_precision_score)
def evaluate_classifier(model, test_loader, device='cuda'):
"""Compute detailed metrics on the test set.
Returns:
dict with accuracy, per_class_accuracy, confusion_matrix,
per_class_ap, mean_ap, predictions, labels
"""
model.eval()
all_preds = []
all_labels = []
all_probs = []
with torch.no_grad():
for images, labels in test_loader:
images = images.to(device)
outputs = model(images)
probs = torch.softmax(outputs, dim=1)
preds = outputs.argmax(1)
all_preds.append(preds.cpu())
all_labels.append(labels)
all_probs.append(probs.cpu())
preds = torch.cat(all_preds).numpy()
labels = torch.cat(all_labels).numpy()
probs = torch.cat(all_probs).numpy()
# Overall accuracy
accuracy = (preds == labels).mean()
# Per-class accuracy
classes = np.unique(labels)
per_class_acc = {}
for c in classes:
mask = labels == c
per_class_acc[c] = (preds[mask] == labels[mask]).mean()
# Per-class AP (one-vs-rest)
per_class_ap = {}
for c in classes:
binary_labels = (labels == c).astype(int)
per_class_ap[c] = average_precision_score(binary_labels, probs[:, c])
mean_ap = np.mean(list(per_class_ap.values()))
# Confusion matrix
cm = confusion_matrix(labels, preds)
return {
'accuracy': accuracy,
'mean_ap': mean_ap,
'per_class_accuracy': per_class_acc,
'per_class_ap': per_class_ap,
'confusion_matrix': cm,
'predictions': preds,
'labels': labels,
'probabilities': probs,
}
4.4 Statistical comparison
def bootstrap_confidence_interval(values, n_bootstrap=1000, ci=0.95, seed=42):
"""Compute bootstrap confidence interval for the mean.
Args:
values: array of metric values across runs
n_bootstrap: number of bootstrap samples
ci: confidence level (default 95%)
Returns:
mean, ci_lower, ci_upper
"""
rng = np.random.RandomState(seed)
values = np.array(values)
n = len(values)
boot_means = []
for _ in range(n_bootstrap):
sample = rng.choice(values, size=n, replace=True)
boot_means.append(sample.mean())
boot_means = np.sort(boot_means)
alpha = 1 - ci
ci_lower = np.percentile(boot_means, 100 * alpha / 2)
ci_upper = np.percentile(boot_means, 100 * (1 - alpha / 2))
return values.mean(), ci_lower, ci_upper
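def paired_significance_test(results_a, results_b, metric='accuracy'):
    """Paired Wilcoxon signed-rank test between two conditions.

    Runs are paired by index, so both conditions must use the same seeds in
    the same order. With n paired runs the smallest attainable two-sided
    p-value is 2 / 2**n, so at least 6 runs are needed to reach p < 0.05.

    Args:
        results_a: list of metric dicts from condition A
        results_b: list of metric dicts from condition B
        metric: which metric to compare
    Returns:
        statistic, p_value
    """
    from scipy.stats import wilcoxon

    vals_a = [r[metric] for r in results_a]
    vals_b = [r[metric] for r in results_b]
    if len(vals_a) != len(vals_b):
        raise ValueError("Both conditions need the same number of runs")
    stat, p_value = wilcoxon(vals_a, vals_b, alternative='two-sided')
    return stat, p_value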
def build_benchmark_table(all_results):
"""Create a summary table with means, CIs, and significance."""
import pandas as pd
rows = []
for name, runs in all_results.items():
accuracies = [r['accuracy'] for r in runs]
mean_aps = [r['mean_ap'] for r in runs]
acc_mean, acc_lo, acc_hi = bootstrap_confidence_interval(accuracies)
map_mean, map_lo, map_hi = bootstrap_confidence_interval(mean_aps)
rows.append({
'Condition': name,
'Accuracy': f'{acc_mean:.3f}',
'Acc 95% CI': f'[{acc_lo:.3f}, {acc_hi:.3f}]',
'mAP': f'{map_mean:.3f}',
'mAP 95% CI': f'[{map_lo:.3f}, {map_hi:.3f}]',
'N Runs': len(runs),
})
return pd.DataFrame(rows)
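The number of seeds limits what the paired Wilcoxon test can show. Under the null hypothesis every sign pattern of the paired differences is equally likely, so the most extreme outcome (one condition wins every run) has an exact two-sided probability of 2 / 2**n. The sketch below tabulates that floor; the function name is hypothetical and the formula assumes no ties or zero differences.

```python
def min_two_sided_wilcoxon_p(n_pairs):
    """Smallest exact two-sided p-value of the signed-rank test.

    Assumes no tied or zero differences. The most extreme outcome is that
    all n differences share a sign, which happens for 2 of 2**n patterns.
    """
    return 2.0 / (2 ** n_pairs)

for n in (3, 5, 6, 10):
    print(f"n_runs={n:2d}  min p-value={min_two_sided_wilcoxon_p(n):.5f}")
```

With the default n_runs=3 in run_benchmark, the test cannot report anything below 0.25, so the p-value is only informative once n_runs is raised to 6 or more.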
4.5 Visualization: benchmark summary plots
def plot_benchmark_results(all_results, class_names=None):
"""Create publication-quality benchmark visualization."""
fig = plt.figure(figsize=(18, 14))
# Plot 1: Bar chart with confidence intervals
ax1 = fig.add_subplot(2, 2, 1)
conditions = list(all_results.keys())
means = []
errors = []
for name in conditions:
accs = [r['accuracy'] for r in all_results[name]]
m, lo, hi = bootstrap_confidence_interval(accs)
means.append(m)
errors.append([m - lo, hi - m])
errors = np.array(errors).T
colors = plt.cm.Set2(np.linspace(0, 1, len(conditions)))
bars = ax1.bar(range(len(conditions)), means, yerr=errors,
color=colors, edgecolor='white', capsize=5)
ax1.set_xticks(range(len(conditions)))
ax1.set_xticklabels(conditions, rotation=45, ha='right', fontsize=8)
ax1.set_ylabel('Accuracy on Target Domain')
ax1.set_title('Benchmark Results: Accuracy by Condition')
ax1.grid(axis='y', alpha=0.3)
# Plot 2: Per-class AP comparison (heatmap)
ax2 = fig.add_subplot(2, 2, 2)
n_classes = len(list(all_results.values())[0][0]['per_class_ap'])
ap_matrix = np.zeros((len(conditions), n_classes))
for i, name in enumerate(conditions):
avg_ap = {}
for r in all_results[name]:
for c, ap in r['per_class_ap'].items():
avg_ap[c] = avg_ap.get(c, []) + [ap]
for c, vals in avg_ap.items():
ap_matrix[i, c] = np.mean(vals)
labels = class_names if class_names else [str(i) for i in range(n_classes)]
im = ax2.imshow(ap_matrix, cmap='RdYlGn', aspect='auto', vmin=0, vmax=1)
ax2.set_xticks(range(n_classes))
ax2.set_xticklabels(labels, rotation=45, ha='right', fontsize=8)
ax2.set_yticks(range(len(conditions)))
ax2.set_yticklabels(conditions, fontsize=8)
ax2.set_title('Per-Class AP by Condition')
plt.colorbar(im, ax=ax2, shrink=0.8)
# Plot 3: Training curves
ax3 = fig.add_subplot(2, 2, 3)
for name, runs in all_results.items():
# Average val accuracy curve across runs
val_accs = np.array([r['history']['val_acc'] for r in runs])
mean_curve = val_accs.mean(axis=0)
std_curve = val_accs.std(axis=0)
epochs = np.arange(1, len(mean_curve) + 1)
ax3.plot(epochs, mean_curve, label=name)
ax3.fill_between(epochs, mean_curve - std_curve, mean_curve + std_curve, alpha=0.15)
ax3.set_xlabel('Epoch')
ax3.set_ylabel('Validation Accuracy (Target)')
ax3.set_title('Training Curves by Condition')
ax3.legend(fontsize=7, ncol=2)
ax3.grid(alpha=0.3)
# Plot 4: Radar chart
ax4 = fig.add_subplot(2, 2, 4, projection='polar')
n_metrics = 5
metric_names = ['Accuracy', 'mAP', 'Speed', 'Simplicity', 'Stability']
angles = np.linspace(0, 2 * np.pi, n_metrics, endpoint=False).tolist()
angles += angles[:1] # close the polygon
for i, (name, runs) in enumerate(all_results.items()):
if name in ['source_only', 'target_only']:
continue # skip baselines for clarity
accs = [r['accuracy'] for r in runs]
maps = [r['mean_ap'] for r in runs]
values = [
np.mean(accs),
np.mean(maps),
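1.0 if 'fda' in name else 0.3 if 'cycada' in name else 0.7, # speed (hand-assigned score, not measured)
1.0 if 'fda' in name else 0.2 if 'cycada' in name else 0.8, # simplicity (hand-assigned score, not measured)
max(0.0, 1.0 - np.std(accs) * 10), # stability, clipped so high variance cannot go negative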
]
values += values[:1]
ax4.plot(angles, values, label=name, linewidth=2)
ax4.fill(angles, values, alpha=0.1)
ax4.set_xticks(angles[:-1])
ax4.set_xticklabels(metric_names, fontsize=9)
ax4.set_title('Method Comparison Radar', y=1.1)
ax4.legend(fontsize=7, loc='upper right', bbox_to_anchor=(1.3, 1.1))
plt.suptitle('Domain Adaptation Benchmark Summary', fontsize=15, y=1.02)
plt.tight_layout()
plt.show()
4.6 Exercise: design your own adaptation method
Propose and implement a novel adaptation strategy that combines elements from the methods you have studied. Some ideas:
- FDA + fine-tuning: Apply FDA as preprocessing, then fine-tune on a small set of real images.
- Progressive FDA: Use a curriculum that starts with low beta and gradually increases it.
- Feature alignment without GAN: Use MMD loss to align feature distributions directly (no generator/discriminator needed).
- Multi-scale FDA: Apply different beta values to different frequency bands and blend them.
Implement your method, add it to the benchmark, and compare it against the existing conditions. Write a brief analysis (1 paragraph) explaining why your method does or does not improve over the baselines.
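For the MMD idea in the list above, the core quantity is small enough to prototype in NumPy before porting it to a differentiable loss. The sketch below is a biased estimate of squared MMD with a single RBF kernel; the function names, the fixed bandwidth, and the random stand-in features are all assumptions made for illustration.

```python
import numpy as np

def rbf_kernel(a, b, bandwidth):
    """Pairwise RBF kernel values between rows of a (n, d) and b (m, d)."""
    sq_dists = ((a[:, None, :] - b[None, :, :]) ** 2).sum(axis=2)
    return np.exp(-sq_dists / (2.0 * bandwidth ** 2))

def mmd_squared(x, y, bandwidth=4.0):
    """Biased estimate of squared MMD between feature sets x and y."""
    return (rbf_kernel(x, x, bandwidth).mean()
            + rbf_kernel(y, y, bandwidth).mean()
            - 2.0 * rbf_kernel(x, y, bandwidth).mean())

rng = np.random.default_rng(0)
src_feats = rng.normal(0.0, 1.0, size=(200, 8))   # stand-in source features
tgt_near = rng.normal(0.0, 1.0, size=(200, 8))    # same distribution
tgt_far = rng.normal(2.0, 1.0, size=(200, 8))     # shifted distribution

mmd_near = mmd_squared(src_feats, tgt_near)
mmd_far = mmd_squared(src_feats, tgt_far)
print(mmd_near, mmd_far)
```

In a training loop the same expression would be computed on mini-batch features from the task network and added to the classification loss. The bandwidth matters: a common heuristic is to set it from the median pairwise distance or to sum several kernels with different bandwidths.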
Notebook Guide
| # | Notebook | Focus | Duration |
|---|---|---|---|
| 1 | 01_fourier_domain_adaptation.ipynb | FFT basics, FDA implementation, bandwidth tuning, gap measurement | 75 min |
| 2 | 02_adversarial_adaptation.ipynb | GAN review, CyCADA training, cycle consistency, feature alignment | 90 min |
| 3 | 03_benchmark_evaluation.ipynb | Downstream training, mAP evaluation, statistical comparison, reporting | 90 min |
Tips for Success
- Start with small images (64x64 or 128x128). Domain adaptation methods scale straightforwardly to higher resolution, but debugging is much faster at low resolution.
- Log everything. Use dictionaries or pandas DataFrames to track every metric, hyperparameter, and random seed. You will thank yourself when writing the benchmark report.
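- Watch the discriminator loss. With the least-squares loss used here, a discriminator that cannot tell real from generated images scores about 0.25, so a D_total (the sum over both discriminators) near 0.5 indicates balanced training. If it drops toward zero, the discriminators are overpowering the generators, which are no longer fooling them. If it oscillates wildly, reduce the learning rate.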
- Use the replay buffer. Without it, CyCADA training is unstable because the discriminator only sees the generator's latest mode.
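- Run significance tests. Three runs are enough to report a mean and spread, but not to establish significance: a paired Wilcoxon signed-rank test cannot produce a two-sided p-value below 0.05 with fewer than 6 paired runs. Never report a single-run result as a benchmark number.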
Common Pitfalls
- Forgetting to normalize consistently: Source and target images must use the same normalization (e.g., ImageNet mean/std). Mismatched normalization creates an artificial domain gap.
- Data leakage: Never use target test images for FDA style transfer or CyCADA training. The target training split is fair game; the test split must be held out.
- Comparing different training budgets: If FDA-adapted training runs for 20 epochs and CyCADA-adapted runs for 50 (including GAN training), the comparison is unfair. Budget the total compute fairly.
- Ignoring class imbalance: If the dataset has imbalanced classes, mAP is more informative than raw accuracy. Always report both.
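The class-imbalance pitfall is easy to reproduce with a toy example. The sketch below uses macro-averaged per-class accuracy rather than mAP, because it needs only hard predictions; the 90/10 split and the always-majority model are made up for illustration.

```python
import numpy as np

labels = np.array([0] * 90 + [1] * 10)   # 90/10 class imbalance
preds = np.zeros_like(labels)            # degenerate model: always predicts class 0

accuracy = (preds == labels).mean()
per_class_acc = [(preds[labels == c] == c).mean() for c in np.unique(labels)]
balanced_accuracy = float(np.mean(per_class_acc))
print(accuracy, balanced_accuracy)
```

Raw accuracy rewards the model for ignoring the minority class, while the per-class average exposes it. The per_class_accuracy and per_class_ap entries returned by evaluate_classifier serve the same purpose in the benchmark.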
Extension Ideas
After completing the benchmark, consider these advanced extensions:
- Real driving datasets: Replace the proxy dataset with actual sim-to-real data (e.g., Sim10k -> Cityscapes, SYNTHIA -> Cityscapes, or GTA5 -> Cityscapes) for a publication-grade benchmark.
- Object detection: Replace classification with a full 2D object detector (FCOS, SSD) and evaluate with COCO-style mAP at multiple IoU thresholds.
- Self-training / pseudo-labels: Add a self-training condition where the source-trained model generates pseudo-labels on target data, then retrains on the combined set.
- Source-free adaptation: Explore methods that adapt a source-trained model to the target domain without access to source data (e.g., SHOT, AdaContrast).
- Continuous adaptation: Simulate a deployment scenario where the target domain shifts over time (e.g., day -> dusk -> night) and the model must adapt online.
- Multi-modal adaptation: Extend to lidar point clouds or camera + lidar fusion, where the domain gap manifests differently in each modality.
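For the self-training extension, a common first step is to keep only confident predictions as pseudo-labels. The following sketch assumes softmax probabilities from the source-trained model on unlabeled target images; the function name and the 0.9 threshold are hypothetical choices, not part of the benchmark.

```python
import numpy as np

def select_pseudo_labels(probs, threshold=0.9):
    """Keep target samples whose top softmax probability reaches the threshold.

    Returns the indices of the kept samples and their pseudo-labels.
    """
    confidence = probs.max(axis=1)
    keep = np.flatnonzero(confidence >= threshold)
    return keep, probs[keep].argmax(axis=1)

probs = np.array([[0.97, 0.02, 0.01],    # confident: class 0
                  [0.40, 0.35, 0.25],    # ambiguous: dropped
                  [0.05, 0.93, 0.02]])   # confident: class 1
indices, pseudo = select_pseudo_labels(probs, threshold=0.9)
print(indices, pseudo)
```

The selected target images and pseudo-labels would then be combined with the labeled source set for retraining. As with the other conditions, only the target training split should be pseudo-labeled, never the test split.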
References
- Yang, Y. & Soatto, S. (2020). "FDA: Fourier Domain Adaptation for Semantic Segmentation." CVPR 2020.
- Hoffman, J. et al. (2018). "CyCADA: Cycle-Consistent Adversarial Domain Adaptation." ICML 2018.
- Ganin, Y. et al. (2016). "Domain-Adversarial Training of Neural Networks." JMLR 2016.
- Zhu, J.-Y. et al. (2017). "Unpaired Image-to-Image Translation using Cycle-Consistent Adversarial Networks." ICCV 2017.
- Tobin, J. et al. (2017). "Domain Randomization for Transferring Deep Neural Networks from Simulation to the Real World." IROS 2017.
- Heusel, M. et al. (2017). "GANs Trained by a Two Time-Scale Update Rule Converge to a Local Nash Equilibrium." NeurIPS 2017. (FID metric)
- Tremblay, J. et al. (2018). "Training Deep Networks with Synthetic Data: Bridging the Reality Gap by Domain Randomization." CVPR Workshops 2018.