Writing results
Goal
Persist analysis output back to the platform so it shows up under the experiment's Results tab and is discoverable by other plugins.
The simplest case
Use the convenience method:
class MyPlugin(AnalysisPlugin):
async def run(self, experiment_id: int):
# ... compute the result dict ...
result = {"method": "v4", "n_peaks": 312, "score": 0.92}
await self.save_analysis(experiment_id, result)save_analysis writes to PluginAnalysisResult keyed by (experiment_id, plugin_id). In the current platform backend, that is stored as a JSON entry under analysis_results[plugin_id] on the experiment row. The plugin_id defaults to metadata.name. Override AnalysisPlugin.plugin_id if you need a different storage key.
Preserve run history
save_analysis is upsert — calling it again with the same (experiment_id, plugin_id) overwrites the previous row. To keep history, embed runs as a list:
from datetime import datetime, UTC
class MyPlugin(AnalysisPlugin):
async def run(self, experiment_id: int):
previous = await self.load_analysis(experiment_id)
history = (previous.result.get("runs") if previous else []) or []
new_run = {
"run_id": datetime.now(UTC).isoformat(),
"method": "v4",
"n_peaks": 312,
"user_id": self._current_user_id(),
}
history.append(new_run)
await self.save_analysis(experiment_id, {
"latest": new_run,
"runs": history[-50:], # cap to last 50 runs
})A separate latest key is convenient for downstream consumers that don't want to read the whole history.
Save design and analysis together
For FULL plugins that legitimately own both design data and analysis results (rare, but useful for self-contained pipelines):
class MyPlugin(AnalysisPlugin):
async def configure_and_run(self, experiment_id: int, params: dict):
result = await self._compute(experiment_id, params)
await self.save(
experiment_id,
design={"params": params},
analysis={"latest": result},
)save() returns (DesignData | None, PluginAnalysisResult | None). ANALYSIS plugins should pass only analysis=...; EXPERIMENT_DESIGN plugins should pass only design=...; STATIC plugins should not call either write path.
Bulk write across experiments
The convenience methods are scoped to one experiment. For batch operations, drop down to the repo:
class MyPlugin(AnalysisPlugin):
async def batch_save(self, results: dict[int, dict]):
repo = self._context.get_plugin_data_repository()
for experiment_id, result in results.items():
await repo.save_analysis_result(
experiment_id=experiment_id,
plugin_id=self.plugin_id,
result=result,
)The repo doesn't ship a multi-row save_many method by design — most write paths benefit from the per-experiment hooks (on_after_experiment_save, etc.) firing one at a time. If your workflow legitimately needs bulk insert, implement it directly via get_shared_db_session() on a plugin-owned table.
Idempotency under retry
If your analysis is triggered by a queue or scheduler that may retry on failure, use a stable run ID:
class MyPlugin(AnalysisPlugin):
async def run(self, experiment_id: int, *, request_id: str):
previous = await self.load_analysis(experiment_id)
existing_runs = (previous.result.get("runs") if previous else []) or []
if any(r["run_id"] == request_id for r in existing_runs):
return # already done; don't append a duplicate
# ... compute and save ...request_id can be the platform's X-Request-Id, a job ID from your queue, or any other deterministic identifier.
Surfacing results in the experiment UI
The platform's experiment Results tab automatically renders every plugin's PluginAnalysisResult.result. Default rendering uses AnalysisPlugin.export_tree() / export_summary() / export_csv(), which produce sensible defaults via the SDK's auto_json_to_* helpers. Override these to customize the display:
class MyPlugin(AnalysisPlugin):
def export_summary(self, data: dict) -> dict:
return {
"metadata": {
"method": data.get("method"),
"score": data.get("score"),
},
"sections": [
{"title": "Peaks", "kind": "table", "rows": data.get("peaks", [])},
],
}The frontend reads the summary structure and renders cards / tables / metric tiles.
Referencing produced artifacts
If your analysis produces a file (CSV report, image, raw output blob), store artifact references under the conventional result["artifacts"] key so reader plugins can fetch only those references without loading the full result payload:
from mint_sdk import ANALYSIS_ARTIFACTS_KEY
class MyPlugin(AnalysisPlugin):
async def run(self, experiment_id: int):
csv_bytes = self._compute_report(experiment_id)
artifact_id = await self._upload_via_platform_rest_api(
csv_bytes, filename="report.csv"
)
await self.save_analysis(experiment_id, {
"summary": {"rows": 1240},
ANALYSIS_ARTIFACTS_KEY: [
{"id": artifact_id, "filename": "report.csv", "kind": "csv"},
],
})Later, load only that key:
artifacts = await self.load_artifacts(experiment_id)For any other small metadata projection, call await self.load_analysis(experiment_id, fields=["summary", "artifacts"]). The repository will avoid transferring or parsing large tables stored under other result keys.
Notes
resultis JSON. Serialize complex Python objects yourself (datetimes, dataclasses, NumPy) — the SDK doesn't auto-convert.- Results are per-plugin per-experiment. Two analysis plugins running on the same experiment have independent rows.
load_analysis()andload_analyses()default to the calling plugin's own result; passinclude_others=Trueonly for reader plugins that intentionally aggregate results from multiple plugins. - For large outputs (megabytes of peak data per run), consider writing to plugin-owned tables instead — JSON columns aren't ideal for queries or bulk reads. See Recipes → Querying plugin data.
Related
- Concepts → Data model —
PluginAnalysisResultshape - Recipes → Reading experiments — read side