U
    d?                     @   sL  d dddddgZ ddlZddlZddlmZmZmZmZmZ ddl	Z	ddl
mZ d	d
lmZmZ d	dlmZ d	dlmZ edddgZdd ZdZejde daet_tjret_dd Zdd Zejej fddZ!dd  Z"dd Z#ej$ddfddZ%dd Z&dd Z'd-d d!Z(d"d# Z)d$d% Z*d&d' Z+d(d) Z,d*d+ Z-ed,e%e- dS ).init_backendbackend_registeredconstruct_rpc_backend_optionsregister_backendBackendTypeBackendValue    N)castDictListSetTuple   )_group_membership_management_update_group_membership)api)	constants%construct_rpc_backend_options_handlerinit_backend_handlerc                 C   s
   d| j  S )NzBackendType.)name)self r   J/tmp/pip-unpacked-wheel-ua33x9lu/torch/distributed/rpc/backend_registry.py_backend_type_repr   s    r   z
    An enum class of available backends.

    PyTorch ships with a builtin ``BackendType.TENSORPIPE`` backend.
    Additional ones can be registered using the
    :func:`~torch.distributed.rpc.backend_registry.register_backend` function.
valuenamesc                 C   s   | t j kS )z
    Checks if backend_name is registered as an RPC backend.

    Args:
        backend_name (str): string to identify the RPC backend.
    Returns:
        True if the backend has been registered with ``register_backend``, else
        False.
    )r   __members__keys)backend_namer   r   r   r   (   s    
c                 C   sd   t | rtd| dd tD }t| t||dif|}tjd|datt_	tj
r\tt_
t|  S )a  Registers a new RPC backend.

    Args:
        backend_name (str): backend string to identify the handler.
        construct_rpc_backend_options_handler (function):
            Handler that is invoked when
            rpc_backend.construct_rpc_backend_options(**dict) is called.
        init_backend_handler (function): Handler that is invoked when the
            `_init_rpc_backend()` function is called with a backend.
             This returns the agent.
    z"RPC backend {}: already registeredc                 S   s   i | ]}|j |jqS r   )r   r   ).0memberr   r   r   
<dictcomp>G   s      z$register_backend.<locals>.<dictcomp>)r   r   r   r   )r   RuntimeErrorformatr   dictr   enumEnumr   __repr____doc___backend_type_doc)r   r   r   Zexisting_enum_dictZextended_enum_dictr   r   r   r   5   s$     
c                 K   s   | j j||f|S N)r   r   )backendrpc_timeoutinit_methodkwargsr   r   r   r   Y   s     c                 O   s   | j j||S r*   )r   r   )r+   argsr.   r   r   r   r   d   s    c                 C   sz   t j}t| |||}|d k	s&td|dkrN|| krNtd|| |dkrv|| krvtd|| |S )Nz*Failed to initialize default ProcessGroup.z)rank argument {} doesn't match pg rank {}z/world_size argument {} doesn't match pg size {})	rpc_constantsZDEFAULT_PROCESS_GROUP_TIMEOUTdistZProcessGroupGlooAssertionErrorrankr"   r#   size)storer4   
world_sizeZprocess_group_timeoutgroupr   r   r   _init_process_groupg   s     r9   c                 K   s   ddl m} || ||||dS )Nr   TensorPipeRpcBackendOptions)r,   r-   num_worker_threads_transports	_channels) r;   )r,   r-   r<   r=   r>   r.   r;   r   r   r   1_tensorpipe_construct_rpc_backend_options_handler}   s    r@   c                    s   t  fdd| D S )Nc                 3   s<   | ]4}|j d kp2|j dko2d|j  ko. k n  V  qdS )cpucudar   N)typeindex)r   ddevice_countr   r   	<genexpr>   s   z/_tensorpipe_validate_devices.<locals>.<genexpr>)all)devicesrG   r   rF   r   _tensorpipe_validate_devices   s    rK   c                 C   s   dd t | D }t|| |||f| dd |D }dd |D }dd |D }dd |D }	t||||	 t| ||}
t|||
}|
|fS )Nc                 S   s   g | ]}d di g fqS )r?   r   r   )r   _r   r   r   
<listcomp>   s     zB_tensorpipe_exchange_and_check_all_device_maps.<locals>.<listcomp>c                 S   s   g | ]\}}}}|qS r   r   )r   r   rL   r   r   r   rM      s    
 c                 S   s   i | ]\}}}}||qS r   r   )r   r   countrL   r   r   r   r!      s    
  zB_tensorpipe_exchange_and_check_all_device_maps.<locals>.<dictcomp>c                 S   s   i | ]\}}}}||qS r   r   )r   r   rL   map_r   r   r   r!      s    
  c                 S   s   i | ]\}}}}||qS r   r   )r   r   rL   rJ   r   r   r   r!      s    
  )ranger5   r2   Zall_gather_object_validate_device_maps_create_reverse_mapping_create_device_list)my_nameZmy_device_countmy_device_maps
