"""
Kafka service for handling message publishing and consuming
"""
import asyncio
import json
import logging
from typing import Dict, Any, Optional, Callable

from kafka import KafkaProducer, KafkaConsumer
from kafka.errors import KafkaError

from app.core.config import get_settings

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Application settings, resolved when this module is imported. KafkaService
# reads KAFKA_BOOTSTRAP_SERVERS from this object.
settings = get_settings()


class KafkaService:
    """Kafka service for message publishing and consuming.

    The producer is created lazily on first use and reused for every publish.
    Consumers are created fresh on every ``get_consumer`` call and are owned
    by whoever requested them; ``consume_messages`` closes the consumer it
    creates when its loop ends.
    """

    # Seconds to wait for the broker to acknowledge a single published record.
    SEND_TIMEOUT_SECONDS = 10

    def __init__(self) -> None:
        # Created on demand by get_producer(); reset to None by close().
        self.producer = None
        # Never assigned by this class itself; close() only acts on it when
        # external code has stored a consumer here.
        self.consumer = None
        # Tolerate whitespace and stray commas in the configured list, e.g.
        # "host1:9092, host2:9092," -> ["host1:9092", "host2:9092"].
        self.bootstrap_servers = [
            server.strip()
            for server in settings.KAFKA_BOOTSTRAP_SERVERS.split(',')
            if server.strip()
        ]

    def get_producer(self) -> KafkaProducer:
        """Return the shared Kafka producer, creating it on first use.

        Values are serialized as UTF-8 JSON and keys as UTF-8 strings (a
        falsy key is sent as ``None``).

        Returns:
            The cached ``KafkaProducer`` instance.

        Raises:
            Exception: Whatever ``KafkaProducer`` raises during construction;
                the error is logged and re-raised unchanged.
        """
        if self.producer is None:
            try:
                self.producer = KafkaProducer(
                    bootstrap_servers=self.bootstrap_servers,
                    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
                    key_serializer=lambda k: k.encode('utf-8') if k else None,
                    acks='all',
                    retries=3,
                    retry_backoff_ms=1000,
                    request_timeout_ms=30000,
                    max_in_flight_requests_per_connection=1
                )
                logger.info("Kafka producer initialized successfully")
            except Exception as e:
                logger.error(f"Failed to initialize Kafka producer: {e}")
                raise
        return self.producer

    def get_consumer(self, topics: list, group_id: str) -> KafkaConsumer:
        """Create a new Kafka consumer subscribed to ``topics``.

        A new consumer is built on every call; nothing is cached. The caller
        owns the returned consumer and is responsible for closing it.

        Args:
            topics: Topic names to subscribe to.
            group_id: Consumer group the consumer joins.

        Returns:
            A ``KafkaConsumer`` that decodes keys as UTF-8 strings and values
            as UTF-8 JSON.

        Raises:
            Exception: Whatever ``KafkaConsumer`` raises during construction;
                the error is logged and re-raised unchanged.
        """
        try:
            consumer = KafkaConsumer(
                *topics,
                bootstrap_servers=self.bootstrap_servers,
                group_id=group_id,
                value_deserializer=lambda m: json.loads(m.decode('utf-8')),
                key_deserializer=lambda k: k.decode('utf-8') if k else None,
                auto_offset_reset='earliest',
                enable_auto_commit=True,
                auto_commit_interval_ms=1000,
                session_timeout_ms=30000,
                heartbeat_interval_ms=10000
            )
            logger.info(f"Kafka consumer initialized for topics: {topics}")
            return consumer
        except Exception as e:
            logger.error(f"Failed to initialize Kafka consumer: {e}")
            raise

    def _send_and_wait(
        self,
        producer: KafkaProducer,
        topic: str,
        message: Dict[str, Any],
        key: Optional[str]
    ):
        """Send one record and block until the broker acknowledges it.

        Blocking helper meant to run in a worker thread, never directly on
        the event loop.
        """
        future = producer.send(topic, value=message, key=key)
        return future.get(timeout=self.SEND_TIMEOUT_SECONDS)

    async def publish_message(
        self,
        topic: str,
        message: Dict[str, Any],
        key: Optional[str] = None
    ) -> bool:
        """Publish a message to a Kafka topic without blocking the event loop.

        Args:
            topic: Destination topic.
            message: JSON-serializable payload.
            key: Optional record key.

        Returns:
            ``True`` once the broker acknowledged the record, ``False`` if
            sending failed for any reason (the error is logged, not raised).
        """
        try:
            # Resolved on the event-loop thread so lazy producer creation is
            # never raced by several worker threads.
            producer = self.get_producer()

            # send() and the wait for the acknowledgement both block, so they
            # run in the default executor instead of stalling the loop.
            loop = asyncio.get_running_loop()
            record_metadata = await loop.run_in_executor(
                None, self._send_and_wait, producer, topic, message, key
            )

            logger.info(
                f"Message published to topic '{topic}' "
                f"partition {record_metadata.partition} "
                f"offset {record_metadata.offset}"
            )
            return True

        except KafkaError as e:
            logger.error(f"Failed to publish message to topic '{topic}': {e}")
            return False
        except Exception as e:
            # The error is swallowed here, so keep the traceback in the log.
            logger.exception(f"Unexpected error publishing message: {e}")
            return False

    async def _publish_event(
        self,
        topic: str,
        event_type: str,
        user_id: str,
        data: Dict[str, Any]
    ) -> bool:
        """Wrap ``data`` in the common event envelope and publish it.

        The record is keyed by ``user_id``; the envelope's ``timestamp`` is
        taken from ``data["timestamp"]`` (``None`` when absent).
        """
        message = {
            "event_type": event_type,
            "user_id": user_id,
            "timestamp": data.get("timestamp"),
            "data": data
        }
        return await self.publish_message(topic, message, key=user_id)

    async def publish_user_event(
        self,
        event_type: str,
        user_id: str,
        data: Dict[str, Any]
    ) -> bool:
        """Publish a user-related event to the ``user-events`` topic.

        Returns:
            ``True`` if the event was acknowledged, ``False`` otherwise.
        """
        return await self._publish_event("user-events", event_type, user_id, data)

    async def publish_auth_event(
        self,
        event_type: str,
        user_id: str,
        data: Dict[str, Any]
    ) -> bool:
        """Publish an authentication-related event to the ``auth-events`` topic.

        Returns:
            ``True`` if the event was acknowledged, ``False`` otherwise.
        """
        return await self._publish_event("auth-events", event_type, user_id, data)

    def consume_messages(
        self,
        topics: list,
        group_id: str,
        message_handler: Callable[[str, Dict[str, Any]], None]
    ) -> None:
        """Consume messages from Kafka topics and pass each to a handler.

        Blocks while iterating the consumer. A failure while handling a
        single message is logged and skipped; a failure of the consumer
        itself is logged and re-raised. The consumer is always closed before
        this method returns or raises.

        Args:
            topics: Topic names to subscribe to.
            group_id: Consumer group to join.
            message_handler: Called as ``message_handler(topic, value)`` for
                every received message.

        Raises:
            Exception: Any error raised while creating or iterating the
                consumer.
        """
        consumer = None
        try:
            consumer = self.get_consumer(topics, group_id)

            logger.info(f"Starting to consume messages from topics: {topics}")

            for message in consumer:
                try:
                    topic = message.topic
                    value = message.value

                    logger.debug(f"Received message from topic '{topic}': {value}")

                    # Call the message handler
                    message_handler(topic, value)

                except Exception as e:
                    # One bad message must not stop the loop; keep the
                    # traceback because the error is swallowed here.
                    logger.exception(f"Error processing message: {e}")
                    continue

        except Exception as e:
            logger.error(f"Error consuming messages: {e}")
            raise
        finally:
            # Release the consumer's connections however the loop ended.
            if consumer is not None:
                try:
                    consumer.close()
                except Exception as e:
                    # Do not let a failed close mask the original error.
                    logger.exception(f"Error closing Kafka consumer: {e}")

    def close(self) -> None:
        """Close Kafka connections held on this instance.

        References are cleared before closing so that a later
        ``get_producer()`` call builds a fresh producer instead of returning
        a closed one.
        """
        producer, self.producer = self.producer, None
        if producer is not None:
            producer.close()
            logger.info("Kafka producer closed")

        consumer, self.consumer = self.consumer, None
        if consumer is not None:
            consumer.close()
            logger.info("Kafka consumer closed")


