U
    dB                      @   s   d dl mZ G dd deZG dd deZG dd deZG dd	 d	eZG d
d deZG dd deZ	G dd deZ
G dd deZdS )    )communicationc                   @   s   e Zd ZdZdd ZdS )Protocolrequest_queueresponse_queuec                 C   s   || _ || _d S Nr   selfr   r    r
   K/tmp/pip-unpacked-wheel-ua33x9lu/torch/utils/data/communication/protocol.py__init__   s    zProtocol.__init__N)__name__
__module____qualname__	__slots__r   r
   r
   r
   r   r      s   r   c                   @   s@   e Zd ZdZdZdd Zdd Zdd Zdd
dZdddZ	dS )ProtocolClientzr
        ProtocolClient takes charge of putting requests into req_queue and returning results from res_queue.
    Nc                 C   s   || _ || _d | _d S r   )r   r   	_req_sentr   r
   r
   r   r      s    zProtocolClient.__init__c                 C   s
   | j d kS r   r   r	   r
   r
   r   can_take_request   s    zProtocolClient.can_take_requestc                 C   s
   | j d k	S r   r   r   r
   r
   r   waiting_for_response   s    z#ProtocolClient.waiting_for_responseTc                 C   s   |   std|| _d S )Nz/Protocol only supports one request in the Queue)r   	Exceptionr   r	   requestr
   r
   r   request_sent   s    zProtocolClient.request_sentc                 C   s   |   std|d | _d S )Nz5Expected no peding requests, but something got served)r   r   r   )r	   resultr
   r
   r   request_served"   s     zProtocolClient.request_served)T)N)
r   r   r   __doc__r   r   r   r   r   r   r
   r
   r
   r   r      s   
r   c                   @   s6   e Zd ZdZdZdd Zdd Zddd	Zd
d ZdS )ProtocolServerzt
        ProtocolServer takes charge of getting requests from req_queue and fetching data from source datapipe.
    Nc                 C   s   || _ || _d | _d S r   )r   r   _req_receivedr   r
   r
   r   r   /   s    zProtocolServer.__init__c                 C   s
   | j d k	S r   )r   r   r
   r
   r   have_pending_request4   s    z#ProtocolServer.have_pending_requestFc              
   C   sX   |   rtdz| jj|d}W n* tk
rL } ztdW 5 d }~X Y nX || _|S )Nz5Trying to get next request, while having one unservedblockqueue is empty)r    r   r   get
EmptyQueuer   r	   r"   responseer
   r
   r   get_new_request7   s    zProtocolServer.get_new_requestc                 C   sD   |   stdt| jtjjs(td| jtj	  d | _d S )N(Attempting to reply with pending requestz8Replaying with terminate status to other type of message)
r    r   
isinstancer   r   messagesZTerminateRequestr   putZTerminateResponser   r
   r
   r   response_terminateC   s    z!ProtocolServer.response_terminate)F)	r   r   r   r   r   r   r    r)   r.   r
   r
   r
   r   r   )   s   
r   c                   @   s$   e Zd Zdd Zdd Zdd ZdS )MapDataPipeQueueProtocolServerc                 C   s0   |   std| jtj|| d | _d S Nr*   )r    r   r   r-   r   r,   ZGetItemResponser   )r	   keyvaluer
   r
   r   response_itemN   s    z,MapDataPipeQueueProtocolServer.response_itemc                 C   s.   |   std| jtj| d | _d S r0   )r    r   r   r-   r   r,   LenResponser   )r	   sizer
   r
   r   response_lenT   s    z+MapDataPipeQueueProtocolServer.response_lenc                 C   s,   |   std| jtj  d | _d S r0   r    r   r   r-   r   r,   ZStopIterationResponser   r   r
   r
   r   response_index_out_of_boundZ   s    z:MapDataPipeQueueProtocolServer.response_index_out_of_boundN)r   r   r   r3   r6   r8   r
   r
   r
   r   r/   M   s   r/   c                   @   s0   e Zd Zdd Zdd ZdddZdd	d
ZdS )MapDataPipeQueueProtocolClientc                 C   s4   |   stdtj }| j| | | d S )NzLCan not request len while we are still waiting response for previous request)r   r   r   r,   Z
LenRequestr   r-   r   r   r
   r
   r   request_lena   s
    
