U
    dE                     @   sr  d dl Z d dlZd dlmZ d dlmZ d dlmZ d dlmZ d dlm	Z	 d dl
mZ d dlmZ d d	lmZ d d
lmZ d dlmZ ddlmZ ddlmZ ddlmZ ddlmZ d dlmZ eeZ G dd deZ!G dd de!Z"G dd de!Z#G dd de!Z$G dd de!Z%G dd de!Z&G dd  d e!Z'G d!d" d"e!Z(G d#d$ d$e!Z)G d%d& d&e!Z*dS )'    N)datetime)web)gen)IOLoop)json_decode)	HTTPError)states)AsyncResult)AbortableAsyncResult)DisabledBackend   )tasks   )BaseApiHandler)Broker)ControlHandler)OrderedDictc                   @   sD   e Zd ZdZdd Zedd Zdd Zdd	 Zd
d Z	dd Z
dS )BaseTaskHandlerz%Y-%m-%d %H:%M:%S.%fc              
   C   s   z| j j}|rt|ni }W n0 tk
rL } ztdt|W 5 d }~X Y nX t|tsbtdd|dg }|di }t|t	t
fstdd|||fS )N  zinvalid optionsargskwargszargs must be an array)requestbodyr   
ValueErrorr   str
isinstancedictpoplisttuple)selfr   optionser   r    r#   4/tmp/pip-unpacked-wheel-3pokl8eb/flower/api/tasks.pyget_task_args   s     


zBaseTaskHandler.get_task_argsc                 C   s   t | jt S N)r   backendr   )resultr#   r#   r$   backend_configured0   s    z"BaseTaskHandler.backend_configuredc                 K   s   |  | d S r&   )Z
set_status)r    status_coder   r#   r#   r$   write_error4   s    zBaseTaskHandler.write_errorc                 C   sB   |j tjkr(|| |j|jd n|d| |ji d S )N)r(   	tracebackr(   )stater   FAILUREupdatesafe_resultr(   r,   )r    responser(   r#   r#   r$   update_response_result7   s
    
z&BaseTaskHandler.update_response_resultc                 C   s   d|krt |d | j|d< d|kr6t|d |d< d|kr~|d }zt|}W n" tk
rt   t || j}Y nX ||d< d S )NetaZ	countdownexpires)r   strptimeDATE_FORMATfloatr   )r    r!   r4   r#   r#   r$   normalize_options>   s    
z!BaseTaskHandler.normalize_optionsc                 C   s4   zt | W n tk
r*   t| Y S X |S dS )zreturns json encodable resultN)jsondumps	TypeErrorrepr)r    r(   r#   r#   r$   r0   L   s
    zBaseTaskHandler.safe_resultN)__name__
__module____qualname__r6   r%   staticmethodr)   r+   r2   r8   r0   r#   r#   r#   r$   r      s   
r   c                   @   s(   e Zd Zejejdd Zdd ZdS )	TaskApplyc                 c   s   |   \}}}td||| z| jj| }W n" tk
rP   tdd| Y nX z| | W n tk
r~   tddY nX |j	f ||d|}d|j
i}t d| j||V }| | dS )	a#  
Execute a task by name and wait results

**Example request**:

.. sourcecode:: http

  POST /api/task/apply/tasks.add HTTP/1.1
  Accept: application/json
  Accept-Encoding: gzip, deflate, compress
  Content-Length: 16
  Content-Type: application/json; charset=utf-8
  Host: localhost:5555

  {
      "args": [1, 2]
  }

**Example response**:

.. sourcecode:: http

  HTTP/1.1 200 OK
  Content-Length: 71
  Content-Type: application/json; charset=UTF-8

  {
      "state": "SUCCESS",
      "task-id": "c60be250-fe52-48df-befb-ac66174076e6",
      "result": 3
  }

:query args: a list of arguments
:query kwargs: a dictionary of arguments
:reqheader Authorization: optional OAuth token to authenticate
:statuscode 200: no error
:statuscode 401: unauthorized request
:statuscode 404: unknown task
        'Invoking a task '%s' with '%s' and '%s'  Unknown task '%s'r   Invalid optionr   r   task-idN)r%   loggerdebugcappr   KeyErrorr   r8   r   apply_asynctask_idr   currentZrun_in_executorwait_resultswriter    tasknamer   r   r!   taskr(   r1   r#   r#   r$   postW   s.    *  
   zTaskApply.postc                 C   s4   |j dd | || | |r0|j|jd |S )NF)	propagater-   )getr2   r)   r/   r-   )r    r(   r1   r#   r#   r$   rO      s
    
zTaskApply.wait_resultsN)	r=   r>   r?   r   authenticatedr   	coroutinerT   rO   r#   r#   r#   r$   rA   V   s   =rA   c                   @   s   e Zd Zejdd ZdS )TaskAsyncApplyc                 C   s   |   \}}}td||| z| jj| }W n" tk
rP   tdd| Y nX z| | W n tk
r~   tddY nX |j	f ||d|}d|j
i}| |r|j|jd | | d	S )
ac  
Execute a task

