Skip to content

Commit

Permalink
executes asap
Browse files Browse the repository at this point in the history
  • Loading branch information
shaunpersad committed Jan 9, 2022
1 parent 307f729 commit c9b2a94
Show file tree
Hide file tree
Showing 4 changed files with 65 additions and 88 deletions.
10 changes: 5 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -111,11 +111,11 @@ const throttledQueue = require('throttled-queue');
const throttle = throttledQueue(10, 1000, true); // at most make 10 requests every second, but evenly spaced.

const usernames = ['shaunpersad', 'forward-motion'];
const profiles = await Promise.all(usernames.map((username) => {
return throttle(() => {
const profiles = await Promise.all(
usernames.map((username) => throttle(() => {
return fetch(`https://api.github.com/search/users?q=${username}`);
});
}));
}))
);

const justMe = await throttle(() => fetch('https://api.github.com/search/users?q=shaunpersad'));
```
Expand All @@ -124,7 +124,7 @@ const justMe = await throttle(() => fetch('https://api.github.com/search/users?q
The package is written in Typescript and includes types by default. The `throttle` function is a generic,
and in most cases will automatically infer the right type for the result of the promise from the input.

However, you may also specify the return type when needed:
However, you may also specify the return type of the promise when needed:
```typescript
import throttledQueue from 'throttled-queue';
const throttle = throttledQueue<number>(1, 1000);
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "throttled-queue",
"version": "2.0.3",
"version": "2.1.0",
"description": "Throttles arbitrary code to execute a maximum number of times per interval. Best for making throttled API requests.",
"main": "dist/throttledQueue.js",
"directories": {
Expand Down
35 changes: 24 additions & 11 deletions src/throttledQueue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,28 +11,31 @@ function throttledQueue(
maxRequestsPerInterval = 1;
}
const queue: Array<() => Promise<void>> = [];
let lastCalled = 0;
let timeout: NodeJS.Timeout | undefined = undefined;
let lastIntervalStart = 0;
let numRequestsPerInterval = 0;
let timeout: NodeJS.Timeout | undefined;
/**
* Gets called at a set interval to remove items from the queue.
* This is a self-adjusting timer, since the browser's setTimeout is highly inaccurate.
*/
const dequeue = () => {
const threshold = lastCalled + interval;
const intervalEnd = lastIntervalStart + interval;
const now = Date.now();
/**
* Adjust the timer if it was called too early.
*/
if (now < threshold) {
if (now < intervalEnd) {
// eslint-disable-next-line @typescript-eslint/no-unsafe-argument
timeout && clearTimeout(timeout);
timeout = setTimeout(dequeue, threshold - now);
timeout !== undefined && clearTimeout(timeout);
timeout = setTimeout(dequeue, intervalEnd - now);
return;
}
lastIntervalStart = now;
numRequestsPerInterval = 0;
for (const callback of queue.splice(0, maxRequestsPerInterval)) {
callback.call({});
numRequestsPerInterval++;
void callback();
}
lastCalled = Date.now();
if (queue.length) {
timeout = setTimeout(dequeue, interval);
} else {
Expand All @@ -42,9 +45,19 @@ function throttledQueue(

return <Return = unknown>(fn: () => Promise<Return> | Return): Promise<Return> => new Promise<Return>(
(resolve, reject) => {
queue.push(() => Promise.resolve().then(fn).then(resolve).catch(reject));
if (!timeout) {
timeout = setTimeout(dequeue, interval);
const callback = () => Promise.resolve().then(fn).then(resolve).catch(reject);
const now = Date.now();
if (timeout === undefined && (now - lastIntervalStart) > interval) {
lastIntervalStart = now;
numRequestsPerInterval = 0;
}
if (numRequestsPerInterval++ < maxRequestsPerInterval) {
void callback();
} else {
queue.push(callback);
if (timeout === undefined) {
timeout = setTimeout(dequeue, lastIntervalStart + interval - now);
}
}
},
);
Expand Down
106 changes: 35 additions & 71 deletions test/throttledQueue.test.ts
Original file line number Diff line number Diff line change
@@ -1,145 +1,109 @@
import throttledQueue from '../src/throttledQueue';

function calculateRPMS(numRequests: number, timeStarted: number) {

return numRequests / (Date.now() - timeStarted);

}

describe('throttled-queue', function () {

it('should queue all fns', function (done) {

const requestsPerInterval = 1;
const interval = 200;
const throttle = throttledQueue(requestsPerInterval, interval);
let numRequests = 0;
const requestLimit = 100;
for (let x = 0; x < requestLimit; x++) {
void throttle(() => {
console.log('Throttling...');
numRequests++;
});
}
void throttle(() => {
if (numRequests !== requestLimit) {
throw new Error('Not all callbacks queued.');
}
done();
});
});

it('should queue the fn within the interval', function (done) {

it('should queue all fns', function () {
const requestsPerInterval = 1;
const interval = 200;
const throttle = throttledQueue(requestsPerInterval, interval);
let lastExecuted = Date.now();

let numRequests = 0;
const requestLimit = 100;

for (let x = 0; x < requestLimit; x++) {
void throttle(() => {
console.log('Throttling...');
const now = Date.now();
const timeElapsed = now - lastExecuted;
if (timeElapsed < interval) {
throw new Error('Did not honor interval.');
}
lastExecuted = now;
numRequests++;
});
}
void throttle(() => {
return throttle(() => {
if (numRequests !== requestLimit) {
throw new Error('Not all callbacks queued.');
}
done();
});
});

it('should queue the fn and honor the interval', function (done) {
it('should queue the fn and honor the interval', function () {

const requestsPerInterval = 1;
const interval = 500;
const throttle = throttledQueue(requestsPerInterval, interval);
const timeStarted = Date.now();
const maxRpms = requestsPerInterval / interval;

let numRequests = 0;
const requestLimit = 100;
let lastIntervalStart = process.hrtime.bigint();
let numRequests = 0;
let numRequestsPerInterval = 0;

for (let x = 0; x < requestLimit; x++) {
void throttle(() => {
const rpms = calculateRPMS(++numRequests, timeStarted);
console.log(rpms, maxRpms);
if (rpms > maxRpms) {
if ((process.hrtime.bigint() - lastIntervalStart) > (interval * 1000000)) {
lastIntervalStart = process.hrtime.bigint();
numRequestsPerInterval = 0;
}
if (++numRequestsPerInterval > requestsPerInterval) {
throw new Error('Did not honor interval.');
}
numRequests++;
});
}
void throttle(() => {
return throttle(() => {
if (numRequests !== requestLimit) {
throw new Error('Not all callbacks queued.');
}
done();
});
});

it('should queue the fn and honor the interval with multiple requests per interval', function (done) {
it('should queue the fn and honor the interval with multiple requests per interval', function () {

const requestsPerInterval = 3;
const interval = 1000;
const throttle = throttledQueue(requestsPerInterval, interval);
const timeStarted = Date.now();
const maxRpms = requestsPerInterval / interval;

let numRequests = 0;
const requestLimit = 100;
let lastIntervalStart = process.hrtime.bigint();
let numRequests = 0;
let numRequestsPerInterval = 0;

for (let x = 0; x < requestLimit; x++) {
void throttle(() => {
const rpms = calculateRPMS(++numRequests, timeStarted);
console.log(rpms, maxRpms);
if (rpms > maxRpms) {
if ((process.hrtime.bigint() - lastIntervalStart) > (interval * 1000000)) {
lastIntervalStart = process.hrtime.bigint();
numRequestsPerInterval = 0;
}
if (++numRequestsPerInterval > requestsPerInterval) {
throw new Error('Did not honor interval.');
}
numRequests++;
});
}
void throttle(() => {
return throttle(() => {
if (numRequests !== requestLimit) {
throw new Error('Not all callbacks queued.');
}
done();
});
});

it('should queue the fn and honor the interval with multiple evenly spaced requests per interval', function (done) {
it('should queue the fn and honor the interval with multiple evenly spaced requests per interval', function () {

const requestsPerInterval = 3;
const interval = 1000;
const throttle = throttledQueue(requestsPerInterval, interval, true);
const timeStarted = Date.now();
const maxRpms = requestsPerInterval / interval;

let numRequests = 0;
const requestLimit = 100;
let lastIntervalStart = process.hrtime.bigint();
let numRequests = 0;
let numRequestsPerInterval = 0;

for (let x = 0; x < requestLimit; x++) {
void throttle(() => {
const rpms = calculateRPMS(++numRequests, timeStarted);
console.log(rpms, maxRpms);
if (rpms > maxRpms) {
if ((process.hrtime.bigint() - lastIntervalStart) > (interval * 1000000)) {
lastIntervalStart = process.hrtime.bigint();
numRequestsPerInterval = 0;
}
if (++numRequestsPerInterval > requestsPerInterval) {
throw new Error('Did not honor interval.');
}
numRequests++;
});
}
void throttle(() => {
return throttle(() => {
if (numRequests !== requestLimit) {
throw new Error('Not all callbacks queued.');
}
done();
});
});

Expand Down

0 comments on commit c9b2a94

Please sign in to comment.