U
    d                     @   s(  d dl Z d dlZd dlZd dlZd dlZd dlmZmZmZm	Z	m
Z
mZmZmZmZmZ d dlmZ d dlmZmZmZmZ d dlmZ d dlmZmZmZ d dlmZmZm Z m!Z!m"Z"m#Z#m$Z$m%Z%m&Z& d dl'm(Z(m)Z) d d	l*m+Z+m,Z, d d
l-m.Z.m/Z/m0Z0m1Z1m2Z2m3Z3m4Z4m5Z5m6Z6m7Z7m8Z8m9Z9m:Z:m;Z; d dl<m=Z=m>Z>m?Z? d dl@mAZAmBZBmCZC edeDde
d eedf ZEdZFeeeDef dddZGG dd deZHG dd dee"e)ZIG dd dZJG dd dZKG dd dee"e)ZLeD ]4ZMeMNddO ZMeMdkrqePeLeMe$eM qG dd  d ZQdS )!    N)
AnyDequeDict	GeneratorListMappingOptionalTypeTypeVarUnion)ResponseCallbackT)
ConnectionDefaultParserEncoder	parse_url)CommandsParser)EMPTY_RESPONSENEVER_DECODEAbstractRedis)	PIPELINE_BLOCKED_COMMANDSPRIMARYREPLICASLOT_IDAbstractRedisClusterLoadBalancerblock_pipeline_commandget_node_nameparse_cluster_slots)READ_COMMANDSAsyncRedisClusterCommands)REDIS_CLUSTER_HASH_SLOTSkey_slot)AskErrorBusyLoadingErrorClusterCrossSlotErrorClusterDownErrorClusterErrorConnectionError	DataErrorMasterDownError
MovedErrorRedisClusterExceptionResponseErrorSlotNotCoveredErrorTimeoutErrorTryAgainError)AnyKeyT
EncodableTKeyT)
dict_mergesafe_strstr_if_bytesTargetNodesTClusterNode)Zclient_namedbdecode_responsesZencoder_classencodingencoding_errorsZhealth_check_intervalparser_classpasswordredis_connect_funcretryZretry_on_timeoutZsocket_connect_timeoutZsocket_keepaliveZsocket_keepalive_optionsZsocket_read_sizesocket_timeoutsocket_typeusername)kwargsreturnc                  K   s   dd |   D S )z0Remove unsupported or disabled keys from kwargs.c                 S   s   i | ]\}}|t kr||qS  )CONNECTION_ALLOWED_KEYS).0kvrE   rE   9/tmp/pip-unpacked-wheel-f5h5_hbx/redis/asyncio/cluster.py
<dictcomp>T   s       z"cleanup_kwargs.<locals>.<dictcomp>)items)rC   rE   rE   rJ   cleanup_kwargsR   s    rM   c                	   @   s&   e Zd Zeejeeee	e
edZdS )ClusterParser)ZASKZTRYAGAINMOVEDCLUSTERDOWNZ	CROSSSLOT
MASTERDOWNN)__name__
__module____qualname__r3   r   ZEXCEPTION_CLASSESr"   r/   r*   r%   r$   r)   rE   rE   rE   rJ   rN   W   s   rN   c                   @   s`  e Zd ZdZeeed dddZdZdTe	e e
e	ed  eee
e
e	e edd
ddZd dddZddddZd dddZdddddddZeedd f dddZdZddddZedddd Zed dd!d"Zed dd#d$Zed dd%d&Zddd'd(Zddd)d*Zddd+d,d-ZdUe	e e	e
 e	e e	d d.d/d0ZdVeee	d d1d2d3Ze e
