U
    d                     @   s   d Z ddlmZ ddlmZ ddlmZ ddlmZ ddl	m
Z
 ddlmZ ddlmZmZ dd	lmZ dd
lmZ ddlmZ ddlmZ dZeeZejej ZZG dd dejZdS )z)Worker <-> Worker communication Bootstep.    )defaultdict)partial)heappush)
itemgetter)Consumer)	DummyLock)ContentDisallowedDecodeError)	bootsteps)
get_logger)Bunch   )Mingle)Gossipc                       s   e Zd ZdZd ZefZedddddddZd	d
hZ	d- fdd	Z
dd Zd.ddZdd Zdd Z fddZdd Zdd Zdd  Zd!d" Zd#d$ Zd%d& Zd'd( Zd)d* Zd+d, Z  ZS )/r   zfBootstep consuming events from other workers.

    This keeps the logical clock value up to date.
    idclockhostnamepidtopicactioncverZamqpZredisF      @       @c                    s   | o|  |j| _|j| _| |_|jjj| _|j| _d| jt|j	g| _
tt t t d| _|j| _| jr|jjj| j| jdd| _|jrt |_| jj| _|| _|| _d | _tt| _i | _| j| j d| _!|jj"| _"d| j#i| _$t% j&|f| d S )N.)	node_join
node_leave	node_lostr   )on_node_joinon_node_leaveZmax_tasks_in_memory)zworker.electzworker.elect.acktask)'compatible_transportappZenabledZgossipeventsReceiverr   joinstrr   full_hostnamer   setontimerStater   r   stateZhubr   Z_mutexeventupdate_stateintervalheartbeat_interval_trefr   listconsensus_requestsconsensus_replieson_electon_elect_ackevent_handlersr   	call_taskelection_handlerssuper__init__)selfcZwithout_gossipr.   r/   kwargs	__class__ A/tmp/pip-unpacked-wheel-mu1yl971/celery/worker/consumer/gossip.pyr:   $   sD    


 zGossip.__init__c              
   C   s.   |  }|jj| jkW  5 Q R  S Q R X d S N)Zconnection_for_read	transportZdriver_typecompatible_transports)r;   r!   connr@   r@   rA   r    M   s    
zGossip.compatible_transportNc                 C   s$   g | j |< | jjd|||dd d S )Nzworker-electr   )r   r   r   r   )r3   
dispatchersend)r;   r   r   r   r@   r@   rA   electionQ   s    
   zGossip.electionc              
   C   sH   z| j |  W n. tk
rB } ztd| W 5 d }~X Y nX d S )NzCould not call task: %r)r!   	signatureZapply_async	Exceptionlogger	exception)r;   r   excr@   r@   rA   r7   X   s    zGossip.call_taskc           
   
   C   s   z|  |\}}}}}}}W n2 tk
rN }	 ztd|	 W Y S d }	~	X Y nX t| j| || d| ||f | jjd|d d S )Nz!election request missing field %sr   zworker-elect-ack)r   )_cons_stamp_fieldsKeyErrorrK   rL   r   r2   rF   rG   )
r;   r,   Zid_r   r   r   r   r   _rM   r@   r@   rA   r4   ^   s    
  "zGossip.on_electc                    s   t  | |j| _d S rB   )r9   startZevent_dispatcherrF   )r;   r<   r>   r@   rA   rQ   j   s    zGossip.startc           
      C   s   |d }z| j | }W n tk
r,   Y d S X t| j }||d  t|t|kr| j| j	| \}}}}|| j
krtd| z| j| }	W n  tk
r   td| Y qX |	| ntd|| | j	|d  | j |d  d S )Nr   r   zI won the election %rzUnknown election topic %rznode %s elected for %r)r3   rO   r'   r+   alive_workersappendlenr   Z	sort_heapr2   r&   infor8   rK   rL   pop)
r;   r,   r   ZrepliesrR   rP   Zleaderr   r   handlerr@   r@   rA   r5   n   s*    


zGossip.on_elect_ackc                 C   s    t d|j | | jj| d S )Nz%s joined the party)debugr   _call_handlersr(   r   r;   workerr@   r@   rA   r      s    zGossip.on_node_joinc                 C   s    t d|j | | jj| d S )Nz%s left)rX   r   rY   r(   r   rZ   r@   r@   rA   r      s    zGossip.on_node_leavec                 C   s    t d|j | | jj| d S )Nzmissed heartbeat from %s)rU   r   rY   r(   r   rZ   r@   r@   rA   on_node_lost   s    zGossip.on_node_lostc                 O   sN   |D ]D}z||| W q t k
rF } ztd|| W 5 d }~X Y qX qd S )Nz!Ignored error from handler %r: %r)rJ   rK   rL   )r;   handlersargsr=   rW   rM   r@   r@   rA   rY      s      zGossip._call_handlersc                 C   s,   | j d k	r| j   | j| j| j| _ d S rB   )r0   cancelr)   Zcall_repeatedlyr.   periodic)r;   r@   r@   rA   register_timer   s    

zGossip.register_timerc                 C   sR   | j j}t }| D ]}|js|| | | q|D ]}||jd  q:d S rB   )	r+   workersr'   valuesaliveaddr\   rV   r   )r;   rb   Zdirtyr[   r@   r@   rA   r`      s    
zGossip.periodicc                 C   s:   |    | j|d| jd}t||jgt| j|jddgS )Nzworker.#)routing_keyZ	queue_ttlT)Zqueues
on_messageZno_ack)ra   r#   r/   r   queuer   rg   Zevent_from_message)r;   ZchannelZevr@   r@   rA   get_consumers   s    zGossip.get_consumersc           	   
   C   s   |j d }|ddd dkr"d S z| j| }W n tk
rD   Y nX ||jS |jdpd|jd }|| jkrz||j\}}| | W q t	t
tfk
r } zt| W 5 d }~X Y qX n
| j  d S )Nrf   r   r   r   r   r   )Zdelivery_infosplitr6   rO   payloadheadersgetr   r-   r	   r   	TypeErrorrK   errorr   Zforward)	r;   preparemessage_typerW   r   rP   r,   rM   r@   r@   rA   rg      s$    


zGossip.on_message)Fr   r   )N)__name__
__module____qualname____doc__labelr   requiresr   rN   rD   r:   r    rH   r7   r4   rQ   r5   r   r   r\   rY   ra   r`   ri   rg   __classcell__r@   r@   r>   rA   r      s>             )

r   N)rv   collectionsr   	functoolsr   heapqr   operatorr   Zkombur   Zkombu.asynchronous.semaphorer   Zkombu.exceptionsr   r	   Zceleryr
   Zcelery.utils.logr   Zcelery.utils.objectsr   Zmingler   __all__rs   rK   rX   rU   ZConsumerStepr   r@   r@   r@   rA   <module>   s   