U
    dc                  
   @   sR  d dl Z d dlZd dlZd dlZd dlZd dlZd dlZd dlZd dlm	Z	 d dl
mZmZ d dlmZ d dlmZ d dlmZ d dlmZmZmZmZmZmZmZ d dlmZ d dlmZmZ d d	l m!Z!m"Z" d d
l#m$Z$ ej%dkZ&ej%dkZ'e(e)Z*G dd de+Z,e-ee ddddZ.ej/dddZ0ej/dddZ1ee-ef e-e2dddZ3dZ4dZ5G dd deZ6ee6ee-e6f f e-ee-e6f ddd Z7eG d!d" d"Z8G d#d$ d$e j9Z:e2d%d&d'Z;e-eee-ef ee-ee2e2f f ee-e2f ee-e2f ee-ej<f ej=dd(	d)d*Z>G d+d, d,e:Z?G d-d. d.Z@G d/d0 d0e:ZAdS )1    N)nullcontext)	dataclassfield)IntFlag)synchronize)	FrameType)AnyCallableDictOptionalSetTupleUnion)ProcessFailurerecord)redirect_stderrredirect_stdout)TailLogwin32darwinc                       s,   e Zd ZdZeejdd fddZ  ZS )SignalExceptionz
    Exception is raised inside the torchelastic agent process by the termination handler
    if the death signal got received by the process.
    N)msgsigvalreturnc                    s   t  | || _d S N)super__init__r   )selfr   r   	__class__ Q/tmp/pip-unpacked-wheel-ua33x9lu/torch/distributed/elastic/multiprocessing/api.pyr   -   s    zSignalException.__init__)	__name__
__module____qualname____doc__strsignalSignalsr   __classcell__r    r    r   r!   r   '   s   r   )signumframer   c                 C   s*   t | }tdt  d| |ddS )a  Termination handler that raises exceptions on the main process.

    When the process receives death signal(SIGTERM, SIGINT), this termination handler will
    be invoked. It raises the ``SignalException`` exception that should be processed by the
    user code. Python does not terminate process after the termination handler is finished,
    so the exception should not be silently ignored, otherwise the process will never
    be terminated.
    zProcess z got signal: )r   N)r'   r(   r   osgetpid)r*   r+   r   r    r    r!   _terminate_process_handler2   s    	
r.   r   c                   C   s   t r
tjS tjS dS )zJ
    Get the kill signal. SIGKILL for unix, CTRL_C_EVENT for windows.
    N)
IS_WINDOWSr'   CTRL_C_EVENTSIGKILLr    r    r    r!   _get_kill_signal?   s    r3   c                   C   s   t r
tjS tjS dS )zY
    Get the default termination signal. SIGTERM for unix, CTRL_C_EVENT for windows.
    N)r0   r'   r1   SIGTERMr    r    r    r!   _get_default_signalI   s    r5   )dnprocswhatc                 C   s<   t |  }t t|}||kr8t| d| d| d S )Nz), local rank mapping mismatch, expected: z
, actual: )setkeysrangeRuntimeError)r6   r7   r8   Zactual_keysZexpected_keysr    r    r!   _validate_full_rankS   s    r=   z^(\d:[0123],)*(\d:[0123])$z^[0123]$c                   @   sD   e Zd ZdZdZdZeeB Zeee	d e
ed f f dddZdS )Stdr         )vmr   c                 C   s|   dd }t t|r||S t t|r^i }|dD ]"}|d\}}|||t|< q6|S t| dt dt ddS )	z
        Example:

        ::

         from_str("0") -> Std.NONE
         from_str("1") -> Std.OUT
         from_str("0:3,1:0,2:1,3:2") -> {0: Std.ALL, 1: Std.NONE, 2: Std.OUT, 3: Std.ERR}

        Any other input raises an exception
        c                 S   s&   t | } tD ]}|| kr|  S qd S r   )intr>   )vsr    r    r!   to_stdv   s    zStd.from_str.<locals>.to_std,:z does not match: <z> or <>N)rematch_VALUE_REGEX_MAPPING_REGEXsplitrB   
