98 lines
3.4 KiB
Python
98 lines
3.4 KiB
Python
import asyncio
from time import time, sleep
from typing import Any, Callable, Iterable, List, Tuple

from sam_aiohttp import ClientSession

import setting
from core.arg import ArgParser
from core.influxdb import InfluxDB
from core.log import get_logger
from core.mysqldb import MysqlDB
from core.redisdb import RedisDB


class WebUtils:
    """Run a batch of concurrent HTTP GET requests on behalf of a cron job.

    Lifecycle implemented by the methods below: ``start()`` registers the job
    through ``redisdb.save_start`` and prepares the request headers and the
    semaphore, ``run()`` fetches a batch of objects and passes every response
    body to a callback, and ``end()`` logs and stores the job statistics.

    The counters ``nb_tasks``, ``tasks_done``, ``quantity`` and ``errors`` are
    initialised to 0 and only *read* by ``start()``/``end()``; nothing in this
    class updates them, so the code using this class has to maintain them.
    """

    # Default request headers. Instances work on their own copy (made in
    # ``__init__``) so that the per-job User-Agent never leaks into this dict.
    request_headers = {
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
        'Accept-Language': 'fr,fr-FR;q=0.8,en-US;q=0.5,en;q=0.3',
        'Accept-Encoding': 'gzip,deflate',
        'Cache-Control': 'no-cache',
        'Connection': 'keep-alive',
        'Pragma': 'no-cache',
        'Upgrade-Insecure-Requests': '1',
    }

    def __init__(self, module_=None):
        """Create the helper together with its logger and storage clients.

        Args:
            module_: Job identifier; used as the logger name and passed to
                the redis/influx helpers by ``start()`` and ``end()``.
        """
        self.module = module_
        self.args = ArgParser().parse_args()
        self.logger = get_logger(name=self.module)
        self.mysqldb = MysqlDB()
        self.redisdb = RedisDB()
        self.influxdb = InfluxDB()
        try:
            self.loop = asyncio.get_event_loop()
        except RuntimeError:
            # Raised when the current thread has no event loop; create one
            # and install it so later asyncio calls see the same loop.
            self.loop = asyncio.new_event_loop()
            asyncio.set_event_loop(self.loop)
        # Shadow the class-level dict: ``start()`` adds a User-Agent and must
        # not modify the headers shared by every other instance.
        self.request_headers = dict(self.request_headers)
        self.nb_tasks = 0
        self.tasks_done = 0
        self.quantity = 0
        self.errors = 0
        self.start_time = 0
        self.end_time = 0
        self.locked = False
        self._session = None
        self._semaphore = None

    async def _fetch(self, obj):
        """GET ``obj.url`` through ``setting.PROXY`` and return ``(obj, body)``."""
        async with self._session.get(url=obj.url, proxy=setting.PROXY) as response:
            return obj, await response.read()

    async def _bound_fetch(self, obj):
        """Run ``_fetch`` while holding the semaphore created by ``start()``."""
        async with self._semaphore:
            return await self._fetch(obj)

    async def _run(self, objects: Iterable[Any]) -> List[Tuple[Any, Any]]:
        """Fetch every object inside one client session.

        Args:
            objects: Items exposing a ``url`` attribute.

        Returns:
            One ``(obj, body)`` pair per object, in the order of ``objects``.
            An exception raised by any fetch propagates to the caller.
        """
        async with ClientSession(headers=self.request_headers) as self._session:
            # Create and launch tasks
            tasks = [asyncio.ensure_future(self._bound_fetch(obj)) for obj in objects]
            # gather() waits for all tasks and keeps the submission order
            return list(await asyncio.gather(*tasks))

    def start(self) -> None:
        """Register the job start and prepare the web variables.

        When ``redisdb.save_start`` returns a falsy value the job is marked
        as locked, an error is logged and nothing else is initialised;
        ``run()`` and ``end()`` then do nothing.
        """
        self.start_time = int(time())
        self.locked = not self.redisdb.save_start(self.module)
        if self.locked:
            self.logger.error('[X] Cron already in progress')
            return

        # Init web variables
        self.request_headers['User-Agent'] = self.mysqldb.get_random_ua()
        self._semaphore = asyncio.Semaphore(setting.SEMAPHORE)
        self.logger.info('[*] Starting job {} with {} tasks'.format(self.module, self.nb_tasks))

    def run(self, objects: Iterable[Any], callback: Callable[[Any, Any], Any]) -> None:
        """Fetch ``objects`` and call ``callback(obj, data)`` for each result.

        Does nothing when the job is locked.

        Args:
            objects: Items exposing a ``url`` attribute.
            callback: Called once per object with the object and the body
                read from its response, after all fetches have completed.
        """
        if self.locked:
            return

        # Blocking wait: this can only end if the loop is being driven from
        # another thread, since sleep() blocks the calling thread.
        while self.loop.is_running():
            sleep(0.5)

        # Hand the coroutine to our own loop instead of ensure_future(), which
        # would attach the task to whichever loop is current at that moment.
        results = self.loop.run_until_complete(self._run(objects))
        for obj, data in results:
            callback(obj, data)

    def end(self) -> None:
        """Log the job summary, store its statistics and close the database.

        Does nothing when the job is locked.
        """
        if self.locked:
            return

        self.end_time = int(time())
        total_time = self.end_time - self.start_time
        self.logger.info('[*] {} objects updated'.format(self.quantity))
        self.logger.info(
            '[*] {}/{} tasks done in {} seconds'.format(self.tasks_done, self.nb_tasks, total_time)
        )
        self.redisdb.save_end(module=self.module)
        self.influxdb.save_stats(
            module=self.module,
            start_time=self.start_time,
            nb_tasks=self.nb_tasks,
            tasks_done=self.tasks_done,
            quantity=self.quantity,
            errors=self.errors,
            total_time=total_time,
        )
        # NOTE(review): commit/close only happen for an unlocked job here;
        # confirm a locked job is not expected to close the connection too.
        self.mysqldb.commit()
        self.mysqldb.close()