U
    df)                     @   s
  d dl Z d dlZd dlmZ d dlmZ d dlmZ d dlm	Z	m
Z
mZ d dlmZ d dlmZmZ d dlmZmZ d d	lmZ d d
lmZ ddlmZ ddlm	Z ddlmZ efZzd d	lmZ eef7 ZW n ek
r   Y nX ee Z!G dd deZ"dd Z#dS )    N)maybe_signature)BaseDictBackend)
ChordError)GroupResultallow_join_resultresult_from_tuple)
get_logger)	b64decode	b64encode)
connectiontransaction)InterfaceError)DecodeError   )ChordCounter)r   )
TaskResultc                   @   s   e Zd ZdZeZeZdZdd Z	dd Z
ddd	Zd
d Zdd Zdd Zdd Zdd Zdd Zdd Zdd Zdd Zdd ZdS )DatabaseBackendz>The Django database backend, using models to store task state.g      ?c                 C   s&   t D ]}t||rt   dS qdS )a  Check if an exception is safe to retry.

        Backends have to overload this method with correct predicates
        dealing with their exceptions.

        By default no exception is safe to retry, it's up to
        backend implementation to define which exceptions are safe.

        For Celery / django-celery-results, retry Django / Psycopg2
        InterfaceErrors, like "Connection already closed", with new connection.

        Set result_backend_always_retry to True in order to enable retries.
        TF)EXCEPTIONS_TO_CATCH
isinstancer   close)selfexcexc_type r   K/tmp/pip-unpacked-wheel-oj8imec1/django_celery_results/backends/database.pyexception_safe_to_retry$   s
    
z'DatabaseBackend.exception_safe_to_retryc           	      C   s   d d d d d d d}|r| j jddrt|dd d k	r>|j}nt|dd }t|dd d k	rb|j}nt|dd }|d k	r| |\}}}|d k	r| |\}}}t|di pi }|d	d }||||t|d
d |t|dd d |S )N)periodic_task_name	task_argstask_kwargsZ	task_name	tracebackZworkerextendedresultargsreprargs
kwargsreprkwargs
propertiesr   Ztaskhostname)	appconfZfind_value_for_keygetattrr"   r$   encode_contentgetupdate)	r   requestr   Zextended_propsr   r   _r&   r   r   r   r   _get_extended_properties9   s:    

	z(DatabaseBackend._get_extended_propertiesNc              	   C   sf   |  |\}}}|  d| |i\}	}	}
|||
|||||d}|| || | jjjf | |S )z2Store return value and status of an executed task.children)content_encodingcontent_typemetar!   statustask_idr   using)r+   Zcurrent_task_childrenr-   r0   	TaskModel_default_managerZstore_result)r   r6   r!   r5   r   r.   r7   r3   r2   r/   r4   Z
task_propsr   r   r   _store_resultf   s$    


zDatabaseBackend._store_resultc              	   C   s   | j j|}| }| ||ddp,i }| ||d}|d}|d}z| ||}| ||}W n ttj	fk
r   Y nX |j
||||||d | |S )z#Get task metadata for a task by id.r4   Nr!   r   r   )r!   r   r   r#   r%   )r8   r9   Zget_taskas_dictdecode_contentpopr,   r   binasciiErrorr-   Zmeta_from_decoded)r   r6   objresr4   r!   r   r   r   r   r   _get_task_meta_for   s(    

z"DatabaseBackend._get_task_meta_forc                 C   s*   |  |\}}}|dkr t|}|||fS Nbinary)_encoder
   )r   datar3   r2   contentr   r   r   r+      s    zDatabaseBackend.encode_contentc                 C   s$   |r |j dkrt|}| |S d S rC   )r2   r	   decode)r   r@   rG   r   r   r   r<      s    
