JFIF$        dd7 

Viewing File: /opt/cloudlinux/venv/lib/python3.11/site-packages/xray/continuous/tracing.py

# -*- coding: utf-8 -*-

# Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2021 All Rights Reserved
#
# Licensed under CLOUD LINUX LICENSE AGREEMENT
# http://cloudlinux.com/docs/LICENSE.TXT

"""
This module contains X Ray continuous tracing implementation
"""
import re
from collections import defaultdict
from typing import List

from xray import gettext as _
from .common import ContinuousCommon
from .mailer import Mailer
from ..apiclient import get_client
from ..console_utils.validations import regex
from ..internal.exceptions import XRayError, XRayAPIError
from ..internal.types import url_split
from ..internal.utils import read_sys_id, prev_date, date_of_timestamp, \
    get_formatted_date, get_html_formatted_links, get_text_formatted_links
from ..manager import initialize_manager


class ContinuousTracing(ContinuousCommon):
    """
    Continuous tracing implementation

    Calling an instance performs one cycle, in this order:
    clear_zombie_auto_tasks(), start_tracing(), generate_mail().

    NOTE(review): self.logger, self.tracing_conf, get_continuous_tasks()
    and dump_tracing_configuration() are not defined in this class;
    presumably they are provided by ContinuousCommon -- confirm.
    """

    def __init__(self):
        super().__init__()
        system_id = read_sys_id()
        # get_client returns a client class/factory,
        # which is then instantiated for this system id
        client_object = get_client('auto_tasks')
        self.client = client_object(system_id=system_id)
        self.manager_instance = initialize_manager(system_id)
        self.mail_sender = Mailer()

    def __call__(self):
        self.clear_zombie_auto_tasks()
        self.start_tracing()
        self.generate_mail()

    def current_auto_tasks(self) -> list:
        """
        Get auto tasks list

        :return: result of the API client's get_tasks(); an empty list
                 if the request fails with XRayAPIError (error is logged)
        """
        try:
            return self.client.get_tasks()
        except XRayAPIError as e:
            self.logger.error('Failed to get auto tasks',
                              extra={'err': str(e)})
            return []

    def clear_zombie_auto_tasks(self) -> None:
        """
        Stop and complete running auto tasks

        Selects tasks flagged as auto_task which are not 'completed' and
        whose user is not '*autotracing*'. Tasks in 'running' status are
        stopped first; every selected task is then completed.
        XRayError is logged and does not interrupt processing of the
        remaining tasks.
        """
        # keyed by task_id, so a task id listed twice is processed once
        zombie_tasks = {
            t.task_id: t.status for t in self.current_auto_tasks()
            if t.auto_task and t.status != 'completed'
            and t.user != '*autotracing*'
        }
        for task_id, status in zombie_tasks.items():
            try:
                if status == 'running':
                    self.manager_instance.stop(task_id)
                # NOTE: not reached for this task if stop() raised XRayError
                self.manager_instance.complete(task_id)
            except XRayError as e:
                self.logger.error('%s', str(e))

    @staticmethod
    def _url(domain: str) -> str:
        """
        Construct URL for auto tracing: {domain}/*

        :param domain: domain or URL; leading and trailing slashes
                       are stripped before '/*' is appended
        :return: constructed URL
        :raises ValueError: if the constructed URL does not match
                            the validation regex
        """
        u = f"{domain.strip('/')}/*"
        if not re.match(regex, u):
            raise ValueError(_('invalid url'), u)
        return u

    def executed(self, domain: str) -> None:
        """
        Increment execution_count of given domain

        :param domain: key of the entry in self.tracing_conf
        """
        self.tracing_conf[domain]['execution_count'] += 1

    def start_tracing(self) -> List[dict]:
        """
        Start auto tasks with status 'running'

        For each continuous task in 'running' status an auto task is
        started for the URL built from its original_url. On success the
        execution counter of the task's domain is incremented; failures
        (invalid URL or XRayError) are logged and the task is skipped.

        :return: result of dump_tracing_configuration()
        """
        err_text = 'Start auto task failed'
        for continuous_task in self.get_continuous_tasks():
            if continuous_task.status != 'running':
                continue
            try:
                self.manager_instance.start_auto(
                    url=self._url(continuous_task.original_url))
            except ValueError as e:
                # raised by _url() for an invalid URL
                self.logger.error(err_text,
                                  extra={'err': str(e)})
            except XRayError as e:
                self.logger.error(err_text,
                                  extra={
                                      'err': str(e),
                                      'continuous_task': continuous_task
                                  })
            else:
                # count only successfully started tasks
                self.executed(continuous_task.domain)
        return self.dump_tracing_configuration()

    def retrieve_mail_addrs(self) -> dict:
        """
        Map e-mail addresses to urls in existing continuous tracing configuration

        Continuous tasks whose original_url does not produce a valid
        tracing URL are skipped; the skip is logged as a warning.

        :return: dict of {tracing URL: e-mail address}
        """
        url_mail_map = {}
        for t in self.get_continuous_tasks():
            try:
                url_mail_map[self._url(t.original_url)] = t.email
            except ValueError as e:
                # best effort: one invalid URL must not block mailing
                # for the others, but it should not vanish silently
                self.logger.warning(
                    'Skipping invalid url in e-mail mapping',
                    extra={'err': str(e)})
        return url_mail_map

    def filter_tasks_by_date(self) -> list:
        """
        Select previous date completed tasks

        :return: auto tasks in 'completed' status whose starttime
                 corresponds to the date returned by prev_date()
        """
        tasks = self.current_auto_tasks()
        # evaluated once, so all tasks are compared with the same date
        target_date = prev_date()

        def check_date(ts: int) -> bool:
            """
            Verify if task timestamp corresponds to previous date
            """
            try:
                return date_of_timestamp(ts) == target_date
            except TypeError:
                # in case of starttime == None
                return False

        return [t for t in tasks
                if t.status == 'completed' and check_date(t.starttime)]

    def generate_mail(self) -> None:
        """
        Generate e-mails with links and send them to recipients

        Tasks returned by filter_tasks_by_date() are grouped by the
        e-mail address mapped to their URL; tasks without a mapped
        (non-empty) address are ignored. One 'report' e-mail is sent
        per address.
        """
        self.logger.info('Generating e-mails started')
        email_conf = defaultdict(list)
        url_mail_mapping = self.retrieve_mail_addrs()
        # filter tasks by date
        yesterday_tasks = self.filter_tasks_by_date()
        # generate links along with mapping them to e-mail addresses
        for task in yesterday_tasks:
            mail = url_mail_mapping.get(task.url)
            if not mail:
                continue
            # each entry maps the first element of url_split(task.url)
            # to the task's shared link
            email_conf[mail].append(
                {url_split(task.url)[0]: task.shared_link})
            self.logger.info('Selected task %s for mailing to %s',
                             task.task_id, mail)

        # send e-mails
        for recipient, links in email_conf.items():
            self.mail_sender.send_mail(
                recipient, template='report',
                date=get_formatted_date(),
                links=get_text_formatted_links(links),
                html_links=get_html_formatted_links(links))
        self.logger.info('E-mails sending finished')