ValueError)clsrA   rE   r6   mirC   r    r    r!   from_strh   s    zStd.from_strN)r"   r#   r$   NONEZOUTZERRALLclassmethodr&   r   r
   rB   rR   r    r    r    r!   r>   b   s   r>   )
val_or_maplocal_world_sizer   c                    sL   t  tr  fddt|D S i }t|D ]} |tj||< q,|S dS )a  
    Certain APIs take redirect settings either as a single value (e.g. apply to all
    local ranks) or as an explicit user-provided mapping. This method is a convenience
    method that converts a value or mapping into a mapping.

    Example:

    ::

     to_map(Std.OUT, local_world_size=2) # returns: {0: Std.OUT, 1: Std.OUT}
     to_map({1: Std.OUT}, local_world_size=2) # returns: {0: Std.NONE, 1: Std.OUT}
     to_map({0: Std.OUT, 1: Std.OUT}, local_world_size=2) # returns: {0: Std.OUT, 1: Std.OUT}
    c                    s   i | ]
}| qS r    r    ).0rQ   rV   r    r!   
<dictcomp>   s      zto_map.<locals>.<dictcomp>N)
isinstancer>   r;   getrS   )rV   rW   maprQ   r    rY   r!   to_map   s    
r^   c                   @   s   e Zd ZU dZeedZeee	f e
d< eedZeeef e
d< eedZeeef e
d< eedZeeef e
d< eddd	Zd
S )RunProcsResulta  
    Results of a completed run of processes started with ``start_processes()``.
    Returned by ``PContext``.

    Note the following:

    1. All fields are mapped by local rank
    2. ``return_values`` - only populated for functions (not the binaries).
    3. ``stdouts`` - path to stdout.log (empty string if no redirect)
    4. ``stderrs`` - path to stderr.log (empty string if no redirect)

    )default_factoryreturn_valuesfailuresstdoutsstderrsr/   c                 C   s   t | jdkS )Nr   )lenrb   r   r    r    r!   	is_failed   s    zRunProcsResult.is_failedN)r"   r#   r$   r%   r   dictra   r
   rB   r   __annotations__rb   r   rc   r&   rd   boolrg   r    r    r    r!   r_      s   
r_   c                   @   s  e Zd ZdZeeeef eee	f eeeeef f eeef eeef eeef eeef eeef d	ddZ
ddddZejddd	d
Zejee dddZdeeee dddZejeeef dddZejdejeddddZdeej eddddZdS )PContexta  
    The base class that standardizes operations over a set of processes
    that are launched via different mechanisms. The name ``PContext``
    is intentional to disambiguate with ``torch.multiprocessing.ProcessContext``.

    .. warning:: stdouts and stderrs should ALWAYS be a superset of
                 tee_stdouts and tee_stderrs (respectively) this is b/c
                 tee is implemented as a redirect + tail -f <stdout/stderr.log>
    	name
entrypointargsenvsrc   rd   tee_stdoutstee_stderrserror_filesc
                 C   st   || _ t|}
t||
d t||
d || _|| _|| _|| _|| _|	| _|
| _	t
||tj| _t
||tj| _d S )Nrc   rd   )rm   re   r=   rn   ro   rp   rc   rd   rs   r7   r   sysstdout_stdout_tailstderr_stderr_tail)r   rm   rn   ro   rp   rc   rd   rq   rr   rs   r7   r    r    r!   r      s    zPContext.__init__Nr/   c                 C   s\   t  t jt t  t jt ts<t  t jt t  t jt |   | j	  | j
	  dS )zN
        Start processes using parameters defined in the constructor.
        N)r'   r4   r.   SIGINTr0   SIGHUPSIGQUIT_startrv   startrx   rf   r    r    r!   r}      s    
