U
    d9                     @   s  d Z ddlZddlZddlmZmZ ddlmZ ddlmZ ddl	m
Z
 ddlmZ dd	lmZmZmZmZ dd
lmZ ddlmZmZ ddlmZ ddlmZ ddlmZ ddlmZmZ ddlm Z  ddl!m"Z" dZ#dZ$dZ%ee&Z'e'j(e'j) Z(Z)G dd dZ*G dd dZ+dS )zGeneric process mailbox.    N)defaultdictdeque)contextmanager)copy)count)time   )ConsumerExchangeProducerQueue)LamportClock)maybe_declareoid_from)InconsistencyError)
get_logger)match)maybe_evaluatereprcall)cached_property)uuid
   zA node named {node.hostname} is already using this process mailbox!

Maybe you forgot to shutdown the other node or did not do so properly?
Or if you meant to start multiple nodes on the same host please make sure
you give each node a unique node name!
)NodeMailboxc                   @   s   e Zd ZdZdZdZdZdZdZdddZ	dddZ
dd	 Zd
d ZdddZdddZdddZdd Zdd ZdddZeZdd ZdS ) r   zMailbox node.Nc                 C   s:   || _ || _|| _|| _| jjj| _|d kr0i }|| _d S N)channelmailboxhostnamestateclockadjustadjust_clockhandlers)selfr   r   r   r"   r    r$   0/tmp/pip-unpacked-wheel-hqfrjlvz/kombu/pidbox.py__init__6   s    zNode.__init__Tc                    sP    j  j} fdd}||_t|p* j|gf||d krB j jn|d|S )Nc                    s   |rt tj d d S )N)node)warningswarnW_PIDBOX_IN_USEformat)namemessagesZ	consumersr#   r$   r%   verify_exclusiveD   s    z'Node.Consumer.<locals>.verify_exclusive)no_ackaccept)r   	get_queuer   Zon_declaredr	   r   r1   )r#   r   r0   r1   optionsqueuer/   r$   r.   r%   r	   A   s     zNode.Consumerc                 C   s   || j |j< |S r   )r"   __name__)r#   Zfunr$   r$   r%   handlerO   s    zNode.handlerc                 C   s   t d|dd d S )NzCannot decode message: %rr   exc_info)error)r#   messageexcr$   r$   r%   on_decode_errorS   s    zNode.on_decode_errorc                 C   s&   | j ||p| jg| jd}|  |S )N)r   	callbacksr<   )r	   handle_messager<   consume)r#   r   callbackconsumerr$   r$   r%   listenV   s    
zNode.listenc           	   
   K   s   |pi }t dt|d|d|| |r*| jp.| j}z|||}W nP tk
rT    Y n< tk
r } ztd|dd dt|i}W 5 d }~X Y nX |r| j| j	|i|d |d	 |d
 |S )Nz1pidbox received method %s [reply_to:%s ticket:%s]r$   )kwargszpidbox command error: %rr   r7   r9   exchangerouting_key)rD   rE   ticket)
debugr   handle_callhandle_cast
SystemExit	Exceptionr9   reprreplyr   )	r#   method	argumentsreply_torF   rC   handlerM   r;   r$   r$   r%   dispatch]   s*      zNode.dispatchc                 C   s    |si n|}| j | | jf|S r   )r"   r   r#   rN   rO   r$   r$   r%   rQ   r   s    zNode.handlec                 C   s   |  ||S r   rQ   rS   r$   r$   r%   rH   v   s    zNode.handle_callc                 C   s   |  ||S r   rT   rS   r$   r$   r%   rI   y   s    zNode.handle_castc                 C   s   | d}| d}| d}|r8| |j dp4d | j}d}|rT||krrd}n|rn|rnt|||rrd}nd}|r| jf |S d S )Ndestinationpatternmatcherr   r   FT)getr!   headersr   r   rR   )r#   bodyr:   rU   rV   rW   r   Zrun_dispatchr$   r$   r%   r>   |   s     


