import json
import logging
from typing import Optional, Dict, Any, Callable
from kafka import KafkaProducer, KafkaConsumer
from kafka.errors import KafkaError
import os

logger = logging.getLogger(__name__)

class KafkaClient:
    """JSON-over-Kafka convenience wrapper around kafka-python.

    Lazily creates one shared producer and builds consumers on demand.
    Message values are serialized as UTF-8 JSON; keys are plain UTF-8
    strings (or None).
    """

    def __init__(self, bootstrap_servers: Optional[str] = None):
        """Remember broker addresses; no connection is made yet.

        Falls back to the KAFKA_BOOTSTRAP_SERVERS env var, then to
        localhost:9092, so the client works unconfigured in development.
        """
        self.bootstrap_servers = bootstrap_servers or os.getenv(
            'KAFKA_BOOTSTRAP_SERVERS', 'localhost:9092')
        self.producer: Optional[KafkaProducer] = None
        # Most recently created consumer (see create_consumer); tracked so
        # close() can release it.
        self.consumer: Optional[KafkaConsumer] = None

    def get_producer(self) -> KafkaProducer:
        """Get or lazily create the shared Kafka producer.

        Raises whatever KafkaProducer raises when the brokers are
        unreachable; the error is logged (with traceback) first.
        """
        if self.producer is None:
            try:
                self.producer = KafkaProducer(
                    bootstrap_servers=self.bootstrap_servers,
                    # default=str keeps non-JSON types (datetime, UUID, ...)
                    # serializable by stringifying them.
                    value_serializer=lambda v: json.dumps(v, default=str).encode('utf-8'),
                    key_serializer=lambda k: k.encode('utf-8') if k else None,
                    acks='all',  # wait for the full in-sync replica set
                    retries=3,
                    retry_backoff_ms=1000,
                    request_timeout_ms=30000
                )
                logger.info("Kafka producer connected to %s", self.bootstrap_servers)
            except Exception:
                logger.exception("Failed to create Kafka producer")
                raise
        return self.producer

    def send_message(self, topic: str, message: Dict[str, Any], key: Optional[str] = None) -> bool:
        """Synchronously send one message; return True on success.

        Never raises: failures are logged and reported as False.
        """
        try:
            producer = self.get_producer()
            future = producer.send(topic, value=message, key=key)
            # future.get() blocks until the broker acks this record
            # (acks='all'), so a per-message flush() is unnecessary and
            # would defeat the producer's batching.
            record_metadata = future.get(timeout=10)
            logger.debug("Message sent to %s: %s", topic, record_metadata)
            return True
        except Exception as e:
            logger.error("Failed to send message to %s: %s", topic, e)
            return False

    def create_consumer(self, topics: list, group_id: str, auto_offset_reset: str = 'earliest') -> KafkaConsumer:
        """Create a Kafka consumer subscribed to *topics*.

        Offsets are auto-committed every second. Raises on connection
        failure (logged with traceback first).
        """
        try:
            consumer = KafkaConsumer(
                *topics,
                bootstrap_servers=self.bootstrap_servers,
                group_id=group_id,
                auto_offset_reset=auto_offset_reset,
                value_deserializer=lambda m: json.loads(m.decode('utf-8')),
                key_deserializer=lambda k: k.decode('utf-8') if k else None,
                enable_auto_commit=True,
                auto_commit_interval_ms=1000,
                # NOTE(review): -1 appears intended to disable the iteration
                # timeout (kafka-python only applies consumer_timeout_ms when
                # it is >= 0, so iteration blocks forever) — confirm against
                # the pinned kafka-python version.
                consumer_timeout_ms=-1
            )
        except Exception:
            logger.exception("Failed to create Kafka consumer")
            raise
        # Track the consumer so close() can release it; previously this was
        # never assigned, leaving close() unable to clean up.
        self.consumer = consumer
        logger.info("Kafka consumer created for topics %s with group %s", topics, group_id)
        return consumer

    def consume_messages(self, topics: list, group_id: str, message_handler: Callable, auto_offset_reset: str = 'earliest'):
        """Consume messages indefinitely, dispatching each to *message_handler*.

        The handler is called as message_handler(topic, value, key).
        Handler errors are logged and skipped so one bad record does not
        stop the loop. Returns on Ctrl-C or on a consumer-level error; the
        consumer is always closed.
        """
        consumer = self.create_consumer(topics, group_id, auto_offset_reset)

        try:
            logger.info("Starting to consume messages from %s", topics)
            for message in consumer:
                try:
                    logger.debug("Received message from %s: %s", message.topic, message.value)
                    message_handler(message.topic, message.value, message.key)
                except Exception:
                    # A failing handler must not kill the consume loop.
                    logger.exception("Error processing message from %s", message.topic)
                    continue
        except KeyboardInterrupt:
            logger.info("Consumer interrupted by user")
        except Exception:
            logger.exception("Consumer error")
        finally:
            consumer.close()
            # Drop our reference only if it is the consumer we just closed.
            if self.consumer is consumer:
                self.consumer = None
            logger.info("Consumer closed")

    def close(self):
        """Close producer and consumer connections; safe to call repeatedly."""
        if self.producer:
            self.producer.close()
            # Reset so get_producer() can reconnect instead of returning a
            # closed producer.
            self.producer = None
            logger.info("Kafka producer closed")
        if self.consumer:
            self.consumer.close()
            self.consumer = None
            logger.info("Kafka consumer closed")

# Topic names configuration
# Search engines that get a dedicated request topic.
_SEARCH_ENGINES = ('google', 'bing', 'yandex', 'duckduckgo', 'yahoo', 'baidu')

TOPICS = {
    'SEARCH_REQUESTS': 'search-requests',
    # One request topic per supported engine: '<engine>-search-requests'.
    **{f'{engine.upper()}_SEARCH_REQUESTS': f'{engine}-search-requests'
       for engine in _SEARCH_ENGINES},
    'SEARCH_RESULTS': 'search-results',
    'METADATA_REQUESTS': 'metadata-requests',
    'METADATA_RESULTS': 'metadata-results',
    'EXPORT_REQUESTS': 'export-requests',
    'EXPORT_RESULTS': 'export-results',
}

def get_search_topic(engine: str) -> str:
    """Return the Kafka request topic for *engine* (case-insensitive).

    Unrecognized engines fall back to the generic search-requests topic.
    """
    name = engine.lower()
    if name in _SEARCH_ENGINES:
        return TOPICS[f'{name.upper()}_SEARCH_REQUESTS']
    return TOPICS['SEARCH_REQUESTS']
