如何使用redis构建异步任务处理程序

来源:互联网 时间:1970-01-01

Tags:python,redis

2015-11-03

如果一些任务没必要马上知道结果,可以将其放入队列中,后台处理程序去处理,这同时也达到了异步的效果。本文围绕python的rq模块,介绍如何使用redis构建异步任务处理程序。

rq官方地址: http://python-rq.org/

Github: https://github.com/nvie/rq

如何安装rq 方法1:

解压源码,将 rq的目录放在PYTHONPATH变量里。然后:

export PYTHONPATH=$PYTHONPATH:/home/letian/Desktop/rq/ 方法2: $ sudo python setup.py install 方法3:

该方法是将rq安装到当前用户目录,我也是采用的该方法:

$ python setup.py install --user...Installed /home/letian/.local/lib/python2.7/site-packages/rq-0.5.6-py2.7.egg Processing dependencies for rq==0.5.6 ...

另外,rq会在 $HOME/.local/bin安装几个命令行工具,所以需要:

$ export PATH=$PATH:$HOME/.local/bin

注意,rq对click包有依赖,如果安装rq时出现问题,可以先安装 click。

相关资料

Installing Python Modules What is the simplest way to do a user-local install of a python package? How to install python modules without root access?

命令行工具的实现

我们看一下命令行工具的实现:

首先rq源码中的setup.py中有以下内容:

entry_points={ 'console_scripts': ['rq = rq.cli:main',# NOTE: rqworker/rqinfo are kept for backward-compatibility,# remove eventually (TODO)'rqinfo = rq.cli:info','rqworker = rq.cli:worker', ],},

很容易看懂,rq就是执行rq.cli模块下main函数,另外两个类似。

查看rqworker的代码:

$ cd ~/.local/[email protected]:~/.local/bin$ cat rqworker #!/usr/bin/python# EASY-INSTALL-ENTRY-SCRIPT: 'rq==0.5.6','console_scripts','rqworker'__requires__ = 'rq==0.5.6' import sys from pkg_resources import load_entry_pointif __name__ == '__main__': sys.exit( load_entry_point('rq==0.5.6', 'console_scripts', 'rqworker')() )

其中 rq=0.5.6代表着包 site-packages/rq-0.5.6-py2.7.egg,在这里,实际位置是 ~/.local/lib/python2.7/site-packages/rq-0.5.6-py2.7.egg。进入该目录,查看 EGG-INFO/entry_points.txt的内容:

$ cat EGG-INFO/entry_points.txt [console_scripts]rq = rq.cli:main rqinfo = rq.cli:info rqworker = rq.cli:worker

相关资料:

pkg resources----Entry Points为程序提供扩展点 Package Discovery and Resource Access using pkgresources 分析rq如何在redis中存储数据 编写代码

按照 rq官网给的例子写代码:

建立项目目录 python-code和文件,结构如下:

$ tree python-code/python-code/ ├── my_module.py└── test.py

my_module.py内容如下:

import requestsdef count_words_at_url(url): resp = requests.get(url) return len(resp.text.split())

test.py内容如下:

from redis import Redis from rq import Queue from my_module import count_words_at_urlq = Queue(connection=Redis()) result = q.enqueue(count_words_at_url, 'http://www.baidu.com') print result

执行 test.py,

$ python test.py <Job ca82af29-5744-4d62-afe0-3cba45b2d31d: my_module.count_words_at_url('http://www.baidu.com')> 查看redis

现在看下redis:

$ redis-cli 127.0.0.1:6379> KEYS * 1) "rq:job:ca82af29-5744-4d62-afe0-3cba45b2d31d" 2) "rq:queue:default" 3) "rq:queues"

出现三个键值对。

看看 rq:queues有什么:

127.0.0.1:6379> TYPE rq:queues set 127.0.0.1:6379> SMEMBERS rq:queues 1) "rq:queue:default"

rq:queues是一个集合,保存着有哪些队列。

rq:queue:default就是当一个队列来用了:

127.0.0.1:6379> TYPE rq:queue:default list 127.0.0.1:6379> LRANGE rq:queue:default 0 12 1) "ca82af29-5744-4d62-afe0-3cba45b2d31d"