zNode.handle_messagec                 K   s"   | j j||||| j| j jd d S )N)r   
serializer)r   _publish_replyr   r[   )r#   datarD   rE   rF   rC   r$   r$   r%   rM      s    z
Node.reply)NNNN)NTN)NN)NNN)N)N)r5   
__module____qualname____doc__r   r   r"   r   r   r&   r	   r6   r<   rB   rR   rQ   rH   rI   r>   Zdispatch_from_messagerM   r$   r$   r$   r%   r   $   s0       


    


r   c                
   @   s   e Zd ZdZeZdZdZdZdZ	dZ
dZdZdgZdZd0dd	Zd
d Zd1ddZd2ddZd3ddZd4ddZd5ddZdd Zedd Zdd Zed6ddZd7dd Zd8d!d"Zd9d$d%Zd:d&d'Zd(d) Zd*d+ Z e!d,d- Z"ed.d/ Z#dS );r   zProcess Mailbox.z	%s.pidboxzreply.%s.pidboxNdirectjson      $@c                 C   s   || _ || _|| _|d kr t n|| _| | j | j| _| | j | _t	t
| _|d kr^| jn|| _|d krr| jn|| _|| _|	| _|
| _|| _|| _d S r   )	namespace
connectiontyper   r   _get_exchangerD   _get_reply_exchangereply_exchanger   r   	unclaimedr1   r[   	queue_ttlqueue_expiresreply_queue_ttlreply_queue_expires_producer_pool)r#   rd   rf   re   r   r1   r[   producer_poolrk   rl   rm   rn   r$   r$   r%   r&      s    
zMailbox.__init__c                 C   s   t | }||_|S r   )r   re   )r#   re   boundr$   r$   r%   __call__   s    zMailbox.__call__c                 C   s    |p
t  }| j||||| dS )N)r   )socketgethostnamenode_cls)r#   r   r   r   r"   r$   r$   r%   r      s    zMailbox.Nodec              	   C   s$   |si n|}| j |||d|||dS )NT)rM   timeoutr@   r   
_broadcast)r#   rU   commandrC   rv   r@   r   r$   r$   r%   call   s    
 zMailbox.callc                 C   s   |si n|}| j |||ddS NF)rM   rw   )r#   rU   ry   rC   r$   r$   r%   cast   s    zMailbox.castc                 C   s   |si n|}| j ||ddS r{   rw   )r#   ry   rC   r$   r$   r%   abcast   s    zMailbox.abcastr   c              	   C   s$   |si n|}| j ||d||||dS )NT)rM   rv   limitr@   r   rw   )r#   ry   rC   rv   r~   r@   r   r$   r$   r%   
multi_call   s    
 zMailbox.multi_callc              	   C   s0   | j }t| d| jj | j|dd| j| jdS )N.FT)rD   rE   durableauto_deleteexpiresmessage_ttl)oidr   ri   r,   rn   rm   )r#   r   r$   r$   r%   get_reply_queue   s    zMailbox.get_reply_queuec                 C   s   |   S r   )r   r.   r$   r$   r%   reply_queue   s    zMailbox.reply_queuec                 C   s(   t | d| j d| jdd| j| jdS )Nr   z.pidboxFT)rD   r   r   r   r   )r   rd   rD   rl   rk   )r#   r   r$   r$   r%   r2      s    zMailbox.get_queuec              	   c   sB   |r|V  n2| j r0| j  }|V  W 5 Q R X nt|ddV  d S )NF)Zauto_declare)rp   acquirer   )r#   producerr   r$   r$   r%   producer_or_acquire  s    zMailbox.producer_or_acquirec           	   	   K   s~   |p
| j j}t|dddd}| ||L}z0|j|f|||g|| j ddd| W n tk
rn   Y nX W 5 Q R X d S )Nra   	transientF)Zexchange_typedelivery_moder   )rF   r   T)rD   rE   declarerY   retry)re   default_channelr
   r   publishr   forwardr   )	r#   rM   rD   rE   rF   r   r   optschanr$   r$   r%   r\     s,      
