Analysis Pipeline and Workflow
This document provides a comprehensive explanation of the AnalysisG analysis pipeline, including the complete workflow from data loading to training/inference, with detailed function call sequences and state management.
Overview
The AnalysisG framework orchestrates a multi-stage pipeline for high-energy physics analysis and machine learning:
Pipeline Stages:
Data Loading: ROOT/HDF5 files → Event objects
Event Selection: Apply physics cuts and filters
Graph Construction: Events → Graph representations
Training/Inference: GNN models on graph datasets
Evaluation: Metrics computation and result storage
Main Controller: analysis class
Workflow Diagram
┌─────────────────────────────────────────────────────────────┐
│ User Configuration │
│ • add_samples() │
│ • add_event_template() │
│ • add_selection_template() │
│ • add_graph_template() │
│ • add_model() / add_metric_template() │
└──────────────────────┬──────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────┐
│ start() Method │
│ Orchestrates entire pipeline │
└──────────────────────┬──────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────┐
│ 1. Cache Validation │
│ check_cache() - Check for pre-built data │
└──────────────────────┬──────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────┐
│ 2. Event Building (if needed) │
│ build_events() - ROOT → Event objects │
│ ├─ Parallel file reading │
│ ├─ Particle reconstruction │
│ └─ Event serialization to HDF5 │
└──────────────────────┬──────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────┐
│ 3. Selection Application │
│ build_selections() - Apply physics cuts │
│ ├─ Event filtering │
│ ├─ Custom selection logic │
│ └─ Cache filtered events │
└──────────────────────┬──────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────┐
│ 4. Graph Construction │
│ build_graphs() - Events → Graphs │
│ ├─ Node/edge feature extraction │
│ ├─ Edge connectivity computation │
│ └─ Graph serialization │
└──────────────────────┬──────────────────────────────────────┘
│
┌───────┴────────┐
▼ ▼
┌──────────────────────┐ ┌──────────────────────┐
│ 5a. Training Mode │ │ 5b. Inference Mode │
│ build_model_session()│ │ build_inference() │
│ ├─ K-fold CV │ │ ├─ Load model │
│ ├─ Data batching │ │ ├─ Run prediction │
│ ├─ Optimization │ │ └─ Save outputs │
│ └─ Checkpointing │ └─────────────────────┘
└──────────┬───────────┘
│
▼
┌─────────────────────────────────────────────────────────────┐
│ 6. Metrics Computation │
│ build_metric() - Evaluate performance │
│ ├─ Per-fold metrics │
│ ├─ Aggregated statistics │
│ └─ ROC curves, plots │
└─────────────────────────────────────────────────────────────┘
Analysis Class API
Public Methods
add_samples()
Signature:
void add_samples(std::string path, std::string label)
Purpose: Register input ROOT files for processing
Parameters:
path (string): File path or glob pattern (e.g., /data/*.root)
label (string): Sample identifier (e.g., "ttbar", "signal")
Behavior:
Resolves glob patterns to file list
Stores the result in the file_labels map (label → path)
Files are processed lazily, only when start() is called
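Usage (the Python bindings expose these methods under CamelCase names, e.g. add_samples() → AddSamples()):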
from AnalysisG import Analysis
ana = Analysis()
ana.AddSamples("/data/ttbar/*.root", "ttbar")
ana.AddSamples("/data/signal/*.root", "signal")
Internal Storage: std::map<std::string, std::string> file_labels
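The glob resolution and lazy registration described above can be illustrated with a minimal pure-Python sketch. It is not AnalysisG's implementation: the function signature, the list-valued map (the documented file_labels is a std::map<std::string, std::string>) and the conversion to absolute paths are assumptions made for the example.

```python
import glob
import os
from collections import defaultdict

def add_samples(pattern, label, file_labels=None):
    """Hypothetical sketch of add_samples(): expand a glob pattern and record
    every matching file under `label`. No file is opened here; reading is
    deferred until the pipeline is started."""
    if file_labels is None:
        file_labels = defaultdict(list)
    # sorted() gives a deterministic file order regardless of the filesystem.
    for path in sorted(glob.glob(pattern)):
        file_labels[label].append(os.path.abspath(path))
    return file_labels
```

Calling it again with the returned map adds a second label without disturbing the first.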
add_event_template()
Signature:
void add_event_template(event_template* ev, std::string label)
Purpose: Register custom event class for ROOT file parsing
Parameters:
ev (event_template*): User-defined event class (inherits EventTemplate)
label (string): Event type identifier (must match a sample label or be shared)
Behavior:
Clones the event template: ev->clone()
Stores the clone in the event_labels map (label → event_template*)
The template is used to instantiate events during build_events()
Usage:
from AnalysisG import EventTemplate
class MyEvent(EventTemplate):
    def selection(self):
        # Keep events with at least two leptons
        return len([p for p in self.Particles if p.is_lep]) >= 2
ana.AddEvent(MyEvent(), "ttbar")
Internal Storage: std::map<std::string, event_template*> event_labels
Key Point: The event template defines:
Particle registration (jets, leptons, etc.)
ROOT branch mapping (via add_leaf())
Selection logic (the selection() method)
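Cloning the template and storing the clone under a label is the prototype pattern. A minimal Python sketch of that idea follows; ToyEvent, event_labels and new_event() are hypothetical names, and copy.deepcopy stands in for the C++ clone() method.

```python
import copy

class ToyEvent:
    """Hypothetical stand-in for a user event template."""
    def __init__(self):
        self.Particles = []

    def clone(self):
        # Deep copy: the clone shares no mutable state with the original.
        return copy.deepcopy(self)

event_labels = {}

def add_event_template(ev, label):
    # Register a clone, so later edits to the caller's object are not seen.
    event_labels[label] = ev.clone()

def new_event(label):
    # Build a fresh event per TTree entry from the registered prototype.
    return event_labels[label].clone()
```

Because every entry gets its own clone, particles filled into one event never leak into the next.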
add_selection_template()
Signature:
void add_selection_template(selection_template* sel)
Purpose: Register event filter for physics cuts
Parameters:
sel (selection_template*): Selection criteria implementation
Behavior:
Clones the selection: sel->clone()
Stores the clone in the selection_names map (name → selection_template*)
Applied to all events during build_selections()
Usage:
from AnalysisG import SelectionTemplate
class FourTopSelection(SelectionTemplate):
    def selection(self, event):
        # Require at least four jets, at least two of them b-tagged
        jets = [p for p in event.Particles if p.Type == "jet"]
        bjets = [p for p in jets if p.is_b]
        return len(jets) >= 4 and len(bjets) >= 2
ana.AddSelection(FourTopSelection())
Internal Storage: std::map<std::string, selection_template*> selection_names
add_graph_template()
Signature:
void add_graph_template(graph_template* gr, std::string label)
Purpose: Register graph construction method for GNN input
Parameters:
gr (graph_template*): Graph builder (inherits GraphTemplate)
label (string): Graph type identifier
Behavior:
Clones the graph template: gr->clone()
Stores the clone in the graph_labels map ((event_label, graph_name) → graph_template*)
Graphs are constructed during build_graphs()
Usage:
from AnalysisG import GraphTemplate
class ParticleGraph(GraphTemplate):
    def build(self, event):
        # Build node features, edge connectivity
        pass
ana.AddGraph(ParticleGraph(), "particle_graph")
Internal Storage: std::map<std::string, std::map<std::string, graph_template*>> graph_labels
add_model()
Signatures:
void add_model(model_template* model, optimizer_params_t* op, std::string run_name)
void add_model(model_template* model, std::string run_name)
Purpose: Register GNN model for training or inference
Parameters:
model (model_template*): PyTorch model wrapper
op (optimizer_params_t*): Training hyperparameters (learning rate, epochs, etc.)
run_name (string): Training run identifier
Behavior (with optimizer):
Clones the model: model->clone()
Stores the pair in model_sessions: (model*, optimizer_params_t*)
Triggers training mode: build_model_session()
Behavior (without optimizer):
Stores the model in the model_inference map
Triggers inference mode: build_inference()
Usage (Training):
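from AnalysisG import ModelTemplate
# MyGNN: user-defined ModelTemplate subclass wrapping a PyTorch model
# (the import for OptimizerParams is not shown in this example)
model = MyGNN()
params = OptimizerParams()
params.lr = 0.001          # learning rate
params.epochs = 100
params.optimizer = "Adam"
ana.AddModel(model, params, "gnn_training")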
Usage (Inference):
model = MyGNN()
model.Load("checkpoint.pt") # Load pre-trained weights
ana.AddModel(model, "inference_run")
Internal Storage:
Training: std::vector<std::tuple<model_template*, optimizer_params_t*>> model_sessions
Inference: std::map<std::string, model_template*> model_inference
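The two add_model() overloads amount to a dispatch on whether optimizer parameters are supplied. A hypothetical Python sketch follows: Python has no overloading, so the optimizer parameters become an optional trailing argument, and plain values stand in for the model and optimizer objects.

```python
from typing import Any, Dict, List, Optional, Tuple

model_sessions: List[Tuple[Any, Dict[str, Any]]] = []  # training: (model, optimizer params)
model_inference: Dict[str, Any] = {}                   # inference: run_name -> model

def add_model(model: Any, run_name: str, op: Optional[Dict[str, Any]] = None) -> str:
    """Hypothetical sketch of the two add_model() overloads."""
    if op is not None:
        # With optimizer parameters: queue a training session.
        model_sessions.append((model, op))
        return "training"
    # Without optimizer parameters: register the model for inference.
    model_inference[run_name] = model
    return "inference"
```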
add_metric_template()
Signature:
void add_metric_template(metric_template* mx, model_template* mdl)
Purpose: Register evaluation metric for model performance
Parameters:
mx (metric_template*): Metric implementation (accuracy, PageRank, etc.)
mdl (model_template*): Model to evaluate
Behavior:
Associates the metric with the model
Stores it in the metric_names map
Computed during/after training via build_metric()
Usage:
from AnalysisG.metrics import Accuracy
metric = Accuracy()
ana.AddMetric(metric, model)
Internal Storage: std::map<std::string, metric_template*> metric_names
start()
Signature:
void start()
Purpose: Execute entire analysis pipeline
Behavior: Sequential execution of the build stages:
1. check_cache() - Validate cached data
2. build_project() - Initialize output directories
3. build_events() - Process ROOT files
4. build_selections() - Apply event filters
5. build_graphs() - Construct graph representations
6. build_model_session() OR build_inference() - Train or run inference
7. build_metric() - Compute evaluation metrics
State Management: Sets started = true to prevent re-execution
Threading: Uses attach_threads() for parallel processing
Usage:
ana = Analysis()
# ... configuration ...
ana.Start() # Blocks until complete
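The stage ordering and the started guard in start() can be sketched as a small Python class. Everything here is hypothetical: stages are recorded in a log instead of executed, and a stage is marked as skipped when its name is in the cached set, loosely mirroring how check_cache() lets later stages be skipped.

```python
class Pipeline:
    """Hypothetical sketch of start(): stages run in a fixed order, cached
    stages are skipped, and a second call does nothing."""

    def __init__(self, cached=(), training=True):
        self.cached = set(cached)    # stage names whose output is already on disk
        self.training = training     # True when add_model() received optimizer params
        self.started = False
        self.log = []

    def run_stage(self, name):
        # Record the decision instead of doing real work.
        self.log.append(("skip" if name in self.cached else "run", name))

    def start(self):
        if self.started:             # mirrors the `started` re-execution guard
            return self.log
        self.started = True
        stages = ["check_cache", "build_project", "build_events",
                  "build_selections", "build_graphs",
                  "build_model_session" if self.training else "build_inference",
                  "build_metric"]
        for name in stages:
            self.run_stage(name)
        return self.log
```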
Private Methods (Internal Pipeline)
check_cache()
Purpose: Validate existence of cached data to skip reprocessing
Process:
Check OutputPath for existing HDF5 files
Populate the in_cache map: (sample, stage) → bool
Set skip_event_build flags if events are cached
Cache Keys:
"events" - Reconstructed event objects
"selections/{name}" - Filtered events
"graphs/{graph_name}" - Constructed graphs
Behavior:
If cached: Skip corresponding build stage
If missing: Rebuild from source
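A sketch of the cache lookup, assuming the OutputPath/<stage>/<sample>.h5 file layout used by the build stages in this document. check_cache() here is a hypothetical Python function, not the C++ method, and it only tests for file existence, not file validity.

```python
from pathlib import Path

def check_cache(output_path, samples, stages):
    """Hypothetical sketch: map (sample, stage) -> True when an HDF5 file for
    that sample already exists under OutputPath/<stage>/. Stage keys may be
    nested, e.g. "graphs/particle_graph"."""
    root = Path(output_path)
    return {(s, st): (root / st / f"{s}.h5").is_file()
            for s in samples for st in stages}
```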
build_project()
Purpose: Initialize output directory structure
Creates:
OutputPath/
├── events/
│ └── {sample_label}/
├── selections/
│ └── {selection_name}/{sample_label}/
├── graphs/
│ └── {graph_name}/{sample_label}/
├── models/
│ └── {run_name}/
│ ├── checkpoints/
│ └── logs/
└── metrics/
└── {metric_name}/
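The directory tree above can be created idempotently with os.makedirs(..., exist_ok=True). The hypothetical Python function below builds the same layout from lists of names; it is a sketch, not the C++ build_project().

```python
import os

def build_project(output_path, samples, selections, graphs, runs, metrics):
    """Hypothetical sketch: create the OutputPath layout. exist_ok=True makes
    repeated calls harmless, so it can run on every start()."""
    dirs = [os.path.join("events", s) for s in samples]
    dirs += [os.path.join("selections", n, s) for n in selections for s in samples]
    dirs += [os.path.join("graphs", g, s) for g in graphs for s in samples]
    for r in runs:
        dirs += [os.path.join("models", r, "checkpoints"),
                 os.path.join("models", r, "logs")]
    dirs += [os.path.join("metrics", m) for m in metrics]
    for d in dirs:
        os.makedirs(os.path.join(output_path, d), exist_ok=True)
    return dirs
```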
build_events()
Purpose: Parse ROOT files into event objects
Process:
Initialize I/O: io* reader = new io()
For each sample:
  Open the ROOT file: reader->open_root(path)
  Get the TTree list: reader->list_root_trees()
  For each TTree:
    Get the event template: event_labels[sample_label]
    For each entry in the TTree:
      Read branches: reader->read_root_branch()
      Build the event: event->build(element_t*)
      Apply selection(): keep the event only if it returns true
      Serialize for HDF5: event->__reduce__()
  Cache: write to OutputPath/events/{sample}.h5
Parallelization: Multiple files processed concurrently
Key Functions:
event_template::build(element_t*) - Construct event from ROOT data
event_template::selection() - User-defined filter
io::write_hdf5() - Serialize to disk
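The nested loop above reduces to: build one event per TTree entry, then keep it only if its selection() returns true. In this hypothetical sketch, trees are plain dicts of entries and TwoLeptonEvent (with an invented n_leptons branch) stands in for a user event template; reading ROOT and writing HDF5 are omitted.

```python
def build_events(trees, make_event):
    """Hypothetical sketch of the event loop. `trees` maps a tree name to a
    list of entries; each entry is a dict of branch name -> value."""
    kept = []
    for tree_name, entries in trees.items():
        for entry in entries:
            event = make_event()        # fresh instance per entry
            event.build(entry)          # fill the event from branch values
            if event.selection():       # event-level filter
                kept.append((tree_name, event))
    return kept

class TwoLeptonEvent:
    """Toy event template: keeps entries with at least two leptons."""
    def build(self, entry):
        self.n_leptons = entry["n_leptons"]

    def selection(self):
        return self.n_leptons >= 2
```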
build_selections()
Purpose: Apply additional event filters beyond event-level selection()
Process:
For each selection template:
  Load cached events: io::read_hdf5()
  For each event, apply selection_template::selection(event) and keep the event if it returns true
  Cache the filtered events: OutputPath/selections/{name}/{sample}.h5
Use Case: Multi-stage cuts (e.g., preselection → tight selection)
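For the preselection → tight selection use case, chaining filters so that each stage sees only the survivors of the previous one gives a simple cutflow. This is a hypothetical sketch with dict events and lambda predicates; note that it chains stages, whereas the process above applies each selection template to the cached events.

```python
def apply_selections(events, stages):
    """Hypothetical sketch of multi-stage cuts: each (name, predicate) stage
    filters the survivors of the previous one; the number of surviving
    events after each stage is recorded as a cutflow."""
    cutflow = {}
    for name, keep in stages:
        events = [ev for ev in events if keep(ev)]
        cutflow[name] = len(events)
    return events, cutflow
```

For example, a preselection of at least four jets followed by a tight cut of at least two b-jets: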
build_graphs()
Purpose: Transform events into graph representations for GNNs
Process:
For each graph template:
  Load events (from selections or events)
  For each event, call graph_template::build(event), which:
    Extracts node features (particle kinematics)
    Computes edge connectivity (e.g., k-NN, fully connected)
    Creates edge features (DeltaR, invariant mass)
  Serialize the graphs: graph_template::__reduce__()
  Cache: OutputPath/graphs/{graph_name}/{sample}.h5
Parallelization: Graphs built concurrently across samples
Key Functions:
graph_template::build(event) - User-defined graph construction
graph::edge_aggregation() - Compute edge features
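As an example of the connectivity and edge features mentioned above, the sketch below builds a k-NN edge index in (η, φ) and uses ΔR = sqrt(Δη² + Δφ²) as the edge feature, with Δφ wrapped so that particles on either side of φ = ±π count as close. The choice of space, the 2 × E edge-index layout and the function names are assumptions; AnalysisG's own graph::edge_aggregation() is not reproduced here.

```python
import math

def delta_r(a, b):
    """ΔR between two (eta, phi) points, with Δφ wrapped into [-π, π)."""
    deta = a[0] - b[0]
    dphi = (a[1] - b[1] + math.pi) % (2 * math.pi) - math.pi
    return math.hypot(deta, dphi)

def knn_edges(nodes, k):
    """Connect each node to its k nearest neighbours by ΔR.
    Returns a 2 x E edge index [sources, targets] and the ΔR of each edge."""
    src, dst, feats = [], [], []
    for i, a in enumerate(nodes):
        # Distances to every other node; ties broken by node index.
        dists = sorted((delta_r(a, b), j) for j, b in enumerate(nodes) if j != i)
        for d, j in dists[:k]:
            src.append(i)
            dst.append(j)
            feats.append(d)
    return [src, dst], feats
```

This brute-force version is O(N² log N) per event, which is acceptable for the tens of particles in a typical event.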
build_model_session()
Purpose: Train GNN models with k-fold cross-validation
Process:
Initialize the DataLoader:
  dataloader* loader = new dataloader(settings);
  loader->add_graph_path(graph_paths);
  loader->set_kfolds(k);
For each fold (k-fold CV):
  Initialize the optimizer: initialize_loop(optimizer*, k, model, config, report)
    Clone the model for the fold
    Set up the Adam/SGD optimizer
    Reset the learning-rate scheduler
  Training loop (epochs), for each batch in the training set:
    Load graphs: loader->next()
    Forward pass: model->forward(graphs)
    Compute the loss: model->loss_fx(pred, truth)
    Backward pass: loss.backward()
    Update weights: optimizer->step()
  Validation:
    Evaluate on the validation set
    Compute metrics
    Save a checkpoint if the validation result improved
  Test evaluation:
    Load the best checkpoint
    Evaluate on the test fold
    Store the results
Aggregate results: average the metrics across folds
Parallelization: Each fold trained in separate thread
Static Method: execution(model, settings, data, progress, output, content, msg)
Key Variables:
optimizer* trainer - Per-fold optimizer instance
model_report* reports - Training metrics/logs
std::vector<std::thread*> threads - Parallel fold training
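The fold bookkeeping behind k-fold CV can be sketched in a few lines. This hypothetical version assigns sample i to fold i % k and holds out one fold at a time; a real loader would normally shuffle first, and the further split of the held-out part into validation and test sets is not shown.

```python
def kfold_indices(n, k):
    """Hypothetical sketch: yield (fold, train_indices, held_out_indices)
    for each of k folds, with sample i assigned to fold i % k."""
    folds = [list(range(f, n, k)) for f in range(k)]
    for f in range(k):
        held_out = folds[f]
        # Every other fold contributes to the training set.
        train = [i for g in range(k) if g != f for i in folds[g]]
        yield f, train, held_out
```

Each sample lands in exactly one held-out fold, so every event is evaluated once across the k runs.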
build_inference()
Purpose: Run pre-trained model on new data
Process:
Load model weights: model->load(checkpoint_path)
Set evaluation mode: model->eval()
For each sample:
  Load graphs: io::read_hdf5()
  Batch graphs: dataloader::batch()
  For each batch:
    Forward pass: model->forward(graphs)
    Extract predictions
  Store outputs: OutputPath/inference/{sample}.h5
No Gradient: inference runs with gradient tracking disabled (in LibTorch, via a torch::NoGradGuard), which saves memory and compute
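For the batching and prediction-extraction steps, here is a hypothetical sketch that turns per-event class logits into (event_id, prediction, probability) rows, matching the columns listed for the inference output later in this document. Precomputed logits stand in for model->forward(), and softmax is an assumed choice for turning logits into probabilities.

```python
import math

def batches(items, size):
    """Yield consecutive slices of at most `size` items."""
    for start in range(0, len(items), size):
        yield items[start:start + size]

def softmax(logits):
    m = max(logits)                        # subtract the max for numerical stability
    exps = [math.exp(x - m) for x in logits]
    total = sum(exps)
    return [e / total for e in exps]

def predict_rows(event_ids, logits, batch_size=2):
    """Hypothetical sketch: (event_id, predicted class, its probability) per event."""
    rows = []
    for batch in batches(list(zip(event_ids, logits)), batch_size):
        for event_id, z in batch:
            p = softmax(z)
            label = max(range(len(p)), key=p.__getitem__)  # first class on ties
            rows.append((event_id, label, p[label]))
    return rows
```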
build_metric()
Purpose: Compute evaluation metrics (accuracy, ROC, custom metrics)
Process:
For each metric template:
  Initialize: metric->initialize()
  Load predictions and truth labels
  Compute the metric: metric->compute(pred, truth)
  Store results: OutputPath/metrics/{metric_name}/
Common metrics:
Accuracy: (TP + TN) / (TP + TN + FP + FN)
ROC curve: TPR vs FPR at varying thresholds
PageRank: Graph-based particle importance
Static Method: execution_metric(metric_t*, progress, msg)
Parallelization: Metrics computed concurrently
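The accuracy formula and the ROC construction can be checked with a small pure-Python sketch for binary labels (1 = signal, 0 = background). The function names are hypothetical and are not the metric_template interface.

```python
def confusion(pred, truth):
    """Count TP, TN, FP, FN for binary labels."""
    tp = sum(p == 1 and t == 1 for p, t in zip(pred, truth))
    tn = sum(p == 0 and t == 0 for p, t in zip(pred, truth))
    fp = sum(p == 1 and t == 0 for p, t in zip(pred, truth))
    fn = sum(p == 0 and t == 1 for p, t in zip(pred, truth))
    return tp, tn, fp, fn

def accuracy(pred, truth):
    tp, tn, fp, fn = confusion(pred, truth)
    return (tp + tn) / (tp + tn + fp + fn)

def roc_points(scores, truth, thresholds):
    """(FPR, TPR) per threshold, classifying score >= threshold as signal.
    Assumes both classes are present in `truth`."""
    points = []
    for thr in thresholds:
        tp, tn, fp, fn = confusion([int(s >= thr) for s in scores], truth)
        points.append((fp / (fp + tn), tp / (tp + fn)))
    return points
```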
Workflow Example (Complete)
Four-Top Analysis with GNN Training
from AnalysisG import Analysis, EventTemplate, GraphTemplate, ModelTemplate
from AnalysisG.metrics import Accuracy
# 1. Define custom event class
class FourTopEvent(EventTemplate):
    def selection(self):
        # >= 4 jets, >= 2 b-jets, exactly one lepton
        jets = [p for p in self.Particles if p.Type == "jet"]
        bjets = [p for p in jets if p.is_b]
        leptons = [p for p in self.Particles if p.is_lep]
        return len(jets) >= 4 and len(bjets) >= 2 and len(leptons) == 1
# 2. Define graph construction
class ParticleGraph(GraphTemplate):
    def build(self, event):
        # Nodes: all particles
        self.Nodes = event.Particles
        # Edges: k-NN connectivity (k=5)
        self.edge_index = self.knn(k=5)
# 3. Initialize analysis
ana = Analysis()
ana.OutputPath = "./results"
ana.kFolds = 5
# 4. Add data
ana.AddSamples("/data/ttbar/*.root", "ttbar")
ana.AddSamples("/data/fourtop/*.root", "signal")
# 5. Register templates
ana.AddEvent(FourTopEvent(), "ttbar")
ana.AddEvent(FourTopEvent(), "signal")
ana.AddGraph(ParticleGraph(), "particle_graph")
# 6. Set up model training
from my_models import ParticleGNN  # user-defined model module
model = ParticleGNN(input_dim=8, hidden_dim=64, output_dim=2)
params = OptimizerParams()  # import not shown in this example
params.lr = 0.001
params.epochs = 100
params.optimizer = "Adam"
params.weight_decay = 1e-4
ana.AddModel(model, params, "fourtop_classifier")
# 7. Add metrics
ana.AddMetric(Accuracy(), model)
# 8. Execute pipeline
ana.Start()
# Pipeline executes:
# - check_cache(): No cache, rebuild all
# - build_events(): Parse ROOT → 50,000 events
# - build_graphs(): Events → graphs (5min)
# - build_model_session(): 5-fold CV training (30min)
# * Fold 1: 80% train, 10% val, 10% test
# * ... (parallel execution)
# - build_metric(): Compute accuracy per fold
# Results saved to:
# ./results/models/fourtop_classifier/
# ├── fold_0/checkpoint_best.pt
# ├── fold_1/checkpoint_best.pt
# └── ...
# ./results/metrics/Accuracy/
# └── fold_results.json
Output:
{
"accuracy": {
"fold_0": 0.87,
"fold_1": 0.89,
"fold_2": 0.88,
"fold_3": 0.86,
"fold_4": 0.87,
"mean": 0.874,
"std": 0.011
}
}
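The mean and std in the output above can be reproduced from the five per-fold accuracies. The reported std (0.011) corresponds to the sample standard deviation with n − 1 in the denominator (about 0.0114); the population version would give about 0.010. A minimal sketch:

```python
import statistics

def aggregate_folds(per_fold):
    """Summarize per-fold scores as mean and sample standard deviation
    (n - 1 denominator), rounded to three decimals."""
    values = list(per_fold.values())
    return {"mean": round(statistics.mean(values), 3),
            "std": round(statistics.stdev(values), 3)}
```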
Inference on New Data
# Load pre-trained model
model = ParticleGNN.Load("./results/models/fourtop_classifier/fold_0/checkpoint_best.pt")
# Setup analysis for inference
ana = Analysis()
ana.OutputPath = "./inference_results"
ana.AddSamples("/new_data/*.root", "unknown")
ana.AddEvent(FourTopEvent(), "unknown")
ana.AddGraph(ParticleGraph(), "particle_graph")
ana.AddModel(model, "classify_unknown") # No optimizer = inference mode
ana.Start()
# Pipeline executes:
# - build_events(): Parse new ROOT files
# - build_graphs(): Construct graphs
# - build_inference(): Run model.forward(), save predictions
# Results:
# ./inference_results/inference/unknown.h5
# Columns: event_id, prediction, probability
Progress Monitoring
Real-Time Progress APIs
import threading
import time
# Start() blocks until complete, so run it in a background thread and poll
# from the main thread (assumes the bindings release the GIL while running)
worker = threading.Thread(target=ana.Start)
worker.start()
while not ana.IsComplete()["all"]:
    progress = ana.Progress()
    print(f"Events: {progress['events'][0]:.1f}%")
    print(f"Graphs: {progress['graphs'][0]:.1f}%")
    print(f"Training: {progress['training'][0]:.1f}%")
    mode = ana.ProgressMode()
    print(f"Current: {mode['current']}")
    report = ana.ProgressReport()
    print(f"Status: {report['message']}")
    time.sleep(5)
worker.join()
Methods:
progress() → std::map<std::string, std::vector<float>>
  Keys: "events", "graphs", "training", "metrics"
  Values: [current%, total_steps]
progress_mode() → std::map<std::string, std::string>
  "current" → Current pipeline stage name
progress_report() → std::map<std::string, std::string>
  "message" → Detailed status message
is_complete() → std::map<std::string, bool>
  Per-stage completion flags
Performance Considerations
Caching Strategy
First Run (no cache):
Total time: ~2 hours for 100k events
Breakdown: Events (30min) + Graphs (60min) + Training (30min)
Subsequent Runs (with cache):
Training only: ~30min
Changes to model hyperparameters don’t invalidate cache
Cache Invalidation:
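Changing event templates → rebuild events and every stage built from them
Changing selection templates → rebuild the affected selections and the graphs built from them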
Changing graph templates → rebuild graphs
Model changes → no rebuild needed
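Cache invalidation can be modelled as a walk over stage dependencies. The dependency map below is an assumption based on the stage order in this document (selections read events, graphs read selections or events); model and hyperparameter changes touch no cached stage.

```python
# Hypothetical dependency map: each cached stage lists the stages that consume it.
DOWNSTREAM = {
    "events": ["selections", "graphs"],
    "selections": ["graphs"],
    "graphs": [],
}

def stages_to_rebuild(changed):
    """Return the cached stages invalidated by a change to `changed`,
    following the dependency map transitively."""
    if changed not in DOWNSTREAM:  # e.g. "model": nothing cached is stale
        return []
    stale, stack = set(), [changed]
    while stack:
        stage = stack.pop()
        if stage not in stale:
            stale.add(stage)
            stack.extend(DOWNSTREAM[stage])
    return [s for s in ("events", "selections", "graphs") if s in stale]
```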
Parallelization
Multi-threading:
File reading: N_cores files processed simultaneously
Graph construction: Parallel across samples
K-fold training: Each fold in a separate thread
GPU Acceleration:
CUDA kernels for physics calculations (DeltaR, mass)
PyTorch CUDA tensors for model forward/backward
Batch size limited by GPU memory
Bottlenecks:
Graph construction (CPU-bound)
Disk I/O for large datasets
GNN forward pass (GPU memory)
Memory Management
Event Buffering:
Events loaded in chunks (default: 1000 events)
HDF5 compression saves ~70% disk space
Graph Caching:
Graphs cached to disk, not kept in memory
DataLoader streams graphs in batches
Model Training:
Gradient accumulation for large models
Automatic mixed precision (AMP) support
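Chunked buffering of the kind described under Event Buffering can be sketched with itertools.islice, which pulls a bounded number of entries from an iterator at a time, so only one chunk is held in memory. The 1000-entry default mirrors the stated default; the function itself is hypothetical.

```python
from itertools import islice

def read_in_chunks(entries, chunk_size=1000):
    """Hypothetical sketch: yield lists of at most `chunk_size` entries from
    any iterable, consuming it lazily."""
    it = iter(entries)
    while True:
        chunk = list(islice(it, chunk_size))
        if not chunk:
            return
        yield chunk
```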
See Also
Build System and CMake Configuration - CMake configuration and compilation
cpp_complete_reference - C++ API reference
../core/analysis - Analysis class documentation
../core/event_template - Event template guide
../core/graph_template - Graph construction