zPContext.startc                 C   s
   t  dS )zQ
        Start processes using strategy defined in a particular context.
        NNotImplementedErrorrf   r    r    r!   r|      s    zPContext._startc                 C   s
   t  dS )aF  
        Polls the run status of the processes running under this context.
        This method follows an "all-or-nothing" policy and returns
        a ``RunProcessResults`` object if either all processes complete
        successfully or any process fails. Returns ``None`` if
        all processes are still running.
        Nr~   rf   r    r    r!   _poll   s    	zPContext._pollr?   )timeoutperiodr   c                 C   sV   |dkr|   S |dk rtj}t | }t |k rR|   }|rF|S t| q*dS )a  
        Waits for the specified ``timeout`` seconds, polling every ``period`` seconds
        for the processes to be done. Returns ``None`` if the processes are still running
        on timeout expiry. Negative timeout values are interpreted as "wait-forever".
        A timeout value of zero simply queries the status of the processes (e.g. equivalent
        to a poll).

        ..note: Multiprocesing library registers SIGTERM and SIGINT signal handlers that raise
                ``SignalException`` when the signals received. It is up to the consumer of the code
                to properly handle the exception. It is important not to swallow the exception otherwise
                the process would not terminate. Example of the typical workflow can be:

        .. code-block:: python
            pc = start_processes(...)
            try:
                pc.wait(1)
                .. do some other work
            except SignalException as e:
                pc.shutdown(e.sigval, timeout=30)

        If SIGTERM or SIGINT occurs, the code above will try to shutdown child processes by propagating
        received signal. If child processes will not terminate in the timeout time, the process will send
        the SIGKILL.
        r   N)r   rt   maxsizetimesleep)r   r   r   Zexpiryprr    r    r!   wait  s    zPContext.waitc                 C   s
   t  dS )zR
        Returns pids of processes mapped by their respective local_ranks
        Nr~   rf   r    r    r!   pids-  s    zPContext.pids   	death_sigr   r   c                 C   s
   t  dS )z
        Terminates all processes managed by this context and cleans up any
        meta resources (e.g. redirect, error_file files).
        Nr~   r   r   r   r    r    r!   _close4  s    zPContext._closec                 C   s<   |s
t  }| j||d | jr(| j  | jr8| j  dS )ar  
        Terminates all processes managed by this context and cleans up any
        meta resources (e.g. redirect, error_file files).

        Args:
            death_sig: Death signal to terminate porcesses.
            timeout: Time to wait for processes to finish, if process is
                still alive after this time, it will be terminated via SIGKILL.
        )r   r   N)r5   r   rv   stoprx   r   r    r    r!   close<  s    
zPContext.close)r   r?   )r   )Nr   )r"   r#   r$   r%   r&   r   r	   r
   rB   r   r   r}   abcabstractmethodr|   r   r_   r   floatr   r   r'   r(   r   r   r    r    r    r!   rk      s8   







)    rk   )std_rdc                 C   s   t sts| st S || S d S r   )r0   IS_MACOSr   )r   Zredirect_fnr    r    r!   
get_std_cmQ  s    r   )	
local_rankfnro   rp   stdout_redirectsstderr_redirectsret_valsqueue_finished_reading_eventr   c              
   C   s   ||  }||  }	||  }
||  }||  }t |t}t |t}|	 D ]\}}|tj|< qD|" | t|| }W 5 Q R X W 5 Q R X |
| |  d S r   )	r   r   r   itemsr,   environr   putr   )r   r   ro   rp   r   r   r   r   Zargs_Zenv_Zret_val_Z	stdout_rdZ	stderr_rdZ	stdout_cmZ	stderr_cmkrC   retr    r    r!   _wrapX  s    

 
r   c                       s   e Zd ZdZeeeeef eeeeef f eeef eeef eeef eeef eeef ed
 fddZ	dd Z
eddd	Zee dd
dZeeef dddZdejeddddZ  ZS )MultiprocessContextzF
    ``PContext`` holding worker processes invoked as a function.
    )
