前言

最近我在将一个项目由Python2转换为Python3, 项目中大部分代码已经尽量的 做了兼容, 但有两天时间, 仍然为rq版本的升级所苦恼, 我将升级过程中遇到的问题记录 一下, 如果有朋友需要可以考虑复用这部分代码.

Python2预计将于今年2020停止支持, 我经手的某个项目仍然是Python2代码, 照理来说, 升级Python3基本是件没有KPI, 并且吃力不讨好的事情. 但说真的, 我在公司其实已经很 佛系, 基本也不在乎KPI了, 开会讲过之后, 老大没有反对我去升级, 所以就把 这个项目拾掇拾掇, 毕竟我还要一直维护下去. 如果之后还有问题我也会记录在博客中, 可能rq的问题是一个开头吧, 也可能是结尾.

rq

[rq][rq]是一个Python的异步任务库, 可以通过redis的队列分发任务. 并交由worker去做. 我们项目中对rq进行了重度的依赖, 是不可能换用其他任务队列的, 只能做好单元测试后升级, 升级包括两个方面:

  1. 对rq版本的升级
  2. 对Python的升级

下面我会就这两个方面分别介绍遇到的问题与兼容方案. 原有版本到目的版本的情况如下:

项目 原有版本 目的版本
Python 2.7 3.7
redis 2.10.5 3.3.8
rq 0.6.0 1.11.0

rq版本升级中遇到的问题

unicode

在Python2中用户显式指定(u'')的才是unicode字符串, 在0.6.0中, rq甚至没有加入 对exception中中文的处理, 自从在0.8.0中已经增加了_get_safe_exception_string 在Python3对unicode更为友好, 这个升级比较平滑.

1
2
3
4
5
6
7
8
9
10
11
12
# 这是针对用户代码中重新实现_get_safe_exception_string的修改, 如果你的版本比0.8.0大,
# 没必要增加这部分.
def _get_safe_exception_string(self, exc_strings):
"""Ensure list of exception strings is decoded and
joined as one string safely.
"""
if rq.__version__ < '1.0':
exc_strings = map(
lambda exc: exc if PY3 else exc.decode("utf-8"), exc_strings)
return ''.join(exc_strings)
else:
return Worker._get_safe_exception_string(exc_strings)

move_to_failed_queue

在rq 1.0版本以后, move_to_failed_queue就已经被移除, 失败的任务会直接加入, 不需要用户再手动指定了.

1
2
3
4
5
6
7
8
9
10
11
12
13
if rq.__version__ < '1.0':
self.ori_move_to_failed_queue = self.move_to_failed_queue
self.ori_handle_exception = self.handle_exception

kwargs['exception_handlers'] = [
self._handle_job_exception,
self.move_to_failed_queue,
]
else:
# 在1.0之后的版本, 只保留_handle_job_exception用于打印错误.
kwargs['exception_handlers'] = [
self._handle_job_exception,
]

Job数据存储与读入

在rq > 0.13版本后, Job中的exc_infodata数据使用zlib进行了压缩, 也就是说 新版的数据在旧版的rq中是没法被读到的, 需要在自定义的fetch函数中做一层兼容:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
try:
raw_data = obj['data']
except KeyError:
raise NoSuchJobError('Unexpected job format: {0}'.format(obj))
try:
self.data = zlib.decompress(raw_data)
except zlib.error:
# Fallback to uncompressed string
self.data = raw_data

raw_exc_info = obj.get('exc_info')
if raw_exc_info:
try:
self.exc_info = as_text(zlib.decompress(raw_exc_info))
except zlib.error:
# Fallback to uncompressed string
self.exc_info = as_text(raw_exc_info)

完整的fetch函数可以在附录中看到.

时间格式

rq的时间格式做过几次改变, 直接升级肯定会有问题的, 我们自定义的Job.fetch需要做一层兼容:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
def utcparse(string):
_d = None
exists_time_format = [
'%Y-%m-%dT%H:%M:%S.%fZ',
'%y-%m-%dt%h:%m:%sz', # rq < 0.9 date format
'%y-%m-%dt%h:%m:%s.%f+00:00', # rq < 0.4 datetime format
]
for t_format in exists_time_format:
try:
_d = datetime.datetime.strptime(string, t_format)
except ValueError:
_d = None
else:
break

if not _d:
log.warning("The %s can't be parsed", string)

return _d

def to_date(date_str):
if date_str is None:
return
else:
return utcparse(as_text(date_str))

完整的fetch函数可以在附录中看到.

Python升级使rq产生的问题

pickle协议

rq中, 使用了pickle, 但是是以如下方式使用的, 本身这个用法没什么问题, 但是 pickle.HIGHEST_PROTOCOL这个变量在Python2与Python3是不一样的. 在Python3中dumps得到的数据, 由于Python2中的协议较旧, 无法读取, 也就是说rq下发的任务无法被读到.

1
2
# 见rq/job.py
dumps = partial(pickle.dumps, protocol=pickle.HIGHEST_PROTOCOL)

