@@ -24,12 +24,16 @@ import (
2424 "github.com/apache/incubator-devlake/core/log"
2525 "github.com/apache/incubator-devlake/core/plugin"
2626 "github.com/apache/incubator-devlake/core/utils"
27+ "strconv"
2728 "sync"
2829 "time"
2930
3031 "github.com/merico-ai/graphql"
3132)
3233
34+ // GraphqlClientOption is a function that configures a GraphqlAsyncClient
35+ type GraphqlClientOption func (* GraphqlAsyncClient )
36+
3337// GraphqlAsyncClient send graphql one by one
3438type GraphqlAsyncClient struct {
3539 ctx context.Context
@@ -47,34 +51,48 @@ type GraphqlAsyncClient struct {
4751 getRateCost func (q interface {}) int
4852}
4953
50- const defaultRateLimit = 5000
54+ // defaultRateLimitConst is the generic fallback rate limit for GraphQL requests.
55+ // It is used as the initial remaining quota when dynamic rate limit
56+ // information is unavailable from the provider.
57+ const defaultRateLimitConst = 1000
5158
5259// CreateAsyncGraphqlClient creates a new GraphqlAsyncClient
5360func CreateAsyncGraphqlClient (
5461 taskCtx plugin.TaskContext ,
5562 graphqlClient * graphql.Client ,
5663 logger log.Logger ,
5764 getRateRemaining func (context.Context , * graphql.Client , log.Logger ) (rateRemaining int , resetAt * time.Time , err errors.Error ),
65+ opts ... GraphqlClientOption ,
5866) (* GraphqlAsyncClient , errors.Error ) {
5967 ctxWithCancel , cancel := context .WithCancel (taskCtx .GetContext ())
68+
69+ rateLimit := resolveRateLimit (taskCtx , logger )
70+
6071 graphqlAsyncClient := & GraphqlAsyncClient {
6172 ctx : ctxWithCancel ,
6273 cancel : cancel ,
6374 client : graphqlClient ,
6475 logger : logger ,
6576 rateExhaustCond : sync .NewCond (& sync.Mutex {}),
66- rateRemaining : 0 ,
77+ rateRemaining : rateLimit ,
6778 getRateRemaining : getRateRemaining ,
6879 }
6980
81+ // apply options
82+ for _ , opt := range opts {
83+ opt (graphqlAsyncClient )
84+ }
85+
7086 if getRateRemaining != nil {
7187 rateRemaining , resetAt , err := getRateRemaining (taskCtx .GetContext (), graphqlClient , logger )
7288 if err != nil {
7389 graphqlAsyncClient .logger .Warn (err , "failed to fetch initial graphql rate limit, fallback to default" )
74- graphqlAsyncClient .updateRateRemaining (defaultRateLimit , nil )
90+ graphqlAsyncClient .updateRateRemaining (graphqlAsyncClient . rateRemaining , nil )
7591 } else {
7692 graphqlAsyncClient .updateRateRemaining (rateRemaining , resetAt )
7793 }
94+ } else {
95+ graphqlAsyncClient .updateRateRemaining (graphqlAsyncClient .rateRemaining , nil )
7896 }
7997
8098 // load retry/timeout from configuration
@@ -119,6 +137,10 @@ func (apiClient *GraphqlAsyncClient) updateRateRemaining(rateRemaining int, rese
119137 apiClient .rateExhaustCond .Signal ()
120138 }
121139 go func () {
140+ if apiClient .getRateRemaining == nil {
141+ return
142+ }
143+
122144 nextDuring := 3 * time .Minute
123145 if resetAt != nil && resetAt .After (time .Now ()) {
124146 nextDuring = time .Until (* resetAt )
@@ -224,3 +246,29 @@ func (apiClient *GraphqlAsyncClient) Wait() {
224246func (apiClient * GraphqlAsyncClient ) Release () {
225247 apiClient .cancel ()
226248}
249+
250+ // WithFallbackRateLimit sets the initial/fallback rate limit used when
251+ // rate limit information cannot be fetched dynamically.
252+ // This value may be overridden later by getRateRemaining.
253+ func WithFallbackRateLimit (limit int ) GraphqlClientOption {
254+ return func (c * GraphqlAsyncClient ) {
255+ if limit > 0 {
256+ c .rateRemaining = limit
257+ }
258+ }
259+ }
260+
261+ // resolveRateLimit determines the rate limit for GraphQL requests using task configuration -> else default constant.
262+ func resolveRateLimit (taskCtx plugin.TaskContext , logger log.Logger ) int {
263+ rateLimit := defaultRateLimitConst
264+
265+ if v := taskCtx .GetConfig ("GRAPHQL_RATE_LIMIT" ); v != "" {
266+ if parsed , err := strconv .Atoi (v ); err == nil {
267+ rateLimit = parsed
268+ } else {
269+ logger .Warn (err , "invalid GRAPHQL_RATE_LIMIT, using default" )
270+ }
271+ }
272+
273+ return rateLimit
274+ }
0 commit comments