"""Kafka service for asynchronous task processing."""

import json
import logging
import asyncio
from typing import Dict, Any, Optional, Callable
from kafka import KafkaProducer, KafkaConsumer
from kafka.errors import KafkaError
import time
import threading

from src.config.settings import KAFKA_BOOTSTRAP_SERVERS, KAFKA_TOPIC

logger = logging.getLogger(__name__)

class KafkaService:
    """Kafka service for managing message queues.

    Wraps a kafka-python producer/consumer pair bound to a single configured
    topic. Message values are JSON-encoded; an optional background daemon
    thread dispatches consumed messages to a caller-supplied handler.
    """

    def __init__(self):
        # Clients are created lazily via connect_producer()/connect_consumer().
        self.producer: Optional[KafkaProducer] = None
        self.consumer: Optional[KafkaConsumer] = None
        self.bootstrap_servers = KAFKA_BOOTSTRAP_SERVERS
        self.topic = KAFKA_TOPIC
        self.healthy = False  # True once the producer has verified broker metadata
        self._consumer_thread: Optional[threading.Thread] = None
        self._running = False  # consumer-thread loop flag

    def connect_producer(self) -> bool:
        """Create the producer and verify the broker is reachable.

        Returns:
            True if the producer connected and topic metadata was fetched,
            False otherwise (the error is logged and ``healthy`` is cleared).
        """
        try:
            self.producer = KafkaProducer(
                bootstrap_servers=self.bootstrap_servers,
                value_serializer=lambda x: json.dumps(x).encode('utf-8'),
                key_serializer=lambda x: x.encode('utf-8') if x else None,
                acks='all',  # wait for the full ISR acknowledgement
                retries=3,
                batch_size=16384,
                linger_ms=10,
                buffer_memory=33554432
            )

            # Verify connectivity by forcing a metadata fetch for our topic.
            # BUG FIX: kafka-python's KafkaProducer has no list_topics()
            # (that is confluent-kafka's API); the original call raised
            # AttributeError and made this method fail unconditionally.
            self.producer.partitions_for(self.topic)

            self.healthy = True
            logger.info("Kafka producer connected successfully")
            return True

        except Exception as e:
            logger.error(f"Failed to connect Kafka producer: {e}")
            self.healthy = False
            return False

    def connect_consumer(self, group_id: str = "scraper_consumer",
                        message_handler: Optional[Callable] = None) -> bool:
        """Create the consumer; optionally start a background dispatch thread.

        Args:
            group_id: Kafka consumer group id.
            message_handler: When provided, called with each decoded message
                value on a background daemon thread.

        Returns:
            True on success, False otherwise (the error is logged).
        """
        try:
            self.consumer = KafkaConsumer(
                self.topic,
                bootstrap_servers=self.bootstrap_servers,
                group_id=group_id,
                value_deserializer=lambda m: json.loads(m.decode('utf-8')),
                key_deserializer=lambda m: m.decode('utf-8') if m else None,
                auto_offset_reset='latest',
                enable_auto_commit=True,
                auto_commit_interval_ms=1000,
                max_poll_records=10
            )

            if message_handler:
                self._start_consumer_thread(message_handler)

            logger.info(f"Kafka consumer connected successfully to topic: {self.topic}")
            return True

        except Exception as e:
            logger.error(f"Failed to connect Kafka consumer: {e}")
            return False

    def _start_consumer_thread(self, message_handler: Callable) -> None:
        """Start a daemon thread feeding consumed messages to *message_handler*.

        Handler exceptions are logged and swallowed so one bad message does
        not kill the consumer loop.
        """
        def consume_messages():
            self._running = True
            logger.info("Starting Kafka consumer thread...")

            try:
                for message in self.consumer:
                    if not self._running:
                        break

                    try:
                        logger.debug(f"Received message: {message.value}")
                        message_handler(message.value)
                    except Exception as e:
                        logger.error(f"Error processing message: {e}")

            except Exception as e:
                # Also reached when close() shuts the consumer mid-iteration.
                logger.error(f"Consumer thread error: {e}")
            finally:
                logger.info("Consumer thread stopped")

        self._consumer_thread = threading.Thread(target=consume_messages, daemon=True)
        self._consumer_thread.start()

    def _send_sync(self, message: Dict[str, Any], key: Optional[str] = None) -> bool:
        """Send *message* to the configured topic, blocking up to 10s for the ack.

        A copy of *message* is stamped with ``timestamp``/``sent_at`` so the
        caller's dict is not mutated.

        Returns:
            True if the broker acknowledged the record, False on any error.
        """
        if not self.producer:
            logger.error("Kafka producer not connected")
            return False

        try:
            # Stamp a copy — the original mutated the caller's dict.
            payload = dict(message)
            payload['timestamp'] = time.time()
            payload['sent_at'] = time.strftime('%Y-%m-%d %H:%M:%S')

            future = self.producer.send(self.topic, value=payload, key=key)

            # Block until the broker acknowledges (or 10s timeout).
            record_metadata = future.get(timeout=10)

            logger.debug(f"Message sent to topic {record_metadata.topic} "
                        f"partition {record_metadata.partition} "
                        f"offset {record_metadata.offset}")

            return True

        except KafkaError as e:
            logger.error(f"Failed to send message to Kafka: {e}")
            return False
        except Exception as e:
            logger.error(f"Unexpected error sending message: {e}")
            return False

    async def send_message(self, message: Dict[str, Any], key: Optional[str] = None) -> bool:
        """Async wrapper around :meth:`_send_sync`.

        The blocking send runs in the default executor so the event loop is
        not stalled for up to 10 seconds while waiting for the broker ack.
        """
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, self._send_sync, message, key)

    def send_scrape_task(self, task_id: str, query: str, engines: list,
                        max_results: int = 10) -> bool:
        """Publish a 'scrape_task' message keyed by *task_id*.

        BUG FIX: the original wrapped the send in asyncio.run(), which raises
        RuntimeError when called from a thread that already runs an event
        loop; the synchronous send path is used directly instead.
        """
        task = {
            'task_id': task_id,
            'type': 'scrape_task',
            'query': query,
            'engines': engines,
            'max_results': max_results,
            'status': 'pending'
        }

        return self._send_sync(task, key=task_id)

    def send_result(self, task_id: str, results: list, execution_time: float,
                   status: str = 'completed') -> bool:
        """Publish a 'scrape_result' message keyed by *task_id*.

        NOTE(review): *results* items are expected to expose title/url/
        description/engine/position and a datetime ``timestamp`` — confirm
        against the scraper's result type.
        """
        result_message = {
            'task_id': task_id,
            'type': 'scrape_result',
            'status': status,
            'results_count': len(results),
            'execution_time': execution_time,
            'results': [
                {
                    'title': r.title,
                    'url': r.url,
                    'description': r.description,
                    'engine': r.engine,
                    'position': r.position,
                    'timestamp': r.timestamp.isoformat()
                } for r in results
            ]
        }

        return self._send_sync(result_message, key=task_id)

    def get_topic_info(self) -> Dict[str, Any]:
        """Return connection/topic diagnostics as a plain dict.

        On failure returns ``{"error": ...}`` instead of raising.
        """
        if not self.producer:
            return {"error": "Producer not connected"}

        try:
            # BUG FIX: kafka-python's producer has no list_topics();
            # partitions_for() forces a metadata fetch for our topic instead.
            partitions = self.producer.partitions_for(self.topic)

            topic_info = {
                # A full cluster topic listing is only available through the
                # consumer API in kafka-python.
                'available_topics': sorted(self.consumer.topics()) if self.consumer else [],
                'configured_topic': self.topic,
                'bootstrap_servers': self.bootstrap_servers,
                'producer_connected': self.producer is not None,
                'consumer_connected': self.consumer is not None,
                'healthy': self.healthy
            }

            if partitions:
                topic_info['topic_partitions'] = len(partitions)
                topic_info['topic_exists'] = True
            else:
                topic_info['topic_exists'] = False

            return topic_info

        except Exception as e:
            logger.error(f"Error getting topic info: {e}")
            return {"error": str(e)}

    def close(self):
        """Shut down consumer thread, consumer, and producer (best effort)."""
        self._running = False  # signal the consumer loop to stop

        if self.consumer:
            try:
                # Closing the consumer also unblocks the thread's iterator.
                self.consumer.close()
                logger.info("Kafka consumer closed")
            except Exception as e:
                logger.error(f"Error closing consumer: {e}")

        if self.producer:
            try:
                self.producer.flush()  # drain buffered records before closing
                self.producer.close()
                logger.info("Kafka producer closed")
            except Exception as e:
                logger.error(f"Error closing producer: {e}")

        if self._consumer_thread and self._consumer_thread.is_alive():
            self._consumer_thread.join(timeout=5)

        self.healthy = False

# Global Kafka service instance, created at import time. No broker connection
# is made here — callers must invoke connect_producer()/connect_consumer().
kafka_service: KafkaService = KafkaService()