rm   rn   ro   rp   rc   rd   rq   rr   rs   start_methodc                    s^   t  |||||||||		 |
 _ fddt jD  _i  _d  _t	 j
  _d S )Nc                    s   i | ]}|t  j qS r    )mpget_contextr   SimpleQueuerX   r   rf   r    r!   rZ     s    z0MultiprocessContext.__init__.<locals>.<dictcomp>)r   r   r   r;   r7   	_ret_vals_return_values_pcr   r   Event_worker_finished_event)r   rm   rn   ro   rp   rc   rd   rq   rr   rs   r   r   rf   r!   r   {  s$    
zMultiprocessContext.__init__c              	   C   sJ   | j rtdtjt| j| j| j| j| j	| j
| jf| jdd| jd| _ d S )NzWThe process context already initialized. Most likely the start method got called twice.F)r   ro   r7   joindaemonr   )r   rN   r   Zstart_processesr   rn   ro   rp   rc   rd   r   r   r7   r   rf   r    r    r!   r|     s&    	zMultiprocessContext._startr/   c                 C   s   t | j| jkS r   )re   r   r7   rf   r    r    r!   _is_done  s    zMultiprocessContext._is_donec                 C   s\  | j d k	stz| j d td| jD ]$}| j| }| s(| | j|< q(| 	 r| j
  | j   t| j| jd |   t| j| j| jdW S W d S W n tjtjfk
rV } z|j}| jj}| j j| }| j| }tjd|j d| d|j d| d	| j d
dd |   t|t||j|j|di| j| jd W Y S d }~X Y nX d S )Nr   r   zreturn_value queue)ra   rc   rd   failed (exitcode: ) local_rank:  (pid: z	) of fn: z (start_method: )T)exc_infor   pidexitcodeZ
error_filerb   rc   rd   )r   AssertionErrorr   r;   r7   r   emptyr\   r   r   r   r9   r=   r   r_   rc   rd   r   ZProcessRaisedExceptionZProcessExitedExceptionZerror_indexrn   r$   	processesrs   logerrorr   r   r   r   )r   r   Zreturn_queueeZfailed_local_rankfn_nameZfailed_procZerror_filepathr    r    r!   r     sX    


  

( zMultiprocessContext._pollc                 C   s&   | j d k	stdd t| j  D S )Nc                 S   s   i | ]\}}||qS r    r    )rX   r   r   r    r    r!   rZ     s      z,MultiprocessContext.pids.<locals>.<dictcomp>)r   r   	enumerater   rf   r    r    r!   r     s    zMultiprocessContext.pidsr   Nr   c              	   C   s
  | j s
d S | j jD ]N}| rtd|j d|j  zt|j| W q t	k
r^   Y qX qt
 | }| j jD ]&}|t
  }|dkr q|| qv| j jD ]^}| rtd|j d| dt   zt|jt  W n t	k
r   Y nX |  qd S )NzClosing process z via signal r   Unable to shutdown process  via , forcefully exitting via )r   r   is_aliver   warningr   rm   r,   killProcessLookupErrorr   	monotonicr   r3   )r   r   r   procendtime_to_waitr    r    r!   r     s2    zMultiprocessContext._close)r   )r"   r#   r$   r%   r&   r	   r
   rB   r   r   r|   rj   r   r   r_   r   r   r'   r(   r   r)   r    r    r   r!   r   v  s"   





'Er   c                   @   sb   e Zd ZdZeeeeef eedddZeeeef ej	dddZ
deej dd	d
dZdS )SubprocessHandlerz
    Convenience wrapper around python's ``subprocess.Popen``. Keeps track of
    meta-objects associated to the process (e.g. stdout and stderr redirect fds).
    rn   ro   envru   rw   c                 C   sb   |rt |dnd | _|r"t |dnd | _tj }|| |fdd |D }| ||| _d S )Nwc                 S   s   g | ]}t |qS r    )r&   )rX   r   r    r    r!   
<listcomp>9  s     z.SubprocessHandler.__init__.<locals>.<listcomp>)	open_stdout_stderrr,   r   copyupdate_popenr   )r   rn   ro   r   ru   rw   Zenv_varsargs_strr    r    r!   r   +  s    

