
    ymh                        d dl Z d dlZd dlZd dlZd dlmZmZ ej                  j                  d       ej                  j                  d       d dl	m
Z
 d dlmZmZ d dlmZmZmZmZ  ej&                  ej(                  d	        ej*                  e      Z G d
 d      Zedk(  r e       Zej5                          yy)    N)DictAnyz/appz/app/shared)SearchResponse)KafkaClientTOPICS)RedisClientget_cache_key
CACHE_KEYSCACHE_EXPIRATIONz4%(asctime)s - %(name)s - %(levelname)s - %(message)s)levelformatc                   >    e Zd Zd ZdefdZdededefdZd Z	d	 Z
y
)DataProcessingServicec                 @    t               | _        t               | _        y )N)r   kafka_clientr   redis_clientselfs    "./services/data-processing/main.py__init__zDataProcessingService.__init__   s    'M'M    search_responsec                 ~   	 t         j                  d|j                          t        t        d   |j                        }| j
                  j                  ||j                         t        d          t        t        d   |j                        }|j                  sdndd|j                  sd	|j                   d
nd|j                   |gd}| j
                  j                  ||       t         j                  d|j                          y# t        $ r"}t         j                  d|        Y d}~yd}~ww xY w)z$Process and aggregate search resultsz'Processing search results for request: SEARCH_RESULTS)
request_id)expireREQUEST_STATUS	completedfailedg      Y@zSearch completed with z resultszSearch failed: )statusprogressmessageresult_filesz*Results processed and stored for request: z!Error processing search results: N)loggerinfor   r	   r
   r   setdictr   errortotal_resultshset	Exception)r   r   results_key
status_keystatus_updatees         r   process_search_resultsz,DataProcessingService.process_search_results   sC   	BKKA/B\B\A]^_ (
3C(DQ`QkQklK!!+/C/C/EN^_oNp!q 'z2B'CP_PjPjkJ-<-B-B+!crcxcx3O4Q4Q3RRZ[  AP  Q`  Qf  Qf  Pg  h!,	M "":}=KKD_E_E_D`ab 	BLL<QC@AA	Bs   DD 	D<D77D<topicr"   keyc                     	 t        di |}| j                  |       y# t        $ r"}t        j	                  d|        Y d}~yd}~ww xY w)zHandle incoming search resultszError handling search result: N )r   r0   r+   r$   r(   )r   r1   r"   r2   r   r/   s         r   handle_search_resultz*DataProcessingService.handle_search_result3   sI    	?,7w7O''8 	?LL9!=>>	?s    	A
AA
c                 t   t         j                  d       	 | j                  j                  t        d   gd| j
                         | j                          y# t        $ r t         j                  d       Y 1t        $ r"}t         j                  d|        Y d}~Vd}~ww xY w# | j                          w xY w)z#Start consuming messages from Kafkaz#Starting Data Processing Service...r   zdata-processing-service)topicsgroup_idmessage_handlerz'Data Processing Service stopped by userz"Error in Data Processing Service: N)
r$   r%   r   consume_messagesr   r5   KeyboardInterruptr+   r(   cleanup)r   r/   s     r   start_consumingz%DataProcessingService.start_consuming<   s    9:	../012 $ 9 9 /  LLN ! 	CKKAB 	CLL=aSABB	C LLNs5   0A B"6B% 8B" BB% B""B% %B7c                     | j                   r| j                   j                          | j                  r| j                  j                          yy)zClean up resourcesN)r   closer   r   s    r   r<   zDataProcessingService.cleanupM   s=    ##%##% r   N)__name__
__module____qualname__r   r   r0   strr'   r5   r=   r<   r4   r   r   r   r      s9    *Bn B0?# ? ?3 ?"&r   r   __main__)ossysloggingtimetypingr   r   pathappendshared.models.schemasr   shared.utils.kafka_utilsr   r   shared.utils.redis_utilsr   r	   r
   r   basicConfigINFO	getLoggerr@   r$   r   servicer=   r4   r   r   <module>rS      s    	 
        0 8 ] ]   
,,A 
		8	$<& <&| z#%G r   