U
    dA                  
   @   s  d Z ddlZddlZddlmZmZ ddlmZ ddlm	Z	 ddl
mZ ddlmZ ddlmZ dd	lmZ dd
lmZmZ ddlmZ ddlmZ ddlmZ dZdZeeZ edddddddddgZ!dd Z"dd Z#G dd deZ$d d! Z%d"d# Z&e& d$d% Z'e&d&d'd(efgd)dd+d,Z(d-d. Z)e&d/d0d1d2d3 Z*ej+j,fd4d5Z-ej.j/ej0j/fd6d7Z1e%d8d0d1dd9d:Z2e%d8d;e3fgd<d=d>d? Z4e%d@e3fdAe3fgdBdCdDdA Z5e%d@e3fdEe6fdFe6fgdGdCddHdIZ7e& dJdK Z8e% ddLdMZ9e% dNdO Z:e% dPdQ Z;e% dRdS Z<e&d*dTddUdVZ=e&dWdXdYdZ Z>e& d[d\ Z?e&d]d^d_d` Z@dadb ZAe&dcd^ddde ZBe&dfd^ddgdhZCe&did^djdk ZDe&dldmdndoddpdqZEe&drde3fdseFfdteFfgdudvddzd{ZGe& d|d} ZHe&d~eFfgddCdddZIe%deFfgddCdddZJe%deFfgddCdddZKe% dddZLe%deFfdeFfgddCdddZMe% dddZNe%de3fde3fde3fde3fgddCdddZOe%de3fgddCdd ZPe& dd ZQdS )z.Worker remote control command implementations.    N)UserDict
namedtuple)TERM_SIGNAME)	safe_repr)WorkerShutdown)signals)
maybe_list)
get_logger)jsonify	strtobool)rate   state)Request)Panel)exchangerouting_key
rate_limitcontroller_info_taliastypevisibledefault_timeouthelp	signatureargsvariadicc                 C   s   d| iS )Nok valuer   r   9/tmp/pip-unpacked-wheel-mu1yl971/celery/worker/control.pyr      s    r   c                 C   s   d| iS )Nerrorr   r    r   r   r"   nok"   s    r$   c                
   @   s2   e Zd ZdZi Zi Zedd Zed
dd	ZdS )r   z+Global registry of remote control commands.c                 O   s    |r| j f || S | j f |S N)	_register)clsr   kwargsr   r   r"   register,   s    zPanel.registerNcontrolT      ?c
              
      s"    	f
dd}
|
S )Nc              	      s^   p| j }p$| jpd dd }| j|< t 	|j|<  rZ| j < | S )N 
r   )__name____doc__stripsplitdatar   meta)ZfunZcontrol_nameZ_help
r   r   r'   r   r   namer   r   r   r   r   r"   _inner7   s     

      

zPanel._register.<locals>._innerr   )r'   r5   r   r   r   r   r   r   r   r   r6   r   r4   r"   r&   2   s    
zPanel._register)	NNr*   Tr+   NNNN)	r.   
__module____qualname__r/   r2   r3   classmethodr)   r&   r   r   r   r"   r   &   s   
           r   c                  K   s   t jf ddi| S )Nr   r*   r   r)   r(   r   r   r"   control_commandD   s    r<   c                  K   s   t jf ddi| S )Nr   inspectr:   r;   r   r   r"   inspect_commandH   s    r>   c                 C   s   t | j S )z6Information about Celery installation for bug reports.)r   appZ	bugreportr   r   r   r"   reportN   s    r@   Z	dump_confz[include_defaults=False]with_defaults)r   r   r   Fc                 K   s   t | jjj|dttdS )zList configuration.)rA   )Z	keyfilterZunknown_type_filter)r
   r?   conftable_wanted_config_keyr   )r   rA   r(   r   r   r"   rB   T   s    rB   c                 C   s   t | to| d S )N__)
isinstancestr
startswith)keyr   r   r"   rD   `   s    rD   idsz[id1 [id2 [... [idN]]]])r   r   c                 K   s   dd t t|D S )z!Query for task information by id.c                 S   s    i | ]}|j t|| fqS r   )id_state_of_taskinfo).0reqr   r   r"   
<dictcomp>l   s    zquery_task.<locals>.<dictcomp>)_find_requests_by_idr   )r   rJ   r(   r   r   r"   
query_taskf   s    
rR   c              	   c   s2   | D ](}z||V  W q t k
r*   Y qX qd S r%   )KeyError)rJ   get_requesttask_idr   r   r"   rQ   r   s
    rQ   c                 C   s   || rdS || rdS dS )Nactivereservedreadyr   )requestZ	is_activeis_reservedr   r   r"   rL   {   s
    rL   rU   c                 K   s   t t|pg d }}t|}t  }tj| |rt|p>t}t	|D ]L}	|	j
