U
    dE(                     @   s  d Z ddlZddlZddlZddlmZ ddlmZ ddlmZ ddl	m
Z
 ddlmZ ddlmZ dd	lmZ dd
lmZ dZi Zdd ZedG dd dZG dd deZedG dd deZedG dd deZG dd dZG dd dZdS )z$Async I/O backend support utilities.    N)deque)Empty)sleep)WeakKeyDictionary)detect_environment)states)TimeoutError)THREAD_TIMEOUT_MAX)AsyncBackendMixinBaseResultConsumerDrainerregister_drainerc                    s    fdd}|S )z5Decorator used to register a new result drainer type.c                    s   | t  < | S N)drainers)clsname @/tmp/pip-unpacked-wheel-mu1yl971/celery/backends/asynchronous.py_inner   s    z register_drainer.<locals>._innerr   )r   r   r   r   r   r      s    r   defaultc                   @   s<   e Zd ZdZdd Zdd Zdd Zdd
dZdddZdS )r   zResult draining service.c                 C   s
   || _ d S r   )result_consumer)selfr   r   r   r   __init__$   s    zDrainer.__init__c                 C   s   d S r   r   r   r   r   r   start'   s    zDrainer.startc                 C   s   d S r   r   r   r   r   r   stop*   s    zDrainer.stopN   c                 c   sv   |p
| j j}t }|r0t | |kr0t z| j|||dV  W n tjk
r\   Y nX |rh|  |jrqrqd S Ntimeout)r   drain_eventstime	monotonicsocketr    wait_forready)r   pr    intervalon_intervalwaitZ
time_startr   r   r   drain_events_until-   s    zDrainer.drain_events_untilc                 C   s   ||d d S r   r   r   r'   r*   r    r   r   r   r%   >   s    zDrainer.wait_for)Nr   NN)N)	__name__
__module____qualname____doc__r   r   r   r+   r%   r   r   r   r   r       s   
r   c                       sZ   e Zd ZdZdZdZdd Zdd Z fddZdd	 Z	d
d Z
dd ZdddZ  ZS )greenletDrainerNc                 C   s   dS )z,create new self._drain_complete_event objectNr   r   r   r   r   _create_drain_complete_eventG   s    z,greenletDrainer._create_drain_complete_eventc                 C   s   dS )z5raise self._drain_complete_event for wakeup .wait_forNr   r   r   r   r   _send_drain_complete_eventK   s    z*greenletDrainer._send_drain_complete_eventc                    s8   t  j|| t | _t | _t | _|   d S r   )superr   	threadingEvent_started_stopped	_shutdownr2   )r   argskwargs	__class__r   r   r   O   s
    


zgreenletDrainer.__init__c                 C   s^   | j   | j sPz"| jjdd |   |   W q
 tj	k
rL   Y q
X q
| j
  d S )Nr   r   )r7   setr8   is_setr   r!   r3   r2   r$   r    r9   r   r   r   r   runV   s    

zgreenletDrainer.runc                 C   s&   | j  s"| | j| _| j   d S r   )r7   r?   spawnr@   _gr*   r   r   r   r   r   a   s    
zgreenletDrainer.startc                 C   s"   | j   |   | jt d S r   )r8   r>   r3   r9   r*   r	   r   r   r   r   r   f   s    
zgreenletDrainer.stopc                 C   s    |    |js| jj|d d S r   )r   r&   _drain_complete_eventr*   r,   r   r   r   r%   k   s    zgreenletDrainer.wait_for)N)r-   r.   r/   rA   rB   rC   r2   r3   r   r@   r   r   r%   __classcell__r   r   r<   r   r1   B   s   r1   eventletc                   @   s$   e Zd Zdd Zdd Zdd ZdS )eventletDrainerc                 C   s$   ddl m}m} ||}|d |S )Nr   )r   rA   )rE   r   rA   )r   funcr   rA   gr   r   r   rA   t   s    zeventletDrainer.spawnc                 C   s   ddl m} | | _d S Nr   )r6   )Zeventlet.eventr6   rC   r   r6   r   r   r   r2   z   s    z,eventletDrainer._create_drain_complete_eventc                 C   s   | j   d S r   )rC   sendr   r   r   r   r3   ~   s    z*eventletDrainer._send_drain_complete_eventNr-   r.   r/   rA   r2   r3   r   r   r   r   rF   q   s   rF   geventc                   @   s$   e Zd Zdd Zdd Zdd ZdS )geventDrainerc                 C   s    dd l }||}|d |S )Nr   )rM   rA   r   )r   rG   rM   rH   r   r   r   rA      s    

