Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 11 additions & 2 deletions docs/architecture/crds-and-types.md
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,10 @@ AgentSpec
│ │ ├── summarizer: ContextSummarizerConfig
│ │ ├── tokenThreshold: int
│ │ └── eventRetentionSize: int
│ ├── reliability: ReliabilityConfig
│ │ ├── toolRetries: int (reflect-and-retry on failed tool calls)
│ │ ├── maxLLMCalls: int (cap on model calls per request)
│ │ └── debugLogging: bool (log every LLM request/response and tool call)
│ └── executeCodeBlocks: bool (currently ignored)
└── byo: BYOAgentSpec (if type=BYO)
Expand Down Expand Up @@ -126,6 +130,10 @@ ModelConfigSpec
│ ├── caCertSecretKey: string
│ └── disableSystemCAs: bool
├── retry: ModelRetryConfig
│ └── attempts: int (max retries of failed LLM HTTP requests with exponential backoff;
│ OpenAI/AzureOpenAI/Anthropic/Gemini only)
├── openAI: OpenAIConfig
│ ├── baseUrl, temperature, maxTokens, topP
│ ├── frequencyPenalty, presencePenalty
Expand Down Expand Up @@ -340,7 +348,8 @@ When adding a field to an existing CRD, update all layers:
5. **Translator** — `go/core/internal/controller/translator/agent/adk_api_translator.go` (wire field into config)
6. **Python ADK types** — `python/packages/kagent-adk/src/kagent/adk/types.py` (mirror Go types)
7. **Python runtime** — Use the field in agent setup if it affects runtime behavior
8. **Tests** — Translator unit tests (golden files), E2E tests
9. **Helm values** — If exposed to users installing via Helm
8. **Go runtime** — `go/adk/pkg/` (mirror runtime behavior for `runtime: go` agents)
9. **Tests** — Translator unit tests (golden files), E2E tests
10. **Helm values** — If exposed to users installing via Helm