d4d5d6Z!e"dd7d8Z#e$ee	e f dd9d:Z%ee&dd;d<d=Z'dd>eee	e ed d?d@dAZ(eee
dBdCdDZ)eedEdFdGZ*eed dEdHdIZ+e eedJdKdLZ,de-e.e f eedMdNdOZ/dWe	e e	e dPdQdRdSZ0dS )XRedisClustera
  
    Create a new RedisCluster client.

    Pass one of parameters:

      - `url`
      - `host` & `port`
      - `startup_nodes`

    | Use ``await`` :meth:`initialize` to find cluster nodes & create connections.
    | Use ``await`` :meth:`close` to disconnect connections & close client.

    Many commands support the target_nodes kwarg. It can be one of the
    :attr:`NODE_FLAGS`:

      - :attr:`PRIMARIES`
      - :attr:`REPLICAS`
      - :attr:`ALL_NODES`
      - :attr:`RANDOM`
      - :attr:`DEFAULT_NODE`

    Note: This client is not thread/process/fork safe.

    :param host:
        | Can be used to point to a startup node
    :param port:
        | Port used if **host** is provided
    :param startup_nodes:
        | :class:`~.ClusterNode` to used as a startup node
    :param cluster_error_retry_attempts:
        | Retry command execution attempts when encountering :class:`~.ClusterDownError`
          or :class:`~.ConnectionError`
    :param require_full_coverage:
        | When set to ``False``: the client will not require a full coverage of the
          slots. However, if not all slots are covered, and at least one node has
          ``cluster-require-full-coverage`` set to ``yes``, the server will throw a
          :class:`~.ClusterDownError` for some key-based commands.
        | When set to ``True``: all slots must be covered to construct the cluster
          client. If not all slots are covered, :class:`~.RedisClusterException` will be
          thrown.
        | See:
          https://redis.io/docs/manual/scaling/#redis-cluster-configuration-parameters
    :param reinitialize_steps:
        | Specifies the number of MOVED errors that need to occur before reinitializing
          the whole cluster topology. If a MOVED error occurs and the cluster does not
          need to be reinitialized on this current error handling, only the MOVED slot
          will be patched with the redirected node.
          To reinitialize the cluster on every MOVED error, set reinitialize_steps to 1.
          To avoid reinitializing the cluster on moved errors, set reinitialize_steps to
          0.
    :param read_from_replicas:
        | Enable read from replicas in READONLY mode. You can read possibly stale data.
          When set to true, read commands will be assigned between the primary and
          its replications in a Round-Robin manner.
    :param url:
        | See :meth:`.from_url`
    :param kwargs:
        | Extra arguments that will be passed to the
          :class:`~redis.asyncio.connection.Connection` instances when created

    :raises RedisClusterException:
        if any arguments are invalid. Eg:

        - db kwarg
        - db != 0 in url
        - unix socket connection
        - none of host & url & startup_nodes were provided

    )urlrC   rD   c                 K   s   | f d|i|S )a'  
        Return a Redis client object configured from the given URL.

        For example::

            redis://[[username]:[password]]@localhost:6379/0
            rediss://[[username]:[password]]@localhost:6379/0
            unix://[[username]:[password]]@/path/to/socket.sock?db=0

        Three URL schemes are supported:

        - `redis://` creates a TCP socket connection. See more at:
          <https://www.iana.org/assignments/uri-schemes/prov/redis>
        - `rediss://` creates a SSL wrapped TCP socket connection. See more at:
          <https://www.iana.org/assignments/uri-schemes/prov/rediss>
        - ``unix://``: creates a Unix Domain Socket connection.

        The username, password, hostname, path and all querystring values
        are passed through urllib.parse.unquote in order to replace any
        percent-encoded values with their corresponding characters.

        There are several ways to specify a database number. The first value
        found will be used:

            1. A ``db`` querystring option, e.g. redis://localhost?db=0
            2. If using the redis:// or rediss:// schemes, the path argument
               of the url, e.g. redis://localhost/0
            3. A ``db`` keyword argument to this function.

        If none of these options are specified, the default db=0 is used.

        All querystring options are cast to their appropriate Python types.
        Boolean arguments can be specified with string values "True"/"False"
        or "Yes"/"No". Values that cannot be properly cast cause a
        ``ValueError`` to be raised. Once parsed, the querystring arguments and
        keyword arguments are passed to :class:`~redis.asyncio.connection.Connection`
        when created. In the case of conflicting arguments, querystring
        arguments always win.

        rV   rE   )clsrV   rC   rE   rE   rJ   from_url   s    *zRedisCluster.from_url)_initialize_lockcluster_error_retry_attemptscommand_flagscommands_parserconnection_kwargsencoder
node_flagsnodes_managerread_from_replicasreinitialize_counterreinitialize_stepsresponse_callbacksresult_callbacksN  F   
   r7   )
hostportstartup_nodesrequire_full_coveragerb   r[   rd   rV   rC   rD   c	                 K   s~  |sg }d|	krt d|rrt|}
d|
kr4t dd|
krP|
d dkrPt d|	|
 |	d}|	d|}n|rz|s|st d	| j|	d
< tf |	 | _}	| jj	  | _
|	d< |r|r|t||f| j tf ||d| j| _t|	dd|	dd|	dd| _|| _|| _|| _d| _t | _| jj	 | _| jj	 | _| jj	 | _dd | jd< d| _t | _ d S )Nr8   z4Argument 'db' is not possible to use in cluster modepathzFRedisCluster does not currently support Unix Domain Socket connectionsr   z9A ``db`` querystring option can only be 0 in cluster moderj   rk   a5  RedisCluster requires at least one node to discover the cluster. Please provide one of the followings:
1. host and port, for example:
 RedisCluster(host='localhost', port=6379)
2. list of startup nodes, for example:
 RedisCluster(startup_nodes=[ClusterNode('localhost', 6379), ClusterNode('localhost', 6378)])r>   re   )rl   rm   r:   zutf-8r;   strictr9   Fc                 [   s   t t| d f|S )Nr   )r   listvalues)cmdresrC   rE   rE   rJ   <lambda>:  s   z'RedisCluster.__init__.<locals>.<lambda>CLUSTER SLOTST)!r+   r   updateget
