U
    d                     @   sz   d Z ddlmZ ddlmZ ddlmZmZ dZej	Z
dd	d
ZG dd dejZG dd dejZG dd dejZdS )zYCarrot compatibility interface.

See https://pypi.org/project/carrot/ for documentation.
    )count   )	messaging)ExchangeQueue)	PublisherConsumerFNc                 c   s8   |j |d tdD ]}|r(||kr( q4|  V  qd S )N)no_ackr   )consumer   Zdrain_events)
connectionconsumerr	   limit	iteration r   0/tmp/pip-unpacked-wheel-hqfrjlvz/kombu/compat.py_iterconsume   s
    r   c                       sj   e Zd ZdZdZdZdZdZdZdZ	d fdd	Z
d	d
 Z fddZdd Zdd Zedd Z  ZS )r   zCarrot compatible producer. directTFNc           	         s   |r|}|p| j | _ |p| j| _|p(| j| _|d k	r:|| _|d k	rH|| _t| j tsrt| j | j| j| j| jd| _ t j|| j f| d S )N)nametyperouting_keyauto_deletedurable)	exchangeexchange_typer   r   r   
isinstancer   super__init__)	selfr   r   r   r   r   r   channelkwargs	__class__r   r   r   #   s"    zPublisher.__init__c                 O   s   | j ||S N)publish)r   argsr    r   r   r   send:   s    zPublisher.sendc                    s   t    d| _d S NT)r   close_closedr   r!   r   r   r(   =   s    
zPublisher.closec                 C   s   | S r#   r   r*   r   r   r   	__enter__A   s    zPublisher.__enter__c                 G   s   |    d S r#   r(   r   exc_infor   r   r   __exit__D   s    zPublisher.__exit__c                 C   s   | j S r#   )r   r*   r   r   r   backendG   s    zPublisher.backend)NNNNNN)__name__
__module____qualname____doc__r   r   r   r   r   r)   r   r&   r(   r+   r/   propertyr0   __classcell__r   r   r!   r   r      s&          r   c                       s   e Zd ZdZdZdZdZdZdZdZ	dZ
dZd fdd	Z fd	d
Zdd Zdd Zdd Zdd Zd ddZdd Zd!ddZd"ddZd#ddZd$ddZ  ZS )%r   zCarrot compatible consumer.r   r   TFNc	           
         s   |  | _|d k	r|| _|d k	r&|| _|d k	r4|| _|p<| j| _|pH| j| _|pT| j| _|p`| j| _t	| j| j| j| j| jd}t
| j|| j| j| j| jd}t j| j|f|	 d S )N)r   r   r   r   )r   r   r   	exclusiver   )r   r0   r   r7   r   queuer   r   r   r   r   r   r   )
r   r   r8   r   r   r   r   r7   r   r    r!   r   r   r   X   s2    
zConsumer.__init__c                    s   || _ t | d S r#   r0   r   reviver   r   r!   r   r   r:   v   s    zConsumer.revivec                 C   s   |    | j  d| _d S r'   )cancelr0   r(   r)   r*   r   r   r   r(   z   s    
zConsumer.closec                 C   s   | S r#   r   r*   r   r   r   r+      s    zConsumer.__enter__c                 G   s   |    d S r#   r,   r-   r   r   r   r/      s    zConsumer.__exit__c                 C   s   | j ddS )NT)infinite)	iterqueuer*   r   r   r   __iter__   s    zConsumer.__iter__c                 C   s8   |d kr| j }| jd |}|r4|r4| |j| |S )Nr   )r	   queuesgetZreceivepayload)r   r	   Zenable_callbacksmessager   r   r   fetch   s    zConsumer.fetchc                 C   s   t dd S )Nz Use fetch(enable_callbacks=True))NotImplementedErrorr*   r   r   r   process_next   s    zConsumer.process_nextc                 C   s   |d k	rt d|  S )Nz&discard_all does not implement filters)rE   purge)r   
filterfuncr   r   r   discard_all   s
    zConsumer.discard_allc                 C   s   t | j| ||S r#   r   r   r   r   r	   r   r   r   iterconsume   s    zConsumer.iterconsumec                 C   s   |  |}t|S r#   )rL   list)r   r   itr   r   r   wait   s    
zConsumer.waitc                 c   s:   t  D ].}|  }|s|d ks*|r.||kr. q6|V  qd S r#   )r   rD   )r   r   r=   Zitems_since_startitemr   r   r   r>      s    
zConsumer.iterqueue)NNNNNNN)NF)N)NN)N)NF)r1   r2   r3   r4   r8   r   r   r   r   r7   r   r)   r   r:   r(   r+   r/   r?   rD   rF   rI   rL   rO   r>   r6   r   r   r!   r   r   L   s6            
	


r   c                       sT   e Zd Zd fdd	ZdddZdd Zd	d
 Zdd Z fddZdd Z	  Z
S )ConsumerSetNc           
         s   |rd| _ || _nd| _ | | _g }|r@|D ]}||j q.|rj| D ]\}}	|tj|f|	 qLt	 j
| j|f| d S )NTF)_provided_channelr0   r   extendr@   itemsappendr   	from_dictr   r   )
r   r   rV   Z	consumersr   r    r@   r   Z
queue_nameZqueue_optionsr!   r   r   r      s    
zConsumerSet.__init__Fc                 C   s   t | j| ||S r#   rJ   rK   r   r   r   rL      s    zConsumerSet.iterconsumec                 C   s   |   S r#   )rG   r*   r   r   r   rI      s    zConsumerSet.discard_allc                 K   s   |  tj|f|S r#   )	add_queuer   rV   )r   r8   optionsr   r   r   add_consumer_from_dict   s    z"ConsumerSet.add_consumer_from_dictc                 C   s   |j D ]}| | qd S r#   )r@   rW   )r   r   r8   r   r   r   add_consumer   s    
zConsumerSet.add_consumerc                    s   || _ t | d S r#   r9   r;   r!   r   r   r:      s    zConsumerSet.revivec                 C   s   |    | js| j  d S r#   )r<   rR   r   r(   r*   r   r   r   r(      s    zConsumerSet.close)NNN)NF)r1   r2   r3   r   rL   rI   rY   rZ   r:   r(   r6   r   r   r!   r   rQ      s     
rQ   )FN)r4   	itertoolsr   r   r   entityr   r   __all__rV   Zentry_to_queuer   ZProducerr   r   rQ   r   r   r   r   <module>   s   
3^