U
    d                     @   s  d Z ddlZddlZddlZddlZddlZddlZddlmZ ddl	m
Z
mZ ddlmZ ddlmZ ddlmZmZ ddlmZ d	Zd
ee dZdZdZdZdZi Ze Ze ZeeedZ e Z!dgZ"eeedZ#dZ$dZ%dd Z&dd Z'ej(ej)fddZ*dej)e!j+fddZ,dej-ej.ej.fddZ/ej01dpNej01dZ2e3ej01dppej01dppdZ4e2rddl5Z5ddl6m7Z7 dd l8m9Z9 dd!l:m;Z;m<Z< da=da>da?da@e4ZAg ZBe*ZCe/ZDe9 jEd"kre5jFd#d$ ZGd%d Z*d&d Z/G d'd( d(ZHdS ))zwInternal worker state (global).

This includes the currently active and reserved tasks,
statistics, and revoked tasks.
    N)Counter)picklepickle_protocol)cached_property)__version__)WorkerShutdownWorkerTerminate)
LimitedSet)
SOFTWARE_INFOreserved_requestsactive_requeststotal_countrevokedtask_reservedmaybe_shutdowntask_accepted
task_ready
Persistentz	py-celery)Zsw_identZsw_verZsw_sysiP  i  i0*  )maxlenexpiresc                   C   sB   t   t  t  t  t  dgtd d < t  d S )Nr   )requestsclearr   r   successful_requestsr   all_total_countr    r   r   7/tmp/pip-unpacked-wheel-mu1yl971/celery/worker/state.pyreset_stateJ   s    r   c                   C   s6   t dk	rt dk	rtt ntdk	r2tdk	r2ttdS )z Shutdown if flags have been set.NF)should_terminater   should_stopr   r   r   r   r   r   T   s    
r   c                 C   s   || j |  ||  dS )z2Update global state when a task has been reserved.N)id)requestZadd_requestZadd_reserved_requestr   r   r   r   \   s    r   c                 C   s2   |st }||  || jdi t d  d7  < dS )z2Update global state when a task has been accepted.   r   N)r   name)r    Z_all_total_countZadd_active_requestZadd_to_total_countr   r   r   r   d   s
    r   Fc                 C   s0   |rt | j || jd ||  ||  dS )z)Update global state when a task is ready.N)r   addr   )r    Z
successfulZremove_requestZdiscard_active_requestZdiscard_reserved_requestr   r   r   r   p   s
    r   C_BENCHZCELERY_BENCHC_BENCH_EVERYZCELERY_BENCH_EVERY)	monotonic)current_process)memdump
sample_memMainProcessc                   C   sF   t d k	rBtd k	rBtdtt   tdtttt  t  d S )Nz- Time spent in benchmark: {!r}z	- Avg: {})bench_first
bench_lastprintformatsumbench_samplelenr(   r   r   r   r   on_shutdown   s    r2   c                 C   s*   d}t dkrt  a }tdkr"|at| S )z-Called when a task is reserved by the worker.N)bench_startr&   r+   
__reserved)r    nowr   r   r   r      s    
c                 C   sX   t d7 a t t sPt }|t }tdt| tj  | aa	t
| t  t| S )z Called when a task is completed.r!   zG- Time spent processing {} tasks (since first task received): ~{:.4f}s
)	all_countbench_everyr&   r3   r-   r.   sysstdoutflushr,   r0   appendr)   __ready)r    r5   Zdiffr   r   r   r      s     

c                   @   s   e Zd ZdZeZeZej	Z	ej
Z
dZd$ddZdd Zdd	 Zd
d Zdd Zdd Zdd Zdd Zdd Zdd Zdd Zdd Zdd Zdd Zed d! Zed"d# ZdS )%r   zStores worker state between restarts.

    This is the persistent data stored by the worker when
    :option:`celery worker --statedb` is enabled.

    Currently only stores revoked task id's.
    FNc                 C   s   || _ || _|| _|   d S N)statefilenameclockmerge)selfr>   r?   r@   r   r   r   __init__   s    zPersistent.__init__c                 C   s   | j j| j| jddS )NT)protocolZ	writeback)storageopenr?   rD   rB   r   r   r   rF      s
      zPersistent.openc                 C   s   |  | j d S r=   )_merge_withdbrG   r   r   r   rA      s    zPersistent.mergec                 C   s   |  | j | j  d S r=   )
_sync_withrI   syncrG   r   r   r   rK      s    zPersistent.syncc                 C   s   | j r| j  d| _ d S )NF)_is_openrI   closerG   r   r   r   rM      s    
zPersistent.closec                 C   s   |    |   d S r=   )rK   rM   rG   r   r   r   save   s    zPersistent.savec                 C   s   |  | | | |S r=   )_merge_revoked_merge_clockrB   dr   r   r   rH      s    

zPersistent._merge_withc                 C   s>   | j   |d| | | j | jr0| j ndd |S )N   r   )Z	__proto__zrevokedr@   )_revoked_taskspurgeupdatecompress_dumpsr@   ZforwardrQ   r   r   r   rJ      s    
zPersistent._sync_withc                 C   s$   | j r | j |dpd|d< d S )Nr@   r   )r@   adjustgetrQ   r   r   r   rP      s    zPersistent._merge_clockc                 C   s`   z|  |d  W n> tk
rP   z| |d W n tk
rJ   Y nX Y nX | j  d S )NrT   r   )_merge_revoked_v3KeyError_merge_revoked_v2poprU   rV   rQ   r   r   r   rO      s    zPersistent._merge_revokedc                 C   s    |r| j t| | d S r=   )rU   rW   r   loads
decompress)rB   rT   r   r   r   r\     s    zPersistent._merge_revoked_v3c                 C   s$   t |ts| |S | j| d S r=   )
isinstancer	   _merge_revoked_v1rU   rW   )rB   savedr   r   r   r^     s    

zPersistent._merge_revoked_v2c                 C   s   | j j}|D ]}|| qd S r=   )rU   r#   )rB   rd   r#   itemr   r   r   rc     s    zPersistent._merge_revoked_v1c                 C   s   t j|| jdS )N)rD   )r   dumpsrD   )rB   objr   r   r   rY     s    zPersistent._dumpsc                 C   s   | j jS r=   )r>   r   rG   r   r   r   rU     s    zPersistent._revoked_tasksc                 C   s   d| _ |  S )NT)rL   rF   rG   r   r   r   rI     s    zPersistent.db)N)__name__
__module____qualname____doc__shelverE   r   rD   zlibrX   ra   rL   rC   rF   rA   rK   rM   rN   rH   rJ   rP   rO   r\   r^   rc   rY   propertyrU   r   rI   r   r   r   r   r      s0   
	
r   )Irk   osplatformrl   r8   weakrefrm   collectionsr   Zkombu.serializationr   r   Zkombu.utils.objectsr   Zceleryr   Zcelery.exceptionsr   r   Zcelery.utils.collectionsr	   __all__systemr
   ZREVOKES_MAXZSUCCESSFUL_MAXZREVOKE_EXPIRESZSUCCESSFUL_EXPIRESr   WeakSetr   r   r   r   r   r   r   r   r   r   __setitem__r#   r   rW   r   r_   discardr   environr[   r$   intr%   atexittimer&   Zbilliard.processr'   Zcelery.utils.debugr(   r)   r6   r+   r3   r,   r7   r0   r4   r<   _nameregisterr2   r   r   r   r   r   <module>   s   
	
	