zgeventDrainer.spawnc                 C   s   ddl m} | | _d S rI   )Zgevent.eventr6   rC   rJ   r   r   r   r2      s    z*geventDrainer._create_drain_complete_eventc                 C   s   | j   |   d S r   )rC   r>   r2   r   r   r   r   r3      s    
z(geventDrainer._send_drain_complete_eventNrL   r   r   r   r   rN      s   rN   c                   @   s   e Zd ZdZdd ZdddZddd	Zd
d ZdddZd ddZ	dd Z
dd Zdd Zd!ddZd"ddZedd ZdS )#r
   z.Mixin for backends that enables the async API.c                 C   s   || j j|< d S r   )r   buckets)r   resultbucketr   r   r   _collect_into   s    zAsyncBackendMixin._collect_intoTc                 k   s   |    |j}|st t }|D ]8}t|ds<|| q"|jrN|| q"| || q"| j|fd|i|D ]:}|rr|	 }t|ds|j
|jfV  qv|j
|jfV  qvqr|r|	 }|j
|jfV  qd S )N_cacheno_ack)_ensure_not_eagerresultsStopIterationr   hasattrappendrS   rR   _wait_for_pendingpopleftidchildren)r   rP   rT   r;   rV   rQ   node_r   r   r   iter_native   s(    

zAsyncBackendMixin.iter_nativeFc                 C   sJ   |r| j j  z| | W n& tk
rD   | j|j||d Y nX |S )N)weak)r   drainerr   _maybe_resolve_from_bufferr   _add_pending_resultr\   )r   rP   ra   start_drainerr   r   r   add_pending_result   s    z$AsyncBackendMixin.add_pending_resultc                 C   s   | | j|j d S r   )_maybe_set_cache_pending_messagesZtaker\   r   rP   r   r   r   rc      s    z,AsyncBackendMixin._maybe_resolve_from_bufferc                 C   s<   | j \}}||kr8|j|kr8||r&|n||< | j| d S r   )_pending_resultsr\   r   consume_from)r   task_idrP   ra   ZconcreteZweak_r   r   r   rd      s    
z%AsyncBackendMixin._add_pending_resultc                    s     j j   fdd|D S )Nc                    s   g | ]} j |d dqS )F)ra   re   )rf   ).0rP   r   ra   r   r   
<listcomp>   s   z9AsyncBackendMixin.add_pending_results.<locals>.<listcomp>)r   rb   r   )r   rV   ra   r   rn   r   add_pending_results   s    z%AsyncBackendMixin.add_pending_resultsc                 C   s   |  |j | | |S r   )_remove_pending_resultr\   on_result_fulfilledri   r   r   r   remove_pending_result   s    
z'AsyncBackendMixin.remove_pending_resultc                 C   s   | j D ]}||d  qd S r   )rj   popr   rl   mappingr   r   r   rq      s    
z(AsyncBackendMixin._remove_pending_resultc                 C   s   | j |j d S r   )r   
cancel_forr\   ri   r   r   r   rr      s    z%AsyncBackendMixin.on_result_fulfilledNc                 K   s*   |    | j|f|D ]}q|j||dS )N)callback	propagate)rU   rZ   Zmaybe_throw)r   rP   rx   ry   r;   r_   r   r   r   wait_for_pending   s    z"AsyncBackendMixin.wait_for_pendingc                 K   s   | j j|f|||d|S )N)r    r)   
on_message)r   rZ   )r   rP   r    r)   r{   r;   r   r   r   rZ      s     z#AsyncBackendMixin._wait_for_pendingc                 C   s   dS )NTr   r   r   r   r   is_async   s    zAsyncBackendMixin.is_async)T)FT)F)F)NT)NNN)r-   r.   r/   r0   rR   r`   rf   rc   rd   rp   rs   rq   rr   rz   rZ   propertyr|   r   r   r   r   r
      s&   

	

   
     
	r
   c                   @   s   e Zd ZdZdd Zdd Zdd Zdd	d
Zdd Zdd Z	dd Z
dd Zd ddZd!ddZd"ddZdd Zdd Zdd ZdS )#r   z2Manager responsible for consuming result messages.c                 C   s@   || _ || _|| _|| _|| _d | _t | _tt	  | | _
d S r   )backendappacceptrj   rh   r{   r   rO   r   r   rb   )r   r~   r   r   Zpending_resultsZpending_messagesr   r   r   r      s    zBaseResultConsumer.__init__c                 K   s
   t  d S r   NotImplementedError)r   Zinitial_task_idr;   r   r   r   r      s    zBaseResultConsumer.startc                 C   s   d S r   r   r   r   r   r   r      s    zBaseResultConsumer.stopNc                 C   s
   t  d S r   r   )r   r    r   r   r   r!     s    zBaseResultConsumer.drain_eventsc                 C   s
   t  d S r   r   r   rl   r   r   r   rk     s    zBaseResultConsumer.consume_fromc                 C   s
   t  d S r   r   r   r   r   r   rw   	  s    zBaseResultConsumer.cancel_forc                 C   s$   | j   t | _ d | _|   d S r   )rO   clearr   r{   on_after_forkr   r   r   r   _after_fork  s    
zBaseResultConsumer._after_forkc                 C   s   d S r   r   r   r   r   r   r     s    z BaseResultConsumer.on_after_forkc                 C   s   | j j|||dS )Nr    r)   )rb   r+   )r   r'   r    r)   r   r   r   r+     s
      z%BaseResultConsumer.drain_events_untilc                 k   s   | j |fd|i| | j| }| _zNz*| j|j||dD ]}d V  td q:W n tjk
