U
    d                     @   s   d dl Z d dlZd dlmZ d dlmZ d dlmZ e Z	dZ
dZdZdZdeeeeeedddZdd Zdd ZejdddZdS )    N)closing)
get_loggerzAddress already in usezSocket Timeoutz_tcp_store/num_membersz_tcp_store/last_member   X  T   )	is_serverserver_addrserver_port
world_sizetimeoutwait_for_workersc                 C   sP  |dkr$|dkr$t d| d| |dkr>td| d |dkrJ|nd}|dkr\|}nt }td| d| d	| d
|  d| d z<tj|||| tj|d|d}	|rt|	| td |	W S  t	k
rH }
 z`t
|
tkr6||k rtd| d| d| d |d7 }nt	d| d| d|
n W 5 d }
~
X Y qNX qNd S )Nr   r   zCserver_port must be specified when world_size > 1, got server_port=z, world_size=zsever_port: z, specified, ignoring retrieszCreating c10d store on :z
  world_size  : z
  is_server   : z
  timeout(sec): 
)seconds)Z	host_nameportr   Z	is_masterr   r   zSuccessfully created c10d storezport: z already in use, attempt: [/]zon z, port: z already in use)
ValueErrorloginfoget_free_portdistZTCPStoredatetime	timedelta_check_full_rankRuntimeErrorstr_ADDRESS_IN_USEwarning)r   r	   r
   r   r   r   retriesattemptr   storee r$   O/tmp/pip-unpacked-wheel-ua33x9lu/torch/distributed/elastic/utils/distributed.pycreate_c10d_store   sN    	"
	



r&   c              
   C   sx   |  td}||kr | td z| t W nD tk
rr } z&t|tkr`td| d|n W 5 d }~X Y nX d S )Nr   z<val_ignored>ztimed out waiting for all z members to join)	add_MEMBER_CHECKINset_LAST_MEMBER_CHECKINgetr   r   _SOCKET_TIMEOUTTimeoutError)r"   r   idxr#   r$   r$   r%   r   [   s    
r   c               
   C   s2   t  } t|  |  d W  5 Q R  S Q R X d S )Nr   )get_socket_with_portr   getsockname)sockr$   r$   r%   r   k   s    
r   )returnc                  C   s   t jddt jt jd} | D ]x}|\}}}}}t  |||}z|d |d |W   S  tk
r } z|  tj	d|d W 5 d}~X Y qX qt
ddS )	a  
    Returns a free port on localhost that is "reserved" by binding a temporary
    socket on it. Close the socket before passing the port to the entity
    that requires it. Usage example

    ::

    sock = _get_socket_with_port()
    with closing(sock):
        port = sock.getsockname()[1]
        sock.close()
        # there is still a race-condition that some other process
        # may grab this port before func() runs
        func(port)
    	localhostN)hostr   familytype)r3   r   r   zSocket creation attempt failed.)exc_infozFailed to create a socket)socketgetaddrinfo	AF_UNSPECSOCK_STREAMbindlistenOSErrorcloser   r   r   )addrsaddrr5   r6   proto_sr#   r$   r$   r%   r/   q   s"       


"r/   )r   r   r   Tr   )r   r8   
contextlibr   Ztorch.distributedZdistributedr   Z'torch.distributed.elastic.utils.loggingr   r   r   r,   r(   r*   boolr   intfloatr&   r   r   r/   r$   r$   r$   r%   <module>   s2        B