U
    	mh                     @   s   d dl Z d dlZd dlmZmZmZmZ d dlmZm	Z	 d dl
mZ d dlZeeZG dd dZddd	d
dddddddddZeedddZdS )    N)OptionalDictAnyCallable)KafkaProducerKafkaConsumer)
KafkaErrorc                   @   s   e Zd ZdedddZedddZdeeeef e	e e
dd	d
ZdeeeedddZdeeeedddZdd ZdS )KafkaClientN)bootstrap_serversc                 C   s"   |pt dd| _d | _d | _d S )NZKAFKA_BOOTSTRAP_SERVERSzlocalhost:9092)osgetenvr
   producerconsumer)selfr
    r   shared/utils/kafka_utils.py__init__   s    zKafkaClient.__init__)returnc              
   C   s~   | j dkrxz8t| jdd dd ddddd	| _ td
| j  W n4 tk
rv } ztd|   W 5 d}~X Y nX | j S )z%Get or create Kafka producer instanceNc                 S   s   t j| tddS )N)defaultutf-8)jsondumpsstrencode)vr   r   r   <lambda>       z*KafkaClient.get_producer.<locals>.<lambda>c                 S   s   | r|  dS d S Nr   )r   kr   r   r   r      r   all     i0u  )r
   Zvalue_serializerZkey_serializerZacksZretriesZretry_backoff_msZrequest_timeout_mszKafka producer connected to z!Failed to create Kafka producer: )r   r   r
   loggerinfo	Exceptionerror)r   er   r   r   get_producer   s     
	zKafkaClient.get_producer)topicmessagekeyr   c              
   C   s   zH|   }|j|||d}|  |jdd}td| d|  W dS  tk
r } z td| d|  W Y dS d	}~X Y nX d	S )
zSend a message to Kafka topic)valuer+   
   )ZtimeoutzMessage sent to : TzFailed to send message to FN)r(   sendflushgetr#   debugr%   r&   )r   r)   r*   r+   r   ZfutureZrecord_metadatar'   r   r   r   send_message#   s    zKafkaClient.send_messageearliest)topicsgroup_idauto_offset_resetr   c                 C   s|   zBt || j||dd dd dddd}td| d|  |W S  tk
rv } ztd	|   W 5 d
}~X Y nX d
S )z,Create a Kafka consumer for specified topicsc                 S   s   t | dS r   )r   loadsdecode)mr   r   r   r   8   r   z-KafkaClient.create_consumer.<locals>.<lambda>c                 S   s   | r|  dS d S r   )r9   r   r   r   r   r   9   r   Tr"   )r
   r6   r7   Zvalue_deserializerZkey_deserializerZenable_auto_commitZauto_commit_interval_msZconsumer_timeout_msz"Kafka consumer created for topics z with group z!Failed to create Kafka consumer: N)r   r
   r#   r$   r%   r&   )r   r5   r6   r7   r   r'   r   r   r   create_consumer0   s"    zKafkaClient.create_consumer)r5   r6   message_handlerr7   c                 C   s  |  |||}zztd|  |D ]x}z0td|j d|j  ||j|j|j W q& tk
r } z$t	d|j d|  W Y q&W 5 d}~X Y q&X q&W nN t
k
r   td Y n2 tk
r } zt	d|  W 5 d}~X Y nX W 5 |  td X dS )	z9Consume messages from Kafka topics with a message handlerzConsumer closedz"Starting to consume messages from zReceived message from r.   zError processing message from NzConsumer interrupted by userzConsumer error: )r;   closer#   r$   r2   r)   r,   r+   r%   r&   KeyboardInterrupt)r   r5   r6   r<   r7   r   r*   r'   r   r   r   consume_messagesD   s      &zKafkaClient.consume_messagesc                 C   s8   | j r| j   td | jr4| j  td dS )z'Close producer and consumer connectionszKafka producer closedzKafka consumer closedN)r   r=   r#   r$   r   )r   r   r   r   r=   Y   s    


zKafkaClient.close)N)N)r4   )r4   )__name__
__module____qualname__r   r   r   r(   r   r   r   boolr3   listr   r;   r   r?   r=   r   r   r   r   r	   
   s   "r	   zsearch-requestszgoogle-search-requestszbing-search-requestszyandex-search-requestszduckduckgo-search-requestszyahoo-search-requestszbaidu-search-requestszsearch-resultszmetadata-requestszmetadata-resultszexport-requestszexport-results)SEARCH_REQUESTSGOOGLE_SEARCH_REQUESTSBING_SEARCH_REQUESTSYANDEX_SEARCH_REQUESTSDUCKDUCKGO_SEARCH_REQUESTSYAHOO_SEARCH_REQUESTSBAIDU_SEARCH_REQUESTSZSEARCH_RESULTSZMETADATA_REQUESTSZMETADATA_RESULTSZEXPORT_REQUESTSZEXPORT_RESULTS)enginer   c                 C   s>   t d t d t d t d t d t d d}||  t d S )	z3Get the appropriate Kafka topic for a search enginerF   rG   rH   rI   rJ   rK   )ZgoogleZbingZyandexZ
duckduckgoZyahooZbaidurE   )TOPICSr1   lower)rL   Ztopic_mappingr   r   r   get_search_topicr   s    rO   )r   Zloggingtypingr   r   r   r   Zkafkar   r   Zkafka.errorsr   r   Z	getLoggerr@   r#   r	   rM   r   rO   r   r   r   r   <module>   s*   
Z