U
    d*Y                     @   s  d Z ddlZddlZddlZddlZddlZddlmZ ddlmZ ddl	m
Z
mZ ddlmZ ddlmZmZ dd	lmZ ejejejejhZd
Ze ZdZdZedZddddddZefddZ G dd dZ!G dd de!Z"G dd de!Z#dddZ$dS )zTransport implementation.    N)contextmanager)SSLError)packunpack   )UnexpectedFrame)KNOWN_TCP_OPTSSOL_TCP)set_cloexeci(  is   AMQP  	z\[([\.0-9a-f:]+)\](?::(\d+))?i  <   
   	   )TCP_NODELAYTCP_USER_TIMEOUTTCP_KEEPIDLETCP_KEEPINTVLTCP_KEEPCNTc                 C   s^   |}t | }|r6|d} |drVt|d}n d| krV| dd\} }t|}| |fS )z1Convert hostname:port string to host, port tuple.r      :)IPV6_LITERALmatchgroupintrsplit)hostdefaultportm r   2/tmp/pip-unpacked-wheel-sfh45kzb/amqp/transport.pyto_host_port(   s    


r    c                   @   s   e Zd ZdZd$ddZdZdd Zd	d
 Zedd Z	dd Z
dd Zdd Zdd Zd%ddZdd Zdd Zdd Zdd Zefd d!Zd"d# ZdS )&_AbstractTransporta  Common superclass for TCP and SSL transports.

    PARAMETERS:
        host: str

            Broker address in format ``HOSTNAME:PORT``.

        connect_timeout: int

            Timeout of creating new connection.

        read_timeout: int

            sets ``SO_RCVTIMEO`` parameter of socket.

        write_timeout: int

            sets ``SO_SNDTIMEO`` parameter of socket.

        socket_settings: dict

            dictionary containing `optname` and ``optval`` passed to
            ``setsockopt(2)``.

        raise_on_initial_eintr: bool

            when True, ``socket.timeout`` is raised
            when exception is received during first read. See ``_read()`` for
            details.
    NTc                 K   sD   d| _ d | _|| _t| _t|\| _| _|| _|| _	|| _
|| _d S NF)	connectedsockraise_on_initial_eintrEMPTY_BUFFER_read_bufferr    r   r   connect_timeoutread_timeoutwrite_timeoutsocket_settings)selfr   r(   r)   r*   r+   r%   kwargsr   r   r   __init__W   s    z_AbstractTransport.__init__)
connectionr$   r%   r'   r   r   r(   r)   r*   r+   __dict____weakref__c              	   C   s   | j rt| j  d  d| j  d  }| j  d  d| j  d  }dt| j d| d| dt| dd		S dt| j d
t| dd	S d S )Nr   r   r   <z: z -> z at z#x>z: (disconnected) at )r$   getsocknamegetpeernametype__name__id)r,   srcdstr   r   r   __repr__t   s
    ""*z_AbstractTransport.__repr__c              	   C   sz   z>| j rW d S | | j| j| j | | j| j| j d| _ W n6 t	t
fk
rt   | jrn| j sn| j  d | _ Y nX d S )NT)r#   _connectr   r   r(   _init_socketr+   r)   r*   OSErrorr   r$   closer,   r   r   r   connect|   s      

z_AbstractTransport.connectc              
   c   s   |d kr| j V  n| j }| }||kr2|| zz| j V  W n tk
r } z0dt|krht ndt|kr|t  W 5 d }~X Y n8 tk
r } z|jtj	krt  W 5 d }~X Y nX W 5 ||kr|| X d S )N	timed outzThe operation did not complete)
r$   
gettimeout
settimeoutr   strsockettimeoutr>   errnoEWOULDBLOCK)r,   rG   r$   prevexcr   r   r   having_timeout   s(    


z!_AbstractTransport.having_timeoutc              	   C   s   t ||t jt jt}t|D ]\}}|\}}}	}
}zRt  |||	| _zt| jd W n tk
rj   Y nX | j	| | j
| W n> t jk
r   | jr| j  d | _|d t|kr Y qX  qqd S )NTr   )rF   getaddrinfo	AF_UNSPECSOCK_STREAMr	   	enumerater$   r
   NotImplementedErrorrD   rA   errorr?   len)r,   r   r   rG   entriesiresafsocktypeproto	canonnamesar   r   r   r<      s0        
