U
    d%                     @   s  U d Z ddlmZ ddlmZ ddlmZmZmZm	Z	m
Z
mZmZmZmZ ddlZddlmZmZ ddlmZ dd	lmZ dd
lmZmZ ddlmZmZ ddlmZ ddlmZ ddl m!Z!m"Z" ddl#m$Z$m%Z%m&Z& ddl'm(Z(m)Z) g Z*ee+ e,d< ee Z-eee-f Z.e
ee/ e/ef Z0erRee	d  Z1ee
e2ee
def e0df f  Z3neZ1eZ3eeddddZ4ee$e$ddddZ5ee$e$ddddZ6e7e7eee
e7e7f   dddZ8G dd dZ9dS ) z!The pipeline parallelism of Pipe.    )Queue)TracebackType)	TYPE_CHECKINGIterableListOptionalTupleTypeUnioncastSequenceN)Tensornn)record_function   )Checkpointing)CopyWait)forkjoin)Batch)
SkipLayout)SkipTrackerThroughPotalsuse_skip_tracker)AbstractStreamcurrent_stream
use_device)Taskcreate_workers__all__r   )	fork_fromjoin_toreturnc                 C   s:   |   }|  }t| | \| |< }t|| |||< d S N)Zfind_tensor_idxr   r   )r    r!   Zfork_from_idxZjoin_to_idxZphony r$   L/tmp/pip-unpacked-wheel-ua33x9lu/torch/distributed/pipeline/sync/pipeline.py_depend,   s    r&   )batchprev_streamnext_streamr"   c                 C   s8   t j||f|  | d d < tdd | D | d d < d S )Nc                 S   s*   g | ]"}t |r"| s"| n|qS r$   torchZ	is_tensorZis_floating_pointdetach.0xr$   r$   r%   
<listcomp>7   s     z_copy.<locals>.<listcomp>)r   applytupler'   r(   r)   r$   r$   r%   _copy4   s    r4   c                 C   s8   t j||f|  | d d < tdd | D | d d < d S )Nc                 S   s*   g | ]"}t |r"| s"| n|qS r$   r*   r-   r$   r$   r%   r0   =   s     z_wait.<locals>.<listcomp>)r   r1   r2   r3   r$   r$   r%   _wait:   s    r5   )mnr"   c                 #   sL   t | | d D ]6  fddt td  |  dtd  |D V  qdS )z)Generates schedules for each clock cycle.r   c                    s   g | ]} | |fqS r$   r$   )r.   jkr$   r%   r0   P   s     z!_clock_cycles.<locals>.<listcomp>r   N)rangemaxmin)r6   r7   r$   r9   r%   _clock_cycles@   s    r>   c                   @   s   e Zd ZdZeej eej eee	  e
eddddZee ddddZee eeeef  ee dd	d
dZee eeeef  ee dd	ddZdS )Pipelinez"The pipeline parallelism for Pipe.N)
partitionsdevicescopy_streamsskip_layoutcheckpoint_stopr"   c                 C   s2   || _ || _|| _|| _|| _t|\| _| _d S r#   )r@   rA   rB   rC   rD   r   	in_queues
out_queues)selfr@   rA   rB   rC   rD   r$   r$   r%   __init__V   s    zPipeline.__init__)batchesr"   c                    sd   | j }| j}| j t|}t|} fdd|D }t||D ] }| ||| | ||| q>dS )zURuns pipeline parallelism.

        It modifies the given batches in place.

        c                    s   g | ]}t  qS r$   )r   )r.   _rC   r$   r%   r0   r   s     z Pipeline.run.<locals>.<listcomp>N)r@   rA   rC   lenr>   fencecompute)rG   rI   r@   rA   r6   r7   skip_trackersscheduler$   rK   r%   rune   s    zPipeline.run)rI   rP   rO   r"   c              	   C   s   | j }| j}|D ]\}}|dkr>|dkr>t||d  ||  || | }||D ]0\}	}
}||	 | }|| || |||
| qT|dkr||d  | }t|| || qdS )zWCopies micro-batches after computation for the previous
        micro-batches.
        r   r   N)rB   rC   r&   Zcopy_policycopyr4   )rG   rI   rP   rO   rB   rC   ir8   r)   Zprev_jnsnamer(   r$   r$   r%   rM   x   s    zPipeline.fencec              
   C   s  | j }| j}| j}| j}| j d js(d}t|}dd |D }	d}
|D ]\}}|| }|| }|dkr~t||| | |	|  ||k }|r||| ||dtjt	t
t
tddd}t||}t|	| |j|jd	}~~n>|||| ||fttjt	t
t
td
dd}t|	| |dd	}~| j| | qF|D ]\}}| j|  \}}|
dk	rVq,n|sjtt|}
q,ttttf |\}}||d krt||	| || |  t||  || W 5 Q R X |||< q,|
dk	r|
d |
d |
d dS )z0Runs tasks with synchronization to copy streams.r   c                 S   s   g | ]}t |qS r$   )r   )r.   dr$   r$   r%   r0      s     z$Pipeline.compute.<locals>.<listcomp>N)	partitionskip_trackerchunk_idpart_id)rW   rX   rY   rZ   r"   c                 W   sP   t |> td||f " | | W  5 Q R  W  5 Q R  S Q R X W 5 Q R X d S Nzchunk%d-part%d)r   r   )rW   rX   rY   rZ   inputsr$   r$   r%   function   s    z"Pipeline.compute.<locals>.function)rN   finalize)r'   rW   rX   rY   rZ   r"   c                 S   sR   t |@ td||f $ | |W  5 Q R  W  5 Q R  S Q R X W 5 Q R X d S r[   )r   r   call)r'   rW   rX   rY   rZ   r$   r$   r%   rN      s    z!Pipeline.compute.<locals>.computer      )r@   rA   rB   rD   ZtrainingrL   r5   r   Moduler   intTensorOrTensorsr   r   
checkpointZ	recomputer   rE   putrF   getr   ExcInfor   r   r^   with_traceback)rG   rI   rP   rO   r@   rA   rB   rD   r7   Zstreamsexc_inforS   r8   r'   rW   rd   r]   ZchkZtaskrN   okpayloadr$   r$   r%   rN      sz    





zPipeline.compute)__name__
__module____qualname____doc__r   r   Z
Sequentialr+   Zdevicer   r   rb   rH   r   rQ   r   r   rM   rN   r$   r$   r$   r%   r?   S   s&   
    r?   ):ro   queuer   typesr   typingr   r   r   r   r   r	   r
   r   r   r+   r   r   Ztorch.autograd.profilerr   rd   r   rR   r   r   
dependencyr   r   Z
microbatchr   Zskip.layoutr   Zskip.trackerr   r   streamr   r   r   Zworkerr   r   r   str__annotations__ZTensorsrc   BaseExceptionrg   ZInQueueboolZOutQueuer&   r4   r5   rb   r>   r?   r$   r$   r$   r%   <module>   s8   ,$"