|krJ||	j
 td|	j
| |	j| jj|d t||krJ qqJ|stdS tdd|S d|}
td|
 td|
 d	S )
zRevoke task by task id (or list of ids).

    Keyword Arguments:
        terminate (bool): Also terminate the process if the task is active.
        signal (str): Name of signal to use for terminate (e.g., ``KILL``).
    NzTerminating %s (%s))signalzterminate: tasks unknownzterminate: {}z, zTasks flagged as revoked: %sztasks z flagged as revoked)setr   lenworker_staterevokedupdate_signalssignumr   rQ   rK   addloggerrM   	terminateconsumerpoolr   formatjoin)r   rU   re   r[   r(   Ztask_idssizeZ
terminatedrb   rY   Zidstrr   r   r"   revoke   s&    

rk   r[   z <signal> [id1 [id2 [... [idN]]]])r   r   r   c                 K   s   t | |d|dS )z+Terminate task by task id (or list of ids).T)re   r[   )rk   )r   r[   rU   r(   r   r   r"   re      s    re   	task_namer   z0<task_name> <rate_limit (e.g., 5/s | 5/m | 5/h)>)r   r   c              
   K   s   zt | W n4 tk
r@ } ztd| W Y S d}~X Y nX z|| jj| _W n, tk
r   tjd|dd td Y S X | j	
  |std| tdS td	|| td
S )zTell worker(s) to modify the rate limit for a task by type.

    See Also:
        :attr:`celery.app.task.Task.rate_limit`.

    Arguments:
        task_name (str): Type of task to set rate limit for.
        rate_limit (int, str): New rate limit.
    zInvalid rate limit string: Nz&Rate limit attempt for unknown task %sTexc_infounknown taskz)Rate limits disabled for tasks of type %sz rate limit disabled successfullyz(New rate limit for tasks of type %s: %s.znew rate limit set successfully)r   
ValueErrorr$   r?   tasksr   rS   rd   r#   rf   Zreset_rate_limitsrM   r   )r   rl   r   r(   excr   r   r"   r      s*    $ 
 softhardz#<task_name> <soft_secs> [hard_secs]c                 K   sb   z| j j| }W n, tk
r<   tjd|dd td Y S X ||_||_td||| t	dS )zTell worker(s) to modify the time limit for task by type.

    Arguments:
        task_name (str): Name of task to change.
        hard (float): Hard time limit.
        soft (float): Soft time limit.
    z-Change time limit attempt for unknown task %sTrm   ro   z5New time limits for tasks of type %s: soft=%s hard=%sztime limits set successfully)
r?   rq   rS   rd   r#   r$   Zsoft_time_limit
time_limitrM   r   )r   rl   rt   rs   r(   taskr   r   r"   ru      s        ru   c                 K   s   d| j jjiS )z Get current logical clock value.clock)r?   rw   r!   r   r(   r   r   r"   rw      s    rw   c                 K   s   | j jr| j j||| dS )zHold election.

    Arguments:
        id (str): Unique election id.
        topic (str): Election topic.
        action (str): Action to take for elected actor.
    N)rf   Zgossipelection)r   rK   Ztopicactionr(   r   r   r"   ry     s    	ry   c                 C   s>   | j j}|jr6d|jkr6|jd td tdS tdS )z+Tell worker(s) to send task-related events.rv   z)Events of group {task} enabled by remote.ztask events enabledztask events already enabled)rf   event_dispatchergroupsrc   rd   rM   r   r   
dispatcherr   r   r"   enable_events  s    
r   c                 C   s8   | j j}d|jkr0|jd td tdS tdS )z3Tell worker(s) to stop sending task-related events.rv   z*Events of group {task} disabled by remote.ztask events disabledztask events already disabled)rf   r{   r|   discardrd   rM   r   r}   r   r   r"   disable_events  s    

