Per-shard step runners, called by the targets pipeline (and by the single-core
ssd_run_scenario_shards()): one target per shard, one runner per step.
Each runner takes a shard's tasks (the tasks list-column of one row of the
matching ssd_scenario_*_shards() function: ssd_scenario_sample_shards(),
ssd_scenario_fit_shards() or ssd_scenario_hc_shards()) and runs the bundled
tasks with the same per-task seed-and-run primitives that the baseline runner
uses (*_data_task_primer()), all under a single local_dqrng_backend() scope.
It reads any upstream shard back from Parquet by partition path, writes one
Parquet file at the shard's Hive partition path, and returns that path (the
format = "file" contract). Because a task's result is fully determined by its
(seed, primer) pair and does not depend on the order in which tasks run, the
per-task results are byte-identical to those of ssd_run_scenario_baseline(),
however the tasks are bundled into shards.
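The order-independence argument can be illustrated with a small base R sketch. It is not the package's code: run_task(), run_shard() and the toy tasks table are hypothetical, base R's set.seed() stands in for the dqrng backend that local_dqrng_backend() selects, and treating primer as a count of RNG draws skipped after seeding is an assumption made only for illustration. The sketch runs the same six tasks as one shard and as three reversed two-task shards, then checks that every task's result matches.

```r
# Hypothetical per-task primitive: the result depends only on (seed, primer).
# ASSUMPTION: primer is modelled as a number of draws discarded after seeding;
# base R's set.seed() stands in for the dqrng backend used by the package.
run_task <- function(seed, primer) {
  set.seed(seed)
  if (primer > 0L) runif(primer)  # advance the stream past the primer draws
  rnorm(3L)
}

# Hypothetical shard runner: run each task row and key the results by task_id.
run_shard <- function(shard) {
  res <- Map(run_task, shard$seed, shard$primer)
  names(res) <- shard$task_id
  res
}

tasks <- data.frame(
  task_id = 1:6,
  seed = 101:106,
  primer = c(0L, 2L, 5L, 0L, 1L, 3L)
)

# Bundling 1: all six tasks in a single shard, in task order.
one_shard <- run_shard(tasks)

# Bundling 2: tasks reversed and split into three two-task shards.
reversed <- tasks[rev(seq_len(nrow(tasks))), ]
shards <- split(reversed, rep(1:3, each = 2L))
many_shards <- do.call(c, unname(lapply(shards, run_shard)))

# Same per-task results, whichever way the tasks were bundled.
identical(one_shard, many_shards[names(one_shard)])
#> [1] TRUE
```

Because each task reseeds before it runs, neither its position within a shard nor the shard it lands in can affect its result.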
Usage
ssd_run_sample_step(tasks, scenario, out_dir)
ssd_run_fit_step(tasks, scenario, sample_dir, out_dir)
ssd_run_hc_step(tasks, scenario, fit_dir, out_dir)
Arguments
- tasks
A tibble of the shard's task rows (the tasks list-column of one row of the matching ssd_scenario_*_shards()). Each row carries the step's axis values, its <step>_id key, seed and primer, plus, for fit and hc, the parent step's path-axis values and <parent>_id.
- scenario
The ssdsims_scenario (referenced as a global in _targets.R).
- out_dir
The step's results root (e.g. "results/sample").
- sample_dir
The sample results root that the parent shards were written to (fit step only).
- fit_dir
The fit results root that the parent shards were written to (hc step only).
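As a rough illustration of how out_dir relates to the paths the runners return, the hypothetical helpers below build a Hive-style key=value path under a step's results root (matching the shape of the paths printed in the examples) and write one file there, returning its path in the spirit of the format = "file" contract. hive_path() and write_shard() are not package functions, and saveRDS() stands in for the Parquet writer the real runners use.

```r
# Hypothetical helper: a Hive-style partition path, one key=value directory
# per axis, under a step's results root.
hive_path <- function(out_dir, axes, file = "part.parquet") {
  parts <- paste0(names(axes), "=", vapply(axes, as.character, character(1)))
  do.call(file.path, as.list(c(out_dir, parts, file)))
}

p <- hive_path("results/sample", list(dataset = "ccme_boron", sim = 1L, replace = TRUE))
p
#> [1] "results/sample/dataset=ccme_boron/sim=1/replace=TRUE/part.parquet"

# Hypothetical runner tail honouring a format = "file"-style contract: write
# one file at the partition path and return that path. saveRDS() stands in
# for the Parquet writer.
write_shard <- function(result, out_dir, axes) {
  path <- hive_path(out_dir, axes, file = "part.rds")
  dir.create(dirname(path), recursive = TRUE, showWarnings = FALSE)
  saveRDS(result, path)
  path
}

out <- write_shard(
  data.frame(x = 1:3),
  file.path(tempdir(), "sample"),
  list(dataset = "ccme_boron", sim = 1L, replace = TRUE)
)
file.exists(out)
#> [1] TRUE
```

Returning the path rather than the data is what lets a pipeline track the written file itself and lets a downstream step read the parent shard back by that path.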
Functions
- ssd_run_sample_step(): Run the sample tasks. Read each task's dataset off the scenario via scenario_dataset(), draw the effective draw size (the scenario's nrow_max setting, capped at the dataset size when replace = FALSE) through sample_data_task_primer(), and tag each draw with its sample_id and a .row order index so that a downstream fit shard can isolate it and restore its order.
- ssd_run_fit_step(): Run the fit tasks. Read the distinct set of parent sample shards that the shard's tasks reference (each shard once; the tasks may span several sample shards), isolate each task's draw by sample_id (restoring row order), truncate it inline (head(sample, nrow), which is RNG-free; see section 5), and fit it with the per-task (seed, primer) through fit_data_task_primer(), resolving min_pmix off the scenario via scenario_min_pmix(). The fitted fitdists object is serialised into a fit_blob string column keyed by fit_id, and one Parquet file is written at the shard's partition path.
- ssd_run_hc_step(): Run the hc tasks. Read the distinct set of parent fit shards that the shard's tasks reference (each shard once; an hc shard typically spans several fit shards) and decode each parent union fit once per fit_id, reusing it across every distset task that shares it. Resolve each task's distset name to its members via scenario_distset(), subset the union fit to that pool (strict = FALSE), and estimate the hazard concentration with the per-task (seed, primer) through hc_data_task_primer() (the subsetting happens inside that shared primitive). Each task's hc tibble (with the scalar ci applied uniformly, and bootstrap-only scenario options set to NA when ci = FALSE) is tagged with its hc_id, parent fit_id and distset name; the tibbles are stacked and written as one Parquet file at the shard's partition path. A set whose members all dropped out of the union fit emits no rows for that cell (the survivor model).
  The four non-axis hc readout settings (proportion, est_method, ci, samples) default to the scenario slice, so the single-scenario and standalone paths are byte-identical. When the shard's tasks carry per-task proportion/est_method/ci/samples columns (the design factory's per-overlap aggregated demand; see ssd_design_targets()), each task is instead summarised with its own cell's demand: the maximal readout set, which the per-member summary then filters.
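The per-task data shaping that the three runners describe can be sketched in base R under simplifying assumptions: the dataset is a toy one-column data frame, set.seed() stands in for the per-task (seed, primer) primitives, the union fit is a named list of placeholder strings rather than a fitdists object, strict = FALSE is read as "skip members absent from the union fit", and the readout values are illustrative. effective_draw_size(), draw_sample(), isolate_draw(), subset_fit() and resolve_readout() are hypothetical names, not package functions. The sketch draws two samples into one shuffled shard, recovers and truncates one of them as the fit step would, subsets a union fit as the hc step would, and lets per-task readout values override the scenario defaults.

```r
# Toy dataset; the column name is illustrative only.
dataset <- data.frame(Conc = c(2.1, 4.5, 0.8, 9.3, 1.7, 3.3, 6.0, 5.2))

# Effective draw size: nrow_max, capped at the dataset size when drawing
# without replacement (with replacement any size can be drawn).
effective_draw_size <- function(nrow_max, n_data, replace) {
  if (replace) nrow_max else min(nrow_max, n_data)
}

# Hypothetical sample step: draw, then tag rows with sample_id and .row.
# ASSUMPTION: set.seed() stands in for the per-task (seed, primer) primitive.
draw_sample <- function(data, sample_id, seed, nrow_max, replace) {
  set.seed(seed)
  n <- effective_draw_size(nrow_max, nrow(data), replace)
  out <- data[sample.int(nrow(data), n, replace = replace), , drop = FALSE]
  out$sample_id <- sample_id
  out$.row <- seq_len(n)
  rownames(out) <- NULL
  out
}

# Two draws stored in one sample shard, rows deliberately shuffled.
shard <- rbind(
  draw_sample(dataset, 1L, seed = 1L, nrow_max = 20L, replace = FALSE),
  draw_sample(dataset, 2L, seed = 2L, nrow_max = 20L, replace = TRUE)
)
set.seed(99)
shard <- shard[sample.int(nrow(shard)), , drop = FALSE]

# Hypothetical fit step: isolate a draw by sample_id, restore its row order.
isolate_draw <- function(shard, sample_id) {
  draw <- shard[shard$sample_id == sample_id, , drop = FALSE]
  draw <- draw[order(draw$.row), , drop = FALSE]
  rownames(draw) <- NULL
  draw
}
# Truncation to the task's nrow is a plain head(): no RNG involved.
fit_input <- head(isolate_draw(shard, 1L), 6L)
fit_input$.row
#> [1] 1 2 3 4 5 6

# Hypothetical hc step: keep only distset members that survived the union fit
# (ASSUMPTION: this is what strict = FALSE means); empty => no rows for the cell.
union_fit <- list(lnorm = "fit_lnorm", gamma = "fit_gamma")  # placeholders
subset_fit <- function(fit, members) fit[intersect(members, names(fit))]
names(subset_fit(union_fit, c("lnorm", "llogis")))
#> [1] "lnorm"
length(subset_fit(union_fit, "llogis"))
#> [1] 0

# Hypothetical readout resolution: per-task values override the scenario slice.
resolve_readout <- function(task, scenario_readout) {
  keys <- c("proportion", "est_method", "ci", "samples")
  for (k in intersect(keys, names(task))) scenario_readout[[k]] <- task[[k]]
  scenario_readout
}
scenario_readout <- list(proportion = 0.05, est_method = "multi", ci = FALSE, samples = 1000L)
resolve_readout(list(ci = TRUE, samples = 500L), scenario_readout)$samples
#> [1] 500
```

The .row index is what makes the shuffled storage harmless: the fit step can always rebuild the exact draw before truncating it, so the truncated input does not depend on how rows were laid out in the parent shard.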
See also
ssd_scenario_sample_shards() (the shard grouping these consume),
ssd_run_scenario_shards(), ssd_run_scenario_baseline().
Examples
data <- ssd_scenario_data(ssddata::ccme_boron)
scenario <- ssd_define_scenario(data, nsim = 1L, seed = 42L)
shards <- ssd_scenario_sample_shards(scenario)
dir <- tempfile()
ssd_run_sample_step(shards$tasks[[1L]], scenario, file.path(dir, "sample"))
#> [1] "/tmp/Rtmp4RDcVE/file1b081d21b376/sample/dataset=ccme_boron/sim=1/replace=TRUE/part.parquet"
# \donttest{
data <- ssd_scenario_data(ssddata::ccme_boron)
scenario <- ssd_define_scenario(
data,
nsim = 1L,
nrow = 6L,
seed = 42L,
dists = ssd_distset(lnorm = "lnorm")
)
dir <- tempfile()
ssd_run_sample_step(
ssd_scenario_sample_shards(scenario)$tasks[[1L]],
scenario,
file.path(dir, "sample")
)
#> [1] "/tmp/Rtmp4RDcVE/file1b0853a005c1/sample/dataset=ccme_boron/sim=1/replace=TRUE/part.parquet"
ssd_run_fit_step(
ssd_scenario_fit_shards(scenario)$tasks[[1L]],
scenario,
file.path(dir, "sample"),
file.path(dir, "fit")
)
#> [1] "/tmp/Rtmp4RDcVE/file1b0853a005c1/fit/dataset=ccme_boron/sim=1/nrow=6/rescale=FALSE/part.parquet"
# }
# \donttest{
data <- ssd_scenario_data(ssddata::ccme_boron)
scenario <- ssd_define_scenario(
data,
nsim = 1L,
nrow = 6L,
seed = 42L,
dists = ssd_distset(lnorm = "lnorm")
)
dir <- tempfile()
ssd_run_sample_step(
ssd_scenario_sample_shards(scenario)$tasks[[1L]],
scenario,
file.path(dir, "sample")
)
#> [1] "/tmp/Rtmp4RDcVE/file1b08bf4d5b2/sample/dataset=ccme_boron/sim=1/replace=TRUE/part.parquet"
ssd_run_fit_step(
ssd_scenario_fit_shards(scenario)$tasks[[1L]],
scenario,
file.path(dir, "sample"),
file.path(dir, "fit")
)
#> [1] "/tmp/Rtmp4RDcVE/file1b08bf4d5b2/fit/dataset=ccme_boron/sim=1/nrow=6/rescale=FALSE/part.parquet"
ssd_run_hc_step(
ssd_scenario_hc_shards(scenario)$tasks[[1L]],
scenario,
file.path(dir, "fit"),
file.path(dir, "hc")
)
#> [1] "/tmp/Rtmp4RDcVE/file1b08bf4d5b2/hc/dataset=ccme_boron/sim=1/part.parquet"
# }