U
    d[                     @   s   d Z ddlZddlmZ ddlmZ ddlmZ ddlm	Z	 ddl
mZ ddlmZ d	d
lmZmZmZ d	dlmZ z ddlmZmZ ddlmZ W n ek
r   edY nX eeZdZedd Zdd ZG dd de	Z dS )z SQLAlchemy result store backend.    N)contextmanagerwraps)states)BaseBackend)ImproperlyConfigured)maybe_timedelta   )TaskTaskExtendedTaskSet)SessionManager)DatabaseErrorInvalidRequestError)StaleDataErrorzhThe database result backend requires SQLAlchemy to be installed.See https://pypi.org/project/SQLAlchemy/)DatabaseBackendc                 c   s>   z.z
d V  W n tk
r*   |    Y nX W 5 |    X d S )N)close	Exceptionrollback)session r   E/tmp/pip-unpacked-wheel-mu1yl971/celery/backends/database/__init__.pysession_cleanup   s    
r   c                    s   t   fdd}|S )Nc                     st   | dd}t|D ]Z}z | |W   S  tttfk
rl   tjd j|| d dd |d |krh Y qX qd S )Nmax_retries   z-Failed operation %s.  Retrying %s more times.r	   T)exc_info)popranger   r   r   loggerwarning__name__)argskwargsr   retriesfunr   r   _inner)   s     
zretry.<locals>._innerr   )r%   r&   r   r$   r   retry'   s    r'   c                       s   e Zd ZdZdZeZeZd fdd	Z	e
dd Ze fdd	Zedd
dZdddZedd Zedd Zedd Zedd Zedd Zdd Zd  fdd	Z  ZS )!r   zThe database result backend.g      ?Nc                    s   t  jf t|d| | jj}| jr,t| _|p8|p8|j| _	t
|pDi f|jpNi | _|d|j| _|jpli }|jpvi }| jj|d|dd | jj|d|dd | j	stdd S )N)Zexpires_typeurlshort_lived_sessionstask)ZschemanamegroupzTMissing connection string! Do you have the database_url setting set to a real value?)super__init__r   appconfextended_resultr   task_clsZdatabase_urlr(   dictZdatabase_engine_optionsengine_optionsgetZdatabase_short_lived_sessionsr)   Zdatabase_table_schemasZdatabase_table_names	configuretaskset_clsr   )selfdburir4   r(   r"   r0   ZschemasZ
tablenames	__class__r   r   r.   E   s@    


zDatabaseBackend.__init__c                 C   s   | j jddS )Nextendedresult)r/   r0   Zfind_value_for_key)r8   r   r   r   r1   e   s    zDatabaseBackend.extended_resultc                 C   s   |j f | j| jd| jS )N)r9   r)   )Zsession_factoryr(   r)   r4   )r8   Zsession_managerr   r   r   ResultSessioni   s    zDatabaseBackend.ResultSessionc           	   	   K   s   |   }t|r t|| j| jj|k}|o:|d }|sb| |}||_|| |  | j	|||||d |
  W 5 Q R X dS )z1Store return value and state of an executed task.r   )	tracebackrequestN)r>   r   listqueryr2   filtertask_idaddflush_update_resultcommit)	r8   rD   r=   stater?   r@   r"   r   r*   r   r   r   _store_resulto   s    


zDatabaseBackend._store_resultc           
      C   sN   | j ||||ddd}dd | jjjD }|D ]}||}	t|||	 q.d S )NFT)r=   rI   r?   r@   Zformat_dateencodec                 S   s   g | ]}|j d kr|j qS )>   rD   id)r+   ).0columnr   r   r   
<listcomp>   s    
z2DatabaseBackend._update_result.<locals>.<listcomp>)Z_get_result_metar2   Z	__table__columnsr5   setattr)
r8   r*   r=   rI   r?   r@   metarP   rN   valuer   r   r   rG      s      
zDatabaseBackend._update_resultc              
   C   s   |   }t| t|| j| jj|k}|o:|d }|sX| |}tj|_	d|_
| }|dddk	r| |d |d< |dddk	r| |d |d< | |W  5 Q R  S Q R X dS )z$Get task meta-data for a task by id.r   Nr!   r"   )r>   r   rA   rB   r2   rC   rD   r   ZPENDINGstatusr=   to_dictr5   decodeZmeta_from_decoded)r8   rD   r   r*   datar   r   r   _get_task_meta_for   s    

z"DatabaseBackend._get_task_meta_forc              
   C   sR   |   }t|8 | ||}|| |  |  |W  5 Q R  S Q R X dS )z&Store the result of an executed group.N)r>   r   r7   rE   rF   rH   )r8   group_idr=   r   r,   r   r   r   _save_group   s    

zDatabaseBackend._save_groupc              
   C   sV   |   }t|< || j| jj|k }|rH| W  5 Q R  S W 5 Q R X dS )zGet meta-data for group by id.N)r>   r   rB   r7   rC   
taskset_idfirstrU   )r8   rY   r   r,   r   r   r   _restore_group   s    

zDatabaseBackend._restore_groupc              	   C   sN   |   }t|4 || j| jj|k  |  |  W 5 Q R X dS )z!Delete meta-data for group by id.N)	r>   r   rB   r7   rC   r[   deleterF   rH   )r8   rY   r   r   r   r   _delete_group   s    

zDatabaseBackend._delete_groupc              	   C   sF   |   }t|, || j| jj|k  |  W 5 Q R X dS )zForget about result.N)r>   r   rB   r2   rC   rD   r^   rH   )r8   rD   r   r   r   r   _forget   s    
zDatabaseBackend._forgetc              	   C   s|   |   }| j}| j }t|R || j| jj|| k 	  || j
| j
j|| k 	  |  W 5 Q R X dS )zDelete expired meta-data.N)r>   expiresr/   nowr   rB   r2   rC   Z	date_doner^   r7   rH   )r8   r   ra   rb   r   r   r   cleanup   s    

zDatabaseBackend.cleanupr   c                    s2   |si n|}| | j| j| jd t ||S )N)r9   ra   r4   )updater(   ra   r4   r-   
__reduce__)r8   r!   r"   r:   r   r   re      s    zDatabaseBackend.__reduce__)NNN)NN)NN)r   N)r    
__module____qualname____doc__Zsubpolling_intervalr
   r2   r   r7   r.   propertyr1   r   r>   r'   rJ   rG   rX   rZ   r]   r_   r`   rc   re   __classcell__r   r   r:   r   r   ;   s6    
    




	
	
r   )!rh   logging
contextlibr   Z
vine.utilsr   Zceleryr   Zcelery.backends.baser   Zcelery.exceptionsr   Zcelery.utils.timer   modelsr
   r   r   r   r   Zsqlalchemy.excr   r   Zsqlalchemy.orm.excr   ImportError	getLoggerr    r   __all__r   r'   r   r   r   r   r   <module>   s,   



