U
    d                     @   sB   d Z ddlmZ ddlmZ ddlmZ dZG dd dejZ	d	S )
zYWorker Event Dispatcher Bootstep.

``Events`` -> :class:`celery.events.EventDispatcher`.
    )ignore_errors)	bootsteps   )
Connection)Eventsc                       sH   e Zd ZdZefZd fdd	Zdd Zdd	 Zd
d Z	dd Z
  ZS )r   z+Service used for sending monitoring events.TFc                    sD   |rd ndg| _ |p| p| | _| j| _d |_t j|f| d S )NZworker)groupssend_eventsenabledevent_dispatchersuper__init__)selfcZtask_eventsZwithout_heartbeatZwithout_gossipkwargs	__class__ A/tmp/pip-unpacked-wheel-mu1yl971/celery/worker/consumer/events.pyr      s    zEvents.__init__c                 C   sf   |  |}|jjj| |j| j| j|jr0dgnd |jr>|j	nd d }|_
|rb|| |  d S )NZtask)hostnamer	   r   Zbuffer_groupZon_send_buffered)_closeZappeventsZ
DispatcherZconnection_for_writer   r   r   ZhubZon_send_event_bufferedr
   Zextend_bufferflush)r   r   prevdisr   r   r   start"   s    


zEvents.startc                 C   s   d S Nr   r   r   r   r   r   stop3   s    zEvents.stopc                 C   sB   |j r>|j }|j| _|jr(t||jj t||j d |_ |S d S r   )r
   r   
connectionr   close)r   r   
dispatcherr   r   r   r   6   s    zEvents._closec                 C   s   |  | d S r   )r   r   r   r   r   shutdownC   s    zEvents.shutdown)TFF)__name__
__module____qualname____doc__r   requiresr   r   r   r   r!   __classcell__r   r   r   r   r      s      r   N)
r%   Zkombu.commonr   Zceleryr   r   r   __all__ZStartStopStepr   r   r   r   r   <module>   s
   