default队列中有一个任务,任务的标识是 ca82af29-5744-4d62-afe0-3cba45b2d31d,这正好和 rq:job:ca82af29-5744-4d62-afe0-3cba45b2d31d对应。

接着看看这个job里有什么:

127.0.0.1:6379> TYPE rq:job:ca82af29-5744-4d62-afe0-3cba45b2d31d hash 127.0.0.1:6379> HGETALL rq:job:ca82af29-5744-4d62-afe0-3cba45b2d31d 1) "origin" 2) "default" 3) "status" 4) "queued" 5) "created_at" 6) "2015-11-03T02:42:35Z" 7) "enqueued_at" 8) "2015-11-03T02:42:35Z" 9) "data"10) "/x80/x02(X/x1c/x00/x00/x00my_module.count_words_at_urlq/x01NU/x14http://www.baidu.comq/x02/x85q/x03}q/x04tq/x05." 11) "description" 12) "my_module.count_words_at_url('http://www.baidu.com')" 13) "timeout" 14) "180"

这个job的信息是以hash形式存储的,奇数行是key,偶数是value。我们看下data的内容:

$ ipython>>> aa'/x80/x02(X/x1c/x00/x00/x00my_module.count_words_at_urlq/x01NU/x14http://www.baidu.comq/x02/x85q/x03}q/x04tq/x05.' >>> import pickle>>> pickle.loads(aa)(u'my_module.count_words_at_url', None, ('http://www.baidu.com',), {}) 不在项目目录中执行rqworker $ rqworker 13:38:17 RQ worker u'rq:worker:myhost.10179' started, version 0.5.6 13:38:17 13:38:17 *** Listening on default... 13:38:17 default: my_module.count_words_at_url('http://www.baidu.com') (ca82af29-5744-4d62-afe0-3cba45b2d31d) 13:38:17 ImportError: No module named my_module Traceback (most recent call last): File "/home/letian/.local/lib/python2.7/site-packages/rq-0.5.6-py2.7.egg/rq/worker.py", line 568, in perform_job rv = job.perform() File "/home/letian/.local/lib/python2.7/site-packages/rq-0.5.6-py2.7.egg/rq/job.py", line 495, in perform self._result = self.func(*self.args, **self.kwargs) File "/home/letian/.local/lib/python2.7/site-packages/rq-0.5.6-py2.7.egg/rq/job.py", line 206, in func return import_attribute(self.func_name) File "/home/letian/.local/lib/python2.7/site-packages/rq-0.5.6-py2.7.egg/rq/utils.py", line 150, in import_attribute module = importlib.import_module(module_name) File "/usr/lib/python2.7/importlib/__init__.py", line 37, in import_module __import__(name)ImportError: No module named my_module Traceback (most recent call last): File "/home/letian/.local/lib/python2.7/site-packages/rq-0.5.6-py2.7.egg/rq/worker.py", line 568, in perform_job rv = job.perform() File "/home/letian/.local/lib/python2.7/site-packages/rq-0.5.6-py2.7.egg/rq/job.py", line 495, in perform self._result = self.func(*self.args, **self.kwargs) File "/home/letian/.local/lib/python2.7/site-packages/rq-0.5.6-py2.7.egg/rq/job.py", line 206, in func return import_attribute(self.func_name) File "/home/letian/.local/lib/python2.7/site-packages/rq-0.5.6-py2.7.egg/rq/utils.py", line 150, in import_attribute module = importlib.import_module(module_name) File "/usr/lib/python2.7/importlib/__init__.py", line 37, in import_module __import__(name)ImportError: No module named my_module 13:38:17 Moving job to u'failed' queue 13:38:17 13:38:17 *** Listening on default...

执行失败,因为 No module named my_module。

注意, rqworker输出的第一行中 rq:worker:myhost.10179是该worker的标识。

我们看一下redis中内容的变化:

127.0.0.1:6379> KEYS * 1) "rq:queue:failed" 2) "rq:workers" 3) "rq:queues" 4) "rq:job:ca82af29-5744-4d62-afe0-3cba45b2d31d" 5) "rq:worker:myhost.10179"

新增加了一个 rq:queue:failed队列, rq:workers集合。

