U
    d
N                    @   s  d dl Z d dlZd dlZd dlZd dlZd dlZd dlZd dlmZ d dl	m
Z
mZmZmZ d dlmZmZmZmZ d dlmZmZmZ d dlmZmZmZmZ d dlmZmZ d dlm Z m!Z!m"Z"m#Z#m$Z$m%Z%m&Z&m'Z'm(Z(m)Z)m*Z*m+Z+m,Z,m-Z-m.Z.m/Z/ d d	l0m1Z1 d d
l2m3Z3m4Z4m5Z5m6Z6m7Z7 e8e9Z:e;e<e;dddZ=dd Z>dd Z?dd Z@e
e
eee<e<f ee;e
f f dddZAdd ZBdZCdZDdZEdZFdZGdd ZHG d d! d!eZIG d"d# d#ZJG d$d% d%eJeZKG d&d' d'ZLG d(d) d)ZMG d*d+ d+ZNG d,d- d-eZOG d.d/ d/eKZPe;ed0e
f d1d2d3ZQd4ZReRD ]&ZSeSTd5d6U ZSeVePeSeQeS q<G d7d8 d8ZWG d9d: d:ZXdS );    N)OrderedDict)AnyCallableDictTuple)CaseInsensitiveDictPubSubRedis
parse_scan)READ_COMMANDSCommandsParserRedisClusterCommands)ConnectionPoolDefaultParserEncoder	parse_url)REDIS_CLUSTER_HASH_SLOTSkey_slot)AskErrorAuthenticationErrorBusyLoadingErrorClusterCrossSlotErrorClusterDownErrorClusterErrorConnectionError	DataErrorMasterDownError
MovedErrorRedisClusterException
RedisErrorResponseErrorSlotNotCoveredErrorTimeoutErrorTryAgainErrorLock)
dict_mergelist_keys_to_dictmerge_resultsafe_strstr_if_bytes)hostportreturnc                 C   s   |  d| S )N: r+   r,   r/   r/   1/tmp/pip-unpacked-wheel-f5h5_hbx/redis/cluster.pyget_node_name-   s    r2   c                 O   s   | j p| jj|d f|S )Nr   )
connectionconnection_poolget_connection)
redis_nodeargsoptionsr/   r/   r1   r5   1   s
    r5   c           	      K   sB   i }g }|  D ](\}}t|f|\}}|||< ||7 }q||fS N)itemsr
   )	commandresr8   Zcursorsret	node_nameresponsecursorrr/   r/   r1   parse_scan_result7   s    
rB   c              
   K   sj   t  }| D ]D}|D ]:\}}z||  |7  < W q tk
rN   |||< Y qX qqdd | D }|S )Nc                 S   s   g | ]\}}||fqS r/   r/   ).0channelZnumsubr/   r/   r1   
<listcomp>K   s     z'parse_pubsub_numsub.<locals>.<listcomp>)r   valuesKeyErrorr:   )r;   r<   r8   Znumsub_dZnumsub_tupsrD   Z	numsubbedZ
ret_numsubr/   r/   r1   parse_pubsub_numsubB   s    rH   )respr8   r-   c                    s|   | dd ttttf d fddi }| D ]F}|d d \}}}|dd  }| fdd|D d	|||f< q0|S )
Ncurrent_host )r7   r-   c                     s   t | d p | d fS Nr      )r*   r7   )rJ   r/   r1   
fix_serverT   s    z'parse_cluster_slots.<locals>.fix_server   c                    s   g | ]} | qS r/   r/   )rC   replica)rO   r/   r1   rE   ]   s     z'parse_cluster_slots.<locals>.<listcomp>)primaryreplicas)getr   r   str)rI   r8   slotsslotstartendrR   rS   r/   )rJ   rO   r1   parse_cluster_slotsO   s    rZ   c           	      K   s   g }| D ]}g g d}t dt|d dD ]*}|d |d | |d |d  f q*|d }|D ]@}i }t dt|dD ]}||d  ||| < qz|d | qb|| q|S )z(
    Parse CLUSTER SHARDS response.
    )rV   nodesr   rM      rV   rP   r[   )rangelenappend)	rI   r8   ZshardsxZshardir[   nodeZ	dict_noder/   r/   r1   parse_cluster_shardsc   s    
(rc   rR   rQ   zslot-id)charsetZconnection_classr4   Zclient_namedbdecode_responsesencodingencoding_errorserrorsr+   Zmax_connections
nodes_flagredis_connect_funcpasswordr,   retryZretry_on_timeoutZsocket_connect_timeoutZsocket_keepaliveZsocket_keepalive_optionssocket_timeoutsslZssl_ca_certsZssl_ca_dataZssl_certfileZssl_cert_reqsZssl_keyfileZssl_passwordZunix_socket_pathusernamer0   c                  K   s   dd |   D }|S )z9
    Remove unsupported or disabled keys from kwargs
    c                 S   s&   i | ]\}}|t kr|tkr||qS r/   )REDIS_ALLOWED_KEYSKWARGS_DISABLED_KEYS)rC   kvr/   r/   r1   
<dictcomp>   s
     z"cleanup_kwargs.<locals>.<dictcomp>)r:   )kwargsconnection_kwargsr/   r/   r1   cleanup_kwargs   s    rx   c                	   @   s&   e Zd Zeejeeee	e
