【PYthon分布式】huey:python轻量级异步任务队列简介

前端之家收集整理的这篇文章主要介绍了【PYthon分布式】huey:python轻量级异步任务队列简介前端之家小编觉得挺不错的,现在分享给大家,也给大家做个参考。

简介并安装

huey,a little task queue.
轻量级异步任务队列。

下载安装huey。

下载安装redis依赖(huey暂时只支持redis)。

利用huey定义并执行一些任务时,可以分成这几个文件

  • config.py: 定义使用huey的一些配置,任务的redis存储。

The first step is to configure your queue. The consumer needs to be pointed at a instance,which specifies which backend to use.

  • task.py: 定义你需要执行的一些异步任务。
  • huey_consumer.py: 开启huey consumer的入口(huey提供)。
  • huey_main.py: 执行异步任务。

config.py

实际上就是利用redis创建consumer所指向的huey实例,huey实际上包含了一个队列queue,用来存储和取出消息。

 huey  RedisHuey

huey = RedisHuey(<span class="hljs-string">'base_app',host=<span class="hljs-string">'127.0.0.1')

或者

<code class="language-javascript"><code class="javascript"><span class="hljs-keyword">from huey <span class="hljs-keyword">import RedisHuey
<span class="hljs-keyword">from redis <span class="hljs-keyword">import ConnectionPool

<span class="hljs-keyword">import settings

redis_pool = ConnectionPool(host=settings.REDIS_ADDRESS,port=settings.REDIS_PORT,db=<span class="hljs-number">0)

huey = RedisHuey(<span class="hljs-string">'base_app',connection_pool=redis_pool)

task.py

利用config.py所创建的huey来修饰普通函数使之成为huey任务。
这样就定义了一个最基本的异步任务。(base_huey.py 及上述的 config.py)

<code class="language-python"><code class="python"><span class="hljs-keyword">from base.base_huey <span class="hljs-keyword">import huey

<span class="hljs-Meta">@huey.task()
<span class="hljs-function"><span class="hljs-keyword">def <span class="hljs-title">count_beans<span class="hljs-params">(num):
print(<span class="hljs-string">'-- counted %s beans --' % num)
<span class="hljs-keyword">for n <span class="hljs-keyword">in range(num):
print(n)
<span class="hljs-keyword">return <span class="hljs-string">'Counted %s beans' % num

huey_consumer.py

之前习惯把huey包里的huey_consumer.py文件直接拿出来到主目录然后执行,新版的的huey_consumer.py与旧版的稍微有点区别。
新版本增加了一个consumer_options.py,用来定义封装了一些consumer相关的配置和命令行解析的处理类;在旧版中,这些都是直接定义在huey_consumer.py中。
查看OptionParserHandler源码可知,huey_consumer可以包含很多参数,主要分为三个group(Logging日志记录,Workers任务worker相关,Scheduler计划任务相关)。

<code class="language-ruby"><code class="ruby">    <span class="hljs-function"><span class="hljs-keyword">def <span class="hljs-title">get_option_parser<span class="hljs-params">(<span class="hljs-keyword">self):
parser = optparse.OptionParser(<span class="hljs-string">'Usage: %prog [options] '
<span class="hljs-string">'path.to.huey_instance')

    <span class="hljs-function"&gt;<span class="hljs-keyword"&gt;def</span> <span class="hljs-title"&gt;add_group</span><span class="hljs-params"&gt;(name,description,options)</span></span>:
        group = parser.add_option_group(name,description)
        <span class="hljs-keyword"&gt;for</span> abbrev,name,kwargs <span class="hljs-keyword"&gt;in</span> <span class="hljs-symbol"&gt;options:</span>
            group.add_option(abbrev,**kwargs)

    add_group(<span class="hljs-string"&gt;'Logging'</span>,<span class="hljs-string"&gt;'The following options pertain to logging.'</span>,<span class="hljs-keyword"&gt;self</span>.get_logging_options())

    add_group(<span class="hljs-string"&gt;'Workers'</span>,(
        <span class="hljs-string"&gt;'By default huey uses a single worker thread. To specify a '</span>
        <span class="hljs-string"&gt;'different number of workers,or a different execution model (such'</span>
        <span class="hljs-string"&gt;' as multiple processes or greenlets),use the options below.'</span>),<span class="hljs-keyword"&gt;self</span>.get_worker_options())

    add_group(<span class="hljs-string"&gt;'Scheduler'</span>,(
        <span class="hljs-string"&gt;'By default Huey will run the scheduler once every second to check'</span>
        <span class="hljs-string"&gt;' for tasks scheduled in the future,or tasks set to run at '</span>
        <span class="hljs-string"&gt;'specfic intervals (periodic tasks). Use the options below to '</span>
        <span class="hljs-string"&gt;'configure the scheduler or to disable periodic task scheduling.'</span>),<span class="hljs-keyword"&gt;self</span>.get_scheduler_options())

    <span class="hljs-keyword"&gt;return</span> parser