on_connectrM   r^   	__class__RESPONSE_CALLBACKScopyre   appendr7   NodesManagerra   r   r_   r[   rb   rd   rc   r   r]   Z
NODE_FLAGSr`   ZCOMMAND_FLAGSr\   ZRESULT_CALLBACKSrf   rY   asyncioLockrZ   )selfrj   rk   rl   rm   rb   r[   rd   rV   rC   Zurl_optionsrE   rE   rJ   __init__   sp    






zRedisCluster.__init__rD   c              
      s   | j r| j4 I dH v | j rz0| j I dH  | j| jjI dH  d| _ W n8 tk
r   | j I dH  | jdI dH   Y nX W 5 Q I dH R X | S )zJGet all nodes from startup nodes & creates connections if not initialized.NFrl   )rY   rZ   ra   
initializer]   default_nodeBaseExceptioncloser   rE   rE   rJ   r   @  s    

zRedisCluster.initializec              
      sX   | j sT| j4 I dH 4 | j sDd| _ | j I dH  | jdI dH  W 5 Q I dH R X dS )z.Close all connections & client if initialized.NTrl   )rY   rZ   ra   r   r   rE   rE   rJ   r   Q  s    zRedisCluster.closec                    s   |   I d H S Nr   r   rE   rE   rJ   
__aenter__Z  s    zRedisCluster.__aenter__exc_type	exc_value	tracebackrD   c                    s   |   I d H  d S r   )r   r   r   r   r   rE   rE   rJ   	__aexit__]  s    zRedisCluster.__aexit__c                 C   s   |    S r   r   	__await__r   rE   rE   rJ   r   `  s    zRedisCluster.__await__zUnclosed RedisCluster clientc                 C   sd   t | dr`| js`tj| j d| t| d z| | jd}t | W n t	k
r^   Y nX d S )NrY    sourceclientmessage)
hasattrrY   warningswarn_DEL_MESSAGEResourceWarningr~   get_event_loopcall_exception_handlerRuntimeError)r   contextrE   rE   rJ   __del__e  s    zRedisCluster.__del__)
connectionrD   c                    sP   | t | I d H  | jrL|dI d H  t| I d H dkrLtdd S )NZREADONLYOKzREADONLY command failed)
set_parserrN   rx   rb   Zsend_commandr5   read_response_without_lockr'   )r   r   rE   rE   rJ   rx   o  s    
zRedisCluster.on_connectc                 C   s   t | jj S )zGet all nodes of the cluster.)rp   ra   nodes_cacherq   r   rE   rE   rJ   	get_nodes}  s    zRedisCluster.get_nodesc                 C   s   | j tS )z%Get the primary nodes of the cluster.)ra   get_nodes_by_server_typer   r   rE   rE   rJ   get_primaries  s    zRedisCluster.get_primariesc                 C   s   | j tS )z%Get the replica nodes of the cluster.)ra   r   r   r   rE   rE   rJ   get_replicas  s    zRedisCluster.get_replicasc                 C   s   t t| jj S )z!Get a random node of the cluster.)randomchoicerp   ra   r   rq   r   rE   rE   rJ   get_random_node  s    zRedisCluster.get_random_nodec                 C   s   | j jS )z#Get the default node of the client.)ra   r   r   rE   rE   rJ   get_default_node  s    zRedisCluster.get_default_node)noderD   c                 C   s&   |r| j |jdstd|| j_dS )z
        Set the default node of the client.

        :raises DataError: if None is passed or node does not exist in cluster.
        	node_namez1The requested node does not exist in the cluster.N)get_nodenamer(   ra   r   )r   r   rE   rE   rJ   set_default_node  s    zRedisCluster.set_default_noderj   rk   r   rD   c                 C   s   | j |||S )z&Get node by (host, port) or node_name.)ra   r   r   rj   rk   r   rE   rE   rJ   r     s    zRedisCluster.get_node)keyreplicarD   c                 C   s^   |  |}| jj|}|s,td| d|rHt| jj| dk rHdS |rRd}nd}|| S )aG  
        Get the cluster node corresponding to the provided key.

        :param key:
        :param replica:
            | Indicates if a replica should be returned
            |
              None will returned if no replica holds this key

        :raises SlotNotCoveredError: if the key is not covered by any slot.
        Slot "z " is not covered by the cluster.   N   r   )keyslotra   slots_cacherw   r-   len)r   r   r   slotZ
