|
13 | 13 | # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
14 | 14 | # License for the specific language governing permissions and limitations |
15 | 15 | # under the License. |
| 16 | +import redis |
| 17 | +import ujson as json |
16 | 18 |
|
17 | | -# version is a human-readable version number. |
| 19 | +import pytz |
| 20 | +import time |
| 21 | +import datetime |
| 22 | +import logging |
| 23 | +from collections import defaultdict |
| 24 | +from django.utils import timezone |
| 25 | +from croniter import croniter |
| 26 | +import asyncio |
| 27 | +from abc import abstractmethod, ABCMeta |
| 28 | +import aioredis |
18 | 29 |
|
19 | | -# version_info is a four-tuple for programmatic comparison. The first |
20 | | -# three numbers are the components of the version number. The fourth |
21 | | -# is zero for an official release, positive for a development branch, |
22 | | -# or negative for a release candidate or beta (after the base version |
23 | | -# number has been incremented) |
| 30 | +from trader.utils.func_container import CallbackFunctionContainer |
| 31 | +from trader.utils.read_config import config |
24 | 32 |
|
25 | | -version = "0.1" |
26 | | -version_info = (0, 1, 0, 0) |
| 33 | +logger = logging.getLogger('BaseModule') |
| 34 | + |
| 35 | + |
| 36 | +class BaseModule(CallbackFunctionContainer, metaclass=ABCMeta): |
| 37 | + def __init__(self): |
| 38 | + super().__init__() |
| 39 | + self.io_loop = asyncio.new_event_loop() |
| 40 | + asyncio.set_event_loop(self.io_loop) |
| 41 | + self.redis_client = aioredis.from_url( |
| 42 | + f"redis://{config.get('REDIS', 'host', fallback='localhost')}:" |
| 43 | + f"{config.getint('REDIS', 'port', fallback=6379)}/{config.getint('REDIS', 'db', fallback=0)}", |
| 44 | + decode_responses=True) |
| 45 | + self.raw_redis = redis.StrictRedis(host=config.get('REDIS', 'host', fallback='localhost'), |
| 46 | + port=config.getint('REDIS', 'port', fallback=6379), |
| 47 | + db=config.getint('REDIS', 'db', fallback=0), decode_responses=True) |
| 48 | + self.sub_client = self.redis_client.pubsub() |
| 49 | + self.initialized = False |
| 50 | + self.sub_tasks = list() |
| 51 | + self.sub_channels = list() |
| 52 | + self.channel_router = dict() |
| 53 | + self.crontab_router = defaultdict(dict) |
| 54 | + self.datetime = None |
| 55 | + self.time = None |
| 56 | + self.loop_time = None |
| 57 | + |
| 58 | + def _register_callback(self): |
| 59 | + self.datetime = timezone.localtime() |
| 60 | + self.time = time.time() |
| 61 | + self.loop_time = self.io_loop.time() |
| 62 | + for fun_name, args in self.callback_fun_args.items(): |
| 63 | + if 'crontab' in args: |
| 64 | + key = args['crontab'] |
| 65 | + self.crontab_router[key]['func'] = getattr(self, fun_name) |
| 66 | + self.crontab_router[key]['iter'] = croniter(args['crontab'], self.datetime) |
| 67 | + self.crontab_router[key]['handle'] = None |
| 68 | + elif 'channel' in args: |
| 69 | + self.channel_router[args['channel']] = getattr(self, fun_name) |
| 70 | + |
| 71 | + def _get_next(self, key): |
| 72 | + return self.loop_time + (self.crontab_router[key]['iter'].get_next() - self.time) |
| 73 | + |
| 74 | + def _call_next(self, key): |
| 75 | + if self.crontab_router[key]['handle'] is not None: |
| 76 | + self.crontab_router[key]['handle'].cancel() |
| 77 | + self.crontab_router[key]['handle'] = self.io_loop.call_at(self._get_next(key), self._call_next, key) |
| 78 | + self.io_loop.create_task(self.crontab_router[key]['func']()) |
| 79 | + |
| 80 | + async def install(self): |
| 81 | + try: |
| 82 | + self._register_callback() |
| 83 | + await self.sub_client.psubscribe(*self.channel_router.keys()) |
| 84 | + asyncio.run_coroutine_threadsafe(self._msg_reader(), self.io_loop) |
| 85 | + # self.io_loop.create_task(self._msg_reader()) |
| 86 | + for key, cron_dict in self.crontab_router.items(): |
| 87 | + if cron_dict['handle'] is not None: |
| 88 | + cron_dict['handle'].cancel() |
| 89 | + cron_dict['handle'] = self.io_loop.call_at(self._get_next(key), self._call_next, key) |
| 90 | + self.initialized = True |
| 91 | + logger.debug('%s plugin installed', type(self).__name__) |
| 92 | + except Exception as e: |
| 93 | + logger.error('%s plugin install failed: %s', type(self).__name__, repr(e), exc_info=True) |
| 94 | + |
| 95 | + async def uninstall(self): |
| 96 | + try: |
| 97 | + await self.sub_client.punsubscribe() |
| 98 | + # await asyncio.wait(self.sub_tasks, loop=self.io_loop) |
| 99 | + self.sub_tasks.clear() |
| 100 | + await self.sub_client.close() |
| 101 | + for key, cron_dict in self.crontab_router.items(): |
| 102 | + if self.crontab_router[key]['handle'] is not None: |
| 103 | + self.crontab_router[key]['handle'].cancel() |
| 104 | + self.crontab_router[key]['handle'] = None |
| 105 | + self.initialized = False |
| 106 | + logger.debug('%s plugin uninstalled', type(self).__name__) |
| 107 | + except Exception as e: |
| 108 | + logger.error('%s plugin uninstall failed: %s', type(self).__name__, repr(e), exc_info=True) |
| 109 | + |
| 110 | + async def _msg_reader(self): |
| 111 | + # {'type': 'pmessage', 'pattern': 'channel:*', 'channel': 'channel:1', 'data': 'Hello'} |
| 112 | + async for msg in self.sub_client.listen(): |
| 113 | + if msg['type'] == 'pmessage': |
| 114 | + channel = msg['channel'] |
| 115 | + pattern = msg['pattern'] |
| 116 | + data = json.loads(msg['data']) |
| 117 | + # logger.debug("%s channel[%s] Got Message:%s", type(self).__name__, channel, msg) |
| 118 | + self.io_loop.create_task(self.channel_router[pattern](channel, data)) |
| 119 | + elif msg['type'] == 'punsubscribe': |
| 120 | + break |
| 121 | + logger.debug('%s quit _msg_reader!', type(self).__name__) |
| 122 | + |
| 123 | + async def start(self): |
| 124 | + await self.install() |
| 125 | + |
| 126 | + async def stop(self): |
| 127 | + await self.uninstall() |
| 128 | + |
| 129 | + def run(self): |
| 130 | + try: |
| 131 | + self.io_loop.create_task(self.start()) |
| 132 | + self.io_loop.run_forever() |
| 133 | + except KeyboardInterrupt: |
| 134 | + self.io_loop.run_until_complete(self.stop()) |
| 135 | + except Exception as ee: |
| 136 | + logger.error('发生错误: %s', repr(ee), exc_info=True) |
| 137 | + self.io_loop.run_until_complete(self.stop()) |
| 138 | + finally: |
| 139 | + logger.debug('程序已退出') |
0 commit comments