edZdS )ClusterParser)ZASKZTRYAGAINMOVEDCLUSTERDOWNZ	CROSSSLOT
MASTERDOWNN)__name__
__module____qualname__r&   r   ZEXCEPTION_CLASSESr   r#   r   r   r   r   r/   r/   r/   r1   ry      s   ry   c                I   @   s  e Zd ZdZdZdZdZdZdZeeeeehZ	e
eddd	d
dddddddddddddddddddddd d!d"d#d$d%d&d'd(d)d*d+d,d-d.d/d0d1d2d3d4d5d6d7d8d9d:d;d<d=d>d?d@dAdBdCdDdEdFdGdHdId7d8ddJgGeedKdLdMdNdOdPdQdRdSdTdUgeedVgeedWdXdYdZd[geZd\d]d^d_d`dadbdcdddedfdgdhdidjdkdldmdndodpdqdrdsdtdudvdwdxdydzgfZeed{Ze
edgeedgd|d} ed"dgeedddddd$d'd)d*d/d0gd~d} ed#d(gdd} ed1gdd} edRgeedUgdd} edSgdd} edTgdd} 
ZeeefZdS )AbstractRedisCluster   Z	primariesrS   allrandomzdefault-nodezACL CATzACL DELUSERz
ACL DRYRUNzACL GENPASSzACL GETUSERzACL HELPzACL LISTzACL LOGzACL LOADzACL SAVEzACL SETUSERz	ACL USERSz
ACL WHOAMIZAUTHCLIENT LISTCLIENT SETNAMECLIENT GETNAME
CONFIG SETCONFIG REWRITECONFIG RESETSTATTIMEzPUBSUB CHANNELSzPUBSUB NUMPATzPUBSUB NUMSUBPINGINFOSHUTDOWNKEYSDBSIZEBGSAVESLOWLOG GETSLOWLOG LENSLOWLOG RESETZWAITSAVEzMEMORY PURGEzMEMORY MALLOC-STATSzMEMORY STATSLASTSAVEzCLIENT TRACKINGINFOzCLIENT PAUSEzCLIENT UNPAUSEzCLIENT UNBLOCKz	CLIENT IDzCLIENT REPLYzCLIENT GETREDIRzCLIENT INFOCLIENT KILLREADONLY	READWRITEzCLUSTER INFOzCLUSTER MEETzCLUSTER NODESzCLUSTER REPLICASzCLUSTER RESETzCLUSTER SET-CONFIG-EPOCHCLUSTER SLOTSCLUSTER SHARDSzCLUSTER COUNT-FAILURE-REPORTSzCLUSTER KEYSLOTZCOMMANDzCOMMAND COUNTzCOMMAND LISTzCOMMAND GETKEYS
CONFIG GETDEBUG	RANDOMKEYzGRAPH.CONFIGFLUSHALLFLUSHDBzFUNCTION DELETEzFUNCTION FLUSHzFUNCTION LISTzFUNCTION LOADzFUNCTION RESTORESCANSCRIPT EXISTSSCRIPT FLUSHSCRIPT LOADzFUNCTION DUMPzCLUSTER COUNTKEYSINSLOTzCLUSTER DELSLOTSzCLUSTER DELSLOTSRANGEzCLUSTER GETKEYSINSLOTzCLUSTER SETSLOTz	FT.CREATEz	FT.SEARCHzFT.AGGREGATEz
FT.EXPLAINzFT.EXPLAINCLIz
FT,PROFILEzFT.ALTERzFT.DROPINDEXzFT.ALIASADDzFT.ALIASUPDATEzFT.ALIASDELz
FT.TAGVALSz	FT.SUGADDz	FT.SUGGETz	FT.SUGDELz	FT.SUGLENzFT.SYNUPDATEz
FT.SYNDUMPzFT.SPELLCHECKz
FT.DICTADDz
FT.DICTDELzFT.DICTDUMPzFT.INFOzFT._LISTz	FT.CONFIGzFT.ADDzFT.DELzFT.DROPzFT.GETzFT.MGETz	FT.SYNADD)r   r   c                 C   s   t t| S r9   )sumlistrF   r;   r<   r/   r/   r1   <lambda>Z      zAbstractRedisCluster.<lambda>c                 C   s   t |trt| S |S r9   )
isinstancedictr   rF   r   r/   r/   r1   r   k  r   c                 C   s   t |trt| S |S r9   )r   r   r   rF   r   r/   r/   r1   r   o  r   c                 C   s   t | dkrdS dS rL   )r   rF   r   r/   r/   r1   r   r  r   c                 C   s   t |  S r9   )r   rF   popr   r/   r/   r1   r   v  r   c                 C   s   dd t |  D S )Nc                 S   s   g | ]}t |qS r/   )r   )rC   rs   r/   r/   r1   rE   y  s     z1AbstractRedisCluster.<lambda>.<locals>.<listcomp>)ziprF   r   r/   r/   r1   r   y  r   c                 C   s   t | S r9   )r   rF   r   r/   r/   r1   r   {  r   N)r}   r~   r   RedisClusterRequestTTL	PRIMARIESREPLICAS	ALL_NODESRANDOMDEFAULT_NODE
NODE_FLAGSr&   r'   SLOT_IDCOMMAND_FLAGSSEARCH_COMMANDSrZ   rc   #CLUSTER_COMMANDS_RESPONSE_CALLBACKSrH   r(   rB   RESULT_CALLBACKSr   r"   r   ERRORS_ALLOW_RETRYr/   r/   r/   r1   r      sv  IL
l%
  
  'r   c                	   @   s6  e Zd Zedd ZdNd	d
Zdd Zdd Zdd Zdd Z	dd Z
dd ZdOddZdd Zdd Zdd Zdd  ZdPd!d"Zd#d$ Zd%d& ZdQd'd(ZdRd)d*ZdSd+d,ZdTd.d/Zd0d1 Zd2d3 Zd4d5 Zd6d7 Zd8d9 Zd:d; Zd<d= Zd>d? Zd@dA Z dBdC Z!dDdE Z"dFdG Z#dHdI Z$dJdK Z%dLdM Z&dS )URedisClusterc                 K   s   | f d|i|S )a  
        Return a Redis client object configured from the given URL

        For example::

            redis://[[username]:[password]]@localhost:6379/0
            rediss://[[username]:[password]]@localhost:6379/0
            unix://[[username]:[password]]@/path/to/socket.sock?db=0

        Three URL schemes are supported:

        - `redis://` creates a TCP socket connection. See more at:
          <https://www.iana.org/assignments/uri-schemes/prov/redis>
        - `rediss://` creates a SSL wrapped TCP socket connection. See more at:
          <https://www.iana.org/assignments/uri-schemes/prov/rediss>
        - ``unix://``: creates a Unix Domain Socket connection.

        The username, password, hostname, path and all querystring values
        are passed through urllib.parse.unquote in order to replace any
        percent-encoded values with their corresponding characters.

        There are several ways to specify a database number. The first value
        found will be used:

            1. A ``db`` querystring option, e.g. redis://localhost?db=0
            2. If using the redis:// or rediss:// schemes, the path argument
               of the url, e.g. redis://localhost/0
            3. A ``db`` keyword argument to this function.

        If none of these options are specified, the default db=0 is used.

        All querystring options are cast to their appropriate Python types.
        Boolean arguments can be specified with string values "True"/"False"
        or "Yes"/"No". Values that cannot be properly cast cause a
        ``ValueError`` to be raised. Once parsed, the querystring arguments
        and keyword arguments are passed to the ``ConnectionPool``'s
        class initializer. In the case of conflicting arguments, querystring
        arguments always win.

        urlr/   )clsr   rv   r/   r/   r1   from_url  s    *zRedisCluster.from_urlN  rP   F
   Tc
                 K   s  |dkrg }d|
krt dd}|	dk	rd}t|	}d|krDt dd|kr`|d dkr`t d	|
| |
d
}|
d|}|t|| n6|dk	r|dk	r|t|| nt|dkrt dtd|  |
	dd| _
