import asyncio
import json
import logging
from typing import Dict, Any
from kafka import KafkaConsumer
from shared.config import settings
from app.services.auth_event_handler import AuthEventHandler
from shared.kafka_client import kafka_client

# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)

class KafkaEventConsumer:
    """Kafka consumer that forwards auth events to ``AuthEventHandler``.

    ``start_consuming()`` obtains a consumer from ``kafka_client``, reads
    messages from the ``auth-events`` topic and passes each decoded event to
    ``AuthEventHandler.process_event``. ``stop_consuming()`` asks the loop to
    exit and closes the consumer.
    """

    def __init__(self):
        # Created by start_consuming(); None while no consumer is open.
        self.consumer: KafkaConsumer = None
        self.auth_event_handler = AuthEventHandler()
        # Cleared by stop_consuming() to ask the consume loop to exit.
        self.running = False

    async def start_consuming(self):
        """Consume messages until stopped or the consumer is exhausted.

        Each blocking fetch runs in the event loop's default executor, so
        other coroutines (including one that calls ``stop_consuming()``) keep
        running while this one waits for the next message.

        Errors are logged rather than raised, and ``stop_consuming()`` is
        always called on the way out.
        """
        try:
            # Create consumer for auth events (topic name aligned with
            # shared.kafka_client.Topics.AUTH_EVENTS)
            self.consumer = kafka_client.get_consumer(
                topics=["auth-events"],
                group_id="account-service-group"
            )

            self.running = True
            logger.info("Starting Kafka consumer for account service...")

            loop = asyncio.get_running_loop()
            messages = iter(self.consumer)
            # StopIteration cannot travel through a Future, so the end of
            # the iterator is signalled with a private sentinel instead.
            exhausted = object()

            # NOTE(review): kafka-python documents KafkaConsumer as not
            # thread-safe. Fetches here happen strictly one at a time, but a
            # stop_consuming() call made while a fetch is in flight closes
            # the consumer from the event-loop thread -- confirm that is
            # acceptable for the consumer kafka_client.get_consumer returns.
            while self.running:
                message = await loop.run_in_executor(
                    None, next, messages, exhausted
                )
                # A message fetched after a stop request is dropped, not
                # processed.
                if message is exhausted or not self.running:
                    break

                try:
                    await self._process_message(message)
                except Exception as e:
                    # Safety net: one bad message must not end the loop.
                    logger.exception("Error processing message: %s", e)

        except Exception as e:
            logger.exception("Error starting Kafka consumer: %s", e)
        finally:
            await self.stop_consuming()

    @staticmethod
    def _decode_event(message):
        """Return the event payload carried by *message* as a dict.

        ``message.value`` may already be a dict, or JSON text given as
        ``str`` or UTF-8 encoded ``bytes``/``bytearray``.

        Raises:
            ValueError: the value is ``None``, is not valid UTF-8/JSON
                (``UnicodeDecodeError`` and ``json.JSONDecodeError`` are both
                ``ValueError`` subclasses), or does not decode to a JSON
                object.
            TypeError: the value is of a type ``json.loads`` cannot parse.
        """
        payload = message.value
        if isinstance(payload, dict):
            return payload
        if payload is None:
            raise ValueError("message has no value")
        if isinstance(payload, (bytes, bytearray)):
            payload = payload.decode('utf-8')

        event_data = json.loads(payload)
        if not isinstance(event_data, dict):
            raise ValueError(
                f"expected a JSON object, got {type(event_data).__name__}"
            )
        return event_data

    async def _process_message(self, message):
        """Decode one Kafka message and hand it to the auth event handler.

        Decoding failures and handler failures are logged separately and
        swallowed, so a single bad message never propagates to the caller.
        """
        try:
            event_data = self._decode_event(message)
        except Exception as e:
            logger.error("Error decoding message: %s", e)
            return

        event_type = event_data.get('event_type')
        logger.info("Processing event: %s", event_type)

        try:
            # Route to unified event processor that expects the full event payload
            await self.auth_event_handler.process_event(event_type, event_data)
        except Exception as e:
            logger.exception("Error processing message: %s", e)

    async def stop_consuming(self):
        """Request the consume loop to stop and close the consumer.

        Safe to call more than once: the consumer reference is cleared
        before closing, so later calls find nothing left to close.
        """
        self.running = False
        consumer, self.consumer = self.consumer, None
        if consumer is not None:
            consumer.close()
            logger.info("Kafka consumer stopped")

# Global instance shared by importers of this module. Constructing it builds
# the AuthEventHandler; the Kafka consumer itself is not created until
# start_consuming() is called.
kafka_consumer = KafkaEventConsumer()
