U
    dǦ                     @   s  d Z ddlZddlZddlZddlmZ ddlmZmZ ddlm	Z	 ddl
mZ ddlmZ ddlmZmZmZ dd	lmZ dd
lmZmZ ddlmZ ddlZddlmZmZmZmZ ddlm Z  ddl!m"Z" ddlm#Z#m$Z$m%Z%m&Z&m'Z'm(Z(m)Z)m*Z* ddl+m,Z,m-Z-m.Z.m/Z/m0Z0 ddl1m2Z2 ddl3m4Z4m5Z5 ddl6m7Z7 ddl8m9Z9m:Z:m;Z;m<Z<m=Z= ddl>m?Z? dZ@eAdhZBe7eCZDdZEeddZFdZGdZHdd ZIG dd  d eJZKd!d" ZLG d#d$ d$ZMG d%d& d&ZNG d'd( d(eMeNZOeOZPG d)d* d*eMZQG d+d, d,eQeNZRG d-d. d.eOZSdS )/zResult backend base classes.

- :class:`BaseBackend` defines the interface.

- :class:`KeyValueStoreBackend` is a common base class
    using K/V semantics like _get and _put.
    N)
namedtuple)datetime	timedelta)partial)WeakValueDictionary)ExceptionInfo)dumpsloadsprepare_accept_content)registry)bytes_to_strensure_bytes)maybe_sanitize_url)current_appgroupmaybe_signaturestates)get_current_task)Context)BackendGetMetaErrorBackendStoreError
ChordErrorImproperlyConfiguredNotRegisteredSecurityErrorTaskRevokedErrorTimeoutError)GroupResult
ResultBase	ResultSetallow_join_resultresult_from_tuple)	BufferMap)LRUCachearity_greater)
get_logger)create_exception_clsensure_serializableget_pickleable_exceptionget_pickled_exceptionraise_with_context) get_exponential_backoff_interval)BaseBackendKeyValueStoreBackendDisabledBackendpicklei    pending_results_t)ZconcreteweakzU
No result backend is configured.
Please see the documentation for more information.
z
Starting chords requires a result backend to be configured.

Note that a group chained with a task is also upgraded to be a chord,
as this pattern requires synchronization.

Result backends that supports chords: Redis, Database, Memcached, and more.
c                 C   s   | |dt  i|S )zReturn an unpickled backend.app)r   Z_get_current_object)clsargskwargs r6   8/tmp/pip-unpacked-wheel-mu1yl971/celery/backends/base.pyunpickle_backendE   s    r8   c                   @   s    e Zd Zdd Ze Z ZZdS )	_nulldictc                 O   s   d S Nr6   )selfakwr6   r6   r7   ignoreK   s    z_nulldict.ignoreN)__name__
__module____qualname__r>   __setitem__update
setdefaultr6   r6   r6   r7   r9   J   s   r9   c                 C   s   | d krdS | j S NF)Zignore_resultrequestr6   r6   r7   _is_request_ignore_resultQ   s    rH   c                   @   s*  e Zd ZejZejZejZeZdZdZ	dZ
dZdddddZdpdd	Zdqd
dZdd ZddejfddZddddejfddZdd ZdddejfddZdddejfddZdrddZdsddZdtddZdd  Zd!d" Zd#d$ Zd%d& Zd'd( Z d)d* Z!d+d, Z"dud-d.Z#dvd/d0Z$d1d2 Z%d3d4 Z&dwd5d6Z'd7d8 Z(dxd9d:Z)d;d< Z*d=d> Z+d?d@ Z,e,Z-dAdB Z.dCdD Z/dEdF Z0dGdH Z1dIdJ Z2dydKdLZ3dMdN Z4dOdP Z5dzdQdRZ6d{dSdTZ7dUdV Z8dWdX Z9dYdZ Z:d[d\ Z;d]d^ Z<d_d` Z=dadb Z>dcdd Z?d|dedfZ@dgdh ZAdidj ZBd}dkdlZCd~dndoZDdS )BackendNFT   r      )max_retriesZinterval_startZinterval_stepZinterval_maxc                 K   s   || _ | j j}	|p|	j| _tj| j \| _| _| _|p:|	j	}
|
dkrJt
 nt|