Back to Directory  nL+D550H?Mx ,D"v]qv;6*Zqn)ZP0!1 A "#a$2Qr D8 a Ri[f\mIykIw0cuFcRı?lO7к_f˓[C$殷WF<_W ԣsKcëIzyQy/_LKℂ;C",pFA:/]=H  ~,ls/9ć:[=/#f;)x{ٛEQ )~ =𘙲r*2~ a _V=' kumFD}KYYC)({ *g&f`툪ry`=^cJ.I](*`wq1dđ#̩͑0;H]u搂@:~וKL Nsh}OIR*8:2 !lDJVo(3=M(zȰ+i*NAr6KnSl)!JJӁ* %݉?|D}d5:eP0R;{$X'xF@.ÊB {,WJuQɲRI;9QE琯62fT.DUJ;*cP A\ILNj!J۱+O\͔]ޒS߼Jȧc%ANolՎprULZԛerE2=XDXgVQeӓk yP7U*omQIs,K`)6\G3t?pgjrmۛجwluGtfh9uyP0D;Uڽ"OXlif$)&|ML0Zrm1[HXPlPR0'G=i2N+0e2]]9VTPO׮7h(F*癈'=QVZDF,d߬~TX G[`le69CR(!S2!P <0x<!1AQ "Raq02Br#SCTb ?Ζ"]mH5WR7k.ۛ!}Q~+yԏz|@T20S~Kek *zFf^2X*(@8r?CIuI|֓>^ExLgNUY+{.RѪ τV׸YTD I62'8Y27'\TP.6d&˦@Vqi|8-OΕ]ʔ U=TL8=;6c| !qfF3aů&~$l}'NWUs$Uk^SV:U# 6w++s&r+nڐ{@29 gL u"TÙM=6(^"7r}=6YݾlCuhquympǦ GjhsǜNlɻ}o7#S6aw4!OSrD57%|?x>L |/nD6?/8w#[)L7+6〼T ATg!%5MmZ/c-{1_Je"|^$'O&ޱմTrb$w)R$& N1EtdU3Uȉ1pM"N*(DNyd96.(jQ)X 5cQɎMyW?Q*!R>6=7)Xj5`J]e8%t!+'!1Q5 !1 AQaqё#2"0BRb?Gt^## .llQT $v,,m㵜5ubV =sY+@d{N! dnO<.-B;_wJt6;QJd.Qc%p{ 1,sNDdFHI0ГoXшe黅XۢF:)[FGXƹ/w_cMeD,ʡcc.WDtA$j@:) -# u c1<@ۗ9F)KJ-hpP]_x[qBlbpʖw q"LFGdƶ*s+ډ_Zc"?%t[IP 6J]#=ɺVvvCGsGh1 >)6|ey?Lӣm,4GWUi`]uJVoVDG< SB6ϏQ@ TiUlyOU0kfV~~}SZ@*WUUi##; s/[=!7}"WN]'(L! ~y5g9T̅JkbM' +s:S +B)v@Mj e Cf jE 0Y\QnzG1д~Wo{T9?`Rmyhsy3!HAD]mc1~2LSu7xT;j$`}4->L#vzŏILS ֭T{rjGKC;bpU=-`BsK.SFw4Mq]ZdHS0)tLg