Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 107 additions & 0 deletions src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2500,6 +2500,43 @@ mod tests {
);
}

#[tokio::test]
async fn resume_replaces_non_object_input_with_approvals_object() {
// The public rerun-based resume path accepts any JSON input. A scalar input
// cannot preserve fields, so the engine replaces it with the approvals
// object and the gate proceeds immediately.
let graph = WorkflowGraph {
nodes: vec![
node("t", NodeKind::Trigger),
gate("gate"),
node("downstream", NodeKind::OutputParser),
],
edges: vec![edge("t", "gate"), edge("gate", "downstream")],
..Default::default()
};
let compiled = compile(&graph).expect("compile");
let caps = mock_capabilities();

let outcome = resume(
&compiled,
json!("raw-input"),
vec!["gate".to_string()],
&caps,
)
.await
.expect("resume");

assert!(outcome.pending_approvals.is_empty());
assert_eq!(
outcome.output["run"]["trigger"],
json!({ "approvals": ["gate"] })
);
assert!(
!outcome.output["nodes"]["downstream"]["items"].is_null(),
"downstream should run after the scalar input is replaced with approvals"
);
}

/// A [`RunObserver`] that counts run-start / run-finish and records step ids,
/// so a test can assert every hook fired the right number of times.
#[derive(Default)]
Expand Down Expand Up @@ -2677,6 +2714,76 @@ mod tests {
);
}

#[tokio::test]
async fn durable_resume_with_journal_surfaces_resume_observations() {
// Same durable resume path as above, but with a graph event journal attached
// to both halves. The resumed run returns its own tinyagents run id and the
// journal stores observations under that id.
let cp: Arc<dyn Checkpointer<Value>> = Arc::new(InMemoryCheckpointer::<Value>::default());
let mut approval_gate = node("gate", NodeKind::OutputParser);
approval_gate.config = json!({ "requires_approval": true });
let graph = WorkflowGraph {
nodes: vec![
node("t", NodeKind::Trigger),
approval_gate,
node("downstream", NodeKind::OutputParser),
],
edges: vec![edge("t", "gate"), edge("gate", "downstream")],
..Default::default()
};
let compiled = compile(&graph).expect("compile");
let caps = mock_capabilities();
let journal = Arc::new(InMemoryGraphEventJournal::new());

let paused = run_with_checkpointer_journaled(
&compiled,
json!({ "request": 42 }),
&caps,
cp.clone(),
"thread-journal-resume",
journal.clone(),
)
.await
.expect("journaled run");
assert_eq!(paused.outcome.pending_approvals, vec!["gate".to_string()]);

let resumed = resume_with_checkpointer_journaled(
&compiled,
&caps,
cp.clone(),
"thread-journal-resume",
vec!["gate".to_string()],
journal.clone(),
)
.await
.expect("journaled resume");

assert!(resumed.outcome.pending_approvals.is_empty());
assert!(
!resumed.outcome.output["nodes"]["downstream"]["items"].is_null(),
"downstream should run during the checkpointed resume"
);
assert!(
!resumed.graph_run_ids.run_id.is_empty(),
"resume must surface the tinyagents run id"
);

let observations = journal
.read_from(&resumed.graph_run_ids.run_id, 0)
.await
.expect("read resume observations");
assert!(
!observations.is_empty(),
"resume observations should be journaled under the resumed run id"
);
assert!(
observations
.iter()
.any(|observation| observation.event.kind() == "run.completed"),
"resume journal should include run completion: {observations:?}"
);
}

#[tokio::test]
async fn plain_run_and_resumable_unchanged_by_injectable_checkpointer() {
// Regression: the default (non-injectable) `run` and `run_resumable`
Expand Down
24 changes: 24 additions & 0 deletions tests/cli_e2e.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
//! CLI smoke tests for the tinyflows binary.

use std::process::Command;

#[test]
fn binary_prints_product_name() {
let output = Command::new(env!("CARGO_BIN_EXE_tinyflows"))
.output()
.expect("run tinyflows binary");

assert!(
output.status.success(),
"binary should exit successfully: {output:?}"
);
assert_eq!(
String::from_utf8(output.stdout).expect("stdout utf8"),
"tinyflows\n"
);
assert!(
output.stderr.is_empty(),
"binary should not write stderr: {:?}",
String::from_utf8_lossy(&output.stderr)
);
}