U
    d                      @   s   d Z ddlmZ ddlmZ ddlmZ ddlmZ ddlm	Z	 ddl
mZ zdd	lZW n ek
rp   d	ZY nX d
ZdZG dd deZd	S )z#Elasticsearch result store backend.    )datetimebytes_to_str)
_parse_url)states)ImproperlyConfigured   )KeyValueStoreBackendN)ElasticsearchBackendzVYou need to install the elasticsearch library to use the Elasticsearch result backend.c                       s   e Zd ZdZdZdZdZdZdZdZ	dZ
dZd	Zd
Zd' fdd	Zdd Zdd Zdd Zdd Zdd Zdd Zdd Z fddZ fddZdd  Zd!d" Zd#d$ Zed%d& Z  ZS )(r
   zElasticsearch Backend.

    Raises:
        celery.exceptions.ImproperlyConfigured:
            if module :pypi:`elasticsearch` is not available.
    celerybackendhttp	localhosti#  NF
      c                    s8  t  j|| || _| jjj}td kr.ttd  } } } } }	 }
}|rt	|\}}}	}
}}}|dkrpd }|r|
d}|d\}}}|p| j| _|p| j| _|p| j| _|p| j| _|	p| j| _|
p| j| _|p| j| _|dp| j| _|d}|d k	r
|| _|d}|d k	r"|| _|dd| _d | _d S )Nelasticsearch/Zelasticsearch_retry_on_timeoutZelasticsearch_timeoutZelasticsearch_max_retriesZelasticsearch_save_meta_as_textT)super__init__urlZappconfgetr   r   E_LIB_MISSINGr   strip	partitionindexdoc_typeschemehostportusernamepasswordes_retry_on_timeout
es_timeoutes_max_retrieses_save_meta_as_text_server)selfr   argskwargs_getr   r   r   r   r   r    r!   path_r#   r$   	__class__ A/tmp/pip-unpacked-wheel-mu1yl971/celery/backends/elasticsearch.pyr   ,   s<    



zElasticsearchBackend.__init__c                 C   s    t |tjjr|jdkrdS dS )N>   N/A              TF)
isinstancer   
exceptionsZTransportErrorstatus_code)r'   excr/   r/   r0   exception_safe_to_retryU   s    	
z,ElasticsearchBackend.exception_safe_to_retryc              	   C   sb   zD|  |}z|d r&|d d W W S W n ttfk
r@   Y nX W n tjjk
r\   Y nX d S )Nfound_sourceresult)r*   	TypeErrorKeyErrorr   r:   NotFoundError)r'   keyresr/   r/   r0   r   c   s    

zElasticsearchBackend.getc                 C   s   | j j| j| j|dS N)r   r   id)serverr   r   r   r'   rD   r/   r/   r0   r*   n   s
    zElasticsearchBackend._getc                 C   s^   |d t  d d d}z| j||d W n& tjjk
rX   | ||| Y nX d S )Nz{}Z)r@   z
@timestamp)rG   body)	formatr   utcnow	isoformat_indexr   r:   ConflictError_update)r'   rD   valuestaterK   r/   r/   r0   _set_with_stateu   s    
z$ElasticsearchBackend._set_with_statec                 C   s   |  ||d S N)rT   )r'   rD   rR   r/   r/   r0   set   s    zElasticsearchBackend.setc                 K   s<   dd |  D }| jjf t|| j| j|ddid|S )Nc                 S   s   i | ]\}}t ||qS r/   r   .0kvr/   r/   r0   
<dictcomp>   s      z/ElasticsearchBackend._index.<locals>.<dictcomp>Zop_typecreaterG   r   r   rK   params)itemsrH   r   r   r   )r'   rG   rK   r)   r/   r/   r0   rO      s    zElasticsearchBackend._indexc           
   	   K   s8  dd |  D }z,| j|d}|ds<| j||f|W S W n( tjjk
rf   | j||f| Y S X z| |d d }W n tt	fk
r   Y n8X |d t
jkrddiS |d t
jkr|t
jkrddiS |d	d
}|dd
}| jjf t|| j| jd|i||dd|}	|	d dkr4tjddi |	S )au  Update state in a conflict free manner.

        If state is defined (not None), this will not update ES server if either:
        * existing state is success
        * existing state is a ready state and current state in not a ready state

        This way, a Retry state cannot override a Success or Failure, and chord_unlock
        will not retry indefinitely.
        c                 S   s   i | ]\}}t ||qS r/   r   rW   r/   r/   r0   r[      s      z0ElasticsearchBackend._update.<locals>.<dictcomp>)rD   r>   r?   r@   statusZnoopZ_seq_nor   Z_primary_termdoc)Zif_primary_termZ	if_seq_nor]   r8   z(conflicting update occurred concurrently)r_   r*   r   rO   r   r:   rC   Zdecode_resultrA   rB   r   SUCCESSZREADY_STATESZUNREADY_STATESrH   updater   r   r   rP   )
r'   rG   rK   rS   r)   Zres_getZmeta_present_on_backendZseq_noZ	prim_termrE   r/   r/   r0   rQ      s:    

zElasticsearchBackend._updatec                    sp   | j rt |S t|ts(t |S |drH| |d d |d< |drh| |d d |d< |S d S )Nr@      	traceback)r%   r   encoder9   dictr   _encode)r'   datar-   r/   r0   rf      s    


zElasticsearchBackend.encodec                    sl   | j rt |S t|ts(t |S |drFt |d |d< |drdt |d |d< |S d S )Nr@   re   )r%   r   decoder9   rg   r   )r'   payloadr-   r/   r0   rj      s    


zElasticsearchBackend.decodec                    s    fdd|D S )Nc                    s   g | ]}  |qS r/   )r   )rX   rD   r'   r/   r0   
<listcomp>   s     z-ElasticsearchBackend.mget.<locals>.<listcomp>r/   )r'   keysr/   rl   r0   mget   s    zElasticsearchBackend.mgetc                 C   s   | j j| j| j|d d S rF   )rH   deleter   r   rI   r/   r/   r0   rp      s    zElasticsearchBackend.deletec                 C   sH   d}| j r| jr| j | jf}tj| j d| j | j| j| j| j	|dS )z$Connect to the Elasticsearch server.N:)Zretry_on_timeoutmax_retriestimeoutr   	http_auth)
r    r!   r   ZElasticsearchr   r   r"   r$   r#   r   )r'   rt   r/   r/   r0   _get_server   s    z ElasticsearchBackend._get_serverc                 C   s   | j d kr|  | _ | j S rU   )r&   ru   rl   r/   r/   r0   rH      s    

zElasticsearchBackend.server)N)__name__
__module____qualname____doc__r   r   r   r   r   r    r!   r"   r#   r$   r   r=   r   r*   rT   rV   rO   rQ   rf   rj   ro   rp   ru   propertyrH   __classcell__r/   r/   r-   r0   r
      s4   )5r
   )ry   r   Zkombu.utils.encodingr   Zkombu.utils.urlr   r   r   Zcelery.exceptionsr   baser	   r   ImportError__all__r   r
   r/   r/   r/   r0   <module>   s   