slot_cachenode_idxrE   rE   rJ   get_node_from_key  s    
zRedisCluster.get_node_from_key)r   rD   c                 C   s   t | j|S )z
        Find the keyslot for a given key.

        See: https://redis.io/docs/manual/scaling/#redis-cluster-data-sharding
        )r!   r_   encode)r   r   rE   rE   rJ   r     s    zRedisCluster.keyslotc                 C   s   | j S )z%Get the encoder object of the client.)r_   r   rE   rE   rJ   get_encoder  s    zRedisCluster.get_encoderc                 C   s   | j S )zGGet the kwargs passed to :class:`~redis.asyncio.connection.Connection`.)r^   r   rE   rE   rJ   get_connection_kwargs  s    z"RedisCluster.get_connection_kwargs)commandcallbackrD   c                 C   s   || j |< dS )zSet a custom response callback.N)re   )r   r   r   rE   rE   rJ   set_response_callback  s    z"RedisCluster.set_response_callback)	node_flag)r   argsr   rD   c                   s   |s| j |}|| jkr|| jjkr0| jjgS || jjkrH| jt	S || jj
kr`| jtS || jjkr|t| jj S || jjkrtt| jj gS | j| j|f| I d H | jo|tkgS r   )r\   rw   r`   ry   ZDEFAULT_NODEra   r   Z	PRIMARIESr   r   ZREPLICASr   Z	ALL_NODESrp   r   rq   ZRANDOMr   r   get_node_from_slot_determine_slotrb   r   )r   r   r   r   rE   rE   rJ   _determine_nodes  s$    

zRedisCluster._determine_nodes)r   r   rD   c                    s    j |tkrt|d S |dkrjt|dk rDtd|f| |dd|d   }|stdtS n> j	j
|f| I d H }|s|dkrtdtS td| t|dkr |d S  fdd	|D }t|dkrt| d
| S )Nr   )ZEVALZEVALSHAr   zInvalid args in command: r   )ZFCALLZFCALL_ROzNo way to dispatch this command to Redis Cluster. Missing key.
You can execute the command by specifying target nodes.
Command: c                    s   h | ]}  |qS rE   )r   )rG   r   r   rE   rJ   	<setcomp>   s     z/RedisCluster._determine_slot.<locals>.<setcomp>z) - all keys must map to the same key slot)r\   rw   r   intr   r+   r   	randranger    r]   Zget_keysr   pop)r   r   r   keysslotsrE   r   rJ   r     s2    
zRedisCluster._determine_slot)target_nodesrD   c                 C   s   t |to|| jkS r   )
isinstancestrr`   )r   r   rE   rE   rJ   _is_node_flag(  s    zRedisCluster._is_node_flagc                 C   sP   t |tr|}n<t |tr"|g}n*t |tr:t| }ntdt| |S )Nztarget_nodes type can be one of the following: node_flag (PRIMARIES, REPLICAS, RANDOM, ALL_NODES),ClusterNode, list<ClusterNode>, or dict<any, ClusterNode>. The passed type is )r   rp   r7   dictrq   	TypeErrortype)r   r   nodesrE   rE   rJ   _parse_target_nodes+  s    


z RedisCluster._parse_target_nodesr   rC   rD   c                    s   d }g }d}j }dd}|rB|sB|}d}d}t|D ]b}jrd I dH  z|sj d|iI dH }|std  d	t	|dkrj
|d f I dH }	|jkrj| ||d j|	ifW   S |	W   S d
d |D }
tj fdd|D  I dH }|jkrVj| |tt|
|fW   S tt|
|W   S W qJ tk
r } z t|jjkr|}n|W 5 d}~X Y qJX qJ|dS )a|  
        Execute a raw command on the appropriate cluster node or target_nodes.

        It will retry the command as specified by :attr:`cluster_error_retry_attempts` &
        then raise an exception.

        :param args:
            | Raw command args
        :param kwargs:

            - target_nodes: :attr:`NODE_FLAGS` or :class:`~.ClusterNode`
              or List[:class:`~.ClusterNode`] or Dict[Any, :class:`~.ClusterNode`]
            - Rest of the kwargs are passed to the Redis connection

        :raises RedisClusterException: if target_nodes is not provided & the command
            can't be mapped to a slot
        r   Fr   NTr   r   !No targets were found to execute  command onc                 S   s   g | ]
}|j qS rE   r   rG   r   rE   rE   rJ   
<listcomp>s  s     z0RedisCluster.execute_command.<locals>.<listcomp>c                 3   s&   | ]}t j|f V  qd S r   )r~   ensure_future_execute_commandr   r   rC   r   rE   rJ   	<genexpr>u  s   z/RedisCluster.execute_command.<locals>.<genexpr>)r[   r   r   r   rangerY   r   r   r+   r   r   rf   r   r~   gatherr   zipr   r   ry   ERRORS_ALLOW_RETRY)r   r   rC   r   r   Ztarget_nodes_specifiedZretry_attemptspassed_targets_retr   rq   e	exceptionrE   r   rJ   execute_command?  sh    


 


 
zRedisCluster.execute_command)target_noder   rC   rD   c              
      s  d }}d }| j }d}|dkr|d8 }zn|rP| j|d}|dI d H  d}n4|r| j| I d H }	| j|	| jo||d tk}d}|j||I d H W S  tk
r    Y q t	t
fk
r   |d7 }|dk rtdI d H  n|  I d H   Y q tk
r` }
 zN|  jd7  _| jrD| j| j dkrD|  I d H  d| _n|