See [controller-reconciliation.md](controller-reconciliation.md) for the reconciliation flow and the kagent-dev skill for step-by-step examples.
14 changes: 14 additions & 0 deletions go/adk/pkg/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,7 @@ func CreateLLM(ctx context.Context, m adk.Model, log logr.Logger) (adkmodel.LLM,
if err != nil {
return nil, fmt.Errorf("failed to build HTTP client for Gemini: %w", err)
}
warnIgnoredMaxRetries(log, m.BaseModel, "gemini")
return adkgemini.NewModel(ctx, modelName, &genai.ClientConfig{
APIKey: apiKey,
HTTPClient: httpClient,
Expand All @@ -246,6 +247,7 @@ func CreateLLM(ctx context.Context, m adk.Model, log logr.Logger) (adkmodel.LLM,
if modelName == "" {
modelName = DefaultGeminiModel
}
warnIgnoredMaxRetries(log, m.BaseModel, "gemini_vertex_ai")
return adkgemini.NewModel(ctx, modelName, &genai.ClientConfig{
Backend: genai.BackendVertexAI,
Project: project,
Expand Down Expand Up @@ -278,6 +280,7 @@ func CreateLLM(ctx context.Context, m adk.Model, log logr.Logger) (adkmodel.LLM,
modelName = DefaultOllamaModel
}
// Create OllamaConfig with native SDK support for Ollama-specific options
warnIgnoredMaxRetries(log, m.BaseModel, "ollama")
cfg := &models.OllamaConfig{
TransportConfig: transportConfigFromBase(m.BaseModel, nil),
Model: modelName,
Expand All @@ -299,6 +302,7 @@ func CreateLLM(ctx context.Context, m adk.Model, log logr.Logger) (adkmodel.LLM,
return nil, fmt.Errorf("bedrock requires a model name (e.g. anthropic.claude-3-sonnet-20240229-v1:0)")
}
// Use Bedrock Converse API for ALL models (including Anthropic)
warnIgnoredMaxRetries(log, m.BaseModel, "bedrock")
cfg := &models.BedrockConfig{
TransportConfig: transportConfigFromBase(m.BaseModel, nil),
Model: modelName,
Expand Down Expand Up @@ -329,6 +333,7 @@ func CreateLLM(ctx context.Context, m adk.Model, log logr.Logger) (adkmodel.LLM,
return models.NewAnthropicVertexAIModelWithLogger(ctx, cfg, region, project, log)

case *adk.SAPAICore:
warnIgnoredMaxRetries(log, m.BaseModel, "sap_ai_core")
cfg := models.SAPAICoreConfig{
Model: m.Model,
BaseUrl: m.BaseUrl,
Expand All @@ -352,6 +357,15 @@ func transportConfigFromBase(b adk.BaseModel, timeout *int) models.TransportConf
TLSDisableSystemCAs: b.TLSDisableSystemCAs,
APIKeyPassthrough: b.APIKeyPassthrough,
Timeout: timeout,
MaxRetries: b.MaxRetries,
}
}

// warnIgnoredMaxRetries reports, via log, that the retry setting on b has no
// effect for the given provider because that provider's Go SDK does not expose
// configurable HTTP retries. Nothing is logged when b.MaxRetries is nil, i.e.
// when no retry configuration was supplied.
func warnIgnoredMaxRetries(log logr.Logger, b adk.BaseModel, provider string) {
	if b.MaxRetries == nil {
		return
	}
	log.Info(
		"Model retry configuration (max_retries) is not supported for this provider in the Go runtime; ignoring",
		"provider", provider,
	)
}

Expand Down
9 changes: 9 additions & 0 deletions go/adk/pkg/models/anthropic.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,9 @@ func newAnthropicModelFromConfig(config *AnthropicConfig, apiKey string, logger
if config.BaseUrl != "" {
opts = append(opts, option.WithBaseURL(config.BaseUrl))
}
if config.MaxRetries != nil {
opts = append(opts, option.WithMaxRetries(*config.MaxRetries))
}

// Create HTTP client with TLS, custom headers, and timeout.
httpClient, err := BuildHTTPClient(config.TransportConfig)
Expand Down Expand Up @@ -95,6 +98,9 @@ func NewAnthropicVertexAIModelWithLogger(ctx context.Context, config *AnthropicC
opts := []option.RequestOption{
vertex.WithGoogleAuth(ctx, region, projectID),
}
if config.MaxRetries != nil {
opts = append(opts, option.WithMaxRetries(*config.MaxRetries))
}

// Create HTTP client with timeout, custom headers, TLS, and passthrough
httpClient, err := BuildHTTPClient(config.TransportConfig)
Expand Down Expand Up @@ -126,6 +132,9 @@ func NewAnthropicBedrockModelWithLogger(ctx context.Context, config *AnthropicCo
awsconfig.WithRegion(region),
),
}
if config.MaxRetries != nil {
opts = append(opts, option.WithMaxRetries(*config.MaxRetries))
}

// Create HTTP client with timeout, custom headers, TLS, and passthrough
httpClient, err := BuildHTTPClient(config.TransportConfig)
Expand Down
1 change: 1 addition & 0 deletions go/adk/pkg/models/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ type TransportConfig struct {
TLSDisableSystemCAs *bool
APIKeyPassthrough bool
Timeout *int // seconds; nil = defaultTimeout
MaxRetries *int // HTTP retry attempts for transient failures; nil = SDK default
}

// BuildHTTPClient creates an http.Client with the full transport stack:
Expand Down
6 changes: 6 additions & 0 deletions go/adk/pkg/models/openai.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,9 @@ func newOpenAIModelFromConfig(config *OpenAIConfig, apiKey string, logger logr.L
if config.BaseUrl != "" {
opts = append(opts, option.WithBaseURL(config.BaseUrl))
}
if config.MaxRetries != nil {
opts = append(opts, option.WithMaxRetries(*config.MaxRetries))
}
httpClient, err := BuildHTTPClient(config.TransportConfig)
if err != nil {
return nil, err
Expand Down Expand Up @@ -123,6 +126,9 @@ func NewAzureOpenAIModelWithLogger(config *AzureOpenAIConfig, logger logr.Logger
option.WithQueryAdd("api-version", apiVersion),
option.WithMiddleware(azurePathRewriteMiddleware()),
}
if config.MaxRetries != nil {
opts = append(opts, option.WithMaxRetries(*config.MaxRetries))
}

if !config.APIKeyPassthrough {
apiKey := os.Getenv("AZURE_OPENAI_API_KEY")
Expand Down
51 changes: 51 additions & 0 deletions go/adk/pkg/runner/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ import (
"github.com/kagent-dev/kagent/go/api/adk"
adkmemory "google.golang.org/adk/memory"
adkplugin "google.golang.org/adk/plugin"
"google.golang.org/adk/plugin/loggingplugin"
"google.golang.org/adk/plugin/retryandreflect"
"google.golang.org/adk/runner"
adksession "google.golang.org/adk/session"
adktool "google.golang.org/adk/tool"
Expand Down Expand Up @@ -83,6 +85,12 @@ func CreateRunnerConfig(
}
}

reliabilityPlugins, err := buildReliabilityPlugins(agentConfig.Reliability, log)
if err != nil {
return runner.Config{}, nil, err
}
adkPlugins = append(adkPlugins, reliabilityPlugins...)

cfg := runner.Config{
AppName: appName,
Agent: adkAgent,
Expand All @@ -96,6 +104,49 @@ func CreateRunnerConfig(
return cfg, subagentSessionIDs, nil
}

// buildReliabilityPlugins converts an agent's reliability settings into the
// ADK plugins that implement them.
//
// Plugins are appended in a fixed order: debug logging, then tool
// reflect-and-retry, then the max LLM calls cap. A nil config yields no
// plugins. DebugLogging contributes a plugin only when set and true;
// ToolRetries and MaxLLMCalls contribute only when set and greater than zero.
// The first plugin constructor that fails ends the build and its error is
// returned wrapped, with a nil slice.
func buildReliabilityPlugins(r *adk.ReliabilityConfig, log logr.Logger) ([]*adkplugin.Plugin, error) {
	if r == nil {
		return nil, nil
	}

	var enabled []*adkplugin.Plugin

	if debug := r.DebugLogging; debug != nil && *debug {
		loggingPlugin, err := loggingplugin.New("kagent_debug_logging")
		if err != nil {
			return nil, fmt.Errorf("failed to create debug logging plugin: %w", err)
		}
		enabled = append(enabled, loggingPlugin)
		log.Info("Debug logging enabled for agent")
	}

	if retries := r.ToolRetries; retries != nil && *retries > 0 {
		// Retry exhaustion is configured not to surface as an error.
		retryPlugin, err := retryandreflect.New(
			retryandreflect.WithMaxRetries(*retries),
			retryandreflect.WithErrorIfRetryExceeded(false),
		)
		if err != nil {
			return nil, fmt.Errorf("failed to create tool retry plugin: %w", err)
		}
		enabled = append(enabled, retryPlugin)
		log.Info("Tool retry enabled for agent", "toolRetries", *retries)
	}

	if maxCalls := r.MaxLLMCalls; maxCalls != nil && *maxCalls > 0 {
		capPlugin, err := newMaxLLMCallsPlugin(*maxCalls)
		if err != nil {
			return nil, fmt.Errorf("failed to create max LLM calls plugin: %w", err)
		}
		enabled = append(enabled, capPlugin)
		log.Info("Max LLM calls limit enabled for agent", "maxLLMCalls", *maxCalls)
	}

	return enabled, nil
}

func buildTokenPropagationPlugin(ctx context.Context, log logr.Logger) (*sts.TokenPropagationPlugin, error) {
propagateToken := strings.EqualFold(strings.TrimSpace(os.Getenv("KAGENT_PROPAGATE_TOKEN")), "true")
stsWellKnownURI := strings.TrimSpace(os.Getenv("STS_WELL_KNOWN_URI"))
Expand Down
128 changes: 128 additions & 0 deletions go/adk/pkg/runner/adapter_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
package runner

import (
"context"
"strings"
"testing"

"github.com/go-logr/logr"
"github.com/kagent-dev/kagent/go/api/adk"
"google.golang.org/adk/agent"
"google.golang.org/adk/session"
"google.golang.org/genai"
)

// TestBuildReliabilityPlugins is a table-driven test for
// buildReliabilityPlugins. For each reliability configuration it checks that
// the call succeeds and that the returned plugins match the expected names,
// both in count and in position.
func TestBuildReliabilityPlugins(t *testing.T) {
tests := []struct {
name string
config *adk.ReliabilityConfig
// wantNames lists the expected plugin names in the order they must be returned.
wantNames []string
}{
{
name: "nil config",
config: nil,
wantNames: nil,
},
{
name: "empty config",
config: &adk.ReliabilityConfig{},
wantNames: nil,
},
{
name: "debug logging only",
config: &adk.ReliabilityConfig{DebugLogging: new(true)},
wantNames: []string{"kagent_debug_logging"},
},
{
// An explicitly false flag must behave like an unset one.
name: "debug logging false",
config: &adk.ReliabilityConfig{DebugLogging: new(false)},
wantNames: nil,
},
{
name: "tool retries only",
config: &adk.ReliabilityConfig{ToolRetries: new(3)},
wantNames: []string{"RetryAndReflectPlugin"},
},
Comment thread
peterj marked this conversation as resolved.
{
name: "max llm calls only",
config: &adk.ReliabilityConfig{MaxLLMCalls: new(10)},
wantNames: []string{"kagent_max_llm_calls"},
},
{
// The expected order here is independent of the field order in the literal;
// names are compared positionally in the loop below.
name: "all enabled",
config: &adk.ReliabilityConfig{
ToolRetries: new(2),
MaxLLMCalls: new(50),
DebugLogging: new(true),
},
wantNames: []string{"kagent_debug_logging", "RetryAndReflectPlugin", "kagent_max_llm_calls"},
},
Comment thread
peterj marked this conversation as resolved.
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
plugins, err := buildReliabilityPlugins(tt.config, logr.Discard())
if err != nil {
t.Fatalf("buildReliabilityPlugins() error = %v", err)
}
// Stop on a length mismatch so the positional indexing below stays in range.
if len(plugins) != len(tt.wantNames) {
t.Fatalf("got %d plugins, want %d", len(plugins), len(tt.wantNames))
}
for i, want := range tt.wantNames {
if got := plugins[i].Name(); got != want {
t.Errorf("plugin[%d].Name() = %q, want %q", i, got, want)
}
}
})
}
}

// fakeCallbackContext is a test double for agent.CallbackContext. It embeds a
// context.Context, reports a caller-chosen invocation ID, and returns fixed
// placeholder values from every other accessor.
type fakeCallbackContext struct {
	context.Context
	invocationID string
}

// InvocationID returns the invocation ID the fake was constructed with.
func (c *fakeCallbackContext) InvocationID() string {
	return c.invocationID
}

// AgentName always returns "test-agent".
func (c *fakeCallbackContext) AgentName() string {
	return "test-agent"
}

// UserID always returns "user".
func (c *fakeCallbackContext) UserID() string {
	return "user"
}

// AppName always returns "app".
func (c *fakeCallbackContext) AppName() string {
	return "app"
}

// SessionID always returns "session".
func (c *fakeCallbackContext) SessionID() string {
	return "session"
}

// Branch always returns the empty string.
func (c *fakeCallbackContext) Branch() string {
	return ""
}

// UserContent always returns nil.
func (c *fakeCallbackContext) UserContent() *genai.Content {
	return nil
}

// ReadonlyState always returns nil.
func (c *fakeCallbackContext) ReadonlyState() session.ReadonlyState {
	return nil
}

// State always returns nil.
func (c *fakeCallbackContext) State() session.State {
	return nil
}

// Artifacts always returns nil.
func (c *fakeCallbackContext) Artifacts() agent.Artifacts {
	return nil
}

// TestMaxLLMCallsPlugin exercises the BeforeModelCallback of a plugin built
// with a limit of 2. Two calls for one invocation ID must succeed, a third
// must fail with the limit message, and a call for a different invocation ID
// must still succeed.
func TestMaxLLMCallsPlugin(t *testing.T) {
	limiter, err := newMaxLLMCallsPlugin(2)
	if err != nil {
		t.Fatalf("newMaxLLMCallsPlugin() error = %v", err)
	}
	beforeModel := limiter.BeforeModelCallback()
	if beforeModel == nil {
		t.Fatal("BeforeModelCallback is nil")
	}

	limited := &fakeCallbackContext{Context: context.Background(), invocationID: "inv-a"}
	fresh := &fakeCallbackContext{Context: context.Background(), invocationID: "inv-b"}

	// Calls up to and including the limit are allowed.
	for call := 1; call <= 2; call++ {
		_, callErr := beforeModel(limited, nil)
		if callErr != nil {
			t.Fatalf("call %d: unexpected error: %v", call, callErr)
		}
	}

	// One call past the limit is rejected with a message naming the limit.
	_, overErr := beforeModel(limited, nil)
	if overErr == nil {
		t.Fatal("expected error after exceeding limit, got nil")
	}
	if !strings.Contains(overErr.Error(), "limit of 2 model calls") {
		t.Errorf("unexpected error message: %v", overErr)
	}

	// Counting is keyed by invocation ID, so another invocation starts from zero.
	_, freshErr := beforeModel(fresh, nil)
	if freshErr != nil {
		t.Fatalf("different invocation should not be limited: %v", freshErr)
	}
}
42 changes: 42 additions & 0 deletions go/adk/pkg/runner/maxllmcalls.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package runner

import (
"fmt"
"sync"

"google.golang.org/adk/agent"
"google.golang.org/adk/model"
adkplugin "google.golang.org/adk/plugin"
)

// newMaxLLMCallsPlugin builds a plugin named "kagent_max_llm_calls" that caps
// how many model calls a single invocation may make.
//
// The Go ADK has no native equivalent of the Python RunConfig.max_llm_calls,
// so the cap is enforced with callbacks. The BeforeModelCallback increments a
// counter keyed by invocation ID and returns an error once that counter
// exceeds limit, which is intended to abort the run. The AfterRunCallback
// removes the invocation's counter. Both callbacks share one mutex that
// guards the counter map.
func newMaxLLMCallsPlugin(limit int) (*adkplugin.Plugin, error) {
	var (
		guard         sync.Mutex
		perInvocation = map[string]int{}
	)

	// countCall records one model call and rejects it when over the limit.
	countCall := func(ctx agent.CallbackContext, _ *model.LLMRequest) (*model.LLMResponse, error) {
		guard.Lock()
		defer guard.Unlock()

		key := ctx.InvocationID()
		seen := perInvocation[key] + 1
		perInvocation[key] = seen
		if seen <= limit {
			return nil, nil
		}
		return nil, fmt.Errorf(
			"agent stopped: exceeded the configured limit of %d model calls in a single run (reliability.maxLLMCalls)",
			limit,
		)
	}

	// forget drops the finished invocation's counter from the map.
	forget := func(ictx agent.InvocationContext) {
		guard.Lock()
		defer guard.Unlock()

		delete(perInvocation, ictx.InvocationID())
	}

	return adkplugin.New(adkplugin.Config{
		Name:                "kagent_max_llm_calls",
		BeforeModelCallback: countCall,
		AfterRunCallback:    forget,
	})
}
Loading
Loading