U
    dD4                     @   s  d Z ddlZddlZddlZddlmZ ddlmZ ddlm	Z	 ddl
mZ ddlmZmZmZmZ ddlmZmZ d	d
lmZmZ d	dlmZ d	dlmZ d	dlmZ dZdZeeZ da!dd Z"dd Z#d:ddZ$G dd deZ%dd Z&d;ddZ'dd Z(d d! Z)d"d# Z*d<d$d%Z+d=d&d'Z,d>d(d)Z-d?d*d+Z.d,d- Z/d.d/ Z0ed0d1 Z1d@d2d3Z2dAd4d5Z3dBd6d7Z4G d8d9 d9Z5dS )CzCommon Utilities.    N)deque)contextmanager)partial)count)NAMESPACE_OIDuuid3uuid4uuid5)ChannelErrorRecoverableConnectionError   )ExchangeQueue)
get_logger)registry)uuid)		Broadcastmaybe_declarer   itermessages
send_replycollect_repliesinsureddrain_consumer	eventloopi  c                   C   s   t d krt ja t S N)_node_idr   int r   r   0/tmp/pip-unpacked-wheel-hqfrjlvz/kombu/common.pyget_node_id    s    r   c                 C   sN   d | ||t|}zttt|}W n" tk
rH   ttt|}Y nX |S )Nz{:x}-{:x}-{:x}-{:x})formatidstrr   r   
ValueErrorr	   )Znode_idZ
process_idZ	thread_idinstanceentretr   r   r   generate_oid'   s       r'   Tc                 C   s    t t t |rt nd| S Nr   )r'   r   osgetpid	threading	get_ident)r$   threadsr   r   r   oid_from1   s    r.   c                       s,   e Zd ZdZejd Zd fdd	Z  ZS )	r   a  Broadcast queue.

    Convenience class used to define broadcast queues.

    Every queue instance will have a unique name,
    and both the queue and exchange is configured with auto deletion.

    Arguments:
        name (str): This is used as the name of the exchange.
        queue (str): By default a unique id is used for the queue
            name for every consumer.  You can specify a custom
            queue name here.
        unique (bool): Always create a unique queue
            even if a queue name is supplied.
        **kwargs (Any): See :class:`~kombu.Queue` for a list
            of additional keyword arguments supported.
    ))queueNNFTc              
      sb   |rd |pdt }n|p&dt  }t jf |p6|||||d k	rH|n
t|ddd| d S )Nz{}.{}Zbcastzbcast.Zfanout)type)aliasr/   nameauto_deleteexchange)r    r   super__init__r   )selfr2   r/   uniquer3   r4   r1   kwargs	__class__r   r   r6   O   s    
zBroadcast.__init__)NNFTNN)__name__
__module____qualname____doc__r   attrsr6   __classcell__r   r   r:   r   r   :   s   
      r   c                 C   s   | |j jjkS r   )
connectionclientdeclared_entities)entitychannelr   r   r   declaration_cachedf   s    rG   Fc                 K   s   |rt | |f|S t| |S )zDeclare entity (cached).)_imaybe_declare_maybe_declare)rE   rF   retryretry_policyr   r   r   r   j   s    r   c                 C   s4   | j }|s0|s"td| d|  | |} | S dS )zMake sure the channel is bound to the entity.

    :param entity: generic kombu nomenclature, generally an exchange or queue
    :param channel: channel to bind to the entity
    :return: the updated entity
    zCannot bind channel z to entity N)is_boundr
   bind)rE   rF   rL   r   r   r   _ensure_channel_is_boundq   s    
rN   c                 C   s   | }t | | |d kr2| js,td|  d| j}d  }}|jrd| jrd|jjj}t| }||krddS |jsrt	d| j
|d |d k	r|r|| |d k	r| j|_dS )Nzchannel is None and entity z not bound.Fchannel disconnected)rF   T)rN   rL   r
   rF   rB   Zcan_cache_declarationrC   rD   hashr   Zdeclareaddr2   )rE   rF   origZdeclaredidentr   r   r   rI      s,    