d| _| ||| _|d krr|	jn|| _| jd kr|	jn| j| _t| j| _|	dd| _|	dd| _|	dd| _|	d	td
| _ti t | _tt| _|| _d S )N)limitZresult_backend_always_retryFZ+result_backend_max_sleep_between_retries_msi'  Z,result_backend_base_sleep_between_retries_ms
   Zresult_backend_max_retriesinf) r2   confZresult_serializer
serializerserializer_registryZ	_encoderscontent_typecontent_encodingencoderZresult_cache_maxr9   r#   _cacheprepare_expiresexpiresZresult_accept_contentacceptZaccept_contentr
   getalways_retrymax_sleep_between_retries_msbase_sleep_between_retries_msfloatrL   r0   r   Z_pending_resultsr"   MESSAGE_BUFFER_MAXZ_pending_messagesurl)r;   r2   rR   Zmax_cached_resultsrZ   rY   Zexpires_typera   r5   rQ   Zcmaxr6   r6   r7   __init__u   s(    


zBackend.__init__c                 C   s2   |r
| j S t| j pd}|dr.|dd S |S )z=Return the backend as an URI, sanitizing the password or not. z:///NrM   )ra   r   endswith)r;   Zinclude_passwordra   r6   r6   r7   as_uri   s    zBackend.as_uric                 K   s   |  ||tjS )zMark a task as started.)store_resultr   ZSTARTEDr;   task_idmetar6   r6   r7   mark_as_started   s    zBackend.mark_as_startedc                 C   s:   |rt |s| j||||d |r6|jr6| ||| dS )z#Mark task as successfully executed.rF   N)rH   rf   chordon_chord_part_return)r;   rh   resultrG   rf   stater6   r6   r7   mark_as_done   s    
zBackend.mark_as_donec              	   C   s   |r| j |||||d |r|jr0| ||| zt|j}W n ttfk
r\   t }Y nX |D ]z}	t|	}
|
	|
j
 |
j
d|
_|
j
d|
_|r|tjkr|
jdk	r| j |
j||||
d d|
j
krb| |
|| qb|r|jr| ||| dS )z#Mark task as executed with failure.	tracebackrG   rh   group_idNrk   )rf   rk   rl   iterchainAttributeError	TypeErrortupler   rC   optionsr[   idr   r   ZPROPAGATE_STATESrh   errbacks_call_task_errbacks)r;   rh   excrq   rG   rf   Zcall_errbacksrn   Z
chain_dataZ
chain_elemZchain_elem_ctxr6   r6   r7   mark_as_failure   sD    
    

