U
    d                     @   s   U d Z ddlZddlmZmZmZmZmZmZ ddlZddlm	Z	 ddl
Zg Zee ed< ee	 Zee	ef Zeegeee e	f f ZG dd deZG dd	 d	Zdd
ddZeee dddZee dddZdS )zManipulation of micro-batches.    N)AnyCallableListUnioncastSequence)Tensor__all__c                   @   s*   e Zd ZdZedddZedd ZdS )NoChunka  
    Wrapper for a Tensor in :meth:`Pipe.forward` indicating that the tensor
    should not be chunked on the batch dimension and instead be replicated
    as-is across all micro-batches. This is useful for tensors which might
    not have any 'batch' semantics for the model.
    )inpc                 C   s"   t |std| || _d S )Nz+NoChunk only supported for tensors, found: )torch	is_tensor	TypeError_tensor)selfr    r   N/tmp/pip-unpacked-wheel-ua33x9lu/torch/distributed/pipeline/sync/microbatch.py__init__   s    
zNoChunk.__init__c                 C   s   | j S N)r   r   r   r   r   tensor#   s    zNoChunk.tensorN)__name__
__module____qualname____doc__r   r   propertyr   r   r   r   r   r
      s   r
   c                   @   s  e Zd ZdZeee ef ddddZe	edddZ
e	d	d
 Zdd Zdd Zed dddZedddZdd ZedddZedddZejeeddddZejeeddddZeeef ddd dZeddd!d"Zeddd#d$ZdS )%BatchzC
    An abstraction representing a microbatch in the pipeline.
    N)valuesreturnc                 C   s@   || _ t|| _| js<tdd | j D s<td| j  d S )Nc                 s   s   | ]}t |V  qd S r   r   r   ).0valuer   r   r   	<genexpr>3   s     z!Batch.__init__.<locals>.<genexpr>zNo tensors found in batch: )_valuesr   r   atomicanyr   )r   r   r   r   r   r   -   s
    zBatch.__init__r   c                 C   s   | j stdtt| jS )z Retrieves the underlying tensor.znot atomic batch)r$   AttributeErrorr   r   r#   r   r   r   r   r   6   s    zBatch.tensorc                 C   s   | j S )z-Retreives the underlying values for the batch)r#   r   r   r   r   r   =   s    zBatch.valuesc                 C   s<   | j r
dS t| jD ]\}}t|r|  S qtddS )z<
        Retrieves the index of first tensor found.
        r   zNo tensor found!N)r$   	enumerater#   r   r   r   )r   ir!   r   r   r   find_tensor_idxB   s    

zBatch.find_tensor_idxc                 C   s2   | j r| jjS | jD ]}t|r|j  S qdS )z;
        Retrieves the device for this microbatch.
        N)r$   r#   devicer   r   )r   r!   r   r   r   
get_deviceN   s
    

zBatch.get_device)functionr   c                 C   s&   | j rt|| jS t|| j S dS )zbCalls a function on the microbatch. It also wraps
        the output with :class:`Batch`.
        N)r$   r   r#   )r   r-   r   r   r   callY   s    z
Batch.callc                 C   s   d| j d| jdS )NzBatch[atomic=z]()r$   r#   r   r   r   r   __repr__b   s    zBatch.__repr__c                 c   s    | j r| jV  n| jE d H  d S r   r0   r   r   r   r   __iter__e   s    
zBatch.__iter__c                 C   s   | j r
dS t| jS )N   )r$   lenr#   r   r   r   r   __len__k   s    zBatch.__len__)indexc                 C   s&   | j s| j| S |dkr td| jS )Nr    atomic batch allows index 0 onlyr$   r#   
IndexError)r   r6   r   r   r   __getitem__n   s
    
zBatch.__getitem__)r6   r!   r   c                 C   s   d S r   r   r   r6   r!   r   r   r   __setitem__x   s    zBatch.__setitem__c                 C   s   d S r   r   r;   r   r   r   r<   |   s    )r6   r   c                 C   s(   t |tr| || n| || d S r   )
isinstanceint_setitem_by_index_setitem_by_slicer;   r   r   r   r<      s    
c                 C   sP   | j s6|}| jd | |f | j|d d   | _d S |dkrFtd|| _d S )Nr3   r   r7   r8   )r   r6   r!   r)   r   r   r   r?      s    (zBatch._setitem_by_indexc                 C   s`   |j |j  kr$|j  kr$d ks.n td| js>|| _d S t|dkrRtd|d | _d S )Nzonly slice [:] supportedr3   z5atomic batch cannot be replaced with multiple tensorsr   )startstopstepNotImplementedErrorr$   r#   r4   r9   r;   r   r   r   r@      s    &zBatch._setitem_by_slice)r   r   r   r   r   r   r   r   r   r   r   r   r*   r,   Functionr.   strr1   r2   r>   r5   r:   typingoverloadr<   sliceTensorsr?   r@   r   r   r   r   r   (   s(   	
	
r   r&   c                    sB   t dd |D s td| t  fdd|D r>tddS )z
    Checks whether the input contains at least one tensor and each tensor is
    on the same device as the first partition.

    Raises:
        ValueError: input does not contain at least one tensor

    c                 s   s   | ]}t |V  qd S r   r   r    inputr   r   r   r"      s     zcheck.<locals>.<genexpr>z inputs do not have any tensors: c                 3   s"   | ]}t |o|j kV  qd S r   )r   r   r+   rK   first_devicer   r   r"      s     z>All inputs should be on the same device as the first partitionN)r%   r   
ValueError)rN   inputsr   rM   r   check   s    
rQ   )chunksr   c                 G   s  t |dkr2t|d tr2dd |d | D S dd t| D }d}|D ]}t|r|| }|dkr|t |krtd| dt | t |}t|D ]\}}|| 	| qqLt| D ].}t|t
r|| 	|j q|| 	| qqL|d	| }d
d |D S )z7Splits an input mini-batch into multiple micro-batches.r3   r   c                 S   s   g | ]}t |qS r   r   r    xr   r   r   
<listcomp>   s     zscatter.<locals>.<listcomp>c                 S   s   g | ]}g qS r   r   )r    _r   r   r   rV      s     z6Found different number of chunks produced for inputs:  and Nc                 S   s   g | ]}t |qS r   rS   rT   r   r   r   rV      s     )r4   r=   r   chunkranger   r   RuntimeErrorr(   appendr
   r   )rR   rP   ZbatchesZ
num_chunksrL   tensorsr)   r   r   r   r   scatter   s$    


r_   )outputsc              	   C   s   | d j r(tdd | D }t|}ng }tt| d D ]}t| d | }g }| D ]>}|t|| krtd| dt||  |||  qXt	| d | r|t| q<|| q<t|}|S )z4Concatenates output micro-batches into a mini-batch.r   c                 s   s   | ]}|j V  qd S r   )r   )r    br   r   r   r"      s     zgather.<locals>.<genexpr>z2Types for microbatch outputs do not match, found: rY   )
r$   tupler   catr[   r4   typer   r]   r   )r`   r^   outputZ
output_bufr)   output_typeZcurrent_outputsbatchr   r   r   gather   s     
rh   )r   rG   r   r   r   r   r   r   r   r   Ztorch.cuda.commr	   rF   __annotations__rJ   ZTensorOrTensorsrE   objectr
   r   rQ   r>   r_   rh   r   r   r   r   <module>   s    w#