Downstream-consumer pipeline: postprocess hooks, extra outputs, and resume¶
A reproducible-study author rarely wants only the Parquet files GMAT writes. They want a derived quantity per run — computed once, tracked in the manifest alongside the GMAT-native outputs — and they want to re-run a perturbed variant of the study without re-deriving anything. This notebook walks that downstream-consumer workflow end to end:
- A per-run postprocess hook that writes a derived per-run Parquet.
- lazy_extra_outputs aggregating those derived Parquets next to the GMAT ReportFile.
- A resume that survives a mid-sweep kill, with the manifest and the per-run output tree in separate directories.
- A sensitivity variant that re-runs the study with one input perturbed and the same per-run pipeline.
Prerequisites. A local GMAT install (R2026a is the primary development target; see Supported versions). No [examples] extra is needed — this notebook produces tables, not plots.
Platform note. The kill-and-resume section sends SIGINT to a child process, which is POSIX-only — the same constraint as the killed-sweep-recovery notebook.
Set up the run¶
Resolve the GMAT install and the mission script. This notebook runs against leo_basic.script — the minimal point-mass LEO fixture under tests/data/. It propagates for 60 seconds, so every run is sub-second and the whole notebook finishes quickly.
import signal
import subprocess
import sys
import tempfile
import time
from pathlib import Path
import pandas as pd
from gmat_run import locate_gmat
from gmat_sweep import Manifest, Sweep, lazy_extra_outputs, lazy_multiindex, sweep
from gmat_sweep.backends.joblib import LocalJoblibPool
# The postprocess hook is written next to this notebook with %%writefile;
# make that directory importable by this kernel and its worker subprocesses.
sys.path.insert(0, str(Path.cwd()))
install = locate_gmat()
script_path = Path("../../tests/data/leo_basic.script").resolve()
print(f"GMAT version: {install.version}")
print(f"Script: {script_path.name}")
print(f"Exists: {script_path.exists()}")
GMAT version: R2026a
Script: leo_basic.script
Exists: True
The shared per-run pipeline¶
A postprocess hook is handed to a sweep as an import-path string ("module:function"), not a function object — the run spec is JSON-serialised on its way to every worker, and a bare function cannot survive that round trip. The hook must therefore be an importable module-level function; a closure or a lambda has no importable name.
So the pipeline lives in its own module, written next to this notebook with %%writefile. Keeping it in one module is also what lets the main sweep, the resumed sweep, and the sensitivity variant below share a single definition.
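As a rough illustration of why the import-path form matters, the sketch below resolves a "module:function" string with the standard library and shows that the string survives a JSON round trip while a lambda does not. The helper name resolve_hook is hypothetical and is not gmat_sweep's resolver; os.path:basename stands in for a real hook only because it is always importable.

```python
import importlib
import json


def resolve_hook(import_path: str):
    """Resolve a "module:function" string to the callable it names."""
    module_name, sep, attr = import_path.partition(":")
    if not sep or not module_name or not attr:
        raise ValueError(f"expected 'module:function', got {import_path!r}")
    module = importlib.import_module(module_name)
    hook = getattr(module, attr)
    if not callable(hook):
        raise TypeError(f"{import_path!r} does not name a callable")
    return hook


# A string survives the JSON round trip a run spec goes through.
spec = json.loads(json.dumps({"postprocess": "os.path:basename"}))
hook = resolve_hook(spec["postprocess"])
print(hook("/scratch/run-0/orbit_metrics.parquet"))

# A function object does not: json refuses to serialise it.
try:
    json.dumps({"postprocess": lambda run_spec, run_outcome: {}})
except TypeError as exc:
    print(f"not serialisable: {exc}")
```

Because each worker resolves the name for itself, the module has to be importable there too, which is what the sys.path.insert call in the setup cell arranges.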
The hook here reads each run's GMAT ReportFile and writes a derived single-row Parquet: GMAT reports the Cartesian state but not the orbital radius |r|, so the hook computes it and records two per-run scalars GMAT never wrote directly — the final radius and the radius spread across the propagated arc.
%%writefile downstream_pipeline.py
"""Shared per-run pipeline for the downstream-consumer example sweep.
A postprocess hook is resolved fresh inside every worker subprocess from
an import-path string, so it must be an importable module-level function
— a closure or a lambda has no importable name. Keeping it in its own
module is what lets the main sweep, the resumed sweep, and the
sensitivity variant all share one definition.
"""
from __future__ import annotations
from pathlib import Path
import numpy as np
import pandas as pd
from gmat_sweep import RunOutcome, RunSpec
def derive_orbit_metrics(run_spec: RunSpec, run_outcome: RunOutcome) -> dict[str, Path]:
    """Reduce one run's ReportFile to a derived single-row Parquet.

    GMAT's ReportFile carries the Cartesian state but not the orbital
    radius. This hook computes ``|r|`` over the propagation and records
    two per-run scalars GMAT never wrote directly: the final radius and
    the radius spread across the arc. The returned mapping registers the
    Parquet in the manifest under the key ``"orbit_metrics"``.
    """
    report = pd.read_parquet(run_outcome.output_paths["report__RF"])
    radius_km = np.sqrt(
        report["Sat.EarthMJ2000Eq.X"] ** 2
        + report["Sat.EarthMJ2000Eq.Y"] ** 2
        + report["Sat.EarthMJ2000Eq.Z"] ** 2
    )
    out_path = run_spec.output_dir / "orbit_metrics.parquet"
    pd.DataFrame(
        [
            {
                "final_radius_km": float(radius_km.iloc[-1]),
                "radius_span_km": float(radius_km.max() - radius_km.min()),
            }
        ]
    ).to_parquet(out_path)
    return {"orbit_metrics": out_path}
Writing downstream_pipeline.py
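The reduction the hook performs can be checked by hand on a few samples. The sketch below repeats the same arithmetic with the standard library only; the (X, Y, Z) values are hypothetical and are not taken from leo_basic.script.

```python
import math

# Hypothetical Cartesian samples in km, one tuple per report time-step.
samples = [
    (6800.0, 0.0, 0.0),
    (6799.0, 120.0, 5.0),
    (6796.0, 240.0, 10.0),
]

# |r| = sqrt(X^2 + Y^2 + Z^2) at every time-step.
radius_km = [math.hypot(x, y, z) for x, y, z in samples]

# The two per-run scalars the hook records.
row = {
    "final_radius_km": radius_km[-1],
    "radius_span_km": max(radius_km) - min(radius_km),
}
print(row)
```

The hook does the same thing column-wise with NumPy, then writes the single resulting row to Parquet.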
Run the grid sweep with the hook¶
postprocess= names the hook by its import path. Each worker runs GMAT, then — on success — invokes the hook and records whatever Parquets it returns in that run's manifest entry. A five-point Sat.SMA grid is enough to show the shape.
HOOK = "downstream_pipeline:derive_orbit_metrics"
SMA_GRID = [6800.0, 6900.0, 7000.0, 7100.0, 7200.0]
main_tmp = tempfile.TemporaryDirectory(prefix="downstream-main-")
main_dir = Path(main_tmp.name)
result = sweep(
    script_path,
    grid={"Sat.SMA": SMA_GRID},
    out=main_dir,
    postprocess=HOOK,
    progress=False,
)
print(f"Runs: {result.index.get_level_values('run_id').nunique()}")
result["__status"].value_counts()
Runs: 5
__status
ok    15
Name: count, dtype: int64
Aggregate GMAT outputs and derived outputs side by side¶
Two aggregators read the same manifest. lazy_multiindex stitches the GMAT ReportFile Parquets into the (run_id, time) frame sweep() returns. lazy_extra_outputs does the same for the hook's Parquets, keyed by the name the hook registered — "orbit_metrics".
Their index shapes differ because the inputs differ: the ReportFile is one row per (run, time-step), while the hook wrote one row per run with no time column, so lazy_extra_outputs yields a single-level run_id index — one derived row per run.
manifest = Manifest.load(main_dir / "manifest.jsonl")
report_frame = lazy_multiindex(manifest, main_dir)
metrics = lazy_extra_outputs(main_dir / "manifest.jsonl", "orbit_metrics")
print(f"ReportFile : index={report_frame.index.names} shape={report_frame.shape}")
print(f"orbit_metrics : index={metrics.index.names} shape={metrics.shape}")
print()
print(report_frame[["Sat.Earth.SMA", "__status"]].head(6))
metrics
ReportFile : index=['run_id', 'time'] shape=(15, 7)
orbit_metrics : index=['run_id'] shape=(5, 3)
Sat.Earth.SMA __status
run_id time
0 2026-01-01 00:00:00 6800.0 ok
2026-01-01 00:00:30 6800.0 ok
2026-01-01 00:01:00 6800.0 ok
1 2026-01-01 00:00:00 6900.0 ok
2026-01-01 00:00:30 6900.0 ok
2026-01-01 00:01:00 6900.0 ok
| run_id | final_radius_km | radius_span_km | __status |
|---|---|---|---|
| 0 | 6793.215542 | 0.015542 | ok |
| 1 | 6893.115095 | 0.015095 | ok |
| 2 | 6993.014667 | 0.014667 | ok |
| 3 | 7092.914257 | 0.014257 | ok |
| 4 | 7192.813864 | 0.013864 | ok |
Survive a kill: resume with a separate output directory¶
A long study gets interrupted. A common layout keeps the lightweight manifest.jsonl next to the analysis code under version control, while the bulky per-run Parquet trees go to scratch storage. The Sweep constructor takes manifest_path and output_dir as separate arguments, so that split is native — sweep()'s one-liner is just the co-located special case of it.
To reproduce a kill we launch such a sweep as a subprocess, let a few runs land, and send it SIGINT. Because the manifest and the per-run tree are in different directories, the resume has to be told where the output tree is: Sweep.from_manifest takes a caller-supplied output_dir for exactly this. First, the subprocess driver:
%%writefile killable_sweep.py
"""Subprocess driver for the kill-and-resume section of notebook 12.
Run as ``python killable_sweep.py <script> <manifest_dir> <scratch_dir>``.
It dispatches a hooked sweep whose manifest and per-run output tree live
in separate directories, so the notebook can SIGINT it mid-flight and
then resume it with a caller-supplied ``output_dir``.
"""
import sys
from pathlib import Path
from gmat_sweep import Sweep, expand_grid_to_run_specs
from gmat_sweep.backends.joblib import LocalJoblibPool
SMA_GRID = [6700.0 + 50.0 * i for i in range(12)]
HOOK = "downstream_pipeline:derive_orbit_metrics"
def main() -> None:
    script_path, manifest_dir, scratch_dir = (Path(arg) for arg in sys.argv[1:4])
    specs = expand_grid_to_run_specs({"Sat.SMA": SMA_GRID}, script_path, scratch_dir)
    with LocalJoblibPool(max_workers=1) as pool:
        Sweep(
            runs=specs,
            backend=pool,
            manifest_path=manifest_dir / "manifest.jsonl",
            output_dir=scratch_dir,
            script_path=script_path,
            postprocess=HOOK,
            progress=False,
        ).run()


if __name__ == "__main__":
    main()
Writing killable_sweep.py
Launch the sweep and kill it¶
The driver dispatches twelve runs with a single worker so completions are serialised and the kill lands predictably. We poll the manifest until four runs have appended their entries, then send SIGINT. The orchestrator does not catch KeyboardInterrupt, so the subprocess exits non-zero — the expected outcome — and the append-only, fsync'd manifest is left with a parseable prefix, every completed run carrying its derived orbit_metrics Parquet already.
meta_tmp = tempfile.TemporaryDirectory(prefix="downstream-meta-")
scratch_tmp = tempfile.TemporaryDirectory(prefix="downstream-scratch-")
meta_dir = Path(meta_tmp.name)
scratch_dir = Path(scratch_tmp.name)
killed_manifest = meta_dir / "manifest.jsonl"
def _entries_on_disk(path: Path) -> int:
    if not path.exists():
        return 0
    # One header line plus one line per completed run.
    return max(0, sum(1 for _ in path.open(encoding="utf-8")) - 1)
proc = subprocess.Popen(
    [sys.executable, "killable_sweep.py", str(script_path), str(meta_dir), str(scratch_dir)],
    stdout=subprocess.DEVNULL,
    stderr=subprocess.DEVNULL,
)
deadline = time.monotonic() + 120.0
while time.monotonic() < deadline:
    if proc.poll() is not None:
        raise RuntimeError(f"driver exited before it could be killed (rc={proc.returncode})")
    if _entries_on_disk(killed_manifest) >= 4:
        break
    time.sleep(0.1)
else:
    proc.kill()
    raise RuntimeError("driver did not reach the manifest-entry threshold in time")
proc.send_signal(signal.SIGINT)
rc = proc.wait(timeout=30)
partial = Manifest.load(killed_manifest)
first = partial.entries[0]
print(f"Driver exited with rc={rc} after SIGINT")
print(f"Planned runs (manifest header): {partial.run_count}")
print(f"Completed before the kill: {len(partial.entries)}")
print(f"Hook recorded on the header: {partial.postprocess}")
print(
    f"  run {first.run_id}: status={first.status!r}, "
    f"postprocess_status={first.postprocess_status!r}, "
    f"extra_outputs={list(first.extra_outputs)}"
)
Driver exited with rc=-2 after SIGINT
Planned runs (manifest header): 12
Completed before the kill: 4
Hook recorded on the header: downstream_pipeline:derive_orbit_metrics
  run 0: status='ok', postprocess_status='ok', extra_outputs=['orbit_metrics']
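The "parseable prefix" property follows from writing one entry per line and forcing each line to disk before moving on. The sketch below illustrates that pattern with the standard library. It is not gmat_sweep's writer or its on-disk schema: append_entry, read_prefix, and the field names are hypothetical.

```python
import json
import os
import tempfile
from pathlib import Path


def append_entry(path: Path, entry: dict) -> None:
    """Append one JSON line and force it to disk before returning."""
    with path.open("a", encoding="utf-8") as fh:
        fh.write(json.dumps(entry) + "\n")
        fh.flush()
        os.fsync(fh.fileno())


def read_prefix(path: Path) -> list[dict]:
    """Return every complete entry, stopping at a torn final line."""
    entries = []
    with path.open(encoding="utf-8") as fh:
        for line in fh:
            if not line.endswith("\n"):
                break  # the writer was killed partway through this line
            entries.append(json.loads(line))
    return entries


with tempfile.TemporaryDirectory() as tmp:
    manifest_file = Path(tmp) / "manifest.jsonl"
    append_entry(manifest_file, {"kind": "header", "run_count": 3})
    append_entry(manifest_file, {"run_id": 0, "status": "ok"})
    # Simulate a kill partway through writing the next entry.
    with manifest_file.open("a", encoding="utf-8") as fh:
        fh.write('{"run_id": 1, "sta')
    recovered = read_prefix(manifest_file)
    print(recovered)
```

Since entries are only ever appended, a kill can damage at most the line being written, so everything before it stays readable.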
Resume against the scratch tree¶
Sweep.from_manifest rebuilds the run set from the manifest, re-validates the script's canonical hash, and re-applies the postprocess hook recorded on the header — the resume does not have to pass postprocess= again. resume() then re-runs only the missing runs.
output_dir=scratch_dir is the load-bearing argument: the manifest lives in the metadata directory, but the per-run Parquet tree — including the runs the resume is about to write — belongs in the scratch directory. The resumed runs extend that same tree, and derive_orbit_metrics runs for every run, killed or resumed alike.
with LocalJoblibPool(max_workers=2) as pool:
    resumed = Sweep.from_manifest(
        killed_manifest,
        script_path,
        backend=pool,
        output_dir=scratch_dir,
        progress=False,
    ).resume()
resumed_reports = resumed.to_dataframe()
resumed_metrics = resumed.to_extra_outputs("orbit_metrics")
print(f"Manifest entries after resume: {len(Manifest.load(killed_manifest).entries)}")
print(
    f"Runs in the report frame: {resumed_reports.index.get_level_values('run_id').nunique()}"
)
print(f"Rows in orbit_metrics: {len(resumed_metrics)}")
print(resumed_reports["__status"].value_counts())
resumed_metrics
Manifest entries after resume: 12
Runs in the report frame: 12
Rows in orbit_metrics: 12
__status
ok    36
Name: count, dtype: int64
| run_id | final_radius_km | radius_span_km | __status |
|---|---|---|---|
| 0 | 6693.316009 | 0.016009 | ok |
| 1 | 6743.265773 | 0.015773 | ok |
| 2 | 6793.215542 | 0.015542 | ok |
| 3 | 6843.165316 | 0.015316 | ok |
| 4 | 6893.115095 | 0.015095 | ok |
| 5 | 6943.064878 | 0.014878 | ok |
| 6 | 6993.014667 | 0.014667 | ok |
| 7 | 7042.964459 | 0.014459 | ok |
| 8 | 7092.914257 | 0.014257 | ok |
| 9 | 7142.864058 | 0.014058 | ok |
| 10 | 7192.813864 | 0.013864 | ok |
| 11 | 7242.763673 | 0.013673 | ok |
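Deciding which runs a resume still owes comes down to set arithmetic over the manifest: the planned run count from the header minus the runs that already have an entry. The sketch below shows one way that could look. It is not gmat_sweep's implementation; missing_run_ids is a hypothetical helper, entries are plain dicts, and treating only a latest status of "ok" as done is an assumption.

```python
def missing_run_ids(run_count: int, entries: list[dict]) -> list[int]:
    """Return planned run ids whose latest entry is not 'ok', in run order."""
    latest = {}
    for entry in entries:
        # Last-wins: a later entry for the same run replaces an earlier one.
        latest[entry["run_id"]] = entry["status"]
    return [run_id for run_id in range(run_count) if latest.get(run_id) != "ok"]


# Four runs landed before the kill, out of twelve planned.
entries = [{"run_id": i, "status": "ok"} for i in range(4)]
print(missing_run_ids(12, entries))
```

With the killed manifest above, that leaves runs 4 through 11, which matches the eight entries the resume added.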
A sensitivity variant on the shared pipeline¶
A sensitivity study re-runs the sweep with one input perturbed and compares the arms. For the comparison to mean anything, every arm must drive the same per-run pipeline — the same hook — so only the deliberately-changed input moves.
This variant raises orbital eccentricity from the fixture's near-circular 0.001 to 0.02. Nothing else changes: the script, the Sat.SMA grid, and postprocess=HOOK are identical to the main sweep. This is the composing-sweeps recipe — one shared pipeline module, a variant that swaps exactly one stage — and the derived radius_span_km makes the effect plain: a more eccentric orbit sweeps a wider range of radii over the same arc.
variant_tmp = tempfile.TemporaryDirectory(prefix="downstream-variant-")
variant_dir = Path(variant_tmp.name)
sweep(
    script_path,
    grid={"Sat.SMA": SMA_GRID, "Sat.ECC": [0.02]},
    out=variant_dir,
    postprocess=HOOK,
    progress=False,
)
variant_metrics = lazy_extra_outputs(variant_dir / "manifest.jsonl", "orbit_metrics")
comparison = pd.concat({"baseline": metrics, "ecc=0.02": variant_metrics}, names=["arm"])
print("Mean radius span per arm (km):")
print(comparison.groupby("arm")["radius_span_km"].mean())
comparison
Mean radius span per arm (km):
arm
baseline    0.014685
ecc=0.02    0.305178
Name: radius_span_km, dtype: float64
| arm | run_id | final_radius_km | radius_span_km | __status |
|---|---|---|---|---|
| baseline | 0 | 6793.215542 | 0.015542 | ok |
| baseline | 1 | 6893.115095 | 0.015095 | ok |
| baseline | 2 | 6993.014667 | 0.014667 | ok |
| baseline | 3 | 7092.914257 | 0.014257 | ok |
| baseline | 4 | 7192.813864 | 0.013864 | ok |
| ecc=0.02 | 0 | 6664.322986 | 0.322986 | ok |
| ecc=0.02 | 1 | 6762.313698 | 0.313698 | ok |
| ecc=0.02 | 2 | 6860.304805 | 0.304805 | ok |
| ecc=0.02 | 3 | 6958.296284 | 0.296284 | ok |
| ecc=0.02 | 4 | 7056.288115 | 0.288115 | ok |
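The direction of the effect can be sanity-checked against two-body geometry: on a Keplerian ellipse the radius ranges from a(1 - e) at periapsis to a(1 + e) at apoapsis. The sketch below evaluates those bounds for the first grid point. It is a back-of-envelope check under the two-body assumption, and radius_bounds_km is a hypothetical helper.

```python
def radius_bounds_km(sma_km: float, ecc: float) -> tuple[float, float]:
    """Periapsis and apoapsis radius of a Keplerian ellipse."""
    return sma_km * (1.0 - ecc), sma_km * (1.0 + ecc)


for ecc in (0.001, 0.02):
    r_peri, r_apo = radius_bounds_km(6800.0, ecc)
    print(f"ecc={ecc}: periapsis={r_peri:.1f} km, full-orbit span={r_apo - r_peri:.1f} km")
```

For Sat.SMA = 6800 the periapsis radii come out at 6793.2 km and 6664.0 km. The tabulated final_radius_km for run 0 exceeds each of them by exactly the tabulated radius_span_km, which suggests the 60-second arc starts at periapsis, although the fixture's initial true anomaly is not shown here. The observed spans are far below the full-orbit span because the arc covers only a small fraction of the orbit.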
The manifest tracks both output kinds¶
Every run's manifest entry records both output kinds. output_paths holds the GMAT-native Parquets (the ReportFile, keyed report__<name>); extra_outputs holds the hook's Parquets, keyed by the hook's own strings. postprocess_status records the hook's outcome — none / ok / failed — independently of the run's status, so a hook bug stays distinguishable from a GMAT-engine failure.
entry_manifest = Manifest.load(main_dir / "manifest.jsonl")
entry = entry_manifest.entries[0]
print(f"Sweep-wide postprocess hook: {entry_manifest.postprocess}")
print(
    f"run {entry.run_id}: status={entry.status!r}, postprocess_status={entry.postprocess_status!r}"
)
print()
print("GMAT-native outputs (entry.output_paths):")
for key, path in entry.output_paths.items():
    print(f"  {key:14s} -> {path.name}")
print("Hook-registered output (entry.extra_outputs):")
for key, path in entry.extra_outputs.items():
    print(f"  {key:14s} -> {path.name}")
Sweep-wide postprocess hook: downstream_pipeline:derive_orbit_metrics
run 1: status='ok', postprocess_status='ok'

GMAT-native outputs (entry.output_paths):
  report__RF     -> report__RF.parquet
Hook-registered output (entry.extra_outputs):
  orbit_metrics  -> orbit_metrics.parquet
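Keeping the two statuses independent amounts to wrapping the engine call and the hook call in separate error handlers. The sketch below shows that shape under stated assumptions. It is not the worker code in gmat_sweep: run_with_hook, the entry dict, and the error and postprocess_error fields are hypothetical. Only the none / ok / failed values and the run-the-hook-on-success rule come from the text above.

```python
def run_with_hook(run_engine, hook) -> dict:
    """Run the engine, then the hook, recording each outcome separately."""
    entry = {"status": "ok", "postprocess_status": "none", "extra_outputs": {}}
    try:
        outputs = run_engine()
    except Exception as exc:
        entry["status"] = "failed"
        entry["error"] = repr(exc)
        return entry  # the hook only runs after a successful engine run
    if hook is None:
        return entry
    try:
        entry["extra_outputs"] = hook(outputs)
        entry["postprocess_status"] = "ok"
    except Exception as exc:
        # A hook bug is recorded on its own field; the run itself stays 'ok'.
        entry["postprocess_status"] = "failed"
        entry["postprocess_error"] = repr(exc)
    return entry


def broken_hook(outputs):
    raise KeyError("Sat.EarthMJ2000Eq.X")


print(run_with_hook(lambda: {"report__RF": "report__RF.parquet"}, broken_hook))
```

In this sketch a hook that raises leaves status at 'ok' and sets postprocess_status to 'failed', so the GMAT outputs of that run remain usable while the hook is fixed.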
Where to next¶
- Postprocess hooks. Per-run postprocessing hooks covers the hook signature, carrying per-run data through RunSpec.context, and what happens when a hook raises.
- Aggregation. Aggregating sweep outputs documents the adaptive run_id/(run_id, time) index and the marker-row contract for failed and skipped runs.
- Resume. Resume documents the caller-supplied output directory, the last-wins entry semantics, and the script-drift gate.
- Composing sweeps. Composing related sweeps generalises the variant pattern to a family of related sweeps sharing one pipeline module.