U
    	mh                     @   s   d dl Z d dlZd dlZd dlZd dlmZmZ ejd ejd d dl	m
Z
 d dlmZmZ d dlmZmZmZmZ ejejdd	 eeZG d
d dZedkre Ze  dS )    N)DictAnyz/appz/app/shared)SearchResponse)KafkaClientTOPICS)RedisClientget_cache_key
CACHE_KEYSCACHE_EXPIRATIONz4%(asctime)s - %(name)s - %(levelname)s - %(message)s)levelformatc                   @   sD   e Zd Zdd ZedddZeeedddZd	d
 Z	dd Z
dS )DataProcessingServicec                 C   s   t  | _t | _d S )N)r   kafka_clientr   redis_clientself r    services/data-processing/main.py__init__   s    zDataProcessingService.__init__)search_responsec              
   C   s   zt d|j  ttd |jd}| jj|| td d ttd |jd}|j	s\dndd|j	std	|j
 d
n
d|j	 |gd}| j|| t d|j  W n2 tk
r } zt 	d|  W 5 d}~X Y nX dS )z$Process and aggregate search resultsz'Processing search results for request: SEARCH_RESULTS)
request_id)ZexpireZREQUEST_STATUSZ	completedZfailedg      Y@zSearch completed with z resultszSearch failed: )ZstatusZprogressmessageZresult_filesz*Results processed and stored for request: z!Error processing search results: N)loggerinfor   r   r	   r   setdictr
   errorZtotal_resultsZhset	Exception)r   r   Zresults_keyZ
status_keyZstatus_updateer   r   r   process_search_results   s    z,DataProcessingService.process_search_results)topicr   keyc              
   C   sP   zt f |}| | W n2 tk
rJ } ztd|  W 5 d}~X Y nX dS )zHandle incoming search resultszError handling search result: N)r   r    r   r   r   )r   r!   r   r"   r   r   r   r   r   handle_search_result3   s
    
z*DataProcessingService.handle_search_resultc              
   C   s   t d zrz| jjtd gd| jd W nN tk
rH   t d Y n2 tk
rx } zt 	d|  W 5 d}~X Y nX W 5 |   X dS )z#Start consuming messages from Kafkaz#Starting Data Processing Service...r   zdata-processing-service)ZtopicsZgroup_idZmessage_handlerz'Data Processing Service stopped by userz"Error in Data Processing Service: N)
r   r   cleanupr   Zconsume_messagesr   r#   KeyboardInterruptr   r   )r   r   r   r   r   start_consuming<   s    

&z%DataProcessingService.start_consumingc                 C   s$   | j r| j   | jr | j  dS )zClean up resourcesN)r   closer   r   r   r   r   r$   M   s    
zDataProcessingService.cleanupN)__name__
__module____qualname__r   r   r    strr   r#   r&   r$   r   r   r   r   r      s
   	r   __main__)ossysZloggingtimetypingr   r   pathappendZshared.models.schemasr   Zshared.utils.kafka_utilsr   r   Zshared.utils.redis_utilsr   r   r	   r
   ZbasicConfigINFOZ	getLoggerr(   r   r   Zservicer&   r   r   r   r   <module>   s$   
>