rI   c                 K   s6   t | | | jjstd| jjjj| tf|| |S )NrO   )rN   rF   rB   r   rC   ZensurerI   )rE   rF   rK   r   r   r   rH      s    

  rH   c              
   #   sv   t    fdd}|g|pg  | _| F t| jjj||ddD ](}z  V  W q> tk
rd   Y q>X q>W 5 Q R X dS )z&Drain messages from consumer instance.c                    s     | |f d S r   )append)bodymessageaccr   r   
on_message   s    z"drain_consumer.<locals>.on_messageT)limittimeoutignore_timeoutsN)r   	callbacksr   rF   rB   rC   popleft
IndexError)ZconsumerrZ   r[   r]   rY   _r   rW   r   r      s    
  
r   c                 K   s$   t | jf |g|d||||dS )zIterator over messages.)ZqueuesrF   )rZ   r[   r]   )r   ZConsumer)connrF   r/   rZ   r[   r]   r9   r   r   r   r      s      r   c              	   c   sP   |rt |pt D ]8}z| j|dV  W q tjk
rH   |rD|sD Y qX qdS )a  Best practice generator wrapper around ``Connection.drain_events``.

    Able to drain events forever, with a limit, and optionally ignoring
    timeout errors (a timeout of 1 is often used in environments where
    the socket can get "stuck", and is a best practice for Kombu consumers).

    ``eventloop`` is a generator.

    Examples:
        >>> from kombu.common import eventloop

        >>> def run(conn):
        ...     it = eventloop(conn, timeout=1, ignore_timeouts=True)
        ...     next(it)   # one event consumed, or timed out.
        ...
        ...     for _ in eventloop(conn, timeout=1, ignore_timeouts=True):
        ...         pass  # loop forever.

    It also takes an optional limit parameter, and timeout errors
    are propagated by default::

        for _ in eventloop(connection, limit=1, timeout=1):
            pass

    See Also:
        :func:`itermessages`, which is an event loop bound to one or more
        consumers, that yields any messages received.
    )r[   N)ranger   Zdrain_eventssocketr[   )ra   rZ   r[   r\   ir   r   r   r      s    r   c              	   K   sD   |j |f| ||dt|jd |jdtj|j |jdf|S )a  Send reply for request.

    Arguments:
        exchange (kombu.Exchange, str): Reply exchange
        req (~kombu.Message): Original request, a message with
            a ``reply_to`` property.
        producer (kombu.Producer): Producer instance
        retry (bool): If true must retry according to
            the ``reply_policy`` argument.
        retry_policy (Dict): Retry settings.
        **props (Any): Extra properties.
    )r4   rJ   rK   Zreply_tocorrelation_id)Zrouting_keyre   
serializercontent_encoding)publishdictZ
propertiesgetserializersZtype_to_namecontent_typerg   )r4   reqmsgZproducerrJ   rK   propsr   r   r   r      s     


r   c           	   	   o   s`   | dd}d}z8t| ||f||D ]\}}|s:|  d}|V  q&W 5 |rZ||j X dS )z,Generator collecting replies from ``queue``.no_ackTFN)
setdefaultZafter_reply_message_receivedr2   r   Zack)	ra   rF   r/   argsr9   rp   ZreceivedrU   rV   r   r   r   r      s    
r   c                 C   s   t jd| |dd d S )Nz#Connection error: %r. Retry in %ss
T)exc_info)loggererror)excintervalr   r   r   _ensure_errback  s      rx   c              	   c   s,   z
d V  W n | j | j k
r&   Y nX d S r   )Zconnection_errorsZchannel_errors)ra   r   r   r   _ignore_errors  s    
ry   c              
   O   s2   |r*t |  |||W  5 Q R  S Q R X t | S )a  Ignore connection and channel errors.

    The first argument must be a connection object, or any other object
    with ``connection_error`` and ``channel_error`` attributes.

    Can be used as a function:

    .. code-block:: python

        def example(connection):
            ignore_errors(connection, consumer.channel.close)

    or as a context manager:

    .. code-block:: python

        def example(connection):
            with ignore_errors(connection):
                consumer.channel.close()


    Note:
        Connection and channel errors should be properly handled,
        and not ignored.  Using this function is only acceptable in a cleanup
        phase, like when a connection is lost or at shutdown.
    )ry   )ra   funrr   r9   r   r   r   ignore_errors  s    
r{   c                 C   s   |r|| d S r   r   )rB   rF   	on_reviver   r   r   revive_connection@  s    r}   c              
   K   s   |pt }| jddb}|j|d |j}tt||d}	|j||f||	d|}
|
|t||d\}}|W  5 Q R  S Q R X dS )zFunction wrapper to handle connection errors.

    Ensures function performing broker commands completes
    despite intermittent connection failures.
    T)block)errback)r|   )r   r|   )rB   N)rx   acquireZensure_connectionZdefault_channelr   r}   Z	autoretryri   )poolrz   rr   r9   r   r|   optsra   rF   Zreviver   retvalr`   r   r   r   r   E  s    r   c                   @   s@   e Zd ZdZdZdd ZdddZddd	Zd
