U
    dd$                     @   s  U d dl Z d dlZd dlZd dlZd dlmZ d dlmZmZ d dl	m
Z
mZmZmZmZmZmZmZ ddlmZmZmZ ddlmZ ddlmZ dd	lmZ dd
lmZmZ ddlm Z m!Z!m"Z" ddl#m$Z$m%Z% ddl&m'Z' dZ(edZ)e$dZ*e$ee"  e+d< e$dZ,e$eee"e-f   e+d< e$dZ.e$e e+d< ddde
de)f e/e0ee e)dddZ1edddZ2ddddZ3e4dkre3  dS )    N)deque)module_from_specspec_from_file_location)CallableDequeListOptionalSetTupleTypeVarcast   )current_timeget_asynclibget_cancelled_exc_class)BrokenWorkerProcess)open_process)CapacityLimiter)CancelScope
fail_after)ByteReceiveStreamByteSendStreamProcess)RunVarcheckpoint_if_cancelled)BufferedByteReceiveStreami,  T_Retval_process_pool_workers_process_pool_idle_workers_default_process_limiterF)cancellablelimiter.)funcargsr    r!   returnc                   s  t td fdd}t I dH  tjd| |ftjd}zt t }W n@ t	k
r   t
 t }t
 t
| t  Y nX |pt 4 I dH X |rv| \}jdkrjttjtttj t }g }	|r6||d d  tk rq6| \}   |	 qtd	d
  |	D ] I dH  qFW 5 Q R X q qtj ddt!g}
t"|
t#j$t#j$dI dH zttjtttj t%d  &dI dH }W 5 Q R X |dkrt'd|t(tj)d dd}tjdtj*|ftjd}||I dH  W nR t't+ fk
rN    Y n6 t,k
r } z  t'd|W 5 d}~X Y nX - t| d
Z z6tt.||I dH W  W  5 Q R  W  5 Q I dH R  S kr|t f X W 5 Q R X W 5 Q I dH R X dS )a  
    Call the given function with the given arguments in a worker process.

    If the ``cancellable`` option is enabled and the task waiting for its completion is cancelled,
    the worker process running it will be abruptly terminated using SIGKILL (or
    ``terminateProcess()`` on Windows).

    :param func: a callable
    :param args: positional arguments for the callable
    :param cancellable: ``True`` to allow cancellation of the operation while it's running
    :param limiter: capacity limiter to use to limit the total amount of processes running
        (if omitted, the default limiter is used)
    :return: an awaitable that yields the return value of the function.

    )pickled_cmdr$   c                    s  z^ | I d H   ddI d H }|d\}}|dkrHtd| t|I d H }W n tk
r } zj z0  t	dd 
 I d H  W 5 Q R X W n tk
r   Y nX t|t rЂ nt|W 5 d }~X Y nX t|}|dkrt|tst|n|S d S )	N   
2       )   RETURN	   EXCEPTION-Worker process returned unexpected response: TZshieldr*   )sendZreceive_untilsplitRuntimeErrorZreceive_exactlyintBaseExceptiondiscardkillr   acloseProcessLookupError
isinstancer   r   pickleloadsAssertionError)r%   responsestatuslengthZpickled_responseexcretvalZbufferedprocessstdinworkers 4/tmp/pip-unpacked-wheel-yaxr6kle/anyio/to_process.pysend_raw_command2   s2    


z"run_sync.<locals>.send_raw_commandNrun)protocolr   r   Tr,   z-uz-m)rA   stdout         READY
r+   __main____file__initz*Error during worker process initialization)/bytesobjectr   r7   dumpsHIGHEST_PROTOCOLr   getr   LookupErrorsetr   r   Z#setup_process_pool_exit_at_shutdowncurrent_default_process_limiterpop
returncoder   r   rA   r   r   rH   r   WORKER_MAX_IDLE_TIMEpopleftr3   removeappendr   r4   sys
executable__name__r   
subprocessPIPEr   Zreceiver   getattrmodulespathr   r1   addr   )r"   r    r!   r#   rE   requestZidle_workersZ
idle_sincenowZkilled_processescommandmessagemain_module_pathpickledr=   rC   r?   rD   run_sync   s    !



  




6
rl   )r$   c                  C   sB   z
t  W S  tk
r<   tt p&d} t |  |  Y S X dS )z
    Return the capacity limiter that is used by default to limit the number of worker processes.

    :return: a capacity limiter object

       N)r   rS   rT   r   os	cpu_countrU   )r!   rC   rC   rD   rV      s    

rV   c               
   C   s  t j} t j}ttjt _ttjdt _|jd d  }}zt	| j^}}W n: t
k
rd   Y d S  tk
r } z|}W 5 d }~X Y nX |dkr|\}}z|| }W n& tk
r } z|}W 5 d }~X Y nX n|dkrV|\t _}t jd= |rVz:td|}	|	r*|	jr*t|	}
|	j|
 |
t jd< W n( tk
rT } z|}W 5 d }~X Y nX z4|d k	rvd}t|tj}nd}t|tj}W n: tk
r } z|}d}t|tj}W 5 d }~X Y nX |jd	|t|f  |j| t|tr2|q2d S )
NwrK   rF   rN   rL   __mp_main__r*   r)   s   %s %d
)r]   rA   rH   openrn   devnullbufferwriter7   loadEOFErrorr1   rd   rc   r   loaderr   exec_modulerQ   rR   lenr6   
SystemExit)rA   rH   r>   	exceptionrh   r#   r=   r"   rj   specmainr;   rk   rC   rC   rD   process_worker   sX    



 
r   rL   )5rn   r7   r`   r]   collectionsr   importlib.utilr   r   typingr   r   r   r   r	   r
   r   r   Z_core._eventloopr   r   r   Z_core._exceptionsr   Z_core._subprocessesr   Z_core._synchronizationr   Z_core._tasksr   r   abcr   r   r   Zlowlevelr   r   Zstreams.bufferedr   rY   r   r   __annotations__r   floatr   rP   boolrl   rV   r   r_   rC   rC   rC   rD   <module>   sH    (
 <
