U
    dx>                     @   s  d dl Z d dlZd dlZd dlZd dlmZ d dlmZ d dlmZ ddlm	Z	 e
eZG dd de	ZG d	d
 d
eZG dd deZG dd deZG dd deZG dd deZG dd deZG dd deZG dd deZG dd deZG dd deZdS )    N)web)gen)util   )BaseApiHandlerc                   @   s   e Zd Zdd Zdd ZdS )ControlHandlerc                 C   s   |o|| j jkS )N)Zapplicationworkersself
workername r   6/tmp/pip-unpacked-wheel-3pokl8eb/flower/api/control.py	is_worker   s    zControlHandler.is_workerc              	   C   sF   |D ]0}z||  ddW   S  tk
r2   Y qX qtd| dS )z$extracts error message from responseerrorzUnknown reasonz(Failed to extract error reason from '%s')getKeyErrorloggerr   )r
   r   responserr   r   r   error_reason   s    zControlHandler.error_reasonN)__name__
__module____qualname__r   r   r   r   r   r   r      s   r   c                   @   s   e Zd Zejdd ZdS )WorkerShutDownc                 C   sN   |  |stdd| td| | jjjd|gd | t	dd dS )	a  
Shut down a worker

**Example request**:

.. sourcecode:: http

  POST /api/worker/shutdown/celery@worker2 HTTP/1.1
  Content-Length: 0
  Host: localhost:5555

**Example response**:

.. sourcecode:: http

  HTTP/1.1 200 OK
  Content-Length: 29
  Content-Type: application/json; charset=UTF-8

  {
      "message": "Shutting down!"
  }

:reqheader Authorization: optional OAuth token to authenticate
:statuscode 200: no error
:statuscode 401: unauthorized request
:statuscode 404: unknown worker
          Unknown worker '%s'zShutting down '%s' workershutdown)destinationzShutting down!messageN)
r   r   	HTTPErrorr   infocappcontrol	broadcastwritedictr	   r   r   r   post    s
    
zWorkerShutDown.postNr   r   r   r   authenticatedr'   r   r   r   r   r      s   r   c                   @   s   e Zd Zejdd ZdS )WorkerPoolRestartc                 C   s   |  |stdd| td| | jjjdddi|gdd}|rld	|d
 | krl| t	d| d n.t
| | d | d|| ||f  dS )ac  
Restart worker's pool

**Example request**:

.. sourcecode:: http

  POST /api/worker/pool/restart/celery@worker2 HTTP/1.1
  Content-Length: 0
  Host: localhost:5555

**Example response**:

.. sourcecode:: http

  HTTP/1.1 200 OK
  Content-Length: 56
  Content-Type: application/json; charset=UTF-8

  {
      "message": "Restarting 'celery@worker2' worker's pool"
  }

:reqheader Authorization: optional OAuth token to authenticate
:statuscode 200: no error
:statuscode 401: unauthorized request
:statuscode 403: pool restart is not enabled (see CELERYD_POOL_RESTARTS)
:statuscode 404: unknown worker
        r   r   zRestarting '%s' worker's poolZpool_restartreloadFT	argumentsr   replyokr   r     z#Failed to restart the '%s' pool: %sN)r   r   r    r   r!   r"   r#   r$   r%   r&   r   
set_statusr   )r
   r   r   r   r   r   r'   G   s&    
  


 
zWorkerPoolRestart.postNr(   r   r   r   r   r*   F   s   r*   c                   @   s   e Zd Zejdd ZdS )WorkerPoolGrowc                 C   s   |  |stdd| | jddtd}td|| | jjj	|d|gd}|r|d	|d
 | kr|| 
td||f d n.t| | d | 
d|| ||f  dS )as  
Grow worker's pool

**Example request**:

.. sourcecode:: http

  POST /api/worker/pool/grow/celery@worker2?n=3 HTTP/1.1
  Content-Length: 0
  Host: localhost:5555

