U
    d)                     @   s  d Z ddlmZmZ ddlmZ ddlmZ ddlmZm	Z	 ddl
mZ ddlmZ dd	lmZ zdd
lZW n ek
r   d
ZY nX erzddlmZ W n  ek
r   ddlmZ Y nX ddlmZ nd
ZG dd deZdZeddgZG dd deZd
S )zMongoDB result store backend.    )datetime	timedelta)EncodeError)cached_property)maybe_sanitize_urlurlparse)states)ImproperlyConfigured   )BaseBackendN)Binary)InvalidDocumentc                   @   s   e Zd ZdS )r   N)__name__
__module____qualname__ r   r   ;/tmp/pip-unpacked-wheel-mu1yl971/celery/backends/mongodb.pyr      s   r   )MongoBackendpicklemsgpackc                       s  e Zd ZdZdZdZdZdZdZdZ	dZ
dZdZdZd	ZdZd3 fd
d	Zedd Zdd Zdd Z fddZ fddZd4ddZdd Zdd Zdd Zdd Zd d! Zd"d# Zd5 fd%d&	Zd'd( Ze d)d* Z!e d+d, Z"e d-d. Z#e d/d0 Z$d6d1d2Z%  Z&S )7r   zMongoDB result backend.

    Raises:
        celery.exceptions.ImproperlyConfigured:
            if module :pypi:`pymongo` is not available.
    N	localhostii  celeryZcelery_taskmetaZcelery_groupmeta
   Fc                    s  i | _ t j|f| ts"td|   D ]\}}| j || q.| jr| 	| j| _tj
| j}dd |d D }|d | _|d | _|| _|d r|d | _| j |d  | jjd	}|d k	rt|tstd
t|}d|ksd|krd | _|d| j| _|d| j| _|d| j| _|d| j| _|d| j| _|d| j| _|d| j| _|d| j| _| j |di  | j | d S )NzCYou need to install the pymongo library to use the MongoDB backend.c                 S   s"   g | ]}|d   d|d  qS )r   :r
   r   ).0xr   r   r   
<listcomp>N   s    z)MongoBackend.__init__.<locals>.<listcomp>ZnodelistusernamepassworddatabaseoptionsZmongodb_backend_settingsz4MongoDB backend settings should be grouped in a dicthostport
mongo_hostusertaskmeta_collectiongroupmeta_collection)r    super__init__pymongor	   _prepare_client_optionsitems
setdefaulturl_ensure_mongodb_uri_complianceZ
uri_parser	parse_urir$   r   r#   database_nameupdateappconfget
isinstancedictpopr!   r"   r%   r&   )selfr2   kwargskeyvalueZuri_dataZ	hostslistconfig	__class__r   r   r(   :   sZ    




  zMongoBackend.__init__c                 C   s2   t | }|jdsd|  } | dkr.| d7 } | S )NZmongodbzmongodb+
mongodb://r   )r   scheme
startswith)r-   
parsed_urlr   r   r   r.   v   s    
z+MongoBackend._ensure_mongodb_uri_compliancec                 C   s$   t jdkrd| jiS | jddS d S )N)   ZmaxPoolSizeF)max_pool_sizeZauto_start_request)r)   Zversion_tuplerD   r8   r   r   r   r*      s
    

z$MongoBackend._prepare_client_optionsc                 C   s   | j dkrddlm} | j}|sL| j}t|trL|dsLd| d| j }t	| j
}||d< | jrn| j|d< | jr~| j|d< |f || _ | j S )	zConnect to the MongoDB server.Nr   )MongoClientr?   r   r!   r   r   )_connectionr)   rF   r#   r!   r5   strrA   r"   r6   r    r$   r   )r8   rF   r!   r3   r   r   r   _get_connection   s"    




zMongoBackend._get_connectionc                    s0   | j dkr|S t |}| j tkr,t|}|S NZbson)
serializerr'   encodeBINARY_CODECSr   )r8   datapayloadr=   r   r   rL      s    

zMongoBackend.encodec                    s   | j dkr|S t |S rJ   )rK   r'   decode)r8   rN   r=   r   r   rP      s    
zMongoBackend.decodec           	   
   K   sj   | j | ||||d}||d< z| jjd|i|dd W n* tk
