Skip to content

Composing related sweeps

A sensitivity study is rarely one sweep. You run a main sweep, then re-run it with one input perturbed — a drag-area factor, a tighter filter threshold, a different input subset — and compare the arms. Every variant must drive the same per-run pipeline as the main sweep, or the numbers are not comparable.

This page shows how to structure a family of related sweeps so the variants share that pipeline cleanly — without copy-pasting it, and without one driver reaching into another's private helpers.

The shape of a per-run pipeline

A sweep whose runs need work on either side of GMAT has three stages:

  1. Preprocess — pure Python, no GMAT. Turn each input row into whatever the run needs: the GMAT initial state, plus any reference value you will score the run against.
  2. Build the run spec — map one preprocessed payload to a RunSpec.
  3. Postprocess — after GMAT runs, reduce the run's output against the reference into the answer you actually want.

The variants differ in exactly one stage each — a perturbed override, a different input selection — and reuse the other two verbatim. The goal is to make "reuse the other two verbatim" a one-line import, not a copy.

Keep the pipeline in one module

The per-run pipeline is your domain code — gmat-sweep has no opinion on how you derive a state or score a run. Put the three stages in a module with a stable, public (non-underscored) surface, and let every driver in the family import from it:

# mission_pipeline.py — the shared per-run pipeline for the sweep family.
from __future__ import annotations

from pathlib import Path

import numpy as np
import pandas as pd

from gmat_sweep import RunOutcome, RunSpec


def preprocess(cases: pd.DataFrame) -> list[dict]:
    """Stage 1 — turn each input row into a per-run payload (no GMAT)."""
    payloads: list[dict] = []
    for run_id, case in cases.iterrows():
        # Domain work goes here: derive the GMAT initial state and the
        # reference state the run will be scored against.
        payloads.append(
            {
                "run_id": int(run_id),
                "epoch": case["epoch"],
                "x0_km": [case["x"], case["y"], case["z"]],
                "drag_area_m2": float(case["drag_area_m2"]),
                "reference_km": [case["ref_x"], case["ref_y"], case["ref_z"]],
            }
        )
    return payloads


def build_run_spec(payload: dict, mission: Path, out_dir: Path) -> RunSpec:
    """Stage 2 — map one payload to a RunSpec."""
    x0 = payload["x0_km"]
    return RunSpec(
        script_path=mission,
        overrides={
            "Sat.Epoch": payload["epoch"],
            "Sat.X": x0[0],
            "Sat.Y": x0[1],
            "Sat.Z": x0[2],
            "Sat.DragArea": payload["drag_area_m2"],
        },
        output_dir=out_dir / f"run-{payload['run_id']}",
        run_id=payload["run_id"],
        seed=None,
        run_options={},
        # The reference state is needed to score the run but is not a
        # GMAT input — carry it in context, not overrides.
        context={"reference_km": payload["reference_km"]},
    )


def score_run(run_spec: RunSpec, run_outcome: RunOutcome) -> dict[str, Path]:
    """Stage 3 — postprocess hook: GMAT final state vs. the reference."""
    report = pd.read_parquet(run_outcome.output_paths["report__FinalState"])
    final = report.sort_values("time").iloc[-1]
    final_km = np.array([final["Sat.X"], final["Sat.Y"], final["Sat.Z"]])

    reference_km = np.array(run_spec.context["reference_km"])
    miss_km = float(np.linalg.norm(final_km - reference_km))

    out_path = run_spec.output_dir / "score.parquet"
    pd.DataFrame([{"run_id": run_spec.run_id, "miss_km": miss_km}]).to_parquet(out_path)
    return {"score": out_path}

Two gmat-sweep features carry the pipeline:

  • RunSpec.context moves the reference state from preprocess to postprocess. overrides cannot — every key there is applied to the GMAT Mission and folded into the manifest's parameter_spec. The reference state is neither a GMAT input nor a swept parameter, so it rides in context, which the worker leaves untouched and the hook reads as run_spec.context. See Postprocess hooks.
  • score_run is a module-level function, referenced by import path. The worker imports it fresh in each subprocess, so it must have an importable name — a closure or lambda cannot be a hook.