**Example response**:

.. sourcecode:: http

  HTTP/1.1 200 OK
  Content-Length: 58
  Content-Type: application/json; charset=UTF-8

  {
      "message": "Growing 'celery@worker2' worker's pool by 3"
  }

:query n: number of pool processes to grow, default is 1
:reqheader Authorization: optional OAuth token to authenticate
:statuscode 200: no error
:statuscode 401: unauthorized request
:statuscode 403: failed to grow
:statuscode 404: unknown worker
        r   r   nr   defaulttypez"Growing '%s' worker's pool by '%s'Tr3   r.   r   r/   r   z Growing '%s' worker's pool by %sr   r0   z%Failed to grow '%s' worker's pool: %sN)r   r   r    get_argumentintr   r!   r"   r#   Z	pool_growr%   r&   r   r1   r   r
   r   r3   r   r   r   r   r'   y   s&    !
  



 
zWorkerPoolGrow.postNr(   r   r   r   r   r2   x   s   r2   c                   @   s   e Zd Zejdd ZdS )WorkerPoolShrinkc                 C   s   |  |stdd| | jddtd}td|| | jjj	|d|gd}|r|d	|d
 | kr|| 
td||f d n.t| | d | 
d|| ||f  dS )ay  
Shrink worker's pool

**Example request**:

.. sourcecode:: http

  POST /api/worker/pool/shrink/celery@worker2 HTTP/1.1
  Content-Length: 0
  Host: localhost:5555

**Example response**:

.. sourcecode:: http

  HTTP/1.1 200 OK
  Content-Length: 60
  Content-Type: application/json; charset=UTF-8

  {
      "message": "Shrinking 'celery@worker2' worker's pool by 1"
  }

:query n: number of pool processes to shrink, default is 1
:reqheader Authorization: optional OAuth token to authenticate
:statuscode 200: no error
:statuscode 401: unauthorized request
:statuscode 403: failed to shrink
:statuscode 404: unknown worker
        r   r   r3   r   r4   z$Shrinking '%s' worker's pool by '%s'Tr7   r/   r   z"Shrinking '%s' worker's pool by %sr   r0   z'Failed to shrink '%s' worker's pool: %sN)r   r   r    r8   r9   r   r!   r"   r#   Zpool_shrinkr%   r&   r   r1   r   r:   r   r   r   r'      s(    !
   

 
zWorkerPoolShrink.postNr(   r   r   r   r   r;      s   r;   c                   @   s   e Zd Zejdd ZdS )WorkerPoolAutoscalec                 C   s   |  |stdd| | jdtd}| jdtd}td|||f | jjj	d||d|gd	d
}|rd|d | kr| 
td|||f d n.t| | d | 
d|| ||f  dS )a  
Autoscale worker pool

**Example request**:

.. sourcecode:: http

  POST /api/worker/pool/autoscale/celery@worker2?min=3&max=10 HTTP/1.1
  Content-Length: 0
  Content-Type: application/x-www-form-urlencoded; charset=utf-8
  Host: localhost:5555

**Example response**:

.. sourcecode:: http

  HTTP/1.1 200 OK
  Content-Length: 66
  Content-Type: application/json; charset=UTF-8

  {
      "message": "Autoscaling 'celery@worker2' worker (min=3, max=10)"
  }

:query min: minimum number of pool processes
:query max: maximum number of pool processes
:reqheader Authorization: optional OAuth token to authenticate
:statuscode 200: no error
:statuscode 401: unauthorized request
:statuscode 403: autoscaling is not enabled (see CELERYD_AUTOSCALER)
:statuscode 404: unknown worker
        r   r   min)r6   maxzAutoscaling '%s' worker by '%s'Z	autoscale)r=   r>   Tr,   r/   r   z(Autoscaling '%s' worker (min=%s, max=%s)r   r0   z#Failed to autoscale '%s' worker: %sN)r   r   r    r8   r9   r   r!   r"   r#   r$   r%   r&   r   r1   r   )r
   r   r=   r>   r   r   r   r   r'      s6    #
     

 
zWorkerPoolAutoscale.postNr(   r   r   r   r   r<      s   r<   c                   @   s   e Zd Zejdd ZdS )WorkerQueueAddConsumerc              	   C   s   |  |stdd| | d}td|| | jjjdd|i|gdd}|rd|d	 | kr| 	t
|d	 | d d
 n0t| | d | 	d||| ||f  dS )a  
