
    s,g&                        d dl Z d dlZd dlZd dlZd dlZd dlZd dlmZ d dlm	Z	 d dl
mZmZmZmZmZmZmZ d dlmZ d dlmZmZ d dlmZ dd	lmZ dd
lmZmZ ddlmZ ddl m!Z! ddl"m#Z# ddl$m%Z% ddl&m'Z'm(Z( erd dlm)Z)  G d de      Z* G d d      Z+e(eeddd fde,dee,   de-dee'   ded   dee   de.de,d e/fd!Z0y)"    N)Enum)Process)TYPE_CHECKINGDictList
NamedTupleOptionalTypeUnion)uuid4)ConnectionPoolRedis)DefaultSerializer   )parse_connection)DEFAULT_LOGGING_DATE_FORMATDEFAULT_LOGGING_FORMAT)Job)setup_loghandlers)Queue)parse_names)
BaseWorkerWorker)
Serializerc                   ,    e Zd ZU eed<   eed<   eed<   y)
WorkerDatanamepidprocessN)__name__
__module____qualname__str__annotations__intr        h/var/www/trellinator.diamondhoofcare.com/public_html/venv/lib/python3.12/site-packages/rq/worker_pool.pyr   r      s    
I	Hr'   r   c                   z   e Zd Z G d de      Zdeeefdee	e
ef      dededee   ded	   d
ee   fdZedee   fd       Zedefd       Zd Zd#dZdefdZd ZdefdZd$deddfdZ	 	 d%de
dedede
def
dZ	 	 	 	 d&dee   dedede
fdZ d'dedede
fdZ!e"jF                  fdefd Z$d! Z%d(dede
fd"Z&y))
WorkerPoolc                       e Zd ZdZdZdZy)WorkerPool.Statusr         N)r    r!   r"   IDLESTARTEDSTOPPEDr&   r'   r(   Statusr,   #   s    r'   r2   r   queues
connectionnum_workersworker_class
serializerr   	job_classc                    || _         g | _        t        dt        t        t
               t        j                  t
              | _        t        |      | _
        || _        t               j                  | _        d| _        d| _        | j"                  j$                  | _        || _        || _        || _        i | _        t1        |      \  | _        | _        | _        y )NINFOr   Tr   )r5   _workersr   r   r   r    logging	getLoggerlogr   _queue_namesr4   r   hexr   _burst_sleepr2   r/   statusr6   r7   r8   worker_dictr   _connection_class_pool_class_pool_kwargs)	selfr3   r4   r5   r6   r7   r8   argskwargss	            r(   __init__zWorkerPool.__init__(   s     !,&(&"=?U\de#*#4#4X#>'26':$	 #';;#3#3.:.8$- 35FVWaFbC 0$2Cr'   returnc                 j    | j                   D cg c]  }t        || j                         c}S c c}w )Returns a list of Queue objectsr4   )r@   r   r4   )rI   r   s     r(   r3   zWorkerPool.queuesF   s+     EIDUDUVDdt7VVVs   0c                 ,    t        | j                        S )rO   )lenrE   rI   s    r(   number_of_active_workersz#WorkerPool.number_of_active_workersK   s     4##$$r'   c                     t        j                   t         j                  | j                         t        j                   t         j                  | j                         y)zUInstalls signal handlers for handling SIGINT and SIGTERM
        gracefully.
        N)signalSIGINTrequest_stopSIGTERMrS   s    r(   _install_signal_handlersz#WorkerPool._install_signal_handlersP   s4     	fmmT%6%67fnnd&7&78r'   Nc                     | j                   j                  d       | j                  j                  | _        | j                          y)z8Toggle self._stop_requested that's checked on every loopz)Received SIGINT/SIGTERM, shutting down...N)r?   infor2   r1   rD   stop_workers)rI   signumframes      r(   rX   zWorkerPool.request_stopW   s0    ABkk))r'   c                 @    | j                          | j                  dk(  S )z)Returns True if all workers have stopped.r   )reap_workersrT   rS   s    r(   all_workers_have_stoppedz#WorkerPool.all_workers_have_stopped]   s    ,,11r'   c                 ~   | j                   j                  d       t        | j                  j	                               }|D ]z  }|j
                  j                  d       |j
                  j                         r2| j                   j                  d|j                  |j                         j| j                  |       | y)z%Removes dead workers from worker_dictzReaping dead workersg?zWorker %s with pid %d is aliveN)r?   debuglistrE   valuesr   joinis_aliver   r   handle_dead_worker)rI   worker_datasdatas      r(   ra   zWorkerPool.reap_workersc   s    -.D,,3356  	DLLc"||$$&?DHHU''-	r'   worker_datac                    | j                   j                  d|j                  |j                         t	        j
                  t              5  | j                  j                  |j                         ddd       y# 1 sw Y   yxY w)z&
        Handle a dead worker
        zWorker %s with pid %d is deadN)	r?   r\   r   r   
contextlibsuppressKeyErrorrE   pop)rI   rl   s     r(   ri   zWorkerPool.handle_dead_worker}   s`     	5{7G7GY  * 	3  !1!12	3 	3 	3s   &A::Brespawnc                 `   | j                   j                  d       | j                          |r| j                  | j                  j
                  k7  r]| j                  t        | j                        z
  }|r8t        |      D ])  }| j                  | j                  | j                         + yyyy)z7
        Check whether workers are still alive
        zChecking worker processes)burstrC   N)r?   rd   ra   rD   r2   r1   r5   rR   rE   rangestart_workerrB   rC   )rI   rr   deltais       r(   check_workerszWorkerPool.check_workers   s     	23 t{{dkk&9&99$$s4+;+;'<<Eu MA%%DKK%LM  :7r'   r   rt   rC   logging_levelc                     t        t        || j                  | j                  | j                  | j
                  f|||| j                  | j                  | j                  dd| d| j                   d      S )zReturns the worker process)rC   rt   rz   r6   r8   r7   zWorker z (WorkerPool ))targetrJ   rK   r   )
