Orchestrating Callbacks


This is the second article in a new series about Node.js. We began by covering the fundamental pattern for controlling flow in Node.js: the callback. In this article we'll discuss how we can use this basic pattern to coordinate asynchronous operations. I hope you enjoy it!

You can find some of the code samples in this article in this GitHub repo.

- Pedro Teixeira, CTO, YLD!

Orchestrating callbacks

It's not only the Node core API that uses the callback pattern: a multitude of third-party libraries on npm use this pattern too. This allows you to use a flow-control library like async to compose them in almost any way you would want.

To use async you must install it in the root directory of your code using:

$ npm install async

Series

The simplest case of I/O orchestration is the case where you want to chain a series of I/O calls together, one after another, and interrupt them if an error occurs.

This example runs two operations, each of them removing one file:

async_series_1.js:

const async = require('async');
const fs = require('fs');

var work = [removeFile1, removeFile2];

function removeFile1(cb) {
  fs.unlink('./file1.txt', cb);
}

function removeFile2(cb) {
  fs.unlink('./file2.txt', cb);
}

async.series(work, done);

function done(err) {
  if (err) throw err;
  console.log('done');
}

The async.series function takes two arguments. The first is an array of functions to be invoked in sequence; these functions (our removeFile1 and removeFile2) must all have the same signature: a callback function as the first and sole argument. The second argument of async.series is a final callback that gets called when all of the functions have terminated, or when one of them produces an error.

If you place two files named file1.txt and file2.txt in the current directory, you will be able to run this without an error:

$ node async_series_1.js

If you run this again you should get an error on the done callback, because file1.txt was removed:

if (err) throw err;
         ^
Error: ENOENT, unlink './file1.txt'

Here you can see that the series flow breaks the first time an error occurs; in this case the final callback function (done) gets called with an error in the first argument.
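To make that short-circuiting concrete, here is a minimal sketch (the middle file name is hypothetical) of a three-step series in which the second step fails: the third function never runs, and the final callback is called exactly once, with the error.

const async = require('async');
const fs = require('fs');

async.series([
  function(cb) { fs.unlink('./file1.txt', cb); },   // runs first
  function(cb) { fs.unlink('./missing.txt', cb); }, // fails with ENOENT
  function(cb) { fs.unlink('./file2.txt', cb); }    // never runs
], function done(err) {
  // called exactly once, with the error from the failing step
  if (err) console.error('series stopped:', err.message);
});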

Parallel

If the I/O you're doing is independent, you can reduce the total time a set of operations take by doing them in parallel.

const async = require('async');
const fs = require('fs');

var work = [removeFile1, removeFile2];

function removeFile1(cb) {
  fs.unlink('./file1.txt', cb);
}

function removeFile2(cb) {
  fs.unlink('./file2.txt', cb);
}

async.parallel(work, done);

function done(err) {
  if (err) throw err;
  console.log('done');
}

The only thing we changed here was to replace async.series with async.parallel, to cause these two I/O operations to be performed in parallel.

If one of these operations fails, the first one to fail triggers the done callback with an error. When any of the still-pending calls finish after that, the done callback doesn't get called again. (Remember: a callback function is never called twice; this is part of the contract.)

This is one of the drawbacks of using I/O in parallel instead of in series: you're only able to handle the first error that happens. If you need to handle each one of them, either avoid parallel calls by using async.series, or give each individual operation a custom callback that handles its error.
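As a sketch of that second approach (the errors array is our own bookkeeping, not part of async), each operation gets a wrapper callback that records its error and reports success upwards, so every operation runs to completion and the final callback can inspect all failures:

const async = require('async');
const fs = require('fs');

var files = ['./file1.txt', './file2.txt'];
var errors = [];

var work = files.map(function(file) {
  return function(cb) {
    fs.unlink(file, function(err) {
      if (err) errors.push(err); // remember this operation's error
      cb();                      // never propagate, so the others still finish
    });
  };
});

async.parallel(work, function done() {
  if (errors.length) {
    console.error('%d operation(s) failed', errors.length);
  } else {
    console.log('done');
  }
});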

Waterfall

A reason for using async.series would be that the I/O calls are dependent: imagine that you first need to fetch a value from a remote system, like a database server, in order to be able to create the next request. Here is such an example:

function befriend(user1, user2, cb) {
  var alreadyFriends = false;
  async.series([findIfAlreadyFriended, friend], cb);

  function findIfAlreadyFriended(cb) {
    Friends.exists({from: user1, to: user2}, function(err, areFriends) {
      if (err) {
        cb(err);
      } else {
        alreadyFriends = areFriends;
        cb();
      }
    });
  }

  function friend(cb) {
    if (alreadyFriends) {
      return cb();
    }
    Friends.insert({from: user1, to: user2}, cb);
  }
}

Notice that we're using befriend's third argument, a callback function, directly as the last argument of the async.series call. Here you can already see a big advantage of the callback pattern: you don't have to adapt between different function signature patterns – you can pass the callback function directly as an argument.

This befriend function creates a "friend" link in a remote database. It's using async.series to compose these two functions: the first finds out whether the link already exists, and the second creates the friendship link, but only if that link doesn't already exist.

The findIfAlreadyFriended function then has to check the remote system to find out whether such a link exists, and update the shared alreadyFriends Boolean variable. This Boolean variable is then used in the next function in the list, the friend function, to decide whether to do the link insertion or to call back immediately.

We designed this befriend function not to error out if such a friend link already exists. You could also design it to call back with an error, which would break the async.series flow, if that's the behaviour you want.
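For illustration, here is a hedged sketch of that alternative design, reusing the Friends API from above; the "already friends" error object is our own invention:

function befriend(user1, user2, cb) {
  async.series([findIfAlreadyFriended, friend], cb);

  function findIfAlreadyFriended(cb) {
    Friends.exists({from: user1, to: user2}, function(err, areFriends) {
      if (err) return cb(err);
      // calling back with an error here breaks the series,
      // so friend() never runs and befriend's callback gets the error
      cb(areFriends ? new Error('already friends') : null);
    });
  }

  function friend(cb) {
    Friends.insert({from: user1, to: user2}, cb);
  }
}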

You can entirely avoid keeping a shared variable between functions by using the async.waterfall method:

function befriend(user1, user2, cb) {
  async.waterfall([findIfAlreadyFriended, friend], cb);

  function findIfAlreadyFriended(cb) {
    Friends.exists({from: user1, to: user2}, cb);
  }

  function friend(alreadyExists, cb) {
    if (alreadyExists) return cb();
    Friends.insert({from: user1, to: user2}, cb);
  }
}

Knowing that the Friends.exists function already calls back with a signature of (err, exists), we use that to our advantage: we let async.waterfall pass the results of each callback into the next function, keeping the state we need in the call stack instead of the outer scope.

As with all the async flow-control functions, async.waterfall also breaks on the first error, handing it off to the final callback.

Note here that the friend function always has to invoke the cb callback, no matter what. If it doesn't, the befriend callback will not get invoked in some cases. This is one of the hardest parts of using callbacks: making sure they always get invoked exactly once, no matter what.
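One common defensive idiom – not part of the async library, just plain JavaScript – is to wrap a callback in a small guard that throws if it's ever invoked twice, which surfaces these bugs during development:

function once(cb) {
  var called = false;
  return function() {
    if (called) throw new Error('callback invoked twice');
    called = true;
    return cb.apply(this, arguments);
  };
}

// as it might be used inside befriend, guarding friend's callback:
function friend(alreadyExists, cb) {
  cb = once(cb);
  if (alreadyExists) return cb();
  Friends.insert({from: user1, to: user2}, cb);
}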

Collections

If, instead of having a set of different functions, you want to apply the same function to a set of objects, you can use async's collection iteration utilities.

Parallel iteration

As an example, let's say that, given a list of documents, you want to insert all of them into a remote database:

var docs = // an array of documents

async.each(docs, insertDocument, done);

function insertDocument(doc, cb) {
  db.insert(doc, cb);
}

function done(err) {
  if (err) {
    handleError(err);
  } else {
    console.log('inserted all documents');
  }
}

Here we see that async.each accepts three arguments:

- the collection (a JavaScript array)
- the iterator function
- a final callback function, called when all iterations are complete, or on the first occasion that an error occurs.

Our iterator function insertDocument will then be called by async for each document, in parallel. This function is responsible for accepting one object as the first argument and a callback function as the last argument, the latter to be called when this particular operation terminates or produces an error.

Given that our insertDocument is only calling one function, we could use JavaScript's Function#bind and reduce our example to:

var docs = // an array of documents

async.each(docs, db.insert.bind(db), done);

function done(err) {
  if (err) {
    handleError(err);
  } else {
    console.log('inserted all documents');
  }
}

Parallel, but with limit

The async.each function is quite handy, but it is only useful if we know that the maximum length of the collection is relatively small. If the collection is too large, we risk:

- Overloading the receiving system. For instance if, as in this example, we're invoking this procedure on a remote system, too many of these calls in parallel risk overburdening that system. Worse still, if many of these operations are performed, each one while answering a client request, we multiply this effect by the number of parallel client requests.
- Overloading Node.js memory. Each of these calls inevitably allocates some resources in our Node.js process: structures, closures and file descriptors. These resources will eventually be released once the operation finishes, or will be garbage-collected in the near future – but too many of these in parallel and we risk putting too much pressure on memory, eventually leading to resource exhaustion.
- Blocking the event loop. Each call that starts one of these operations blocks the event loop for a short period of time. Multiply that period of time by the number of objects you're iterating over, and you may be blocking the event loop for a long time, possibly degrading the experience for other users currently being served by your Node.js process.

To prevent this you can either cap the collection, thus losing documents, or you can simply use async.eachLimit, which allows you to impose a limit on the number of outstanding operations:

var docs = // an array of documents
var parallelLimit = 5;

async.eachLimit(docs, parallelLimit, db.insert.bind(db), done);

function done(err) {
  if (err) {
    handleError(err);
  } else {
    console.log('inserted all documents');
  }
}

By using async.eachLimit here, we're specifying that we allow a maximum of five outstanding operations: that is, at any time, there is a maximum of five ongoing document insert calls. In this way you can help reduce the pressure on an external service, as well as helping to reduce the pressure on memory and other resources.

Bear in mind, though, that this technique doesn't reduce the overall pressure that your Node.js process puts on an external service. If, for instance, you're going to perform this operation for every user request, even a local parallel limit of five like this one may not be enough, since the effect must be calculated by multiplying the parallel limit by the number of requests being served at any given time. (To reduce that, you can use other patterns, such as a global queue or a connection pool.)
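As a sketch of the global-queue idea (handleRequest is a hypothetical request handler; db.insert is the same API as above), async.queue gives the whole process a single queue with a fixed concurrency, shared by every request:

const async = require('async');

// one queue for the whole process: at most 5 inserts in flight,
// no matter how many client requests are being served concurrently
var insertQueue = async.queue(function(doc, cb) {
  db.insert(doc, cb);
}, 5);

// each request just pushes its documents onto the shared queue
function handleRequest(docs, done) {
  async.each(docs, function(doc, cb) {
    insertQueue.push(doc, cb); // cb fires when this document's insert completes
  }, done);
}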

Note that the time it takes for a client request to complete may increase as you tighten the limit on outstanding parallel operations, which in turn may increase the number of pending requests. This limit can be fine-tuned according to your application's load patterns.

In the next article of this series we'll analyze how, still leveraging the callback pattern, you can use a work queue to further tighten down and control the flow of operations.

This article was extracted from Flow Control Patterns, a book from the Node Patterns series.