Start consuming from a queue

**Example request**:

.. sourcecode:: http

  POST /api/worker/queue/add-consumer/celery@worker2?queue=sample-queue
  Content-Length: 0
  Content-Type: application/x-www-form-urlencoded; charset=utf-8
  Host: localhost:5555

**Example response**:

.. sourcecode:: http

  HTTP/1.1 200 OK
  Content-Length: 40
  Content-Type: application/json; charset=UTF-8

  {
      "message": "add consumer sample-queue"
  }

:query queue: the name of a new queue
:reqheader Authorization: optional OAuth token to authenticate
:statuscode 200: no error
:statuscode 401: unauthorized request
:statuscode 403: failed to add consumer
:statuscode 404: unknown worker
        r   r   queuez#Adding consumer '%s' to worker '%s'Zadd_consumerTr,   r/   r   r   r0   z.Failed to add '%s' consumer to '%s' worker: %sNr   r   r    r8   r   r!   r"   r#   r$   r%   r&   r   r1   r   r
   r   r@   r   r   r   r   r'     s,    !

   

  
zWorkerQueueAddConsumer.postNr(   r   r   r   r   r?     s   r?   c                   @   s   e Zd Zejdd ZdS )WorkerQueueCancelConsumerc              	   C   s   |  |stdd| | d}td|| | jjjdd|i|gdd}|rd|d	 | kr| 	t
|d	 | d d
 n0t| | d | 	d||| ||f  dS )a  
Stop consuming from a queue

**Example request**:

.. sourcecode:: http

  POST /api/worker/queue/cancel-consumer/celery@worker2?queue=sample-queue
  Content-Length: 0
  Content-Type: application/x-www-form-urlencoded; charset=utf-8
  Host: localhost:5555

**Example response**:

.. sourcecode:: http

  HTTP/1.1 200 OK
  Content-Length: 52
  Content-Type: application/json; charset=UTF-8

  {
      "message": "no longer consuming from sample-queue"
  }

:query queue: the name of queue
:reqheader Authorization: optional OAuth token to authenticate
:statuscode 200: no error
:statuscode 401: unauthorized request
:statuscode 403: failed to cancel consumer
:statuscode 404: unknown worker
        r   r   r@   z(Canceling consumer '%s' from worker '%s'Zcancel_consumerTr,   r/   r   r   r0   z3Failed to cancel '%s' consumer from '%s' worker: %sNrA   rB   r   r   r   r'   S  s0    !

   

  
zWorkerQueueCancelConsumer.postNr(   r   r   r   r   rC   R  s   rC   c                   @   s   e Zd Zejdd ZdS )
TaskRevokec                 C   sX   t d| | jddtd}| jddtd}| jjj|||d | t	d| d	 d
S )a  
Revoke a task

**Example request**:

.. sourcecode:: http

  POST /api/task/revoke/1480b55c-b8b2-462c-985e-24af3e9158f9?terminate=true
  Content-Length: 0
  Content-Type: application/x-www-form-urlencoded; charset=utf-8
  Host: localhost:5555

**Example response**:

.. sourcecode:: http

  HTTP/1.1 200 OK
  Content-Length: 61
  Content-Type: application/json; charset=UTF-8

  {
      "message": "Revoked '1480b55c-b8b2-462c-985e-24af3e9158f9'"
  }

