
    s,g4                       d dl Z d dlZd dlZd dlZd dlZd dlZd dlZd dlZd dlZd dl	Z	d dl
Z
d dlZd dlmZmZmZ d dlmZ d dlmZ d dlmZ d dlmZmZmZmZmZmZmZ d dlmZ er	 d dlmZ d d	l!m"Z" d d
l#m$Z$m%Z%m&Z& 	 d dlm'Z' d dl m)Z) d dl*Z!ddl+m,Z, ddl-m.Z.m/Z/m0Z0 ddl1m2Z2m3Z3m4Z4m5Z5m6Z6m7Z7 ddl8m9Z9m:Z:m;Z; ddl<m=Z= ddl>m?Z? ddl@mAZAmBZBmCZC ddlDmEZEmFZFmGZGmHZH ddlImJZJ ddlKmLZLmMZM ddlNmOZO ddlPmQZQ ddlRmSZS ddlTmUZUmVZVmWZW ddlXmYZYmZZZm[Z[m\Z\m]Z]m^Z^m_Z_m`Z`maZa ddlbmcZc 	 d dldmdZe  ej                  d#      Zh G d$ d%ei      Zj ekd&  ele      D              Zmd' Zn G d( d)efe      Zo G d* d+efe      Zp G d, d-      Zq G d. d/eq      Zr G d0 d1er      Zs G d2 d3er      Zt G d4 d5er      Zu G d6 d7er      Zvy# e $ r Y Qw xY w# e $ r
 d dlm(Z' Y Lw xY w# e $ r d efd!dfd"ZeY w xY w)8    N)datetime	timedeltatimezone)Enum)shuffle)	FrameType)TYPE_CHECKINGCallableListOptionalTupleTypeUnion)uuid4)struct_rusage)Redis)PipelinePubSubPubSubWorkerThread)SIGKILL)SIGTERM)suppress   )worker_registration)PUBSUB_CHANNEL_TEMPLATEhandle_commandparse_payload)DEFAULT_JOB_MONITORING_INTERVALDEFAULT_LOGGING_DATE_FORMATDEFAULT_LOGGING_FORMAT!DEFAULT_MAINTENANCE_TASK_INTERVALDEFAULT_RESULT_TTLDEFAULT_WORKER_TTL)DequeueTimeoutDeserializationErrorShutDownImminentException)	Execution)Group)Job	JobStatusRetry)bluegreensetup_loghandlersyellowQueue)StartedJobRegistryclean_registries)RQScheduler)resolve_serializer)is_suspended)HorseMonitorTimeoutExceptionJobTimeoutExceptionUnixSignalDeathPenalty)	as_textbackend_classcompactensure_listget_connection_from_queuesget_versionnow	utcformatutcparse)VERSION)setproctitletitlereturnc                      y N )rE   s    c/var/www/trellinator.diamondhoofcare.com/public_html/venv/lib/python3.12/site-packages/rq/worker.pysetprocnamerK   K   s        z	rq.workerc                       e Zd Zy)StopRequestedN)__name__
__module____qualname__rI   rL   rJ   rN   rN   R   s    rL   rN   c              #   n   K   | ]-  }|j                  d       sd|vst        t        |      |f / yw)SIG_N)
startswithgetattrsignal).0signames     rJ   	<genexpr>rZ   V   s8      ,3gFXFXY^F_dgovdvWVWw's   555c                     	 t         j                  d d dk\  rt        j                  |       j                  S t
        |    S # t        $ r Y yt        $ r Y yw xY w)N   )      SIG_UNKNOWN)sysversion_inforW   Signalsname	_signamesKeyError
ValueError)signums    rJ   signal_namerh   [   sY    	BQ6)>>&)...V$$  s   4A  A   	AAAc                       e Zd ZdZdZdZy)DequeueStrategydefaultround_robinrandomN)rO   rP   rQ   DEFAULTROUND_ROBINRANDOMrI   rL   rJ   rj   rj   h   s    GKFrL   rj   c                       e Zd ZdZdZdZdZy)WorkerStatusstarted	suspendedbusyidleN)rO   rP   rQ   STARTED	SUSPENDEDBUSYIDLErI   rL   rJ   rr   rr   n   s    GIDDrL   rr   c                      e Zd ZdZej
                  ZeZe	Z
eZdZdZdZdZdedddedddddeddddfdee   ded	   d
edee   dee   deed      deed      dedededeeeeedgdf      fdZe	 	 	 dsdedd	deed      deed      ded    f
d       Ze	 	 	 	 	 dtded	   deed      deed      ded   ded   f
d       Zeduded	   ded   dee   fd       Zeduded	   ded   defd       Z d Z!e"d         Z#dvd!Z$e"defd"       Z%e"defd#       Z&d$ Z'd% Z(d& Z)dee   fd'Z*dee   fd(Z+e"d)        Z,e"d*        Z-e"defd+       Z.d, Z/d- Z0d.ed/ee1   fd0Z2d1 Z3dwd3Z4dd4e5e6ddde7jp                  fd5ed6ed7ed8ed9ee   d:ee   d;ed<e7defd=Z9dxd@Z:dA Z;dydBZ<dzdwdCZ=d{dDe>d>ed?   fdEZ?dudFee   d>ed?   fdGZ@d{d>ed?   dee   fdHZAded   fdIZBd{dJed>ed?   fdKZCdL ZDdefdMZEdN ZF e"eFeD      ZGdd4e5e6fd5ed6ed7ed8efdOZHdP ZIdQ ZJe"dR        ZKd4e5e6fd6ed7ed8efdSZLd5efdTZMdU ZNdV ZOe"dW        ZPe"dX        ZQdY ZRdZeSd[d\d]d^ddfd_ZTd` ZUda ZVd2ddefdbZWd2ddeXfdcZYdd ZZ	 d{deee   d:ee   dee[df      fdgZ\dudeee   d>ed?   fdhZ]di Z^dj Z_d{d>ed?   fdkZ`d{d>ed?   fdlZadmebd>d?fdnZcd|doZddp Zeeffdqegj                  fdrZiy)}
BaseWorker
rq:worker:T       @g      N@NFrc   
connectionr   maintenance_intervaldefault_worker_ttl
worker_ttl	job_classr)   queue_classr1   log_job_description!disable_default_exception_handlerprepare_for_workwork_horse_killed_handlerr   c           
      0   || _         |	r|	| _        n1|r$t        j                  dt        d       || _        nt
        | _        || _        || _        |st        |      }|sJ | j                  |      }|| _
        d | _        t        | d|
      | _        t        | d|      | _        t        | _        t"        j                   | _        t'        |      | _        d | _        t-        |      D cg c]H  }t/        |t0              r4| j                  ||| j                  | j(                  | j2                        n|J }}|xs t5               j6                  | _        || _        | j=                          | j:                  d d  | _        g | _         || _!        d | _"        d| _#        d	| _$        d
| _%        d	| _&        d | _'        tP        | _)        || _*        d | _+        d
| _,        d
| _-        d
| _.        d
| _/        d | _0        d | _1        d | _2        d | _3        th        jj                  | _6        || _7        |rtq        jr                         | _:        tw        jx                         | _=        	 |j}                  | j8                         |j                         D cg c]&  }|j                  d      | j8                  k(  s"|d   ( }}t        |      d
kD  r|d
   | _B        n7t        j                  dt               d| _B        nd | _:        d | _=        d| _B        t/        |t        t        f      r|D ]  }| j                  |        y || j                  |       y y c c}w c c}w # t        j                  j                  $ r$ t        j                  dt               d| _B        Y w xY w)Nz1default_worker_ttl is deprecated, use worker_ttl.r\   )