d Zdd Z	dS )QoSa  Thread safe increment/decrement of a channels prefetch_count.

    Arguments:
        callback (Callable): Function used to set new prefetch count,
            e.g. ``consumer.qos`` or ``channel.basic_qos``.  Will be called
            with a single ``prefetch_count`` keyword argument.
        initial_value (int): Initial prefetch count value..

    Example:
        >>> from kombu import Consumer, Connection
        >>> connection = Connection('amqp://')
        >>> consumer = Consumer(connection)
        >>> qos = QoS(consumer.qos, initial_prefetch_count=2)
        >>> qos.update()  # set initial

        >>> qos.value
        2

        >>> def in_some_thread():
        ...     qos.increment_eventually()

        >>> def in_some_other_thread():
        ...     qos.decrement_eventually()

        >>> while 1:
        ...    if qos.prev != qos.value:
        ...        qos.update()  # prefetch changed so update.

    It can be used with any function supporting a ``prefetch_count`` keyword
    argument::

        >>> channel = connection.channel()
        >>> QoS(channel.basic_qos, 10)


        >>> def set_qos(prefetch_count):
        ...     print('prefetch count now: %r' % (prefetch_count,))
        >>> QoS(set_qos, 10)
    Nc                 C   s   || _ t | _|pd| _d S r(   )callbackr+   RLock_mutexvalue)r7   r   initial_valuer   r   r   r6     s    
zQoS.__init__r   c              	   C   s0   | j  | jr | jt|d | _W 5 Q R X | jS )zIncrement the value, but do not update the channels QoS.

        Note:
            The MainThread will be responsible for calling :meth:`update`
            when necessary.
        r   )r   r   maxr7   nr   r   r   increment_eventually  s    zQoS.increment_eventuallyc              	   C   s<   | j * | jr,|  j|8  _| jdk r,d| _W 5 Q R X | jS )zDecrement the value, but do not update the channels QoS.

        Note:
            The MainThread will be responsible for calling :meth:`update`
            when necessary.
        r   )r   r   r   r   r   r   decrement_eventually  s    
zQoS.decrement_eventuallyc                 C   sH   || j krD|}|tkr&tdt d}td| | j|d || _ |S )z#Set channel prefetch_count setting.z(QoS: Disabled: prefetch_count exceeds %rr   zbasic.qos: prefetch_count->%s)Zprefetch_count)prevPREFETCH_COUNT_MAXrt   warningdebugr   )r7   Zpcount	new_valuer   r   r   set  s    
zQoS.setc              
   C   s*   | j  | | jW  5 Q R  S Q R X dS )z)Update prefetch count with current value.N)r   r   r   )r7   r   r   r   update  s    z
QoS.update)r   )r   )
r<   r=   r>   r?   r   r6   r   r   r   r   r   r   r   r   r   Y  s   (

r   )T)NF)r   NN)r   NN)NNF)NFN)N)N)NN)6r?   r)   rc   r+   collectionsr   
contextlibr   	functoolsr   	itertoolsr   r   r   r   r   r	   Zamqpr
   r   rE   r   r   logr   Zserializationr   rk   Z
utils.uuid__all__r   r<   rt   r   r   r'   r.   r   rG   r   rN   rI   rH   r   r   r   r   r   rx   ry   r{   r}   r   r   r   r   r   r   <module>   sV   

	,


  
	
&     


!

