U
    d}                     @   s   d dl Z d dlZd dlmZmZ dZdddddd	d
gZdd
 ZG dd deZ	G dd deZ
G dd deZdd ZdddZG dd	 d	eZdS )    N)IterDataPipecommunicationgMbP?DataPipeBehindQueuesEnsureNonBlockingDataPipeInvalidStateResetRequiredNonBlockingNotAvailableQueueWrapperdefault_not_available_hookc                   C   s   t t d S N)timesleepDEFAULT_NON_BLOCKING_SLEEP r   r   G/tmp/pip-unpacked-wheel-ua33x9lu/torch/utils/data/communication/iter.pyr
      s    c                   @   s   e Zd ZdS )r   N)__name__
__module____qualname__r   r   r   r   r      s   c                   @   s   e Zd ZdZdS )r   z
        Returned by DataPipe when it is expecting to get reset request,
        for example RouterDataPipe expecting all workers to request reset'
    N)r   r   r   __doc__r   r   r   r   r      s   c                   @   s<   e Zd ZeZdd Zdd Zdd Zdd Ze	d	d
 Z
dS )r   c                 C   s   |    | S r   )reset_iteratorselfr   r   r   __iter__&   s    zNonBlocking.__iter__c                 C   sN   z
|   W S  tk
r"   tY q  tk
rF   tjd k	rBt  Y q X q d S r   )nonblocking_nextStopIterationr   r   not_available_hookr   r   r   r   __next__*   s    

zNonBlocking.__next__c                 C   s   t d| j d S )Nz*nonblocking_next is not implemented for %sNotImplementedError	__class__r   r   r   r   r   4   s    zNonBlocking.nonblocking_nextc                 C   s   t d| j d S )Nz(reset_iterator is not implemented for %sr   r   r   r   r   r   8   s    zNonBlocking.reset_iteratorc                 C   s
   | t _d S r   )r   r   )Zhook_functionr   r   r   register_not_available_hook<   s    z'NonBlocking.register_not_available_hookN)r   r   r   r
   r   r   r   r   r   staticmethodr    r   r   r   r   r   #   s   
c                 C   s~   t | tstdt| j t | tr*| S t| ds:d | _t| dsZdd }t	|| | _
t| dszdd }t	|| | _| S )	NzNot Iterable DataPipe _as_iteratorr   c                 S   s   | j d krt| | _ t| j S r   )r"   iternextr   r   r   r   r   J   s    

z3EnsureNonBlockingDataPipe.<locals>.nonblocking_nextr   c                 S   s
   d | _ d S r   )r"   r   r   r   r   r   Q   s    z1EnsureNonBlockingDataPipe.<locals>.reset_iterator)
isinstancer   	Exceptionstrr   r   hasattrr"   types
MethodTyper   r   )Zvalidated_datapiper   r   r   r   r   r   A   s*    



 
 Fc                 c   sj  t |tjjstd|t| } d}|rfz|j|d}W n" tjjk
r\   dV  Y q$Y nX t |tjj	r~| 
  |  q$t |tjjrd}|  q$t |tjjrZ|rdz|  }W n tk
r   dV  Y qY nh tk
r   |  |rd}ndV  Y qdY n6 tk
r@   |  |r0d}ndV  Y qdY nX || dV  qdqq$td|q$dS )z
        Indefinitely iterates over req_queue and passing values from source_datapipe to res_queue
        If raise_stop is true, raises exception when StopIteration received from the source_datapipe
    z.Expecting IterDataPipeQueueProtocolServer, gotT)blockFz%Unrecognized type of request receivedN)r%   r   protocolZIterDataPipeQueueProtocolServerr&   r   Zget_new_request
EmptyQueuemessagesZResetIteratorRequestr   Zresponse_reset_iteratorZTerminateRequestZresponse_terminateZGetNextRequestr   r   r   Zresponse_stop_iterationr   Zresponse_invalid_stateZresponse_next)Zsource_datapiper,   Z	full_stopZblocking_request_getforeverrequestvaluer   r   r   r   X   sN    





c                   @   s*   e Zd ZdZd
ddZdd Zdd Zd	S )r	   zN
        Creates iter.DataPipe which reads data from the DataLoader.Queue
    h㈵>c                 C   s4   t |tjjstd||| _d| _d| _|| _d S )NZGotr   F)r%   r   r,   ZIterDataPipeQueueProtocolClientr&   counter_stop_iteration_response_wait_time)r   r,   Zresponse_wait_timer   r   r   __init__   s    
zQueueWrapper.__init__c                 C   sZ   d| _ d| _| j  z| j  W qVW q tjjk
rR   tjd k	rNt  Y qX qd S )NFr   )	r4   r3   r,   Zrequest_reset_iteratorZget_response_reset_iteratorr   r-   r   r   r   r   r   r   r      s    


zQueueWrapper.reset_iteratorc                 C   s   | j rtd| j r"| j  z| jjd| jd}W n tjjk
rT   t	Y nX t
|tjjrnd| _ tt
|tjjrt	|jS )NzA`next` or `nonblocking_next` called after receiving StopIterationT)r+   timeout)r4   r&   r,   Zcan_take_requestZrequest_nextZget_response_nextr5   r   r-   r   r%   r.   ZStopIterationResponser   ZInvalidStateResponser1   )r   responser   r   r   r      s     


zQueueWrapper.nonblocking_nextN)r2   )r   r   r   r   r6   r   r   r   r   r   r   r	      s   
)FF)r   r)   Ztorch.utils.datar   r   r   __all__r
   r&   r   r   r   r   r   r	   r   r   r   r   <module>   s$   
5