U
    d                     @   s  d Z ddlZddlZddlmZ ddlmZmZmZ ddl	Z
ddl
mZmZ ddlmZmZ ddlmZ ddlmZ ejd	d
Zedi dZedddhdZeddhdZG dd dejZeddeddddfddZeddededdfddZeddedfddZdd ZdS )z'Embedded workers for integration tests.    N)contextmanager)AnyIterableUnion)Celeryworker)_set_task_join_will_blockallow_join_result)Signal)anon_nodenameWORKER_LOGLEVELerrortest_worker_starting)nameZproviding_argstest_worker_startedr   consumertest_worker_stoppedc                       s0   e Zd ZdZ fddZdd Zdd Z  ZS )TestWorkControllerz3Worker that can synchronize on being fully started.c                    s   t  | _t j|| d S )N)	threadingEvent_on_startedsuper__init__)selfargskwargs	__class__ A/tmp/pip-unpacked-wheel-mu1yl971/celery/contrib/testing/worker.pyr       s    
zTestWorkController.__init__c                 C   s    | j   tj| j| |d dS )z=Callback called when the Consumer blueprint is fully started.)senderr   r   N)r   setr   sendapp)r   r   r   r   r   on_consumer_ready%   s    
  z$TestWorkController.on_consumer_readyc                 C   s   | j   dS )zWait for worker to be fully up and running.

        Warning:
            Worker must be started within a thread for this to work,
            or it will block forever.
        N)r   wait)r   r   r   r   ensure_started,   s    z!TestWorkController.ensure_started)__name__
__module____qualname____doc__r   r$   r&   __classcell__r   r   r   r   r      s   r      ZsoloTg      $@c              
   k   s   t j| d t| f||||||d|F}	|rfddlm}
 t  |
 j|ddks\tW 5 Q R X |	V  W 5 Q R X t	j| |	d dS )	z[Start embedded worker.

    Yields:
        celery.app.worker.Worker: worker instance.
    )r    )concurrencypoolloglevellogfileperform_ping_checkshutdown_timeoutr,   )ping)timeoutZpong)r    r   N)
r   r"   _start_worker_threadtasksr3   r	   delaygetAssertionErrorr   )r#   r-   r.   r/   r0   r1   Zping_task_timeoutr2   r   r   r3   r   r   r   start_worker7   s$    "r:   c                 k   s   t | || |rd| jkst| jtjdd}	|	jj W 5 Q R X |f | |t	 |||d|
ddddd
|}
tj|
jdd}|  |
  td	 |
V  d
dlm} d
|_|| | rtdd|_dS )zaStart Celery worker in a thread.

    Yields:
        celery.worker.Worker: worker instance.
    zcelery.pingZTEST_BROKER)hostnameNwithout_heartbeatT)
r#   r-   r;   r.   r/   r0   Zready_callbackr<   Zwithout_mingleZwithout_gossip)targetdaemonFr   )statezWorker thread failed to exit within the allocated timeout. Consider raising `shutdown_timeout` if your tasks take longer to execute.)setup_app_for_workerr6   r9   
connectionosenvironr8   Zdefault_channelZqueue_declarer   popr   Threadstartr&   r   Zcelery.workerr?   Zshould_terminatejoinis_aliveRuntimeError)r#   r-   r.   r/   r0   WorkControllerr1   r2   r   connr   tr?   r   r   r   r5   \   s@    

r5   c           	      k   s@   ddl m}m} |   ||dg}|  dV  |  dS )zfStart worker in separate process.

    Yields:
        celery.app.worker.Worker: worker instance.
    r   )ClusterNodeztestworker1@%hN)Zcelery.apps.multirM   rN   set_currentrF   Zstopwait)	r#   r-   r.   r/   r0   r   rM   rN   Zclusterr   r   r   _start_worker_process   s    rP   c                 C   s8   |    |   |   dt| j_| jj||d dS )z9Setup the app to be used for starting an embedded worker.F)r/   r0   N)finalizerO   set_defaulttypelog_setupsetup)r#   r/   r0   r   r   r   r@      s
    r@   ) r*   rB   r   
contextlibr   typingr   r   r   Zcelery.worker.consumerZceleryr   r   Zcelery.resultr   r	   Zcelery.utils.dispatchr
   Zcelery.utils.nodenamesr   rC   r8   r   r   r   r   rJ   r   r:   r5   rP   r@   r   r   r   r   <module>   s`   $7