U
    dQ                     @   s|   d Z ddlZddlZddlmZ ddlmZ ddlmZ ddl	m
Z
 dZeeZdd
dZdddZdddZdddZdS )z*The consumers highly-optimized inner loop.    N)	bootsteps)WorkerLostError)
get_logger   )state)asynloopsynloop皙?c              
   C   sX   z| j |d W nB tk
rR } z$t|dd }|d k	rB|tjkrB W 5 d }~X Y nX d S )Ntimeouterrno)drain_events	Exceptiongetattrr   EAGAIN)
connectionr   excZ	exc_errno r   7/tmp/pip-unpacked-wheel-mu1yl971/celery/worker/loops.py_quick_drain   s    r          @c                    sJ   d g sS    }|r  js$S  fdd}| || ||f S )Nc              
      s>   z  |  W n* tk
r8 } z|d< W 5 d }~X Y nX d S )Nr   )Zheartbeat_checkr   )rateer   heartbeat_errorr   r   tick&   s    z#_enable_amqheartbeats.<locals>.tick)Zget_heartbeat_intervalZsupports_heartbeatsZcall_repeatedly)timerr   r   	heartbeatr   r   r   r   _enable_amqheartbeats   s    
r   c	                 C   sN  t j}	|j}
|j}|  }t|j||d}||_| j	| | 	| |
  |   | jsn| j sntd|jjdkr|t| ||_| }zt|j|	kr| jrt  |d dk	r|d |j|jkr|
  zt| W q t k
r   | }Y qX qW 5 z|  W n0 tk
rF } ztd| W 5 d}~X Y nX X dS )zNon-blocking event loop.r   z Could not start worker processesZamqpz&Error cleaning up after event loop: %rNr   )!r   RUNupdateZconnection_errorscreate_task_handlerr   r   
on_message
controllerZregister_with_event_loopconsumeon_readyZrestart_countpoolZdid_start_okr   	transportZdriver_typeZ	call_soonr   Zpropagate_errorsZcreate_loopresetr   logger	exceptionr   r   maybe_shutdownprevvaluenextStopIteration)objr   consumer	blueprinthubqosr   clockhbrater    Z
update_qoserrorson_task_receivedr   Zloopr   r   r   r   r   2   sD    
 r   c	                 K   s   t j}
|  }| j}dg}t| jddr8t| j||d}||_|	  | 
  |j|
kr| jrt  |d dk	rz|d |j|jkr|  z|  |jdd W qN tjk
r   Y qN tk
r   |j|
krԂ Y qNX qNdS )zEFallback blocking event loop for transports that doesn't support AIO.NZis_greenFr   r   r   r
   )r   r    r"   perform_pending_operationsr   r'   r   r   r#   r%   r&   r   r   r,   r-   r.   r!   r   socketr   OSError)r1   r   r2   r3   r4   r5   r   r6   r7   kwargsr    r9   r:   r   r   r   r   r   l   s.    
r   )r	   )r   )r   )r   )__doc__r   r;   Zceleryr   Zcelery.exceptionsr   Zcelery.utils.logr    r   __all____name__r*   r   r   r   r   r   r   r   r   <module>   s   
	
 
; 