Skip to content

Commit 4b418b9

Browse files
authored
Fix Pagination Bug in Github GraphQL Job Collector and add configurable page and batch size (#8616)
* fix: split pagination and batching for github graphql job_collector task * feat: github graphql job collection mode, page size, batch size configurable via Environment Variables
1 parent 60e7f22 commit 4b418b9

1 file changed

Lines changed: 200 additions & 42 deletions

File tree

backend/plugins/github_graphql/tasks/job_collector.go

Lines changed: 200 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -34,13 +34,101 @@ import (
3434

3535
const RAW_GRAPHQL_JOBS_TABLE = "github_graphql_jobs"
3636

37-
type GraphqlQueryCheckRunWrapper struct {
37+
// Collection mode configuration
38+
const (
39+
JOB_COLLECTION_MODE_BATCHING = "BATCHING"
40+
JOB_COLLECTION_MODE_PAGINATING = "PAGINATING"
41+
)
42+
43+
// Set the collection mode here
44+
// BATCHING: Query multiple runs at once, no pagination (may miss jobs if >20 per run)
45+
// PAGINATING: Query one run at a time with full pagination (complete data, more API calls)
46+
const DEFAULT_JOB_COLLECTION_MODE = JOB_COLLECTION_MODE_BATCHING
47+
48+
// Mode-specific configuration
49+
const (
50+
DEFAULT_BATCHING_INPUT_STEP = 10 // Number of runs per request in BATCHING mode (must be > 1)
51+
DEFAULT_BATCHING_PAGE_SIZE = 20 // Jobs per run in BATCHING mode (no pagination)
52+
PAGINATING_INPUT_STEP = 1 // Number of runs per request in PAGINATING mode (always 1)
53+
DEFAULT_PAGINATING_PAGE_SIZE = 50 // Jobs per page in PAGINATING mode (with pagination)
54+
)
55+
56+
// JobCollectionConfig holds the configuration for job collection
57+
type JobCollectionConfig struct {
58+
Mode string
59+
PageSize int
60+
InputStep int
61+
BatchingInputStep int
62+
BatchingPageSize int
63+
PaginatingPageSize int
64+
}
65+
66+
// getJobCollectionConfig reads configuration from environment variables with fallback to defaults
67+
func getJobCollectionConfig(taskCtx plugin.SubTaskContext) *JobCollectionConfig {
68+
cfg := taskCtx.TaskContext().GetConfigReader()
69+
70+
config := &JobCollectionConfig{
71+
Mode: DEFAULT_JOB_COLLECTION_MODE,
72+
BatchingInputStep: DEFAULT_BATCHING_INPUT_STEP,
73+
BatchingPageSize: DEFAULT_BATCHING_PAGE_SIZE,
74+
PaginatingPageSize: DEFAULT_PAGINATING_PAGE_SIZE,
75+
}
76+
77+
// Read collection mode from environment
78+
if mode := taskCtx.TaskContext().GetConfig("GITHUB_GRAPHQL_JOB_COLLECTION_MODE"); mode != "" {
79+
if mode == JOB_COLLECTION_MODE_BATCHING || mode == JOB_COLLECTION_MODE_PAGINATING {
80+
config.Mode = mode
81+
}
82+
}
83+
84+
// Read batching input step (must be > 1)
85+
if cfg.IsSet("GITHUB_GRAPHQL_JOB_BATCHING_INPUT_STEP") {
86+
if step := cfg.GetInt("GITHUB_GRAPHQL_JOB_BATCHING_INPUT_STEP"); step > 1 {
87+
config.BatchingInputStep = step
88+
}
89+
}
90+
91+
// Read page sizes
92+
if cfg.IsSet("GITHUB_GRAPHQL_JOB_BATCHING_PAGE_SIZE") {
93+
if size := cfg.GetInt("GITHUB_GRAPHQL_JOB_BATCHING_PAGE_SIZE"); size > 0 {
94+
config.BatchingPageSize = size
95+
}
96+
}
97+
98+
if cfg.IsSet("GITHUB_GRAPHQL_JOB_PAGINATING_PAGE_SIZE") {
99+
if size := cfg.GetInt("GITHUB_GRAPHQL_JOB_PAGINATING_PAGE_SIZE"); size > 0 {
100+
config.PaginatingPageSize = size
101+
}
102+
}
103+
104+
// Set derived values based on mode
105+
if config.Mode == JOB_COLLECTION_MODE_PAGINATING {
106+
config.PageSize = config.PaginatingPageSize
107+
config.InputStep = PAGINATING_INPUT_STEP // Always 1 for paginating
108+
} else {
109+
config.PageSize = config.BatchingPageSize
110+
config.InputStep = config.BatchingInputStep // User-configurable for batching
111+
}
112+
113+
return config
114+
}
115+
116+
// Batch mode: query multiple runs at once (array of nodes)
117+
type GraphqlQueryCheckRunWrapperBatch struct {
38118
RateLimit struct {
39119
Cost int
40120
}
41121
Node []GraphqlQueryCheckSuite `graphql:"node(id: $id)" graphql-extend:"true"`
42122
}
43123

124+
// Paginating mode: query single run (single node)
125+
type GraphqlQueryCheckRunWrapperSingle struct {
126+
RateLimit struct {
127+
Cost int
128+
}
129+
Node GraphqlQueryCheckSuite `graphql:"node(id: $id)"`
130+
}
131+
44132
type GraphqlQueryCheckSuite struct {
45133
Id string
46134
Typename string `graphql:"__typename"`
@@ -111,47 +199,81 @@ var CollectJobsMeta = plugin.SubTaskMeta{
111199

112200
var _ plugin.SubTaskEntryPoint = CollectJobs
113201

114-
func getPageInfo(query interface{}, args *helper.GraphqlCollectorArgs) (*helper.GraphqlQueryPageInfo, error) {
115-
queryWrapper := query.(*GraphqlQueryCheckRunWrapper)
116-
hasNextPage := false
117-
endCursor := ""
118-
for _, node := range queryWrapper.Node {
119-
if node.CheckSuite.CheckRuns.PageInfo.HasNextPage {
120-
hasNextPage = true
121-
endCursor = node.CheckSuite.CheckRuns.PageInfo.EndCursor
122-
break
202+
// createGetPageInfoFunc returns the appropriate page info function based on collection mode
203+
func createGetPageInfoFunc(mode string) func(interface{}, *helper.GraphqlCollectorArgs) (*helper.GraphqlQueryPageInfo, error) {
204+
if mode == JOB_COLLECTION_MODE_PAGINATING {
205+
// PAGINATING mode: supports full pagination
206+
return func(query interface{}, args *helper.GraphqlCollectorArgs) (*helper.GraphqlQueryPageInfo, error) {
207+
queryWrapper := query.(*GraphqlQueryCheckRunWrapperSingle)
208+
return &helper.GraphqlQueryPageInfo{
209+
EndCursor: queryWrapper.Node.CheckSuite.CheckRuns.PageInfo.EndCursor,
210+
HasNextPage: queryWrapper.Node.CheckSuite.CheckRuns.PageInfo.HasNextPage,
211+
}, nil
123212
}
124213
}
125-
return &helper.GraphqlQueryPageInfo{
126-
EndCursor: endCursor,
127-
HasNextPage: hasNextPage,
128-
}, nil
214+
215+
// BATCHING mode: no pagination support
216+
return func(query interface{}, args *helper.GraphqlCollectorArgs) (*helper.GraphqlQueryPageInfo, error) {
217+
return &helper.GraphqlQueryPageInfo{
218+
EndCursor: "",
219+
HasNextPage: false,
220+
}, nil
221+
}
129222
}
130223

131-
func buildQuery(reqData *helper.GraphqlRequestData) (interface{}, map[string]interface{}, error) {
132-
query := &GraphqlQueryCheckRunWrapper{}
133-
if reqData == nil {
134-
return query, map[string]interface{}{}, nil
135-
}
136-
workflowRuns := reqData.Input.([]interface{})
137-
checkSuiteIds := []map[string]interface{}{}
138-
for _, iWorkflowRuns := range workflowRuns {
139-
workflowRun := iWorkflowRuns.(*SimpleWorkflowRun)
140-
checkSuiteIds = append(checkSuiteIds, map[string]interface{}{
141-
`id`: graphql.ID(workflowRun.CheckSuiteNodeID),
142-
})
143-
}
144-
variables := map[string]interface{}{
145-
"node": checkSuiteIds,
146-
"pageSize": graphql.Int(reqData.Pager.Size),
147-
"skipCursor": (*graphql.String)(reqData.Pager.SkipCursor),
148-
}
149-
return query, variables, nil
224+
// createBuildQueryFunc returns the appropriate build query function based on collection mode
225+
func createBuildQueryFunc(mode string) func(*helper.GraphqlRequestData) (interface{}, map[string]interface{}, error) {
226+
if mode == JOB_COLLECTION_MODE_PAGINATING {
227+
// PAGINATING mode: single run per request
228+
return func(reqData *helper.GraphqlRequestData) (interface{}, map[string]interface{}, error) {
229+
if reqData == nil {
230+
return &GraphqlQueryCheckRunWrapperSingle{}, map[string]interface{}{}, nil
231+
}
232+
233+
workflowRun := reqData.Input.(*SimpleWorkflowRun)
234+
query := &GraphqlQueryCheckRunWrapperSingle{}
235+
variables := map[string]interface{}{
236+
"id": graphql.ID(workflowRun.CheckSuiteNodeID),
237+
"pageSize": graphql.Int(reqData.Pager.Size),
238+
"skipCursor": (*graphql.String)(reqData.Pager.SkipCursor),
239+
}
240+
return query, variables, nil
241+
}
242+
}
243+
244+
// BATCHING mode: multiple runs per request
245+
return func(reqData *helper.GraphqlRequestData) (interface{}, map[string]interface{}, error) {
246+
if reqData == nil {
247+
return &GraphqlQueryCheckRunWrapperBatch{}, map[string]interface{}{}, nil
248+
}
249+
250+
workflowRuns := reqData.Input.([]interface{})
251+
query := &GraphqlQueryCheckRunWrapperBatch{}
252+
checkSuiteIds := []map[string]interface{}{}
253+
for _, iWorkflowRuns := range workflowRuns {
254+
workflowRun := iWorkflowRuns.(*SimpleWorkflowRun)
255+
checkSuiteIds = append(checkSuiteIds, map[string]interface{}{
256+
`id`: graphql.ID(workflowRun.CheckSuiteNodeID),
257+
})
258+
}
259+
variables := map[string]interface{}{
260+
"node": checkSuiteIds,
261+
"pageSize": graphql.Int(reqData.Pager.Size),
262+
"skipCursor": (*graphql.String)(reqData.Pager.SkipCursor),
263+
}
264+
return query, variables, nil
265+
}
150266
}
151267

152268
func CollectJobs(taskCtx plugin.SubTaskContext) errors.Error {
153269
db := taskCtx.GetDal()
154270
data := taskCtx.GetData().(*githubTasks.GithubTaskData)
271+
logger := taskCtx.GetLogger()
272+
273+
// Get configuration from environment variables or defaults
274+
config := getJobCollectionConfig(taskCtx)
275+
logger.Info("GitHub Job Collector - Mode: %s, InputStep: %d, PageSize: %d",
276+
config.Mode, config.InputStep, config.PageSize)
155277

156278
apiCollector, err := helper.NewStatefulApiCollector(helper.RawDataSubTaskArgs{
157279
Ctx: taskCtx,
@@ -175,28 +297,40 @@ func CollectJobs(taskCtx plugin.SubTaskContext) errors.Error {
175297
clauses = append(clauses, dal.Where("github_updated_at > ?", *apiCollector.GetSince()))
176298
}
177299

178-
cursor, err := db.Cursor(
179-
clauses...,
180-
)
300+
cursor, err := db.Cursor(clauses...)
181301
if err != nil {
182302
return err
183303
}
184304
defer cursor.Close()
305+
185306
iterator, err := helper.NewDalCursorIterator(db, cursor, reflect.TypeOf(SimpleWorkflowRun{}))
186307
if err != nil {
187308
return err
188309
}
189310

311+
// Create closures that capture the runtime mode configuration
312+
buildQueryFunc := createBuildQueryFunc(config.Mode)
313+
var getPageInfoFunc func(interface{}, *helper.GraphqlCollectorArgs) (*helper.GraphqlQueryPageInfo, error)
314+
315+
if config.Mode == JOB_COLLECTION_MODE_PAGINATING {
316+
getPageInfoFunc = createGetPageInfoFunc(config.Mode) // Enable pagination
317+
} else {
318+
getPageInfoFunc = nil // Disable pagination for BATCHING mode
319+
}
320+
190321
err = apiCollector.InitGraphQLCollector(helper.GraphqlCollectorArgs{
191322
Input: iterator,
192-
InputStep: 10,
323+
InputStep: config.InputStep,
193324
GraphqlClient: data.GraphqlClient,
194-
BuildQuery: buildQuery,
195-
GetPageInfo: getPageInfo,
325+
BuildQuery: buildQueryFunc,
326+
GetPageInfo: getPageInfoFunc, // nil for BATCHING, function for PAGINATING
196327
ResponseParser: func(queryWrapper any) (messages []json.RawMessage, err errors.Error) {
197-
query := queryWrapper.(*GraphqlQueryCheckRunWrapper)
198-
for _, node := range query.Node {
328+
if config.Mode == JOB_COLLECTION_MODE_PAGINATING {
329+
// Single node processing
330+
query := queryWrapper.(*GraphqlQueryCheckRunWrapperSingle)
331+
node := query.Node
199332
runId := node.CheckSuite.WorkflowRun.DatabaseId
333+
200334
for _, checkRun := range node.CheckSuite.CheckRuns.Nodes {
201335
dbCheckRun := &DbCheckRun{
202336
RunId: runId,
@@ -215,11 +349,35 @@ func CollectJobs(taskCtx plugin.SubTaskContext) errors.Error {
215349
}
216350
messages = append(messages, errors.Must1(json.Marshal(dbCheckRun)))
217351
}
352+
} else {
353+
// Batch processing (multiple nodes)
354+
query := queryWrapper.(*GraphqlQueryCheckRunWrapperBatch)
355+
for _, node := range query.Node {
356+
runId := node.CheckSuite.WorkflowRun.DatabaseId
357+
for _, checkRun := range node.CheckSuite.CheckRuns.Nodes {
358+
dbCheckRun := &DbCheckRun{
359+
RunId: runId,
360+
GraphqlQueryCheckRun: &checkRun,
361+
}
362+
// A checkRun without a startedAt time is a run that was never started (skipped), GitHub returns
363+
// a ZeroTime (Due to the GO implementation) for startedAt, so we need to check for that here.
364+
dbCheckRun.StartedAt = utils.NilIfZeroTime(dbCheckRun.StartedAt)
365+
dbCheckRun.CompletedAt = utils.NilIfZeroTime(dbCheckRun.CompletedAt)
366+
updatedAt := dbCheckRun.StartedAt
367+
if dbCheckRun.CompletedAt != nil {
368+
updatedAt = dbCheckRun.CompletedAt
369+
}
370+
if apiCollector.GetSince() != nil && !apiCollector.GetSince().Before(*updatedAt) {
371+
return messages, helper.ErrFinishCollect
372+
}
373+
messages = append(messages, errors.Must1(json.Marshal(dbCheckRun)))
374+
}
375+
}
218376
}
219377
return
220378
},
221379
IgnoreQueryErrors: true,
222-
PageSize: 20,
380+
PageSize: config.PageSize,
223381
})
224382
if err != nil {
225383
return err

0 commit comments

Comments
 (0)