U
    d
                     @   s   d dl Z d dlZd dlZd dlmZmZmZ zd dlZejdd dZ	W n e
k
r`   dZ	Y nX dddgZd	d Zd
d Zdd ZdS )    N)IterDataPipecommunicationMapDataPipeF)Zuse_dillTDataPipeToQueuesLoopSpawnProcessForDataPipelineSpawnThreadForDataPipelinec                 C   sh   t | trtj}tjj}n$t | tr4tj}tjj}n
t	d| t
d |j| |||ddD ]}q^d S )Nz.Only supports IterDataPipe or MapDataPipe, got   T)Zblocking_request_get)
isinstancer   r   iterprotocolZIterDataPipeQueueProtocolServerr   mapZMapDataPipeQueueProtocolServer	ExceptiontorchZset_num_threadsZDataPipeBehindQueues)Zsource_datapipe	req_queue	res_queueZ	pipe_typeZprotocol_type_ r   L/tmp/pip-unpacked-wheel-ua33x9lu/torch/utils/data/communication/eventloop.pyr      s    






c                 C   s.   |   }|   }| jt|||fd}|||fS )N)targetargs)QueueProcessr   )Zmultiprocessing_ctxdatapiper   r   processr   r   r   r   (   s     c                 C   s   t j }t j }ztt| }W nt tk
r } zVtrztt| }W q tk
r~ } ztd| W 5 d}~X Y qX n
td|W 5 d}~X Y nX t	j
t|||fdd}||||fS )z
        Given a DataPipe, creates a copy of the DataPipe, starts a new Thread with DataPipeToQueuesLoop as target,
        and return the process, req_queue, res_queue, thread_local_datapipe.
    z1Unable to dill DataPipe to make thread local copyNzPUnable to pickle DataPipe to make thread local copy (consider installing `dill`)T)r   r   daemon)r   queueZThreadingQueuepickleloadsdumpsr   HAS_DILLdill	threadingThreadr   )r   r   r   Znew_datapipepeder   r   r   r   r   0   s&    

  )r   r!   r   Ztorch.utils.datar   r   r   r    extendr   ImportError__all__r   r   r   r   r   r   r   <module>   s    
