U
    9%e<+                     @   s   d dl Z d dlZd dlZd dlZd dlZd dlZd dlZd dlmZ d dl	Z	d dl
mZ d dlmZ dddddgZe aejZejZG d	d deZG d
d dZe Zdd Zdd Zdd Zdd Zdd Zdd Ze ddddgZ e dddgZ!dS )    N)Enum)_get_current_rpc_agentRPCExecMode	serializedeserialize	PythonUDFRemoteExceptionc                   @   s   e Zd ZdZdZdZdZdS )r   syncasyncZ	async_jitremoteN)__name__
__module____qualname__ZSYNCASYNCZ	ASYNC_JITZREMOTE r   r   ]/var/www/html/Darija-Ai-API/env/lib/python3.8/site-packages/torch/distributed/rpc/internal.pyr      s   c                   @   st   e Zd ZdZdd Zdd Zedd Zdd	 Zed
d Z	dd Z
dd Zedd Zdd Zdd Zdd ZdS )_InternalRPCPicklera	  
    This class provides serialize() and deserialize() interfaces to serialize
    data to be "binary string + tensor table" format
    So for RPC python UDF function and args, non tensor data will be serialized
    into regular binary string, tensor data will be put into thread local tensor
    tables, this serialization format is consistent with builtin operator and args
    using JIT pickler. This format will make tensor handling in C++ much easier,
    e.g. attach tensor to distributed autograd graph in C++
    c                 C   s$   t j | _| j| jtj< i | _d S N)copyregdispatch_tablecopy_dispatch_table_tensor_reducertorchZTensor_class_reducer_dict)selfr   r   r   __init__)   s    z_InternalRPCPickler.__init__c                 C   s   || j kr|| j |< d S r   )r   )r   Z	obj_classZreducerr   r   r   _register_reducer0   s    
z%_InternalRPCPickler._register_reducerc                 C   s
   t j| S r   )_thread_local_tensor_tablesrecv_tables)clstensor_indexr   r   r   _tensor_receiver5   s    z$_InternalRPCPickler._tensor_receiverc                 C   s&   t j| tt jd }tj|ffS )N   )r   send_tablesappendlenr   r"   )r   Ztensorr!   r   r   r   r   :   s    z#_InternalRPCPickler._tensor_reducerc                 C   s   t jj|S r   )distrpcPyRRefZ_deserialize)r    rref_fork_datar   r   r   _py_rref_receiver@   s    z%_InternalRPCPickler._py_rref_receiverc                 C   s   |  }tj|ffS r   )Z
_serializer   r+   )r   Zpy_rrefr*   r   r   r   _py_rref_reducerD   s    z$_InternalRPCPickler._py_rref_reducerc                 C   s
   |  |S r   )r,   )r   Zrrefr   r   r   _rref_reducerH   s    z!_InternalRPCPickler._rref_reducerc                 C   s   t |}tj|}|S )z
        Given a serialized representation of a ScriptModule created with torch.jit.save,
        loads and returns the ScriptModule.
        )ioBytesIOr   jitload)r    Zscript_module_serializedfmr   r   r   _script_module_receiverK   s    
z+_InternalRPCPickler._script_module_receiverc                 C   s&   t  }tj|| tj| ffS )z,
        Serializes a ScriptModule.
        )r.   r/   r   r0   saver   r4   getvalue)r   Zscript_moduler2   r   r   r   _script_module_reducerU   s    z*_InternalRPCPickler._script_module_reducerc                 C   s   t  }t|}| j|_| j|jtjj< | j	|jtjj
< t|tjjrT| j|j|j< | j D ]}| j| |j|< q^ttdrtj}nd}g t_|| tj}|dk	r|t_nt`| |fS )ze
        Serialize non tensor data into binary string, tensor data into
        tensor table
        r$   N)r.   r/   _picklerr   r   r,   r'   r(   r)   r-   ZRRef
isinstancer   r0   ZScriptModuler7   	__class__r   keyshasattrr   r$   dumpr6   )r   objr2   p
class_nameZold_send_tablesZtensorsr   r   r   r   ]   s&    

z_InternalRPCPickler.serializec              
   C   s   t tdrtj}nd}|t_ztt|}| }W n< tk
rr } zt|d }t|}||_	W 5 d}~X Y nX |dk	r|t_nt`|S )zJ
        Deserialize binary string + tensor table to original obj
        r   Nz Default RPC pickler does not serialize
            function code. Ensure that UDFs are defined on both caller and
            callee modules.)