**Example request**:

.. sourcecode:: http

  POST /api/task/async-apply/tasks.add HTTP/1.1
  Accept: application/json
  Accept-Encoding: gzip, deflate, compress
  Content-Length: 16
  Content-Type: application/json; charset=utf-8
  Host: localhost:5555

  {
      "args": [1, 2]
  }

**Example response**:

.. sourcecode:: http

  HTTP/1.1 200 OK
  Content-Length: 71
  Content-Type: application/json; charset=UTF-8
  Date: Sun, 13 Apr 2014 15:55:00 GMT

  {
      "state": "PENDING",
      "task-id": "abc300c7-2922-4069-97b6-a635cc2ac47c"
  }

:query args: a list of arguments
:query kwargs: a dictionary of arguments
:query options: a dictionary of `apply_async` keyword arguments
:reqheader Authorization: optional OAuth token to authenticate
:statuscode 200: no error
:statuscode 401: unauthorized request
:statuscode 404: unknown task
        rB   rC   rD   r   rE   rF   rG   rV   N)r%   rH   rI   rJ   r   rK   r   r8   r   rL   rM   r)   r/   r-   rP   rQ   r#   r#   r$   rT      s&    *  

zTaskAsyncApply.postNr=   r>   r?   r   rX   rT   r#   r#   r#   r$   rZ      s   rZ   c                   @   s   e Zd Zejdd ZdS )TaskSendc                 C   sh   |   \}}}td||| | jj|f||d|}d|ji}| |rZ|j|jd | 	| dS )a"  
Execute a task by name (doesn't require task sources)

**Example request**:

.. sourcecode:: http

  POST /api/task/send-task/tasks.add HTTP/1.1
  Accept: application/json
  Accept-Encoding: gzip, deflate, compress
  Content-Length: 16
  Content-Type: application/json; charset=utf-8
  Host: localhost:5555

  {
      "args": [1, 2]
  }

**Example response**:

.. sourcecode:: http

  HTTP/1.1 200 OK
  Content-Length: 71
  Content-Type: application/json; charset=UTF-8

  {
      "state": "SUCCESS",
      "task-id": "c60be250-fe52-48df-befb-ac66174076e6"
  }

:query args: a list of arguments
:query kwargs: a dictionary of arguments
:reqheader Authorization: optional OAuth token to authenticate
:statuscode 200: no error
:statuscode 401: unauthorized request
:statuscode 404: unknown task
        z%Invoking task '%s' with '%s' and '%s'rF   rG   rV   N)
r%   rH   rI   rJ   Z	send_taskrM   r)   r/   r-   rP   )r    rR   r   r   r!   r(   r1   r#   r#   r$   rT      s$    (   

zTaskSend.postNr[   r#   r#   r#   r$   r\      s   r\   c                   @   s   e Zd Zejdd ZdS )
TaskResultc                 C   s   |  dd}|dk	rt|nd}t|}| |s:td||jd}|rf|j|dd | || n| rz| || | 	| dS )a  
Get a task result

**Example request**:

.. sourcecode:: http

  GET /api/task/result/c60be250-fe52-48df-befb-ac66174076e6 HTTP/1.1
  Host: localhost:5555

**Example response**:

.. sourcecode:: http

  HTTP/1.1 200 OK
  Content-Length: 84
  Content-Type: application/json; charset=UTF-8

  {
      "result": 3,
      "state": "SUCCESS",
      "task-id": "c60be250-fe52-48df-befb-ac66174076e6"
  }

:query timeout: how long to wait, in seconds, before the operation times out
:reqheader Authorization: optional OAuth token to authenticate
:statuscode 200: no error
:statuscode 401: unauthorized request
:statuscode 503: result backend is not configured
        timeoutN  )rG   r-   F)r^   rU   )