查看 rq:job:ca82af29-5744-4d62-afe0-3cba45b2d31d的变化:

127.0.0.1:6379> HGETALL rq:job:ca82af29-5744-4d62-afe0-3cba45b2d31d 1) "ttl" 2) "-1" 3) "created_at" 4) "2015-11-03T02:42:35Z" 5) "ended_at" 6) "2015-11-03T05:38:17Z" 7) "origin" 8) "default" 9) "enqueued_at"10) "2015-11-03T02:42:35Z" 11) "exc_info" 12) "Traceback (most recent call last):/n File /"/home/letian/.local/lib/python2.7/site-packages/rq-0.5.6-py2.7.egg/rq/worker.py/", line 568, in perform_job/n rv = job.perform()/n File /"/home/letian/.local/lib/python2.7/site-packages/rq-0.5.6-py2.7.egg/rq/job.py/", line 495, in perform/n self._result = self.func(*self.args, **self.kwargs)/n File /"/home/letian/.local/lib/python2.7/site-packages/rq-0.5.6-py2.7.egg/rq/job.py/", line 206, in func/n return import_attribute(self.func_name)/n File /"/home/letian/.local/lib/python2.7/site-packages/rq-0.5.6-py2.7.egg/rq/utils.py/", line 150, in import_attribute/n module = importlib.import_module(module_name)/n File /"/usr/lib/python2.7/importlib/__init__.py/", line 37, in import_module/n __import__(name)/nImportError: No module named my_module/n" 13) "data" 14) "/x80/x02(X/x1c/x00/x00/x00my_module.count_words_at_urlq/x01NU/x14http://www.baidu.comq/x02/x85q/x03}q/x04tq/x05." 15) "description" 16) "my_module.count_words_at_url('http://www.baidu.com')" 17) "timeout" 18) "180" 19) "status" 20) "failed"

可以看到出现了新的键值对,例如 ended_at、 exc_info。 status的值也发生了变化:由 queued变成了 failed。

查看 rq:workers中的内容:

127.0.0.1:6379> TYPE rq:workers set 127.0.0.1:6379> SMEMBERS rq:workers 1) "rq:worker:myhost.10179"

有一个worker,其中myhost是hostname,10179是该worker的pid,查看其信息:

127.0.0.1:6379> TYPE rq:worker:myhost.10179 hash127.0.0.1:6379> HGETALL rq:worker:myhost.10179 1) "birth" 2) "2015-11-03T05:38:17Z" 3) "queues" 4) "default" 5) "state" 6) "idle" 7) "current_job" 8) "ca82af29-5744-4d62-afe0

查看队列 rq:queue:default中的内容:

127.0.0.1:6379> LRANGE rq:queue:default 0 12 (empty list or set)

查看队列 rq:queue:failed中的内容:

127.0.0.1:6379> LRANGE rq:queue:failed 0 12 1) "ca82af29-5744-4d62-afe0-3cba45b2d31d" 应该这样执行

把上面运行的rqworker停掉,清空redis:

127.0.0.1:6379> FLUSHALL OK

重新在redis中添加job:

$ python test.py <Job 4122eed3-d521-4738-8cf6-20d9501d0ab0: my_module.count_words_at_url('http://www.baidu.com')> 然后,在项目目录下执行rqworker: $ rqworker 20:17:21 RQ worker u'rq:worker:myhost.27283' started, version 0.5.6 20:17:21 20:17:21 *** Listening on default... 20:17:21 default: my_module.count_words_at_url('http://www.baidu.com') (4122eed3-d521-4738-8cf6-20d9501d0ab0) 20:17:22 Job OK

查看redis:

# rqworker运行之前127.0.0.1:6379> KEYS * 1) "rq:job:4122eed3-d521-4738-8cf6-20d9501d0ab0" 2) "rq:queue:default" 3) "rq:queues"# rqworker运行之后127.0.0.1:6379> KEYS * 1) "rq:worker:myhost.27283" 2) "rq:workers" 3) "rq:finished:default" 4) "rq:queues" 5) "rq:job:4122eed3-d521-4738-8cf6-20d9501d0ab0"127.0.0.1:6379> HGETALL rq:job:4122eed3-d521-4738-8cf6-20d9501d0ab0 1) "ttl" 2) "-1" 3) "created_at" 4) "2015-11-03T12:17:12Z" 5) "ended_at" 6) "2015-11-03T12:17:22Z" 7) "origin" 8) "default" 9) "enqueued_at"10) "2015-11-03T12:17:12Z" 11) "result" 12) "/x80/x02K-." 13) "data" 14) "/x80/x02(X/x1c/x00/x00/x00my_module.count_words_at_urlq/x01NU/x14http://www.baidu.comq/x02/x85q/x03}q/x04tq/x05." 15) "timeout" 16) "180" 17) "description" 18) "my_module.count_words_at_url('http://www.baidu.com')" 19) "status" 20) "finished"# zset是有序集合127.0.0.1:6379> TYPE rq:finished:default zset127.0.0.1:6379> ZRANGE rq:finished:default 0 -1 1) "4122eed3-d521-4738-8cf6-20d9501d0ab0"

可以看到job的 status是 finished; result是 /x80/x02K-.,用pickle处理这个值:

>>> import pickle>>> pickle.loads("/x80/x02K-.")45

注意,可以同时执行多个 rqworker。

rq的设计思路 redis中如何存储一个job

一个job,在这里就是一个函数或者可执行对象。

job的id由uuid模块生成,加上固定前缀 rq:job:,作为该job在redis中的key。value是hash类型,同样以键值对保存该job的信息。一个job的信息包括:入队时间,job使用的函数(或者可执行对象)本身的名称、所属的module名称,函数(或者可执行对象)的参数,状态(排队中、完成、失败等),执行结果等。这些信息有些是直接保存,有些用pickle模块处理之后才保存。

如何将job添加到队列

假设待处理job队列只有一个,即默认的 rq:queue:default,这个队列名称同时也是它在redis中的键,对应的值是list类型,在这个list中记录job的id即可。

job如果执行失败,则将其记录到失败队列(即 rq:queue:failed)中。如果执行成功,则记录到 rq:finished:default对应的有序集合中。

rqworker命令如何工作

rqworker从给定的队列中取出一个job,提取job的信息,其中最重要的是:

job使用的函数(或者可执行对象)所属的module名称 job使用的函数(或者可执行对象)本身的名称 job使用的函数(或者可执行对象)的参数

job基本是我们自己写的,都是放在当前项目下的某个模块中,rqworker使用 importlib库导入模块,使用 getattr函数从模块中拿到job使用的函数(或者可执行对象),然后就可以执行任务了。也因为这个原因,rqworker命令需要在当前项目的目录中执行。

rqworker在处理完一个job后,将该job放入成功队列或者失败队列。

rqworker在执行job时候会fork一个子进程去处理job,但是父进程会等待子进程完成,所以认为rqworker是串行执行的。若要加快处理速度,可以执行多个rqworker。

如果要在多台主机上去处理一个任务队列执行rqworker,这些主机都应该部署项目源码。

关于rq的源码

可以到github上下载rq的源码。源码中最主要的是三个文件: queue.py, worker.py, job.py。 如果要阅读,不建议从细节下手,建议由顶向下。 local.py中的代码也很有趣。 我对部分代码做了注释,放在了 https://github.com/pastlink/rq-source。

python、redis基础

以下是阅读源码时候遇到的一些基本概念,很基础,但是我之前了解/熟练。

python操作redis $ sudo pip install redis import redis r = redis.StrictRedis(host='localhost', port=6379, db=0) r.set('foo', 'bar') r.get('foo')

示例2:

import redis r = redis.StrictRedis(host='localhost', port=6300, db=0) r.set(r, 'bar') print r print r.get(r)

运行结果输出:

StrictRedis<ConnectionPool<Connection<host=localhost,port=6300,db=0>>> bar

看下redis:

$ redis-cli -h 127.0.0.1 -p 6300127.0.0.1:6300> KEYS * 1) "StrictRedis<ConnectionPool<Connection<host=localhost,port=6300,db=0>>>" 2) "name" 127.0.0.1:6300> exit python __slots__

默认情况下每个类都会有一个dict,通过 dict访问,这个dict维护了这个实例的所有属性。slots的作用是阻止在实例化类时为实例分配dict。具体见 http://blog.csdn.net/tianqio/article/details/2374086。