|
d| ji tf |
}
t|
dd|
dd|
dd| _|| _| jj | _| jj | _|| _d| _|| _d| _tf ||||d|
| _t| jj| _t| jj| _t | | _!t"# | _$dS )a  
         Initialize a new RedisCluster client.

         :startup_nodes: 'list[ClusterNode]'
             List of nodes from which initial bootstrapping can be done
         :host: 'str'
             Can be used to point to a startup node
         :port: 'int'
             Can be used to point to a startup node
         :require_full_coverage: 'bool'
            When set to False (default value): the client will not require a
            full coverage of the slots. However, if not all slots are covered,
            and at least one node has 'cluster-require-full-coverage' set to
            'yes,' the server will throw a ClusterDownError for some key-based
            commands. See -
            https://redis.io/topics/cluster-tutorial#redis-cluster-configuration-parameters
            When set to True: all slots must be covered to construct the
            cluster client. If not all slots are covered, RedisClusterException
            will be thrown.
        :read_from_replicas: 'bool'
             Enable read from replicas in READONLY mode. You can read possibly
             stale data.
             When set to true, read commands will be assigned between the
             primary and its replications in a Round-Robin manner.
         :dynamic_startup_nodes: 'bool'
             Set the RedisCluster's startup nodes to all of the discovered nodes.
             If true (default value), the cluster's discovered nodes will be used to
             determine the cluster nodes-slots mapping in the next topology refresh.
             It will remove the initial passed startup nodes if their endpoints aren't
             listed in the CLUSTER SLOTS output.
             If you use dynamic DNS endpoints for startup nodes but CLUSTER SLOTS lists
             specific IP addresses, it is best to set it to false.
        :cluster_error_retry_attempts: 'int'
             Retry command execution attempts when encountering ClusterDownError
             or ConnectionError
        :reinitialize_steps: 'int'
            Specifies the number of MOVED errors that need to occur before
            reinitializing the whole cluster topology. If a MOVED error occurs
            and the cluster does not need to be reinitialized on this current
            error handling, only the MOVED slot will be patched with the
            redirected node.
            To reinitialize the cluster on every MOVED error, set
            reinitialize_steps to 1.
            To avoid reinitializing the cluster on moved errors, set
            reinitialize_steps to 0.

         :**kwargs:
             Extra arguments that will be sent into Redis instance when created
             (See Official redis-py doc for supported kwargs
         [https://github.com/andymccurdy/redis-py/blob/master/redis/client.py])
             Some kwargs are not supported and will raise a
             RedisClusterException:
                 - db (Redis do not support database SELECT in cluster mode)
        Nre   z4Argument 'db' is not possible to use in cluster modeFTpathzFRedisCluster does not currently support Unix Domain Socket connectionsr   z9A ``db`` querystring option can only be 0 in cluster moder+   r,   a5  RedisCluster requires at least one node to discover the cluster. Please provide one of the followings:
1. host and port, for example:
 RedisCluster(host='localhost', port=6379)
2. list of startup nodes, for example:
 RedisCluster(startup_nodes=[ClusterNode('localhost', 6379), ClusterNode('localhost', 6378)])zstartup_nodes : rk   rg   utf-8rh   strictrf   )startup_nodesr   require_full_coveragedynamic_startup_nodes)%r   r   updaterT   r_   ClusterNoder^   logdebugr   user_on_connect_func
on_connectrx   r   encodercluster_error_retry_attempts	__class__r   copycommand_flagsr   
node_flagsread_from_replicasreinitialize_counterreinitialize_stepsnodes_managerNodesManagerr   r   cluster_response_callbacksr   result_callbacksr   commands_parser	threadingr%   _lock)selfr+   r,   r   r   r   r   r   r   r   rv   r   Zurl_optionsr/   r/   r1   __init__  st    C

	




zRedisCluster.__init__c                 C   s   | S r9   r/   r   r/   r/   r1   	__enter__?  s    zRedisCluster.__enter__c                 C   s   |    d S r9   closer   exc_type	exc_value	tracebackr/   r/   r1   __exit__B  s    zRedisCluster.__exit__c                 C   s   |    d S r9   r   r   r/   r/   r1   __del__E  s    zRedisCluster.__del__c              	   C   s>   |   D ]0}|jrz|jj  W q tk
r6   Y qX qd S r9   )	get_nodesredis_connectionr4   
disconnectOSErrorr   rb   r/   r/   r1   disconnect_connection_poolsH  s    z(RedisCluster.disconnect_connection_poolsc                 C   sR   | t |  | jr:|d t| dkr:td| jdk	rN| | dS )z
        Initialize the connection, authenticate and select a database and send
         READONLY if it is set during object initialization.
        r   OKzREADONLY command failedN)	
set_parserry   r   r   send_commandr*   Zread_responser   r   )r   r3   r/   r/   r1   r   Q  s    


zRedisCluster.on_connectc              	   C   s2   |j s,| j |j s"| j|g W 5 Q R X |j S r9   )r   r   r   create_redis_connectionsr   r/   r/   r1   get_redis_connectionf  s
    z!RedisCluster.get_redis_connectionc                 C   s   | j |||S r9   )r   get_noder   r+   r,   r>   r/   r/   r1   r   m  s    zRedisCluster.get_nodec                 C   s   | j tS r9   )r   get_nodes_by_server_typePRIMARYr   r/   r/   r1   get_primariesp  s    zRedisCluster.get_primariesc                 C   s   | j tS r9   )r   r   REPLICAr   r/   r/   r1   get_replicass  s    zRedisCluster.get_replicasc                 C   s   t t| jj S r9   )r   choicer   r   nodes_cacherF   r   r/   r/   r1   get_random_nodev  s    zRedisCluster.get_random_nodec                 C   s   t | jj S r9   )r   r   r   rF   r   r/   r/   r1   r   y  s    zRedisCluster.get_nodesc                 C   sn   |  |}| jj|}|dks,t|dkr<td| d|rXt| jj| dk rXdS |rbd}nd}|| S )z
        Get the node that holds the key's slot.
        If replica set to True but the slot doesn't have any replicas, None is
        returned.
        Nr   Slot "z " is not covered by the cluster.r\   rM   )keyslotr   slots_cacherT   r^   r!   )r   keyrQ   rW   Z
slot_cachenode_idxr/   r/   r1   get_node_from_key|  s    
zRedisCluster.get_node_from_keyc                 C   s   | j jS )z0
        Get the cluster's default node
        )r   default_noder   r/   r/   r1   get_default_node  s    zRedisCluster.get_default_nodec                 C   sD   |dks| j |jddkr(td dS || j_td|  dS )z
        Set the default node of the cluster.
        :param node: 'ClusterNode'
        :return True if the default node was set, else False
        Nr>   zVThe requested node does not exist in the cluster, so the default node was not changed.Fz$Changed the default cluster node to T)r   namer   infor   r  r   r/   r/   r1   set_default_node  s    zRedisCluster.set_default_nodec                 C   s6   |dkr|   }|jdkr,td|j d|j S )ai  
        Returns a Monitor object for the specified target node.
        The default cluster node will be selected if no target node was
        specified.
        Monitor is useful for handling the MONITOR command to the redis server.
        next_command() method returns one command from monitor
        listen() method yields commands from monitor.
        NzCluster Node z has no redis_connection)r  r   r   r  monitor)r   target_noder/   r/   r1   r	    s    	
zRedisCluster.monitorc                 K   s   t | f|||d|S )z~
        Allows passing a ClusterNode, or host&port, to get a pubsub instance
        connected to the specified node
        )rb   r+   r,   )ClusterPubSub)r   rb   r+   r,   rv   r/   r/   r1   pubsub  s    zRedisCluster.pubsubc                 C   sF   |rt d|rt dt| j| j| jj| j| j| j| j| j	| j
d	S )ac  
        Cluster impl:
            Pipelines do not work in cluster mode the same way they
            do in normal mode. Create a clone of this object so
            that simulating pipelines will work correctly. Each
            command will be called directly when used and
            when calling execute() will only return the result stack.
        z(shard_hint is deprecated in cluster modez)transaction is deprecated in cluster mode)	r   r   r   r   r   r   r   r   lock)r   ClusterPipeliner   r   r   r   r   r   r   r   r   )r   Ztransaction