| j_d}W 5 d }
~
X Y q tk
r   || j d	 k rtd
I d H  Y q tk
r }
 zt|
j|
jd}d}W 5 d }
~
X Y q tk
r   tdI d H  |  I d H   Y qX qtdd S )NFr   r   r   ZASKING         ?Tr   g?rj   rk   zTTL exhausted.)ZRedisClusterRequestTTLr   r   r   ra   r   rb   r   r#   r'   r.   r~   sleepr   r*   rc   rd   _moved_exceptionr/   r"   r   rj   rk   r%   r&   )r   r   r   rC   ZaskingmovedZredirect_addrZttlZconnection_error_retry_counterr   r   rE   rE   rJ   r     s`    
 	
zRedisCluster._execute_commandClusterPipeline)transaction
shard_hintrD   c                 C   s    |rt d|rt dt| S )z
        Create & return a new :class:`~.ClusterPipeline` object.

        Cluster implementation of pipeline does not support transaction or shard_hint.

        :raises RedisClusterException: if transaction or shard_hint are truthy values
        z(shard_hint is deprecated in cluster modez)transaction is deprecated in cluster mode)r+   r   )r   r   r   rE   rE   rJ   pipeline  s
    
zRedisCluster.pipeline)Nrg   NFFrh   ri   N)NNN)F)NN)1rR   rS   rT   __doc__classmethodr   r   rX   	__slots__r   r   r   boolr   r   r   r   r   r   r   r   r   r   rx   r   r   r   r   r   r   r   r   r1   r   r   r   r   r   r   r   r   r   r   r   r   r   r2   r   r   rE   rE   rE   rJ   rU   e   s   F+        
W	
   
     3P 
 M    rU   c                	   @   s   e Zd ZdZdZddefeeee ee	e e
ddddZedd	d
Ze
edddZdZddddZddddZedddZeee
e
dddZe
e
e
dddZed edddZdS )r7   z
    Create a new ClusterNode.

    Each ClusterNode manages multiple :class:`~redis.asyncio.connection.Connection`
    objects for the (host, port).
    )
_connections_freeconnection_classr^   rj   max_connectionsr   rk   re   server_typeNl        )rj   rk   r  r  r  r^   rD   c                 K   s|   |dkrt |}||d< ||d< || _|| _t||| _|| _|| _|| _|| _	|
dtj| _g | _tj| jd| _d S )N	localhostrj   rk   re   )maxlen)socketgethostbynamerj   rk   r   r   r  r  r  r^   r   rU   rz   re   r   collectionsdequer  )r   rj   rk   r  r  r  r^   rE   rE   rJ   r     s"    	
 zClusterNode.__init__r   c              	   C   s&   d| j  d| j d| j d| j d	S )Nz[host=z, port=z, name=z, server_type=])rj   rk   r   r  r   rE   rE   rJ   __repr__  s    $zClusterNode.__repr__)objrD   c                 C   s   t |to|j| jkS r   )r   r7   r   )r   r  rE   rE   rJ   __eq__%  s    zClusterNode.__eq__zUnclosed ClusterNode objectc              	   C   sj   | j D ]^}|jrtj| j d| t| d z| | jd}t | W n t	k
r^   Y nX  qfqd S )Nr   r   r   )
r   is_connectedr   r   r   r   r~   r   r   r   )r   r   r   rE   rE   rJ   r   *  s    
  zClusterNode.__del__c                    sB   t jdd | jD ddiI d H }tdd |D d }|r>|d S )Nc                 s   s   | ]}t | V  qd S r   r~   r   
disconnect)rG   r   rE   rE   rJ   r   :  s   z)ClusterNode.disconnect.<locals>.<genexpr>Zreturn_exceptionsTc                 s   s   | ]}t |tr|V  qd S r   )r   	Exception)rG   rs   rE   rE   rJ   r   @  s     
 )r~   r   r   next)r   r   excrE   rE   rJ   r  8  s    zClusterNode.disconnectc                 C   s   | j rHtt| j D ](}| j  }|jr0|  S | j | q| j  S t| j| jk rv| jf | j	}| j| |S t
dd S )NzToo many connections)r  r   r   popleftr  r|   r   r  r  r^   r'   )r   r   r   rE   rE   rJ   acquire_connectionD  s    

