From 245660a8cb81e2aaa6e7f72521b94e8e8909ecaf Mon Sep 17 00:00:00 2001 From: simonfaltum Date: Tue, 9 Jun 2026 22:51:22 +0200 Subject: [PATCH 1/2] Validate job_cluster_key inside for_each_task The job_cluster_key validation only checked top-level tasks, so an undefined key referenced by a task nested under for_each_task passed "bundle validate" silently and failed at run time. Extend the check to tasks[*].for_each_task.task, mirroring the path set used by the job paths visitor. Co-authored-by: Isaac --- .../validate/job_cluster_key_defined.go | 42 ++++++++----- .../validate/job_cluster_key_defined_test.go | 61 +++++++++++++++++++ 2 files changed, 89 insertions(+), 14 deletions(-) diff --git a/bundle/config/validate/job_cluster_key_defined.go b/bundle/config/validate/job_cluster_key_defined.go index 5ae2f5437b8..a21eb29ffb3 100644 --- a/bundle/config/validate/job_cluster_key_defined.go +++ b/bundle/config/validate/job_cluster_key_defined.go @@ -31,23 +31,37 @@ func (v *jobClusterKeyDefined) Apply(ctx context.Context, b *bundle.Bundle) diag } for index, task := range job.Tasks { - if task.JobClusterKey != "" { - if _, ok := jobClusterKeys[task.JobClusterKey]; !ok { - path := fmt.Sprintf("resources.jobs.%s.tasks[%d].job_cluster_key", k, index) - - diags = diags.Append(diag.Diagnostic{ - Severity: diag.Warning, - Summary: fmt.Sprintf("job_cluster_key %s is not defined", task.JobClusterKey), - // Show only the location where the job_cluster_key is defined. - // Other associated locations are not relevant since they are - // overridden during merging. - Locations: b.Config.GetLocations(path), - Paths: []dyn.Path{dyn.MustPathFromString(path)}, - }) - } + diags = diags.Extend(checkJobClusterKey(b, jobClusterKeys, task.JobClusterKey, + fmt.Sprintf("resources.jobs.%s.tasks[%d].job_cluster_key", k, index))) + + // The task wrapped by a for_each_task can reference a job cluster as well. + // The Jobs API rejects nested for_each_task, so one level is sufficient. + if task.ForEachTask != nil { + diags = diags.Extend(checkJobClusterKey(b, jobClusterKeys, task.ForEachTask.Task.JobClusterKey, + fmt.Sprintf("resources.jobs.%s.tasks[%d].for_each_task.task.job_cluster_key", k, index))) } } } return diags } + +// checkJobClusterKey warns if jobClusterKey is set but not defined in the job's job_clusters. +func checkJobClusterKey(b *bundle.Bundle, jobClusterKeys map[string]bool, jobClusterKey, path string) diag.Diagnostics { + if jobClusterKey == "" { + return nil + } + if _, ok := jobClusterKeys[jobClusterKey]; ok { + return nil + } + + return diag.Diagnostics{{ + Severity: diag.Warning, + Summary: fmt.Sprintf("job_cluster_key %s is not defined", jobClusterKey), + // Show only the location where the job_cluster_key is defined. + // Other associated locations are not relevant since they are + // overridden during merging. + Locations: b.Config.GetLocations(path), + Paths: []dyn.Path{dyn.MustPathFromString(path)}, + }} +} diff --git a/bundle/config/validate/job_cluster_key_defined_test.go b/bundle/config/validate/job_cluster_key_defined_test.go index 4cc35c1ad68..248bc31a60e 100644 --- a/bundle/config/validate/job_cluster_key_defined_test.go +++ b/bundle/config/validate/job_cluster_key_defined_test.go @@ -62,6 +62,67 @@ func TestJobClusterKeyNotDefined(t *testing.T) { require.Equal(t, "job_cluster_key do-not-exist is not defined", diags[0].Summary) } +func TestJobClusterKeyDefinedInForEachTask(t *testing.T) { + b := &bundle.Bundle{ + Config: config.Root{ + Resources: config.Resources{ + Jobs: map[string]*resources.Job{ + "job1": { + JobSettings: jobs.JobSettings{ + Name: "job1", + JobClusters: []jobs.JobCluster{ + {JobClusterKey: "do-not-exist"}, + }, + Tasks: []jobs.Task{ + { + ForEachTask: &jobs.ForEachTask{ + Task: jobs.Task{JobClusterKey: "do-not-exist"}, + }, + }, + }, + }, + }, + }, + }, + }, + } + + diags := JobClusterKeyDefined().Apply(t.Context(), b) + require.Empty(t, diags) + require.NoError(t, diags.Error()) +} + +func TestJobClusterKeyNotDefinedInForEachTask(t *testing.T) { + b := &bundle.Bundle{ + Config: config.Root{ + Resources: config.Resources{ + Jobs: map[string]*resources.Job{ + "job1": { + JobSettings: jobs.JobSettings{ + Name: "job1", + Tasks: []jobs.Task{ + { + ForEachTask: &jobs.ForEachTask{ + Task: jobs.Task{JobClusterKey: "do-not-exist"}, + }, + }, + }, + }, + }, + }, + }, + }, + } + + diags := JobClusterKeyDefined().Apply(t.Context(), b) + require.Len(t, diags, 1) + require.NoError(t, diags.Error()) + require.Equal(t, diag.Warning, diags[0].Severity) + require.Equal(t, "job_cluster_key do-not-exist is not defined", diags[0].Summary) + require.Len(t, diags[0].Paths, 1) + require.Equal(t, "resources.jobs.job1.tasks[0].for_each_task.task.job_cluster_key", diags[0].Paths[0].String()) +} + func TestJobClusterKeyDefinedInDifferentJob(t *testing.T) { b := &bundle.Bundle{ Config: config.Root{ From 416c1ced863dae6271ba11897d0aca606748e8db Mon Sep 17 00:00:00 2001 From: simonfaltum Date: Wed, 10 Jun 2026 07:23:06 +0200 Subject: [PATCH 2/2] Remove redundant comments Co-authored-by: Isaac --- bundle/config/validate/job_cluster_key_defined.go | 1 - 1 file changed, 1 deletion(-) diff --git a/bundle/config/validate/job_cluster_key_defined.go b/bundle/config/validate/job_cluster_key_defined.go index a21eb29ffb3..019a785658a 100644 --- a/bundle/config/validate/job_cluster_key_defined.go +++ b/bundle/config/validate/job_cluster_key_defined.go @@ -34,7 +34,6 @@ func (v *jobClusterKeyDefined) Apply(ctx context.Context, b *bundle.Bundle) diag diags = diags.Extend(checkJobClusterKey(b, jobClusterKeys, task.JobClusterKey, fmt.Sprintf("resources.jobs.%s.tasks[%d].job_cluster_key", k, index))) - // The task wrapped by a for_each_task can reference a job cluster as well. // The Jobs API rejects nested for_each_task, so one level is sufficient. if task.ForEachTask != nil { diags = diags.Extend(checkJobClusterKey(b, jobClusterKeys, task.ForEachTask.Task.JobClusterKey,