r   c                 C   s,   t d | jj}|jddditj dS )z3Tell worker(s) to send event heartbeat immediately.zHeartbeat requested by remote.worker-heartbeatfreq   N)r   )rd   debugrf   r{   sendr^   ZSOFTWARE_INFOr}   r   r   r"   	heartbeat%  s    
r   )r   c                 K   sJ   || j krFtd| |r&tj| tj  tjj| jj	
 dS dS )zRequest mingle sync-data.zsync with %s)r_   rw   N)hostnamerd   rM   r^   r_   r`   purge_datar?   rw   Zforward)r   Z	from_noder_   r(   r   r   r"   hello/  s    


r   g?)r   c                 K   s   t dS )zPing worker(s).Zpong)r   rx   r   r   r"   pingA  s    r   c                 K   s   | j j S )z&Request worker statistics/information.)rf   
controllerstatsrx   r   r   r"   r   G  s    r   Zdump_schedule)r   c                 K   s   t t| jjS )z0List of currently scheduled ETA/countdown tasks.)list_iter_schedule_requestsrf   timerrx   r   r   r"   	scheduledM  s    r   c              
   c   sn   | j jD ]`}z|jjd }W n ttfk
r8   Y qY qX t|tr|jrT|j	 nd |j
| dV  qd S )Nr   )etapriorityrY   )schedulequeueentryr   
IndexError	TypeErrorrF   r   r   	isoformatr   rM   )r   ZwaitingZarg0r   r   r"   r   S  s    

r   Zdump_reservedc                 K   s.   |  tj|  tj }|s g S dd |D S )zAList of currently reserved tasks, not including scheduled/active.c                 S   s   g | ]}|  qS r   rM   rN   rY   r   r   r"   
<listcomp>k  s     zreserved.<locals>.<listcomp>)tsetr^   reserved_requestsactive_requests)r   r(   Zreserved_tasksr   r   r"   rW   b  s    

rW   Zdump_activec                    s    fdd|  tjD S )z'List of tasks currently being executed.c                    s   g | ]}|j  d qS )safer   r   r   r   r"   r   q  s   zactive.<locals>.<listcomp>)r   r^   r   )r   r   r(   r   r   r"   rV   n  s    

rV   Zdump_revokedc                 K   s
   t tjS )zList of revoked task-ids.)r   r^   r_   rx   r   r   r"   r_   u  s    r_   Z
dump_taskstaskinfoitemsz[attr1 [attr2 [... [attrN]]]])r   r   r   c                    sJ   | j jpt|rndd D }fdd  fddt|D S )zList of registered tasks.

    Arguments:
        taskinfoitems (Sequence[str]): List of task attributes to include.
            Defaults to ``exchange,routing_key,rate_limit``.
        builtins (bool): Also include built-in tasks.
    c                 s   s   | ]}| d s|V  qdS )zcelery.N)rH   rN   rv   r   r   r"   	<genexpr>  s    
 zregistered.<locals>.<genexpr>c                    sB    fddD }|r<dd |  D }d jd|S  jS )Nc                    s.   i | ]&}t  |d d k	r|tt  |d qS r%   )getattrrG   )rN   fieldrv   r   r"   rP     s    z5registered.<locals>._extract_info.<locals>.<dictcomp>c                 S   s   g | ]}d  |qS )=)ri   )rN   fr   r   r"   r     s     z5registered.<locals>._extract_info.<locals>.<listcomp>z{} [{}] )itemsrh   r5   ri   )rv   fieldsrM   )r   r   r"   _extract_info  s    
z!registered.<locals>._extract_infoc                    s   g | ]} | qS r   r   r   )r   regr   r"   r     s     zregistered.<locals>.<listcomp>)r?   rq   DEFAULT_TASK_INFO_ITEMSsorted)r   r   builtinsr(   rq   r   )r   r   r   r"   
registered{  s    
r   g      N@num	max_depthz.[object_type=Request] [num=200 [max_depth=10]])r   r   r      
   r   c              
      s   zddl }W n tk
