From c4331b643313cbfbfe0a7199deea495cfdf3630f Mon Sep 17 00:00:00 2001 From: Walter Galvao Date: Tue, 9 Jul 2024 20:30:45 -0300 Subject: [PATCH] fix(api): rate limit moveToDelayed --- .../services/github-rate-limit.service.ts | 34 ++++++++----------- .../workers/github-members-sync.worker.ts | 4 ++- .../github-repositories-sync.worker.ts | 6 +++- .../github-repository-pull-requests.worker.ts | 6 +++- .../workers/github-sync-code-review.worker.ts | 4 ++- .../github-sync-pull-request.worker.ts | 4 ++- 6 files changed, 34 insertions(+), 24 deletions(-) diff --git a/apps/api/src/app/github/services/github-rate-limit.service.ts b/apps/api/src/app/github/services/github-rate-limit.service.ts index 23f18fc..0f15b3c 100644 --- a/apps/api/src/app/github/services/github-rate-limit.service.ts +++ b/apps/api/src/app/github/services/github-rate-limit.service.ts @@ -1,11 +1,10 @@ -import { Job } from "bullmq"; +import { DelayedError, Job } from "bullmq"; import { fromUnixTime } from "date-fns/fromUnixTime"; import { logger } from "../../../lib/logger"; import { redisConnection } from "../../../bull-mq/redis-connection"; import { BusinessRuleException } from "../../errors/exceptions/business-rule.exception"; import { isPast } from "date-fns/isPast"; import { getTime } from "date-fns/getTime"; -import { SweetQueue, addJob } from "../../../bull-mq/queues"; import { addSeconds } from "date-fns"; const oneSecondInMs = 1000; @@ -13,10 +12,11 @@ const oneSecondInMs = 1000; interface WithDelayedRetryOnRateLimitArgs { job: Job; installationId: number; + jobToken?: string; } export const withDelayedRetryOnRateLimit = async ( callback: () => Promise, - { job, installationId }: WithDelayedRetryOnRateLimitArgs + { job, jobToken, installationId }: WithDelayedRetryOnRateLimitArgs ) => { try { const rateLimitResetAtInMs = @@ -32,11 +32,8 @@ export const withDelayedRetryOnRateLimit = async ( } ); - await addJob(job.queueName as SweetQueue, job.data, { - delay: rateLimitResetAtInMs + oneSecondInMs - Date.now(), - }); - - return; + await job.moveToDelayed(rateLimitResetAtInMs + oneSecondInMs, jobToken); + throw new DelayedError(); } await callback(); @@ -71,13 +68,15 @@ export const withDelayedRetryOnRateLimit = async ( getTime(rateLimitResetAt) ); - await addJob(job.queueName as SweetQueue, job.data, { - delay: getTime(rateLimitResetAt) + oneSecondInMs - Date.now(), - }); + await job.moveToDelayed( + getTime(rateLimitResetAt) + oneSecondInMs, + jobToken + ); + throw new DelayedError(); + } - return; - } else if ("headers" in error && error.headers["retry-after"]) { - // Secondary rate limit (Concurrency) + // Secondary rate limit (Concurrency) + if ("headers" in error && error.headers["retry-after"]) { const retryAfterSeconds = parseInt(error.headers["retry-after"]) + 5; // 5 seconds buffer const canRetryAt = addSeconds(new Date(), retryAfterSeconds); @@ -88,11 +87,8 @@ export const withDelayedRetryOnRateLimit = async ( setGitInstallationRateLimitEpochInMs(installationId, getTime(canRetryAt)); - await addJob(job.queueName as SweetQueue, job.data, { - delay: getTime(canRetryAt) + oneSecondInMs - Date.now(), - }); - - return; + await job.moveToDelayed(getTime(canRetryAt) + oneSecondInMs, jobToken); + throw new DelayedError(); } throw error; diff --git a/apps/api/src/app/github/workers/github-members-sync.worker.ts b/apps/api/src/app/github/workers/github-members-sync.worker.ts index 6b8f78a..7498e96 100644 --- a/apps/api/src/app/github/workers/github-members-sync.worker.ts +++ b/apps/api/src/app/github/workers/github-members-sync.worker.ts @@ -12,7 +12,8 @@ import { withDelayedRetryOnRateLimit } from "../services/github-rate-limit.servi export const githubMemberSyncWorker = createWorker( SweetQueue.GITHUB_MEMBERS_SYNC, async ( - job: Job + job: Job, + token?: string ) => { const installationId = job.data.installation?.id; @@ -28,6 +29,7 @@ export const githubMemberSyncWorker = createWorker( syncOrganizationMembers(installationId, job.data.organization.login), { job, + jobToken: token, installationId, } ); diff --git a/apps/api/src/app/github/workers/github-repositories-sync.worker.ts b/apps/api/src/app/github/workers/github-repositories-sync.worker.ts index f5e6ea4..2341fef 100644 --- a/apps/api/src/app/github/workers/github-repositories-sync.worker.ts +++ b/apps/api/src/app/github/workers/github-repositories-sync.worker.ts @@ -8,7 +8,10 @@ import { withDelayedRetryOnRateLimit } from "../services/github-rate-limit.servi export const githubRepositoriesSyncWorker = createWorker( SweetQueue.GITHUB_REPOSITORIES_SYNC, - async (job: Job) => { + async ( + job: Job, + token?: string + ) => { const installationId = job.data.installation?.id; if (!installationId) { @@ -26,6 +29,7 @@ export const githubRepositoriesSyncWorker = createWorker( ), { job, + jobToken: token, installationId, } ); diff --git a/apps/api/src/app/github/workers/github-repository-pull-requests.worker.ts b/apps/api/src/app/github/workers/github-repository-pull-requests.worker.ts index 083aa2f..90cf646 100644 --- a/apps/api/src/app/github/workers/github-repository-pull-requests.worker.ts +++ b/apps/api/src/app/github/workers/github-repository-pull-requests.worker.ts @@ -8,7 +8,10 @@ import { withDelayedRetryOnRateLimit } from "../services/github-rate-limit.servi export const githubRepositoryPullRequestsSyncWorker = createWorker( SweetQueue.GITHUB_SYNC_REPOSITORY_PULL_REQUESTS, - async (job: Job<{ gitInstallationId: number; repository: Repository }>) => { + async ( + job: Job<{ gitInstallationId: number; repository: Repository }>, + token?: string + ) => { const installationId = job.data.gitInstallationId; if (!installationId || !job.data.repository.name) { @@ -23,6 +26,7 @@ export const githubRepositoryPullRequestsSyncWorker = createWorker( ), { job, + jobToken: token, installationId, } ); diff --git a/apps/api/src/app/github/workers/github-sync-code-review.worker.ts b/apps/api/src/app/github/workers/github-sync-code-review.worker.ts index 429bce7..e504ac1 100644 --- a/apps/api/src/app/github/workers/github-sync-code-review.worker.ts +++ b/apps/api/src/app/github/workers/github-sync-code-review.worker.ts @@ -12,7 +12,8 @@ import { withDelayedRetryOnRateLimit } from "../services/github-rate-limit.servi export const syncCodeReviewWorker = createWorker( SweetQueue.GITHUB_SYNC_CODE_REVIEW, async ( - job: Job + job: Job, + token?: string ) => { const installationId = job.data.installation?.id; @@ -34,6 +35,7 @@ export const syncCodeReviewWorker = createWorker( () => syncCodeReviews(installationId, job.data.pull_request.node_id), { job, + jobToken: token, installationId, } ); diff --git a/apps/api/src/app/github/workers/github-sync-pull-request.worker.ts b/apps/api/src/app/github/workers/github-sync-pull-request.worker.ts index 8132882..9ceae0d 100644 --- a/apps/api/src/app/github/workers/github-sync-pull-request.worker.ts +++ b/apps/api/src/app/github/workers/github-sync-pull-request.worker.ts @@ -17,7 +17,8 @@ export const syncPullRequestWorker = createWorker( syncReviews?: boolean; initialSync?: boolean; } - > + >, + token?: string ) => { if (!job.data.installation?.id) { throw new InputValidationException( @@ -45,6 +46,7 @@ export const syncPullRequestWorker = createWorker( syncPullRequest(installationId, job.data.pull_request.node_id, options), { job, + jobToken: token, installationId, } );