priorityQueue.js

  1. import isArray from 'lodash/isArray';
  2. import noop from 'lodash/noop';
  3. import setImmediate from './setImmediate';
  4. import queue from './queue';
  5. /**
  6. * The same as [async.queue]{@link module:ControlFlow.queue} only tasks are assigned a priority and
  7. * completed in ascending priority order.
  8. *
  9. * @name priorityQueue
  10. * @static
  11. * @memberOf module:ControlFlow
  12. * @method
  13. * @see [async.queue]{@link module:ControlFlow.queue}
  14. * @category Control Flow
  15. * @param {AsyncFunction} worker - An async function for processing a queued task.
  16. * If you want to handle errors from an individual task, pass a callback to
  17. * `q.push()`.
  18. * Invoked with (task, callback).
  19. * @param {number} concurrency - An `integer` for determining how many `worker`
  20. * functions should be run in parallel. If omitted, the concurrency defaults to
  21. * `1`. If the concurrency is `0`, an error is thrown.
  22. * @returns {module:ControlFlow.QueueObject} A priorityQueue object to manage the tasks. There are two
  23. * differences between `queue` and `priorityQueue` objects:
  24. * * `push(task, priority, [callback])` - `priority` should be a number. If an
  25. * array of `tasks` is given, all tasks will be assigned the same priority.
  26. * * The `unshift` method was removed.
  27. */
  28. export default function(worker, concurrency) {
  29. // Start with a normal queue
  30. var q = queue(worker, concurrency);
  31. // Override push to accept second parameter representing priority
  32. q.push = function(data, priority, callback) {
  33. if (callback == null) callback = noop;
  34. if (typeof callback !== 'function') {
  35. throw new Error('task callback must be a function');
  36. }
  37. q.started = true;
  38. if (!isArray(data)) {
  39. data = [data];
  40. }
  41. if (data.length === 0) {
  42. // call drain immediately if there are no tasks
  43. return setImmediate(function() {
  44. q.drain();
  45. });
  46. }
  47. priority = priority || 0;
  48. var nextNode = q._tasks.head;
  49. while (nextNode && priority >= nextNode.priority) {
  50. nextNode = nextNode.next;
  51. }
  52. for (var i = 0, l = data.length; i < l; i++) {
  53. var item = {
  54. data: data[i],
  55. priority: priority,
  56. callback: callback
  57. };
  58. if (nextNode) {
  59. q._tasks.insertBefore(nextNode, item);
  60. } else {
  61. q._tasks.push(item);
  62. }
  63. }
  64. setImmediate(q.process);
  65. };
  66. // Remove unshift function
  67. delete q.unshift;
  68. return q;
  69. }