Node.js + Redis Sorted Set实现任务队列

前端之家收集整理的这篇文章主要介绍了Node.js + Redis Sorted Set实现任务队列前端之家小编觉得挺不错的,现在分享给大家,也给大家做个参考。

需求:功能 A 需要调用第三方 API 获取数据,而第三方 API 自身是异步处理方式,在调用后会返回数据与状态 { data: "查询结果","status": "正在异步处理中" } ,这样就需要间隔一段时间后再去调用第三方 API 获取数据。为了用户在使用功能 A 时不会因为第三方 API 正在异步处理中而必须等待,将用户请求加入任务队列中,返回部分数据并关闭请求。然后定时从任务队列里中取出任务调用第三方 API,若返回状态为”异步处理中“,将该任务再次加入任务队列,若返回状态为”已处理完毕“,将返回数据入库。

根据以上问题,想到使用 Node.js + Redis sorted set 来实现任务队列。Node.js 实现自身应用 API 用来接受用户请求,合并数据库已存数据与 API 返回的部分数据返回给用户,并将任务加入到任务队列中。利用 Node.js child process 与 cron 定时从任务队列中取出任务执行。

在设计任务队列的过程中需要考虑到的几个问题

  • 并行执行多个任务
  • 任务唯一性
  • 任务成功或失败后的处理

针对以上问题的解决方

  • 并行执行多个任务利用 Promise.all 来实现
  • 任务唯一性利用 Redis sorted set 来实现。使用时间戳作为分值可以实现将 sorted set 作为 list 来使用,在加入任务时判断任务是否已经存在,在取出任务执行时将该任务分值设置为 0,每次取出分值大于 0 的任务来执行,可以避免重复执行任务。
  • 执行任务成功后删除任务,执行任务失败后将任务分值更新为当前时间时间戳,这样就可以将失败的任务重新加入任务队列尾部

示例代码

const app = require('express')();

app.get('/',(req,res) => {
setTimeout(() => {
let arr = [200,300]; // 200 代表成功,300 代表失败需要重新请求
res.status(200).send({ 'status': arr[parseInt(Math.random() * 2)] });
},3000);
});

app.listen('9001',() => {
console.log('API 服务监听端口:9001');
});
// producer.js 自身应用 API,用来接受用户请求并将任务加入任务队列
'use strict';

const app = require('express')();
const redisClient = require('redis').createClient();

const QUEUE_NAME = 'queue:example';

function addTaskToQueue(taskName,callback) {
// 先判断任务是否已经存在,存在:跳过,不存在:加入任务队列
redisClient.zscore(QUEUE_NAME,taskName,(error,task) => {
if (error) {
console.log(error);
} else {
if (task) {
console.log('任务已存在,不新增相同任务');
callback(null,task);
} else {
redisClient.zadd(QUEUE_NAME,new Date().getTime(),result) => {
if (error) {
callback(error);
} else {
callback(null,result);
}
});
}
}
});
}

app.get('/',res) => {
let taskName = req.query['task-name'];
addTaskToQueue(taskName,result) => {
if (error) {
console.log(error);
} else {
res.status(200).send('正在查询中......');
}
});
});

app.listen(9002,() => {
console.log('生产者服务监听端口:9002');
});
// consumer.js 定时获取任务并执行
'use strict';

const redisClient = require('redis').createClient();
const request = require('request');
const schedule = require('node-schedule');

const QUEUE_NAME = 'queue:expmple';
const PARALLEL_TASK_NUMBER = 2; // 并行执行任务数量

function getTasksFromQueue(callback) {
// 获取多个任务
redisClient.zrangebyscore([QUEUE_NAME,1,'LIMIT',PARALLEL_TASK_NUMBER],tasks) => {
if (error) {
callback(error);
} else {
// 将任务分值设置为 0,表示正在处理
if (tasks.length > 0) {
let tmp = [];
tasks.forEach((task) => {
tmp.push(0);
tmp.push(task);
});
redisClient.zadd([QUEUE_NAME].concat(tmp),tasks)
}
});
}
}
});
}

function addFailedTaskToQueue(taskName,callback) {
redisClient.zadd(QUEUE_NAME,result) => {
if (error) {
callback(error);
} else {
callback(null,result);
}
});
}

function removeSucceedTaskFromQueue(taskName,callback) {
redisClient.zrem(QUEUE_NAME,result);
}
})
}

function execTask(taskName) {
return new Promise((resolve,reject) => {
let requestOptions = {
'url': 'http://127.0.0.1:9001','method': 'GET','timeout': 5000
};
request(requestOptions,response,body) => {
if (error) {
resolve('Failed');
console.log(error);
addFailedTaskToQueue(taskName,(error) => {
if (error) {
console.log(error);
} else {

      }
    });
  } else {
    try {
      body = typeof body !== 'object' ? JSON.parse(body) : body;
    } catch (error) {
      resolve('<a href="/tag/Failed/" target="_blank" class="keywords">Failed</a>');
      console.log(error);
      add<a href="/tag/Failed/" target="_blank" class="keywords">Failed</a>TaskToQueue(taskName,result) => {
        if (error) {
          console.log(error);
        } else {

        }
      });
      return;
    }
    if (body.status !== 200) {
      resolve('<a href="/tag/Failed/" target="_blank" class="keywords">Failed</a>');
      add<a href="/tag/Failed/" target="_blank" class="keywords">Failed</a>TaskToQueue(taskName,result) => {
        if (error) {
          console.log(error);
        } else {

        }
      });
    } else {
      resolve('succeed');
      removeSucceedTaskFromQueue(taskName,result) => {
        if (error) {
          console.log(error);
        } else {

        }
      });
    }
  }
});

});
}

// 定时,每隔 5 秒获取新的任务来执行
let job = schedule.scheduleJob('/5 ',() => {
console.log('获取新任务');
getTasksFromQueue((error,tasks) => {
if (error) {
console.log(error);
} else {
if (tasks.length > 0) {
console.log(tasks);

    Promise.all(tasks.map(execTask))
    .then((results) => {
      console.log(results);
    })
    .catch((error) => {
      console.log(error);
    });

  }
}

});
});

猜你在找的Node.js相关文章