U
    d6$                     @   sL  d dl Z d dlZd dlZd dlZd dlZd dlmZ d dlmZ d dlmZ d dl	m
Z
mZmZmZ zd dlZW n ek
r   dZY nX eeZG dd deZG dd	 d	eZG d
d deZG dd deZG dd deZG dd deZG dd deZG dd deZejdd ZedkrHej Z e !e e "  dS )    N)ioloop)gen)
httpclient)urlparseurljoinquoteunquotec                   @   s   e Zd Zdd Zdd ZdS )
BrokerBasec                 O   s\   t |}|j| _|j| _|jdd  | _|j}|j}|r@t|n|| _|rRt|n|| _d S )N   )	r   hostnamehostportpathvhostusernamepasswordr   )self
broker_urlargskwargspurlr   r    r   7/tmp/pip-unpacked-wheel-3pokl8eb/flower/utils/broker.py__init__   s    zBrokerBase.__init__c                 C   s   t d S NNotImplementedErrorr   namesr   r   r   queues%   s    zBrokerBase.queuesN)__name__
__module____qualname__r   r   r   r   r   r   r	      s   r	   c                       s8   e Zd Zd fdd	Zejdd Zedd Z  Z	S )	RabbitMQNc                    s   t t| | |ptj | _| jp(d| _| jp4d| _| j	dkrRt
| j	dpVdn| j	| _	| jpbd| _| jpnd| _|sdj| j| j| j| j| j	d}z| | W n  tk
r   td| Y nX || _d S )	N	localhosti8=  / Zguestz6http://{username}:{password}@{host}:{port}/api/{vhost})r   r   r   r   r   zInvalid broker api url:%s)superr#   r   r   IOLoopinstanceio_loopr   r   r   r   r   r   formatvalidate_http_api	Exceptionloggererrorhttp_api)r   r   r0   r*   r   	__class__r   r   r   *   s(    "   zRabbitMQ.__init__c           
   
   #   s   t | jd| j }t| j}t|jp&dp.| j}t|jp:dpB| j}t }zbz|j
|||ddddV }W n@ tjtjfk
r } ztd| tg W 5 d }~X Y nX W 5 |	  X |jdkrt|j }	t fd	d
|	D n|  d S )Nzqueues/r&   g      ?g       @F)Zauth_usernameZauth_passwordconnect_timeoutrequest_timeoutZvalidate_certz'RabbitMQ management API call failed: %s   c                    s   g | ]}|d   kr|qS )namer   .0xr   r   r   
<listcomp>V   s      z#RabbitMQ.queues.<locals>.<listcomp>)r   r0   r   r   r   r   r   r   ZAsyncHTTPClientclosefetchsocketr/   	HTTPErrorr.   r   ReturncodejsonloadsbodydecodeZrethrow)
r   r   urlZapi_urlr   r   http_clientresponseeinfor   r:   r   r   A   s,    
    

zRabbitMQ.queuesc                 C   s$   t |}|jdkr td|j d S )N)httphttpszInvalid http api schema: %s)r   scheme
ValueError)clsr0   rF   r   r   r   r,   Z   s    
zRabbitMQ.validate_http_api)N)
r    r!   r"   r   r   	coroutiner   classmethodr,   __classcell__r   r   r1   r   r#   )   s
   
r#   c                       sB   e Zd ZdZddddgZ fddZdd	 Zejd
d Z	  Z
S )	RedisBasezr         	   c                    sR   t t| | d | _ts"td|di }|d| j| _|d| j| _	d S )Nzredis library is requiredbroker_optionspriority_stepssep)
r'   rS   r   redisImportErrorgetDEFAULT_PRIORITY_STEPSrX   DEFAULT_SEPrY   r   r   r   r   rW   r1   r   r   r   e   s     zRedisBase.__init__c                 C   s2   || j krtddj|r&|| j|fn|ddf S )NzPriority not in priority stepsz	{0}{1}{2}r&   )rX   rN   r+   rY   )r   queueprir   r   r   
_q_for_priq   s    
zRedisBase._q_for_pric                    sT   g }|D ]<  fddj D }| tfdd|D d qt|d S )Nc                    s   g | ]}  |qS r   )rb   )r8   ra   r6   r   r   r   r;   z   s
    z$RedisBase.queues.<locals>.<listcomp>c                    s   g | ]} j |qS r   )rZ   Zllenr7   r   r   r   r;   ~   s     )r6   messages)rX   appendsumr   r@   )r   r   Zqueue_statspriority_namesr   rc   r   r   v   s    
