U
    d8                     @   s2  d Z ddlZddlZddlmZ ddlmZ ddlmZ ddlm	Z	 ddlm
Z ddlmZ dd	lmZmZ dd
lmZmZmZ ddlmZmZ ddlmZ ddlmZ ddlmZ ddlmZmZ ddl m!Z! ddl"m#Z# ddl$m%Z% zddl&Z&W n e'k
r   dZ&Y nX dZ(dZ)dZ*dZ+G dd dZ,dS )a  WorkController can be used to instantiate in-process workers.

The command-line interface for the worker is in :mod:`celery.bin.worker`,
while the worker program is in :mod:`celery.apps.worker`.

The worker program is responsible for adding signal handlers,
setting up logging, etc.  This is a bare-bones worker without
global side-effects (i.e., except for the global state stored in
:mod:`celery.worker.state`).

The worker consists of several components, all managed by bootsteps
(mod:`celery.bootsteps`).
    N)datetime)	cpu_count)detect_environment)	bootsteps)concurrency)signals)RUN	TERMINATE)ImproperlyConfiguredTaskRevokedErrorWorkerTerminate)
EX_FAILUREcreate_pidlock)reload_from_cwd)mlevel)worker_logger)default_nodenameworker_direct)str_to_list)default_socket_timeout   state)WorkControllerg      @z
Trying to select queue subset of {0!r}, but queue {1} isn't
defined in the `task_queues` setting.

If you want to automatically declare unknown queues you can
enable the `task_create_missing_queues` setting.
ze
Trying to deselect queue subset of {0!r}, but queue {1} isn't
defined in the `task_queues` setting.
c                   @   sR  e Zd ZdZdZdZdZdZdZdZ	G dd de
jZdHddZdIddZd	d
 Zdd Zdd Zdd Zdd Zdd Zdd ZdJddZdd Zdd Zdd Zdd  Zd!d" Zd#d$ Zd%d& Zd'd( Zd)d* ZdKd,d-ZdLd.d/Z dMd1d2Z!dNd3d4Z"dOd5d6Z#dPd7d8Z$d9d: Z%d;d< Z&d=d> Z'd?d@ Z(dAdB Z)e*dCdD Z+dQdFdGZ,dS )Rr   zUnmanaged worker instance.Nc                   @   s&   e Zd ZdZdZddddddd	hZd
S )zWorkController.BlueprintzWorker bootstep blueprint.ZWorkerzcelery.worker.components:Hubzcelery.worker.components:Poolzcelery.worker.components:Beatzcelery.worker.components:Timerz celery.worker.components:StateDBz!celery.worker.components:Consumerz'celery.worker.autoscale:WorkerComponentN)__name__
__module____qualname____doc__nameZdefault_steps r   r   8/tmp/pip-unpacked-wheel-mu1yl971/celery/worker/worker.py	BlueprintL   s   r!   c                 K   sh   |p| j | _ t|| _t | _| j j  | jf | | j	f | | j
f | | jf | jf | d S N)appr   hostnamer   utcnowstartup_timeloaderZinit_workeron_before_initsetup_defaultson_after_initsetup_instanceprepare_args)selfr#   r$   kwargsr   r   r    __init__Z   s    

zWorkController.__init__c                 K   s   || _ | || | t| | jsNzt | _W n tk
rL   d| _Y nX t| j| _|pb| j	| _
| j | _|d kr|  n|| _|| _tjj| d t| j| _g | _|   | j| jjd | j| j| jd| _| jj| f| d S )N   ZsenderZworker)stepson_starton_close
on_stopped)pidfilesetup_queuessetup_includesr   r   r   NotImplementedErrorr   loglevelon_consumer_readyready_callbackr#   Zconnection_for_read	_conninfoshould_use_eventloopuse_eventloopoptionsr   Zworker_initsend_concurrencyZget_implementationpool_clsr2   on_init_blueprintr!   r3   r4   r5   	blueprintapply)r-   queuesr<   r6   includer?   Zexclude_queuesr.   r   r   r    r+   e   s4    
zWorkController.setup_instancec                 C   s   d S r"   r   r-   r   r   r    rD      s    z WorkController.on_init_blueprintc                 K   s   d S r"   r   r-   r.   r   r   r    r(      s    zWorkController.on_before_initc                 K   s   d S r"   r   rJ   r   r   r    r*      s    zWorkController.on_after_initc                 C   s   | j rt| j | _d S r"   )r6   r   pidlockrI   r   r   r    r3      s    zWorkController.on_startc                 C   s   d S r"   r   )r-   consumerr   r   r    r;      s    z WorkController.on_consumer_readyc                 C   s   | j j  d S r"   )r#   r'   Zshutdown_workerrI   r   r   r    r4      s    zWorkController.on_closec                 C   s(   | j   | j  | jr$| j  d S r"   )ZtimerstoprL   shutdownrK   releaserI   r   r   r    r5      s    

