Skip to content
目录

Promise 并发调度

ts
function schedule(limit: number) {
  /*code here*/
}

const request1 = () =>
  new Promise((resolve) => {
    setTimeout(() => {
      resolve("request1");
    }, 1000);
  });
const request2 = () =>
  new Promise((resolve) => {
    setTimeout(() => {
      resolve("request2");
    }, 6000);
  });
const request3 = () =>
  new Promise((resolve) => {
    setTimeout(() => {
      resolve("request3");
    }, 3000);
  });
const request4 = () =>
  new Promise((resolve) => {
    setTimeout(() => {
      resolve("request4");
    }, 4000);
  });

const time = +new Date();
const addTask = schedule(2);

addTask(request1).then((res) => {
  console.log(+new Date() - time, res);
});
addTask(request2).then((res) => {
  console.log(+new Date() - time, res);
});
addTask(request3).then((res) => {
  console.log(+new Date() - time, res);
});
addTask(request4).then((res) => {
  console.log(+new Date() - time, res);
});
/**
 * 1s request1
 * 4s request3
 * 6s request2
 * 8s request4
 */
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50

审题:

  1. 每个addTask函数返回的都是一个Promise

自信直接先return new Promise()<后面我们讲这个Promise称为——returnPromise>再说

  1. 每个Promise不是传入后立刻进行执行的

很明显我们需要来让每一个addTaskreturnPromise状态被传入的task Promise<后面我们统称——promiseCreator>控制,用闭包创建一个tasks数组来存储tasks: {promiseCreator, resolve, reject}[],分别是:需要被兑现的Promise,对应returnPromiseresolve/reject

  1. 每个Promise是否能够执行需要满足条件:按传入顺序排列兑现,同时并发的数量小于限制数量

并发调度按照传入的先后顺序来,我们先全部推入tasks数组中,每次从头读取limittask进行执行,判断promiseCreator()状态改变时,让其对应returnPromise调用对应的resolve/reject方法,从而改变returnPromise状态完成returnPromise的兑现

我们声明一个alive变量和一个requestTask函数,前者用来记录当前有几个task正在并发执行,后者用于从tasks数组中读取limit-alivetask执行

requestTask执行后,如果当前执行的Promise有被兑现了的,在then方法的第一个回调函数中令alive--,然后重新调用requestTask函数读取新的task即可

代码实现:

ts
type Task = {
  promiseCreator: () => Promise<unknown>;
  resolve: (value: unknown) => void;
  reject: (reason: unknown) => void;
};

function schedule(limit: number) {
  const tasks: Task[] = [];
  let alive = 0;
  function requestTask() {
    if (!tasks.length) return;
    while (alive < limit && tasks.length) {
      const { promiseCreator, resolve, reject } = tasks.shift() || {};
      promiseCreator &&
        promiseCreator()
          .then((value) => {
            resolve && resolve(value);
            alive--;
            requestTask();
          })
          .catch((reason) => {
            reject && reject(reason);
          });
      alive++;
    }
  }
  return (promiseCreator: () => Promise<unknown>): Promise<unknown> => {
    return new Promise((resolve, reject) => {
      tasks.push({ promiseCreator, resolve, reject });
      requestTask();
    });
  };
}

const request1 = () =>
  new Promise((resolve) => {
    setTimeout(() => {
      resolve("request1");
    }, 1000);
  });
const request2 = () =>
  new Promise((resolve) => {
    setTimeout(() => {
      resolve("request2");
    }, 6000);
  });
const request3 = () =>
  new Promise((resolve) => {
    setTimeout(() => {
      resolve("request3");
    }, 3000);
  });
const request4 = () =>
  new Promise((resolve) => {
    setTimeout(() => {
      resolve("request4");
    }, 4000);
  });

const time = +new Date();
const addTask = schedule(2);

addTask(request1).then((res) => {
  console.log(+new Date() - time, res);
});
addTask(request2).then((res) => {
  console.log(+new Date() - time, res);
});
addTask(request3).then((res) => {
  console.log(+new Date() - time, res);
});
addTask(request4).then((res) => {
  console.log(+new Date() - time, res);
});
/**
 * 1s request1
 * 4s request3
 * 6s request2
 * 8s request4
 */
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
ts
/**更加优雅的写法*/
function schedule(limit: number) {
  const resolveQueue: ((value: unknown) => void)[] = [];
  let alive = 0;
  return async (promiseCreator: () => Promise<unknown>) => {
    alive >= limit &&
      (await new Promise((resolve) => resolveQueue.push(resolve)));
    alive++;
    try {
      const res = await promiseCreator();
      alive--;
      resolveQueue.length && resolveQueue.shift()!(res);
      return res;
    } catch (error) {
      return error;
    }
  };
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18

Released under the MIT License.