2.5 KiB
2.5 KiB
promise all 并发限制
async-pool、es6-promise-pool、p-limit
async-pool的代码:
function asyncPool(poolLimit, array, iteratorFn) {
let i = 0;
const ret = [];
const executing = [];
const enqueue = function () {
// 边界处理,array为空数组
if (i === array.length) {
return Promise.resolve();
}
// 每调一次enqueue,初始化一个promise
const item = array[i++];
const p = Promise.resolve().then(() => iteratorFn(item, array));
// 放入promises数组
ret.push(p);
// promise执行完毕,从executing数组中删除
const e = p.then(() => executing.splice(executing.indexOf(e), 1));
// 插入executing数字,表示正在执行的promise
executing.push(e);
// 使用Promise.rece,每当executing数组中promise数量低于poolLimit,就实例化新的promise并执行
let r = Promise.resolve();
if (executing.length >= poolLimit) {
r = Promise.race(executing);
}
// 递归,直到遍历完array
return r.then(() => enqueue());
};
return enqueue().then(() => Promise.all(ret));
}
因为是promise加上递归,所以在代码注释上不太好标注执行顺序,但是大概的逻辑可以总结为:
- 从
array
第1个元素开始,初始化promise
对象,同时用一个executing
数组保存正在执行的promise - 不断初始化promise,直到达到
poolLimt
- 使用
Promise.race
,获得executing
中promise的执行情况,当有一个promise执行完毕,继续初始化promise并放入executing
中 - 所有promise都执行完了,调用
Promise.all
返回
使用方式就是:
const timeout = i => new Promise(resolve => setTimeout(() => resolve(i), i));
return asyncPool(2, [1000, 5000, 3000, 2000], timeout).then(results => {
...
});
Promise.all(iterable)
方法返回一个Promise
实例,此实例在iterable
参数内所有的promise
都"完成(resolved)"或参数中不包含promise
时回调完成(resolve);如果参数中promise
有一个失败(rejected),此实例回调失败(reject),失败原因的是第一个失败promise
的结果