# cronpy/core/webutils.py
#
# 98 lines, 3.4 KiB, Python
# (web-viewer residue: Raw / Permalink / Normal View / History)
#
# 2020-10-03 21:17:53 +00:00
from time import time, sleep
import asyncio
from sam_aiohttp import ClientSession
from core.influxdb import InfluxDB
from core.mysqldb import MysqlDB
from core.redisdb import RedisDB
from core.log import get_logger
from core.arg import ArgParser
import setting
class WebUtils:
    """Driver for a cron job that downloads a batch of URLs concurrently.

    Life cycle implemented below: ``start()`` takes the job lock and prepares
    the HTTP settings, ``run(objects, callback)`` downloads every object's URL
    and hands each payload to ``callback``, and ``end()`` logs and stores the
    run statistics before committing and closing the MySQL handle.

    The counters ``nb_tasks``, ``tasks_done``, ``quantity`` and ``errors`` are
    initialised to 0 and reported by ``end()``; none of the methods shown here
    update them, so callers are presumably expected to maintain them.
    """

    # Header template shared by all instances. It is treated as read-only:
    # ``start()`` works on a per-instance copy.
    request_headers = {
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
        'Accept-Language': 'fr,fr-FR;q=0.8,en-US;q=0.5,en;q=0.3',
        'Accept-Encoding': 'gzip,deflate',
        'Cache-Control': 'no-cache',
        'Connection': 'keep-alive',
        'Pragma': 'no-cache',
        'Upgrade-Insecure-Requests': '1'
    }

    def __init__(self, module_=None):
        """Create the helper for the job named *module_*.

        Parses the command-line arguments, builds the logger and the MySQL,
        Redis and InfluxDB helpers, and picks the event loop used by ``run()``.
        """
        self.module = module_
        self.args = ArgParser().parse_args()
        self.logger = get_logger(name=self.module)
        self.mysqldb = MysqlDB()
        self.redisdb = RedisDB()
        self.influxdb = InfluxDB()
        try:
            self.loop = asyncio.get_event_loop()
        except RuntimeError:
            # get_event_loop() raises when there is no current loop (for
            # example outside the main thread); create and install one.
            self.loop = asyncio.new_event_loop()
            asyncio.set_event_loop(self.loop)
        self.nb_tasks = 0
        self.tasks_done = 0
        self.quantity = 0
        self.errors = 0
        self.start_time = 0
        self.end_time = 0
        self.locked = False
        self._session = None
        self._semaphore = None

    async def _fetch(self, obj):
        """GET ``obj.url`` through ``setting.PROXY``; return ``(obj, body_bytes)``."""
        async with self._session.get(url=obj.url, proxy=setting.PROXY) as response:
            return obj, await response.read()

    async def _bound_fetch(self, obj):
        """Run ``_fetch(obj)`` while holding the concurrency semaphore."""
        async with self._semaphore:
            return await self._fetch(obj)

    async def _run(self, objects):
        """Fetch every object in *objects* and return a list of ``(obj, data)``.

        Results come back in the same order as *objects*. If any fetch raises,
        the exception propagates to the caller, as before.
        """
        async with ClientSession(headers=self.request_headers) as self._session:
            # Create and launch tasks
            tasks = [asyncio.ensure_future(self._bound_fetch(obj)) for obj in objects]
            try:
                # Get results of terminated tasks
                return list(await asyncio.gather(*tasks))
            except BaseException:
                # gather() re-raises the first failure while the other tasks
                # keep running. Stop and drain them before the session closes
                # so they do not fail later with unretrieved exceptions.
                for task in tasks:
                    task.cancel()
                await asyncio.gather(*tasks, return_exceptions=True)
                raise

    def start(self):
        """Take the job lock and prepare the HTTP settings.

        ``self.locked`` becomes True when ``redisdb.save_start`` returns a
        falsy value; in that case an error is logged and nothing else is set
        up. Always returns None.
        """
        # Start and get id_cron
        self.start_time = int(time())
        self.locked = not self.redisdb.save_start(self.module)
        if self.locked:
            self.logger.error('[X] Cron already in progress')
            return None
        # Init web variables
        # Work on a per-instance copy: writing into the class-level dict would
        # leak this job's User-Agent into every other WebUtils instance.
        headers = dict(self.request_headers)
        headers['User-Agent'] = self.mysqldb.get_random_ua()
        self.request_headers = headers
        self._semaphore = asyncio.Semaphore(setting.SEMAPHORE)
        self.logger.info('[*] Starting job {} with {} tasks'.format(self.module, self.nb_tasks))

    def run(self, objects, callback):
        """Download all *objects*, then call ``callback(obj, data)`` for each.

        *objects* is iterated once and each item must expose a ``url``
        attribute. Does nothing when the job is locked. Returns None.
        """
        if self.locked:
            return
        # NOTE(review): this blocking wait only makes sense if the loop is
        # being run from another thread - confirm against callers.
        while self.loop.is_running():
            sleep(0.5)
        # Hand the coroutine straight to self.loop so the task is created on
        # the same loop that executes it.
        for obj, data in self.loop.run_until_complete(self._run(objects)):
            callback(obj, data)

    def end(self):
        """Log and store the run statistics, then commit and close MySQL.

        Does nothing when the job is locked.
        NOTE(review): in the locked case the MySQL handle is not closed here -
        confirm that this is intended.
        """
        if self.locked:
            return
        self.end_time = int(time())
        total_time = self.end_time - self.start_time
        self.logger.info('[*] {} objects updated'.format(self.quantity))
        self.logger.info('[*] {}/{} tasks done in {} seconds'.format(self.tasks_done, self.nb_tasks, total_time))
        self.redisdb.save_end(module=self.module)
        self.influxdb.save_stats(
            module=self.module, start_time=self.start_time, nb_tasks=self.nb_tasks, tasks_done=self.tasks_done,
            quantity=self.quantity, errors=self.errors, total_time=total_time
        )
        self.mysqldb.commit()
        self.mysqldb.close()