:query terminate: terminate the task if it is running
:query signal: name of signal to send to process if terminate (default: 'SIGTERM')
:reqheader Authorization: optional OAuth token to authenticate
:statuscode 200: no error
:statuscode 401: unauthorized request
        zRevoking task '%s'	terminateFr4   signalSIGTERM)rE   rF   zRevoked '%s'r   N)
r   r!   r8   boolstrr"   r#   Zrevoker%   r&   )r
   ZtaskidrE   rF   r   r   r   r'     s
     zTaskRevoke.postNr(   r   r   r   r   rD     s   rD   c                   @   s   e Zd Zejdd ZdS )
TaskTimoutc                 C   s  |  d}| j ddtd}| j ddtd}|| jjkrFtdd| |dk	rh| |shtdd| td	||| |dk	r|gnd}| jj	j
|d
|||d}|rd|d | kr| t|d | d d n*t| | d | d| ||  dS )a  
Change soft and hard time limits for a task

**Example request**:

.. sourcecode:: http

    POST /api/task/timeout/tasks.sleep HTTP/1.1
    Content-Length: 44
    Content-Type: application/x-www-form-urlencoded; charset=utf-8
    Host: localhost:5555

    soft=30&hard=100&workername=celery%40worker1

**Example response**:

.. sourcecode:: http

    HTTP/1.1 200 OK
    Content-Length: 46
    Content-Type: application/json; charset=UTF-8

    {
        "message": "time limits set successfully"
    }

:query workername: worker name
:reqheader Authorization: optional OAuth token to authenticate
:statuscode 200: no error
:statuscode 401: unauthorized request
:statuscode 404: unknown task/worker
        r   hardNr4   softr   Unknown task '%s'r   z'Setting timeouts for '%s' task (%s, %s)T)r.   rK   rL   r   r/   r   r   r0   zFailed to set timeouts: '%s')r8   floatr"   tasksr   r    r   r   r!   r#   Z
time_limitr%   r&   r   r1   r   )r
   tasknamer   rK   rL   r   r   r   r   r   r'     s6    "
     


zTaskTimout.postNr(   r   r   r   r   rJ     s   rJ   c                   @   s   e Zd Zejdd ZdS )TaskRateLimitc                 C   s   |  d}|  d}|| jjkr0tdd| |dk	rR| |sRtdd| td|| |dk	rn|gnd}| jjj	||d|d	}|rd
|d | kr| 
t|d | d
 d n*t| | d | 
d| ||  dS )a  
Change rate limit for a task

**Example request**:

.. sourcecode:: http

    POST /api/task/rate-limit/tasks.sleep HTTP/1.1
    Content-Length: 41
    Content-Type: application/x-www-form-urlencoded; charset=utf-8
    Host: localhost:5555

    ratelimit=200&workername=celery%40worker1

**Example response**:

.. sourcecode:: http

  HTTP/1.1 200 OK
  Content-Length: 61
  Content-Type: application/json; charset=UTF-8

  {
      "message": "new rate limit set successfully"
  }

:query workername: worker name
:reqheader Authorization: optional OAuth token to authenticate
:statuscode 200: no error
:statuscode 401: unauthorized request
:statuscode 404: unknown task/worker
        r   	ratelimitr   rM   Nr   z%Setting '%s' rate limit for '%s' taskT)r.   r   r/   r   r   r0   zFailed to set rate limit: '%s')r8   r"   rO   r   r    r   r   r!   r#   Z
rate_limitr%   r&   r   r1   r   )r
   rP   r   rR   r   r   r   r   r   r'     s0    "

    


zTaskRateLimit.postNr(   r   r   r   r   rQ     s   rQ   )timeloggingdatetimecollectionsZtornador   r   r    r   	getLoggerr   r   r   r   r*   r2   r;   r<   r?   rC   rD   rJ   rQ   r   r   r   r   <module>   s&   
'245;67(<