zClusterNode.acquire_connection)r   r   rC   rD   c                    sz   z.t |kr|jddI d H }n| I d H }W n* tk
rX   t|krR|t  Y S  Y nX || jkrv| j| |f|S |S )NT)Zdisable_decoding)r   r   r,   r   re   )r   r   r   rC   responserE   rE   rJ   parse_responseU  s    
zClusterNode.parse_responser   c              	      sR   |   }||j| dI d H  z| j||d f|I d H W S | j| X d S )NFr   )r  send_packed_commandZpack_commandr  r|   r  )r   r   rC   r   rE   rE   rJ   r   j  s
    zClusterNode.execute_commandPipelineCommand)commandsrD   c                    s   |   }||dd |D dI d H  d}|D ]V}z$| j||jd f|jI d H |_W q2 tk
r } z||_d}W 5 d }~X Y q2X q2| j	| |S )Nc                 s   s   | ]}|j V  qd S r   )r   rG   rr   rE   rE   rJ   r   ~  s     z/ClusterNode.execute_pipeline.<locals>.<genexpr>Fr   T)
r  r  Zpack_commandsr  r   rC   resultr  r  r|   )r   r  r   r   rr   r   rE   rE   rJ   execute_pipelinex  s&     
 zClusterNode.execute_pipeline)rR   rS   rT   r   r   r   r   r   r   r	   r   r   r  r   r  r   r   r  r  r  r   r   r  rE   rE   rE   rJ   r7     s6     c                   @   s   e Zd ZdZded eeddddZdee	 ee
 ee	 ed dd	d
Zdee	df ee	df eddddZddddZd e
eddddZe	ed dddZddddZd!e	ddddZdS )"r}   )r   r^   r   r   read_load_balancerrm   r   rl   Fr7   N)rl   rm   rC   rD   c                 K   s@   dd |D | _ || _|| _d | _i | _i | _t | _d | _d S )Nc                 S   s   i | ]}|j |qS rE   r   r   rE   rE   rJ   rK     s      z)NodesManager.__init__.<locals>.<dictcomp>)	rl   rm   r^   r   r   r   r   r  r   )r   rl   rm   rC   rE   rE   rJ   r     s    zNodesManager.__init__r   c                 C   sJ   |r.|r.|dkrt |}| jt||dS |r>| j|S tdd S )Nr  r   zEget_node requires one of the following: 1. node name 2. host and port)r  r  r   rw   r   r(   r   rE   rE   rJ   r     s    
zNodesManager.get_node)oldnew
remove_oldrD   c                 C   sx   |r2t | D ] }||krt||  q| D ]8\}}||krj|| |krXq:t||   |||< q:d S r   )rp   r   r~   r   r   r  rL   )r   r   r!  r"  r   r   rE   rE   rJ   	set_nodes  s    zNodesManager.set_nodesr   c                 C   s   | j }| j|j|jd}|r.|jtkrXt|_n*t|j|jtf| j}| | j	|j
|i || j|j kr| j|j d }t|_| j|j | | j|j | || j|j d< | j|kr|| _n|g| j|j< d | _ d S )Nr   r   )r   r   rj   rk   r  r   r7   r^   r#  r   r   r   Zslot_idr   r|   remover   )r   r   Zredirected_nodeZold_primaryrE   rE   rJ   _update_moved_slots  s.    
  
z NodesManager._update_moved_slots)r   rb   rD   c              	   C   s   | j r|   zL|rL| j| d j}| j|t| j| }| j| | W S | j| d W S  ttfk
r   t	d| d| j
 dY nX d S )Nr   r   z5" not covered by the cluster. "require_full_coverage=")r   r%  r   r   r  Zget_server_indexr   
IndexErrorr   r-   rm   )r   r   rb   Zprimary_namer   rE   rE   rJ   r     s     zNodesManager.get_node_from_slot)r  rD   c                    s    fdd| j  D S )Nc                    s   g | ]}|j  kr|qS rE   r  r   r(  rE   rJ   r     s   
z9NodesManager.get_nodes_by_server_type.<locals>.<listcomp>)r   rq   )r   r  rE   r(  rJ   r     s    
z%NodesManager.get_nodes_by_server_typec                    s  | j   i }i }g }d}d}| j D ]}z6|dI d H dsNtd|dI d H }d}W n ttfk
r   Y q(Y n t	k
r } z:|
 }	d|	ksd|	krW Y q(ntd	| d
|	 W 5 d }~X Y nB tk
r } z"|
 }	td|j d
|	 W 5 d }~X Y nX t|dkrZ|d d d sZt| jdkrZ|j|d d d< |D ] tdt D ]}
dd  |
 D  |
< qr d }|d }|dkr|j}t|d }|t||}|st||tf| j}|||j< tt d t d d D ]}
|
|krg ||
< ||
 |  fddtdt D }|D ]V}|d }|d }|t||}|st||tf| j}||
 | |||j< qPnZ||
 d }|j|jkr||j d|j d|
  t|dkrtdd| qq^d}ttD ]}