my_devicesr8   Zgathered	all_namesall_device_countsall_device_mapsall_devicesreverse_device_mapsr   r   r   ._tensorpipe_exchange_and_check_all_device_maps   s     
 r\   Tc           
      C   s  | D ]`}|| }t t|t |kr8td| d| t||| std| d| d||  q| D ]}|rt||  | std| d||   d|  ||  D ]D\}}	t t|	 t |	krtd| d| d|	 || r<t|	 || sttd| d	| d|	 d
||  n8t|	 || sttd| d| d|	 d||  ||g rt|	 || std| d| d|	 d
||  q||krt|	 || std| d| d|	 d||  qqjd S )NzNode z" has duplicated devices
devices = z, has devices with invalid indices
devices = z
device count = z@ has invalid target node names in its device maps
device maps = z
node names = z5 has duplicated target devices in its device map for z
device map = z5 has unexpected source devices in its device map for z
devices = z? has source devices with invalid indices in its device map for z5 has unexpected target devices in its device map for z? has target devices with invalid indices in its device map for )	lenset
ValueErrorrK   r   issubsetitemsvaluesget)
rW   rX   rY   rZ   is_static_groupnoderJ   Zsource_nodeZtarget_noderO   r   r   r   rQ      sX    

 
 rQ   c                 C   sv   | sbt  }| D ]\}}||  q| D ]\}}||  q2|td t|} t| dd d} | S )NrA   c                 S   s   | j S r*   )rD   )rE   r   r   r   <lambda>       z%_create_device_list.<locals>.<lambda>)key)	r^   ra   updater   discardtorchZdevicelistsorted)rV   rU   r[   Zdevices_setrL   rO   r   r   r   rS      s    rS   c                 C   s<   i }|D ].}| || krdd || |    D ||< q|S )Nc                 S   s   i | ]\}}||qS r   r   )r   kvr   r   r   r!      s     z+_create_reverse_mapping.<locals>.<dictcomp>)ra   )rT   rW   rY   r[   re   r   r   r   rR      s    rR   c                  C   s:   ddl m}  t| t }| }tj }||j	|j
fS )Nr   TensorPipeAgent)r?   rq   r   r   Z_get_current_rpc_agent_get_backend_optionsrk   rB   rG   device_mapsrJ   )rq   agentoptsrG   r   r   r   _get_device_infos  s
    
rv   c                 C   s  ddl m} t|| } |  }|j}|  }i i i g f\}}}}|D ]j}	|	j}
|
|krjt|
t\}}}n"| 	 }t
j |j|j  }}}|||
< |||
< |||
< ||
 qDt||||dd t|||}|D ]8}
t||
 ||
 |||
< tj|
t|||
 |dfd qd S )Nr   rp   F)rd   T)r/   )r?   rq   r   Zget_worker_infor   Zget_worker_infosr   Zrpc_syncrv   rr   rk   rB   rG   rs   rJ   appendrQ   rR   rS   r   )rt   rq   Zmy_worker_inforT   Zall_worker_infosrX   rY   rZ   rW   Zworker_infoZworker_namerG   Z
device_maprJ   ru   r[   r   r   r   #_set_devices_and_reverse_device_map  s.    
rx   c              
   C   sT  ddl m} ddl m} t| tjs2td| t||sJtd|tj	
 rjtj	  tj	 }nd}|rvdnd}|rt| ||}	t|||j|j|	\}
}|| |||||
|}t| tjd |jd	 |	   |S t| |d^ || ||||i g }t| zt| W n  tk
r8   t   Y nX |W  5 Q R  S Q R X d S )
Nr   rp   r:   z!`store` must be a c10d::Store. {}zA`rpc_backend_options` must be a `TensorPipeRpcBackendOptions`. {}r   TF)timeout)r?   rq   r;   
isinstancer2   ZStore	TypeErrorr#   rk   rB   Zis_availableinitrG   r9   r\   rs   rJ   r   Z_init_rpc_statesZ_all_gatherr,   Zbarrierwaitr   rx   	Exceptionshutdown)r6   r   r4   r7   Zrpc_backend_optionsrq   r;   rG   rd   r8   r[   rJ   rt   r   r   r    _tensorpipe_init_backend_handler,  st     

	

	
r   Z
TENSORPIPE)T).__all__collectionsr%   typingr   r	   r
   r   r   rk   Ztorch.distributedZdistributedr2   _utilsr   r   r?   r   r   r1   
namedtupler   r   r)   r&   r$   r   r'   r(   r   r   ZDEFAULT_RPC_TIMEOUT_SECZDEFAULT_INIT_METHODr   r   r9   ZDEFAULT_NUM_WORKER_THREADSr@   rK   r\   rQ   rS   rR   rv   rx   r   r   r   r   r   <module>   sT    
&

	
A	![