get_argumentr7   r	   r)   r   r-   rW   r2   readyrP   )r    taskidr^   r(   r1   r#   r#   r$   rW     s     
zTaskResult.getNr=   r>   r?   r   rX   rW   r#   r#   r#   r$   r]     s   r]   c                   @   s   e Zd Zejdd ZdS )	TaskAbortc                 C   sF   t d| t|}| |s&td|  | td| d dS )a)  
Abort a running task

**Example request**:

.. sourcecode:: http

  POST /api/task/abort/c60be250-fe52-48df-befb-ac66174076e6 HTTP/1.1
  Host: localhost:5555

**Example response**:

.. sourcecode:: http

  HTTP/1.1 200 OK
  Content-Length: 61
  Content-Type: application/json; charset=UTF-8

  {
      "message": "Aborted '1480b55c-b8b2-462c-985e-24af3e9158f9'"
  }

:reqheader Authorization: optional OAuth token to authenticate
:statuscode 200: no error
:statuscode 401: unauthorized request
:statuscode 503: result backend is not configured
        zAborting task '%s'r_   zAborted '%s')messageN)rH   infor
   r)   r   abortrP   r   )r    rb   r(   r#   r#   r$   rT   G  s    
zTaskAbort.postNr[   r#   r#   r#   r$   rd   F  s   rd   c                   @   s    e Zd Zejejdd ZdS )GetQueueLengthsc                 c   s   | j }| jjj}d}|jdkr.|jjr.|jj}d}| jjjrF| jjj}t|j	 j
dd|||d}|  }|st| jjjgtdd | jjjpg D B }|t|V }| d|i dS )	aH  
Return length of all active queues

**Example request**:

.. sourcecode:: http

  GET /api/queues/length
  Host: localhost:5555

**Example response**:

.. sourcecode:: http

  HTTP/1.1 200 OK
  Content-Length: 94
  Content-Type: application/json; charset=UTF-8

  {
      "active_queues": [
          {"name": "celery", "messages": 0},
          {"name": "video-queue", "messages": 5}
      ]
  }

:reqheader Authorization: optional OAuth token to authenticate
:statuscode 200: no error
:statuscode 401: unauthorized request
:statuscode 503: result backend is not configured
        NZamqpT)Zinclude_password)http_apibroker_optionsbroker_use_sslc                 S   s   g | ]}|j r|j qS r#   )name).0qr#   r#   r$   
<listcomp>  s      z'GetQueueLengths.get.<locals>.<listcomp>Zactive_queues)applicationrJ   confZBROKER_TRANSPORT_OPTIONS	transportr!   Z
broker_apiZBROKER_USE_SSLr   
connectionas_uriZget_active_queue_namessetZCELERY_DEFAULT_QUEUEZCELERY_QUEUESqueuessortedrP   )r    apprj   ri   rk   ZbrokerZqueue_namesrv   r#   r#   r$   rW   p  s(    !


  zGetQueueLengths.getN)r=   r>   r?   r   rX   r   rY   rW   r#   r#   r#   r$   rh   o  s   rh   c                   @   s   e Zd Zejdd ZdS )	ListTasksc                 C   s&  | j }| dd}| jddtd}| dd}| dd}| dd}| d	d}| d
d}| dd}	|ott|}t|d}|dkr|nd}|dkr|nd}|dkr|nd}g }
tj|j|||	|||||d	D ]@\}}t|}|dd}|dk	r|j	|d< |

||f q| t|
 dS )aU
  
List tasks

**Example request**:

.. sourcecode:: http

  GET /api/tasks HTTP/1.1
  Host: localhost:5555
  User-Agent: HTTPie/0.8.0

**Example response**:

.. sourcecode:: http

  HTTP/1.1 200 OK
  Content-Length: 1109
  Content-Type: application/json; charset=UTF-8
  Etag: "b2478118015c8b825f7b88ce6b660e5449746c37"
  Server: TornadoServer/3.1.1

  {
      "e42ceb2d-8730-47b5-8b4d-8e0d2a1ef7c9": {
          "args": "[3, 4]",
          "client": null,
          "clock": 1079,
          "eta": null,
          "exception": null,
          "exchange": null,
          "expires": null,
          "failed": null,
          "kwargs": "{}",
          "name": "tasks.add",
          "received": 1398505411.107885,
          "result": "'7'",
          "retried": null,
          "retries": 0,
          "revoked": null,
          "routing_key": null,
          "runtime": 0.01610181899741292,
          "sent": null,
          "started": 1398505411.108985,
          "state": "SUCCESS",
          "succeeded": 1398505411.124802,
          "timestamp": 1398505411.124802,
          "traceback": null,
          "uuid": "e42ceb2d-8730-47b5-8b4d-8e0d2a1ef7c9",
          "worker": "celery@worker1"
      },
      "f67ea225-ae9e-42a8-90b0-5de0b24507e0": {
          "args": "[1, 2]",
          "client": null,
          "clock": 1042,
          "eta": null,
          "exception": null,
          "exchange": null,
          "expires": null,
          "failed": null,
          "kwargs": "{}",
          "name": "tasks.add",
          "received": 1398505395.327208,
          "result": "'3'",
          "retried": null,
          "retries": 0,
          "revoked": null,
          "routing_key": null,
          "runtime": 0.012884548006695695,
          "sent": null,
          "started": 1398505395.3289,
          "state": "SUCCESS",
          "succeeded": 1398505395.341089,
          "timestamp": 1398505395.341089,
          "traceback": null,
          "uuid": "f67ea225-ae9e-42a8-90b0-5de0b24507e0",
          "worker": "celery@worker1"
      }
  }