shard_hintr/   r/   r1   pipeline  s    	zRedisCluster.pipeline皙?c              	   C   s"   |dkrt }|| ||||||dS )aL  
        Return a new Lock object using key ``name`` that mimics
        the behavior of threading.Lock.

        If specified, ``timeout`` indicates a maximum life for the lock.
        By default, it will remain locked until release() is called.

        ``sleep`` indicates the amount of time to sleep per loop iteration
        when the lock is in blocking mode and another client is currently
        holding the lock.

        ``blocking`` indicates whether calling ``acquire`` should block until
        the lock has been acquired or to fail immediately, causing ``acquire``
        to return False and the lock not being acquired. Defaults to True.
        Note this value can be overridden by passing a ``blocking``
        argument to ``acquire``.

        ``blocking_timeout`` indicates the maximum amount of time in seconds to
        spend trying to acquire the lock. A value of ``None`` indicates
        continue trying forever. ``blocking_timeout`` can be specified as a
        float or integer, both representing the number of seconds to wait.

        ``lock_class`` forces the specified lock implementation. Note that as
        of redis-py 3.0, the only lock class we implement is ``Lock`` (which is
        a Lua-based lock). So, it's unlikely you'll need this parameter, unless
        you have created your own custom lock class.

        ``thread_local`` indicates whether the lock token is placed in
        thread-local storage. By default, the token is placed in thread local
        storage so that a thread only sees its token, not a token set by
        another thread. Consider the following timeline:

            time: 0, thread-1 acquires `my-lock`, with a timeout of 5 seconds.
                     thread-1 sets the token to "abc"
            time: 1, thread-2 blocks trying to acquire `my-lock` using the
                     Lock instance.
            time: 5, thread-1 has not yet completed. redis expires the lock
                     key.
            time: 5, thread-2 acquired `my-lock` now that it's available.
                     thread-2 sets the token to "xyz"
            time: 6, thread-1 finishes its work and calls release(). if the
                     token is *not* stored in thread local storage, then
                     thread-1 would see the token value as "xyz" and would be
                     able to successfully release the thread-2's lock.

        In some use cases it's necessary to disable thread local storage. For
        example, if you have code where one thread acquires a lock and passes
        that lock instance to a worker thread to release later. If thread
        local storage isn't disabled in this case, the worker thread won't see
        the token set by the thread that acquired the lock. Our assumption
        is that these cases aren't common and as such default to using
        thread local storage.N)timeoutsleepblockingblocking_timeoutthread_localr$   )r   r  r  r  r  r  
lock_classr  r/   r/   r1   r    s    >zRedisCluster.lockc                 C   s   || j |< dS )zSet a custom Response CallbackN)r   )r   r;   callbackr/   r/   r1   set_response_callback#  s    z"RedisCluster.set_response_callbackc                 O   s\  |d   }t|dkrR|d  d|d    | jkrR|d  d|d    }|dd }|d k	rl|}n| j|}|rtd| d|  || jjkr| 	 gS || jj
kr|  S || jjkr|  S || jjkr|  S || jjk r| jjgS || jjd kr| jjgS | j| }| j|| jo8|tk}td| d	|  |gS d S )
Nr   r\    rM   rj   zTarget node/s for z: zTarget for z: slot )upperr^   r   r   rT   r   r   r   r   r   r   r   r   r   r   r   r   r   r  r   determine_slotget_node_from_slotr   r   )r   r7   rv   r;   rj   Zcommand_flagrW   rb   r/   r/   r1   _determine_nodes'  s8    ,



 zRedisCluster._determine_nodesc                 C   s"   | j dkrdS | j| j  dkS d S )Nr   F)r   r   r   r/   r/   r1   _should_reinitializedO  s    
z"RedisCluster._should_reinitializedc                 C   s   | j |}t|S )z
        Calculate keyslot for a given key.
        See Keys distribution model in https://redis.io/topics/cluster-spec
        )r   encoder   )r   r   rs   r/   r/   r1   r   Y  s    zRedisCluster.keyslotc                 G   s   |   j}| jj|f| S )a  
        Get the keys in the command. If the command has no keys in in, None is
        returned.

        NOTE: Due to a bug in redis<7.0, this function does not work properly
        for EVAL or EVALSHA when the `numkeys` arg is 0.
         - issue: https://github.com/redis/redis/issues/9493
         - fix: https://github.com/redis/redis/pull/9733

        So, don't use this function with EVAL or EVALSHA.
        )r  r   r   Zget_keys)r   r7   Z
redis_connr/   r/   r1   _get_command_keysa  s    
zRedisCluster._get_command_keysc                    s  |d } j |tkr |d S |dkrxt|dkrBtd| |d }|dd|  }t|dkrrtdtS |}n@ j| }|dkst|dkr|dkrtdtS td	| t|dkr҈ 	|d S  fd
d|D }t|dkrt| d|
 S )a  
        Figure out what slot to use based on args.

        Raises a RedisClusterException if there's a missing key and we can't
            determine what slots to map the command to; or, if the keys don't
            all map to the same key slot.
        r   rM   )ZEVALEVALSHAr\   zInvalid args in command: rP   N)ZFCALLZFCALL_ROzNo way to dispatch this command to Redis Cluster. Missing key.
You can execute the command by specifying target nodes.
Command: c                    s   h | ]}  |qS r/   )r   )rC   r   r   r/   r1   	<setcomp>  s     z.RedisCluster.determine_slot.<locals>.<setcomp>z) - all keys must map to the same key slot)r   rT   r   r^   r   r   	randranger   r!  r   r   )r   r7   r;   Znum_actual_keysZ	eval_keyskeysrV   r/   r   r1   r  p  s4    
zRedisCluster.determine_slotc                 C   s   | j S )z.
        Get the connections' encoder
        )r   r   r/   r/   r1   get_encoder  s    zRedisCluster.get_encoderc                 C   s   | j jS )z9
        Get the connections' key-word arguments
        )r   rw   r   r/   r/   r1   get_connection_kwargs  s    z"RedisCluster.get_connection_kwargsc                 C   s   t |to|| jkS r9   )r   rU   r   )r   target_nodesr/   r/   r1   _is_nodes_flag  s    zRedisCluster._is_nodes_flagc                 C   sL   t |tr|}n8t |tr"|g}n&t |tr6| }ntdt| |S )Nztarget_nodes type can be one of the following: node_flag (PRIMARIES, REPLICAS, RANDOM, ALL_NODES),ClusterNode, list<ClusterNode>, or dict<any, ClusterNode>. The passed type is )r   r   r   r   rF   	TypeErrortype)r   r(  r[   r/   r/   r1   _parse_target_nodes  s    



z RedisCluster._parse_target_nodesc                 O   s
  d}d}| dd}|dk	r4| |s4| |}d}|r<dn| j}d}td|D ]}zli }	|s| j||d|i}|std| d	|D ]}
| j|
f|||	|
j< q| j	|d |	f|W   S  t
k
 r } zt|| jjkr|}n|W 5 d}~X Y qPX qP|dS )
a"  
        Wrapper for ERRORS_ALLOW_RETRY error handling.

        It will try the number of times specified by the config option
        "self.cluster_error_retry_attempts" which defaults to 3 unless manually
        configured.

        If it reaches the number of times, the command will raise the exception

        Key argument :target_nodes: can be passed with the following types:
            nodes_flag: PRIMARIES, REPLICAS, ALL_NODES, RANDOM
            ClusterNode
            list<ClusterNode>
            dict<Any, ClusterNode>
        FNr(  TrM   r   rj   !No targets were found to execute  command on)r   r)  r,  r   r]   r  r   _execute_commandr  _process_resultBaseExceptionr+  r   r   )r   r7   rv   Ztarget_nodes_specifiedr(  passed_targetsZretry_attempts	exception_r<   rb   er/   r/   r1   execute_command  s>    

 
zRedisCluster.execute_commandc              
   O   s  |d }d}d}d}d}d}	t | j}