stacklevelr   )overrider   )rc   r   r   
serializerdeath_penalty_classstartingFr   rc   addrz@CLIENT LIST command not supported, setting ip_address to unknownunknownzCCLIENT SETNAME command not supported, setting ip_address to unknown)Jdefault_result_ttlr   warningswarnDeprecationWarningr#   job_monitoring_intervalr   r>   _set_connectionr   redis_server_versionr;   r   r   rC   versionr`   python_versionr5   r   	executionr=   
isinstancestrr   r   hexrc   queuesvalidate_queues_ordered_queues_exc_handlers_work_horse_killed_handler_shutdown_requested_date_state	_is_horse
_horse_pid_stop_requested_stopped_job_idloggerlogr   last_cleaned_atsuccessful_job_countfailed_job_counttotal_working_timecurrent_job_working_time
birth_date	schedulerpubsubpubsub_threadrj   rn   _dequeue_strategyr   socketgethostnamehostnameosgetpidpidclient_setnameclient_listgetlen
ip_addressWarningredis
exceptionsResponseErrorlisttuplepush_exc_handler)selfr   rc   r   r   exc_handlerexception_handlersr   r   r   r   r   r   r   r   r   r   r   qclientclient_adresseshandlers                         rJ   __init__zBaseWorker.__init__   sQ   ( #5(DOMMMOanop0DO0DO'>$$8!3F;Jz))*5
$$(!&t[9M(}{S!kk,Z8.2 !(
  a%   )"nn#(,(@(@ !  
 
 ,	#{{1~-/*C'<@%%$ %*##6 #)*!%&)*/0%04*.!<K<S<S1R.+1+=+=+?DM&(iikDH0))$))4 2<1G1G1I#'-VZZX^M_cgclclMlF6N# # '!+&5a&8DOMM"dfmn&/DO DMDH'DO(4-8- /%%g./+!!"45 ,K
j#	 ##11 ,celm"+,s%   +AM
M 	#M-M>NN
worker_keyrF   c           	         | j                   }|j                  |      st        d|z        |j                  |      s|j	                  | j
                  |       y|t        |      d } | g ||||d|      }|j                          |S )a@  Returns a Worker instance, based on the naming conventions for
        naming the internal Redis keys.  Can be used to reverse-lookup Workers
        by their Redis keys.

        Args:
            worker_key (str): The worker key
            connection (Optional[Redis], optional): Redis connection. Defaults to None.
            job_class (Optional[Type[Job]], optional): The job class if custom class is being used. Defaults to None.
            queue_class (Optional[Type[Queue]]): The queue class if a custom class is being used. Defaults to None.
            serializer (Any, optional): The serializer to use. Defaults to None.

        Raises:
            ValueError: If the key doesn't start with `rq:worker:`, the default worker namespace prefix.

        Returns:
            worker (Worker): The Worker instance.
        zNot a valid RQ worker key: %sNF)r   r   r   r   r   )redis_worker_namespace_prefixrU   rf   existssremredis_workers_keysr   refresh)	clsr   r   r   r   r   prefixrc   workers	            rJ   find_by_keyzBaseWorker.find_by_key   s    4 22$$V,<zIJJ  ,OOC22J?#f+-(!#"!
 	rL   queueWorkerc           
          |r|j                   }|sJ t        j                  ||      }|D cg c]  }| j                  |||||       }}t	        |      S c c}w )ztReturns an iterable of all Workers.

        Returns:
            workers (List[Worker]): A list of workers
        r   r   )r   r   r   r   )r   r   get_keysr   r<   )	r   r   r   r   r   r   worker_keyskeyworkerss	            rJ   allzBaseWorker.all.  sy     ))Jz)22:V
 #	
  OO
i[eo  
 
 w
s   Ac                 h    t        j                  ||      D cg c]  }t        |       c}S c c}w )a  List of worker keys

        Args:
            connection (Optional[Redis], optional): A Redis Connection. Defaults to None.
            queue (Optional[Queue], optional): The Queue. Defaults to None.

        Returns:
            list_keys (List[str]): A list of worker keys
        r   )r   r   r:   )r   r   r   r   s       rJ   all_keyszBaseWorker.all_keysI  s*     )<(D(D5]g(hiiiis   /c                 B    t        t        j                  ||            S )a6  Returns the number of workers by queue or connection.

        Args:
            connection (Optional[Redis], optional): Redis connection. Defaults to None.
            queue (Optional[Queue], optional): The queue to use. Defaults to None.

        Returns:
            length (int): The queue length.
        r   )r   r   r   )r   r   r   s      rJ   countzBaseWorker.countV  s     &//e
STTrL   c                    | j                   j                  | j                  ddddddddd	d
dddd      }|\  }}}}}}}}	}
}}}}}|rt        |      nd| _        |rt        |      nd| _        |rt        |      nd| _        |rt        |      nd| _        |rt        |      nd| _	        t        |xs d      | _
        |xs d| _        |rt        t        |            | _        nd| _        |rt        t        |            | _        nd| _        |rt        t        |            | _        |rt        t        |            | _        |	rt#        t        |	            | _        |
rt#        t        |
            | _        |rat        |      }|j)                  d      D cg c]5  }| j+                  || j                   | j,                  | j.                        7 c}| _        yyc c}w )zvRefreshes the worker data.
        It will get the data from the datastore and update the Worker's attributes
        r   statecurrent_joblast_heartbeatbirthr   r   r   r   r   r   r   r   r   N?,)r   r   r   )r   hmgetr   r:   r   r   intr   r   r   r   _job_idrB   r   r   r   r   floatr   r   splitr   r   r   r   )r   datar   r   job_idr   r   r   r   r   r   r   r   r   r   r   r   s                    rJ   r   zBaseWorker.refreshc  s    $$HH" &
@ 	
 $-5)41;'*-"3s8+2ww'9Ggn5Tels+~"*7>+B"CD"&D&wu~6DO"DO$'0@(A$BD!(+G4H,I(JD%&+G4F,G&HD##,1':R2S,TD)V_F
 $\\#.	    doo\`\k\k ! DK s   :Gc                 |    | j                   yt               | j                   z
  t        | j                        kD  ryy)zBMaintenance tasks should run on first startup or every 10 minutes.TsecondsF)r   r@   r   r   r   s    rJ   should_run_maintenance_tasksz'BaseWorker.should_run_maintenance_tasks  s9     'ED(((Id>W>W,XXrL   c                     |j                   j                  j                  d      }||| j                  k  r3d| j                  i}|j                   j                  j	                  |       |S )a  Configures the Redis connection's socket timeout.
        This will timeout the connection in case any specific command hangs at any given time (eg. BLPOP), but
        also ensures that the timeout is long enough for those operations.
        If the connection provided already has an adequate `socket_timeout` defined, skips.

        Args:
            connection (Optional[Redis]): The Redis Connection.
        socket_timeout)connection_poolconnection_kwargsr   connection_timeoutupdate)r   r   current_socket_timeouttimeout_configs       rJ   r   zBaseWorker._set_connection  sf     ",!;!;!M!M!Q!QRb!c!)-CdF]F]-].0G0GHN&&88??OrL   c                 4    t        d| j                  dz
        S )Nr      )maxr   r   s    rJ   dequeue_timeoutzBaseWorker.dequeue_timeout  s    1doo*++rL   c                      | j                   dz   S )N
   )r  r   s    rJ   r   zBaseWorker.connection_timeout  s    ##b((rL   c                 `   | j                   D ]  }|j                         s| j                  j                  d|j                         t        || j                         t        j                  |       |j                  j                  | |       |j                           t               | _        y)z1Runs maintenance jobs on each Queue's registries.z!Cleaning registries for queue: %sN)r   acquire_maintenance_lockr   inforc   r3   r   r   clean_worker_registryintermediate_queuecleanuprelease_maintenance_lockr@   r   r   r   s     rJ   r3   zBaseWorker.clean_registries  s    [[ 	1E --/A5::N (:(:;#99%@((00u=..0	1  #urL   c                 f    | j                   st        | j                        | _         | j                   S )z)Return Redis server version of connection)r   r?   r   r   s    rJ   get_redis_server_versionz#BaseWorker.get_redis_server_version  s(    (((3DOO(DD%(((rL   c                     | j                   D ]=  }t        || j                        rt        dj	                  || j                               y)z"Sanity check for the given queues.z&{0} is not of type {1} or string typesN)r   r   r   	TypeErrorformatr  s     rJ   r   zBaseWorker.validate_queues  sH    [[ 	jEeT%5%56 H O OPUW[WgWg hii	jrL   c                 T    | j                   D cg c]  }|j                   c}S c c}w )zsReturns the queue names of this worker's queues.

        Returns:
            List[str]: The queue names.
        )r   rc   r  s     rJ   queue_nameszBaseWorker.queue_names  s      )-4u

444   %c                 T    | j                   D cg c]  }|j                   c}S c c}w )zReturns the Redis keys representing this worker's queues.

        Returns:
            List[str]: The list of strings with queues keys
        )r   r   r  s     rJ   