zWorkController.on_stoppedc              
   C   s   t |}t |}z| jjj| W n6 tk
rZ } ztt 	||W 5 d }~X Y nX z| jjj
| W n6 tk
r } ztt 	||W 5 d }~X Y nX | jjjr| jjjt| j d S r"   )r   r#   ZamqprG   selectKeyErrorr
   SELECT_UNKNOWN_QUEUEstripformatZdeselectDESELECT_UNKNOWN_QUEUEconfr   Z
select_addr$   )r-   rH   excludeexcr   r   r    r7      s     
zWorkController.setup_queuesc                    sf   t  jjj}|r0|t |7 } fdd|D  | _dd  jj D }t t||B  jj_d S )Nc                    s   g | ]} j j|qS r   )r#   r'   Zimport_task_module.0mrI   r   r    
<listcomp>   s     z1WorkController.setup_includes.<locals>.<listcomp>c                 S   s   h | ]}|j jqS r   )	__class__r   )rZ   Ztaskr   r   r    	<setcomp>   s   z0WorkController.setup_includes.<locals>.<setcomp>)tupler#   rV   rH   Ztasksvaluesset)r-   Zincludesprevtask_modulesr   rI   r    r8      s    
zWorkController.setup_includesc                 K   s   |S r"   r   rJ   r   r   r    r,      s    zWorkController.prepare_argsc                 C   s   t jj| d d S )Nr1   )r   Zworker_shutdownrA   rI   r   r   r    _send_worker_shutdown   s    z$WorkController._send_worker_shutdownc              
   C   s   z| j |  W n tk
r,   |   Y n tk
rh } z tjd|dd | jtd W 5 d }~X Y nN t	k
r } z| j|j
d W 5 d }~X Y n  tk
r   | jtd Y nX d S )NzUnrecoverable error: %rT)exc_info)exitcode)rE   startr   	terminate	ExceptionloggercriticalrM   r   
SystemExitcodeKeyboardInterrupt)r-   rX   r   r   r    rg      s    zWorkController.startc                 C   s   | j j| d|fdd d S )Nregister_with_event_loopzhub.register)argsdescription)rE   Zsend_all)r-   Zhubr   r   r    ro      s      z'WorkController.register_with_event_loopc                 C   s   |  | j|S r"   )Z_quick_acquire_process_taskr-   reqr   r   r    _process_task_sem   s    z WorkController._process_task_semc                 C   sL   z| | j W n6 tk
rF   z|   W n tk
r@   Y nX Y nX dS )z2Process task by sending it to the pool of workers.N)Zexecute_using_poolpoolr   Z_quick_releaseAttributeErrorrs   r   r   r    rr      s    zWorkController._process_taskc                 C   s(   z| j   W n tk
r"   Y nX d S r"   )rL   closerw   rI   r   r   r    signal_consumer_close   s    z$WorkController.signal_consumer_closec                 C   s    t  dko| jjjjo| jj S )Ndefault)r   r=   	transportZ
implementsZasynchronousr#   Z
IS_WINDOWSrI   r   r   r    r>      s
    

z#WorkController.should_use_eventloopFc                 C   sF   |dk	r|| _ | jjtkr:|   |r.| jjr:| jdd |   dS )z'Graceful shutdown of the worker server.NTwarm)	rf   rE   r   r   ry   rv   signal_safe	_shutdownrd   )r-   in_sighandlerrf   r   r   r    rM      s    zWorkController.stopc                 C   s0   | j jtkr,|   |r | jjr,| jdd dS )z.Not so graceful shutdown of the worker server.Fr|   N)rE   r   r	   ry   rv   r~   r   )r-   r   r   r   r    rh      s    zWorkController.terminateTc              	   C   s>   | j d k	r:tt" | j j| | d | j   W 5 Q R X d S )N)rh   )rE   r   SHUTDOWN_SOCKET_TIMEOUTrM   join)r-   r}   r   r   r    r     s    

zWorkController._shutdownc                 C   sV   t | j|||d | jr.| j  | j  z| j  W n tk
rP   Y nX d S )N)force_reloadreloader)list_reload_modulesrL   Zupdate_strategiesZreset_rate_limitsrv   Zrestartr9   )r-   modulesreloadr   r   r   r    r     s      