|
|krd} q4q|r( q@q(|sNtd|sv| jrvtdt| dt d|| _| j| j|dd | j| j| jdd | td | _d | _d S )NFINFOZcluster_enabledz(Cluster mode is not enabled on this noderu   TrP   rQ   z7ERROR sending "cluster slots" command to redis server: z	. error: z6ERROR sending "cluster slots" command to redis server r   r   r   c                 S   s   g | ]}t |qS rE   )r5   )rG   valrE   rE   rJ   r   L  s     z+NodesManager.initialize.<locals>.<listcomp> c                    s   g | ]} | qS rE   rE   )rG   jr   rE   rJ   r   _  s     rh   z vs z
 on slot: r   z6startup_nodes could not agree on a valid slots cache: z, zORedis Cluster cannot be connected. Please provide at least one reachable node. z9All slots are not covered after query all startup_nodes. z of z covered...)r"  ) r  resetrl   rq   r   rw   r+   r'   r.   r,   __str__r  r   r   rj   r   r   r   r7   r   r^   r|   r   joinr    rm   r   r#  r   r   r   r   )r   Ztmp_nodes_cacheZ	tmp_slotsZdisagreementsZstartup_nodes_reachableZfully_coveredZstartup_nodeZcluster_slotsr   r   iZprimary_noderj   rk   r   Zreplica_nodesZreplica_nodeZtarget_replica_nodeZtmp_slotrE   r-  rJ   r     s    




  
"
  

zNodesManager.initializer   )attrrD   c                    s.   d | _ tjdd t| | D  I d H  d S )Nc                 s   s   | ]}t | V  qd S r   r  r   rE   rE   rJ   r     s   z%NodesManager.close.<locals>.<genexpr>)r   r~   r   getattrrq   )r   r2  rE   rE   rJ   r     s    zNodesManager.close)F)NNN)F)F)r   )rR   rS   rT   r   r   r   r   r   r   r   r   r   r   r#  r%  r   r   r   r   rE   rE   rE   rJ   r}     sF        

'   r}   c                   @   s&  e Zd ZdZdZeddddZd ddd	Zd dd
dZdddddddZ	e
edd f dddZd dddZdddddddZedddZedddZeeef ed dddZd+eeee dddZd,d ed! eeee d"d#d$Zeed d%d&d'Zeeef d d(d)d*ZdS )-r   a  
    Create a new ClusterPipeline object.

    Usage::

        result = await (
            rc.pipeline()
            .set("A", 1)
            .get("A")
            .hset("K", "F", "V")
            .hgetall("K")
            .mset_nonatomic({"A": 2, "B": 3})
            .get("A")
            .get("B")
            .delete("A", "B", "K")
            .execute()
        )
        # result = [True, "1", 1, {"F": "V"}, True, True, "2", "3", 1, 1, 1]

    Note: For commands `DELETE`, `EXISTS`, `TOUCH`, `UNLINK`, `mset_nonatomic`, which
    are split across multiple nodes, you'll get multiple results for them in the array.

    Retryable errors:
        - :class:`~.ClusterDownError`
        - :class:`~.ConnectionError`
        - :class:`~.TimeoutError`

    Redirection errors:
        - :class:`~.TryAgainError`
        - :class:`~.MovedError`
        - :class:`~.AskError`

    :param client:
        | Existing :class:`~.RedisCluster` client
    )_command_stack_clientN)r   rD   c                 C   s   || _ g | _d S r   )r5  r4  )r   r   rE   rE   rJ   r     s    zClusterPipeline.__init__r   c                    s"   | j jr| j  I d H  g | _| S r   )r5  rY   r   r4  r   rE   rE   rJ   r     s    zClusterPipeline.initializec                    s   |   I d H S r   r   r   rE   rE   rJ   r     s    zClusterPipeline.__aenter__r   c                    s
   g | _ d S r   r4  r   rE   rE   rJ   r     s    zClusterPipeline.__aexit__c                 C   s   |    S r   r   r   rE   rE   rJ   r     s    zClusterPipeline.__await__c                 C   s
   g | _ | S r   r6  r   rE   rE   rJ   	__enter__  s    zClusterPipeline.__enter__c                 C   s
   g | _ d S r   r6  r   rE   rE   rJ   __exit__  s    zClusterPipeline.__exit__c                 C   s
   t | jS r   )r   r4  r   rE   rE   rJ   __bool__  s    zClusterPipeline.__bool__c                 C   s
   t | jS r   )r   r4  r   rE   rE   rJ   __len__  s    zClusterPipeline.__len__r   c                 O   s"   | j tt| j f|| | S )ad  
        Append a raw command to the pipeline.

        :param args:
            | Raw command args
        :param kwargs:

            - target_nodes: :attr:`NODE_FLAGS` or :class:`~.ClusterNode`
              or List[:class:`~.ClusterNode`] or Dict[Any, :class:`~.ClusterNode`]
            - Rest of the kwargs are passed to the Redis connection
        )r4  r|   r  r   )r   r   rC   rE   rE   rJ   r     s    zClusterPipeline.execute_commandT)raise_on_errorallow_redirectionsrD   c                    s   | j s