queue_keyszBaseWorker.queue_keys  s      (,{{3e		333r  c                 4    | j                   | j                  z   S z$Returns the worker's Redis hash key.)r   rc   r   s    rJ   r   zBaseWorker.key  s     11DII==rL   c                 (    t         | j                  z  S r  )r   rc   r   s    rJ   pubsub_channel_namezBaseWorker.pubsub_channel_name  s     '22rL   c                 (    | j                         dk\  S )z2Only supported by Redis server >= 5.0 is required.)r^   r   r   )r  r   s    rJ   supports_redis_streamsz!BaseWorker.supports_redis_streams  s     ,,.);;rL   c                 d   | j                   j                  dt        |             t               | _        t        j
                  t
        j                  | j                         t        j
                  t
        j                  | j                         | j                          | j                          y)zStops the current worker loop but waits for child processes to
        end gracefully (warm shutdown).

        Args:
            signum (Any): Signum
            frame (Any): Frame
        zGot signal %sN)r   debugrh   r@   r   rW   SIGINTrequest_force_stopr   handle_warm_shutdown_request	_shutdownr   rg   frames      rJ   request_stopzBaseWorker.request_stop  sk     	F(;<(+%fmmT%<%<=fnnd&=&=>))+rL   c                 0   | j                         t        j                  k(  rPd| _        | j	                          | j
                  j                  d       | j                  r| j                          yy| j                  r| j                          t               )z
        If shutdown is requested in the middle of a job, wait until
        finish before shutting down and save the request in redis
        TzQStopping after current horse is finished. Press Ctrl+C again for a cold shutdown.N)
	get_staterr   ry   r   set_shutdown_requested_dater   r!  r   stop_schedulerrN   r   s    rJ   r%  zBaseWorker._shutdown  ss    
 >>|000#'D ,,.HHNNno~~##%  ~~##%/!rL   rg   r'  c                     t               rH   NotImplementedErrorr&  s      rJ   r#  zBaseWorker.request_force_stop"      !##rL   c                     t        j                   t         j                  | j                         t        j                   t         j                  | j                         y)zDInstalls signal handlers for handling SIGINT and SIGTERM gracefully.N)rW   r"  r(  r   r   s    rJ   _install_signal_handlersz#BaseWorker._install_signal_handlers%  s2    fmmT%6%67fnnd&7&78rL   jobc                     t         )z To be implemented by subclasses.r.  r   r3  r   s      rJ   execute_jobzBaseWorker.execute_job*  s    !!rL   INFOburstlogging_leveldate_format
log_formatmax_jobsmax_idle_timewith_schedulerdequeue_strategyc	                 `   | j                  |||       || _        d}	|r| j                  ||||       | j                          	 	 	 | j	                  |       | j
                  r| j                          | j                  r'| j                  j                  d| j                         n|rdn| j                  }
| j                  |
|      }|S|r'| j                  j                  d| j                         n)|'| j                  j                  d| j                  |       n]|\  }}| j                  ||       | j                          |	dz  }	|-|	|k\  r(| j                  j                  d| j                  |	       n5| j+                          t-        |	      S # t        j                   j"                  $ r) | j                  j%                  d	| j                         Y `t&        $ r Y jt(        $ r   | j                  j%                  d
| j                  d       Y xY w# | j+                          w xY w)a:  Starts the work loop.

        Pops and performs all jobs on the current list of queues.  When all
        queues are empty, block and wait for new jobs to arrive on any of the
        queues, unless `burst` mode is enabled.
        If `max_idle_time` is provided, worker will die when it's idle for more than the provided value.

        The return value indicates whether any jobs were processed.

        Args:
            burst (bool, optional): Whether to work on burst mode. Defaults to False.
            logging_level (str, optional): Logging level to use. Defaults to "INFO".
            date_format (str, optional): Date Format. Defaults to DEFAULT_LOGGING_DATE_FORMAT.
            log_format (str, optional): Log Format. Defaults to DEFAULT_LOGGING_FORMAT.
            max_jobs (Optional[int], optional): Max number of jobs. Defaults to None.
            max_idle_time (Optional[int], optional): Max seconds for worker to be idle. Defaults to None.
            with_scheduler (bool, optional): Whether to run the scheduler in a separate process. Defaults to False.
            dequeue_strategy (DequeueStrategy, optional): Which strategy to use to dequeue jobs.
                Defaults to DequeueStrategy.DEFAULT

        Returns:
            worked (bool): Will return True if any job was processed, False otherwise.
        r   TzWorker %s: stopping on requestNzWorker %s: done, quittingz(Worker %s: idle for %d seconds, quittingr   z/Worker %s: finished executing %d jobs, quittingz0Worker %s: Redis connection timeout, quitting...z4Worker %s: found an unhandled exception, quitting...)exc_info)	bootstrapr   _start_schedulerr2  check_for_suspensionr   run_maintenance_tasksr   r   r
  r   r  dequeue_job_and_maintain_ttlr6  	heartbeatr   r   TimeoutErrorerrorrN   
SystemExitteardownbool)r   r8  r9  r:  r;  r<  r=  r>  r?  completed_jobstimeoutresultr3  r   s                 rJ   workzBaseWorker.work.  s   D 	}k:>!1!!%ZP%%'.	*--e488224++&FQ&+d1E1EG!>>wVF~  HHMM*EtxxP*6 HHMM*TVZV^V^`mn!'JC$$S%0NN$"a'N+)X5 HHMM*[]a]e]eguv!9 Z MMON##! ''44 HHNN#UW[W_W_`$ ! HHNN#Y[_[c[cnrNsMMOsX   H AF %H &A6F H AF 7H AHH H"H $2HH H-pipeliner   c                    | j                   j                  d|j                         t        |j                  | j
                  | j                  | j                        }| j                  d|       |j                  ||       | j                  rC|j                  | j                  ||       | j                  j                  ||       d| _
        yy)zCleans up the execution of a job.
        It will remove the job from the `StartedJobRegistry` and delete the Execution object.
        zCleaning up execution of job %sr   r   NrQ  )r3  rQ  )r   r!  idr2   originr   r   r   set_current_job_idremover   remove_executiondelete)r   r3  rQ  started_job_registrys       rJ   cleanup_executionzBaseWorker.cleanup_execution  s     	8#&&A1JJ4>>doo 
 	x8##C(#;>> 11$..cT\1]NN!!cH!=!DN rL   c                 f    | j                   j                  d| j                  | j                         y )Nz,Worker %s [PID %d]: warm shut down requested)r   r
  rc   r   r   s    rJ   r$  z'BaseWorker.handle_warm_shutdown_request  s"    DdiiQUQYQYZrL   c                    | j                   t        j                  | _         | j                   dvrt        d| j                    d      | j                   t        j                  k(  ry| j                   t        j                  k(  rF| j
                  j                  |      }| j
                  |dz   d | j
                  d|dz    z   | _        y| j                   t        j                  k(  rt        | j
                         yy)aR  Reorder the queues according to the strategy.
        As this can be defined both in the `Worker` initialization or in the `work` method,
        it doesn't take the strategy directly, but rather uses the private `_dequeue_strategy` attribute.

        Args:
            reference_queue (Union[Queue, str]): The queues to reorder
        N)rk   rm   rl   zDequeue strategy z: is not allowed. Use `default`, `random` or `round_robin`.r   )	r   rj   rn   rf   ro   r   indexrp   r   r   reference_queueposs      rJ   reorder_queueszBaseWorker.reorder_queues  s     !!)%4%<%<D"!!)MM#D$:$:#;;uv  !!_%<%<<!!_%@%@@&&,,_=C#'#7#7a	#BTEYEYZc\_bc\cEd#dD !!_%;%;;D(() <rL   c                 D   | j                   j                  d|j                         | j                  j	                         5 }|7t        |j                  | j                  | j                  | j                        }| j                  |j                  k(  }|j                  xr | }|r)|j                  t        j                  |       d| _	        n#|s!|j                  t        j                  |       | j                  ||       | j                   sQ|sO|j#                  ||       t%        t&        j(                  j*                        5  |j-                          ddd       | j/                  |       |j0                  r5|j2                  r)| j5                  |j2                  |j0                  z
  |       |r|j7                  ||       d}nd}	 |j-                          |r|j9                  |       ddd       y# 1 sw Y   xY w# t:        $ r Y  w xY w# 1 sw Y   yxY w)aO  
        Handles the failure or an executing job by:
            1. Setting the job status to failed
            2. Removing the job from StartedJobRegistry
            3. Setting the workers current job to None
            4. Add the job to FailedJobRegistry
        `save_exc_to_job` should only be used for testing purposes
        z#Handling failed execution of job %sNrS  rT  FT)r   r!  rU  r   rQ  r2   rV  r   r   r   should_retry
set_statusr*   STOPPEDFAILEDr\  r   _handle_failurer   r   r   ConnectionErrorexecuteincrement_failed_job_count
started_atended_atincrement_total_working_timeretryenqueue_dependents	Exception)	r   r3  r   r[  
exc_stringrQ  job_is_stoppedrp  rq  s	            rJ   handle_job_failurezBaseWorker.handle_job_failure  s    	<cffE__%%' +	8#+'9JJ4>>VZVeVe($ "11SVV;N$$;^);Ey008D'+$ NN9#3#3hNG""3":99%##J#Be..>>? '$$&' ++H5~~#,,11#,,2OQYZ		%*%*"%)"  "%,,S1O+	 +	.' '"   Q+	 +	sD   D	H
G;A3H#H;H	 H	HHHHHr   c                 l    || _         ||n| j                  }|j                  | j                  d|       y)zSets the current job working time in seconds

        Args:
            current_job_working_time (float): The current job working time in seconds
            pipeline (Optional[Pipeline], optional): Pipeline to use. Defaults to None.
        Nr   )r   r   hsetr   )r   r   rQ  r   s       rJ   set_current_job_working_timez'BaseWorker.set_current_job_working_time  s3     )A%!)!5X4??