d}|
dkr|
d8 }
z8z|rT| j|d}n*|	r~| j| }| j|| jov|t	k}d}	t
d| d|j d|j  | |}t|f||}|r|d	 |j|d	f| d}|j|  |j||f|}|| jkr| j| |f|}|W W \S  tttfk
rX } zt
t|  W 5 d}~X Y n ttfk
r } zTt
t| |dk	r|  |d7 }|d
k rtd nd|_| j   W 5 d}~X Y n tk
r8 } zHt
d |  j d7  _ | ! r| j  d| _ n| j"| d}	W 5 d}~X Y n< t#k
rr   t
d |
| jd k rltd Y n t$k
r } z"t
d t%|j&|j'd}d}W 5 d}~X Y n t(k
r } z&t
d td | j  |W 5 d}~X Y n~ t)k
r4 } z |* }t
d|  |W 5 d}~X Y n@ t+k
rr } z t
d |r^|  |W 5 d}~X Y nX W 5 |dk	r|j| X q*t,ddS )z9
        Send a command to a node in the cluster
        r   NFrM   r  zExecuting command z on target node: r  ZASKING   g      ?r   Tr#   r\   g?r   r0   r   zResponseError: r1  zTTL exhausted.)-intr   r4   releaser   r  r   r  r   r   r   r   server_typer  r   r5   r   parse_responser   r   r   r   r3  r+  r   r"   r   timer  r   
initializer   r   r  update_moved_exceptionr#   r   r2   r+   r,   r   r    __str__r1  r   )r   r
  r7   rv   r;   r6   r3   Zredirect_addraskingmovedZttlZconnection_error_retry_counterrW   r?   r5  messager/   r/   r1   r/    s    


 





	









zRedisCluster._execute_commandc              	   C   s@   z&| j  | jr| j  W 5 Q R X W n tk
r:   Y nX d S r9   )r   r   r   AttributeErrorr   r/   r/   r1   r     s    zRedisCluster.closec                 K   sB   || j kr| j | ||f|S t|dkr:t| d S |S dS )a  
        Process the result of the executed command.
        The function would return a dict or a single value.

        :type command: str
        :type res: dict

        `res` should be in the following format:
            Dict<node_name, command_result>
        rM   r   N)r   r^   r   rF   )r   r;   r<   rv   r/   r/   r1   r0    s
    
zRedisCluster._process_resultc                 C   s   t | || dS )a  
        This function can be used to add externally defined redis modules,
        and their namespaces to the redis client.

        ``funcname`` - A string containing the name of the function to create
        ``func`` - The function, being added to this class.
        N)setattr)r   funcnamefuncr/   r/   r1   load_external_module  s    z!RedisCluster.load_external_module)	Nr   NrP   Fr   FTN)NNN)F)N)NNN)NN)Nr  TNNT)'r}   r~   r   classmethodr   r   r   r   r   r   r   r   r   r   r   r   r   r  r  r  r	  r  r  r  r  r  r  r   r!  r  r&  r'  r)  r,  r6  r/  r   r0  rG  r/   r/   r/   r1   r     sh   
-         
 	




      
J(
9?y	r   c                   @   s.   e Zd Zd
ddZdd Zdd Zdd	 ZdS )r   Nc                 C   s:   |dkrt |}|| _|| _t||| _|| _|| _d S )N	localhost)socketgethostbynamer+   r,   r2   r  r:  r   )r   r+   r,   r:  r   r/   r/   r1   r     s    
zClusterNode.__init__c                 C   s.   d| j  d| j d| j d| j d| j dS )Nz[host=z,port=z,name=z,server_type=z,redis_connection=])r+   r,   r  r:  r   r   r/   r/   r1   __repr__  s    ,zClusterNode.__repr__c                 C   s   t |to|j| jkS r9   )r   r   r  )r   objr/   r/   r1   __eq__  s    zClusterNode.__eq__c                 C   s   | j d k	r| j   d S r9   )r   r   r   r/   r/   r1   r     s    
zClusterNode.__del__)NN)r}   r~   r   r   rM  rO  r   r/   r/   r/   r1   r     s   

	r   c                   @   sB   e Zd ZdZdeddddZeeeddd	Zdd
ddZdS )LoadBalancerz$
    Round-Robin Load Balancing
    r   N)start_indexr-   c                 C   s   i | _ || _d S r9   )primary_to_idxrQ  )r   rQ  r/   r/   r1   r     s    zLoadBalancer.__init__)rR   	list_sizer-   c                 C   s&   | j || j}|d | | j |< |S )NrM   )rR  
setdefaultrQ  )r   rR   rS  Zserver_indexr/   r/   r1   get_server_index  s    zLoadBalancer.get_server_index)r-   c                 C   s   | j   d S r9   )rR  clearr   r/   r/   r1   reset  s    zLoadBalancer.reset)r   )	r}   r~   r   __doc__r8  r   rU   rU  rW  r/   r/   r/   r1   rP    s   rP  c                   @   s   e Zd Zd ddZd!ddZdd	 Zd
d Zd"ddZdd Zdd Z	dd Z
dd Zdd Zdd Zdd Zdd Zdd ZdS )#r   FNTc                 K   sj   i | _ i | _i | _d | _| | || _|| _|| _d | _|| _	t
 | _|d krXt }|| _|   d S r9   )r   r   r   r  populate_startup_nodesr   _require_full_coverage_dynamic_startup_nodes_moved_exceptionrw   rP  read_load_balancerr   r%   r   r=  )r   r   r   r   r  r   rv   r/   r/   r1   r     s    	
zNodesManager.__init__c                 C   sP   |r.|r.|dkrt |}| jt||dS |r>| j|S td dS dS )z
        Get the requested node from the cluster's nodes.
        nodes.
        :return: ClusterNode if the node exists, else None
        rI  r0   zEget_node requires one of the following: 1. node name 2. host and portN)rJ  rK  r   rT   r2   r   errorr   r/   r/   r1   r     s    
zNodesManager.get_nodec                 C   s
   || _ d S r9   )r\  )r   r3  r/   r/   r1   r>  
  s    z#NodesManager.update_moved_exceptionc                 C   s   | j }| j|j|jd}|dk	r2|jtk	rNt|_nt|j|jt}|| j|j< || j	|j
 kr| j	|j
 d }t|_| j	|j
 | | j	|j
 | || j	|j
 d< | j|kr|| _n|g| j	|j
< d| _ dS )z@
        Update the slot's node with the redirected one
        r0   Nr   )r\  r   r+   r,   r:  r   r   r   r  r   Zslot_idr   r_   remover  )r   r5  Zredirected_nodeZold_primaryr/   r/   r1   _update_moved_slots  s"    