r   
run_workerr@   rF   rG   rH   r6   r8   r7   r   )rI   r   rt   rC   rz   s        r(   get_worker_processzWorkerPool.get_worker_process   sx     ))4+A+A4CSCSUYUfUfg !. $ 1 1!^^"oo 4&dii[:
 	
r'   countc                    t               j                  }| j                  ||||      }|j                          t	        ||j
                  |      }|| j                  |<   | j                  j                  d||j
                         y)z
        Starts a worker and adds the data to worker_datas.
        * sleep: waits for X seconds before creating worker, for testing purposes
        rt   rC   rz   )r   r   r   zSpawned worker: %s with PID %dN)	r   rA   r   startr   r   rE   r?   rd   )rI   r   rt   rC   rz   r   r   rl   s           r(   rv   zWorkerPool.start_worker   sm     w{{))$eFZg)h dWM!,7w{{Kr'   c                     | j                   j                  d| j                   d       t        | j                        D ]  }| j	                  |dz   |||        y)zx
        Run the workers
        * sleep: waits for X seconds before creating worker, only for testing purposes
        z	Spawning z workersr   r   N)r?   rd   r5   ru   rv   )rI   rt   rC   rz   rx   s        r(   start_workerszWorkerPool.start_workers   s[    
 	4#3#3"4H=>t''( 	^Aa!e5}]	^r'   c                 2   	 t        j                  |j                  |       | j                  j	                  d|j                         y# t
        $ rD}|j                  t        j                  k(  r| j                  j                  d       n Y d}~yd}~ww xY w)zm
        Send stop signal to worker and catch "No such process" error if the worker is already dead.
        z'Sent shutdown command to worker with %szHorse already deadN)	oskillr   r?   r\   OSErrorerrnoESRCHrd   )rI   rl   siges       r(   stop_workerzWorkerPool.stop_worker   si    	GGKOOS)HHMMC[__U 	ww%++%34 5	s   AA	 		B:BBc                     | j                   j                  dt        | j                               t	        | j                  j                               }|D ]  }| j                  |        y)zSend SIGINT to all workersz!Sending stop signal to %s workersN)r?   r\   rR   rE   re   rf   r   )rI   rj   rl   s      r(   r]   zWorkerPool.stop_workers   sV    93t?O?O;PQD,,3356' 	*K[)	*r'   c                     || _         | }t        |t        t        t               | j
                  j                  d| j                   dt        j                                | j                  j                  | _        | j                  | j                   |       | j                          	 | j                  | j                  j                  k(  r]| j!                         r| j
                  j                  d       y | j
                  j                  d       t#        j$                  d       | j'                  |       |r+| j(                  d	k(  r| j
                  j                  d       y t#        j$                  d       )
Nr;   zStarting worker pool z with pid %d...)rt   rz   zAll workers stopped, exiting...z"Waiting for workers to shutdown...r   )rr   r   )rB   r   r   r   r    r?   r\   r   r   getpidr2   r/   rD   r   rZ   r1   rb   timesleepry   rT   )rI   rt   rz   rr   s       r(   r   zWorkerPool.start   s   )-)DF\ckl-dii[H"))+Vkk&&MJ%%'{{dkk111002HHMM"CDHHMM"FGJJqM""7"3T::a?HHMM"CD

1 r'   )NN)T)r   r:   )NTr   r:   )Tr   r:   )Fr:   )'r    r!   r"   r   r2   r   r   r   r   r   r#   r   r   r%   r
   r   rL   propertyr3   rT   rZ   rX   boolrb   ra   r   ri   ry   floatr   r   r	   rv   r   rV   rW   r   r]   r   r&   r'   r(   r*   r*   "   s     )/):"cU3:&'c c 	c
 :&c &c 9c< WU W W %# % %92$ 243j 3MT MT M$ #

 
 	

 
 

0  $#L}L L 	L
 L$^4 ^ ^RU ^ 8>}} z *4  r'   r*   Tr:   worker_namequeue_namesconnection_pool_kwargsr6   r7   r   r8   rt   rz   rC   c                 2    |t        dd|i|      }|D cg c]  }t        ||       }} ||| |||      }|j                  j                  dt	        j
                                t        j                  |
       |j                  |d|	       y c c}w )	Nconnection_class)connection_poolrP   )r   r4   r7   r8   z#Starting worker started with PID %sT)rt   with_schedulerrz   r&   )	r   r   r?   r\   r   r   r   r   work)r   r   r   connection_pool_classr   r6   r7   r8   rt   rz   rC   r4   r   r3   workers                  r(   r~   r~      s     "&h8MhQghJ >IITeDZ0IFI&{zV`luvF
JJOO9299;GJJv
KKeDKN	 Js   B)1rn   r   r=   r   rV   r   enumr   multiprocessingr   typingr   r   r   r   r	   r
   r   uuidr   redisr   r   rq.serializersr   connectionsr   defaultsr   r   jobr   logutilsr   queuer   utilsr   r   r   r   r   r   r*   r#   dictr   r%   r~   r&   r'   r(   <module>r      s       	    # O O O  ' , ) I  '   &) N Nn &,%6OOcO
 !O z"O \"O CyO O O Or'   