z_AbstractTransport._connectc              	   C   s   | j d  | j tjtjd | | tj|ftj|ffD ]B\}}|d k	r>t	|}t	|| d }| j tj|t
d|| q>|   | t d S )Nr   i@B Zll)r$   rD   
setsockoptrF   
SOL_SOCKETSO_KEEPALIVE_set_socket_optionsSO_SNDTIMEOSO_RCVTIMEOr   r   _setup_transport_writeAMQP_PROTOCOL_HEADER)r,   r+   r)   r*   rG   intervalsecZusecr   r   r   r=      s     
 
z_AbstractTransport._init_socketc              	   C   s   i }t D ]}d }|dkrDzddlm} W qX tk
r@   d}Y qXX ntt|rXtt|}|r|tkrrt| ||< qtt|r|ttt|||< q|S )Nr   r   )r      )	r   rF   r   ImportErrorhasattrgetattrDEFAULT_SOCKET_SETTINGS
getsockoptr	   )r,   r$   tcp_optsoptenumr   r   r   _get_tcp_socket_defaults   s&    


 
z+_AbstractTransport._get_tcp_socket_defaultsc                 C   s@   |  | j}|r|| | D ]\}}| jt|| q"d S N)rp   r$   updateitemsr\   r	   )r,   r+   rm   rn   valr   r   r   r_      s
    
z&_AbstractTransport._set_socket_optionsFc                 C   s   t ddS )z#Read exactly n bytes from the peer.Must be overridden in subclassNrQ   )r,   ninitialr   r   r   _read   s    z_AbstractTransport._readc                 C   s   dS )z.Do any additional initialization of the class.Nr   r@   r   r   r   rb      s    z#_AbstractTransport._setup_transportc                 C   s   dS )z8Do any preliminary work in shutting down the connection.Nr   r@   r   r   r   _shutdown_transport   s    z&_AbstractTransport._shutdown_transportc                 C   s   t ddS )z&Completely write a string to the peer.ru   Nrv   )r,   sr   r   r   rc      s    z_AbstractTransport._writec                 C   s   | j d k	r~z|   W n tk
r*   Y nX z| j tj W n tk
rR   Y nX z| j   W n tk
rv   Y nX d | _ d| _d S r"   )r$   rz   r>   shutdownrF   	SHUT_RDWRr?   r#   r@   r   r   r   r?     s    
z_AbstractTransport.closec              
   C   s  | j }t}z|dd}||7 }|d|\}}}|tkr|t}z||t }	W n& tjttfk
rt   ||7 } Y nX d||	g}
n||}
||
7 }t|d}W n tjk
r   || j	 | _	 Y n ttfk
rl } zt
|tjrtjdkr|jtjkr|| j	 | _	t t
|trHdt|krH|| j	 | _	t |jtkrZd| _ W 5 d	}~X Y nX |d
kr|||
fS td|ddd	S )a  Parse AMQP frame.

        Frame has following format::

            0      1         3         7                   size+7      size+8
            +------+---------+---------+   +-------------+   +-----------+
            | type | channel |  size   |   |   payload   |   | frame-end |
            +------+---------+---------+   +-------------+   +-----------+
             octet    short     long        'size' octets        octet

           Tz>BHI    r   ntrB   FN   zReceived frame_end z#04xz while expecting 0xce)ry   r&   SIGNED_INT_MAXrF   rG   r>   r   joinordr'   
isinstancerR   osnamerH   rI   rE   _UNAVAILr#   r   )r,   r   readZread_frame_bufferZframe_headerZ
frame_typeZchannelsizeZpart1Zpart2payloadZ	frame_endrK   r   r   r   
read_frame  sP    




z_AbstractTransport.read_framec              
   C   s^   z|  | W nJ tjk
r&    Y n4 tk
rX } z|jtkrFd| _ W 5 d }~X Y nX d S r"   )rc   rF   rG   r>   rH   r   r#   )r,   r{   rK   r   r   r   writeY  s    
z_AbstractTransport.write)NNNNT)F)r7   
__module____qualname____doc__r.   	__slots__r;   rA   r   rL   r<   r=   rp   r_   ry   rb   rz   rc   r?   r   r   r   r   r   r   r   r!   7   s.          