最常用的一些参数:

  • -l 指定huey异步任务执行时的日志文件(也可以通过ConsumerConfigsetup_logger()来定义logger)。
  • -w 执行器worker队列的数量
  • -k worker的类型(process,thread,greenlet,默认是thread)
  • -d 轮询队列的最短时间间隔
huey_main.py

定义需要执行huey任务的方法

<code class="language-python"><code class="python"><span class="hljs-keyword">from tasks.huey_task <span class="hljs-keyword">import count_beans

<span class="hljs-comment"># base test
<span class="hljs-function"><span class="hljs-keyword">def <span class="hljs-title">test_1<span class="hljs-params">():
count_beans(<span class="hljs-number">10) <span class="hljs-comment"># no block

count_beans.schedule(args=(<span class="hljs-number"&gt;5</span>,),delay=<span class="hljs-number"&gt;5</span>)  <span class="hljs-comment"&gt;# delay 5s</span>

res = count_beans.schedule(args=(<span class="hljs-number"&gt;5</span>,delay=<span class="hljs-number"&gt;5</span>)
<span class="hljs-comment"&gt;# res.get(blocking=True)</span>
res(blocking=<span class="hljs-keyword"&gt;True</span>)  <span class="hljs-comment"&gt;# block</span>

<span class="hljs-keyword">if name == <span class="hljs-string">'main':

test_1()

print(<span class="hljs-string"&gt;'end'</span>)

执行脚本
  1. 开启consumer轮询:python huey_consumer_new.py tasks.huey_task.huey -l logs/base_huey.log -w 1
    tasks.task.huey即上述的task.py,在此时后缀名需要替换成 .huey。
  2. 执行异步方法pyton huey_main.py

ps:官方文档中是将 huey 实例和task任务都引入到main.py中。

<pre class="hljs undefined">

main.py

from config import huey # import our "huey" object
from tasks import count_beans # import our task
if name == 'main':
beans = raw_input('How many beans? ')
count_beans(int(beans))
print('Enqueued job to count %s beans' % beans)

<code class="language-ruby"><code class="ruby">To run these scripts,follow these <span class="hljs-symbol">steps:
<span class="hljs-number">1. Ensure you have [Redis](<span class="hljs-symbol">http:/<span class="hljs-regexp">/redis.io/) running locally
<span class="hljs-number">2. Ensure you have [installed huey](<span class="hljs-symbol">http:/<span class="hljs-regexp">/huey.readthedocs.io/en<span class="hljs-regexp">/latest/installation.html<span class="hljs-comment">#installation)
<span class="hljs-number">3. Start the <span class="hljs-symbol">consumer: huey_consumer.py main.huey
(notice this is “main.huey” <span class="hljs-keyword">and <span class="hljs-keyword">not “config.huey”).
<span class="hljs-number">4. Run the main <span class="hljs-symbol">program: python main.py


<span class="hljs-comment">#####huey task简单api介绍
利用<span class="hljs-string"></span><span class="hljs-string"&gt;`@huey.task()`</span><span class="hljs-string"&gt;能来定义一些基本异步任务,当然还有其他延时任务,周期性任务等。
<span class="hljs-number">1. 延时执行:下例展示了两种延时执行的方法:第一种时直接执行延时时间n秒并传入参数;第二种是指定了eta参数,即estimated time of arrival,传入未来的某个时间点,使其在计划时间点执行。

import datetime

from tasks.huey_task import count_beans

count_beans(3) # normal

count_beans.schedule(args=(3,delay=5) # delay 5s

in_a_minute = datetime.datetime.now() + datetime.timedelta(seconds=60)
count_beans.schedule(args=(100,eta=in_a_minute)

<pre class="hljs undefined">2. 阻塞:利用block参数能够使其阻塞。

res = count_beans(100)

res.get(blocking=True)

res(blocking=True) # block

<code class="language-ruby"><code class="ruby"><span class="hljs-number">3. 异常重试:当任务出现异常时进行<span class="hljs-keyword">retry,并且可以指定重试延时时间。

from base.base_huey import huey

retry 3 times delay 5s

@huey.task(retries=3,retry_delay=5)
def try_reties_by_delay():
print('trying %s' % datetime.now())
raise Exception('try_reties_by_delay')

<code class="language-javascript"><code class="javascript"><span class="hljs-number">4. 周期性任务:利用<span class="hljs-string"></span><span class="hljs-string"&gt;`@huey.periodic_task()`</span><span class="hljs-string"&gt;来定义一个周期性任务。

from base.base_huey import huey
from huey import crontab

@huey.periodic_task(crontab(minute='*'))
def print_time():
print(datetime.now())

猜你在找的Python相关文章