from time import time, sleep
import asyncio

from sam_aiohttp import ClientSession

from core.influxdb import InfluxDB
from core.mysqldb import MysqlDB
from core.redisdb import RedisDB
from core.log import get_logger
from core.arg import ArgParser
import setting


class WebUtils:
    """Drive a batch of concurrent HTTP fetches for one job module.

    Typical life cycle, as implemented below:

    1. ``start()`` registers the run through ``RedisDB.save_start`` and
       prepares the HTTP headers and the concurrency semaphore.
    2. ``run(objects, callback)`` downloads ``obj.url`` for every object and
       hands ``(obj, raw_bytes)`` to ``callback``.
    3. ``end()`` logs a summary, stores statistics and closes the MySQL
       handle.

    The counters ``nb_tasks``, ``tasks_done``, ``quantity`` and ``errors``
    are only initialised and reported here; this class never increments
    them, so callers are expected to maintain them.
    """

    # Header template sent with every request. Instances work on their own
    # copy (made in ``__init__``) so this dict is never mutated at runtime.
    request_headers = {
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
        'Accept-Language': 'fr,fr-FR;q=0.8,en-US;q=0.5,en;q=0.3',
        'Accept-Encoding': 'gzip,deflate',
        'Cache-Control': 'no-cache',
        'Connection': 'keep-alive',
        'Pragma': 'no-cache',
        'Upgrade-Insecure-Requests': '1'
    }

    def __init__(self, module_=None):
        """Create the helper for the job named *module_*.

        Args:
            module_: Name of the job; used as the logger name and as the key
                passed to the redis / influx bookkeeping calls.
        """
        self.module = module_
        self.args = ArgParser().parse_args()
        self.logger = get_logger(name=self.module)
        self.mysqldb = MysqlDB()
        self.redisdb = RedisDB()
        self.influxdb = InfluxDB()
        self.loop = asyncio.get_event_loop()

        # Per-instance copy: ``start()`` adds a User-Agent entry, which must
        # not leak into the class-level dict shared by every other instance.
        self.request_headers = dict(self.request_headers)

        # Progress counters reported by ``end()``.
        self.nb_tasks = 0
        self.tasks_done = 0
        self.quantity = 0
        self.errors = 0

        # Unix timestamps (whole seconds) set by ``start()`` / ``end()``.
        self.start_time = 0
        self.end_time = 0

        # True when ``start()`` could not register the run; ``run()`` and
        # ``end()`` then do nothing.
        self.locked = False

        self._session = None    # set while ``_run`` has a session open
        self._semaphore = None  # created by ``start()``

    async def _fetch(self, obj):
        """Download ``obj.url`` through the configured proxy.

        Returns:
            A ``(obj, body)`` tuple where ``body`` is the raw response
            payload returned by ``response.read()``.
        """
        async with self._session.get(url=obj.url, proxy=setting.PROXY) as response:
            return obj, await response.read()

    async def _bound_fetch(self, obj):
        """Run ``_fetch`` while holding the semaphore to cap concurrency."""
        async with self._semaphore:
            return await self._fetch(obj)

    async def _run(self, objects):
        """Fetch every object concurrently inside one client session.

        Returns:
            A list of ``(obj, body)`` tuples in the same order as *objects*.

        Any exception raised by a single fetch propagates out of
        ``asyncio.gather`` and therefore out of this coroutine.
        """
        async with ClientSession(headers=self.request_headers) as self._session:
            tasks = [asyncio.ensure_future(self._bound_fetch(obj)) for obj in objects]
            # gather() already yields the (obj, data) tuples in task order.
            return list(await asyncio.gather(*tasks))

    def start(self):
        """Register the run and prepare the HTTP settings.

        When ``RedisDB.save_start`` returns a falsy value the instance is
        marked as locked, an error is logged and nothing else is set up
        (presumably because another run of the same module is in progress --
        confirm against ``RedisDB``).
        """
        self.start_time = int(time())
        self.locked = not self.redisdb.save_start(self.module)
        if self.locked:
            self.logger.error('[X] Cron already in progress')
            return None

        # Init web variables (on the per-instance header copy).
        self.request_headers['User-Agent'] = self.mysqldb.get_random_ua()
        self._semaphore = asyncio.Semaphore(setting.SEMAPHORE)
        self.logger.info('[*] Starting job {} with {} tasks'.format(self.module, self.nb_tasks))

    def run(self, objects, callback):
        """Fetch all *objects* and pass each result to *callback*.

        Args:
            objects: Iterable of items exposing a ``url`` attribute.
            callback: Called as ``callback(obj, data)`` for every fetched
                object, in input order, once all downloads have finished.

        Does nothing when the instance is locked.
        """
        if self.locked:
            return

        # Wait until the loop is free before scheduling on it.
        # NOTE(review): this blocks the calling thread, so it only makes
        # sense when the loop is being run from another thread -- confirm.
        while self.loop.is_running():
            sleep(0.5)

        # Hand the coroutine straight to our own loop so the task is bound
        # to ``self.loop`` rather than to whichever loop is current.
        results = self.loop.run_until_complete(self._run(objects))
        for obj, data in results:
            callback(obj, data)

    def end(self):
        """Log the summary, persist statistics and release the MySQL handle.

        Does nothing when the instance is locked.
        NOTE(review): in the locked case the MySQL handle is left open, as in
        the original code -- confirm whether that is intended.
        """
        if self.locked:
            return

        try:
            self.end_time = int(time())
            total_time = self.end_time - self.start_time
            self.logger.info('[*] {} objects updated'.format(self.quantity))
            self.logger.info('[*] {}/{} tasks done in {} seconds'.format(self.tasks_done, self.nb_tasks, total_time))
            self.redisdb.save_end(module=self.module)
            self.influxdb.save_stats(
                module=self.module,
                start_time=self.start_time,
                nb_tasks=self.nb_tasks,
                tasks_done=self.tasks_done,
                quantity=self.quantity,
                errors=self.errors,
                total_time=total_time
            )
            self.mysqldb.commit()
        finally:
            # Always release the connection, even if bookkeeping failed.
            self.mysqldb.close()