zBackend.mark_as_failurec           	   	   C   s   g }|j D ]}| j|}|js(| j|_zDt|jdr`t|jjts`t	|jjdr`|||| n
|
| W q
 tk
r   |
| Y q
X q
|r|j}|jp|}t|| jd}| jjjs|jddr|j|f||d n|j|f||d d S )N
__header__rK   r2   Zis_eagerF)	parent_idroot_id)rz   r2   	signatureZ_apphasattrtype
isinstancer~   r   r$   appendr   ry   r   r   rQ   task_always_eagerdelivery_infor[   applyapply_async)	r;   rG   r|   rq   Zold_signatureZerrbackrh   r   gr6   r6   r7   r{      s>    



    zBackend._call_task_errbacksrc   c                 C   s<   t |}|r | j|||d |d |r8|jr8| ||| d S )Nrp   )r   rf   rk   rl   )r;   rh   reasonrG   rf   rn   r|   r6   r6   r7   mark_as_revoked  s    
 
zBackend.mark_as_revokedc                 C   s   | j |||||dS )zfMark task as being retries.

        Note:
            Stores the current exception (if any).
        rp   )rf   )r;   rh   r|   rq   rG   rf   rn   r6   r6   r7   mark_as_retry  s    
 zBackend.mark_as_retryc              
   C   s   | j }z|j|j j}W n tk
r0   | }Y nX t|jd|jdg t d|}z| 	||d  W n6 t
k
r } z|j|j|d W Y S d }~X Y nX |j|j|dS d S )Nrh   Z
link_error)ry   rz   r   )r|   )r2   Z_taskstaskbackendKeyErrorr   rx   r[   dictr{   	Exceptionfail_from_current_stackry   )r;   callbackr|   r2   r   Zfake_requestZeb_excr6   r6   r7   chord_error_from_stack  s"    

&zBackend.chord_error_from_stackc                 C   s   t  \}}}z6|d kr|n|}t|||f}| |||j	 |W S |d k	rz|j  |jj W n tk
rx   Y nX |j}qF~X d S r:   )
sysexc_infotb_frameclearf_localsRuntimeErrortb_nextr   r}   rq   )r;   rh   r|   type_Zreal_exctbZexception_infor6   r6   r7   r   7  s    
zBackend.fail_from_current_stackc                 C   sL   |dkr| j n|}|tkr"t|S t|}t|d|jt|j| j|j	dS )z$Prepare exception for serialization.NrA   )exc_typeexc_message
exc_module)
rR   EXCEPTION_ABLE_CODECSr(   r   getattrr?   r'   r4   encoder@   )r;   r|   rR   exctyper6   r6   r7   prepare_exceptionJ  s    zBackend.prepare_exceptionc           
   
   C   s  |sdS t |tr(| jtkr$t|}|S t |tsrzt|}W n2 tk
rp } ztd| |W 5 d}~X Y nX |d}z|d }W n, tk
r } zt	d|W 5 d}~X Y nX |dkrt
|t}nRz(tj| }|dD ]}t||}qW n( ttfk
r   t
|tjj}Y nX |dd}t |tr@t|tsp|dkrN|n| d| }td	| d
| z&t |ttfr|| }n||}W n8 tk
r }	 zt| d| d}W 5 d}	~	X Y nX |S )z1Convert serialized exception to Python exception.NzbIf the stored exception isn't an instance of BaseException, it must be a dictionary.
Instead got: r   r   z4Exception information must includethe exception type.r   rc   z!Expected an exception class, got z with payload ())r   BaseExceptionrR   r   r)   r   rv   r[   r   
ValueErrorr&   r?   r   modulessplitr   ru   celery
exceptionsr   
issubclassr   rw   listr   )
r;   r|   er   r   r3   nameexc_msgZfake_exc_typeerrr6   r6   r7   exception_to_pythonT  s\    



 


&zBackend.exception_to_pythonc                 C   s    | j dkrt|tr| S |S )zPrepare value for storage.r/   )rR   r   r   as_tupler;   rm   r6   r6   r7   prepare_value  s    zBackend.prepare_valuec                 C   s   |  |\}}}|S r:   )_encode)r;   data_payloadr6   r6   r7   r     s    zBackend.encodec                 C   s   t || jdS )N)rR   )r   rR   )r;   r   r6   r6   r7   r     s    zBackend._encodec                 C   s$   |d | j kr | |d |d< |S )Nstatusrm   )EXCEPTION_STATESr   )r;   ri   r6   r6   r7   meta_from_decoded  s    zBackend.meta_from_decodedc                 C   s   |  | |S r:   )r   decoder;   r   r6   r6   r7   decode_result  s    zBackend.decode_resultc                 C   s.   |d kr|S |pt |}t|| j| j| jdS )N)rT   rU   rZ   )strr	   rT   rU   rZ   r   r6   r6   r7   r     s    zBackend.decodec                 C   s<   |d kr| j jj}t|tr$| }|d k	r8|r8||S |S r:   )r2   rQ   Zresult_expiresr   r   total_seconds)r;   valuer   r6   r6   r7   rX     s    

zBackend.prepare_expiresc                 C   s(   |d k	r|S | j jj}|d kr$| jS |S r:   )r2   rQ   Zresult_persistent
persistent)r;   Zenabledr   r6   r6   r7   prepare_persistent  s    
zBackend.prepare_persistentc                 C   s(   || j krt|tr| |S | |S r:   )r   r   r   r   r   )r;   rm   rn   r6   r6   r7   encode_result  s    
zBackend.encode_resultc                 C   s
   || j kS r:   )rW   r;   rh   r6   r6   r7   	is_cached  s    zBackend.is_cachedc                 C   s   || j kr t }|r$| }nd }|||| ||d}|rTt|dd rT|j|d< |rnt|dd rn|j|d< | jj	
ddr|rt|dd t|dd t|d	d t|d
d t|dd t|dr|jr|jdnd d}	|rdd	h}
|
D ]"}|	| }| |}t||	|< q||	 |S )N)r   rm   rq   children	date_doner   rr   r   extendedrm   r   r4   r5   hostnameretriesr   Zrouting_key)r   r4   r5   Zworkerr   queue)READY_STATESr   utcnow	isoformatcurrent_task_childrenr   r   r   r2   rQ   Zfind_value_for_keyr   r   r[   r   r   rC   )r;   rm   rn   rq   rG   Zformat_dater   r   ri   Zrequest_metaZencode_needed_fieldsfieldr   Zencoded_valuer6   r6   r7   _get_result_meta  sH    










zBackend._get_result_metac                 C   s   t | d S r:   )timesleep)r;   amountr6   r6   r7   _sleep   s    zBackend._sleepc           
   
   K   s   |  ||}d}z"| j||||fd|i| |W S  tk
r } z^| jr| |r|| jk r|d7 }t| j|| jdd }	| 	|	 qt
td||d n W 5 d}~X Y qX qdS )	zUpdate task state and result.

        if always_retry_backend_operation is activated, in the event of a recoverable exception,
        then retry operation with an exponential backoff until a limit has been reached.
        r   rG   rK   T  z%failed to store result on the backend)rh   rn   N)r   _store_resultr   r\   exception_safe_to_retryrL   r+   r^   r]   r   r*   r   )
r;   rh   rm   rn   rq   rG   r5   r   r|   sleep_amountr6   r6   r7   rf     s4    
  zBackend.store_resultc                 C   s   | j |d  | | d S r:   )rW   pop_forgetr   r6   r6   r7   forget%  s    zBackend.forgetc                 C   s   t dd S )Nz"backend does not implement forget.NotImplementedErrorr   r6   r6   r7   r   )  s    zBackend._forgetc                 C   s   |  |d S )zGet the state of a task.r   )get_task_metar   r6   r6   r7   	get_state,  s    zBackend.get_statec                 C   s   |  |dS )z$Get the traceback for a failed task.rq   r   r[   r   r6   r6   r7   get_traceback2  s    zBackend.get_tracebackc                 C   s   |  |dS )zGet the result of a task.rm   r   r   r6   r6   r7   
get_result6  s    zBackend.get_resultc                 C   s*   z|  |d W S  tk
r$   Y nX dS )z(Get the list of subtasks sent by a task.r   N)r   r   r   r6   r6   r7   get_children:  s    zBackend.get_childrenc                 C   s   | j jjrtdt d S )Nz9Shouldn't retrieve result with task_always_eager enabled.)r2   rQ   r   warningswarnRuntimeWarningr;   r6   r6   r7   _ensure_not_eagerA  s
    
zBackend._ensure_not_eagerc                 C   s   dS )a  Check if an exception is safe to retry.

        Backends have to overload this method with correct predicates dealing with their exceptions.

        By default no exception is safe to retry, it's up to backend implementation
        to define which exceptions are safe.
        Fr6   )r;   r|   r6   r6   r7   r   H  s    zBackend.exception_safe_to_retryc              
   C   s   |    |r.z| j| W S  tk
r,   Y nX d}z| |}W qW q2 tk
r } z\| jr| |r|| jk r|d7 }t| j	|| j
dd }| | qttd|d n W 5 d}~X Y q2X q2|r|dtjkr|| j|< |S )	zGet task meta from backend.

        if always_retry_backend_operation is activated, in the event of a recoverable exception,
        then retry operation with an exponential backoff until a limit has been reached.
        r   rK   Tr   zfailed to get meta)rh   Nr   )r   rW   r   _get_task_meta_forr   r\   r   rL   r+   r^   r]   r   r*   r   r[   r   SUCCESS)r;   rh   cacher   ri   r|   r   r6   r6   r7   r   R  s<    

  

zBackend.get_task_metac                 C   s   | j |dd| j|< dS )z;Reload task result, even if it has been previously fetched.Fr   N)r   rW   r   r6   r6   r7   reload_task_resulty  s    zBackend.reload_task_resultc                 C   s   | j |dd| j|< dS )z<Reload group result, even if it has been previously fetched.Fr   N)get_group_metarW   r;   rr   r6   r6   r7   reload_group_result}  s    zBackend.reload_group_resultc                 C   sR   |    |r.z| j| W S  tk
r,   Y nX | |}|rN|d k	rN|| j|< |S r:   )r   rW   r   _restore_groupr;   rr   r   ri   r6   r6   r7   r     s    

zBackend.get_group_metac                 C   s   | j ||d}|r|d S dS )zGet the result for a group.r   rm   N)r   r   r6   r6   r7   restore_group  s    zBackend.restore_groupc                 C   s   |  ||S )z&Store the result of an executed group.)_save_groupr;   rr   rm   r6   r6   r7   
save_group  s    zBackend.save_groupc                 C   s   | j |d  | |S r:   )rW   r   _delete_groupr   r6   r6   r7   delete_group  s    zBackend.delete_groupc                 C   s   dS )zBackend cleanup.Nr6   r   r6   r6   r7   cleanup  s    zBackend.cleanupc                 C   s   dS )z:Cleanup actions to do at the end of a task worker process.Nr6   r   r6   r6   r7   process_cleanup  s    zBackend.process_cleanupc                 C   s   i S r:   r6   )r;   Zproducerrh   r6   r6   r7   on_task_call  s    zBackend.on_task_callc                 C   s   t dd S )Nz%Backend does not support add_to_chordr   )r;   Zchord_idrm   r6   r6   r7   add_to_chord  s    zBackend.add_to_chordc                 K   s   d S r:   r6   )r;   rG   rn   rm   r5   r6   r6   r7   rl     s    zBackend.on_chord_part_returnc                 C   s   d S r:   r6   )r;   rr   
chord_sizer6   r6   r7   set_chord_size  s    zBackend.set_chord_sizec                 K   s   dd |D |d< zt |dd }W n tk
r:   d }Y nX |jdt |dd }|d krt| jjj||jd j}|jdt |dd}| jj	d j
|j|f||||d	 d S )
Nc                 S   s   g | ]}|  qS r6   r   .0rr6   r6   r7   
<listcomp>  s     z1Backend.fallback_chord_unlock.<locals>.<listcomp>rm   r   r   priorityr   zcelery.chord_unlock)	countdownr   r  )r   r   rx   r[   r2   ZamqpZrouterZrouter   Ztasksr   ry   )r;   header_resultbodyr  r5   Z	body_typer   r  r6   r6   r7   fallback_chord_unlock  s     
 zBackend.fallback_chord_unlockc                 C   s   d S r:   r6   r   r6   r6   r7   ensure_chords_allowed  s    zBackend.ensure_chords_allowedc                 K   s(   |    | jj| }| j||f| d S r:   )r  r2   r   r  r;   Zheader_result_argsr  r5   r  r6   r6   r7   apply_chord  s    zBackend.apply_chordc                 C   s0   |pt t dd }|r,dd t |dg D S d S )NrG   c                 S   s   g | ]}|  qS r6   r  r  r6   r6   r7   r    s     z1Backend.current_task_children.<locals>.<listcomp>r   )r   r   )r;   rG   r6   r6   r7   r     s    zBackend.current_task_childrenr6   c                 C   s   |si n|}t | j||ffS r:   )r8   	__class__r;   r4   r5   r6   r6   r7   
__reduce__  s    zBackend.__reduce__)NNNNNN)F)N)N)N)N)N)TF)NN)T)T)T)rK   )N)r6   N)Er?   r@   rA   r   r   ZUNREADY_STATESr   r   Zsubpolling_intervalsupports_native_joinZsupports_autoexpirer   Zretry_policyrb   re   rj   r   ro   FAILUREr}   r{   ZREVOKEDr   RETRYr   r   r   r   r   r   r   r   r   r   r   rX   r   r   r   r   r   rf   r   r   r   
get_statusr   r   r   r   r   r   r   r   r   r   r   r   r  r  r  r  rl   r  r  r  r  r   r  r6   r6   r6   r7   rI   W   s             

	  
	  
8.  
	  





G	
	
  
0   
"

'



rI   c                   @   sH   e Zd ZdddZdddZddd	ZdddZdd Zedd Z	dS )SyncBackendMixinN      ?Tc           	      c   sl   |    |j}|sd S t }|D ]*}t|tr>|j|jfV  q ||j q | j||||||dE d H  d S )N)timeoutintervalno_ack
on_messageon_interval)r   resultssetr   r   ry   addget_many)	r;   rm   r  r  r  r   r!  r"  task_idsr6   r6   r7   iter_native  s"    
   zSyncBackendMixin.iter_nativec	           
      C   sN   |    |d k	rtd| j|j||||d}	|	rJ||	 |j||dS d S )Nz,Backend does not support on_message callback)r  r  r!  r  )	propagater   )r   r   wait_forry   Z_maybe_set_cacheZmaybe_throw)
r;   rm   r  r  r  r   r!  r   r(  ri   r6   r6   r7   wait_for_pending  s     
z!SyncBackendMixin.wait_for_pendingc                 C   s^   |    d}| |}|d tjkr(|S |r2|  t| ||7 }|r||krtdqdS )aL  Wait for task and return its result.

        If the task raises an exception, this exception
        will be re-raised by :func:`wait_for`.

        Raises:
            celery.exceptions.TimeoutError:
                If `timeout` is not :const:`None`, and the operation
                takes longer than `timeout` seconds.
        g        r   zThe operation timed out.N)r   r   r   r   r   r   r   )r;   rh   r  r  r  r!  Ztime_elapsedri   r6   r6   r7   r)    s    

zSyncBackendMixin.wait_forFc                 C   s   |S r:   r6   )r;   rm   r1   r6   r6   r7   add_pending_result  s    z#SyncBackendMixin.add_pending_resultc                 C   s   |S r:   r6   r   r6   r6   r7   remove_pending_result  s    z&SyncBackendMixin.remove_pending_resultc                 C   s   dS rE   r6   r   r6   r6   r7   is_async   s    zSyncBackendMixin.is_async)Nr  TNN)Nr  TNNNT)Nr  TN)F)
r?   r@   rA   r'  r*  r)  r+  r,  propertyr-  r6   r6   r6   r7   r    s(       
         
       

r  c                   @   s   e Zd ZdZdS )r,   z"Base (synchronous) result backend.Nr?   r@   rA   __doc__r6   r6   r6   r7   r,   %  s   r,   c                       s  e Zd ZeZdZdZdZdZ fddZ	dd Z
d	d
 Zdd Zdd Zdd Zdd Zdd Zdd Zd9ddZd:ddZd;ddZdd Zejfd d!Zejfd"d#Zd$d%d&d$d$d$ejfd'd(Zd)d* Zd<d+d,Zd-d. Zd/d0 Zd1d2 Zd3d4 Z d5d6 Z!d7d8 Z"  Z#S )=BaseKeyValueStoreBackendzcelery-task-meta-zcelery-taskset-meta-zchord-unlock-Fc                    s>   t | jdr| jj| _|   t j|| | jr:| j| _d S )N__func__)	r   key_tr2  _encode_prefixessuperrb   implements_incr_apply_chord_incrr  r  r  r6   r7   rb   3  s    
z!BaseKeyValueStoreBackend.__init__c                 C   s.   |  | j| _|  | j| _|  | j| _d S r:   )r3  task_keyprefixgroup_keyprefixchord_keyprefixr   r6   r6   r7   r4  ;  s    z)BaseKeyValueStoreBackend._encode_prefixesc                 C   s   t dd S )NzMust implement the get method.r   r;   keyr6   r6   r7   r[   @  s    zBaseKeyValueStoreBackend.getc                 C   s   t dd S )NzDoes not support get_manyr   )r;   keysr6   r6   r7   mgetC  s    zBaseKeyValueStoreBackend.mgetc                 C   s   |  ||S r:   )r#  )r;   r=  r   rn   r6   r6   r7   _set_with_stateF  s    z(BaseKeyValueStoreBackend._set_with_statec                 C   s   t dd S )NzMust implement the set method.r   r;   r=  r   r6   r6   r7   r#  I  s    zBaseKeyValueStoreBackend.setc                 C   s   t dd S )Nz Must implement the delete methodr   r<  r6   r6   r7   deleteL  s    zBaseKeyValueStoreBackend.deletec                 C   s   t dd S )NzDoes not implement incrr   r<  r6   r6   r7   incrO  s    zBaseKeyValueStoreBackend.incrc                 C   s   d S r:   r6   rA  r6   r6   r7   expireR  s    zBaseKeyValueStoreBackend.expirerc   c                 C   s$   | j }|d| j||||gS )z#Get the cache key for a task by id.rc   )r3  joinr9  )r;   rh   r=  r3  r6   r6   r7   get_key_for_taskU  s      z)BaseKeyValueStoreBackend.get_key_for_taskc                 C   s$   | j }|d| j||||gS )z$Get the cache key for a group by id.rc   )r3  rE  r:  r;   rr   r=  r3  r6   r6   r7   get_key_for_group\  s      z*BaseKeyValueStoreBackend.get_key_for_groupc                 C   s$   | j }|d| j||||gS )z?Get the cache key for the chord waiting on group with given id.rc   )r3  rE  r;  rG  r6   r6   r7   get_key_for_chordc  s      z*BaseKeyValueStoreBackend.get_key_for_chordc                 C   sF   |  |}| j| jfD ]&}||rt|t|d   S qt|S )zTake bytes: emit string.N)r3  r9  r:  
startswithr   len)r;   r=  prefixr6   r6   r7   _strip_prefixj  s
    

z&BaseKeyValueStoreBackend._strip_prefixc                 c   s:   |D ]0\}}|d k	r|  |}|d |kr||fV  qd S )Nr   )r   )r;   valuesr   kr   r6   r6   r7   _filter_readyr  s
    
z&BaseKeyValueStoreBackend._filter_readyc                    sJ   t |dr(fdd| |D S  fddt||D S d S )Nitemsc                    s   i | ]\}}  ||qS r6   )rM  )r	  rO  vr   r6   r7   
<dictcomp>|  s    z=BaseKeyValueStoreBackend._mget_to_results.<locals>.<dictcomp>c                    s   i | ]\}}t  | |qS r6   r   )r	  irR  )r>  r6   r7   rS    s   
 )r   rP  rQ  	enumerate)r;   rN  r>  r   r6   )r>  r;   r7   _mget_to_resultsy  s    


z)BaseKeyValueStoreBackend._mget_to_resultsNr  Tc	              	   #   sf  |d krdn|}t |tr|nt|}	t }
 j}|	D ]J}z|| }W n tk
rZ   Y q6X |d |kr6t||fV  |
| q6|	|
 d}|	rbt|	}  	 fdd|D ||}|
| |	dd |D  | D ]&\}}|d k	r|| t||fV  q|r2|| |kr2td| d	|r>|  t| |d
7 }|r||krqbqd S )Nr  r   r   c                    s   g | ]}  |qS r6   )rF  )r	  rO  r   r6   r7   r    s   z5BaseKeyValueStoreBackend.get_many.<locals>.<listcomp>c                 S   s   h | ]}t |qS r6   rT  )r	  rR  r6   r6   r7   	<setcomp>  s     z4BaseKeyValueStoreBackend.get_many.<locals>.<setcomp>zOperation timed out (r   rK   )r   r#  rW   r   r   r$  difference_updater   rW  r?  rC   rQ  r   r   r   )r;   r&  r  r  r  r   r!  Zmax_iterationsr   idsZ
cached_idsr   rh   cachedZ
iterationsr>  r
  r=  r   r6   r   r7   r%    sH    
 

z!BaseKeyValueStoreBackend.get_manyc                 C   s   |  | | d S r:   )rB  rF  r   r6   r6   r7   r     s    z BaseKeyValueStoreBackend._forgetc           
   
   K   s   | j ||||d}t||d< | |}|d tjkr:|S z| | || || W n6 tk
r }	 ztt	|	||d|	W 5 d }	~	X Y nX |S )N)rm   rn   rq   rG   rh   r   )rn   rh   )
r   r   r   r   r   r@  rF  r   r   r   )
r;   rh   rm   rn   rq   rG   r5   ri   Zcurrent_metaexr6   r6   r7   r     s     
&z&BaseKeyValueStoreBackend._store_resultc                 C   s(   |  | || d| itj |S )Nrm   )r@  rH  r   r   r   r   r   r6   r6   r7   r     s
     z$BaseKeyValueStoreBackend._save_groupc                 C   s   |  | | d S r:   )rB  rH  r   r6   r6   r7   r     s    z&BaseKeyValueStoreBackend._delete_groupc                 C   s*   |  | |}|s tjddS | |S )$Get task meta-data for a task by id.N)r   rm   )r[   rF  r   ZPENDINGr   rg   r6   r6   r7   r     s    z+BaseKeyValueStoreBackend._get_task_meta_forc                 C   s>   |  | |}|r:| |}|d }t|| j|d< |S dS )r]  rm   N)r[   rH  r   r!   r2   )r;   rr   ri   rm   r6   r6   r7   r     s    
z'BaseKeyValueStoreBackend._restore_groupc                 K   s$   |    | jj| }|j| d d S )Nr   )r  r2   r   saver  r6   r6   r7   r7    s    z*BaseKeyValueStoreBackend._apply_chord_incrc                 K   s  | j s
d S | j}|j}|sd S | |}ztj|| d}W nX tk
r }	 z:t|j|d}
t	
d||	 | |
td|	 W Y S d }	~	X Y nX |d krzt|W n\ tk
r }	 z<t|j|d}
t	
d||	 | |
td| d W Y S d }	~	X Y nX | |}|jd}|d kr0t|}||krJt	d	| nX||krt|j|d}
|jrp|jn|j}zz&t  ||jjd
d}W 5 Q R X W n~ tk
r }	 z^zt| }d||	}W n tk
r   t|	}Y nX t	
d|| | |
t| W 5 d }	~	X Y nZX z|
| W nH tk
rv }	 z(t	
d||	 | |
td|	 W 5 d }	~	X Y nX W 5 |  | j| X n|  || j! d S )Nr^  r   zChord %r raised: %rzCannot restore group: zChord callback %r raised: %rzGroupResult z no longer existsr  z/Chord counter incremented too many times for %rT)r  r(  zDependency {0.id} raised {1!r}zCallback error: )"r6  r2   r   rI  r   restorer   r   rk   logger	exceptionr   r   r   rC  r[   rK  warningr  Zjoin_nativerE  rB  clientr    rQ   Zresult_chord_join_timeoutnextZ_failed_join_reportformatStopIterationreprdelayrD  rY   )r;   rG   rn   rm   r5   r2   gidr=  depsr|   r   valsizejretZculpritr   r6   r6   r7   rl     s    





 "z-BaseKeyValueStoreBackend.on_chord_part_return)rc   )rc   )rc   )NN)$r?   r@   rA   r   r3  r9  r:  r;  r6  rb   r4  r[   r?  r@  r#  rB  rC  rD  rF  rH  rI  rM  r   r   rP  rW  r%  r   r   r   r   r   r   r7  rl   __classcell__r6   r6   r8  r7   r1  ,  sH   


  
&   
r1  c                   @   s   e Zd ZdZdS )r-   z/Result backend base class for key/value stores.Nr/  r6   r6   r6   r7   r-   /  s   r-   c                   @   sP   e Zd ZdZi Zdd Zdd Zdd Zdd	 Ze Z	 Z
 ZZe Z ZZd
S )r.   zDummy result backend.c                 O   s   d S r:   r6   r  r6   r6   r7   rf   8  s    zDisabledBackend.store_resultc                 C   s   t t d S r:   )r   E_CHORD_NO_BACKENDstripr   r6   r6   r7   r  ;  s    z%DisabledBackend.ensure_chords_allowedc                 O   s   t t d S r:   )r   E_NO_BACKENDrr  r  r6   r6   r7   _is_disabled>  s    zDisabledBackend._is_disabledc                 O   s   dS )Nzdisabled://r6   r  r6   r6   r7   re   A  s    zDisabledBackend.as_uriN)r?   r@   rA   r0  rW   rf   r  rt  re   r   r  r   r   Zget_task_meta_forr)  r%  r6   r6   r6   r7   r.   3  s   r.   )Tr0  r   r   r   collectionsr   r   r   	functoolsr   weakrefr   Zbilliard.einfor   Zkombu.serializationr   r	   r
   r   rS   Zkombu.utils.encodingr   r   Zkombu.utils.urlr   Zcelery.exceptionsr   r   r   r   r   Zcelery._stater   Zcelery.app.taskr   r   r   r   r   r   r   r   r   Zcelery.resultr   r   r   r    r!   Zcelery.utils.collectionsr"   Zcelery.utils.functionalr#   r$   Zcelery.utils.logr%   Zcelery.utils.serializationr&   r'   r(   r)   r*   Zcelery.utils.timer+   __all__	frozensetr   r?   ra  r`   r0   rs  rq  r8   r   r9   rH   rI   r  r,   ZBaseDictBackendr1  r-   r.   r6   r6   r6   r7   <module>   s^   (


     N  