z*MapDataPipeQueueProtocolClient.request_lenc                 C   s6   |   stdtj|}| j| | | d S )NzMCan not request item while we are still waiting response for previous request)r   r   r   r,   ZGetItemRequestr   r-   r   )r	   indexr   r
   r
   r   request_itemh   s
    z+MapDataPipeQueueProtocolClient.request_itemFNc                 C   sf   |   stdz| jj||d}W n tk
r@   tdY nX | | t|tj	j
sbtd|S )N5Can not expect any response without submitted requestr"   timeoutr#   Invalid response received)r   r   r   r$   TimeoutErrorr%   r   r+   r   r,   r4   r	   r"   r?   r'   r
   r
   r   get_response_leno   s    
z/MapDataPipeQueueProtocolClient.get_response_lenc                 C   sP   |   stdz| jj||d}W n tk
r@   tdY nX | | |S Nr=   r>   r#   )r   r   r   r$   rA   r%   r   rB   r
   r
   r   get_response_item{   s    
z0MapDataPipeQueueProtocolClient.get_response_item)FN)FN)r   r   r   r:   r<   rC   rE   r
   r
   r
   r   r9   `   s   
r9   c                   @   s   e Zd ZdS )r%   N)r   r   r   r
   r
   r
   r   r%      s   r%   c                   @   s,   e Zd Zdd Zdd Zdd Zdd Zd	S )
IterDataPipeQueueProtocolServerc                 C   sD   |   stdt| jtjjs(td| jtj	  d | _d S )Nr*   z4Replaying with reset status to other type of message)
r    r   r+   r   r   r,   ResetIteratorRequestr   r-   ResetIteratorResponser   r
   r
   r   response_reset_iterator   s    z7IterDataPipeQueueProtocolServer.response_reset_iteratorc                 C   s.   |   std| jtj| d | _d S r0   )r    r   r   r-   r   r,   ZGetNextResponser   )r	   r2   r
   r
   r   response_next   s    z-IterDataPipeQueueProtocolServer.response_nextc                 C   s,   |   std| jtj  d | _d S r0   r7   r   r
   r
   r   response_stop_iteration   s    z7IterDataPipeQueueProtocolServer.response_stop_iterationc                 C   s,   |   std| jtj  d | _d S r0   )r    r   r   r-   r   r,   ZInvalidStateResponser   r   r
   r
   r   response_invalid_state   s    z6IterDataPipeQueueProtocolServer.response_invalid_stateN)r   r   r   rI   rJ   rK   rL   r
   r
   r
   r   rF      s   	rF   c                   @   s0   e Zd Zdd Zdd ZdddZdd	d
ZdS )IterDataPipeQueueProtocolClientc                 C   s4   |   stdtj }| j| | | d S )NzFCan not reset while we are still waiting response for previous request)r   r   r   r,   rG   r   r-   r   r   r
   r
   r   request_reset_iterator   s
    
z6IterDataPipeQueueProtocolClient.request_reset_iteratorc                 C   s4   |   stdtj }| j| | | d S )NzRCan not request next item while we are still waiting response for previous request)r   r   r   r,   ZGetNextRequestr   r-   r   r   r
   r
   r   request_next   s
    
z,IterDataPipeQueueProtocolClient.request_nextFc              
   C   sb   z| j j|d}W n* tk
r< } ztdW 5 d }~X Y nX | | t|tjjs^tdd S )Nr!   r#   r@   )	r   r$   r   r%   r   r+   r   r,   rH   r&   r
   r
   r   get_response_reset_iterator   s    
z;IterDataPipeQueueProtocolClient.get_response_reset_iteratorNc              
   C   s^   |   stdz| jj||d}W n* tk
rN } ztdW 5 d }~X Y nX | | |S rD   )r   r   r   r$   r%   r   )r	   r"   r?   r'   r(   r
   r
   r   get_response_next   s    
z1IterDataPipeQueueProtocolClient.get_response_next)F)FN)r   r   r   rN   rO   rP   rQ   r
   r
   r
   r   rM      s   

rM   N)Ztorch.utils.datar   objectr   r   r   r/   r9   r   r%   rF   rM   r
   r
   r
   r   <module>   s   $(