z NodesManager._update_moved_slotsc              	   C   s   | j r&| j | j r|   W 5 Q R X | j|dksHt| j| dkr`td| d| j d|dkr| j| d j}| j	
|t| j| }nB|dks|tkst| j| dkrd}ntdt| j| d }| j| | S )z9
        Gets a node that servers this hash slot
        Nr   r   z5" not covered by the cluster. "require_full_coverage="TrM   )r\  r   r`  r   rT   r^   r!   rZ  r  r]  rU  r   r   randint)r   rW   r   r:  Zprimary_namer  r/   r/   r1   r  4  s.    " zNodesManager.get_node_from_slotc                    s    fdd| j  D S )z
        Get all nodes with the specified server type
        :param server_type: 'primary' or 'replica'
        :return: list of ClusterNode
        c                    s   g | ]}|j  kr|qS r/   r:  )rC   rb   rc  r/   r1   rE   ]  s   
z9NodesManager.get_nodes_by_server_type.<locals>.<listcomp>)r   rF   )r   r:  r/   rc  r1   r   W  s    
z%NodesManager.get_nodes_by_server_typec                 C   s   |D ]}|| j |j< qdS )zK
        Populate all startup nodes and filters out any duplicates
        N)r   r  )r   r[   nr/   r/   r1   rY  c  s    z#NodesManager.populate_startup_nodesc                 C   s"   t dtD ]}||kr
 dS q
dS )Nr   FT)r]   r   )r   r   ra   r/   r/   r1   check_slots_coveragej  s    z!NodesManager.check_slots_coveragec                 C   s6   |D ],}|j dkr| jf |j|jd| j|_ qdS )zV
        This function will create a redis connection to all nodes in :nodes:
        Nr0   )r   create_redis_noder+   r,   rw   )r   r[   rb   r/   r/   r1   r   r  s    
 z%NodesManager.create_redis_connectionsc                 K   sL   | j r4|d|i |d|i ttf |d}ntf ||d|}|S )Nr+   r,   )r4   r0   )r   r   r	   r   )r   r+   r,   rv   rA   r/   r/   r1   rf  |  s    zNodesManager.create_redis_nodec                 C   sJ   t ||}||}|d krF| j|}|d ks:|jd krFt|||}|S r9   )r2   rT   r   r   r   )r   r+   r,   roletmp_nodes_cacher>   r
  r/   r/   r1   _get_or_create_cluster_node  s    

z(NodesManager._get_or_create_cluster_nodec                    s  t d |   i }i }g }d}d}| j}| j D ]4}z~|jrL|j}n<t|}	|		ddd | j
|j|jf|	}|| j|j _t| ddkrtdt|d}
d}W n ttfk
r } z*|j}t d	|j d
|  W Y q6W 5 d}~X Y n tk
rn } zHt d | }d|ks@d|krJW Y q6ntd| d| W 5 d}~X Y nB tk
r } z"| }td|j d| W 5 d}~X Y nX t|
dkrt|
d d d dkrt| jdkr|j|
d d d< |
D ]V  d }t|d }|dkr(|j}t|d }| ||t|}|||j< tt d t d d D ]}||krg ||< || |  fddtdt D }|D ]B}t|d }|d }| ||t |}|| | |||j< qnZ|| d }|j|jkrl||j d|j d|  t|dkrltdd!| qlq| "|}|r6 qnq6|s|td| #t$|  |s| j%rtdt| d t& d!|| _'|| _(| )td | _*| j+r|| _d| _,dS )"z
        Initializes the nodes cache, slots cache and redis connections.
        :startup_nodes:
            Responsible for discovering other nodes in the cluster
        z/Initializing the nodes' topology of the clusterFTr   )rf   rg   Zcluster_enabledz(Cluster mode is not enabled on this noder   zQAn exception occurred while trying to initialize the cluster using the seed node z:
Nz6ReseponseError sending "cluster slots" to redis serverr{   r|   z7ERROR sending "cluster slots" command to redis server: z	. error: z6ERROR sending "cluster slots" command to redis server rM   r   r\   rK   c                    s   g | ]} | qS r/   r/   )rC   jrW   r/   r1   rE     s     z+NodesManager.initialize.<locals>.<listcomp>rP   z vs z
 on slot: r7  z6startup_nodes could not agree on a valid slots cache: z, zORedis Cluster cannot be connected. Please provide at least one reachable node. z9All slots are not covered after query all startup_nodes. z of z covered...)-r   r   rW  rw   r   rF   r   r   deepcopyr   rf  r+   r,   r  boolr  rT   r   r*   r6  r   r"   r?  r3  r    	Exceptionr^   r8  ri  r   r]   r_   r   joinre  r   r   rZ  r   r   r   r   r  r[  r\  )r   rh  Z	tmp_slotsZdisagreementsZstartup_nodes_reachableZfully_coveredrv   Zstartup_noderA   Zcopy_kwargsZcluster_slotsr5  msgrB  Zprimary_noder+   r,   r
  ra   Zreplica_nodesZreplica_nodeZtarget_replica_nodeZtmp_slotr/   rk  r1   r=    s    

 




   
"
   
zNodesManager.initializec                 C   s*   d | _ | j D ]}|jr|j  qd S r9   )r  r   rF   r   r   r   r/   r/   r1   r   3  s    zNodesManager.closec                 C   s(   z| j   W n tk
r"   Y nX d S r9   )r]  rW  r*  r   r/   r/   r1   rW  9  s    zNodesManager.reset)FFNT)NNN)FN)r}   r~   r   r   r   r>  r`  r  r   rY  re  r   rf  ri  r=  r   rW  r/   r/   r/   r1   r     s&       

'
#

 r   c                       sL   e Zd ZdZd fdd	ZdddZdd Zd	d
 Zdd Zdd Z	  Z
S )r  z
    Wrapper for PubSub class.

    IMPORTANT: before using ClusterPubSub, read about the known limitations
    with pubsub in Cluster mode and learn how to workaround them:
    https://redis-py-cluster.readthedocs.io/en/stable/pubsub.html
    Nc                    sV   d| _ | |||| | j dkr$dn|| j j}|| _t jf |||jd dS )a*  
        When a pubsub instance is created without specifying a node, a single
        node will be transparently chosen for the pubsub connection on the
        first command execution. The node will be determined by:
         1. Hashing the channel name in the request to find its keyslot
         2. Selecting a node that handles the keyslot: If read_from_replicas is
            set to true, a replica can be selected.

        :type redis_cluster: RedisCluster
        :type node: ClusterNode
        :type host: str
        :type port: int
        N)r4   r   )rb   set_pubsub_noder   r4   clustersuperr   r   )r   redis_clusterrb   r+   r,   rv   r4   r   r/   r1   r   J  s      zClusterPubSub.__init__c                 C   s~   |dk	r"|  |||j|j |}nR|dk	rV|dk	rV|j||d}|  |||| |}nt||gdkrptdnd}|| _dS )aN  
        The pubsub node will be set according to the passed node, host and port
        When none of the node, host, or port are specified - the node is set
        to None and will be determined by the keyslot of the channel in the
        first command to be executed.
        RedisClusterException will be thrown if the passed node does not exist
        in the cluster.
        If host is passed without port, or vice versa, a DataError will be
        thrown.
        :type cluster: RedisCluster
        :type node: ClusterNode
        :type host: str
        :type port: int
        Nr0   Tz6Passing a host requires passing a port, and vice versa)_raise_on_invalid_noder+   r,   r   anyr   rb   )r   rr  rb   r+   r,   Zpubsub_noder/   r/   r1   rq  d  s    