g S zt| jjD ]}| jjr4| j I dH  z&| j| j| j ||dI dH W   W hS  tk
r } z>t|| j	j
kr|}| j I dH  tdI dH  n|W 5 d}~X Y qX q|W 5 g | _ X dS )a
  
        Execute the pipeline.

        It will retry the commands as specified by :attr:`cluster_error_retry_attempts`
        & then raise an exception.

        :param raise_on_error:
            | Raise the first error if there are any errors
        :param allow_redirections:
            | Whether to retry each failed command individually in case of redirection
              errors

        :raises RedisClusterException: if target_nodes is not provided & the command
            can't be mapped to a slot
        N)r;  r<  r   )r4  r   r5  r[   rY   r   _executer   r   ry   r   r   r~   r   )r   r;  r<  r   r   r   rE   rE   rJ   execute	  s*    zClusterPipeline.executerU   r  )r   stackr;  r<  rD   c                    s  dd |D }i }|D ]}|j dd }|rB||sB||}	n.|j|jd|iI d H }	|	sptd|j dt|	dkrtd|j |	d	 }
|
j|kr|
g f||
j< ||
j d 	| qt
jd
d | D  I d H }t|r|rT|D ]^}t|jtttfrz|j|j|j I d H |_W q tk
rP } z
||_W 5 d }~X Y qX q|r|D ]b}|j}t|tr^dtt|j}d|jd  d| d|j }|f|jdd   |_|q^dd |D S )Nc                 S   s"   g | ]}|j rt|j tr|qS rE   )r  r   r  r  rE   rE   rJ   r   @  s      z,ClusterPipeline._execute.<locals>.<listcomp>r   r   r   r   r   zToo many targets for command r   c                 s   s&   | ]}t |d  |d V  qdS )r   r   N)r~   r   r  r   rE   rE   rJ   r   Z  s   z+ClusterPipeline._execute.<locals>.<genexpr>r   z
Command #  (z) of pipeline caused error: c                 S   s   g | ]
}|j qS rE   )r  r  rE   rE   rJ   r   x  s     )rC   r   r   r   r   r   r+   r   r   r|   r~   r   rq   anyr   r  r/   r*   r"   r   r  r0  mapr4   position)r   r   r?  r;  r<  todor   rr   r   r   r   errorsr   r  r   msgrE   rE   rJ   r=  9  sb    


 zClusterPipeline._execute)r   r   rD   c                 G   s*   | j | D ]}| j|f|  q| S r   )r5  Z_partition_keys_by_slotrq   r   )r   r   r   Z	slot_keysrE   rE   rJ   _split_command_across_slotsz  s    z+ClusterPipeline._split_command_across_slots)mappingrD   c                 C   s^   | j j}i }| D ](}t||d }||g | q| D ]}| jd|  qF| S )Nr   MSET)rI  )	r5  r_   rL   r!   r   
setdefaultextendrq   r   )r   rH  r_   Zslots_pairspairr   pairsrE   rE   rJ   mset_nonatomic  s    zClusterPipeline.mset_nonatomic)TT)TT)rR   rS   rT   r   r   rU   r   r   r   r   r   r   r   r7  r8  r   r9  r   r:  r   r2   r1   r   r   r>  r=  r   rG  r   r0   rN  rE   rE   rE   rJ   r     sL   $
     4  B 	
r   r   r   rN  c                   @   s.   e Zd ZeeeddddZedddZdS )r  N)rC  r   rC   rD   c                 O   s   || _ || _|| _d | _d S r   )r   rC   rC  r  )r   rC  r   rC   rE   rE   rJ   r     s    zPipelineCommand.__init__r   c                 C   s   d| j  d| j d| j dS )N[z] r@  ))rC  r   rC   r   rE   rE   rJ   r    s    zPipelineCommand.__repr__)rR   rS   rT   r   r   r   r   r  rE   rE   rE   rJ   r    s   r  )Rr~   r	  r   r  r   typingr   r   r   r   r   r   r   r	   r
   r   Zredis.asyncio.clientr   Zredis.asyncio.connectionr   r   r   r   Zredis.asyncio.parserr   Zredis.clientr   r   r   Zredis.clusterr   r   r   r   r   r   r   r   r   Zredis.commandsr   r   Z	redis.crcr    r!   Zredis.exceptionsr"   r#   r$   r%   r&   r'   r(   r)   r*   r+   r,   r-   r.   r/   Zredis.typingr0   r1   r2   Zredis.utilsr3   r4   r5   r   r6   rF   rM   rN   rU   r7   r}   r   r   replacelowersetattrr  rE   rE   rE   rJ   <module>   sX   0,@    
      &   e
