
Producer - consumer file processing


My [previous post] got some questions about the front-end architecture and its potential improvements. So, this time, I am spending a bit more time within the JavaScript space with some modern front-end decisions that could be made here.

Intro

As usual, let's start with the problem. We have a bunch of files selected by the user for upload. All files have to be split and uploaded to the server in chunks. Some configurations to consider: chunk size, variable chunk size per file, number of files to upload simultaneously.

Initial approach

Starting with a simplified version of what we had originally, the implementation boils down to two main steps. First, we need to process a list of files. Second, every file is going to have a list of chunks. Both processing steps can be asynchronous: the former because files are independent and can be processed independently, the latter because chunks are independent as well, with the additional twist that calculating one chunk's starting position requires knowing the previous chunk's end position in the file body. Still, we cannot make all the pieces of the puzzle completely independent. Network resources are limited, and so are the server's capabilities. So how can we organize the upload by parallelizing certain pieces, making sure that resources are not overloaded yet are fully utilized? Well, when it comes to optimizations, there is no silver bullet and everything has to be measured under real conditions. But here is what we can do. We can imagine all the file upload parts as a 2D matrix where columns represent files and rows represent chunks:

    -------------
    f f f f f
    c c c c c
    c c c c c
    c c c c
    c c
    c
    -------------

Uploading chunks to the server means traversing the matrix and visiting every element exactly once. Not a difficult task, but it could be accomplished in different ways. For example, we could do it by columns, top to bottom. This is the equivalent of uploading all the files fully, one by one. Another example would be row-based traversal from left to right. That is for uploading all files simultaneously. And the more exotic one is pattern-based traversal (diagonal, spiral, etc.). These upload techniques better fit situations where some files, or some chunks of one file, are more important and have to go first. In any case, we are forming a queue (or a number of queues) and arranging elements within it according to some priority (a priority queue).
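To make the two basic traversals concrete, here is a minimal sketch that only computes the order in which chunks would be visited. The helper names columnOrder and rowOrder, and the idea of describing the matrix by an array of chunk counts per file, are assumptions made for this illustration and not part of the original implementation.

```javascript
// chunkCounts[i] is the number of chunks in file i (hypothetical input shape).

// Column by column: upload every file fully, one after another.
const columnOrder = (chunkCounts) => {
  const order = [];
  chunkCounts.forEach((count, file) => {
    for (let chunk = 0; chunk < count; chunk++) {
      order.push([file, chunk]);
    }
  });
  return order;
};

// Row by row: take chunk 0 of every file, then chunk 1 of every file, and so on.
const rowOrder = (chunkCounts) => {
  const order = [];
  const rows = Math.max(0, ...chunkCounts);
  for (let chunk = 0; chunk < rows; chunk++) {
    chunkCounts.forEach((count, file) => {
      // shorter files simply drop out of the later rows
      if (chunk < count) order.push([file, chunk]);
    });
  }
  return order;
};

// Three files with 3, 1 and 2 chunks; entries are [fileIndex, chunkIndex].
console.log(columnOrder([3, 1, 2])); // [0,0] [0,1] [0,2] [1,0] [2,0] [2,1]
console.log(rowOrder([3, 1, 2]));    // [0,0] [1,0] [2,0] [0,1] [2,1] [0,2]
```

Both functions visit every element exactly once; only the order differs, which is what decides whether files finish one by one or progress side by side.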

To get a better understanding of what we are talking about, let's implement a straightforward approach first: uploading files fully, one by one. For that one, we are going to need a facility for reading out a file chunk:

splitChunk

// chunkSize is a configuration value (in bytes) defined elsewhere
const splitChunk = (file, start) => {
  const end = file.size;

  // nothing left to read
  if (start >= end) {
    return {
      success: false,
      result: null
    };
  }

  const chunkStart = start;
  // never read past the end of the file
  const chunkEnd = Math.min(start + chunkSize, end);

  return {
    success: true,
    chunkEnd,
    result: file.slice(chunkStart, chunkEnd)
  };
};

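So we read the chunk in a RORO (receive an object, return an object) style, reporting through the success flag whether there was anything left to read and making sure we never cross the upper boundary of the file.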
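A quick way to see the boundaries splitChunk produces is to run it in a loop over a stand-in file. In the sketch below the chunkSize value, the fakeFile helper and the listChunks helper are assumptions made for illustration; a real File or Blob exposes the same size and slice members used here.

```javascript
// Assumed value for illustration; in the article chunkSize is a configuration setting.
const chunkSize = 4;

const splitChunk = (file, start) => {
  const end = file.size;
  if (start >= end) {
    return { success: false, result: null };
  }
  const chunkEnd = Math.min(start + chunkSize, end);
  return { success: true, chunkEnd, result: file.slice(start, chunkEnd) };
};

// Stand-in for a File/Blob backed by a string: only `size` and `slice` are needed.
const fakeFile = (text) => ({
  size: text.length,
  slice: (from, to) => text.slice(from, to),
});

// Collect all chunks by feeding every chunkEnd back in as the next start.
const listChunks = (file) => {
  const chunks = [];
  let start = 0;
  while (true) {
    const { success, chunkEnd, result } = splitChunk(file, start);
    if (!success) break;
    chunks.push(result);
    start = chunkEnd;
  }
  return chunks;
};

console.log(listChunks(fakeFile('abcdefghij'))); // ['abcd', 'efgh', 'ij']
```

The last chunk is shorter than chunkSize because Math.min clamps its end to the file size.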

readChunk

Next, we need a method that can process/upload these chunks one by one:

const readChunk = (fileEntry) => {
  const { file, progress } = fileEntry;
  const { success, chunkEnd } = splitChunk(file, progress.start);

  if (success) {
    progress.index++;
    progress.start = chunkEnd;

    // simulating HTTP call
    setTimeout(() => {
      readChunk(fileEntry);
    }, 100);
  } else {
    // no chunks left in this file, move on to the next one
    read();
  }
};

fileEntry is an object with two properties: file and progress. Progress is used to keep track of the next chunk start reading point as well as for generating chunk indexes. Since this is a pure front-end talk, we are simulating HTTP call delays by a timeout. If the reading process is exhausted, we are moving to the next file.

read

And finally, a little bit of a glue:

const read = () => {
  if (files.length <= 0) {
    return;
  }

  // note: pop() takes files from the end of the array
  const file = files.pop();
  const fileEntry = {
    file,
    progress: {
      start: 0,
      index: 0
    }
  };

  readChunk(fileEntry);
};

So we take a file out of a queue and send it for processing. It does not take much time to realize that the read method is producing units of work and readChunk is consuming them.

readChunk is actually a producer and a consumer in itself. It produces a file chunk and consumes it via a simulated HTTP call, forming its own queue for every file. This is how the two levels of queues, one for files and one for chunks, end up covering the whole matrix.
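The interplay between read and readChunk can be condensed into one function when the "HTTP call" is passed in as a callback. This is only a sketch under assumptions: uploadSequentially, send and onFinished are hypothetical names, the chunk size is arbitrary, and, unlike read above, files are taken with shift() so they are processed in selection order.

```javascript
// Assumed chunk size; counted in characters because the stand-in files are strings.
const chunkSize = 3;

// send(chunk, done) is a hypothetical transport: it must call done() once
// the chunk has been handled, which pulls the next chunk.
const uploadSequentially = (files, send, onFinished) => {
  const queue = [...files]; // copy, so the caller's array stays untouched

  // Outer queue: one file at a time.
  const nextFile = () => {
    const file = queue.shift();
    if (!file) {
      onFinished();
      return;
    }
    nextChunk(file, 0, 0);
  };

  // Inner queue: one chunk at a time within the current file.
  const nextChunk = (file, start, index) => {
    if (start >= file.size) {
      nextFile();
      return;
    }
    const end = Math.min(start + chunkSize, file.size);
    const chunk = { name: file.name, index, body: file.slice(start, end) };
    send(chunk, () => nextChunk(file, end, index + 1));
  };

  nextFile();
};

// Usage with string-backed stand-in files and a transport that answers at once.
const fakeFile = (name, text) => ({
  name,
  size: text.length,
  slice: (from, to) => text.slice(from, to),
});

const log = [];
uploadSequentially(
  [fakeFile('a.txt', 'abcde'), fakeFile('b.txt', 'xy')],
  (chunk, done) => {
    log.push(`${chunk.name}#${chunk.index}:${chunk.body}`);
    done();
  },
  () => log.push('finished')
);
console.log(log); // ['a.txt#0:abc', 'a.txt#1:de', 'b.txt#0:xy', 'finished']
```

Swapping the immediate done() for a timer or a real request keeps the same column-by-column order, because the next chunk is only requested from inside the callback.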

Producer - consumer

The producer-consumer problem is a very well-known one in computer science. It models two or more processes working against the same resource (or a group of resources). In its simplest form, the problem could be formulated as two processes working against the same queue. One of them is a producer that adds items to the queue, the other is a consumer that removes items from the queue. Without much synchronization boilerplate, the solution looks like this:

Producer

We have a producer that generates data and signals via a stop state that there is nothing to produce anymore:

const fileQueue = [];
let stop = false;

const produce = () => {
  if (files.length) {
    fileQueue.push(files.pop());
  }
};

const producer = () => {
  const interval = setInterval(() => {
    // nothing left to produce: raise the stop flag and shut down
    if (files.length <= 0) {
      stop = true;
      clearInterval(interval);
      return;
    }

    produce();
  }, 500);
};

Consumer

The consumer, on the other hand, removes data from the queue and checks if it needs to stop:

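const consume = () => {
  // pop() takes the most recently added item; use shift() for strict FIFO order
  const file = fileQueue.pop();

  if (file) {
    console.log(file.name);
  }
};

const consumer = () => {
  const interval = setInterval(() => {
    // stop only when the producer is done and the queue is drained
    if (stop && fileQueue.length <= 0) {
      clearInterval(interval);
      return;
    }

    consume();
  }, 500);
};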
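The termination rule (stop only when the producer has finished and the queue is empty) is easier to check without timers. The sketch below replaces both intervals with a single loop of ticks; simulate, consumeEvery and maxQueued are hypothetical names introduced for this illustration, and consumeEvery is assumed to be a positive integer.

```javascript
// Producer adds one item per tick; the consumer acts on every `consumeEvery`-th tick.
const simulate = (files, consumeEvery = 1) => {
  const source = [...files];
  const queue = [];
  const consumed = [];
  let stop = false;
  let maxQueued = 0;

  // Same exit condition as in the consumer: producer done AND queue drained.
  for (let tick = 1; !(stop && queue.length === 0); tick++) {
    // producer step
    if (source.length > 0) {
      queue.push(source.shift());
    } else {
      stop = true;
    }
    maxQueued = Math.max(maxQueued, queue.length);

    // consumer step (FIFO: take from the front of the queue)
    if (tick % consumeEvery === 0 && queue.length > 0) {
      consumed.push(queue.shift());
    }
  }

  return { consumed, maxQueued };
};

console.log(simulate(['a', 'b', 'c']));    // consumed: a, b, c; maxQueued: 1
console.log(simulate(['a', 'b', 'c'], 2)); // consumed: a, b, c; maxQueued: 2
```

With a slower consumer the queue grows, but nothing is lost: the loop keeps running after the stop flag is raised until the backlog is gone.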

Observables

So what happens when we put the two aforementioned solutions together? We get something similar to what was described in a [previous post]. This time, let's make it a bit more interesting.

Observer pattern

As it is stated here:

The observer pattern is a software design pattern in which an object, called the subject, maintains a list of its dependents, called observers, and notifies them automatically of any state changes, usually by calling one of their methods

Sounds familiar? It's because subjects could be seen as producers and observers as consumers. It's the notification part that is different because it is specific to the pattern. We are going to need something that implements this pattern, so we can recreate our initial file upload example using subjects and observers.
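Before reaching for a library, the pattern itself fits in a few lines. This is a bare-bones sketch, not how any particular library implements it; createSubject, subscribe and notify are names chosen for the illustration.

```javascript
// A subject keeps a list of observers and calls each of them on every notification.
const createSubject = () => {
  const observers = new Set();

  return {
    subscribe(observer) {
      observers.add(observer);
      // hand back a function that removes this observer again
      return () => observers.delete(observer);
    },
    notify(value) {
      observers.forEach((observer) => observer(value));
    },
  };
};

const received = [];
const uploads = createSubject();

const unsubscribeFirst = uploads.subscribe((name) => received.push(`first:${name}`));
uploads.subscribe((name) => received.push(`second:${name}`));

uploads.notify('a.txt'); // both observers are called
unsubscribeFirst();
uploads.notify('b.txt'); // only the second observer is left

console.log(received); // ['first:a.txt', 'second:a.txt', 'second:b.txt']
```

The subject plays the producer role and every observer is a consumer; the difference from the queue-based version is that items are pushed to consumers right away instead of waiting in a queue to be picked up.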

ReactiveX

ReactiveX is a library that has exactly what we need:

ReactiveX is a library for composing asynchronous and event-based programs by using observable sequences. It extends the observer pattern to support sequences of data and/or events and adds operators that allow you to compose sequences together declaratively while abstracting away concerns about things like low-level threading, synchronization, thread-safety, concurrent data structures, and non-blocking I/O.

Since we are in a JavaScript world, the implementation we need to use is RxJs.

There are two ways to work with RxJs. One could create observables from scratch, or use higher-level operators for predefined stream manipulations. Mostly, we are going to be using operators. So where do we start? From the very bottom.

splitChunk method stays as is. We still need a way to read a chunk out of the file body. Next, we need to create a stream of all the chunks from the same file (recall that we are going vertical in the matrix):

splitChunks

import { Observable } from 'rxjs';

const splitChunks = (file) =>
  new Observable(subscriber => {
    let start = 0;

    // emit chunks until splitChunk reports that the file is exhausted
    while (true) {
      const { success, chunkEnd } = splitChunk(file, start);

      if (success) {
        subscriber.next({ success, chunkEnd });
        start = chunkEnd;
      } else {
        subscriber.complete();
        break;
      }
    }
  });

We are not using any operators for this method, just a plain Observable that notifies subscribers about new chunk arrivals (subscriber.next) or completes when no chunks are left (subscriber.complete).
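To see the next/complete contract without pulling in the library, here is a sketch built on a tiny stand-in observable. createObservable is a hypothetical helper that only mimics the subscribe call and none of the real RxJS machinery (no unsubscription, no error handling), and the chunk loop is a condensed form of splitChunk with an assumed chunk size.

```javascript
// Assumed chunk size; counted in characters because the stand-in file is a string.
const chunkSize = 4;

// Stand-in for an observable: the producer function runs on every subscribe call.
const createObservable = (producer) => ({
  subscribe: (observer) => producer(observer),
});

const splitChunks = (file) =>
  createObservable((subscriber) => {
    for (let start = 0; start < file.size; start += chunkSize) {
      const chunkEnd = Math.min(start + chunkSize, file.size);
      subscriber.next({ chunkEnd, result: file.slice(start, chunkEnd) });
    }
    subscriber.complete();
  });

// String-backed stand-in for a File/Blob.
const file = { size: 10, slice: (from, to) => 'abcdefghij'.slice(from, to) };

const events = [];
const chunks = splitChunks(file); // nothing is read yet
events.push('subscribing');
chunks.subscribe({
  next: (chunk) => events.push(chunk.result),
  complete: () => events.push('complete'),
});

console.log(events); // ['subscribing', 'abcd', 'efgh', 'ij', 'complete']
```

The order of the events shows the laziness: creating the stream does no work, and the chunks only start flowing once someone subscribes.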

Now we need to combine pieces together to process chunks from files. This is where the idea of a queue of queues converges to the idea of stream of streams:

process

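import { defer, forkJoin, from } from 'rxjs';
import { concatAll, map } from 'rxjs/operators';

const process = (files) =>
  forkJoin(
    files.map(file =>
      splitChunks(file)
        .pipe(
          // wrap every chunk into a lazily started "upload"
          map((chunk) =>
            defer(() => from(new Promise(resolve =>
              // simulating HTTP call
              setTimeout(() => {
                resolve(chunk);
              }, 200)
            )))),
          // run the uploads of one file strictly one after another
          concatAll()
        )
    )
  );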

So we take an array of files and map it with the splitChunks method. This gives us a stream of chunks per file. Next, we map every chunk to a Promise with deferred execution (otherwise promises start to resolve sooner than subscribers need them). By making all of these manipulations we are diving deeper into the layers of streams. To get back to the surface, we have to "bubble" up observables and concatenate them together (put them onto one timeline). As a result, we get a stream of all the processed chunks from all given files. To make this stream less verbose and to get only one notification when all the chunks are processed, we are melting them all down to one.

Operators used in this example:

  • map - mapping chunks to promises
  • defer - deferred promise execution
  • concatAll - concatenating deeper nested observables
  • forkJoin - melting the stream down to one notification
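The reason for defer can be shown with plain promises, outside of RxJS. The sketch below is an analogy under assumptions: upload is a hypothetical stand-in for the HTTP call, a function returning a promise plays the role of defer, and runInOrder plays the role of concatAll by starting a task only after the previous one has resolved.

```javascript
// Hypothetical stand-in for an HTTP call: records the moment the "request" starts.
const upload = (log, name) =>
  new Promise((resolve) => {
    log.push(`start ${name}`); // runs immediately, inside the Promise constructor
    resolve(name);
  });

const names = ['chunk-0', 'chunk-1', 'chunk-2'];

// Eager: mapping straight to promises starts all three "requests" at once.
const eagerLog = [];
names.map((name) => upload(eagerLog, name));
console.log(eagerLog.length); // 3

// Deferred: mapping to functions starts nothing until a function is called.
const lazyLog = [];
const tasks = names.map((name) => () => upload(lazyLog, name));
const startedBeforeRun = lazyLog.length;
console.log(startedBeforeRun); // 0

// Sequential runner: the next task is started only after the previous one resolved.
const runInOrder = async (deferredTasks) => {
  const results = [];
  for (const task of deferredTasks) {
    results.push(await task());
  }
  return results;
};

runInOrder(tasks).then((results) => console.log(results)); // the three names, in order
```

Without the deferral, concatenation could only order the results, not the requests themselves, since every request would already be in flight.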

This could have been the end of the story, but let's introduce one more twist. Uploading all the files in one go might not be feasible. It's a good practice to split the files themselves into groups and upload them as such (group by group):

group

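import { from } from 'rxjs';
import { concatAll, map, reduce } from 'rxjs/operators';

const group = (files) =>
  from(files)
    .pipe(
      reduce((acc, cur, index) => {
        if (index % 2 === 0) {
          // even index: start a new pair
          const item = [cur];
          acc.result.push(item);
          acc.last = item;
        } else {
          // odd index: complete the current pair
          acc.last.push(cur);
        }

        return acc;
      }, { result: [], last: null }),
      map(x => from(x.result)),
      concatAll()
    );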

Essentially, group takes an array of files and converts it into an observable stream where every data point is a pair of sequential elements of the initial array. For example, an array of integers [1, 2, 3, 4, 5] gets converted to a stream of [[1, 2], [3, 4], [5]]. There are only two new operators here:

  • from - creates an observable from a given array
  • reduce - reduces a list of files into a list of pairs of files
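The grouping logic on its own, without streams, can be sketched as a plain array helper. toGroups and its size parameter are hypothetical; the pair size of 2 matches the index % 2 check in group.

```javascript
// Split an array into consecutive groups of at most `size` elements.
const toGroups = (items, size = 2) => {
  const groups = [];
  for (let i = 0; i < items.length; i += size) {
    groups.push(items.slice(i, i + size));
  }
  return groups;
};

console.log(toGroups([1, 2, 3, 4, 5]));    // [[1, 2], [3, 4], [5]]
console.log(toGroups([1, 2, 3, 4, 5], 3)); // [[1, 2, 3], [4, 5]]
```

Making the size a parameter turns the number of files uploaded simultaneously into a configuration value. Within RxJs, the bufferCount operator should give the same grouping directly on a stream, which may be worth considering instead of the hand-written reduce.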

And here is an entry point that combines all the parts together:

run

group(files)
  .pipe(
    map(process),
    concatAll()
  )
  .subscribe(EMPTY);

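EMPTY here stands for a subscriber that does not do anything. We just need it to get the process started, since nothing runs until someone subscribes. Note that the EMPTY constant exported by RxJs itself is an observable that completes immediately rather than an observer, so if no dedicated no-op observer is at hand, calling subscribe() without arguments starts the stream just as well.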
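The overall schedule (groups one after another, files inside a group side by side) can also be sketched with plain promises. This is an analogy rather than the RxJs pipeline itself: uploadFile is a hypothetical stand-in for uploading a whole file, Promise.all plays the role of forkJoin, and the for loop with await plays the role of concatAll.

```javascript
const log = [];

// Hypothetical upload of a whole file; it finishes on a later timer tick.
const uploadFile = (name) =>
  new Promise((resolve) => {
    log.push(`start ${name}`);
    setTimeout(() => {
      log.push(`done ${name}`);
      resolve(name);
    }, 0);
  });

const runGroups = async (groups) => {
  for (const group of groups) {
    // all files of the group start together; the next group waits for all of them
    await Promise.all(group.map(uploadFile));
  }
};

runGroups([['a', 'b'], ['c', 'd'], ['e']]).then(() => console.log(log.join(', ')));
// start a, start b, done a, done b, start c, start d, done c, done d, start e, done e
```

No file of the second group starts before both files of the first group are done, which is the property that keeps the network and the server from being flooded.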

Source code

Reference implementation

References

Producer and consumer

Observer pattern

ReactiveX introduction

Observable introduction

Understanding Marble Diagrams for Reactive Streams

RxJs operators