zRedisBase.queues)r    r!   r"   r^   r]   r   rb   r   rP   r   rR   r   r   r1   r   rS   a   s   rS   c                       s4   e Zd Z fddZdd Zdd Zdd Z  ZS )	Redisc                    sL   t t| j|f|| | jp d| _| jp,d| _| | j| _|  | _d S )Nr$   i  )	r'   ri   r   r   r   _prepare_virtual_hostr   _get_redis_clientrZ   r   r   r   r   r1   r   r   r      s
    zRedis.__init__c                 C   sh   t |tjsd|r|dkrd}n|dr4|dd  }zt|}W n" tk
rb   td|Y nX |S Nr%   r   r
   z0Database is int between 0 and limit - 1, not {0}
isinstancenumbersIntegral
startswithintrN   r+   r   r   r   r   r   rj      s    

zRedis._prepare_virtual_hostc                 C   s   | j | j| j| jdS )N)r   r   dbr   )r   r   r   r   rd   r   r   r   _get_redis_client_args   s    zRedis._get_redis_client_argsc                 C   s   t jf |  S r   )rZ   ri   rv   rd   r   r   r   rk      s    zRedis._get_redis_client)r    r!   r"   r   rj   rv   rk   rR   r   r   r1   r   ri      s   ri   c                       s4   e Zd Z fddZdd Zdd Zdd Z  ZS )	RedisSentinelc                    sf   t t| j|f|| |di }| jp,d| _| jp8d| _| | j| _| || _	| 
|| _d S )NrW   r$   ig  )r'   rw   r   r\   r   r   rj   r   _prepare_master_namemaster_namerk   rZ   r_   r1   r   r   r      s    zRedisSentinel.__init__c                 C   sh   t |tjsd|r|dkrd}n|dr4|dd  }zt|}W n" tk
rb   td|Y nX |S rm   rn   rt   r   r   r   rj      s    

z#RedisSentinel._prepare_virtual_hostc                 C   s.   z|d }W n t k
r(   tdY nX |S )Nry   z+master_name is required for Sentinel broker)KeyErrorrN   )r   rW   ry   r   r   r   rx      s    
z"RedisSentinel._prepare_master_namec                 C   s<   | j |dd}tjj| j| jfgf|}|| j}|S )Nsentinel_kwargs)r   r{   )	r   r\   rZ   sentinelZSentinelr   r   Z
master_forry   )r   rW   Zconnection_kwargsr|   Zredis_clientr   r   r   rk      s    zRedisSentinel._get_redis_client)r    r!   r"   r   rj   rx   rk   rR   r   r   r1   r   rw      s   		rw   c                       s   e Zd Z fddZ  ZS )RedisSocketc                    s4   t t| j|f|| tjd| j | jd| _d S )Nr%   )Zunix_socket_pathr   )r'   r}   r   rZ   ri   r   r   rl   r1   r   r   r      s    zRedisSocket.__init__)r    r!   r"   r   rR   r   r   r1   r   r}      s   r}   c                       s,   e Zd ZdZ fddZ fddZ  ZS )RedisSslz
    Redis SSL class offering connection to the broker over SSL.
    This does not currently support SSL settings through the url, only through
    the broker_use_ssl celery configuration.
    c                    s:   d|krt d|di | _tt| j|f|| d S )Nbroker_use_sslz%rediss broker requires broker_use_ssl)rN   r\   r   r'   r~   r   rl   r1   r   r   r      s    zRedisSsl.__init__c                    s2   t t|  }d|d< t| jtr.|| j |S )NTssl)r'   r~   rv   ro   r   dictupdate)r   Zclient_argsr1   r   r   rv      s
    zRedisSsl._get_redis_client_args)r    r!   r"   __doc__r   rv   rR   r   r   r1   r   r~      s   r~   c                   @   s   e Zd Zdd Zdd ZdS )Brokerc                 O   s   t |j}|dkr"t|f||S |dkr:t|f||S |dkrRt|f||S |dkrjt|f||S |dkrt|f||S td S )NZamqprZ   Zredisszredis+socketr|   )r   rM   r#   ri   r~   r}   rw   r   )rO   r   r   r   rM   r   r   r   __new__   s    
zBroker.__new__c                 C   s   t d S r   r   r   r   r   r   r      s    zBroker.queuesN)r    r!   r"   r   r   r   r   r   r   r      s   r   c                  c   s   t tjdkrtjd nd} t tjdkr4tjd nd}t tjdkrRtjd }nd}t| |d}||gV }|r|t| t  d S )Nr
   zamqp://   ZceleryrT   z'http://guest:guest@localhost:15672/api/)r0   )lensysargvr   r   printr*   stop)r   Z
queue_namer0   Zbrokerr   r   r   r   main  s    r   __main__)#r   rB   r>   loggingrp   Ztornador   r   r   urllib.parser   r   r   r   rZ   r[   	getLoggerr    r.   objectr	   r#   rS   ri   rw   r}   r~   r   rP   r   r(   r)   r*   Zadd_callbackstartr   r   r   r   <module>   s6   

8"/