Br!   c                
       sv   e Zd ZdZd fdd	ZdZdd Zddd	Zdd
dZdddZ	dd Z
dejejejffddZdd Z  ZS )SSLTransporta  Transport that works over SSL.

    PARAMETERS:
        host: str

            Broker address in format ``HOSTNAME:PORT``.

        connect_timeout: int

            Timeout of creating new connection.

        ssl: bool|dict

            parameters of TLS subsystem.
                - when ``ssl`` is not dictionary, defaults of TLS are used
                - otherwise:
                    - if ``ssl`` dictionary contains ``context`` key,
                      :attr:`~SSLTransport._wrap_context` is used for wrapping
                      socket. ``context`` is a dictionary passed to
                      :attr:`~SSLTransport._wrap_context` as context parameter.
                      All others items from ``ssl`` argument are passed as
                      ``sslopts``.
                    - if ``ssl`` dictionary does not contain ``context`` key,
                      :attr:`~SSLTransport._wrap_socket_sni` is used for
                      wrapping socket. All items in ``ssl`` argument are
                      passed to :attr:`~SSLTransport._wrap_socket_sni` as
                      parameters.

        kwargs:

            additional arguments of
            :class:`~amqp.transport._AbstractTransport` class
    Nc                    s6   t |tr|ni | _t| _t j|fd|i| d S )Nr(   )r   dictssloptsr&   r'   superr.   )r,   r   r(   sslr-   	__class__r   r   r.     s    zSSLTransport.__init__)r   c                 C   s,   | j | jf| j| _| j  | jj| _dS )z!Wrap the socket in an SSL object.N)_wrap_socketr$   r   do_handshaker   _quick_recvr@   r   r   r   rb     s    
zSSLTransport._setup_transportc                 K   s"   |r| j ||f|S | j|f|S rq   )_wrap_context_wrap_socket_sni)r,   r$   contextr   r   r   r   r     s    zSSLTransport._wrap_socketc                 K   s    t jf |}||_|j|f|S )u  Wrap socket without SNI headers.

        PARAMETERS:
            sock: socket.socket

            Socket to be wrapped.

            sslopts: dict

                Parameters of  :attr:`ssl.SSLContext.wrap_socket`.

            check_hostname

                Whether to match the peer cert’s hostname. See
                :attr:`ssl.SSLContext.check_hostname` for details.

            ctx_options

                Parameters of :attr:`ssl.create_default_context`.
        )r   create_default_contextcheck_hostnamewrap_socket)r,   r$   r   r   Zctx_optionsctxr   r   r   r     s    zSSLTransport._wrap_contextFTc                 C   s   |||||	d}|dkr(|r"t jnt j}t |}|dk	rF||| |dk	rX|| |
dk	rj||
 zt jox|	dk	|_W n t	k
r   Y nX |dk	r||_
|dkr|j
t jkr|rt jjnt jj}|| |jf |}|S )u  Socket wrap with SNI headers.

        stdlib :attr:`ssl.SSLContext.wrap_socket` method augmented with support
        for setting the server_hostname field required for SNI hostname header.

        PARAMETERS:
            sock: socket.socket

                Socket to be wrapped.

            keyfile: str

                Path to the private key

            certfile: str

                Path to the certificate

            server_side: bool

                Identifies whether server-side or client-side
                behavior is desired from this socket. See
                :attr:`~ssl.SSLContext.wrap_socket` for details.

            cert_reqs: ssl.VerifyMode

                When set to other than :attr:`ssl.CERT_NONE`, peers certificate
                is checked. Possible values are :attr:`ssl.CERT_NONE`,
                :attr:`ssl.CERT_OPTIONAL` and :attr:`ssl.CERT_REQUIRED`.

            ca_certs: str

                Path to “certification authority” (CA) certificates
                used to validate other peers’ certificates when ``cert_reqs``
                is other than :attr:`ssl.CERT_NONE`.

            do_handshake_on_connect: bool

                Specifies whether to do the SSL
                handshake automatically. See
                :attr:`~ssl.SSLContext.wrap_socket` for details.

            suppress_ragged_eofs (bool):

                See :attr:`~ssl.SSLContext.wrap_socket` for details.

            server_hostname: str

                Specifies the hostname of the service which
                we are connecting to. See :attr:`~ssl.SSLContext.wrap_socket`
                for details.

            ciphers: str

                Available ciphers for sockets created with this
                context. See :attr:`ssl.SSLContext.set_ciphers`

            ssl_version:

                Protocol of the SSL Context. The value is one of
                ``ssl.PROTOCOL_*`` constants.
        )r$   server_sidedo_handshake_on_connectsuppress_ragged_eofsserver_hostnameN)r   PROTOCOL_TLS_SERVERPROTOCOL_TLS_CLIENT
