# 这是针对用户代码中重新实现_get_safe_exception_string的修改, 如果你的版本比0.8.0大, # 没必要增加这部分. def_get_safe_exception_string(self, exc_strings): """Ensure list of exception strings is decoded and joined as one string safely. """ if rq.__version__ < '1.0': exc_strings = map( lambda exc: exc if PY3 else exc.decode("utf-8"), exc_strings) return''.join(exc_strings) else: return Worker._get_safe_exception_string(exc_strings)
################################################ # This config is for rq. cause in rq/job.py always use the higest protocal # dumps = partial(pickle.dumps, protocol=pickle.HIGHEST_PROTOCOL) # # After refer to https://github.com/rq/rq/issues/598, we limit the # rq pickle protocol version. # TODO: Remove this after upgrade python and rq all if PY3: try: import cPickle as pickle except ImportError: # noqa # pragma: no cover import pickle # pickle.HIGHEST_PROTOCOL=2
from unittest import mock from functools import partial mydumps = partial(pickle.dumps, protocol=2) mock.patch('rq.job.dumps', side_effect=mydumps).start() ################################################
# 代码中手动替换Job.fetch函数, 放在代码开头 if rq.__version__ == '0.6.0': from ...you_mock import manual_fetch old_fetch = Job.fetch @classmethod defmy_fetch(cls, _id, connection=None): try: _j = old_fetch(_id, connection) return _j except ValueError as e: log.warning("Please update the rq version {}".format(e)) return manual_fetch(_id, connection=connection) except Exception as e: log.error(u"Get job error with: {} {}".format( e, traceback.format_exc())) Job.fetch = my_fetc
defmanual_fetch(job_id, connection=None): # type: (int) -> Job """manual_fetch TODO: Remove when rq upgrade, this is only needed in rq==0.6 Give another try: In differenty versions, rq change time format many times. like: https://github.com/rq/rq/issues/721 if the fetch not worke, we give a try to parse the job, hope it can works. :param job_id: [description] :type job_id: [type] """ _job = Job(job_id, connection=connection)
# patch refresh function defnew_refresh(self): # noqa """Overwrite the current instance's properties with the values in the corresponding Redis key. Will raise a NoSuchJobError if no corresponding Redis key exists. """ from rq.job import decode_redis_hash, unpickle from rq.utils import as_text key = self.key obj = decode_redis_hash(self.connection.hgetall(key)) ifnot obj: raise NoSuchJobError('No such job: {0}'.format(key))
defutcparse(string): _d = None exists_time_format = [ '%Y-%m-%dT%H:%M:%S.%fZ', '%y-%m-%dt%h:%m:%sz', # rq < 0.9 date format '%y-%m-%dt%h:%m:%s.%f+00:00', # rq < 0.4 datetime format ] for t_format in exists_time_format: try: _d = datetime.datetime.strptime(string, t_format) except ValueError: _d = None else: break
ifnot _d: log.warning("The %s can't be parsed", string)
return _d
defto_date(date_str): if date_str isNone: return else: return utcparse(as_text(date_str))
# rq > v0.13 the exc_info and data has been compressed by zlib # please refer to: # https://github.com/rq/rq/commit/f500186f3dc652278786a6223e5f24303ad81336 try: raw_data = obj['data'] except KeyError: raise NoSuchJobError('Unexpected job format: {0}'.format(obj))