U
    d                     @   s~   d Z ddlZddlmZ ddlmZ ddlmZ ddlm	Z	 ddl
mZ dd	lmZ d
d ZG dd deZG dd dZdS )z%Generic resource pool implementation.    N)deque)Empty)	LifoQueue   )
exceptions)register_after_fork)lazyc                 C   s&   z|    W n tk
r    Y nX d S N)force_close_all	Exception)resource r   2/tmp/pip-unpacked-wheel-hqfrjlvz/kombu/resource.py_after_fork_cleanup_resource   s    r   c                   @   s   e Zd ZdZdd ZdS )r   z#Last in first out version of Queue.c                 C   s   t  | _d S r	   )r   queue)selfmaxsizer   r   r   _init   s    zLifoQueue._initN)__name__
__module____qualname____doc__r   r   r   r   r   r      s   r   c                   @   s   e Zd ZdZejZdZd&ddZdd Zdd	 Z	d'd
dZ
dd Zdd Zdd Zdd Zdd Zdd Zdd Zd(ddZd)ddZedd  Zejd!d  Zejd"re
ZeZd#Zd$d Z
d%d ZdS )*ResourcezPool of resources.FNc                 C   s^   || _ |pd| _d| _|d k	r"|n| j| _t | _t | _| jrRtd k	rRt| t	 | 
  d S )Nr   F)_limitpreload_closedclose_after_forkr   	_resourceset_dirtyr   r   setup)r   limitr   r   r   r   r   __init__"   s    

zResource.__init__c                 C   s   t dd S )Nzsubclass responsibility)NotImplementedErrorr   r   r   r   r    1   s    zResource.setupc                 C   s6   | j r"t| j| j kr"| | j | j|   d S r	   )r!   lenr   LimitExceededr   
put_nowaitnewr$   r   r   r   _add_when_empty4   s    zResource._add_when_emptyc                    s   j rtdjrzjj||d W n tk
rD     Y qX z  W n8 tk
r   t	 t
r|j  n
   Y nX j  qqn   fdd}| _ S )av  Acquire resource.

        Arguments:
            block (bool): If the limit is exceeded,
                then block until there is an available item.
            timeout (float): Timeout to wait
                if ``block`` is true.  Default is :const:`None` (forever).

        Raises:
            LimitExceeded: if block is false and the limit has been exceeded.
        zAcquire on closed pool)blocktimeoutc                      s      dS )a  Release resource so it can be used by another thread.

            Warnings:
                The caller is responsible for discarding the object,
                and to never use the resource again.  A new resource must
                be acquired if so needed.
            N)releaser   Rr   r   r   r,   `   s    z!Resource.acquire.<locals>.release)r   RuntimeErrorr!   r   getr   r)   prepareBaseException
isinstancer   r'   r,   r   addr(   )r   r*   r+   r,   r   r-   r   acquire<   s(    

	zResource.acquirec                 C   s   |S r	   r   r   r   r   r   r   r1   m   s    zResource.preparec                 C   s   |   d S r	   )closer6   r   r   r   close_resourcep   s    zResource.close_resourcec                 C   s   d S r	   r   r6   r   r   r   release_resources   s    zResource.release_resourcec                 C   s    | j r| j| | | dS )zqReplace existing resource with a new instance.

        This can be used in case of defective resources.
        N)r!   r   discardr8   r6   r   r   r   replacev   s    zResource.replacec                 C   s8   | j r*| j| | j| | | n
| | d S r	   )r!   r   r:   r   r'   r9   r8   r6   r   r   r   r,      s
    zResource.releasec                 C   s   d S r	   r   r6   r   r   r   collect_resource   s    zResource.collect_resourcec                 C   s   | j r
dS d| _ | j}| j}z| }W n tk
r@   Y qhY nX z| | W q tk
rd   Y qX qz|j }W n tk
r   Y qY nX z| | W qh tk
r   Y qhX qhdS )zClose and remove all resources in the pool (also those in use).

        Used to close resources from parent processes after fork
        (e.g. sockets/connections).
        NT)	r   r   r   popKeyErrorr<   AttributeErrorr   
IndexError)r   Zdirtyr   Zdresresr   r   r   r
      s*    

zResource.force_close_allc                 C   s   | j }| jrDd|  k r"| j k rDn n|sD|s@td| j |d}|| _ |rpz|   W n tk
rn   Y nX |   ||k r| j|dkd d S )Nr   z,Can't shrink pool when in use: was={} now={}T)collect)r   r   r/   formatr
   r   r    _shrink_down)r   r!   forceignore_errorsresetZ
prev_limitr   r   r   resize   s&    $ zResource.resizeTc              	   C   s\   G dd d}| j }t|d| 0 t|j| jkrN|j }|r$| | q$W 5 Q R X d S )Nc                   @   s   e Zd Zdd Zdd ZdS )z#Resource._shrink_down.<locals>.Noopc                 S   s   d S r	   r   r$   r   r   r   	__enter__   s    z-Resource._shrink_down.<locals>.Noop.__enter__c                 S   s   d S r	   r   )r   typevalue	tracebackr   r   r   __exit__   s    z,Resource._shrink_down.<locals>.Noop.__exit__N)r   r   r   rI   rM   r   r   r   r   Noop   s   rN   mutex)r   getattrr%   r   r!   popleftr<   )r   rB   rN   r   r.   r   r   r   rD      s    
zResource._shrink_downc                 C   s   | j S r	   )r   r$   r   r   r   r!      s    zResource.limitc                 C   s   |  | d S r	   )rH   )r   r!   r   r   r   r!      s    ZKOMBU_DEBUG_POOLr   c                 O   s~   dd l }| jd  }| _td| d| jj  | j||}||_td| d| jj  t|dsjg |_|j	|
  |S )Nr   r   +z	 ACQUIRE -acquired_by)rL   _next_resource_idprint	__class__r   _orig_acquire_resource_idhasattrrT   appendformat_stack)r   argskwargsrL   idrr   r   r   r5      s    
c                 C   sR   |j }td| d| jj  | |}td| d| jj  |  jd8  _|S )NrR   z	 RELEASE rS   r   )rY   rV   rW   r   _orig_releaserU   )r   r   r_   r`   r   r   r   r,      s    
)NNN)FN)FFF)T)r   r   r   r   r   r&   r   r"   r    r)   r5   r1   r8   r9   r;   r,   r<   r
   rH   rD   propertyr!   setterosenvironr0   rX   ra   rU   r   r   r   r   r      s4   

1	!



r   )r   rd   collectionsr   r   r   r   Z
_LifoQueue r   Zutils.compatr   Zutils.functionalr   r   r   r   r   r   r   <module>   s   