from kafka import KafkaProducer, KafkaConsumer
from kafka.errors import KafkaError
from decouple import config
import json
from typing import Dict, Any, Optional, List
import asyncio
import logging

logger = logging.getLogger(__name__)

class KafkaClient:
    """Lazy holder for a shared Kafka producer and the most recent consumer.

    The producer is created on first use and cached. Consumers are created
    on every ``get_consumer`` call; only the latest one is remembered for
    ``close_consumer``.
    """

    # Upper bound (seconds) for reading the delivery result after flush().
    _DELIVERY_TIMEOUT_SECONDS = 30

    def __init__(self):
        """Read the broker list from configuration; no connection is opened."""
        raw_servers = config('KAFKA_BOOTSTRAP_SERVERS', default='localhost:9092')
        # Tolerate whitespace and stray commas, e.g. "a:9092, b:9092,".
        self.bootstrap_servers = [
            server.strip() for server in raw_servers.split(",") if server.strip()
        ]
        self.producer: Optional[KafkaProducer] = None
        self.consumer: Optional[KafkaConsumer] = None

    def get_producer(self) -> "KafkaProducer":
        """Return the cached producer, creating it on first use.

        Values are serialized as UTF-8 JSON; keys are UTF-8 encoded, with
        falsy keys (``None`` or ``""``) sent as no key.
        """
        if self.producer is None:
            self.producer = KafkaProducer(
                bootstrap_servers=self.bootstrap_servers,
                value_serializer=lambda v: json.dumps(v).encode('utf-8'),
                key_serializer=lambda k: k.encode('utf-8') if k else None,
                retries=5,
                retry_backoff_ms=100,
                request_timeout_ms=30000
            )
        return self.producer

    def get_consumer(self, topics: List[str], group_id: str) -> "KafkaConsumer":
        """Create a new consumer subscribed to ``topics`` in ``group_id``.

        A fresh consumer is built on every call and stored on
        ``self.consumer``, replacing the previous reference. A previously
        returned consumer is NOT closed here because the caller may still be
        using it; the caller remains responsible for closing it.
        """
        self.consumer = KafkaConsumer(
            *topics,
            bootstrap_servers=self.bootstrap_servers,
            group_id=group_id,
            value_deserializer=lambda m: json.loads(m.decode('utf-8')) if m else None,
            key_deserializer=lambda k: k.decode('utf-8') if k else None,
            auto_offset_reset='earliest',
            enable_auto_commit=True
        )
        return self.consumer

    async def send_message(self, topic: str, message: Dict[str, Any], key: Optional[str] = None):
        """Send a message to Kafka topic and wait for the delivery result.

        Args:
            topic: Destination topic name.
            message: JSON-serializable payload.
            key: Optional partitioning key.

        Returns:
            True when the send completed without a Kafka error, False when a
            ``KafkaError`` was raised while connecting, sending or waiting
            for the delivery result.
        """
        def _send_and_wait(producer) -> None:
            future = producer.send(topic, value=message, key=key)
            producer.flush()  # Ensure the message is sent
            # flush() does not report per-record failures; reading the future
            # re-raises the delivery error, if any.
            future.get(timeout=self._DELIVERY_TIMEOUT_SECONDS)

        try:
            # Created on the event-loop thread so concurrent sends cannot
            # race to build two producers.
            producer = self.get_producer()
            # send()/flush()/get() block, so keep them off the event loop.
            # kafka-python documents KafkaProducer as thread safe.
            loop = asyncio.get_running_loop()
            await loop.run_in_executor(None, _send_and_wait, producer)
        except KafkaError as e:
            logger.error("Failed to send message to topic %s: %s", topic, e)
            return False
        logger.info("Message sent to topic %s: %s", topic, message)
        return True

    def close_producer(self):
        """Close the cached producer, if any, and forget it.

        Dropping the reference lets a later ``get_producer`` build a fresh
        producer instead of handing back a closed one.
        """
        if self.producer is not None:
            try:
                self.producer.close()
            finally:
                self.producer = None

    def close_consumer(self):
        """Close the most recently created consumer, if any, and forget it."""
        if self.consumer is not None:
            try:
                self.consumer.close()
            finally:
                self.consumer = None

# Global Kafka client instance, created at import time and shared by
# importers of this module. Constructing it only reads the broker list from
# configuration; the producer and consumer are created lazily on first use.
kafka_client = KafkaClient()

# Event types
class EventTypes:
    """String constants naming user-related event types (dotted identifiers)."""

    # Registration and session lifecycle
    USER_REGISTERED: str = "user.registered"
    USER_LOGGED_IN: str = "user.logged_in"
    USER_LOGGED_OUT: str = "user.logged_out"

    # Profile and authorization changes
    USER_PROFILE_UPDATED: str = "user.profile.updated"
    USER_ROLE_CHANGED: str = "user.role.changed"
    USER_PERMISSION_CHANGED: str = "user.permission.changed"

    # Credential and account state changes
    PASSWORD_CHANGED: str = "user.password.changed"
    ACCOUNT_DEACTIVATED: str = "user.account.deactivated"
    ACCOUNT_ACTIVATED: str = "user.account.activated"

# Topics
class Topics:
    """String constants for topic names (presumably the Kafka topics events go to)."""

    AUTH_EVENTS: str = "auth-events"
    ACCOUNT_EVENTS: str = "account-events"
    USER_EVENTS: str = "user-events"
