Skip to content

Commit

Permalink
Support rate limits based on hosts
Browse files Browse the repository at this point in the history
  • Loading branch information
synzen committed Dec 7, 2024
1 parent a0f248a commit 99c23cc
Show file tree
Hide file tree
Showing 11 changed files with 162 additions and 5 deletions.
4 changes: 4 additions & 0 deletions docker-compose.dev.yml
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,8 @@ services:
- LOG_LEVEL=debug
- USER_FEEDS_START_TARGET=api
- NODE_ENV=local
- USER_FEEDS_FEED_REQUESTS_API_URL=http://feed-requests-api:5000/v1/feed-requests
- USER_FEEDS_FEED_REQUESTS_GRPC_URL=dns:///feed-requests-service:4999

user-feeds-postgres-migration:
extends:
Expand Down Expand Up @@ -210,6 +212,8 @@ services:
- BACKEND_API_USER_FEEDS_API_HOST=http://user-feeds-api:5000
- LOG_LEVEL=debug
- NODE_ENV=local
- BACKEND_API_USER_FEEDS_API_HOST=http://user-feeds-api:5000
- BACKEND_API_FEED_REQUESTS_API_HOST=http://feed-requests-api:5000
command: npm run start:local
networks:
- monitorss-default
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@ const GetUserFeedRequestsOutputSchema = object({
.shape({
requests: array(UserFeedRequestSchema).required(),
nextRetryTimestamp: number().nullable().default(null),
feedHostGlobalRateLimit: object({
intervalSec: number().required(),
requestLimit: number().required(),
}).nullable(),
})
.required(),
}).required();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import {
Alert,
Badge,
Box,
Button,
Expand Down Expand Up @@ -93,6 +94,14 @@ export const RequestHistory = () => {
description={error.message}
/>
)}
{data?.result.feedHostGlobalRateLimit && (
<Alert rounded="md">
To stay in compliance with rate limits, MonitoRSS is forced to globally limit the number
of requests made to this feed&apos;s host to have a maximum of{" "}
{data.result.feedHostGlobalRateLimit.requestLimit} request(s) per{" "}
{data.result.feedHostGlobalRateLimit.intervalSec} seconds.
</Alert>
)}
{hasNoData && (
<Text color="whiteAlpha.700">
No historical requests found. This is likely because the feed has not been polled yet -
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
const messages = {
BAD_REQUEST: "Bad Request",
UNAUTHORIZED: "Unauthorized",
FORBIDDEN: "Forbidden",
INTERNAL_SERVER_ERROR: "Internal Server Error",
BAD_REQUEST:
"An unexpected issue occurred. If the error persists, please contact [email protected].",
UNAUTHORIZED:
"Unauthorized. Refresh the page, or log out and log in again. If the error persists, contact [email protected].",
FORBIDDEN:
"Access forbidden. Refresh the page, or log out and log in again. If the error persists, contact [email protected].",
INTERNAL_SERVER_ERROR:
"An unexpected issue occurred. Try refreshing the page. If the error persists, please contact [email protected].",
} as const;

const getStatusCodeErrorMessage = (statusCode: number) => {
Expand All @@ -24,7 +28,7 @@ const getStatusCodeErrorMessage = (statusCode: number) => {
return messages.BAD_REQUEST;
}

return `Internal error occurred. You may try refreshing the page. If the error persists, please contact [email protected] (status code: ${statusCode})`;
return `An unexpected issue occurred (status ${statusCode}). You may try refreshing the page. If the error persists, please contact [email protected].`;
};

export default getStatusCodeErrorMessage;
32 changes: 32 additions & 0 deletions services/feed-requests/src/cache-storage/cache-storage.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,38 @@ export class CacheStorageService {
await this.redisClient.disconnect();
}

generateKey(key: string) {
return `feed-requests:${key}`;
}

async increment(
key: string,
opts?: {
expire?: {
seconds: number;
mode: 'NX';
};
},
): Promise<number> {
const useKey = this.generateKey(key);

const multi = this.redisClient.multi().incr(useKey);

if (opts?.expire) {
multi.expire(useKey, opts?.expire.seconds);
}

const [newVal] = await multi.exec();

return newVal as number;
}

async decrement(key: string): Promise<number> {
const useKey = this.generateKey(key);

return this.redisClient.decr(useKey);
}

async setFeedHtmlContent({ key, body }: { body: string; key: string }) {
try {
await this.redisClient.set(key, body, {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@ interface Request {
interface Result {
requests: Request[];
nextRetryTimestamp: number | null;
feedHostGlobalRateLimit: null | {
requestLimit: number;
intervalSec: number;
};
}

export interface GetFeedRequestsOutputDto {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import { FeedFetcherService } from './feed-fetcher.service';
import { RequestSource } from './constants/request-source.constants';
import PartitionedRequestsStoreService from '../partitioned-requests-store/partitioned-requests-store.service';
import { PartitionedRequestInsert } from '../partitioned-requests-store/types/partitioned-request.type';
import { HostRateLimiterService } from '../host-rate-limiter/host-rate-limiter.service';

interface BatchRequestMessage {
timestamp: number;
Expand All @@ -35,6 +36,7 @@ export class FeedFetcherListenerService {
private readonly orm: MikroORM, // For @UseRequestContext decorator
private readonly em: EntityManager,
private readonly partitionedRequestsStoreService: PartitionedRequestsStoreService,
private readonly hostRateLimiterService: HostRateLimiterService,
) {
this.maxFailAttempts = this.configService.get(
'FEED_REQUESTS_MAX_FAIL_ATTEMPTS',
Expand Down Expand Up @@ -90,6 +92,15 @@ export class FeedFetcherListenerService {
let request: PartitionedRequestInsert | undefined = undefined;

try {
const { isRateLimited } =
await this.hostRateLimiterService.incrementUrlCount(url);

if (isRateLimited) {
logger.debug(`Host ${url} is rate limited, skipping`);

return;
}

const result = await this.handleBrokerFetchRequest({
lookupKey,
url,
Expand Down
10 changes: 10 additions & 0 deletions services/feed-requests/src/feed-fetcher/feed-fetcher.controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import { plainToClass } from 'class-transformer';
import { validateSync } from 'class-validator';
import { Metadata } from '@grpc/grpc-js';
import { ConfigService } from '@nestjs/config';
import { HostRateLimiterService } from '../host-rate-limiter/host-rate-limiter.service';

@Controller({
version: '1',
Expand All @@ -36,6 +37,7 @@ export class FeedFetcherController {
private readonly feedFetcherService: FeedFetcherService,
private readonly orm: MikroORM,
private readonly configService: ConfigService,
private readonly hostRateLimiterService: HostRateLimiterService,
) {
this.API_KEY = this.configService.getOrThrow<string>(
'FEED_REQUESTS_API_KEY',
Expand Down Expand Up @@ -63,6 +65,8 @@ export class FeedFetcherController {
})
: null;

const globalRateLimit = this.hostRateLimiterService.getLimitForUrl(dto.url);

return {
result: {
requests: requests.map((r) => ({
Expand All @@ -77,6 +81,12 @@ export class FeedFetcherController {
})),
// unix timestamp in seconds
nextRetryTimestamp: nextRetryDate ? dayjs(nextRetryDate).unix() : null,
feedHostGlobalRateLimit: globalRateLimit
? {
intervalSec: globalRateLimit.data.intervalSec,
requestLimit: globalRateLimit.data.requestLimit,
}
: null,
},
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import { ObjectFileStorageModule } from '../object-file-storage/object-file-stor
import { CacheStorageModule } from '../cache-storage/cache-storage.module';
import { PartitionedRequestsStoreModule } from '../partitioned-requests-store/partitioned-requests-store.module';
import { FeatureFlaggerModule } from '../feature-flagger/feature-flagger.module';
import { HostRateLimiterModule } from '../host-rate-limiter/host-rate-limiter.module';

@Module({
controllers: [],
Expand All @@ -21,6 +22,7 @@ import { FeatureFlaggerModule } from '../feature-flagger/feature-flagger.module'
MikroOrmModule.forFeature([Request, Response]),
PartitionedRequestsStoreModule,
FeatureFlaggerModule.forRoot(),
HostRateLimiterModule,
],
})
export class FeedFetcherModule {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
import { Module } from '@nestjs/common';
import { CacheStorageModule } from '../cache-storage/cache-storage.module';
import { HostRateLimiterService } from './host-rate-limiter.service';

@Module({
providers: [HostRateLimiterService],
imports: [CacheStorageModule],
exports: [HostRateLimiterService],
})
export class HostRateLimiterModule {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
import { URL } from 'node:url';
import { Injectable } from '@nestjs/common';
import { CacheStorageService } from '../cache-storage/cache-storage.service';

interface RateLimitData {
requestLimit: number;
intervalSec: number;
}

const RATE_LIMITED_HOSTS = new Map<string, RateLimitData>([
[
'data.sec.gov',
{
requestLimit: 10,
intervalSec: 2,
},
],
]);

@Injectable()
export class HostRateLimiterService {
constructor(private readonly cacheStorageService: CacheStorageService) {}

async incrementUrlCount(url: string): Promise<{ isRateLimited: boolean }> {
const applicableLimit = this.getLimitForUrl(url);

if (!applicableLimit) {
return {
isRateLimited: false,
};
}

const {
host,
data: { intervalSec, requestLimit },
} = applicableLimit;

const cacheKey = this.getCacheKeyForHost(host);

const newVal = await this.cacheStorageService.increment(cacheKey, {
expire: {
seconds: intervalSec,
mode: 'NX',
},
});

return {
isRateLimited: newVal - 1 >= requestLimit,
};
}

getLimitForUrl(url: string): null | { host: string; data: RateLimitData } {
const host = new URL(url).host;

const found = RATE_LIMITED_HOSTS.get(host);

if (!found) {
return null;
}

return { host, data: found };
}

private getCacheKeyForHost(urlHost: string) {
return `host-rate-limiter:${urlHost}`;
}
}

0 comments on commit 99c23cc

Please sign in to comment.