"<>VWrL   r   c                     ||n| j                   }||j                  | j                  d       y|j                  | j                  d|       y)a  Sets the current job id.
        If `None` is used it will delete the current job key.

        Args:
            job_id (Optional[str], optional): The job id. Defaults to None.
            pipeline (Optional[Pipeline], optional): The pipeline to use. Defaults to None.
        Nr   )r   hdelr   rw  )r   r   rQ  r   s       rJ   rW  zBaseWorker.set_current_job_id  s?     "*!5X4??
>OODHHm4OODHHmV<rL   c                 v    ||n| j                   }|j                  | j                  d      }|yt        |      S )zRetrieves the current job id.

        Args:
            pipeline (Optional[&#39;Pipeline&#39;], optional): The pipeline to use. Defaults to None.

        Returns:
            job_id (Optional[str): The job id
        Nr   )r   hgetr   r:   )r   rQ  r   rO  s       rJ   get_current_job_idzBaseWorker.get_current_job_id  s:     "*!5X4??
=9>vrL   c                     | j                         }|y| j                  j                  || j                  | j                        S )zqReturns the currently executing job instance.

        Returns:
            job (Job): The job instance.
        N)r}  r   fetchr   r   )r   r   s     rJ   get_current_jobzBaseWorker.get_current_job  s;     ((*>~~##FDOOT__MMrL   r   c                 l    || _         ||n| j                  }|j                  | j                  d|       y)zSets the worker's state.

        Args:
            state (str): The state
            pipeline (Optional[Pipeline], optional): The pipeline to use. Defaults to None.
        Nr   )r   r   rw  r   )r   r   rQ  r   s       rJ   	set_statezBaseWorker.set_state  s/     !)!5X4??
'51rL   c                 Z    t        j                  dt               | j                  |       y)z:Raise a DeprecationWarning if ``worker.state = X`` is usedz;worker.state is deprecated, use worker.set_state() instead.N)r   r   r   r  )r   r   s     rJ   
_set_statezBaseWorker._set_state(  s    SUghurL   c                     | j                   S rH   )r   r   s    rJ   r*  zBaseWorker.get_state-  s    {{rL   c                 V    t        j                  dt               | j                         S )z;Raise a DeprecationWarning if ``worker.state == X`` is usedz;worker.state is deprecated, use worker.get_state() instead.)r   r   r   r*  r   s    rJ   
_get_statezBaseWorker._get_state0  s    SUgh~~rL   c                 p   t        | j                  | j                  |||| j                        | _        | j                  j                          | j                  j                  rR|r5| j                  j                          | j                  j                          y| j                  j                          yy)a  Starts the scheduler process.
        This is specifically designed to be run by the worker when running the `work()` method.
        Instanciates the RQScheduler and tries to acquire a lock.
        If the lock is acquired, start scheduler.
        If worker is on burst mode just enqueues scheduled jobs and quits,
        otherwise, starts the scheduler in a separate process.

        Args:
            burst (bool, optional): Whether to work on burst mode. Defaults to False.
            logging_level (str, optional): Logging level to use. Defaults to "INFO".
            date_format (str, optional): Date Format. Defaults to DEFAULT_LOGGING_DATE_FORMAT.
            log_format (str, optional): Log Format. Defaults to DEFAULT_LOGGING_FORMAT.
        )r   r9  r:  r;  r   N)