zMailbox._publish_replyc              	   C   s   ||||	|
d}|p| j j}| j}|rNt| | |j|| jj| jdd |pV| j	}| 
||:}|j||j|g| j |rt | ndd|dd W 5 Q R X d S )N)rN   rO   rU   rV   rW   )rD   rE   )rF   rP   r   )r   r   T)rD   r   rY   r[   r   )re   r   rD   r   r   updateri   r,   r   r[   r   r   r   r   r   )r#   rf   rO   rU   reply_ticketr   rv   r[   r   rV   rW   r:   r   rD   r$   r$   r%   _publish  s6    
   zMailbox._publishFc                 C   s   |d k	r(t |ttfs(tdt||
d k	rdt |
tsd|d k	rdt |tsdtdt|
t||pji }|rvt pxd }|p| jj	}|d kr|r|rt
|pd }|	p| j}	| j|||||||	|
|d	 |r| j|||||dS d S )Nz'destination must be a list/tuple not {}z.pattern and matcher must be strings not {}, {})rU   r   r   rv   r[   rV   rW   )r~   rv   r@   r   )
isinstancelisttuple
ValueErrorr+   rf   strr   re   r   lenr[   r   _collect)r#   ry   rO   rU   rM   rv   r~   r@   r   r[   rV   rW   r   r   r$   r$   r%   rx   6  sN     

zMailbox._broadcastc              
      s  |d kr| j }|p| jj}| j}t||g|dd}	g | j| jj zW S  t	k
rd   Y nX  fdd}
|	
|
 zl|	^ |rt|pt D ]4}z| jj|d W q tjk
r   Y  qY qX qW  5 Q R  W S Q R X W 5 ||j X d S )NT)r1   r0   c                    sn   |j j} |dpd |d}|r2t |kr2d S |d}|kr\rP|  |  n| |  d S )Nr   r   r   rF   )rY   rX   r   append)rZ   r:   headerr   Zthis_idr!   r@   	responsesrF   rj   r$   r%   
on_messagen  s    
z$Mailbox._collect.<locals>.on_message)rv   )r1   re   r   r   r	   rj   r   r    popKeyErrorZregister_callbackZafter_reply_message_receivedr,   ranger   Zdrain_eventsrs   rv   )r#   rF   r~   rv   r@   r   r1   r   r4   rA   r   ir$   r   r%   r   ]  s.    
zMailbox._collectc                 C   s   t | j| |dddS )NFr   rf   r   r   )r
   exchange_fmt)r#   rd   rf   r$   r$   r%   rg     s
    
zMailbox._get_exchangec                 C   s   t | j| ddddS )Nra   Fr   r   )r
   reply_exchange_fmt)r#   rd   r$   r$   r%   rh     s
    
zMailbox._get_reply_exchangec                 C   s   t | S r   )r   r.   r$   r$   r%   r     s    zMailbox.oidc                 C   s
   t | jS r   )r   ro   r.   r$   r$   r%   rp     s    zMailbox.producer_pool)
ra   NNNNNNNNrc   )NNNN)NNNN)N)N)Nr   NNN)NN)NN)NNNNNNNN)
NNFr   NNNNNN)Nr   NNN)$r5   r^   r_   r`   r   ru   r   r   rd   re   rf   rD   ri   r1   r[   r&   rr   rz   r|   r}   r   r   r   r   r2   r   r   r\   r   rx   r   rg   rh   propertyr   rp   r$   r$   r$   r%   r      s                   

      


      



   
             
              
(        
,
r   ),r`   rs   r(   collectionsr   r   
contextlibr   r   	itertoolsr   r    r	   r
   r   r   Zclocksr   commonr   r   
exceptionsr   logr   rW   r   Zutils.functionalr   r   Zutils.objectsr   Z
utils.uuidr   ZREPLY_QUEUE_EXPIRESr*   __all__r5   loggerrG   r9   r   r   r$   r$   r$   r%   <module>   s.   r