zClusterPubSub.set_pubsub_nodec                 C   s   | j S )zJ
        Get the node that is being used as the pubsub connection
        )rb   r   r/   r/   r1   get_pubsub_node  s    zClusterPubSub.get_pubsub_nodec                 C   s4   |dks|j |jddkr0td| d| ddS )zl
        Raise a RedisClusterException if the node is None or doesn't exist in
        the cluster.
        Nr  zNode r.   z doesn't exist in the cluster)r   r  r   )r   rt  rb   r+   r,   r/   r/   r1   rv    s    z$ClusterPubSub._raise_on_invalid_nodec                 O   s   | j dkr| jdkrnt|dkrJ|d }| j|}| jj|| jj}n
| j }|| _	| j
|}|j| _| jd| j| _ | j | j | j }| j||jf|  dS )z
        Execute a publish/subscribe command.

        Taken code from redis-py and tweak to make it work within a cluster.
        NrM   r  )r3   r4   r^   rr  r   r   r  r   r   rb   r   r5   r  Zregister_connect_callbackr   Z_executer   )r   r7   rv   rD   rW   rb   r   r3   r/   r/   r1   r6    s(    


 
 zClusterPubSub.execute_commandc                 C   s   | j dk	r| j jS dS )zH
        Get the Redis connection of the pubsub connected node.
        N)rb   r   r   r/   r/   r1   r     s    
z"ClusterPubSub.get_redis_connection)NNN)NNN)r}   r~   r   rX  r   rq  rx  rv  r6  r   __classcell__r/   r/   ru  r1   r  A  s   
!
#r  c                       s  e Zd ZdZeeeeefZ	d=ddZ
dd	 Zd
d Zdd Zdd Zdd Zdd Zdd Zdd Zdd Zdd Zdd Zd>dd Zd!d" Zd?d#d$Zd@ fd%d&	Zd'd( Zd)d* Zd+d, Zd-d. Zd/d0 Zd1d2 Zd3d4 Z d5d6 Z!d7d8 Z"d9d: Z#d;d< Z$  Z%S )Ar  z8
    Support for Redis pipeline
    in cluster mode
    NFr7  r   c
                 K   s   g | _ || _|| _d| _|p&| jj | _|r2|ng | _|| _	| jj
 | _|| _|| _d| _|| _t|
dd|
dd|
dd| _|	dkrt }	|	| _dS )	r  Fr   rg   r   rh   r   rf   N)command_stackr   r   Zrefresh_table_asapr   r   r   r   r   r   r   r   r   r   r   r   r   rT   r   r   r%   r   )r   r   r   r   r   r   r   r   r   r  rv   r/   r/   r1   r     s*    


zClusterPipeline.__init__c                 C   s   t | j S r  )r+  r}   r   r/   r/   r1   rM    s    zClusterPipeline.__repr__c                 C   s   | S r{  r/   r   r/   r/   r1   r     s    zClusterPipeline.__enter__c                 C   s   |    dS r  N)rW  r   r/   r/   r1   r     s    zClusterPipeline.__exit__c                 C   s&   z|    W n tk
r    Y nX d S r9   )rW  rn  r   r/   r/   r1   r     s    zClusterPipeline.__del__c                 C   s
   t | jS r{  )r^   rz  r   r/   r/   r1   __len__  s    zClusterPipeline.__len__c                 C   s   dS )z@Pipeline instances should  always evaluate to True on Python 2.7Tr/   r   r/   r/   r1   __nonzero__	  s    zClusterPipeline.__nonzero__c                 C   s   dS )z?Pipeline instances should  always evaluate to True on Python 3+Tr/   r   r/   r/   r1   __bool__  s    zClusterPipeline.__bool__c                 O   s   | j ||S )z?
        Wrapper function for pipeline_execute_command
        )pipeline_execute_commandr   r7   rv   r/   r/   r1   r6    s    zClusterPipeline.execute_commandc                 O   s   | j t||t| j  | S )zN
        Appends the executed command to the pipeline's command stack
        )rz  r_   PipelineCommandr^   r   r7   r8   r/   r/   r1   r    s    z(ClusterPipeline.pipeline_execute_commandc                 C   s8   |D ].}|j }t|tr| ||jd |j |qdS )z8
        Raise the first exception on the stack
        rM   N)resultr   rn  annotate_exceptionpositionr7   )r   stackcrA   r/   r/   r1   raise_first_error   s
    
z!ClusterPipeline.raise_first_errorc                 C   sF   d tt|}d| d| d|jd  }|f|jdd  |_dS )zS
        Provides extra context to the exception prior to it being handled
        r  z
Command # z (z) of pipeline caused error: r   rM   N)ro  mapr)   r7   )r   r3  numberr;   cmdrp  r/   r/   r1   r  *  s    z"ClusterPipeline.annotate_exceptionTc                 C   s&   | j }z| ||W S |   X dS )zB
        Execute all the commands in the current pipeline
        N)rz  rW  send_cluster_commands)r   raise_on_errorr  r/   r/   r1   execute5  s    zClusterPipeline.executec                 C   s   g | _ t | _d| _d| _dS )z/
        Reset back to empty pipeline.
        FN)rz  setscriptsZwatchingZexplicit_transactionr   r/   r/   r1   rW  ?  s    zClusterPipeline.resetc              	   C   sR   |sg S t d| jD ]0}z| j|||dW   S  tk
rB   Y qX qtddS )a  
        Wrapper for CLUSTERDOWN error handling.

        If the cluster reports it is down it is assumed that:
         - connection_pool was disconnected
         - connection_pool was reseted
         - refereh_table_asap set to True

        It will try the number of times specified by
        the config option "self.cluster_error_retry_attempts"
        which defaults to 3 unless manually configured.

        If it reaches the number of times, the command will
        raises ClusterDownException.
        r   )r  allow_redirectionsz0CLUSTERDOWN error. Unable to rebuild the clusterN)r]   r   _send_cluster_commandsr   )r   r  r  r  r4  r/   r/   r1   r  _  s    z%ClusterPipeline.send_cluster_commandsc                    s^  t |dd d}i }|D ]}|jdd}|rD| |sD| |}n(| j|jd|i}|sltd|j dt|d	krtd
|j |d }	|	j	}
|
|kr| 
|	}t||j}t|j|j|||
< ||
 | q| }|D ]}|  q|D ]}|  q| D ]}|j|j qt dd |D dd d}|r|rtd|d j dt|d jj dt|d j  |  jd	7  _|  r| j  |D ]J}zt j |j|j|_W n* t!k
