"""Checkpointing with preceding recomputation.

PyTorch already provides the official checkpointing utilities in
:mod:`torch.utils.checkpoint`. The official checkpointing combines
recomputation and recursive backpropagation into one autograd function named
``CheckpointFunction``. Hence, the recomputation can start only once the
gradients arrive at the function. In Pipe, the recomputation needs to precede
the gradient arrival to minimize the GPU idle time.

We solve this problem by introducing separate autograd functions named
:class:`Recompute` and :class:`Checkpoint`, which represent recomputation and
recursive backpropagation, respectively. With this pair of functions, we can
control the flow from the viewpoint of both the autograd engine and CUDA.

Specifically, we place CUDA stream synchronization between :class:`Recompute`
and :class:`Checkpoint` to delay only :class:`Checkpoint` until the gradient is
copied entirely.

"""
from collections import deque
from contextlib import contextmanager
import threading
from typing import TYPE_CHECKING, Any, Deque, Generator, List, Optional, Sequence, Tuple, Union

import torch
from torch import Tensor
import torch.autograd

from .dependency import fork, join
from .microbatch import Batch
from .phony import get_phony

__all__ = ["is_checkpointing", "is_recomputing"]


Tensors = Sequence[Tensor]
TensorOrTensors = Union[Tensor, Tensors]

# Types shared between Checkpoint and Recompute:
#   Recomputed = (output, input_leaf), RNGStates = (cpu_rng_state, gpu_rng_state)
Recomputed = Tuple[TensorOrTensors, Tensors]
RNGStates = Tuple[Tensor, Optional[Tensor]]

if TYPE_CHECKING:
    from typing_extensions import Protocol
else:
    Protocol = object


# A Protocol with __call__ is used instead of Callable so it can also serve as
# an attribute type.
class Function(Protocol):
    def __call__(self, input: TensorOrTensors) -> TensorOrTensors:
        ...


def checkpoint(function: Function, input):
    """Makes a checkpoint with a simple interface like
    :func:`torch.utils.checkpoint.checkpoint`. It's only used to test or debug
    :class:`Checkpoint` and :class:`Recompute` without boilerplate.
    """
    batch = Batch(input)

    chk = Checkpointing(function, batch)
    batch = chk.checkpoint()
    chk.recompute(batch)

    return batch.values


class Checkpointing:
    """Generates a pair of :class:`Checkpoint` and :class:`Recompute`."""

    def __init__(self, function: Function, batch: Batch) -> None:
        self.function = function
        self.batch = batch

        # Shared memory between Checkpoint and Recompute. A 1-length deque is
        # used for mutability and length limitation.
        self.recomputed: Deque[Recomputed] = deque(maxlen=1)
        self.rng_states: Deque[RNGStates] = deque(maxlen=1)

    def checkpoint(self) -> Batch:
        """Returns a batch applied by :class:`Checkpoint`."""
        input_atomic = self.batch.atomic
        inputs = tuple(self.batch)

        # Use a phony which requires grad to ensure that Checkpoint can be
        # tracked by the autograd engine even when none of the input tensors
        # require grad.
        phony = get_phony(self.batch.get_device(), requires_grad=True)

        output = Checkpoint.apply(phony, self.recomputed, self.rng_states, self.function, input_atomic, *inputs)

        # Gradients are only supported for floating point tensors.
        if isinstance(output, tuple):
            output = tuple([x.detach() if torch.is_tensor(x) and not x.is_floating_point() else x for x in output])

        return Batch(output)

    def recompute(self, batch: Batch) -> None:
        """Applies :class:`Recompute` to the batch in place."""
        input_atomic = self.batch.atomic
        inputs = tuple(self.batch)

        # The tensor at tensor_idx is guaranteed to require grad because it
        # went through Checkpoint together with a phony requiring grad.
        tensor_idx = batch.find_tensor_idx()
        batch[tensor_idx], phony = fork(batch[tensor_idx])
        phony = Recompute.apply(phony, self.recomputed, self.rng_states, self.function, input_atomic, *inputs)
        batch[tensor_idx] = join(batch[tensor_idx], phony)
dV  W 5 | t _X dS )zDMakes :func:`is_checkpointing` return :data:`True` within a context.TNthread_localr   origr   r   r   enable_checkpointing   s
    
rE   c                  c   s$   t j} dt _z
dV  W 5 | t _X dS )zBMakes :func:`is_recomputing` return :data:`True` within a context.TNrB   r   rC   r   r   r   enable_recomputing   s
    
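
# A tiny illustration of the thread-local flags toggled by the two context
# managers above (illustrative only; ``is_recomputing`` is defined just below):
#
#     assert not is_recomputing()
#     with enable_recomputing():
#         assert is_recomputing()
#     assert not is_recomputing()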


def is_checkpointing() -> bool:
    """Whether the current forward propagation is under checkpointing.

    Returns:
        bool: :data:`True` if it's under checkpointing.

    """
    return thread_local.is_checkpointing


def is_recomputing() -> bool:
    """Whether the current forward propagation is under checkpoint
    recomputation. Use this to prevent duplicated side-effects at forward
    propagation::

        class Counter(nn.Module):
            def __init__(self):
                super().__init__()
                self.counter = 0

            def forward(self, input):
                if not is_recomputing():
                    self.counter += 1
                return input

    Returns:
        bool: :data:`True` if it's under checkpoint recomputation.

    .. seealso:: :ref:`Detecting Recomputation`

    """
    return thread_local.is_recomputing


class Context:
    """The common interface between the :class:`Checkpoint` and
    :class:`Recompute` context.
    """

    recomputed: Deque[Recomputed]
    rng_states: Deque[RNGStates]
    function: Function
    input_atomic: bool
    inputs: Sequence[Any]

    saved_tensors: Tuple[Tensor, ...]

    def save_for_backward(self, *tensors: Tensor) -> None:
        pass


def save_rng_states(device: torch.device, rng_states: Deque[RNGStates]) -> None:
    """:meth:`Checkpoint.forward` captures the current PyTorch's random number
    generator states at CPU and GPU to reuse in :meth:`Recompute.backward`.

    .. seealso:: :ref:`Referential Transparency`

    cudaN)r.   Zget_rng_statetyperP   append)rO   r*   cpu_rng_stategpu_rng_stater   r   r   save_rng_states   s
    
rU   c              	   c   sf   |  \}}g }| jdkr$||  tj|, t| |dk	rRtj||  dV  W 5 Q R X dS )z:meth:`Recompute.backward` restores the random number generator states
    captured by :func:`save_rng_states` within its context.

    .. seealso:: :ref:`Referential Transparency`

    """
    cpu_rng_state, gpu_rng_state = rng_states.pop()

    gpu_devices: List[torch.device] = []
    if device.type == "cuda":
        gpu_devices.append(device)

    with torch.random.fork_rng(gpu_devices):
        torch.set_rng_state(cpu_rng_state)
        if gpu_rng_state is not None:
            torch.cuda.set_rng_state(gpu_rng_state, device)
        yield
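
# A small sketch of how the two RNG helpers above pair up (illustrative only):
# the states captured before the checkpointed forward pass are restored around
# the recomputation, so stochastic layers such as dropout draw the same masks
# in both passes.
#
#     rng_states: Deque[RNGStates] = deque(maxlen=1)
#     device = torch.device("cpu")
#
#     save_rng_states(device, rng_states)           # as in Checkpoint.forward
#     first = torch.nn.functional.dropout(torch.ones(8), p=0.5, training=True)
#
#     with restore_rng_states(device, rng_states):  # as in Recompute.backward
#         again = torch.nn.functional.dropout(torch.ones(8), p=0.5, training=True)
#
#     assert torch.equal(first, again)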


class Checkpoint(torch.autograd.Function):
    @staticmethod
    def forward(
        ctx: Context,
        phony: Tensor,
        recomputed: Deque[Recomputed],
        rng_states: Deque[RNGStates],
        function: Function,
        input_atomic: bool,
        *inputs,
    ):
        ctx.recomputed = recomputed
        ctx.rng_states = rng_states

        save_rng_states(phony.device, ctx.rng_states)

        ctx.function = function
        ctx.input_atomic = input_atomic
        if input_atomic:
            tensors = [inputs[0]]
        else:
            tensors = []
            for input in inputs:
                if torch.is_tensor(input):
                    tensors.append(input)

        ctx.save_for_backward(*tensors)

        with torch.no_grad(), enable_checkpointing():
            if input_atomic:
                assert len(inputs) == 1
                output = function(inputs[0])
            else:
                output = function(*inputs)
        return output

    @staticmethod
    def backward(ctx: Context, *grad_output: Tensor) -> Tuple[Optional[Tensor], ...]:
        output, input_leaf = ctx.recomputed.pop()

        if isinstance(output, tuple):
            outputs = output
        else:
            outputs = (output,)
        if any(torch.is_tensor(y) and y.requires_grad for y in outputs):
            tensors = tuple([x for x in outputs if torch.is_tensor(x) and x.requires_grad])
            torch.autograd.backward(tensors, grad_output)

        grad_input: List[Optional[Tensor]] = [None, None, None, None, None]
        grad_input.extend(x.grad if torch.is_tensor(x) else None for x in input_leaf)
        return tuple(grad_input)


class Recompute(torch.autograd.Function):
    @staticmethod
    def forward(
        ctx: Context,
        phony: Tensor,
        recomputed: Deque[Recomputed],
        rng_states: Deque[RNGStates],
        function: Function,
        input_atomic: bool,
        *inputs,
    ) -> Tensor:
        ctx.recomputed = recomputed
        ctx.rng_states = rng_states

        ctx.function = function
        ctx.input_atomic = input_atomic
        ctx.inputs = inputs
        if input_atomic:
            tensors = [inputs[0]]
        else:
            tensors = []
            for input in inputs:
                if torch.is_tensor(input):
                    tensors.append(input)
        ctx.save_for_backward(*tensors)

        return phony

    @staticmethod
    def backward(ctx: Context, *grad_output: Tensor) -> Tuple[None, ...]:
        inputs = ctx.inputs
        inputs_leaf = tuple(x.detach().requires_grad_(x.requires_grad) if torch.is_tensor(x) else x for x in inputs)

        # Get the device for the inputs from a tensor.
        device = None
        for input in inputs:
            if torch.is_tensor(input):
                device = input.device
                break

        if device is None:
            raise RuntimeError(f'No tensors found in {inputs}')

        with restore_rng_states(device, ctx.rng_states):
            with torch.enable_grad(), enable_recomputing():
                if ctx.input_atomic:
                    assert len(inputs_leaf) == 1
                    output = ctx.function(inputs_leaf[0])
                else:
                    output = ctx.function(*inputs_leaf)

        ctx.recomputed.append((output, inputs_leaf))

        grad_input: List[None] = [None, None, None, None, None]
        grad_input.extend(None for _ in ctx.inputs)
        return tuple(grad_input)