Composing related sweeps
A sensitivity study is rarely one sweep. You run a main sweep, then re-run it with one input perturbed — a drag-area factor, a tighter filter threshold, a different input subset — and compare the arms. Every variant must drive the same per-run pipeline as the main sweep, or the numbers are not comparable.
This page shows how to structure a family of related sweeps so the variants share that pipeline cleanly — without copy-pasting it, and without one driver reaching into another's private helpers.
The shape of a per-run pipeline
A sweep whose runs need work on either side of GMAT has three stages:
- Preprocess: pure Python, no GMAT. Turn each input row into whatever the run needs: the GMAT initial state, plus any reference value you will score the run against.
- Build the run spec: map one preprocessed payload to a RunSpec.
- Postprocess: after GMAT runs, reduce the run's output against the reference into the answer you actually want.
The variants differ in exactly one stage each — a perturbed override, a different input selection — and reuse the other two verbatim. The goal is to make "reuse the other two verbatim" a one-line import, not a copy.
Keep the pipeline in one module
The per-run pipeline is your domain code — gmat-sweep has no opinion on how you derive a state or score a run. Put the three stages in a module with a stable, public (non-underscored) surface, and let every driver in the family import from it:
# mission_pipeline.py: the shared per-run pipeline for the sweep family.
from __future__ import annotations

from pathlib import Path

import numpy as np
import pandas as pd
from gmat_sweep import RunOutcome, RunSpec


def preprocess(cases: pd.DataFrame) -> list[dict]:
    """Stage 1: turn each input row into a per-run payload (no GMAT)."""
    payloads: list[dict] = []
    for run_id, case in cases.iterrows():
        # Domain work goes here: derive the GMAT initial state and the
        # reference state the run will be scored against.
        payloads.append(
            {
                "run_id": int(run_id),
                "epoch": case["epoch"],
                "x0_km": [case["x"], case["y"], case["z"]],
                "drag_area_m2": float(case["drag_area_m2"]),
                "reference_km": [case["ref_x"], case["ref_y"], case["ref_z"]],
            }
        )
    return payloads


def build_run_spec(payload: dict, mission: Path, out_dir: Path) -> RunSpec:
    """Stage 2: map one payload to a RunSpec."""
    x0 = payload["x0_km"]
    return RunSpec(
        script_path=mission,
        overrides={
            "Sat.Epoch": payload["epoch"],
            "Sat.X": x0[0],
            "Sat.Y": x0[1],
            "Sat.Z": x0[2],
            "Sat.DragArea": payload["drag_area_m2"],
        },
        output_dir=out_dir / f"run-{payload['run_id']}",
        run_id=payload["run_id"],
        seed=None,
        run_options={},
        # The reference state is needed to score the run but is not a
        # GMAT input, so carry it in context, not overrides.
        context={"reference_km": payload["reference_km"]},
    )


def score_run(run_spec: RunSpec, run_outcome: RunOutcome) -> dict[str, Path]:
    """Stage 3: postprocess hook comparing the GMAT final state to the reference."""
    report = pd.read_parquet(run_outcome.output_paths["report__FinalState"])
    # The last row by time is the final propagated state.
    final = report.sort_values("time").iloc[-1]
    final_km = np.array([final["Sat.X"], final["Sat.Y"], final["Sat.Z"]])
    reference_km = np.array(run_spec.context["reference_km"])
    miss_km = float(np.linalg.norm(final_km - reference_km))
    out_path = run_spec.output_dir / "score.parquet"
    pd.DataFrame([{"run_id": run_spec.run_id, "miss_km": miss_km}]).to_parquet(out_path)
    return {"score": out_path}
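The score that score_run writes is a plain Euclidean distance between two position vectors. As a quick sanity check of that arithmetic, here is a minimal stdlib-only sketch; the helper name miss_distance_km and the sample coordinates are made up for illustration and are not part of gmat-sweep or the pipeline module.

```python
import math


def miss_distance_km(final_km: list[float], reference_km: list[float]) -> float:
    """Euclidean distance between two 3-vectors, in the units of the inputs.

    Hypothetical helper: equivalent to np.linalg.norm(final - reference).
    """
    return math.dist(final_km, reference_km)


# Made-up positions that differ by (3, 4, 0) km, a 3-4-5 triangle.
miss = miss_distance_km([7003.0, 4.0, 0.0], [7000.0, 0.0, 0.0])
print(miss)
```

Because the distance is symmetric, swapping the final and reference states gives the same score.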
Two gmat-sweep features carry the pipeline:
- RunSpec.context moves the reference state from preprocess to postprocess. overrides cannot: every key there is applied to the GMAT Mission and folded into the manifest's parameter_spec. The reference state is neither a GMAT input nor a swept parameter, so it rides in context, which the worker leaves untouched and the hook reads as run_spec.context. See Postprocess hooks.
- score_run is a module-level function, referenced by import path. The worker imports it fresh in each subprocess, so it must have an importable name; a closure or lambda cannot be a hook.
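To see why a hook needs an importable name, it helps to look at how a "module:function" string can be turned back into a callable. The sketch below is an assumption about the general technique, not gmat-sweep's actual resolver; resolve_hook and hook_path_for are hypothetical helpers, and the stdlib function json.dumps stands in for mission_pipeline:score_run.

```python
import json
from importlib import import_module
from typing import Callable


def resolve_hook(path: str) -> Callable:
    """Resolve a 'module:function' string to a callable (illustrative only)."""
    module_name, sep, attr = path.partition(":")
    if not (module_name and sep and attr):
        raise ValueError(f"expected 'module:function', got {path!r}")
    return getattr(import_module(module_name), attr)


def hook_path_for(func: Callable) -> str:
    """Build an import path for func, or raise if it cannot be found by name."""
    qualname = func.__qualname__
    # Lambdas and functions defined inside other functions have no
    # module-level name a fresh subprocess could look up.
    if "<lambda>" in qualname or "<locals>" in qualname:
        raise ValueError(f"{qualname!r} is not importable by name")
    return f"{func.__module__}:{qualname}"


def make_closure() -> Callable:
    def inner(x):
        return x
    return inner


# A module-level function round-trips through its path.
path = hook_path_for(json.dumps)
resolved = resolve_hook(path)
```

Calling hook_path_for on a lambda or on make_closure() raises ValueError, which is the same limitation the hook path imposes: only names that a fresh interpreter can import survive the trip to a worker.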
Running the main sweep
The main driver is now thin: preprocess, build specs, run, aggregate.
from pathlib import Path

import pandas as pd
from gmat_sweep import LocalJoblibPool, Sweep, lazy_extra_outputs

from mission_pipeline import build_run_spec, preprocess


def run_main_sweep(cases: pd.DataFrame, mission: Path, out: Path) -> pd.DataFrame:
    payloads = preprocess(cases)
    specs = [build_run_spec(p, mission, out) for p in payloads]
    with LocalJoblibPool() as pool:
        Sweep(
            runs=specs,
            backend=pool,
            manifest_path=out / "manifest.jsonl",
            output_dir=out,
            script_path=mission,
            # The hook is passed as an import path so each worker can import it.
            postprocess="mission_pipeline:score_run",
        ).run()
    # Fold the per-run score.parquet files into one frame.
    return lazy_extra_outputs(out / "manifest.jsonl", "score")
Note what is not there: no hand-built parameter_spec. A Sweep
built from an explicit RunSpec list auto-derives the _kind="explicit"
spec from the runs' overrides — see
Parameter spec. The postprocess hook writes one
score.parquet per run; lazy_extra_outputs
folds them into a single run_id-indexed frame.
A variant: one stage swapped
A drag-area sensitivity arm reuses preprocess, the spec builder, and
the scorer untouched. It differs in exactly one place — it scales the
swept Sat.DragArea. Because RunSpec is a frozen dataclass,
dataclasses.replace expresses that as a clean transform on the spec
the shared builder produced:
from dataclasses import replace
from pathlib import Path

import pandas as pd
from gmat_sweep import LocalJoblibPool, RunSpec, Sweep, lazy_extra_outputs

from mission_pipeline import build_run_spec, preprocess


def scale_drag(spec: RunSpec, factor: float) -> RunSpec:
    """The one stage that differs from the main sweep."""
    # Build a new overrides dict rather than mutating the one on the spec.
    overrides = {**spec.overrides, "Sat.DragArea": spec.overrides["Sat.DragArea"] * factor}
    return replace(spec, overrides=overrides)


def run_drag_variant(
    cases: pd.DataFrame, mission: Path, out: Path, factor: float
) -> pd.DataFrame:
    payloads = preprocess(cases)
    specs = [scale_drag(build_run_spec(p, mission, out), factor) for p in payloads]
    with LocalJoblibPool() as pool:
        Sweep(
            runs=specs,
            backend=pool,
            manifest_path=out / "manifest.jsonl",
            output_dir=out,
            script_path=mission,
            postprocess="mission_pipeline:score_run",
        ).run()
    return lazy_extra_outputs(out / "manifest.jsonl", "score")
The variant imports only the public surface of mission_pipeline —
preprocess, build_run_spec, and the "mission_pipeline:score_run"
hook path. It never imports the main driver (run_main_sweep), and
nothing in it is underscore-prefixed. A second variant — say, one that
swaps the input selection rather than an override — is the same script
with preprocess fed a different cases frame and scale_drag
dropped.
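The frozen-dataclass transform that scale_drag relies on can be checked in isolation. The sketch below uses ToySpec, a hypothetical stand-in with only two fields, rather than the real RunSpec, so it runs without gmat-sweep installed; scale_override is likewise an illustrative name.

```python
from __future__ import annotations

from dataclasses import FrozenInstanceError, dataclass, field, replace


@dataclass(frozen=True)
class ToySpec:
    """Hypothetical stand-in for RunSpec with only the fields needed here."""
    run_id: int
    overrides: dict[str, float] = field(default_factory=dict)


def scale_override(spec: ToySpec, key: str, factor: float) -> ToySpec:
    """Return a copy of spec with one override scaled; spec is not modified."""
    overrides = {**spec.overrides, key: spec.overrides[key] * factor}
    return replace(spec, overrides=overrides)


base = ToySpec(run_id=0, overrides={"Sat.DragArea": 4.0, "Sat.X": 7000.0})
scaled = scale_override(base, "Sat.DragArea", 1.5)

# Assigning to a field of a frozen instance is rejected.
try:
    base.run_id = 1  # type: ignore[misc]
    mutation_blocked = False
except FrozenInstanceError:
    mutation_blocked = True
```

One design point worth noting: frozen only blocks attribute assignment. The overrides dict itself is still mutable, which is why the transform builds a fresh dict instead of editing spec.overrides in place; editing in place would silently change the baseline spec too.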
This is the discipline that keeps a sweep family maintainable: the shared pipeline lives in one module with a contract; every driver, main or variant, composes that contract; a variant's only original code is the one stage it deliberately changes.
Aggregating across the family
Each driver returns a run_id-indexed score frame. Tag each arm and
concatenate to compare:
import pandas as pd


def compare_arms(main: pd.DataFrame, low: pd.DataFrame, high: pd.DataFrame) -> pd.DataFrame:
    frames = {"baseline": main, "drag_low": low, "drag_high": high}
    # Tag every row with its arm, then index by (run_id, arm).
    tagged = [df.assign(arm=arm) for arm, df in frames.items()]
    return pd.concat(tagged).set_index("arm", append=True)
Because every arm ran the identical per-run pipeline — same preprocess,
same build_run_spec, same score_run — a row-for-row diff across
arms is meaningful: the only thing that moved is the stage the variant
deliberately swapped.
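As a sketch of what that row-for-row diff might look like, the example below feeds compare_arms three small made-up frames and subtracts the baseline from each variant. It assumes each score frame is indexed by run_id and has a miss_km column, as the scorer above writes; the numbers are invented for illustration and are not results.

```python
import pandas as pd


def compare_arms(main: pd.DataFrame, low: pd.DataFrame, high: pd.DataFrame) -> pd.DataFrame:
    frames = {"baseline": main, "drag_low": low, "drag_high": high}
    tagged = [df.assign(arm=arm) for arm, df in frames.items()]
    return pd.concat(tagged).set_index("arm", append=True)


def toy_scores(values: list[float]) -> pd.DataFrame:
    """Build a made-up run_id-indexed score frame (illustrative data only)."""
    index = pd.Index(range(len(values)), name="run_id")
    return pd.DataFrame({"miss_km": values}, index=index)


combined = compare_arms(
    toy_scores([1.0, 2.0]),   # baseline
    toy_scores([0.5, 1.5]),   # drag_low
    toy_scores([1.5, 3.0]),   # drag_high
)

# One row per run_id, one column per arm.
wide = combined["miss_km"].unstack("arm")

# Per-run change in miss distance relative to the baseline arm.
delta = wide.sub(wide["baseline"], axis=0).drop(columns="baseline")
print(delta)
```

Pivoting to one column per arm keeps the comparison aligned on run_id, so a run missing from one arm shows up as NaN rather than shifting the rows.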
Caveats
- context is restored from the manifest on resume. Each run's context is recorded on its manifest entry, so a run resumed via Sweep.from_manifest comes back with the context it ran with. A run that never completed before an interruption has no entry to restore from; pass context_provider= to from_manifest to recompute it from preprocess (which is deterministic and re-runs cleanly).
- Keep the pipeline module importable from the worker. The postprocess hook path is resolved in each worker subprocess, so mission_pipeline must be on the workers' sys.path: install the project, or run from its root.
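A driver can catch a missing pipeline module before launching any runs. The following is a minimal sketch of such a preflight check using only the standard library; module_is_importable is a hypothetical helper, and it inspects the current interpreter's sys.path only, so it is a useful hint rather than a guarantee about what each worker will see.

```python
from importlib.util import find_spec


def module_is_importable(name: str) -> bool:
    """Report whether `name` can be located on this interpreter's sys.path.

    For a top-level module this locates it without importing it.
    """
    try:
        return find_spec(name) is not None
    except (ImportError, ValueError):
        # Raised for a missing parent package or a module with a broken spec.
        return False


# Split the module part off a 'module:function' hook path and check it.
hook_path = "json:dumps"  # stand-in for "mission_pipeline:score_run"
hook_module = hook_path.partition(":")[0]
ok = module_is_importable(hook_module)
```

If the check fails in the driver, installing the project into the active environment or running from the project root, as described above, are the fixes to try first.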