zWorkController.reloadc                    s.    fddt |d kr jjjn|p&dD S )Nc                 3   s   | ]}j |f V  qd S r"   )_maybe_reload_modulerY   r.   r-   r   r    	<genexpr>  s   z1WorkController._reload_modules.<locals>.<genexpr>r   )ra   r#   r'   rc   )r-   r   r.   r   r   r    r     s    zWorkController._reload_modulesc                 C   sH   |t jkr$td| | jj|S |rDtd| tt j| |S d S )Nzimporting module %szreloading module %s)sysr   rj   debugr#   r'   Zimport_from_cwdr   )r-   moduler   r   r   r   r    r      s    
z#WorkController._maybe_reload_modulec                 C   s4   t  | j }| jjt t| jj	t
| dS )N)totalpidclockuptime)r   r%   r&   r   Ztotal_countosgetpidstrr#   r   roundtotal_seconds)r-   r   r   r   r    info(  s    

zWorkController.infoc                 C   sb   t d krtdt t j}|j|j|j|j|j|j	|j
|j|j|j|j|j|j|j|j|jdS )Nz%rusage not supported by this platform)utimeZstimeZmaxrssZixrssZidrssZisrssZminfltZmajfltZnswapZinblockZoublockZmsgsndZmsgrcvZnsignalsZnvcswZnivcsw)resourcer9   Z	getrusageZRUSAGE_SELFZru_utimeZru_stimeZ	ru_maxrssZru_ixrssZru_idrssZru_isrssZ	ru_minfltZ	ru_majfltZru_nswapZ
ru_inblockZ
ru_oublockZ	ru_msgsndZ	ru_msgrcvZru_nsignalsZru_nvcswZ	ru_nivcsw)r-   sr   r   r    rusage/  s(    zWorkController.rusagec                 C   sb   |   }|| j |  || jj | j z|  |d< W n tk
r\   d|d< Y nX |S )Nr   zN/A)r   updaterE   rL   r   r9   )r-   r   r   r   r    statsF  s    zWorkController.statsc                 C   s   dj | | jr| j nddS )z``repr(worker)``.z#<Worker: {self.hostname} ({state})>ZINIT)r-   r   )rT   rE   Zhuman_staterI   r   r   r    __repr__P  s    zWorkController.__repr__c                 C   s   | j S )z#``str(worker) == worker.hostname``.)r$   rI   r   r   r    __str__W  s    zWorkController.__str__c                 C   s   t S r"   r   rI   r   r   r    r   [  s    zWorkController.stateWARNc                 K   s  | j j}|| _|| _|d|| _|d|| _|d||| _|d|| _|d|| _|d|| _	|pd|| _
|d|	| _|d|
| _|d	|| _|d
||| _|d|| _|d||| _|d||| _|d||| _|d|| _|d|| _t|d|| _|d|| _|d|| _d S )NZworker_concurrencyZworker_send_task_eventsZworker_poolZworker_consumerZworker_timerZworker_timer_precisionZworker_autoscalerZworker_pool_putlocksZworker_pool_restartsZworker_state_dbZbeat_schedule_filenameZbeat_schedulertask_time_limittask_soft_time_limitZworker_max_tasks_per_childZworker_max_memory_per_childZworker_prefetch_multiplierZworker_disable_rate_limitsworker_lost_wait)r#   eitherr:   logfiler   task_eventsrC   consumer_cls	timer_clstimer_precisionoptimizationautoscaler_clspool_putlockspool_restartsstatedbschedule_filename	scheduler
time_limitsoft_time_limitmax_tasks_per_childmax_memory_per_childintprefetch_multiplierdisable_rate_limitsr   )r-   r   r:   r   r   rv   r   r   r   r   r   r   r   Or   r   r   r   rC   Zstate_dbr   r   Zscheduler_clsr   r   r   r   r   r   Z_kwr   r   r   r    r)   _  sb     
         zWorkController.setup_defaults)NN)NNNNNN)N)FN)F)T)NFN)N)FN)Nr   NNNNNNNNNNNNNNNNNNNNNNNNNN)-r   r   r   r   r#   rK   rE   rv   Z	semaphorerf   r   r!   r/   r+   rD   r(   r*   r3   r;   r4   r5   r7   r8   r,   rd   rg   ro   ru   rr   ry   r>   rM   rh   r   r   r   r   r   r   r   r   r   propertyr   r)   r   r   r   r    r   ?   s   
      
(










                               r   )-r   r   r   r   Zbilliardr   Zkombu.utils.compatr   Zceleryr   r   rB   r   Zcelery.bootstepsr   r	   Zcelery.exceptionsr
   r   r   Zcelery.platformsr   r   Zcelery.utils.importsr   Zcelery.utils.logr   r   rj   Zcelery.utils.nodenamesr   r   Zcelery.utils.textr   Zcelery.utils.threadsr    r   r   ImportError__all__r   rR   rU   r   r   r   r   r    <module>   s6   
