U
    dc                     @   s  d Z ddlZddlmZ ddlmZ ddlmZ ddlm	Z	 ddl
mZ dd	lmZ d
ZdgZg Ze ZejdZdd ZG dd deZG dd de	Zdd ZG dd deZeeedZG dd deZeeedZdd Zdd Zd#dd Z d!d" Z!dS )$zPublic resource pools.    N)chain   )Resource)Producer)EqualityDict)register_after_fork)lazy)ProducerPool	PoolGroupregister_groupconnections	producers	get_limit	set_limitreset
   ZKOMBU_DISABLE_LIMIT_PROTECTIONc                 C   s   |    d S N)cleargroup r   //tmp/pip-unpacked-wheel-hqfrjlvz/kombu/pools.py_after_fork_cleanup_group   s    r   c                       sd   e Zd ZdZeZdZ fddZdd Zdd Zd	d
 Z	dd Z
dd Zdd Z fddZ  ZS )r	   z*Pool of :class:`kombu.Producer` instances.Tc                    s,   || _ |dd p| j| _t j|| d S )Nr   )r   popr   super__init__)selfr   argskwargs	__class__r   r   r      s    zProducerPool.__init__c                 C   s   | j jddS )NT)block)r   acquirer   r   r   r   _acquire_connection#   s    z ProducerPool._acquire_connectionc                 C   s8   |   }z| |W S  tk
r2   |   Y nX d S r   )r$   r   BaseExceptionrelease)r   connr   r   r   create_producer&   s    zProducerPool.create_producerc                 C   s
   t | jS r   )r   r(   r#   r   r   r   new.   s    zProducerPool.newc                 C   s*   | j r&t| j D ]}| j|   qd S r   )limitrangeZ	_resource
put_nowaitr)   )r   _r   r   r   setup1   s    zProducerPool.setupc                 C   s   d S r   r   r   resourcer   r   r   close_resource6   s    zProducerPool.close_resourcec                 C   sR   t |r| }|jd krN|  }z|| W n tk
rL   |   Y nX |S r   )callableZ_channelr$   Zreviver%   r&   )r   pr'   r   r   r   prepare9   s    
zProducerPool.preparec                    s&   |j r|j   d |_t | d S r   )Z__connection__r&   Zchannelr   r/   r   r   r   r&   E   s    
zProducerPool.release)__name__
__module____qualname____doc__r   close_after_forkr   r$   r(   r)   r.   r1   r4   r&   __classcell__r   r   r   r   r	      s   r	   c                   @   s*   e Zd ZdZd
ddZdd Zdd	 ZdS )r
   zCollection of resource pools.NTc                 C   s(   || _ || _| jr$td k	r$t| t d S r   )r*   r9   r   r   )r   r*   r9   r   r   r   r   O   s    zPoolGroup.__init__c                 C   s   t dd S )Nz!PoolGroups must define ``create``)NotImplementedError)r   r0   r*   r   r   r   createU   s    zPoolGroup.createc                 C   s,   | j }|tkrt }| || }| |< |S r   )r*   use_global_limitr   r<   )r   r0   r*   kr   r   r   __missing__X   s
    zPoolGroup.__missing__)NT)r5   r6   r7   r8   r   r<   r?   r   r   r   r   r
   L   s   
r
   c                 C   s   t |  | S )z*Register group (can be used as decorator).)_groupsappendr   r   r   r   r   `   s    
r   c                   @   s   e Zd ZdZdd ZdS )ConnectionszCollection of connection pools.c                 C   s   |j |dS Nr*   )ZPoolr   
connectionr*   r   r   r   r<   i   s    zConnections.createNr5   r6   r7   r8   r<   r   r   r   r   rB   f   s   rB   rD   c                   @   s   e Zd ZdZdd ZdS )	ProducerszCollection of producer pools.c                 C   s   t t| |dS rC   )r	   r   rE   r   r   r   r<   s   s    zProducers.createNrG   r   r   r   r   rH   p   s   rH   c                   C   s   t dd tD  S )Nc                 s   s"   | ]}|r|  ntg V  qd S r   )valuesiter).0gr   r   r   	<genexpr>{   s     z_all_pools.<locals>.<genexpr>)r   r@   r   r   r   r   
_all_poolsz   s    rN   c                   C   s   t d S )z"Get current connection pool limit.r   )_limitr   r   r   r   r   ~   s    r   Fc                 C   s>   | pd} t d pd}| |kr:| t d< t D ]}||  q*| S )zSet new connection pool limit.r   )rO   rN   resize)r*   forceZreset_afterignore_errorsZglimitpoolr   r   r   r      s    
r   c               	   O   sD   t  D ]&}z|  W q tk
r*   Y qX qtD ]}|  q2dS )z*Reset all pools by closing open resources.N)rN   Zforce_close_all	Exceptionr@   r   )r   r   rS   r   r   r   r   r      s    
r   )FFF)"r8   os	itertoolsr   rF   r   Z	messagingr   Zutils.collectionsr   Zutils.compatr   Zutils.functionalr   __all__rO   r@   objectr=   environgetZdisable_limit_protectionr   r	   r
   r   rB   r   rH   r   rN   r   r   r   r   r   r   r   <module>   s0   4
