Noah's ARQ
Published on by Jonathan Maiorana
Trying my hand at making an Async Request Queue and using it to power an interactive demo.
How it Started
Some time ago ThePrimeagen mentioned that creating an Async Request Queue was one of their standard interview questions. I've heard those words before and even think I know what they mean. After a quick search to confirm what an Async Request Queue is, I wanted to try my hand at building one. I thought it should be a pretty straightforward exercise and if I got it working I could even build an interactive demo for people to play with. Lets get to it.
I Just Want to See the Demo
If you aren't interested in the details, you can check out the interactive demo here: Async Request Queue demo.
My First Attempt
I started with outlining how the ARQ should work. Here's what I came up with:
- Class that can be instantiated
- Constructor optionally takes an initial array of Promises
- Public method to start processing the current queue
- Public method to enqueue new items, even while the queue is running
- Public method to set the max concurrent items
- The ARQ will only let a specified number of items run concurrently
Looks like a good start so I got to coding up the first version.
The following examples are approximations of my attempts. I lost a lot of the meandering, intermediate versions since I was just noodling around and not committing every small change.
Here's what I came up with to start:
// Version 1
export default class AsyncRequestQueue {
#queue = [];
#executionQueue = [];
#maxSimultaneousItems = 3;
#currentItemsRunning = 0;
constructor(initialItems) {
this.#queue = [...initialItems];
}
#runItem(item) {
return new Promise( async (resolve, reject) => {
let retVal;
try {
retVal = await item();
this.#currentItemsRunning--;
await this.process();
resolve(retVal);
} catch(error) {
this.#currentItemsRunning--;
await this.process();
reject(error);
}
});
}
process() {
return new Promise( (resolve, reject) => {
resolve( new Promise( async (resolve, reject) => {
if(
this.#queue.length > 0
&& this.#currentItemsRunning < this.#maxSimultaneousItems
) {
let nextItem = this.#queue.shift();
if(nextItem) {
this.#currentItemsRunning++;
this.#executionQueue.push(this.#runItem(nextItem));
await this.process();
}
}
if(this.#queue.length === 0 ) {
resolve(this.#executionQueue);
}
}));
});
}
startProcessing() {
return this.process();
}
enqueue(newItem) {
this.#queue.push(newItem);
}
setMaxSimultaneousItems(newMax = 3) {
this.#maxSimultaneousItems = newMax;
}
}
The initial shape of the class is there, and at first glance this appears to work. When you call startProcessing() the initial queue of items starts running, with no more than #maxSimultaneousItems running at once.
However, there are several issues with this implementation. In no specific order they are:
- Queue items that reject are not handled in any way
- The top-level Promise returned by startProcessing() only settles in certain conditions
- Items added while the queue is actively processing are not always run immediately (even if there is capacity available)
There are likely other issues here but those are the main ones that I found during testing and are more than reason enough to give it another shot.
Take Two
I was having trouble wrapping my head around how to ensure the top-level (parent most) call would settle once the last child did. The implementation in the first version sets up a resursive stack of calls to #process() which each rely on #runItem() to make additional #process() calls to keep the queue running.
The way these recursive calls are setup is to blame for issue #3 above and partly to blame for issue #2. Once the #runItem() calls recurse and make calls to #process(), they become detached from their parent context and thus aren't blocking it from returning. So the parent most Promise is free to settle before the queue exhausts all items or even to never settle.
At this point I realized I wanted this to work like Promise.allSettled(), except it should only run so many items concurrently and can be modified while running. Well then why not just jam an allSettled() in there somewhere? That should fix it right? Right? Well no. I'm kind of floundering at this point, but here is what I came up with for the second attempt (complete with a random Promise.race() because reasons...):
// Version 2
export default class AsyncRequestQueue {
#queue = [];
#executionQueue = [];
#maxSimultaneousItems = 3;
#currentItemsRunning = 0;
constructor(initialItems) {
this.#queue = [...initialItems];
}
#runItem(item) {
return new Promise( async (resolve, reject) => {
let retVal;
try {
retVal = await item();
this.#currentItemsRunning--;
await this.#process();
resolve(retVal);
} catch(error) {
this.#currentItemsRunning--;
await this.#process();
reject(error);
}
});
}
#process() {
if(this.#queue.length === 0 ) {
return;
}
if(this.#currentItemsRunning < this.#maxSimultaneousItems) {
this.#executionQueue.push(this.#runItem(this.#queue.shift()));
this.#currentItemsRunning++;
this.#process();
}
return Promise.race(this.#executionQueue);
}
async startProcessing() {
await this.#process();
return Promise.allSettled(this.#executionQueue);
}
enqueue(newItem) {
this.#queue.push(newItem);
}
setMaxSimultaneousItems(newMax = 3) {
this.#maxSimultaneousItems = newMax;
}
}
This version is much like the previous except it has some random Promise junk sprinkled on top. I also moved the terminal conditional to the top of #process() because clearly that's the main problem here. It goes without saying that this version doesn't really work either. Sometimes it almost seems to work correctly. Other times, it never settles or settles before all the queue items are complete just like the previous version. Although unlike the previous version this one introduces a new unique bug: under certain conditions it will allow more than #maxSimultaneousItems to run. Nice. Nailed it.
This version also relies on mutating the array passed to Promise.allSettled() which is probably not a good idea. I figured it was time to throw out this hot pile of garbage and try again.
Final Version
I bashed my head against the problem some more and eventually came up with something that achieves all of the original goals, has none of the bugs encountered so far, and hits some new requirements not originally defined.
Here it is in all its glory:
// Version 3
export default class AsyncRequestQueue {
#queue = [];
#executionQueue = [];
#maxSimultaneousItems = 3;
#currentItemsRunning = 0;
#processCompleteHandler = () => {};
#processPromise = null;
#isProcessing = false;
constructor(initialItems) {
if(
initialItems
&& typeof initialItems[Symbol.iterator] === 'function'
) {
this.#queue = [...initialItems];
}
}
#runItem(item) {
return new Promise( async (resolve, reject) => {
let retVal;
try {
retVal = await item();
this.#currentItemsRunning--;
await this.#process();
resolve(retVal);
} catch(error) {
this.#currentItemsRunning--;
await this.#process();
reject(error);
}
});
}
#process() {
if(
this.#queue.length > 0
&& this.#currentItemsRunning < this.#maxSimultaneousItems
) {
this.#runItem(
this.#queue.shift()
).then(
(result) => {
this.#executionQueue.push(
{
'status': 'fulfilled',
'value': result,
}
);
}).catch(
(error) => {
this.#executionQueue.push(
{
'status': 'rejected',
'reason': error,
}
);
}).finally(
() => {
if(
this.#queue.length === 0
&& this.#currentItemsRunning == 0
) {
this.#processCompleteHandler();
return;
}
}
);
this.#currentItemsRunning++;
this.#process();
}
}
startProcessing() {
if(this.#isProcessing === true) {
//Allow calling multiple times while processing?
return this.#processPromise;
// Or reject?
//return Promise.reject('Already processing');
}
if(
this.#isProcessing === false
&& this.#queue.length === 0
) {
this.#processCompleteHandler();
return this.#processPromise ?? Promise.resolve([]);
}
this.#isProcessing = true;
this.#processPromise = new Promise((resolve, reject) => {
this.#processCompleteHandler = () => {
this.#isProcessing = false;
resolve(this.#executionQueue);
};
this.#process();
});
return this.#processPromise;
}
enqueue(newItem) {
this.#queue.push(newItem);
if(this.#isProcessing) {
this.#process();
}
}
setMaxSimultaneousItems(newMax = 3) {
this.#maxSimultaneousItems = newMax;
if(this.#isProcessing) {
this.#process();
}
}
}
This version has the same shape as the previous version but works a bit differently. Instead of relying on Promise.allSettled() this version tries to recreate its functionality.
startProcessing() creates a Promise and within it a function to resolve the Promise. This resolving function is assigned to a class variable, #processCompleteHandler. The top-level Promise itself is also assigned to a class variable, #processPromise. In this way, whenever a call to #process() hits the terminal conditional (the queue length is zero), it can call #processCompleteHandler to resolve the top-level promise. This means a call to startProcessing() returns a Promise that only resolves once the last queue item completes. Exactly what we want.
Since we also capture the top-level promise (in #processPromise), if we call startProcessing() while the queue is already running, we can just return the same Promise again. So multiple calls to startProcessing() are fine and they will all resolve with the same Promise. This is looking promising.
Because it is safe to call #process() whenever we want, we can make the enqueue() and setMaxSimultaneousItems() methods call #process(). Now whenever we add new items they will start processing right away (up to the current limit). In the same way, whenever we change the #maxSimultaneousItems limit it will affect the queue immediately.
Additionally, in this version the top-level Promise resolves with an object like the one given by Promise.allSettled(). This version also handles items rejecting and returns them as 'rejected' instead of 'fulfilled'.
As far as I have seen this final version does not suffer from the issues of the earlier attempts and does not appear to introduce any new ones. Finally, an implementation of an Async Request Queue that works.
Interactive Demo
To show off how it works I made a visual, interactive demo of the final Async Request Queue implementation.