Skip to content

Commit

Permalink
fix(api): rate limit moveToDelayed
Browse files Browse the repository at this point in the history
  • Loading branch information
waltergalvao committed Jul 9, 2024
1 parent fe17756 commit c4331b6
Show file tree
Hide file tree
Showing 6 changed files with 34 additions and 24 deletions.
34 changes: 15 additions & 19 deletions apps/api/src/app/github/services/github-rate-limit.service.ts
Original file line number Diff line number Diff line change
@@ -1,22 +1,22 @@
import { Job } from "bullmq";
import { DelayedError, Job } from "bullmq";
import { fromUnixTime } from "date-fns/fromUnixTime";
import { logger } from "../../../lib/logger";
import { redisConnection } from "../../../bull-mq/redis-connection";
import { BusinessRuleException } from "../../errors/exceptions/business-rule.exception";
import { isPast } from "date-fns/isPast";
import { getTime } from "date-fns/getTime";
import { SweetQueue, addJob } from "../../../bull-mq/queues";
import { addSeconds } from "date-fns";

const oneSecondInMs = 1000;

interface WithDelayedRetryOnRateLimitArgs {
job: Job;
installationId: number;
jobToken?: string;
}
export const withDelayedRetryOnRateLimit = async (
callback: () => Promise<void>,
{ job, installationId }: WithDelayedRetryOnRateLimitArgs
{ job, jobToken, installationId }: WithDelayedRetryOnRateLimitArgs
) => {
try {
const rateLimitResetAtInMs =
Expand All @@ -32,11 +32,8 @@ export const withDelayedRetryOnRateLimit = async (
}
);

await addJob(job.queueName as SweetQueue, job.data, {
delay: rateLimitResetAtInMs + oneSecondInMs - Date.now(),
});

return;
await job.moveToDelayed(rateLimitResetAtInMs + oneSecondInMs, jobToken);
throw new DelayedError();
}

await callback();
Expand Down Expand Up @@ -71,13 +68,15 @@ export const withDelayedRetryOnRateLimit = async (
getTime(rateLimitResetAt)
);

await addJob(job.queueName as SweetQueue, job.data, {
delay: getTime(rateLimitResetAt) + oneSecondInMs - Date.now(),
});
await job.moveToDelayed(
getTime(rateLimitResetAt) + oneSecondInMs,
jobToken
);
throw new DelayedError();
}

return;
} else if ("headers" in error && error.headers["retry-after"]) {
// Secondary rate limit (Concurrency)
// Secondary rate limit (Concurrency)
if ("headers" in error && error.headers["retry-after"]) {
const retryAfterSeconds = parseInt(error.headers["retry-after"]) + 5; // 5 seconds buffer
const canRetryAt = addSeconds(new Date(), retryAfterSeconds);

Expand All @@ -88,11 +87,8 @@ export const withDelayedRetryOnRateLimit = async (

setGitInstallationRateLimitEpochInMs(installationId, getTime(canRetryAt));

await addJob(job.queueName as SweetQueue, job.data, {
delay: getTime(canRetryAt) + oneSecondInMs - Date.now(),
});

return;
await job.moveToDelayed(getTime(canRetryAt) + oneSecondInMs, jobToken);
throw new DelayedError();
}

throw error;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ import { withDelayedRetryOnRateLimit } from "../services/github-rate-limit.servi
export const githubMemberSyncWorker = createWorker(
SweetQueue.GITHUB_MEMBERS_SYNC,
async (
job: Job<OrganizationMemberAddedEvent | OrganizationMemberRemovedEvent>
job: Job<OrganizationMemberAddedEvent | OrganizationMemberRemovedEvent>,
token?: string
) => {
const installationId = job.data.installation?.id;

Expand All @@ -28,6 +29,7 @@ export const githubMemberSyncWorker = createWorker(
syncOrganizationMembers(installationId, job.data.organization.login),
{
job,
jobToken: token,
installationId,
}
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,10 @@ import { withDelayedRetryOnRateLimit } from "../services/github-rate-limit.servi

export const githubRepositoriesSyncWorker = createWorker(
SweetQueue.GITHUB_REPOSITORIES_SYNC,
async (job: Job<RepositoryEvent & { syncPullRequests?: boolean }>) => {
async (
job: Job<RepositoryEvent & { syncPullRequests?: boolean }>,
token?: string
) => {
const installationId = job.data.installation?.id;

if (!installationId) {
Expand All @@ -26,6 +29,7 @@ export const githubRepositoriesSyncWorker = createWorker(
),
{
job,
jobToken: token,
installationId,
}
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,10 @@ import { withDelayedRetryOnRateLimit } from "../services/github-rate-limit.servi

export const githubRepositoryPullRequestsSyncWorker = createWorker(
SweetQueue.GITHUB_SYNC_REPOSITORY_PULL_REQUESTS,
async (job: Job<{ gitInstallationId: number; repository: Repository }>) => {
async (
job: Job<{ gitInstallationId: number; repository: Repository }>,
token?: string
) => {
const installationId = job.data.gitInstallationId;

if (!installationId || !job.data.repository.name) {
Expand All @@ -23,6 +26,7 @@ export const githubRepositoryPullRequestsSyncWorker = createWorker(
),
{
job,
jobToken: token,
installationId,
}
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ import { withDelayedRetryOnRateLimit } from "../services/github-rate-limit.servi
export const syncCodeReviewWorker = createWorker(
SweetQueue.GITHUB_SYNC_CODE_REVIEW,
async (
job: Job<PullRequestReviewSubmittedEvent | PullRequestReviewDismissedEvent>
job: Job<PullRequestReviewSubmittedEvent | PullRequestReviewDismissedEvent>,
token?: string
) => {
const installationId = job.data.installation?.id;

Expand All @@ -34,6 +35,7 @@ export const syncCodeReviewWorker = createWorker(
() => syncCodeReviews(installationId, job.data.pull_request.node_id),
{
job,
jobToken: token,
installationId,
}
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ export const syncPullRequestWorker = createWorker(
syncReviews?: boolean;
initialSync?: boolean;
}
>
>,
token?: string
) => {
if (!job.data.installation?.id) {
throw new InputValidationException(
Expand Down Expand Up @@ -45,6 +46,7 @@ export const syncPullRequestWorker = createWorker(
syncPullRequest(installationId, job.data.pull_request.node_id, options),
{
job,
jobToken: token,
installationId,
}
);
Expand Down

0 comments on commit c4331b6

Please sign in to comment.