Running the main sweep

The main driver is now thin: preprocess, build specs, run, aggregate.

from pathlib import Path

import pandas as pd
from gmat_sweep import LocalJoblibPool, Sweep, lazy_extra_outputs

from mission_pipeline import build_run_spec, preprocess


def run_main_sweep(cases: pd.DataFrame, mission: Path, out: Path) -> pd.DataFrame:
    payloads = preprocess(cases)
    specs = [build_run_spec(p, mission, out) for p in payloads]
    with LocalJoblibPool() as pool:
        Sweep(
            runs=specs,
            backend=pool,
            manifest_path=out / "manifest.jsonl",
            output_dir=out,
            script_path=mission,
            postprocess="mission_pipeline:score_run",
        ).run()
    return lazy_extra_outputs(out / "manifest.jsonl", "score")

Note what is not there: no hand-built parameter_spec. A Sweep built from an explicit RunSpec list auto-derives the _kind="explicit" spec from the runs' overrides — see Parameter spec. The postprocess hook writes one score.parquet per run; lazy_extra_outputs folds them into a single run_id-indexed frame.

A variant: one stage swapped

A drag-area sensitivity arm reuses preprocess, the spec builder, and the scorer untouched. It differs in exactly one place — it scales the swept Sat.DragArea. Because RunSpec is a frozen dataclass, dataclasses.replace expresses that as a clean transform on the spec the shared builder produced:

from dataclasses import replace
from pathlib import Path

import pandas as pd
from gmat_sweep import LocalJoblibPool, RunSpec, Sweep, lazy_extra_outputs

from mission_pipeline import build_run_spec, preprocess


def scale_drag(spec: RunSpec, factor: float) -> RunSpec:
    """The one stage that differs from the main sweep."""
    overrides = {**spec.overrides, "Sat.DragArea": spec.overrides["Sat.DragArea"] * factor}
    return replace(spec, overrides=overrides)


def run_drag_variant(
    cases: pd.DataFrame, mission: Path, out: Path, factor: float
) -> pd.DataFrame:
    payloads = preprocess(cases)
    specs = [scale_drag(build_run_spec(p, mission, out), factor) for p in payloads]
    with LocalJoblibPool() as pool:
        Sweep(
            runs=specs,
            backend=pool,
            manifest_path=out / "manifest.jsonl",
            output_dir=out,
            script_path=mission,
            postprocess="mission_pipeline:score_run",
        ).run()
    return lazy_extra_outputs(out / "manifest.jsonl", "score")

The variant imports only the public surface of mission_pipelinepreprocess, build_run_spec, and the "mission_pipeline:score_run" hook path. It never imports the main driver (run_main_sweep), and nothing in it is underscore-prefixed. A second variant — say, one that swaps the input selection rather than an override — is the same script with preprocess fed a different cases frame and scale_drag dropped.

This is the discipline that keeps a sweep family maintainable: the shared pipeline lives in one module with a contract; every driver, main or variant, composes that contract; a variant's only original code is the one stage it deliberately changes.

Aggregating across the family

Each driver returns a run_id-indexed score frame. Tag each arm and concatenate to compare:

import pandas as pd


def compare_arms(main: pd.DataFrame, low: pd.DataFrame, high: pd.DataFrame) -> pd.DataFrame:
    frames = {"baseline": main, "drag_low": low, "drag_high": high}
    tagged = [df.assign(arm=arm) for arm, df in frames.items()]
    return pd.concat(tagged).set_index("arm", append=True)

Because every arm ran the identical per-run pipeline — same preprocess, same build_run_spec, same score_run — a row-for-row diff across arms is meaningful: the only thing that moved is the stage the variant deliberately swapped.

Caveats

  • context is restored from the manifest on resume. Each run's context is recorded on its manifest entry, so a run resumed via Sweep.from_manifest comes back with the context it ran with. A run that never completed before an interruption has no entry to restore from — pass context_provider= to from_manifest to recompute it from preprocess (which is deterministic and re-runs cleanly).
  • Keep the pipeline module importable from the worker. The postprocess hook path is resolved in each worker subprocess, so mission_pipeline must be on the workers' sys.path — install the project, or run from its root.