import functools
import pickle
from typing import Dict, Callable, Optional, TypeVar, Generic, Iterator

from torch.utils.data.datapipes._typing import _DataPipeMeta, _IterDataPipeMeta
from torch.utils.data.datapipes.utils.common import (
    _deprecation_warning,
    _iter_deprecated_functional_names,
    _map_deprecated_functional_names,
)
from torch.utils.data.dataset import Dataset, IterableDataset

try:
    import dill

    # Keep the standard library pickler's behavior unchanged when dill is installed.
    dill.extend(use_dill=False)
    HAS_DILL = True
except ImportError:
    HAS_DILL = False

__all__ = [
    "DataChunk",
    "DFIterDataPipe",
    "IterDataPipe",
    "MapDataPipe",
]

T = TypeVar("T")
T_co = TypeVar("T_co", covariant=True)

# Functional DataPipes that should not be captured by dataframe tracing.
UNTRACABLE_DATAFRAME_PIPES = [
    "batch",
    "groupby",
    "_dataframes_as_tuples",
    "trace_as_dataframe",
]


class IterDataPipe(IterableDataset[T_co], metaclass=_IterDataPipeMeta):
    r"""
    Iterable-style DataPipe.

    All DataPipes that represent an iterable of data samples should subclass this.
    This style of DataPipes is particularly useful when data come from a stream, or
    when the number of samples is too large to fit them all in memory. ``IterDataPipe`` is lazily initialized and its
    elements are computed only when ``next()`` is called on the iterator of an ``IterDataPipe``.

    All subclasses should overwrite :meth:`__iter__`, which would return an
    iterator of samples in this DataPipe. Calling ``__iter__`` of an ``IterDataPipe`` automatically invokes its
    method ``reset()``, which by default performs no operation. When writing a custom ``IterDataPipe``, users should
    override ``reset()`` if necessary. The common usages include resetting buffers, pointers,
    and various state variables within the custom ``IterDataPipe``.

    Note:
        Only `one` iterator can be valid for each ``IterDataPipe`` at a time,
        and the creation of a second iterator will invalidate the first one. This constraint is necessary because
        some ``IterDataPipe`` have internal buffers, whose states can become invalid if there are multiple iterators.
        The code example below presents details on how this constraint looks in practice.
        If you have any feedback related to this constraint, please see `GitHub IterDataPipe Single Iterator Issue`_.

    These DataPipes can be invoked in two ways, using the class constructor or applying their
    functional form onto an existing ``IterDataPipe`` (recommended, available to most but not all DataPipes).
    You can chain multiple `IterDataPipe` together to form a pipeline that will perform multiple
    operations in succession.

    .. _GitHub IterDataPipe Single Iterator Issue:
        https://github.com/pytorch/data/issues/45

    Note:
        When a subclass is used with :class:`~torch.utils.data.DataLoader`, each
        item in the DataPipe will be yielded from the :class:`~torch.utils.data.DataLoader`
        iterator. When :attr:`num_workers > 0`, each worker process will have a
        different copy of the DataPipe object, so it is often desired to configure
        each copy independently to avoid having duplicate data returned from the
        workers. :func:`~torch.utils.data.get_worker_info`, when called in a worker
        process, returns information about the worker. It can be used in either the
        dataset's :meth:`__iter__` method or the :class:`~torch.utils.data.DataLoader` 's
        :attr:`worker_init_fn` option to modify each copy's behavior.

    Examples:
        General Usage:
            >>> from torchdata.datapipes.iter import IterableWrapper, Mapper
            >>> dp = IterableWrapper(range(10))
            >>> map_dp_1 = Mapper(dp, lambda x: x + 1)  # Using class constructor
            >>> map_dp_2 = dp.map(lambda x: x + 1)  # Using functional form (recommended)
            >>> list(map_dp_1)
            [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
            >>> list(map_dp_2)
            [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
            >>> filter_dp = map_dp_1.filter(lambda x: x % 2 == 0)
            >>> list(filter_dp)
            [2, 4, 6, 8, 10]
        Single Iterator Constraint Example:
            >>> from torchdata.datapipes.iter import IterableWrapper, Mapper
            >>> source_dp = IterableWrapper(range(10))
            >>> it1 = iter(source_dp)
            >>> list(it1)
            [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
            >>> it1 = iter(source_dp)
            >>> it2 = iter(source_dp)  # The creation of a new iterator invalidates `it1`
            >>> next(it2)
            0
            >>> next(it1)  # Further usage of `it1` will raise a `RuntimeError`
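        Custom ``reset()`` Sketch:
            A minimal, hypothetical buffering pipe (``BufferIterDataPipe`` and
            ``buffer_size`` are illustrative names only), showing why ``reset()``
            should clear internal state so that every iteration starts fresh.

            >>> from torchdata.datapipes.iter import IterableWrapper
            >>> class BufferIterDataPipe(IterDataPipe):  # hypothetical example pipe
            ...     def __init__(self, source_dp, buffer_size=3):
            ...         self.source_dp = source_dp
            ...         self.buffer_size = buffer_size
            ...         self.buffer = []
            ...     def __iter__(self):
            ...         for x in self.source_dp:
            ...             self.buffer.append(x)
            ...             if len(self.buffer) == self.buffer_size:
            ...                 yield list(self.buffer)
            ...                 self.buffer = []
            ...     def reset(self):
            ...         self.buffer = []  # drop any partially filled buffer between runs
            >>> dp = BufferIterDataPipe(IterableWrapper(range(6)), buffer_size=3)
            >>> list(dp)
            [[0, 1, 2], [3, 4, 5]]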
    """
    functions: Dict[str, Callable] = {}
    reduce_ex_hook: Optional[Callable] = None
    getstate_hook: Optional[Callable] = None
    str_hook: Optional[Callable] = None
    repr_hook: Optional[Callable] = None
    _valid_iterator_id: Optional[int] = None
    _restored: bool = False

    def __getattr__(self, attribute_name):
        if attribute_name in IterDataPipe.functions:
            if attribute_name in _iter_deprecated_functional_names:
                kwargs = _iter_deprecated_functional_names[attribute_name]
                _deprecation_warning(**kwargs)
            function = functools.partial(IterDataPipe.functions[attribute_name], self)
            return function
        else:
            raise AttributeError(
                "'{0}' object has no attribute '{1}'".format(self.__class__.__name__, attribute_name)
            )

    @classmethod
    def register_function(cls, function_name, function):
        cls.functions[function_name] = function

    @classmethod
    def register_datapipe_as_function(cls, function_name, cls_to_register, enable_df_api_tracing=False):
        if function_name in cls.functions:
            raise Exception("Unable to add DataPipe function name {} as it is already taken".format(function_name))

        def class_function(cls, enable_df_api_tracing, source_dp, *args, **kwargs):
            result_pipe = cls(source_dp, *args, **kwargs)
            if isinstance(result_pipe, IterDataPipe):
                if enable_df_api_tracing or isinstance(source_dp, DFIterDataPipe):
                    if function_name not in UNTRACABLE_DATAFRAME_PIPES:
                        result_pipe = result_pipe.trace_as_dataframe()
            return result_pipe

        function = functools.partial(class_function, cls_to_register, enable_df_api_tracing)
        cls.functions[function_name] = function

    def __getstate__(self):
        """
        This contains special logic to serialize `lambda` functions when `dill` is available.
        If this doesn't cover your custom DataPipe's use case, consider writing custom methods for
        `__getstate__` and `__setstate__`, or use `pickle.dumps` for serialization.
        """
        if IterDataPipe.getstate_hook is not None:
            return IterDataPipe.getstate_hook(self)
        return self.__dict__

    def __reduce_ex__(self, *args, **kwargs):
        if IterDataPipe.reduce_ex_hook is not None:
            try:
                return IterDataPipe.reduce_ex_hook(self)
            except NotImplementedError:
                pass
        return super().__reduce_ex__(*args, **kwargs)

    @classmethod
    def set_getstate_hook(cls, hook_fn):
        if IterDataPipe.getstate_hook is not None and hook_fn is not None:
            raise Exception("Attempt to override existing getstate_hook")
        IterDataPipe.getstate_hook = hook_fn

    @classmethod
    def set_reduce_ex_hook(cls, hook_fn):
        if IterDataPipe.reduce_ex_hook is not None and hook_fn is not None:
            raise Exception("Attempt to override existing reduce_ex_hook")
        IterDataPipe.reduce_ex_hook = hook_fn

    def __repr__(self):
        if self.repr_hook is not None:
            return self.repr_hook(self)
        # Fall back to the class name rather than the default `object.__repr__`.
        return str(self.__class__.__qualname__)

    def __str__(self):
        if self.str_hook is not None:
            return self.str_hook(self)
        # Fall back to the class name rather than the default `object.__str__`.
        return str(self.__class__.__qualname__)

    def reset(self) -> None:
        r"""
        Reset the `IterDataPipe` to the initial state. By default, no-op. For subclasses of `IterDataPipe`,
        depending on their functionalities, they may want to override this method with implementations that
        may clear the buffers and reset pointers of the DataPipe.
        The `reset` method is always called when `__iter__` is called as part of `hook_iterator`.
        """
        pass


class DFIterDataPipe(IterDataPipe):
    def _is_dfpipe(self):
        return True


class MapDataPipe(Dataset[T_co], metaclass=_DataPipeMeta):
    r"""
    Map-style DataPipe.

    All datasets that represent a map from keys to data samples should subclass this.
    Subclasses should overwrite :meth:`__getitem__`, supporting fetching a
    data sample for a given, unique key. Subclasses can also optionally overwrite
    :meth:`__len__`, which is expected to return the size of the dataset by many
    :class:`~torch.utils.data.Sampler` implementations and the default options
    of :class:`~torch.utils.data.DataLoader`.

    These DataPipes can be invoked in two ways, using the class constructor or applying their
    functional form onto an existing `MapDataPipe` (recommended, available to most but not all DataPipes).

    Note:
        :class:`~torch.utils.data.DataLoader` by default constructs an index
        sampler that yields integral indices. To make it work with a map-style
        DataPipe with non-integral indices/keys, a custom sampler must be provided.

    Example:
        >>> from torchdata.datapipes.map import SequenceWrapper, Mapper
        >>> dp = SequenceWrapper(range(10))
        >>> map_dp_1 = dp.map(lambda x: x + 1)  # Using functional form (recommended)
        >>> list(map_dp_1)
        [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
        >>> map_dp_2 = Mapper(dp, lambda x: x + 1)  # Using class constructor
        >>> list(map_dp_2)
        [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
        >>> batch_dp = map_dp_1.batch(batch_size=2)
        >>> list(batch_dp)
        [[1, 2], [3, 4], [5, 6], [7, 8], [9, 10]]
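
        The functional form of a custom ``MapDataPipe`` is typically registered through the
        ``functional_datapipe`` decorator. The sketch below is illustrative only
        (``SquarerMapDataPipe`` and the functional name ``"square"`` are hypothetical):

        >>> from torch.utils.data import functional_datapipe
        >>> @functional_datapipe("square")
        ... class SquarerMapDataPipe(MapDataPipe):  # hypothetical example pipe
        ...     def __init__(self, source_dp):
        ...         self.source_dp = source_dp
        ...     def __getitem__(self, idx):
        ...         return self.source_dp[idx] ** 2
        ...     def __len__(self):
        ...         return len(self.source_dp)
        >>> dp = SequenceWrapper(range(4)).square()
        >>> dp[3]
        9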
    r   Nr   r   r   r   c                 C   sR   |t jkr:|tkr$t| }tf | tt j| | }|S td| jj	|d S r!   )
r   r   r   r
   r"   r#   r$   r%   r&   r'   r(   r,   r,   r-   r.      s    

zMapDataPipe.__getattr__c                 C   s   || j |< d S r/   r0   r1   r,   r,   r-   r4      s    zMapDataPipe.register_functionc                 C   s:   || j krtd|dd }t||}|| j |< d S )Nr5   c                 _   s   | |f||}|S r/   r,   )r2   r9   r:   r*   r;   r,   r,   r-   r=     s    zAMapDataPipe.register_datapipe_as_function.<locals>.class_functionr>   )r2   r3   r@   r=   r+   r,   r,   r-   rA      s
    
z)MapDataPipe.register_datapipe_as_functionc                 C   s   t jdk	rt | S | jS rB   )r   r   rC   rD   r,   r,   r-   rE     s    

zMapDataPipe.__getstate__c                    s:   t jd k	r,zt | W S  tk
r*   Y nX t j||S r/   )r   r   rF   rG   rH   rI   rJ   r,   r-   rH     s    
zMapDataPipe.__reduce_ex__c                 C   s$   t jd k	r|d k	rtd|t _d S rK   )r   r   r?   rL   r,   r,   r-   rM     s    zMapDataPipe.set_getstate_hookc                 C   s$   t jd k	r|d k	rtd|t _d S rN   )r   r   r?   rL   r,   r,   r-   rO   #  s    zMapDataPipe.set_reduce_ex_hookc                 C   s    | j d k	r|  | S t| jjS r/   rP   rD   r,   r,   r-   rS   )  s    

zMapDataPipe.__repr__c                 C   s    | j d k	r|  | S t| jjS r/   rT   rD   r,   r,   r-   rU   /  s    

zMapDataPipe.__str__)r'   rW   rR   rX   r   r   rQ   r   rY   r   r   r   r   r   r.   r\   r4   rA   rE   rH   rM   rO   rS   rU   r]   r,   r,   rJ   r-   r      s&   






c                   @   s,   e Zd Zdd Zdd Zdd Zdd Zd	S )
_DataPipeSerializationWrapperc                 C   s
   || _ d S r/   	_datapipe)r)   Zdatapiper,   r,   r-   __init__7  s    z&_DataPipeSerializationWrapper.__init__c                 C   sJ   d}zt | j}W n, tk
r@   tr:t| j}d}n Y nX ||fS )NFT)pickledumpsrb   r?   HAS_DILLdill)r)   r   valuer,   r,   r-   rE   :  s    z*_DataPipeSerializationWrapper.__getstate__c                 C   s*   |\}}|rt || _nt|| _d S r/   )rg   loadsrb   rd   )r)   staterh   r   r,   r,   r-   __setstate__F  s    z*_DataPipeSerializationWrapper.__setstate__c                 C   s:   zt | jW S  tk
r4   tdt| jY nX d S )Nz%{} instance doesn't have valid length)lenrb   r?   	TypeErrorr%   typer'   rD   r,   r,   r-   __len__M  s    z%_DataPipeSerializationWrapper.__len__N)r'   rW   rR   rc   rE   rk   ro   r,   r,   r,   r-   r`   6  s   r`   c                   @   s   e Zd Zdd ZdS )!_IterDataPipeSerializationWrapperc                 c   s   | j E d H  d S r/   ra   rD   r,   r,   r-   __iter__W  s    z*_IterDataPipeSerializationWrapper.__iter__N)r'   rW   rR   rq   r,   r,   r,   r-   rp   V  s   rp   c                   @   s   e Zd Zdd ZdS ) _MapDataPipeSerializationWrapperc                 C   s
   | j | S r/   ra   )r)   idxr,   r,   r-   __getitem__\  s    z,_MapDataPipeSerializationWrapper.__getitem__N)r'   rW   rR   rt   r,   r,   r,   r-   rr   [  s   rr   c                       sJ   e Zd Z fddZdddZee d fddZedd	d
Z  Z	S )r   c                    s   t  | || _d S r/   )rG   rc   items)r)   ru   rJ   r,   r-   rc   a  s    zDataChunk.__init__ c                 C   s(   |d d dd t| D  d }|S )N[z, c                 s   s   | ]}t |V  qd S r/   )rQ   ).0ir,   r,   r-   	<genexpr>f  s     z#DataChunk.as_str.<locals>.<genexpr>])joiniter)r)   indentresr,   r,   r-   as_stre  s    $zDataChunk.as_str)returnc                 #   s   t   D ]
}|V  q
d S r/   )rG   rq   r)   ry   rJ   r,   r-   rq   i  s    zDataChunk.__iter__c                 c   s   | j D ]
}|V  qd S r/   )ru   r   r,   r,   r-   raw_iteratorm  s    
zDataChunk.raw_iterator)rv   )
r'   rW   rR   rc   r   r   r   rq   r   r]   r,   r,   rJ   r-   r   `  s   
)#r"   rd   typingr   r   r   r   r   r   Z"torch.utils.data.datapipes._typingr   r	   Z'torch.utils.data.datapipes.utils.commonr
   r   r   Ztorch.utils.data.datasetr   r   rg   extendrf   ImportError__all__r   r   r7   r   r   r   r`   rp   rr   listr   r,   r,   r,   r-   <module>   s>    
 k 
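

# Minimal usage sketch (illustrative only, not part of the module's API surface).
# It assumes `IterableWrapper` and `SequenceWrapper` are available from
# `torch.utils.data.datapipes.iter` / `.map`, as referenced in the docstrings above,
# and shows how the serialization wrappers round-trip a DataPipe through `pickle`:
# the wrapped DataPipe is serialized by value, with `dill` used as a fallback when
# plain pickling fails and dill is installed.
if __name__ == "__main__":
    from torch.utils.data.datapipes.iter import IterableWrapper
    from torch.utils.data.datapipes.map import SequenceWrapper

    # Iterable-style: iteration is delegated to the restored inner DataPipe.
    iter_dp = _IterDataPipeSerializationWrapper(IterableWrapper(range(5)))
    restored_iter_dp = pickle.loads(pickle.dumps(iter_dp))
    assert list(restored_iter_dp) == [0, 1, 2, 3, 4]

    # Map-style: `__getitem__` and `__len__` semantics are preserved across the round trip.
    map_dp = _MapDataPipeSerializationWrapper(SequenceWrapper(range(5)))
    restored_map_dp = pickle.loads(pickle.dumps(map_dp))
    assert len(restored_map_dp) == 5 and restored_map_dp[2] == 2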