r4   r   r   r   r   acquire_locksacquired_locksenqueue_scheduled_jobsrelease_locksstart)r   r8  r9  r:  r;  s        rJ   rC  zBaseWorker._start_scheduler7  s    ( %KK'#!
 	$$&>>((557,,.$$& )rL   c           
      j   | j                   j                  d| j                         | j                  j	                  | j
                        rL| j                  j                  | j
                  d      s&d}t        |j                  | j                              | j
                  }dj                  | j                               }| j                  j                         5 }|j                  |       t               }t        |      }|| _        |||| j                   | j"                  | j$                  | j&                  | j(                  d}|j+                  ||       t-        j.                  | |       |j1                  || j2                  dz          |j5                          ddd       y# 1 sw Y   yxY w)	zRegisters its own birth.zRegistering birth of worker %sdeathz1There exists an active worker named {0!r} alreadyr   )r   r   r   r   r   r   r   r   )mapping<   N)r   r!  rc   r   r   r   hexistsrf   r  joinr  rQ  rZ  r@   rA   r   r   r   r   r   r   rw  r   registerexpirer   rk  )r   msgr   r   p	right_nownow_in_stringr  s           rJ   register_birthzBaseWorker.register_birth[  sG   7C??!!$((+DOO4K4KDHHV]4^ECSZZ		233hh$**,-__%%' 	1HHSMI%i0M'DO '"/ xx MM"oo<<"&"5"5	G FF3F(((q1HHS$//B./IIK)	 	 	s   CF))F2c           	      x   | j                   j                  d       | j                  j                         5 }t	        j
                  | |       |j                  | j                  dt        t                            |j                  | j                  d       |j                          ddd       y# 1 sw Y   yxY w)zRegisters its own death.zRegistering deathr  r  N)r   r!  r   rQ  r   
unregisterrw  r   rA   r@   r  rk  )r   r  s     rJ   register_deathzBaseWorker.register_deathy  s    *+__%%' 	1  **43FF488Wi&67HHTXXr"IIK	 	 	s   A1B00B9c                     | j                   S )zuThe horse's process ID.  Only available in the worker.  Will return
        0 in the horse part of the fork.
        )r   r   s    rJ   	horse_pidzBaseWorker.horse_pid  s    
 rL   c                    t        |||       | j                          | j                  j                  d| j                  t        j                         t               | j                          | j                  t        j                         | j                         }| j                  j                  dt        dj                  |                   y)a  Bootstraps the worker.
        Runs the basic tasks that should run when the worker actually starts working.
        Used so that new workers can focus on the work loop implementation rather
        than the full bootstraping process.

        Args:
            logging_level (str, optional): Logging level to use. Defaults to "INFO".
            date_format (str, optional): Date Format. Defaults to DEFAULT_LOGGING_DATE_FORMAT.
            log_format (str, optional): Log Format. Defaults to DEFAULT_LOGGING_FORMAT.
        z)Worker %s started with PID %d, version %s*** Listening on %s...z, N)r.   r  r   r
  r   r   r   rC   	subscriber  rr   rw   r  r-   r  )r   r9  r:  r;  qnamess        rJ   rB  zBaseWorker.bootstrap  s      	-jAA488RYY[Zab|++,!!#.dii6G0HIrL   c                     d}d}| j                   st        | j                  |       r|r<| j                  j	                  d       | j                  j	                  d       t
        |sL| j                  j	                  d       | j                         }| j                  t        j                         d}t        j                  d       | j                   st        | j                  |       r|r| j                  |       yy)z;Check to see if workers have been suspended by `rq suspend`NFz Suspended in burst mode, exitingz7Note: There could still be unfinished jobs on the queuez+Worker suspended, run `rq resume` to resumeTr   )r   r6   r   r   r
  rN   r*  r  rr   rx   timesleep)r   r8  before_statenotifieds       rJ   rD  zBaseWorker.check_for_suspension  s    &&<+N@AWX##KL#~~/|556JJqM &&<+N NN<( rL   c                 :    t        d| j                   d|        y)zwChanges the current procname for the process.

        This can be used to make `ps -ef` output more readable.
        r}   z: N)rK   rc   )r   messages     rJ   proclinezBaseWorker.procline  s    
 	j2gY78rL   c                 x    | j                   j                  | j                  dt        | j                               y)zDSets the date on which the worker received a (warm) shutdown requestshutdown_requested_dateN)r   rw  r   rA   r   r   s    rJ   r+  z&BaseWorker.set_shutdown_requested_date  s(    TXX'@)DLiLiBjkrL   c                 |    | j                   j                  | j                  d      }|t        t	        |            S y)z+Fetches shutdown_requested_date from Redis.r  Nr   r|  r   rB   r:   )r   shutdown_requested_timestamps     rJ   r  z"BaseWorker.shutdown_requested_date  s<     (,';';DHHF_'`$'3G$@ABB 4rL   c                 |    | j                   j                  | j                  d      }|t        t	        |            S y)zFetches death date from Redis.r  Nr  )r   death_timestamps     rJ   
death_datezBaseWorker.death_date  s9     //..txxA&GO455 'rL   c                 @   | j                   rb| j                  rV| j                  j                  r$| j                  j                  j                         s| j                  j	                  d       | j                          t        j
                  | j                         y)a\  
        Runs periodic maintenance tasks, these include:
        1. Check if scheduler should be started. This check should not be run
           on first run since worker.work() already calls
           `scheduler.enqueue_scheduled_jobs()` on startup.
        2. Cleaning registries

        No need to try to start scheduler on first run
        T)
auto_startr   N)r   r   _processis_aliver  r3   r(   r   r   s    rJ   rE  z BaseWorker.run_maintenance_tasks  sh     ~~t~~'>'>dnnF]F]FfFfFh,,,=$//:rL   excr   r   r   r   c                     t        |t        j                  j                        r3| j                  j                  d|d       t        j                  d       y| j                  j                  d|z          )a>  
        This exception handler allows the pubsub_thread to continue & retry to
        connect after a connection problem the same way the main worker loop
        indefinitely retries.
        redis-py internal mechanism will restore the channels subscriptions
        once the connection is re-established.
        ACould not connect to Redis instance: %s Retrying in %d seconds...r\   r~   zPubsub thread exitin on %sN)	r   r   r   rj  r   rI  r  r  warning)r   r  r   r   s       rJ   _pubsub_exception_handlerz$BaseWorker._pubsub_exception_handler  s[     cE,,<<>HHNNS
 JJsOHH9C?@rL   c                 j    | j                   j                  d|       t        |      }t        | |       y)zHandle external commandszReceived message: %sN)r   r!  r   r   )r   r  payloads      rJ   handle_payloadzBaseWorker.handle_payload  s)    -w7(tW%rL   c                 L   | j                   j                  d| j                         | j                  j	                         | _         | j                  j
                  di | j                  | j                  i | j                  j                  dd| j                        | _	        y)z"Subscribe to this worker's channelzSubscribing to channel %sg?T)
sleep_timedaemonexception_handlerNrI   )
r   r
  r  r   r   r  r  run_in_threadr  r   r   s    rJ   r  zBaseWorker.subscribe  s    143K3KLoo,,.P!9!94;N;N OP![[6644;Y;Y 7 
rL   c                     |j                   rJ|j                   dkD  r;|j                   | j                  z
  }t        t        || j                              dz   S | j                  dz   S zGet's the TTL for the next heartbeat.

        Args:
            job (Job): The Job

        Returns:
            int: The heartbeat TTL.
        r   r  rN  r   r   minr   r   r3  remaining_execution_times      rJ   get_heartbeat_ttlzBaseWorker.get_heartbeat_ttl  Z     ;;3;;?'*{{T5R5R'R$s3T5Q5QRSVXXX//"44rL   c                 L   | j                   j                         5 }| j                  |      }t        j                  |||      | _        | j                  t        j                  |       |j                          ddd       | j
                  S # 1 sw Y   | j
                  S xY w)zThis method is called by the main `Worker` (not the horse) as it prepares for execution.
        Do not confuse this with worker.prepare_job_execution() which is called by the horse.
        rT  N)
r   rQ  r  r'   creater   r  rr   ry   rk  )r   r3  rQ  heartbeat_ttls       rJ   prepare_executionzBaseWorker.prepare_execution  s     __%%' 	8 2237M&--c=8TDNNN<,,xN@		
 ~~	
 ~~s   A BB#c                 :   | j                   r| j                  j                  d| j                         | j                   j	                          | j                   j                          | j                  j                          | j                  j                          yy)zUnsubscribe from pubsub channelzUnsubscribing from channel %sN)	r   r   r
  r  stopr  r   unsubscribecloser   s    rJ   r  zBaseWorker.unsubscribe  sm    HHMM94;S;ST##%##%KK##%KK rL   rN  )r)   r1   c                 D   d}dj                  | j                               }| j                  t        j                         | j                  d|z          | j                  j                  dt        |             d}t               }|}	 	 | j                          | j                  r| j                          ||t        ||      }| j                  j                  dt        |      |       | j                  j                  | j                   || j"                  | j$                  | j&                  | j(                        }||\  }}	| j+                  |	       | j                  j                  d	t-        |j.                        t        |	j0                               | j3                         |_        | j6                  rO| j                  j9                  d
t        |	j0                        t-        |j:                        |j.                         n:| j                  j9                  dt        |	j0                        |j.                         	 | j                          |S # t<        $ r? |:t               |z
  j?                         }
tA        jB                  ||
z
        }|dk  rY WY ntD        jF                  jH                  $ ra}| j                  jK                  d||       tM        jN                  |       || jP                  z  }t        || jR                        }Y d}~nd}~ww xY w)zDequeues a job while maintaining the TTL.

        Returns:
            result (Tuple[Job, Queue]): A tuple with the job and the queue.
        Nr   zListening on r  g      ?z+Dequeueing jobs on queues %s and timeout %s)r   r   r   r   )ra  zDequeued job %s from %s%s: %s (%s)z%s: %sr   r  )*r  r  r  rr   rz   r  r   r!  r-   r@   rG  r   rE  r  r   dequeue_anyr   r   r   r   r   rc  r,   rU  rc   r  r   r   r
  descriptionr$   total_secondsmathceilr   r   rj  rI  r  r  exponential_backoff_factormax_connection_wait_time)r   rN  r=  rO  r  connection_wait_time
idle_sinceidle_time_leftr3  r   idle_forconn_errs               rJ   rF  z'BaseWorker.dequeue_job_and_maintain_ttl&  s    $**,-|(()o.//v?"U
&)` 44..0&>+E!'>:GLeTZm]de))55((#"nn#(,(@(@ 6  %!'JC'''>HHNN#<d366lERWR\R\L]^/3/L/L/NC,//mU5::5FS__H]_b_e_efhejj0A366J 	 "  , #
 2AACH%)YY}x/G%HN%*##33 `WYacw 

/0$(G(GG$'*+?A^A^'_$`I s!   
F<I AL L<ALLc                    |xs | j                   dz   }||n| j                  }|j                  | j                  |       |j	                  | j                  dt        t                            | j                  j                  d|       y)a6  Specifies a new worker timeout, typically by extending the
        expiration time of the worker, effectively making this a "heartbeat"
        to not expire the worker until the timeout passes.

        The next heartbeat should come before this time, or the worker will
        die (at least from the monitoring dashboards).

        If no timeout is given, the worker_ttl will be used to update
        the expiration time of the worker.

        Args:
            timeout (Optional[int]): Timeout
            pipeline (Optional[Redis]): A Redis pipeline
        r  Nr   zOSent heartbeat to prevent worker timeout. Next one should arrive in %s seconds.)	r   r   r  r   rw  rA   r@   r   r!  )r   rN  rQ  r   s       rJ   rG  zBaseWorker.heartbeatf  sn     1T__r1;C;OxUYUdUd
$((G,"2Ice4DEhjqrrL   c                     | j                   s=| j                  r| j                          | j                          | j	                          y y rH   )is_horser   r,  r  r  r   s    rJ   rK  zBaseWorker.teardown{  s9    }}~~##%!	 rL   c                 `   | j                   j                  r| j                   j                  j                  rh	 t        j                  | j                   j                  j                  t
        j                         | j                   j                  j                          yyy# t        $ r Y 2w xY w)zEnsure scheduler process is stopped
        Will send the kill signal to scheduler process,
        if there's an OSError, just passes and `join()`'s the scheduler process,
        waiting for the process to finish.
        N)	r   r  r   r   killrW   r   OSErrorr  r   s    rJ   r,  zBaseWorker.stop_scheduler  s     >>""t~~'>'>'B'B//33V^^D NN##((* (C"  s   AB! !	B-,B-c                 ^    ||n| j                   }|j                  | j                  dd       y)zUsed to keep the worker stats up to date in Redis.
        Increments the failed job count.

        Args:
            pipeline (Optional[Pipeline], optional): A Redis Pipeline. Defaults to None.
        Nr   r   r   hincrbyr   r   rQ  r   s      rJ   rl  z%BaseWorker.increment_failed_job_count  s+     "*!5X4??
488%7;rL   c                 ^    ||n| j                   }|j                  | j                  dd       y)zUsed to keep the worker stats up to date in Redis.
        Increments the successful job count.

        Args:
            pipeline (Optional[Pipeline], optional): A Redis Pipeline. Defaults to None.
        Nr   r   r  r  s      rJ   increment_successful_job_countz)BaseWorker.increment_successful_job_count  s+     "*!5X4??
488%;Q?rL   job_execution_timec                 Z    |j                  | j                  d|j                                y)a,  Used to keep the worker stats up to date in Redis.
        Increments the time the worker has been workig for (in seconds).

        Args:
            job_execution_time (timedelta): A timedelta object.
            pipeline (Optional[Pipeline], optional): A Redis Pipeline. Defaults to None.
        r   N)hincrbyfloatr   r  )r   r  rQ  s      rJ   ro  z'BaseWorker.increment_total_working_time  s%     	dhh(<>P>^>^>`arL   c                 :   | j                   j                  d|j                         dj                  t	        j
                  |       }	 |j                  |j                  |j                  d}|j                  }|j                  |j                  |j                  d       | j                   j                  d|j                  |||       | j                  D ]/  }| j                   j                  d|        ||g| }|d
}|r/ y	 y	# t        $ r i }d}Y w xY w)a  Walks the exception handler stack to delegate exception handling.
        If the job cannot be deserialized, it will raise when func_name or
        the other properties are accessed, which will stop exceptions from
        being properly logged, so we guard against it here.
        zHandling exception for %s. )func	argumentskwargsz<DeserializationError>)r   r   z2[Job %s]: exception raised while executing (%s)
%s)extrazInvoking exception handler %sNT)r   r!  rU  r  	tracebackformat_exception	func_nameargsr  r%   r   rV  rI  r   )r   r3  rA  rs  r  r  r   fallthroughs           rJ   handle_exceptionzBaseWorker.handle_exception  s    	3SVV<WWY77BC
	1 ]]SZZXEI 	szzSVV<= 	A3669V`hm 	 	
 )) 
	GHHNN:GD!#11K ""
	 $ 	1E0I	1s   	0D
 
DDc                 :    | j                   j                  |       y)z7Pushes an exception handler onto the exc handler stack.N)r   append)r   handler_funcs     rJ   r   zBaseWorker.push_exc_handler  s    !!,/rL   sigc                     t               rH   r.  )r   r  s     rJ   
kill_horsezBaseWorker.kill_horse  r0  rL   )NNN)NNNNN)NN)r   r   rF   r   r3  r)   r   r1   )r3  r)   rQ  r   )ra  r1   )Nr  rH   r3  r)   )jrO   rP   rQ   r   r   REDIS_WORKER_KEYSr   r9   r   r1   r   r)   r   log_result_lifespanr   r  r  r"   r!   r   r   r   r   r   rL  r
   r   classmethodr   r   r   r   r   r   propertyr   r   r  r   r3   r  r   r  r  r   r  r  r(  r%  r   r#  r2  r6  r   r    rj   rn   rP  r\  r$  rc  ru  r   rx  rW  r}  r  r  r  r*  r  r   rC  r  r  r  rB  rD  r  r+  r  r  rE  rr  r  r  r  r  r'   r  r  r   rF  rG  rK  r,  rl  r  r   ro  r  r   r   rW   rb   r  rI   rL   rJ   r|   r|   u   s   $0!,>>0KI !$#
 #-(,$E,0$(+//3$( ?27!%`d%v6 smv6
 W%v6 "v6 %SMv6 SMv6 DK(v6 d7m,v6 "v6 ,0v6  !v6$ $,Hc3_5UW[5[,\#]%v6p 
 ,0/3-- - DK(	-
 d7m,- 
,	- -^  )-+//3#' W%  DK(  d7m,	 
    
h   4 
j(7"3 
j8GCT 
j`deh`i 
j 
j 
Ux0 
U@Q 
U]` 
U 
UDL   , , , )C ) )%)j5T#Y 54DI 4 > > 3 3 < < <"" $ $Xi5H $9
" #60"&'+$,;,C,CX$X$ X$ 	X$
 X$ 3-X$  }X$ X$ *X$ 
X$t"[25n	XU 	XV^_iVj 	X=# =R\I] =8J+? 8TW= 	N% 	N	2s 	2hz.B 	2
3  
 Z,E #60"'"' "' 	"'
 "'H<	   $60	JJ J 	J0)$ )*9l C C 6 6; Y  Ym rv &&
5U 5s 5	U 	y 	  FJ>}>5=c]>	%'	(>@s# s*AU s*+<8J3G <@x
7K @by bT^ b!F0 07 $fnn $rL   r|   c                   @   e Zd Zed        Zefdej                  fdZde	e
e   e
e   e
d   f   fdZdede
e   fd	Zd#dZd
ddefdZd#dZd#dZd$dZd#dZd Zd%d
ddeddfdZd
ddddedefdZd
ddddefdZd
ddddefdZd
ddddefdZd Zd  Zd! Z d" Z!y)&r   c                     | j                   S )z<Returns whether or not this is the worker or the work horse.)r   r   s    rJ   r  zWorker.is_horse  s     ~~rL   r  c                 X   	 t        j                  t        j                  | j                        |       | j                  j                  d| j                         y# t        $ rD}|j                  t        j                  k(  r| j                  j                  d       n Y d}~yd}~ww xY w)zKill the horse but catch "No such process" error has the horse could already be dead.

        Args:
            sig (signal.Signals, optional): _description_. Defaults to SIGKILL.
        zKilled horse pid %szHorse already deadN)
r   killpggetpgidr  r   r
  r  errnoESRCHr!  )r   r  es      rJ   r  zWorker.kill_horse  sr    	IIbjj0#6HHMM/@ 	ww%++%34 5	s   AA 	B)%:B$$B)rF   r   c                     dx}x}}t        j                  t              5  t        j                  | j
                  d      \  }}}ddd       |||fS # 1 sw Y   xY w)zWaits for the horse process to complete.
        Uses `0` as argument as to include "any child in the process group of the current process".
        Nr   )
contextlibr   ChildProcessErrorr   wait4r  )r   r   statrusages       rJ   wait_for_horsezWorker.wait_for_horse  s_     #""dV  !23 	< " ;Cv	<D&  	< 	<s   %AArg   r'  c                 p   t               | j                  z
  t        d      k  r| j                  j	                  d       y| j                  j                  d       | j                  rF| j                  j	                  d| j                         | j                          | j                          t               )zTerminates the application (cold shutdown).

        Args:
            signum (Any): Signum
            frame (Any): Frame

        Raises:
            SystemExit: SystemExit
        r   r   z=Shutdown signal ignored, received twice in less than 1 secondNzCold shut downzTaking down horse %s with me)
r@   r   r   r   r!  r  r  r  r  rJ  r&  s      rJ   r#  zWorker.request_force_stop  s     ED111Yq5IIHHNNZ[)* >>HHNN94>>JOO!lrL   r3  r)   r   r1   c                    t        j                         }| j                  t         j                  d<   |j                  t         j                  d<   |dk(  r<t        j
                          | j                  ||       t        j                  d       y|| _        | j                  dj                  |t        j                                      y)zSpawns a work horse to perform the actual work and passes it a job.
        This is where the `fork()` actually happens.

        Args:
            job (Job): The Job that will be ran
            queue (Queue): The queue
        RQ_WORKER_ID	RQ_JOB_IDr   zForked {0} at {1}N)r   forkrc   environrU  setpgrpmain_work_horse_exitr   r  r  r  )r   r3  r   	child_pids       rJ   fork_work_horsezWorker.fork_work_horse  s     GGI	%)YY

>""%&&

;>JJL  e,HHQK'DOMM-44Y		LMrL   c                     |j                   rJ|j                   dkD  r;|j                   | j                  z
  }t        t        || j                              dz   S | j                  dz   S r  r  r  s      rJ   r  zWorker.get_heartbeat_ttl%  r  rL   c                    dx}x}}t               |_        	 	 | j                  | j                  t              5  | j                         \  }}}ddd       	 | j                  d       d| _        |t"        j$                  k(  ry|j'                         }|y| j(                  |j*                  k(  rW| j,                  j/                  d       |j0                  r|j3                  | j                         | j5                  ||d       y|t6        j8                  t6        j:                  fvr|j<                  st               |_        |r.t#        j>                  |      rdt#        j@                  |       d	nd
}d| | d}	| j,                  j/                  d|	       | jC                  ||||       | j5                  |||	       yy# 1 sw Y   wxY w# t        $ r | j                  t               |j                  z
  j                                |j                  dk7  r]| j                  |j                  dz   kD  rA| j                  | j                  dz          | j                          | j                          Y "| j                  |       Y nCt        $ r8}|j                  t        j                  k7  r | j                          Y d}~nd}~ww xY w)zThe worker will monitor the work horse and make sure that it
        either executes successfully or the status of the job is set to
        failed

        Args:
            job (Job): _description_
            queue (Queue): _description_
        Nr  r   z4Job stopped by user, moving job to FailedJobRegistryz+Job stopped by user, work-horse terminated.r   rs  z	 (signal )r  z5Work-horse terminated unexpectedly; waitpid returned z; z$Moving job to FailedJobRegistry (%s))"r@   rm  r   r   r7   r  rx  r  rN  r   rG  r  maintain_heartbeatsr  r  EINTRr   r   EX_OK
get_statusr   rU  r   r  stopped_callbackexecute_stopped_callbackru  r*   FINISHEDrh  rn  WIFSIGNALEDWTERMSIGhandle_work_horse_killed)
r   r3  r   retpidret_valr  r  
job_status
signal_msgrs  s
             rJ   monitor_work_horsezWorker.monitor_work_horse4  sx    %)((6!--d.J.JLhi D.2.A.A.C+FGVD6 	))!,bhh^^%
!!SVV+HHST##,,T-E-EF##CuAn#o	 2 2I4D4DEE<<"u AHBNN[bLc9R[[%9$:!<ikJPQXPYZdYeeghJHHCZP))#vwG##Cu#L FYD D 0 . 11353>>3I2X2X2Z[ ;;"$)F)F#++XZJZ)[NN4#?#?"#DEOO%'')((- 
! 77ekk)  
!' s;    G GG GG B&K8KK.KKc                     | j                  |       | j                  ||       | j                  ||       | j                  t        j
                         y)zSpawns a work horse to perform the actual work and passes it a job.
        The worker will wait for the work horse and make sure it executes
        within the given timeout bounds, or will end the work horse with
        SIGALRM.
        N)r  r$  r8  r  rr   rz   r5  s      rJ   r6  zWorker.execute_joby  sD     	s#S%(U+|(()rL   c                    | j                   j                         5 }| j                  | j                  dz   |       t	        | j                  |            }| j                  j                  |j                  ||       |j                  t               ||d       |j                         }|d   dk(  r%| j                   j                  |j                         ddd       y# 1 sw Y   yxY w)z:Updates worker, execution and job's last heartbeat fields.r  rT  T)rQ  xx   r   N)r   rQ  rG  r   r   r  r   r[  r@   rk  rZ  r   )r   r3  rQ  ttlresultss        rJ   r*  zWorker.maintain_heartbeats  s    __%%' 	08NN477"<xNPd,,S12C NN$$S%=%=sX$VMM#%xDMA&&(G qzQ&&sww/+	0 	0 	0s   B=C!!C*c                     t        j                          | j                          d| _        t        | _        	 | j                  ||       t        j                  d       y#  t        j                  d       Y /xY w)a)  This is the entry point of the newly spawned work horse.
        After fork()'ing, always assure we are generating random sequences
        that are different from the worker.

        os._exit() is the way to exit from childs after a fork(), in
        contrast to the regular sys.exit()
        Tr   r   N)	rm   seedsetup_work_horse_signalsr   r   r   perform_jobr   r"  r5  s      rJ   r!  zWorker.main_work_horse  s[     	%%'	S%( 		HHQKs   A   A9c                     t        j                   t         j                  t         j                         t        j                   t         j                  t         j                         y)ab  Setup signal handing for the newly spawned work horse

        Always ignore Ctrl+C in the work horse, as it might abort the
        currently running job.

        The main worker catches the Ctrl+C and requests graceful shutdown
        after the current work is done.  When cold shutdown is requested, it
        kills the current job anyway.
        N)rW   r"  SIG_IGNr   SIG_DFLr   s    rJ   rA  zWorker.setup_work_horse_signals  s0     	fmmV^^4fnnfnn5rL   remove_from_intermediate_queueNc                 L   | j                   j                  d|j                         | j                  j	                         5 }| j                  |j                  |       | j                  d|       | j                  |      }| j                  ||       |j                  t               ||       |j                  | j                  |       |rKddlm}  ||j                  | j                        }|j                  |j                   d|j                         |j#                          | j                   j                  d       ddd       d	}| j%                  |j'                  |j(                  |j                  t+        j*                                      y# 1 sw Y   TxY w)
zWPerforms misc bookkeeping like updating states prior to
        job execution.
        z$Preparing for execution of Job ID %srT  r   r   r0   r  zJob preparation finished.Nz!Processing {0} from {1} since {2})r   r!  rU  r   rQ  rW  rx  r  rG  r@   prepare_for_executionrc   r   r1   rV  lremintermediate_queue_keyrk  r  r  r  r  )r   r3  rF  rQ  r  r1   r   r  s           rJ   prepare_job_executionzWorker.prepare_job_execution  s6    	=svvF__%%' 	88##CFFX#>--a(-C 2237MNN=8N<MM#%MB%%dii(%C-(cjjT__Ee::AsvvFHHNN67	8" 2cjj

DIIKHI%	8 	8s   DFF#rp  r[  c           	         | j                   j                  d|j                         |j                  ro|j                  |j                  k\  rV| j                   j                  d|j                  |j                         d|j                   d}| j                  |||       yt        j                  |j                  xs d|j                        }| j                  j                         5 }| j                  |       | j                  |j                  |j                  z
  |       |dkD  rt!        j"                  t$        j&                        t)        |	      z   }|j+                  t,        j.                  |       |j1                  |||       | j                   j                  d
|j                  ||j                  |j                  xs dz
         nU| j                   j                  d|j                  |j                  |j                  xs dz
         |j3                  ||       | j5                  ||       |j7                          | j                   j                  d|j                         ddd       y# 1 sw Y   yxY w)a?  Handles the retry of certain job.
        It will remove the job from the `StartedJobRegistry` and requeue or reschedule the job.

        Args:
            job (Job): The job that will be retried.
            queue (Queue): The queue
            started_job_registry (StartedJobRegistry): The started registry
        zHandling retry of job %sz/Job %s has exceeded maximum retry attempts (%d)zJob failed after z retry attemptsr(  Nr   rT  r   z8Job %s: scheduled for retry at %s, %s attempts remainingz1Job %s: enqueued for retry, %s attempts remaining)r   rQ  z!Finished handling retry of job %s)r   r!  rU  number_of_retriesr  r  ru  r+   get_interval	intervalsr   rQ  rl  ro  rn  rm  r   r@   r   utcr   rf  r*   	SCHEDULEDschedule_job_handle_retry_resultr\  rk  )	r   r3  r   rp  r[  rs  retry_intervalrQ  scheduled_times	            rJ   handle_job_retryzWorker.handle_job_retry  s    	1366:   S%:%:eii%GHHNPSPVPVX]XaXab,UYYKGJ##Cu#L ++C,A,A,FQX__%%' 	H8++X+>--cllS^^.KXV!!)hll!;iP^>_!_y22XF""3"JNFF"II!6!6!;!<	 GFFII!6!6!;!<
 ((ux(H""3":HHNN>G7	H 	H 	Hs   *FI;;Jc                    | j                   j                  d|j                         | j                  j	                         5 }	 	 |j                  |j                         |j                  ||       |j                  s|j                          | j                  |       | j                  |j                  |j                  z
  |       |j                  | j                        }|dk7  r9| j                   j                  d|j                         |j!                  ||       |j#                  ||d       | j                   j                  d|j                         | j%                  ||       |j'                          |j                  sJ |j                  sJ |j                  |j                  z
  }| j                   j)                  d|j*                  || j,                         | j                   j                  d	|j                         	 d
d
d
       y
# t.        j0                  j2                  $ r Y w xY w# 1 sw Y   y
xY w)a  Handles the successful execution of certain job.
        It will remove the job from the `StartedJobRegistry`, adding it to the `SuccessfulJobRegistry`,
        and run a few maintenance tasks including:
            - Resting the current job ID
            - Enqueue dependents
            - Incrementing the job count and working time
            - Handling of the job successful execution

        Runs within a loop with the `watch` method so that protects interactions
        with dependents keys.

        Args:
            job (Job): The job that was successful.
            queue (Queue): The queue
            started_job_registry (StartedJobRegistry): The started registry
        z'Handling successful execution of job %srT  r   z+Saving job %s's successful execution resultF)rQ  remove_from_queuez'Removing job %s from StartedJobRegistryz1Successfully completed %s job in %ss on worker %sz0Finished handling successful execution of job %sN)r   r!  rU  r   rQ  watchdependents_keyrq  explicit_transactionmultir  ro  rn  rm  get_result_ttlr   _handle_successr  r\  rk  r
  r  rc   r   r   
WatchError)r   r3  r   r[  rQ  
result_ttl
time_takens          rJ   handle_job_successzWorker.handle_job_success  s   " 	@#&&I__%%' )	8' NN3#5#56,,S8,D#88 !(777J55cllS^^6SU]^!$!3!3D4K4K!LJ!Q'TVYV\V\]++J+JKK
XQVKWHHNN#LcffU**3*B
 $$&>>)><<'<!$!>JHHMMKS__^hjnjsjs HHNN#UWZW]W]^O)	 )	P ''22 Q)	 )	s+   H:GHH73H:6H77H::Ir  c                 V    t               |_        |j                  t               |       y)z(Called after job has finished execution.N)r@   rn  rG  )r   r3  r   r  s       rJ   handle_execution_endedzWorker.handle_execution_endedF  s    uce]+rL   c                    |j                   }| j                  j                  d       	 t        | j                        dk(  }| j                  ||       t               |_        |j                  xs | j                  j                  }| j                  |t        |j                        5  | j                  j                  d|j                         |j                         }| j                  j                  d|j                         ddd       | j                  |||j                          |_        t%        |t&              r<| j                  j                  d|j                         | j)                  ||||       y	|j+                  | j                  |       | j-                  |||
       	 | j                  jG                  dtI        |jJ                        tM        d      |j                         |.| j                  j                  dtO        tQ        |                   | jR                  ry|jU                  | jV                        }	|	dk(  r| j                  jG                  d       y	|	dkD  r| j                  jG                  d|	       y	| j                  jG                  d       y	# 1 sw Y   xY w#  | j                  j                  d|j                         t.        j0                  |_        | j                  |||j4                         t7        j8                         }dj;                  t=        j>                  |       }	  |j@                  | j                  g|  n<#  t7        j8                         }dj;                  t=        j>                  |       }Y nxY w | jB                  |g|  | jE                  ||||       Y yxY w)zPerforms the actual work of a job.  Will/should only be called
        inside the work horse's process.

        Args:
            job (Job): The Job
            queue (Queue): The Queue

        Returns:
            bool: True after finished.
        zStarted Job Registry set.r   )r   zPerforming Job %s ...zFinished performing Job %sNzJob %s returns a Retry object)r3  r   rp  r[  T)r3  r   r[  zJob %s raised an exception.r  )r3  rs  r   r[  Fr  zJob OKz
Result: %rr   zResult discarded immediatelyzResult is kept for %s secondsz6Result will never expire, clean up result key manually),r[  r   r!  r   r   rK  r@   rm  rN  r   DEFAULT_TIMEOUTr   r8   rU  performrd  success_callback_timeout_resultr   r+   rV  execute_success_callbackrb  r*   rh  _statusfailure_callback_timeoutr`   rA  r  r  r  execute_failure_callbackr  ru  r
  r-   rV  r,   r/   r   r  r]  r   )
r   r3  r   r[  rF  rN  return_valuerA  rs  r`  s
             rJ   rB  zWorker.perform_jobK  s     %99230	-0-=-B*&&s,JK UCNkkET%5%5%E%EG))'3Fsvv)V E6?"{{};SVVDE
 ''UC4P4PQ 'CK,.>G%%5Sg &  ,,T-E-E|T''CuSg'h2 	mU3::%6XO#HHNN<L0A)BC##++D,C,CDJQ<=  a=zJ  VWsE E*	HHNN8#&&A#**CK''UC4P4PQ||~H!;!;X!FGJL,,,T-E-EQQL<<>WWY%?%?%JK
 "D!!#11##JeRf $  sE   A?J5 (AJ(A7J5 =0J5 (J2-J5 5BO	M#"O	#7N-O	c                 6    | j                   j                         S )z?Pops the latest exception handler off of the exc handler stack.)r   popr   s    rJ   pop_exc_handlerzWorker.pop_exc_handler  s    !!%%''rL   c                 F    | j                   y | j                  ||||       y rH   )r   )r   r3  r4  r5  r  s        rJ   r3  zWorker.handle_work_horse_killed  s%    **2''VWfErL   c                 v    t        || j                        st        d      | j                  |j                  k(  S )z;Equality does not take the database/connection into accountz2Cannot compare workers to other types (of workers))r   	__class__r  rc   )r   others     rJ   __eq__zWorker.__eq__  s/    %0PQQyyEJJ&&rL   c                 ,    t        | j                        S )z;The hash does not take the database/connection into account)hashrc   r   s    rJ   __hash__zWorker.__hash__  s    DIIrL   r  r  )F)"rO   rP   rQ   r
  r  r   rW   rb   r  r   r   r   r  r   r#  r$  r  r8  r6  r*  r!  rA  rL  rK  r+   r2   rV  rb  rd  rB  rq  r3  rv  ry  rI   rL   rJ   r   r     sN     07 fnn  !hsmXc]H_D]&] ^ ! Xi5H 6N&5U 5s 5CMJ	*02$6J JPT Jae J21HE 1H' 1H% 1H_q 1Hf<e <G <Se <|,% , ,PS ,
Mu MW M M^(F'rL   r   c                   $    e Zd ZddZdddefdZy)SimpleWorkerr3  r)   c                     | j                  |       | j                  ||       | j                  t        j                         y)z1Execute job in same thread/process, do not fork()N)r  rB  r  rr   rz   r5  s      rJ   r6  zSimpleWorker.execute_job  s2    s#e$|(()rL   rF   c                 l    |j                   dk(  rt        S t        |j                   xs t              dz   S )z-1" means that jobs never timeout. In this case, we should _not_ do -1 + 60 = 59.
        We should just stick to DEFAULT_WORKER_TTL.

        Args:
            job (Job): The Job

        Returns:
            ttl (int): TTL
        r'  r  )rN  r#   r   )r   r3  s     rJ   r  zSimpleWorker.get_heartbeat_ttl  s0     ;;"%%9'9;b@@rL   Nr  )rO   rP   rQ   r6  r   r  rI   rL   rJ   r{  r{    s    *AU As ArL   r{  c                   4    e Zd ZdZdZg dZd Zd Zd Zd Z	y)	HerokuWorkerz
    Modified version of rq worker which:
    * stops work horses getting killed with SIGTERM
    * sends SIGRTMIN to work horses on SIGTERM to the main process which in turn
    causes the horse to crash `imminent_shutdown_delay` seconds later
       )f_codef_lastif_linenof_localsf_tracec                 (   t        j                   t         j                  | j                         t        j                   t         j                  t         j                         t        j                   t         j
                  t         j                         y)z>Modified to ignore SIGINT and SIGTERM and only handle SIGRTMINN)rW   SIGRTMINrequest_stop_sigrtminr"  rD  r   r   s    rJ   rA  z%HerokuWorker.setup_work_horse_signals  sF    foot'A'ABfmmV^^4fnnfnn5rL   c                     | j                   dk7  rG| j                  j                  d| j                         | j	                  t
        j                         y| j                  j                  d       y)z"If horse is alive send it SIGRTMINr   zBWorker %s: warm shut down requested, sending horse SIGRTMIN signal)r  z(Warm shut down requested, no horse foundN)r  r   r
  r   r  rW   r  r  r   s    rJ   r$  z)HerokuWorker.handle_warm_shutdown_request  sM    >>QHHMM^`d`h`hiOOO0HHGHrL   c                    | j                   dk(  r.| j                  j                  d       | j                  ||       y | j                  j                  d| j                          t	        j                  t        j
                  | j                         t	        j                  t        j                  | j                         t	        j                  | j                          y )Nr   z@Imminent shutdown, raising ShutDownImminentException immediatelyzBImminent shutdown, raising ShutDownImminentException in %d seconds)imminent_shutdown_delayr   r  request_force_stop_sigrtminrW   r  SIGALRMalarmr&  s      rJ   r  z"HerokuWorker.request_stop_sigrtmin  s    ''1,HH_`,,VU;HHTVZVrVr MM&//4+K+KLMM&..$*J*JKLL556rL   c                     t        fd| j                  D              }| j                  j                  d       t	        dt        |      z  |      )Nc              3   :   K   | ]  }|t        |      f  y wrH   )rV   )rX   attrr'  s     rJ   rZ   z;HerokuWorker.request_force_stop_sigrtmin.<locals>.<genexpr>  s     STT75$/0Ss   z2raising ShutDownImminentException to cancel job...zshut down imminent (signal: %s))dictframe_propertiesr   r  r&   rh   )r   rg   r'  r
  s     ` rJ   r  z(HerokuWorker.request_force_stop_sigrtmin  sF    ST=R=RSSMN'(IKX^L_(_aeffrL   N)
rO   rP   rQ   __doc__r  r  rA  r$  r  r  rI   rL   rJ   r  r    s+      O6I
7grL   r  c                       e Zd ZdZd Zy)RoundRobinWorkerze
    Modified version of Worker that dequeues jobs from the queues using a round-robin strategy.
    c                     | j                   j                  |      }| j                   |dz   d  | j                   d |dz    z   | _         y )Nr   )r   r_  r`  s      rJ   rc  zRoundRobinWorker.reorder_queues  sK    ""((9#33C!GI>AUAUV_X[^_X_A``rL   NrO   rP   rQ   r  rc  rI   rL   rJ   r  r    s    arL   r  c                       e Zd ZdZd Zy)RandomWorkerz`
    Modified version of Worker that dequeues jobs from the queues using a random strategy.
    c                 .    t        | j                         y rH   )r   r   )r   ra  s     rJ   rc  zRandomWorker.reorder_queues   s    $$%rL   Nr  rI   rL   rJ   r  r    s    &rL   r  )wr  r  loggingr  r   rm   rW   r   r`   r  r  r   r   r   r   enumr   r   typesr   typingr	   r
   r   r   r   r   r   uuidr   resourcer   ImportErrorr   r   redis.clientr   r   r   r   r   r   redis.exceptionsr  r   commandr   r   r   defaultsr   r   r    r!   r"   r#   r   r$   r%   r&   
executionsr'   groupr(   r3  r)   r*   r+   logutilsr,   r-   r.   r/   r   r1   registryr2   r3   r   r4   serializersr5   
suspensionr6   timeoutsr7   r8   r9   utilsr:   r;   r<   r=   r>   r?   r@   rA   rB   r   rC   rD   rK   r   	getLoggerr   rr  rN   r  dirrd   rh   rj   rr   r|   r   r{  r  r  r  rI   rL   rJ   <module>r     s       	    
    2 2    N N N * AA*    ! K K  X W !  & & < <  : " + $ _ _
 
 
 8 
		;	'	I 	  7:6{ 	

c4 3 `$ `$F#TZ TnA6 A.(g6 (gVav a&6 &I7    *))*V  3 4 s6   (F7 ?G "G 7G ?G GGG%$G%