
    mh                         d dl Z d dlZd dlmZmZmZmZ d dlmZm	Z	 d dl
mZ d dlZ ej                  e      Z G d d      Zddd	d
dddddddddZdedefdZy)    N)OptionalDictAnyCallable)KafkaProducerKafkaConsumer)
KafkaErrorc            	           e Zd ZddefdZdefdZddedeeef   de	e   de
fd	Zdd
edededefdZdd
edededefdZd Zy)KafkaClientNbootstrap_serversc                 ^    |xs t        j                  dd      | _        d | _        d | _        y )NKAFKA_BOOTSTRAP_SERVERSzlocalhost:9092)osgetenvr   producerconsumer)selfr   s     ./shared/utils/kafka_utils.py__init__zKafkaClient.__init__   s*    !2!lbii@Y[k6l    returnc           	      *   | j                   R	 t        | j                  d d dddd      | _         t        j	                  d	| j                          | j                   S | j                   S # t
        $ r}t        j                  d
|         d}~ww xY w)z%Get or create Kafka producer instanceNc                 V    t        j                  | t              j                  d      S )N)defaultutf-8)jsondumpsstrencode)vs    r   <lambda>z*KafkaClient.get_producer.<locals>.<lambda>   s    tzz!S/I/P/PQX/Y r   c                 ,    | r| j                  d      S d S Nr   )r   ks    r   r!   z*KafkaClient.get_producer.<locals>.<lambda>   s    !QXXg->  r   all     i0u  )r   value_serializerkey_serializeracksretriesretry_backoff_msrequest_timeout_mszKafka producer connected to z!Failed to create Kafka producer: )r   r   r   loggerinfo	Exceptionerror)r   es     r   get_producerzKafkaClient.get_producer   s    ==  -&*&<&<%Y#M%)',! :4;Q;Q:RST }}t}}  @DEs   AA+ +	B4BBtopicmessagekeyc                 *   	 | j                         }|j                  |||      }|j                          |j                  d      }t        j                  d| d|        y# t        $ r%}t        j                  d| d|        Y d}~y	d}~ww xY w)
zSend a message to Kafka topic)valuer7   
   )timeoutzMessage sent to : TzFailed to send message to NF)r4   sendflushgetr/   debugr1   r2   )r   r5   r6   r7   r   futurerecord_metadatar3   s           r   send_messagezKafkaClient.send_message#   s    		((*H]]5S]AFNN$jjj4OLL+E7"_4EFG 	LL5eWBqcBC	s   A!A$ $	B-BBtopicsgroup_idauto_offset_resetc                     	 t        || j                  ||d d dddd}t        j                  d| d|        |S # t        $ r}t        j                  d|         d	}~ww xY w)
z,Create a Kafka consumer for specified topicsc                 J    t        j                  | j                  d            S r#   )r   loadsdecode)ms    r   r!   z-KafkaClient.create_consumer.<locals>.<lambda>8   s    TZZ8I-J r   c                 ,    | r| j                  d      S d S r#   )rJ   r$   s    r   r!   z-KafkaClient.create_consumer.<locals>.<lambda>9   s    188G+< t r   Tr(   )r   rE   rF   value_deserializerkey_deserializerenable_auto_commitauto_commit_interval_msconsumer_timeout_msz"Kafka consumer created for topics z with group z!Failed to create Kafka consumer: N)r   r   r/   r0   r1   r2   )r   rD   rE   rF   r   r3   s         r   create_consumerzKafkaClient.create_consumer0   s|    	$"&"8"8!"3#J!K#'(,$(
H KK<VHLQYPZ[\O 	LL<QC@A	s   := 	A$AA$message_handlerc                    | j                  |||      }	 t        j                  d|        |D ]Z  }	 t        j                  d|j                   d|j
                           ||j                  |j
                  |j                         \ 	 |j                          t        j                  d       y# t        $ r/}t        j                  d|j                   d|        Y d}~d}~ww xY w# t        $ r t        j                  d       Y t        $ r"}t        j                  d|        Y d}~d}~ww xY w# |j                          t        j                  d       w xY w)	z9Consume messages from Kafka topics with a message handlerz"Starting to consume messages from zReceived message from r<   zError processing message from NzConsumer interrupted by userzConsumer error: zConsumer closed)rR   r/   r0   r@   r5   r9   r7   r1   r2   KeyboardInterruptclose)r   rD   rE   rS   rF   r   r6   r3   s           r   consume_messageszKafkaClient.consume_messagesD   s$   '':KL	+KK<VHEF# LL#9'--7==/!Z[#GMM7=='++N NNKK)* ! LL#A'--PRSTRU!VW ! 	8KK67 	1LL+A3/00	1 NNKK)*se   C. AB3
C. D; 3	C+<%C&!C. &C++C. .D8D; D8D3.D; 3D88D; ;'E"c                     | j                   r/| j                   j                          t        j                  d       | j                  r0| j                  j                          t        j                  d       yy)z'Close producer and consumer connectionszKafka producer closedzKafka consumer closedN)r   rV   r/   r0   r   )r   s    r   rV   zKafkaClient.closeY   sO    ==MM!KK/0==MM!KK/0 r   )N)earliest)__name__
__module____qualname__r   r   r   r4   r   r   r   boolrC   listr   rR   r   rW   rV    r   r   r   r   
   s    # 
m &# S#X Xc] ^b d c c cp (+t +s +X +jm +*1r   r   zsearch-requestszgoogle-search-requestszbing-search-requestszyandex-search-requestszduckduckgo-search-requestszyahoo-search-requestszbaidu-search-requestszsearch-resultszmetadata-requestszmetadata-resultszexport-requestszexport-results)SEARCH_REQUESTSGOOGLE_SEARCH_REQUESTSBING_SEARCH_REQUESTSYANDEX_SEARCH_REQUESTSDUCKDUCKGO_SEARCH_REQUESTSYAHOO_SEARCH_REQUESTSBAIDU_SEARCH_REQUESTSSEARCH_RESULTSMETADATA_REQUESTSMETADATA_RESULTSEXPORT_REQUESTSEXPORT_RESULTSenginer   c                     t         d   t         d   t         d   t         d   t         d   t         d   d}|j                  | j                         t         d         S )	z3Get the appropriate Kafka topic for a search enginera   rb   rc   rd   re   rf   )googlebingyandex
duckduckgoyahoobaidur`   )TOPICSr?   lower)rl   topic_mappings     r   get_search_topicrw   r   sb     12-.129:/0/0M V\\^V4E-FGGr   )r   loggingtypingr   r   r   r   kafkar   r   kafka.errorsr	   r   	getLoggerrZ   r/   r   rt   r   rw   r_   r   r   <module>r}      s|      0 0 . # 				8	$V1 V1t )626">44&,*(&

HS 
HS 
Hr   