# Global Kafka service instance, created at import time and shared by the
# event publishing functions below. Constructing it does not create a
# producer: that happens on the first get_producer() call.
kafka_service = KafkaService()


# Event publishing functions
async def publish_user_registered(user_id: str, user_data: Dict[str, Any]) -> bool:
    """Publish a ``user_registered`` event through the shared Kafka service.

    Args:
        user_id: Identifier of the registered user; also used as record key.
        user_data: Event payload; its ``"timestamp"`` entry, if present, is
            copied into the event envelope.

    Returns:
        ``True`` if the event was published, ``False`` if publishing failed.
    """
    # Propagate the outcome so callers can react to a failed publish.
    return await kafka_service.publish_user_event("user_registered", user_id, user_data)


async def publish_user_login(user_id: str, login_data: Dict[str, Any]) -> bool:
    """Publish a ``user_login`` event through the shared Kafka service.

    Args:
        user_id: Identifier of the user logging in; also used as record key.
        login_data: Event payload; its ``"timestamp"`` entry, if present, is
            copied into the event envelope.

    Returns:
        ``True`` if the event was published, ``False`` if publishing failed.
    """
    # Propagate the outcome so callers can react to a failed publish.
    return await kafka_service.publish_auth_event("user_login", user_id, login_data)


async def publish_user_logout(user_id: str, logout_data: Dict[str, Any]) -> bool:
    """Publish a ``user_logout`` event through the shared Kafka service.

    Args:
        user_id: Identifier of the user logging out; also used as record key.
        logout_data: Event payload; its ``"timestamp"`` entry, if present, is
            copied into the event envelope.

    Returns:
        ``True`` if the event was published, ``False`` if publishing failed.
    """
    # Propagate the outcome so callers can react to a failed publish.
    return await kafka_service.publish_auth_event("user_logout", user_id, logout_data)


async def publish_password_reset(user_id: str, reset_data: Dict[str, Any]) -> bool:
    """Publish a ``password_reset`` event through the shared Kafka service.

    Args:
        user_id: Identifier of the affected user; also used as record key.
        reset_data: Event payload; its ``"timestamp"`` entry, if present, is
            copied into the event envelope.

    Returns:
        ``True`` if the event was published, ``False`` if publishing failed.
    """
    # Propagate the outcome so callers can react to a failed publish.
    return await kafka_service.publish_auth_event("password_reset", user_id, reset_data)


async def publish_token_refresh(user_id: str, token_data: Dict[str, Any]) -> bool:
    """Publish a ``token_refresh`` event through the shared Kafka service.

    Args:
        user_id: Identifier of the user whose token was refreshed; also used
            as record key.
        token_data: Event payload; its ``"timestamp"`` entry, if present, is
            copied into the event envelope.

    Returns:
        ``True`` if the event was published, ``False`` if publishing failed.
    """
    # Propagate the outcome so callers can react to a failed publish.
    return await kafka_service.publish_auth_event("token_refresh", user_id, token_data)