rn   tdY nX W 5 || _X d S )Nr    r   r   zThe operation timed out.)on_wait_for_pendingr{   r+   Zon_readyr   r$   r    r   )r   rP   r    r)   r{   r;   Z	prev_on_mr_   r   r   r   rZ     s     
z$BaseResultConsumer._wait_for_pendingc                 K   s   d S r   r   )r   rP   r    r;   r   r   r   r   )  s    z&BaseResultConsumer.on_wait_for_pendingc                 C   s   |  |j| d S r   )on_state_changepayload)r   messager   r   r   on_out_of_band_result,  s    z(BaseResultConsumer.on_out_of_band_resultc              	   C   s<   | j D ](}z|| W   S  tk
r,   Y qX qt|d S r   )rj   KeyErrorru   r   r   r   _get_pending_result/  s    
z&BaseResultConsumer._get_pending_resultc                 C   s   | j r|  | |d tjkr|d }z| |}W n" tk
rV   | j|| Y n@X || | j}z|	|}W n tk
r   Y nX |
| td d S )Nstatusrl   r   )r{   r   ZREADY_STATESr   r   rh   putrg   rO   rt   rY   r   )r   metar   rl   rP   rO   rQ   r   r   r   r   7  s     


z"BaseResultConsumer.on_state_change)N)NN)NNN)N)r-   r.   r/   r0   r   r   r   r!   rk   rw   r   r   r+   rZ   r   r   r   r   r   r   r   r   r      s$   

     

r   )r0   r$   r5   r"   collectionsr   queuer   r   weakrefr   Zkombu.utils.compatr   Zceleryr   Zcelery.exceptionsr   Zcelery.utils.threadsr	   __all__r   r   r   r1   rF   rN   r
   r   r   r   r   r   <module>   s.   !/[