python thread下的get_ident()函数

返回当前线程的标识,是一个非0的整数。

Return the ‘thread identifier’ of the current thread. This is a nonzero integer. Its value has no direct meaning; it is intended as a magic cookie to be used e.g. to index a dictionary of thread-specific data. Thread identifiers may be recycled when a thread exits and another thread is created.

python property函数

摘自 http://blog.csdn.net/yatere/article/details/6658457。

就是为类定义一个属性。

class C(object): def __init__(self): self._x = Nonedef getx(self): print "get x";return self._xdef setx(self, value): print "set x"; self._x = valuedef delx(self): print "del x";del self._xx = property(getx, setx, delx, "I'm the 'x' property.")

使用

>>> t=C()>>> t.xget x >>> t.x="en"set x >>> print t.xget x en >>> del t.xdel x >>> t.xget x python property装饰器 class C(object): def __init__(self):self.__x = [email protected] x(self):return self.__xif __name__ == "__main__": c = C()print c.x # 10c.x = 10# 报错 AttributeError: can’t set attribute python all函数的用法 >>> print all.__doc__all(iterable) -> boolReturn True if bool(x) is True for all values x in the iterable. If the iterable is empty, return True. >>> all([1==1, 'a' != 'b'])True function模块下的partial函数

https://docs.python.org/2/library/functools.html#functools.partial

举个例子,先看一下int函数的用法:

>>> print int.__doc__int(x=0) -> int or long int(x, base=10) -> int or longConvert a number or string to an integer, or return 0 if no arguments are given. If x is floating point, the conversion truncates towards zero. If x is outside the integer range, the function returns a long instead.If x is not a number or if base is given, then x must be a string or Unicode object representing an integer literal in the given base. The literal can be preceded by '+' or '-' and be surrounded by whitespace. The base defaults to 10. Valid bases are 0 and 2-36. Base 0 means to interpret the base from the string as an integer literal. >>> int('0b100', base=0)4 >>> from functools import partial>>> basetwo = partial(int, base=2)>>> basetwo.__doc__ = 'Convert base 2 string to an int.'>>> basetwo('10010')18 function模块下的total_ordering数

见 https://docs.python.org/2/library/functools.html#functools.total_ordering。

是一个装饰器,使相同类的对象之间可以使用各种比较符号。

python相对导入和绝对导入

Python Relative and Absolute Import

python signal SIG

DFL:默认信号处理程序。

IGN:忽略信号的处理程序。

SIGALRM是在定时器终止时发送给进程的信号。

signal.alarm函数。将signal文档, signal — Set handlers for asynchronous events。

signal.alarm(time) If time is non-zero, this function requests that a SIGALRM signal be sent to the process in time seconds. Any previously scheduled alarm is canceled (only one alarm can be scheduled at any time). The returned value is then the number of seconds before any previously set alarm was to have been delivered. If time is zero, no alarm is scheduled, and any scheduled alarm is canceled. If the return value is zero, no alarm is currently scheduled. (See the Unix man page alarm(2).) Availability: Unix.

python from contextlib import contextmanager

见 python中关于with及contextlib的用法。

示例:

from contextlib import [email protected] make_context() : print 'enter'try :yield {}except RuntimeError, err :print 'error' , errfinally :print 'exit'with make_context() as value : print value

运行结果:

enter {}exit python __all__

__all__可用于模块导入时限制,如: from module import *

此时被导入模块若定义了 all属性,则只有all内指定的属性、方法、类可被导入~ 若没定义,则模块内的所有将被导入。

见 http://leequery.blog.163.com/blog/static/16842209620117184345180/。

python logging.config.dictConfig

https://docs.python.org/2/library/logging.config.html#logging.config.dictConfig

python: complete example of dict for logging.config.dictConfig?

python 类方法、静态方法

Python中类方法和静态方法

python importlib

用于导入字符串指定的模块。

importlib – Convenience wrappers for __import__() python __name__和__module__ >>> import pickle as pk>>> pk.load.__name__'load' >>> pk.load.__module__'pickle' >>> int.__module__'__builtin__' >>> __builtin__.int(1.23)1 python inspect模块



相关阅读:
Top