zSubprocessHandler.__init__)ro   r   r   c                 C   s   t j||| j| jdS )N)ro   r   ru   rw   )
subprocessPopenr   r   )r   ro   r   r    r    r!   r   <  s    zSubprocessHandler._popenN)r   r   c                 C   s:   |s
t  }| j| | jr&| j  | jr6| j  d S r   )r5   r   send_signalr   r   r   )r   r   r    r    r!   r   G  s    
zSubprocessHandler.close)N)r"   r#   r$   r%   r&   r   r
   r   r   r   r   r   r'   r(   r   r    r    r    r!   r   %  s   
r   c                       s   e Zd ZdZeeeeef eeeeef f eeef eeef eeef eeef eeef d	 fddZdd Z	e
e ddd	Zeeef dd
dZdejeddddZ  ZS )SubprocessContextzD
    ``PContext`` holding worker processes invoked as a binary.
    rl   c
           
         s<   t  |||||||||		 tt| j| _i | _i | _d S r   )r   r   r9   r;   r7   _running_local_ranks	_failuressubprocess_handlers)
r   rm   rn   ro   rp   rc   rd   rq   rr   rs   r   r    r!   r   V  s    zSubprocessContext.__init__c                    s,    j rtd fddt jD  _ d S )Nz[The subprocess handlers already initialized. Most likely the start method got called twice.c              
      s:   i | ]2}|t  j j|  j|  j|  j| d qS )r   )r   rn   ro   rp   rc   rd   r   rf   r    r!   rZ   x  s    z,SubprocessContext._start.<locals>.<dictcomp>)r   rN   r;   r7   rf   r    rf   r!   r|   s  s    
zSubprocessContext._startr/   c              
   C   s   t  }| jD ]R}| j| }|j }|d k	r|| |dkrt||jj|| j| d| j	|< q| j
| | jrx| j	r|   t| j	| j| jd}| rt|j dd d}td|j d|j d	|j d
| j  ndd t| jD |_|S d S d S )Nr   r   r   c                 S   s   | j S r   )	timestamp)fr    r    r!   <lambda>      z)SubprocessContext._poll.<locals>.<lambda>)keyr   r   r   z) of binary: c                 S   s   i | ]
}|d qS r   r    r   r    r    r!   rZ     s     z+SubprocessContext._poll.<locals>.<dictcomp>)r9   r   r   r   polladdr   r   rs   r   difference_updater   r_   rc   rd   rg   minrb   valuesr   r   r   r   rn   r;   r7   ra   )r   Zdone_local_ranksr   handlerr   resultZfirst_failurer    r    r!   r     s>    



"zSubprocessContext._pollc                 C   s   dd | j  D S )Nc                 S   s   i | ]\}}||j jqS r    )r   r   )rX   r   shr    r    r!   rZ     s    z*SubprocessContext.pids.<locals>.<dictcomp>)r   r   rf   r    r    r!   r     s    zSubprocessContext.pidsr   Nr   c              	   C   s  | j s
d S | j  D ]:}|j d krtd|jj d|j  |j|d qt	
 | }| j  D ]D}|t	
  }|dkr qz|j| W qf tjk
r   Y qfX qf| j  D ]L}|j d krtd|jj d| dt   |jt d |j  qd S )NzSending process z closing signal )r   r   r   r   r   )r   r   r   r   r   r   r   rm   r   r   r   r   r   TimeoutExpiredr3   )r   r   r   r   r   r   r    r    r!   r     s0    zSubprocessContext._close)r   )r"   r#   r$   r%   r&   r
   rB   r   r   r|   r   r_   r   r   r'   r(   r   r)   r    r    r   r!   r   Q  s   





+r   )Br   loggingr,   rI   r'   r   rt   r   
contextlibr   Zdataclassesr   r   enumr   multiprocessingr   typesr   typingr   r	   r
   r   r   r   r   Ztorch.multiprocessingr   Z0torch.distributed.elastic.multiprocessing.errorsr   r   Z3torch.distributed.elastic.multiprocessing.redirectsr   r   Z2torch.distributed.elastic.multiprocessing.tail_logr   platformr0   r   	getLoggerr"   r   	Exceptionr   rB   r.   r(   r3   r5   r&   r=   rL   rK   r>   r^   r_   ABCrk   r   r   r   r   r   r   r   r    r    r    r!   <module>	   sf   $




* 
 


 0,