r<   r   r   
_unpicklerr.   r/   r1   AttributeErrorstr	__cause__)r   binary_datatensor_tableZold_recv_tablesZ	unpicklerrete
except_strr   r   r   r      s$    
z_InternalRPCPickler.deserializeN)r   r   r   __doc__r   r   classmethodr"   r   r+   r,   r-   r4   r7   r   r   r   r   r   r   r      s   



	5r   c                 C   s
   t | S r   )_internal_rpc_picklerr   )r>   r   r   r   r      s    c                 C   s   t | |S r   )rL   r   )rE   rF   r   r   r   r      s    c              
   C   s   z"t | tr| | j| j| j}W nb tk
r } zDdt   dt| dt	
  }t|tjd t|t|}W 5 d}~X Y nX |S )z
    This function is exclusively called from C++.
    See ``torch/csrc/distributed/rpc/python_rpc_handler.cpp``.

    Runs a Python UDF and returns its return value.
    Wraps any exception in ``RemoteException`` if the function raises.
    zOn z:

)fileN)r9   rB   funcargskwargs	Exceptionr   Zget_worker_inforepr	traceback
format_excprintsysstderrr   type)Z
python_udfresultrH   rI   r   r   r   _run_function   s    
" r[   c              
   C   s|   t | trx| jdd}d }z| |}W n< tk
rj } ztdt| d| |W 5 d }~X Y nX |d k	rx|d S )Nzutf-8Zunicode_escapez8Failed to create original exception type. Error msg was z' Original exception on remote side was )	r9   r   msgencodedecodeexception_typeBaseExceptionRuntimeErrorrC   )rZ   Zexception_msgexcrH   r   r   r   _handle_exception   s    
rc   c              	   C   s$   d| j  d| d| d| d	}|S )a  
    Builds the key that RPC calls are profiled with using the autograd profiler.
    This will be the name of the corresponding Event recorded in the profiler.

    Args:
        exec_type (RPCExecMode): Type of RPC/RRef call
        func_name (str): Name of function being profiled.
        current_worker_name (str): Name of current worker.
        dst_worker_name (str): Name of the destination worker.

    Returns:
        String representing profiling key
    rpc_#( -> ))value)	exec_type	func_namecurrent_worker_nameZdst_worker_nameprofile_keyr   r   r   _build_rpc_profiling_key   s     rn   c              	   C   sR   t j stdd| j dt| d| d| d	}t j }t j|| |S )ar  
    This function should be called from RPC/RRef functions to create a
    RecordFunction object for profiling. This function also runs the before
    callbacks that start the profiling, though the user is responsible for
    running the appropriate callbacks when the function to be profiled finishes.

    Args:
        exec_type (RPCExecMode): Type of RPC/RRef call
        func_name (str): Name of function being profiled.
        current_worker_name (str): Name of current worker.
        dest_worker_name (str): Name of the destination worker.

    Returns:
        An instance of `torch.autograd._RecordFunction`.
    z$Autograd profiler should be enabled.rd   re   rf   rg   rh   )r   ZautogradZ_profiler_enabledAssertionErrorri   rC   Z_RecordFunctionZ_run_before_callbacks)rj   rk   rl   Zdest_worker_namerm   rfr   r   r   _start_record_function  s
    $
rq   rO   rP   rQ   r\   r_   )"collectionsr   r.   picklerW   	threadingrT   enumr   r   Ztorch.distributeddistributedr'   Ztorch._C._distributed_rpcr   __all__localr   Picklerr8   	UnpicklerrA   r   r   rL   r   r   r[   rc   rn   rq   
namedtupler   r   r   r   r   r   <module>   s4    