U
    d                     @   s   d dl Z d dlZd dlmZmZ dZdddddd	gZd
d	 ZG dd deZ	G dd deZ
dd ZdddZG dd de
ZdS )    N)communicationMapDataPipegMbP?DataPipeBehindQueuesEnsureNonBlockingMapDataPipeNonBlockingMapNotAvailableQueueWrapperForMapdefault_not_available_hookc                   C   s   t t d S N)timesleepDEFAULT_NON_BLOCKING_SLEEP r   r   F/tmp/pip-unpacked-wheel-ua33x9lu/torch/utils/data/communication/map.pyr	      s    c                   @   s   e Zd ZdS )r   N)__name__
__module____qualname__r   r   r   r   r      s   c                   @   s<   e Zd ZeZdd Zdd Zdd Zdd Ze	d	d
 Z
dS )r   c                 C   s:   z|  |W S  tk
r2   tjd k	r.t  Y q X q d S r
   )nonblocking_getitemr   r   not_available_hookselfindexr   r   r   __getitem__   s
    
zNonBlockingMap.__getitem__c                 C   s6   z
|   W S  tk
r0   tjd k	r,t  Y nX d S r
   )nonblocking_lenr   r   r   r   r   r   r   __len__%   s
    

zNonBlockingMap.__len__c                 C   s   t d| j d S )Nz)nonblocking_len is not implemented for %sNotImplementedError	__class__r   r   r   r   r   ,   s    zNonBlockingMap.nonblocking_lenc                 C   s   t d| j d S )Nz-nonblocking_getitem is not implemented for %sr   r   r   r   r   r   0   s    z"NonBlockingMap.nonblocking_getitemc                 C   s
   | t _d S r
   )r   r   )Zhook_functionr   r   r   register_not_available_hook4   s    z*NonBlockingMap.register_not_available_hookN)r   r   r   r	   r   r   r   r   r   staticmethodr   r   r   r   r   r      s   c                 C   sl   t | tstd| j t | tr(| S t| dsHdd }t|| | _t| dshdd }t|| | _	| S )NzNot Map DataPipe - got r   c                 S   s   |   S r
   )r   r   r   r   r   r   ?   s    z5EnsureNonBlockingMapDataPipe.<locals>.nonblocking_lenr   c                 S   s
   |  |S r
   )r   r   r   r   r   r   D   s    z9EnsureNonBlockingMapDataPipe.<locals>.nonblocking_getitem)

isinstancer   	Exceptionr   r   hasattrtypes
MethodTyper   r   )Zvalidated_datapiper   r   r   r   r   r   9   s"    


 
 Fc           	   
   c   sT  t |tjjstd|t| } d}|rPz|j|d}W n" tjjk
r\   dV  Y q$Y nX t |tjj	rzd}|
  q$t |tjjr|  }|| q$t |tjjrD|rNz| |j}W nd tk
r   dV  Y qY nH tk
r& } z(|  |rd}ndV  W Y qNW 5 d}~X Y nX ||j| dV  qNqq$td|q$dS )z
        Indefinitely iterates over req_queue and passing values from source_datapipe to res_queue
        If raise_stop is true, raises exception when StopIteration received from the source_datapipe
    z-Expecting MapDataPipeQueueProtocolServer, gotT)blockFNz%Unrecognized type of request received)r!   r   protocolZMapDataPipeQueueProtocolServerr"   r   Zget_new_request
EmptyQueuemessagesZTerminateRequestZresponse_terminateZ
LenRequestr   Zresponse_lenZGetItemRequestr   keyr   
IndexErrorZresponse_index_out_of_boundZresponse_item)	Zsource_datapiper'   Z	full_stopZblocking_request_getforeverrequestsizevalueer   r   r   r   K   sB    


c                   @   s*   e Zd ZdZd
ddZdd Zdd Zd	S )r   zM
        Creates map.DataPipe which reads data from the DataLoader.Queue
    h㈵>c                 C   s4   t |tjjstd||| _d| _d| _|| _d S )NZGotr   F)r!   r   r'   ZMapDataPipeQueueProtocolClientr"   counter_stop_iteration_response_wait_time)r   r'   Zresponse_wait_timer   r   r   __init__~   s    
zQueueWrapperForMap.__init__c                 C   s   | j rtd| j r$| j| z| jjd| jd}W n tjjk
rV   t	Y nX t
|tjjr|d| _ td| d|j|jfS )NzG`getitem` or `nonblocking_getitem` called after receiving StopIterationTr&   timeoutzIndex z is out of bound.)r3   r"   r'   can_take_requestZrequest_itemZget_response_itemr4   r   r(   r   r!   r)   ZStopIterationResponser+   r*   r/   )r   r   responser   r   r   r      s    

z&QueueWrapperForMap.nonblocking_getitemc                 C   s\   | j rtd| j r"| j  z| jjd| jd}W n tjjk
rT   t	Y nX |j
S )Nz?`len` or `nonblocking_len` called after receiving StopIterationTr6   )r3   r"   r'   r8   Zrequest_lenZget_response_lenr4   r   r(   r   len)r   r9   r   r   r   r      s    


z"QueueWrapperForMap.nonblocking_lenN)r1   )r   r   r   __doc__r5   r   r   r   r   r   r   r   z   s   
)FF)r   r$   Ztorch.utils.datar   r   r   __all__r	   r"   r   r   r   r   r   r   r   r   r   <module>   s    

/