因此, 需要我们在Python3中, 对此情况做一下转换, 直接的方法就是将此dumps函数替换, 下面这段代码需要放置在程序开头, 直到我们的worker全部更新为Python3才可以将代码删除.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
################################################
# This config is for rq. cause in rq/job.py always use the higest protocal
# dumps = partial(pickle.dumps, protocol=pickle.HIGHEST_PROTOCOL)
#
# After refer to https://github.com/rq/rq/issues/598, we limit the
# rq pickle protocol version.
# TODO: Remove this after upgrade python and rq all
if PY3:
try:
import cPickle as pickle
except ImportError: # noqa # pragma: no cover
import pickle
# pickle.HIGHEST_PROTOCOL=2

from unittest import mock
from functools import partial
mydumps = partial(pickle.dumps, protocol=2)
mock.patch('rq.job.dumps', side_effect=mydumps).start()
################################################

总结

此次的改动, 基础类库的很多地方都加上了对Python版本或是rq版本的判断. 由于不同版本间 带来的问题, 只能通过升级到统一的版本进行解决, 希望我能早日把我们的应用完全升级, 再也不用为Python2所困扰.

附录

mock_fetch函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
# 代码中手动替换Job.fetch函数, 放在代码开头
if rq.__version__ == '0.6.0':
from ...you_mock import manual_fetch
old_fetch = Job.fetch
@classmethod
def my_fetch(cls, _id, connection=None):
try:
_j = old_fetch(_id, connection)
return _j
except ValueError as e:
log.warning("Please update the rq version {}".format(e))
return manual_fetch(_id, connection=connection)
except Exception as e:
log.error(u"Get job error with: {} {}".format(
e, traceback.format_exc()))
Job.fetch = my_fetc

def manual_fetch(job_id, connection=None):
# type: (int) -> Job
"""manual_fetch
TODO: Remove when rq upgrade, this is only needed in rq==0.6

Give another try:
In differenty versions, rq change time format many times.
like: https://github.com/rq/rq/issues/721
if the fetch not worke, we give a try to parse
the job, hope it can works.


:param job_id: [description]
:type job_id: [type]
"""
_job = Job(job_id, connection=connection)

# patch refresh function
def new_refresh(self): # noqa
"""Overwrite the current instance's properties with the values in the
corresponding Redis key.

Will raise a NoSuchJobError if no corresponding Redis key exists.
"""
from rq.job import decode_redis_hash, unpickle
from rq.utils import as_text
key = self.key
obj = decode_redis_hash(self.connection.hgetall(key))
if not obj:
raise NoSuchJobError('No such job: {0}'.format(key))

def utcparse(string):
_d = None
exists_time_format = [
'%Y-%m-%dT%H:%M:%S.%fZ',
'%y-%m-%dt%h:%m:%sz', # rq < 0.9 date format
'%y-%m-%dt%h:%m:%s.%f+00:00', # rq < 0.4 datetime format
]
for t_format in exists_time_format:
try:
_d = datetime.datetime.strptime(string, t_format)
except ValueError:
_d = None
else:
break

if not _d:
log.warning("The %s can't be parsed", string)

return _d

def to_date(date_str):
if date_str is None:
return
else:
return utcparse(as_text(date_str))

# rq > v0.13 the exc_info and data has been compressed by zlib
# please refer to:
# https://github.com/rq/rq/commit/f500186f3dc652278786a6223e5f24303ad81336
try:
raw_data = obj['data']
except KeyError:
raise NoSuchJobError('Unexpected job format: {0}'.format(obj))

try:
self.data = zlib.decompress(raw_data)
except zlib.error:
# Fallback to uncompressed string
self.data = raw_data

raw_exc_info = obj.get('exc_info')
if raw_exc_info:
try:
self.exc_info = as_text(zlib.decompress(raw_exc_info))
except zlib.error:
# Fallback to uncompressed string
self.exc_info = as_text(raw_exc_info)

self.created_at = to_date(as_text(obj.get('created_at')))
self.origin = as_text(obj.get('origin'))
self.description = as_text(obj.get('description'))
self.enqueued_at = to_date(as_text(obj.get('enqueued_at')))
self.started_at = to_date(as_text(obj.get('started_at')))
self.ended_at = to_date(as_text(obj.get('ended_at')))
self._result = unpickle(obj.get('result')) if obj.get('result') else None # noqa
# self.exc_info = as_text(obj.get('exc_info'))
self.timeout = int(obj.get('timeout')) if obj.get('timeout') else None
self.result_ttl = int(obj.get('result_ttl')) if obj.get('result_ttl') else None # noqa
self._status = as_text(obj.get('status') if obj.get('status') else None)
self._dependency_id = as_text(obj.get('dependency_id', None))
self.ttl = int(obj.get('ttl')) if obj.get('ttl') else None
self.meta = unpickle(obj.get('meta')) if obj.get('meta') else {}

new_refresh(_job)

return _job