zDatabaseBackend.decode_contentc                 C   s6   z| j jj|d  W n | j jk
r0   Y nX d S )N)r6   )r8   r9   r,   deleteDoesNotExist)r   r6   r   r   r   _forget   s    zDatabaseBackend._forgetc                 C   s$   | j j| j | jj| j dS )zDelete expired metadata.N)r8   r9   Zdelete_expiredexpires
GroupModel)r   r   r   r   cleanup   s    zDatabaseBackend.cleanupc                 C   sP   | j j|}|rL| }| ||d }d|d< |rHt|| jd|d< |S dS )z&return result value for a group by id.r!   Nr(   )rM   r9   	get_groupr;   r<   r   r(   )r   group_idgroup_resultrA   Zdecoded_resultr   r   r   _restore_group   s    zDatabaseBackend._restore_groupc                 C   s,   |  | \}}}| jj|||| |S )zStore return value of group)r+   as_tuplerM   r9   Zstore_group_result)r   rQ   rR   r3   r2   r!   r   r   r   _save_group   s    
   zDatabaseBackend._save_groupc                 C   s4   z| j j|  W n | jjk
r.   Y nX d S )N)rM   r9   rP   rI   r8   rJ   )r   rQ   r   r   r   _delete_group   s    zDatabaseBackend._delete_groupc                 K   s`   t |ts| jj| }n|}dd |D }|ddp<t|}t|}tjj	|j
||d dS )z6Add a ChordCounter with the expected number of resultsc                 S   s   g | ]}|  qS r   )rT   ).0rr   r   r   
<listcomp>   s     z/DatabaseBackend.apply_chord.<locals>.<listcomp>
chord_sizeN)rQ   Z	sub_taskscount)r   r   r(   r,   lenjsondumpsr   objectscreateid)r   Zheader_result_argsbodyr%   Zheader_resultresultsrZ   rF   r   r   r   apply_chord   s    

  zDatabaseBackend.apply_chordc              	   K   s   |j |j }}|r|sdS d}t l tj j|d }|dkr`t	
d| W 5 Q R  dS | jd8  _|jdkr|  nd}|  W 5 Q R X |r|j| jd}	|	 rt|j| jd}
t| j|
|	d	 dS )
z/Called on finishing each part of a Chord headerNF)rQ   z$Can't find ChordCounter for Group %s   r   TrO   )r(   callbackrR   )ra   groupr   Zatomicr   r_   Zselect_for_updatefilterfirstloggerwarningr[   saverI   rR   r(   readyr   Zchordtrigger_callback)r   r.   stater!   r%   tidgidZcall_callbackZchord_counterdepsrf   r   r   r   on_chord_part_return   s4    



z$DatabaseBackend.on_chord_part_return)NNN)__name__
__module____qualname____doc__r   r8   GroupResultModelrM   Zsubpolling_intervalr   r0   r:   rB   r+   r<   rK   rN   rS   rU   rV   rd   rs   r   r   r   r   r      s(   2   
!
r   c                 C   s"  |j r|j}n|j}z&t  || jjdd}W 5 Q R X W n tk
r } zfz"t| }d|j	 d|}W n t
k
r   t|}Y nX td|j	| | j|t| W 5 d}~X Y n`X z|| W nN tk
r } z.td|j	| | jj|td|d W 5 d}~X Y nX dS )	zAdd the callback to the queue or mark the callback as failed
    Implementation borrowed from `celery.app.builtins.unlock_chord`
    T)timeout	propagatezDependency z raised zChord %r raised: %rNzCallback error: )r   )Zsupports_native_joinZjoin_nativejoinr   r)   Zresult_chord_join_timeout	ExceptionnextZ_failed_join_reportra   StopIterationreprrj   	exceptionbackendZchord_error_from_stackr   delay)r(   rf   rR   jretr   Zculpritreasonr   r   r   rn   	  s,    $ rn   )$r>   r]   Zceleryr   Zcelery.backends.baser   Zcelery.exceptionsr   Zcelery.resultr   r   r   Zcelery.utils.logr   Zcelery.utils.serializationr	   r
   Z	django.dbr   r   Zdjango.db.utilsr   Zkombu.exceptionsr   modelsr   rx   r   r   Zpsycopg2ZPsycopg2InterfaceErrorImportErrorrt   rj   r   rn   r   r   r   r   <module>   s.    m