rd } zt|W 5 d}~X Y nX |S )z1Store return value and state of an executed task.)resultstate	tracebackrequest_idTZupsertN)Z_get_result_metarL   
collectionreplace_oner   r   )	r8   task_idrQ   rR   rS   rT   r9   metaexcr   r   r   _store_result   s     zMongoBackend._store_resultc              	   C   sV   | j d|i}|rJ| |d |d | |d |d |d |d dS tjdd	S )
z$Get task meta-data for a task by id.rU   statusrQ   	date_donerS   children)rY   r]   rQ   r^   rS   r_   N)r]   rQ   )rW   find_oneZmeta_from_decodedrP   r   ZPENDING)r8   rY   objr   r   r   _get_task_meta_for   s    zMongoBackend._get_task_meta_forc                 C   s:   ||  dd |D t d}| jjd|i|dd |S )zSave the group result.c                 S   s   g | ]
}|j qS r   )id)r   ir   r   r   r      s     z,MongoBackend._save_group.<locals>.<listcomp>)rU   rQ   r^   rU   TrV   )rL   r   utcnowgroup_collectionrX   )r8   group_idrQ   rZ   r   r   r   _save_group   s    zMongoBackend._save_groupc                    sD    j d|i}|r@|d |d  fdd |d D dS dS )z!Get the result for a group by id.rU   r^   c                    s   g | ]} j |qS r   )r2   ZAsyncResult)r   ZtaskrE   r   r   r      s   z/MongoBackend._restore_group.<locals>.<listcomp>rQ   )rY   r^   rQ   N)rf   r`   rP   )r8   rg   ra   r   rE   r   _restore_group   s    
zMongoBackend._restore_groupc                 C   s   | j d|i dS )zDelete a group by id.rU   N)rf   
delete_one)r8   rg   r   r   r   _delete_group   s    zMongoBackend._delete_groupc                 C   s   | j d|i dS )zRemove result from MongoDB.

        Raises:
            pymongo.exceptions.OperationsError:
                if the task_id could not be removed.
        rU   N)rW   rj   )r8   rY   r   r   r   _forget   s    
zMongoBackend._forgetc                 C   sN   | j s
dS | jdd| j | j ii | jdd| j | j ii dS )zDelete expired meta-data.Nr^   z$lt)expiresrW   Zdelete_manyr2   nowexpires_deltarf   rE   r   r   r   cleanup   s    zMongoBackend.cleanupr   c                    s(   |si n|}t  |t|| j| jdS )N)rm   r-   )r'   
__reduce__r6   rm   r-   )r8   argsr9   r=   r   r   rq     s
     zMongoBackend.__reduce__c                 C   s   |   }|| j S )N)rI   r0   )r8   connr   r   r   _get_database
  s    zMongoBackend._get_databasec                 C   s   |   S )z]Get database from MongoDB connection.

        performs authentication if necessary.
        )rt   rE   r   r   r   r     s    zMongoBackend.databasec                 C   s   | j | j }|jddd |S z"Get the meta-data task collection.r^   T)Z
background)r   r%   create_indexr8   rW   r   r   r   rW     s    zMongoBackend.collectionc                 C   s   | j | j }|jddd |S ru   )r   r&   rv   rw   r   r   r   rf      s    zMongoBackend.group_collectionc                 C   s   t | jdS )N)seconds)r   rm   rE   r   r   r   ro   *  s    zMongoBackend.expires_deltac                 C   sL   | j s
dS |r| j S d| j kr(t| j S | j dd\}}dt||gS )z~Return the backend as an URI.

        Arguments:
            include_password (bool): Password censored if disabled.
        r?   ,r
   )r-   r   splitjoin)r8   Zinclude_passwordZuri1	remainderr   r   r   as_uri.  s    

zMongoBackend.as_uri)N)NN)r   N)F)'r   r   r   __doc__r#   r!   r"   r$   r   r0   r%   r&   rD   r    Zsupports_autoexpirerG   r(   staticmethodr.   r*   rI   rL   rP   r\   rb   rh   ri   rk   rl   rp   rq   rt   r   r   rW   rf   ro   r}   __classcell__r   r   r=   r   r   #   sP   <

   



	
	
r   )r~   r   r   Zkombu.exceptionsr   Zkombu.utils.objectsr   Zkombu.utils.urlr   r   r   r   Zcelery.exceptionsr	   baser   r)   ImportErrorZbson.binaryr   Zpymongo.binaryZpymongo.errorsr   	Exception__all__	frozensetrM   r   r   r   r   r   <module>   s,   