r(   tdY nX td| tjddddF}||d|  |j | fd	d
|jd d|jiW  5 Q R  S Q R X dS )a  Create graph of uncollected objects (memory-leak debugging).

    Arguments:
        num (int): Max number of objects to graph.
        max_depth (int): Traverse at most n levels deep.
        type (str): Name of object to graph.  Default is ``"Request"``.
    r   NzRequires the objgraph libraryzDumping graph for type %rZcobjgz.pngF)prefixsuffixdeletec                    s   |  kS r%   r   )vZobjectsr   r"   <lambda>      zobjgraph.<locals>.<lambda>)r   Z	highlightfilenamer   )	objgraphImportErrorrd   rM   tempfileNamedTemporaryFileZby_typeZshow_backrefsr5   )r   r   r   r   Z	_objgraphfhr   r   r"   r     s$      
r   c                 K   s   ddl m} | S )z Sample current RSS memory usage.r   )
sample_mem)Zcelery.utils.debugr   )r   r(   r   r   r   r"   	memsample  s    r   samplesz[n_samples=10]c                 K   s(   ddl m} t }|j|d | S )z/Dump statistics of previous memsample requests.r   )r   )file)Zcelery.utilsr   ioStringIOmemdumpgetvalue)r   r   r(   r   outr   r   r"   r     s    r   nz[N=1]c                 K   s4   | j jjrtdS | j j| | j | tdS )z!Grow pool by n processes/threads.zJpool_grow is not supported with autoscale. Adjust autoscale range instead.zpool will grow)rf   r   
autoscalerr$   rg   Zgrow_update_prefetch_countr   r   r   r(   r   r   r"   	pool_grow  s
    
r   c                 K   s6   | j jjrtdS | j j| | j |  tdS )z#Shrink pool by n processes/threads.zLpool_shrink is not supported with autoscale. Adjust autoscale range instead.zpool will shrink)rf   r   r   r$   rg   shrinkr   r   r   r   r   r"   pool_shrink  s
    
r   c                 K   s2   | j jjr&| jjj|||d tdS tddS )zRestart execution pool.)reloaderzreload startedzPool restarts not enabledN)r?   rB   Zworker_pool_restartsrf   r   reloadr   rp   )r   modulesr   r   r(   r   r   r"   pool_restart  s    
r   maxminz[max [min]]c                 C   s>   | j jj}|r2|||\}}td| d| S tddS )zModify autoscale settings.zautoscale now max=z min=zAutoscale not enabledN)rf   r   r   r`   r   rp   )r   r   r   r   Zmax_Zmin_r   r   r"   	autoscale  s
    
r   Got shutdown from remotec                 K   s   t | t|dS )zShutdown worker(s).N)rd   warningr   )r   msgr(   r   r   r"   shutdown  s    
r   r   r   exchange_typer   z'<queue> [exchange [type [routing_key]]]c                 K   s.   | j j| j j|||pd|f| td| S )z2Tell worker(s) to consume from task queue by name.directzadd consumer )rf   	call_soonZadd_task_queuer   )r   r   r   r   r   optionsr   r   r"   add_consumer  s       r   z<queue>c                 K   s    | j | j j| td| S )z9Tell worker(s) to stop consuming from task queue by name.zno longer consuming from )rf   r   Zcancel_task_queuer   )r   r   _r   r   r"   cancel_consumer  s
     r   c                 C   s    | j jrdd | j jjD S g S )z:List the task queues a worker is currently consuming from.c                 S   s   g | ]}t |jd dqS )T)Zrecurse)dictZas_dict)rN   r   r   r   r"   r   -  s   z!active_queues.<locals>.<listcomp>)rf   Ztask_consumerZqueuesr   r   r   r"   active_queues)  s
    r   )F)FN)NNN)N)N)F)NF)r   r   r   )r   )r   )r   )NFN)NN)r   )NNN)Rr/   r   r   collectionsr   r   Zbilliard.commonr   Zkombu.utils.encodingr   Zcelery.exceptionsr   Zcelery.platformsr   ra   Zcelery.utils.functionalr   Zcelery.utils.logr	   Zcelery.utils.serializationr
   r   Zcelery.utils.timer   r,   r   r^   rY   r   __all__r   r.   rd   r   r   r$   r   r<   r>   r@   rB   rD   rR   requests__getitem__rQ   r   __contains__r   rL   rk   rG   re   r   floatru   rw   ry   r   r   r   r   r   r   r   r   rW   rV   r_   r   intr   r   r   r   r   r   r   r   r   r   r   r   r   r   r"   <module>   s0        

	



#

$





	








			  	
