Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions core/src/main/java/com/google/adk/runner/Runner.java
Original file line number Diff line number Diff line change
Expand Up @@ -485,6 +485,12 @@ protected Flowable<Event> runAsyncImpl(
BaseAgent rootAgent = this.agent;
String invocationId = InvocationContext.newInvocationContextId();

// Pre-merge stateDelta so onUserMessageCallback can access it.
// Safe: session is a copy; persistence still happens via appendNewMessageToSession.
if (stateDelta != null && !stateDelta.isEmpty()) {
stateDelta.forEach((key, value) -> session.state().put(key, value));
}

// Create initial context
InvocationContext initialContext =
newInvocationContextBuilder(session)
Expand Down
36 changes: 36 additions & 0 deletions core/src/test/java/com/google/adk/runner/RunnerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -1122,6 +1122,42 @@ public void beforeRunCallback_withStateDelta_seesMergedState() {
assertThat(sessionInCallback.state()).containsEntry("number", 123);
}

@Test
public void onUserMessageCallback_withStateDelta_seesMergedState() {
// Snapshot the session state *inside* the callback, otherwise the assertion would
// observe the post-runAsync state which is mutated by appendEvent regardless of whether
// the pre-merge in Runner is applied.
AtomicReference<ConcurrentHashMap<String, Object>> stateInCallback = new AtomicReference<>();
when(plugin.onUserMessageCallback(any(), any()))
.thenAnswer(
invocation -> {
InvocationContext ctx = invocation.getArgument(0);
stateInCallback.set(new ConcurrentHashMap<>(ctx.session().state()));
return Maybe.empty();
});

ImmutableMap<String, Object> stateDelta =
ImmutableMap.of("callback_key", "callback_value", "number", 123);

var unused =
runner
.runAsync(
"user",
session.id(),
createContent("test with state"),
RunConfig.builder().build(),
stateDelta)
.toList()
.blockingGet();

// Verify onUserMessageCallback was called
verify(plugin).onUserMessageCallback(any(), any());

// Verify state delta was merged before onUserMessageCallback was invoked
assertThat(stateInCallback.get()).containsEntry("callback_key", "callback_value");
assertThat(stateInCallback.get()).containsEntry("number", 123);
}

@Test
public void runAsync_ensureEventsAreAppendedInOrder() throws Exception {
Event event1 = TestUtils.createEvent("1");
Expand Down