SSLContextload_cert_chainload_verify_locationsset_ciphersHAS_SNIr   AttributeErrorverify_mode	CERT_NONEPurposeCLIENT_AUTHSERVER_AUTHload_default_certsr   )r,   r$   keyfilecertfiler   	cert_reqsca_certsr   r   r   ciphersssl_versionoptsr   purposer   r   r   r     sB    D




zSSLTransport._wrap_socket_snic                 C   s   | j dk	r| j  | _ dS )z/Unwrap a SSL socket, so we can call shutdown().N)r$   unwrapr@   r   r   r   rz   -  s    
z SSLTransport._shutdown_transportc           	   
   C   s   | j }| j}zt||k rz||t| }W nH tk
rv } z*|j|krd|r\| jr\t W Y q W 5 d }~X Y nX |std||7 }qW n   || _ Y nX |d | ||d   }| _|S )N%Server unexpectedly closed connectionr   r'   rS   r>   rH   r%   rF   rG   	r,   rw   rx   Z_errnosrecvZrbufr{   rK   resultr   r   r   ry   2  s(    

zSSLTransport._readc                 C   sP   | j j}|rLz||}W n tk
r0   d}Y nX |s>td||d }qdS )z+Write a string out to the SSL socket fully.r   zSocket closedN)r$   r   
ValueErrorr>   )r,   r{   r   rw   r   r   r   rc   N  s    
zSSLTransport._write)NN)N)N)
NNFNNFTNNN)r7   r   r   r   r.   r   rb   r   r   r   rz   rH   ENOENTEAGAINEINTRry   rc   __classcell__r   r   r   r   r   d  s*   "

             
x
r   c                   @   s.   e Zd ZdZdd ZdejejffddZdS )TCPTransportz~Transport that deals directly with TCP socket.

    All parameters are :class:`~amqp.transport._AbstractTransport` class.
    c                 C   s   | j j| _t| _| j j| _d S rq   )r$   sendallrc   r&   r'   r   r   r@   r   r   r   rb   e  s    
zTCPTransport._setup_transportFc           	   
   C   s   | j }| j}zt||k rz||t| }W nH tk
rv } z*|j|krd|r\| jr\t W Y q W 5 d}~X Y nX |std||7 }qW n   || _ Y nX |d| ||d  }| _|S )z%Read exactly n bytes from the socket.Nr   r   r   r   r   r   ry   l  s(    

zTCPTransport._readN)	r7   r   r   r   rb   rH   r   r   ry   r   r   r   r   r   _  s   r   Fc                 K   s"   |rt nt}|| f||d|S )a  Create transport.

    Given a few parameters from the Connection constructor,
    select and create a subclass of
    :class:`~amqp.transport._AbstractTransport`.

    PARAMETERS:

        host: str

            Broker address in format ``HOSTNAME:PORT``.

        connect_timeout: int

            Timeout of creating new connection.

        ssl: bool|dict

            If set, :class:`~amqp.transport.SSLTransport` is used
            and ``ssl`` parameter is passed to it. Otherwise
            :class:`~amqp.transport.TCPTransport` is used.

        kwargs:

            additional arguments of :class:`~amqp.transport._AbstractTransport`
            class
    )r(   r   )r   r   )r   r(   r   r-   	transportr   r   r   	Transport  s    r   )NF)%r   rH   r   rerF   r   
contextlibr   r   structr   r   
exceptionsr   platformr   r	   utilsr
   r   r   r   rI   r   Z	AMQP_PORTbytesr&   r   rd   compiler   rk   r    r!   r   r   r   r   r   r   r   <module>   s>   
	  / |&