:query limit: maximum number of tasks
:query offset: skip first n tasks
:query sort_by: sort tasks by attribute (name, state, received, started)
:query workername: filter task by workername
:query taskname: filter tasks by taskname
:query state: filter tasks by state
:query received_start: filter tasks by received date (must be greater than) format %Y-%m-%d %H:%M
:query received_end: filter tasks by received date (must be less than) format %Y-%m-%d %H:%M
:reqheader Authorization: optional OAuth token to authenticate
:statuscode 200: no error
:statuscode 401: unauthorized request
        limitNoffsetr   )defaulttypeZ
workernamerR   r-   received_startreceived_endsort_byZAll)rz   r{   r   r}   workerr-   r~   r   r   )rp   r`   intmaxr   Z
iter_taskseventsas_dictr   hostnameappendrP   r   )r    rx   rz   r{   r   r}   r-   r~   r   r   r(   rM   rS   r#   r#   r$   rW     s@    \
     


zListTasks.getNrc   r#   r#   r#   r$   ry     s   ry   c                   @   s   e Zd Zejdd ZdS )ListTaskTypesc                 C   s(   | j jj }i }||d< | | dS )a  
List (seen) task types

**Example request**:

.. sourcecode:: http

  GET /api/task/types HTTP/1.1
  Host: localhost:5555

**Example response**:

.. sourcecode:: http

  HTTP/1.1 200 OK
  Content-Length: 44
  Content-Type: application/json; charset=UTF-8

  {
      "task-types": [
          "tasks.add",
          "tasks.sleep"
      ]
  }

:reqheader Authorization: optional OAuth token to authenticate
:statuscode 200: no error
:statuscode 401: unauthorized request
        z
task-typesN)rp   r   r-   Z
task_typesrP   )r    Zseen_task_typesr1   r#   r#   r$   rW   %  s    zListTaskTypes.getNrc   r#   r#   r#   r$   r   $  s   r   c                   @   s   e Zd Zejdd ZdS )TaskInfoc                 C   sN   t | jj|}|s"tdd| | }|jdk	r@|jj|d< | | dS )a  
Get a task info

**Example request**:

.. sourcecode:: http

  GET /api/task/info/91396550-c228-4111-9da4-9d88cfd5ddc6 HTTP/1.1
  Accept: */*
  Accept-Encoding: gzip, deflate, compress
  Host: localhost:5555


**Example response**:

.. sourcecode:: http

  HTTP/1.1 200 OK
  Content-Length: 575
  Content-Type: application/json; charset=UTF-8

  {
      "args": "[2, 2]",
      "client": null,
      "clock": 25,
      "eta": null,
      "exception": null,
      "exchange": null,
      "expires": null,
      "failed": null,
      "kwargs": "{}",
      "name": "tasks.add",
      "received": 1400806241.970742,
      "result": "'4'",
      "retried": null,
      "retries": null,
      "revoked": null,
      "routing_key": null,
      "runtime": 2.0037889280356467,
      "sent": null,
      "started": 1400806241.972624,
      "state": "SUCCESS",
      "succeeded": 1400806243.975336,
      "task-id": "91396550-c228-4111-9da4-9d88cfd5ddc6",
      "timestamp": 1400806243.975336,
      "traceback": null,
      "worker": "celery@worker1"
  }

:reqheader Authorization: optional OAuth token to authenticate
:statuscode 200: no error
:statuscode 401: unauthorized request
:statuscode 404: unknown task
        rC   rD   Nr   )	r   Zget_task_by_idrp   r   r   r   r   r   rP   )r    rb   rS   r1   r#   r#   r$   rW   L  s    9
zTaskInfo.getNrc   r#   r#   r#   r$   r   K  s   r   )+r9   loggingr   Ztornador   r   Ztornado.ioloopr   Ztornado.escaper   Ztornado.webr   Zceleryr   Zcelery.resultr	   Zcelery.contrib.abortabler
   Zcelery.backends.baser   utilsr    r   Zutils.brokerr   Zapi.controlr   collectionsr   	getLoggerr=   rH   r   rA   rZ   r\   r]   rd   rh   ry   r   r   r#   r#   r#   r$   <module>   s6   
;JA41):{'