r } z
||_W 5 d}~X Y nX qg }t |dd dD ]B}|jd | j"kr:| j"|jd  |jf|j|_||j q|rZ| #| |S )z
        Send a bunch of cluster commands to the redis cluster.

        `allow_redirections` If the pipeline should follow
        `ASK` & `MOVED` responses automatically. If set
        to false it will raise RedisClusterException.
        c                 S   s   | j S r9   r  r`   r/   r/   r1   r     r   z8ClusterPipeline._send_cluster_commands.<locals>.<lambda>)r   r(  NZ	node_flagr-  r.  rM   zToo many targets for command r   c                 s   s    | ]}t |jtjr|V  qd S r9   )r   r  r  r   rC   r  r/   r/   r1   	<genexpr>  s   z9ClusterPipeline._send_cluster_commands.<locals>.<genexpr>c                 S   s   | j S r9   r  r  r/   r/   r1   r     r   z7An exception occurred during pipeline execution. args: z	, error: r  c                 S   s   | j S r9   r  r  r/   r/   r1   r     r   )$sortedr8   r   r)  r,  r  r7   r   r^   r  r   r5   NodeCommandsr;  r4   r_   rF   writereadr9  r3   r   r3  r+  r  r}   rU   r   r  r   r=  rs  r6  r   r   r  )r   r  r  r  attemptr[   r  r2  r(  rb   r>   r6   r3   Znode_commandsrd  r5  r?   ru  r/   r1   r    sz    
  


0


z&ClusterPipeline._send_cluster_commandsc                 C   s   |st ddS )r  z4ASK & MOVED redirection not allowed in this pipelineNr   )r   r  r/   r/   r1   _fail_on_redirect  s    z!ClusterPipeline._fail_on_redirectc                 G   s   | j d| S )NEXISTS)r  )r6  )r   r%  r/   r/   r1   exists$  s    zClusterPipeline.existsc                 C   s   t ddS )r  z method eval() is not implementedNr  r   r/   r/   r1   eval'  s    zClusterPipeline.evalc                 C   s   t ddS )r  z!method multi() is not implementedNr  r   r/   r/   r1   multi+  s    zClusterPipeline.multic                 O   s   t ddS )r  z5method immediate_execute_command() is not implementedNr  r  r/   r/   r1   immediate_execute_command/  s    z)ClusterPipeline.immediate_execute_commandc                 O   s   t ddS )r  z0method _execute_transaction() is not implementedNr  r  r/   r/   r1   _execute_transaction5  s    z$ClusterPipeline._execute_transactionc                 C   s   t ddS )r  z(method load_scripts() is not implementedNr  r   r/   r/   r1   load_scripts9  s    zClusterPipeline.load_scriptsc                 G   s   t ddS )r  z!method watch() is not implementedNr  r   namesr/   r/   r1   watch=  s    zClusterPipeline.watchc                 C   s   t ddS )r  z#method unwatch() is not implementedNr  r   r/   r/   r1   unwatchA  s    zClusterPipeline.unwatchc                 O   s   t ddS )r  z4method script_load_for_pipeline() is not implementedNr  r  r/   r/   r1   script_load_for_pipelineE  s    z(ClusterPipeline.script_load_for_pipelinec                 G   s$   t |dkrtd| d|d S )z7
        "Delete a key specified by ``names``"
        rM   z=deleting multiple keys is not implemented in pipeline commandZDELr   )r^   r   r6  r  r/   r/   r1   deleteK  s
    zClusterPipeline.delete)NNNFr7  r   N)T)TT)TT)&r}   r~   r   rX  r   r"   r   r   r#   r   r   rM  r   r   r   r}  r~  r  r6  r  r  r  r  rW  r  r  r  r  r  r  r  r  r  r  r  r  r  ry  r/   r/   ru  r1   r    s\          
%	


!   
&    r  .)r  r-   c                    s    fdd}|S )zi
    Prints error because some pipelined commands should
    be blocked when running in cluster-mode
    c                     s   t d  dd S )Nz"ERROR: Calling pipelined function z1 is blocked when running redis in cluster mode...r  )r7   rv   r  r/   r1   inner]  s    
z%block_pipeline_command.<locals>.innerr/   )r  r  r/   r  r1   block_pipeline_commandW  s    r  )EZBGREWRITEAOFr   ZBITOPZ
BRPOPLPUSHr   r   r   r   ZCLIENTr   r   r   r   ZCONFIGr   ECHOr"  r   r   r   r   r   ZMGETzMGET NONATOMICZMOVEZMSETzMSET NONATOMICZMSETNXZPFCOUNTZPFMERGEr   ZPUBLISHr   r   r   ZRENAMEZRENAMENXZ	RPOPLPUSHr   r   r   r   zSCRIPT KILLr   ZSCRIPTZSDIFFZ
SDIFFSTOREz SENTINEL GET MASTER ADDR BY NAMEzSENTINEL MASTERzSENTINEL MASTERSzSENTINEL MONITORzSENTINEL REMOVEzSENTINEL SENTINELSzSENTINEL SETzSENTINEL SLAVESZSENTINELr   ZSINTERZSINTERSTOREZSLAVEOFr   r   r   ZSLOWLOGZSMOVEZSORTZSUNIONZSUNIONSTOREr   r  r4  c                   @   s   e Zd ZdZdddZdS )r  r  Nc                 C   s4   || _ |d kri }|| _|| _d | _d | _d| _d S )NF)r7   r8   r  r  rb   r@  )r   r7   r8   r  r/   r/   r1   r     s    zPipelineCommand.__init__)NN)r}   r~   r   rX  r   r/   r/   r/   r1   r    s   r  c                   @   s0   e Zd ZdZdd Zdd Zdd Zdd	 Zd
S )r  r  c                 C   s   || _ || _|| _g | _dS r|  )r;  r4   r3   commands)r   r;  r4   r3   r/   r/   r1   r     s    zNodeCommands.__init__c                 C   s   | j | dS r|  )r  r_   )r   r  r/   r/   r1   r_     s    zNodeCommands.appendc              
   C   sv   | j }| j}|D ]
}d|_qz||dd |D  W n6 ttfk
rp } z|D ]
}||_qTW 5 d}~X Y nX dS )z=
        Code borrowed from Redis so it can be fixed
        Nc                 S   s   g | ]
}|j qS r/   rN   r  r/   r/   r1   rE     s     z&NodeCommands.write.<locals>.<listcomp>)r3   r  r  Zsend_packed_commandZpack_commandsr   r"   )r   r3   r  r  r5  r/   r/   r1   r    s    zNodeCommands.writec                 C   s   | j }| jD ]}|jdkrz| j||jd f|j|_W q ttfk
rx } z| jD ]
}||_qTW Y  dS d}~X Y q tk
r   t	
 d |_Y qX qdS )r  Nr   rM   )r3   r  r  r;  r7   r8   r   r"   r   sysexc_info)r   r3   r  r5  r/   r/   r1   r    s    


zNodeCommands.readN)r}   r~   r   rX  r   r_   r  r  r/   r/   r/   r1   r    s
   r  )Yr   loggingr   rJ  r  r   r<  collectionsr   typingr   r   r   r   Zredis.clientr   r   r	   r
   Zredis.commandsr   r   r   Zredis.connectionr   r   r   r   Z	redis.crcr   r   Zredis.exceptionsr   r   r   r   r   r   r   r   r   r   r   r   r    r!   r"   r#   Z
redis.lockr%   Zredis.utilsr&   r'   r(   r)   r*   	getLoggerr}   r   rU   r8  r2   r5   rB   rH   rZ   rc   r   r   r   rq   rr   rx   ry   r   r   r   rP  r   r  r  r  ZPIPELINE_BLOCKED_COMMANDSr;   replacelowerrD  r  r  r/   r/   r/   r1   <module>   st   H
   I      1  h   G