"""
Kafka service for the data scraping service
"""
import asyncio
import json
import logging
from typing import Any, Dict, List, Optional
from datetime import datetime, timezone
from kafka import KafkaProducer, KafkaConsumer
from kafka.errors import KafkaError
from app.config.settings import get_settings

logger = logging.getLogger(__name__)


class KafkaService:
    """Kafka service for publishing and consuming messages"""
    
    def __init__(self):
        self.settings = get_settings()
        self._producer = None
        self._consumer = None
    
    def get_producer(self) -> KafkaProducer:
        """Get Kafka producer instance"""
        if self._producer is None:
            try:
                self._producer = KafkaProducer(
                    bootstrap_servers=self.settings.KAFKA_BOOTSTRAP_SERVERS.split(','),
                    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
                    key_serializer=lambda k: k.encode('utf-8') if k else None,
                    acks='all',  # require acknowledgement from all in-sync replicas
                    retries=3,
                    retry_backoff_ms=1000,
                    request_timeout_ms=30000,
                    # a single in-flight request keeps messages ordered across retries
                    max_in_flight_requests_per_connection=1
                )
                logger.info("Kafka producer initialized successfully")
            except Exception as e:
                logger.error(f"Failed to initialize Kafka producer: {e}")
                raise
        return self._producer
    
    def get_consumer(self, topics: List[str], group_id: Optional[str] = None) -> KafkaConsumer:
        """Create a Kafka consumer subscribed to the given topics"""
        try:
            consumer = KafkaConsumer(
                *topics,
                bootstrap_servers=self.settings.KAFKA_BOOTSTRAP_SERVERS.split(','),
                group_id=group_id or self.settings.KAFKA_GROUP_ID,
                value_deserializer=lambda m: json.loads(m.decode('utf-8')),
                key_deserializer=lambda k: k.decode('utf-8') if k else None,
                auto_offset_reset='earliest',  # start from the oldest message if no committed offset
                enable_auto_commit=True,
                auto_commit_interval_ms=1000,
                session_timeout_ms=30000,
                heartbeat_interval_ms=10000
            )
            # Track the most recently created consumer so close() can release it
            self._consumer = consumer
            logger.info(f"Kafka consumer initialized for topics: {topics}")
            return consumer
        except Exception as e:
            logger.error(f"Failed to initialize Kafka consumer: {e}")
            raise
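
    # Typical consumption pattern (a sketch: assumes KAFKA_TOPIC_PREFIX is
    # 'scraper' and a hypothetical handle_event(key, value) callback):
    #
    #   service = KafkaService()
    #   consumer = service.get_consumer(['scraper_article_events'])
    #   for message in consumer:  # blocking iteration; run it off the event loop
    #       handle_event(message.key, message.value)  # value is already a dict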
    
    async def publish_message(self, topic: str, message: Dict[str, Any], key: Optional[str] = None) -> bool:
        """Publish a message to Kafka topic"""
        try:
            producer = self.get_producer()
            
            # Add timestamp and service info
            enriched_message = {
                **message,
                'timestamp': datetime.now(timezone.utc).isoformat(),
                'service': 'data-service',
                'version': self.settings.APP_VERSION
            }
            
            future = producer.send(
                topic=f"{self.settings.KAFKA_TOPIC_PREFIX}_{topic}",
                value=enriched_message,
                key=key
            )
            
            # future.get() blocks, so hand it to a worker thread instead of
            # stalling the event loop while the broker acknowledges the send
            record_metadata = await asyncio.get_running_loop().run_in_executor(
                None, lambda: future.get(timeout=10)
            )
            logger.info(f"Message sent to topic {record_metadata.topic} partition {record_metadata.partition}")
            return True
            
        except KafkaError as e:
            logger.error(f"Failed to send message to Kafka: {e}")
            return False
        except Exception as e:
            logger.error(f"Unexpected error sending message to Kafka: {e}")
            return False
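
    # For example, with KAFKA_TOPIC_PREFIX set to 'scraper' (an assumption),
    #   await service.publish_message('article_events', {'event_type': 'x'}, key='42')
    # lands on topic 'scraper_article_events' with the original dict plus the
    # 'timestamp', 'service', and 'version' fields added above.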
    
    # Specific event publishers
    async def publish_article_scraped(self, article_data: Dict[str, Any]) -> bool:
        """Publish article scraped event"""
        event = {
            'event_type': 'article_scraped',
            'article_id': article_data.get('id'),
            'url': article_data.get('url'),
            'title': article_data.get('title'),
            'source_domain': article_data.get('source_domain'),
            'search_query': article_data.get('search_query'),
            'search_engine': article_data.get('search_engine'),
            'word_count': article_data.get('word_count'),
            'quality_score': article_data.get('quality_score')
        }
        return await self.publish_message('article_events', event, key=article_data.get('id'))
    
    async def publish_search_job_started(self, job_data: Dict[str, Any]) -> bool:
        """Publish search job started event"""
        event = {
            'event_type': 'search_job_started',
            'job_id': job_data.get('job_id'),
            'search_query': job_data.get('search_query'),
            'search_engines': job_data.get('search_engines'),
            'max_results': job_data.get('max_results'),
            'created_by': job_data.get('created_by')
        }
        return await self.publish_message('search_events', event, key=job_data.get('job_id'))
    
    async def publish_search_job_completed(self, job_data: Dict[str, Any]) -> bool:
        """Publish search job completed event"""
        event = {
            'event_type': 'search_job_completed',
            'job_id': job_data.get('job_id'),
            'search_query': job_data.get('search_query'),
            'total_found': job_data.get('total_found'),
            'total_scraped': job_data.get('total_scraped'),
            'total_failed': job_data.get('total_failed'),
            'duration': job_data.get('duration'),
            'status': job_data.get('status')
        }
        return await self.publish_message('search_events', event, key=job_data.get('job_id'))
    
    async def publish_search_job_failed(self, job_data: Dict[str, Any], error: str) -> bool:
        """Publish search job failed event"""
        event = {
            'event_type': 'search_job_failed',
            'job_id': job_data.get('job_id'),
            'search_query': job_data.get('search_query'),
            'error': error,
            'total_scraped': job_data.get('total_scraped', 0)
        }
        return await self.publish_message('search_events', event, key=job_data.get('job_id'))
    
    async def publish_duplicate_detected(self, article_data: Dict[str, Any], original_id: str) -> bool:
        """Publish duplicate article detected event"""
        event = {
            'event_type': 'duplicate_detected',
            'article_id': article_data.get('id'),
            'original_id': original_id,
            'url': article_data.get('url'),
            'title': article_data.get('title'),
            'similarity_score': article_data.get('similarity_score')
        }
        return await self.publish_message('article_events', event, key=article_data.get('id'))
    
    async def publish_content_analysis_completed(self, article_data: Dict[str, Any]) -> bool:
        """Publish content analysis completed event"""
        event = {
            'event_type': 'content_analysis_completed',
            'article_id': article_data.get('id'),
            'url': article_data.get('url'),
            'word_count': article_data.get('word_count'),
            'reading_time': article_data.get('reading_time'),
            'quality_score': article_data.get('quality_score'),
            'relevance_score': article_data.get('relevance_score'),
            'sentiment_score': article_data.get('sentiment_score'),
            'keywords': article_data.get('keywords', []),
            'language': article_data.get('language')
        }
        return await self.publish_message('analysis_events', event, key=article_data.get('id'))
    
    def close(self):
        """Close Kafka connections"""
        try:
            if self._producer:
                self._producer.close()
                self._producer = None
                logger.info("Kafka producer closed")
            if self._consumer:
                self._consumer.close()
                self._consumer = None
                logger.info("Kafka consumer closed")
        except Exception as e:
            logger.error(f"Error closing Kafka connections: {e}")


# Convenience functions for common events. Each one reuses the shared
# module-level service above instead of building a new producer per call.
async def publish_article_scraped_event(article_data: Dict[str, Any]) -> bool:
    """Convenience function to publish article scraped event"""
    return await _get_default_service().publish_article_scraped(article_data)


async def publish_search_started_event(job_data: Dict[str, Any]) -> bool:
    """Convenience function to publish search job started event"""
    return await _get_default_service().publish_search_job_started(job_data)


async def publish_search_completed_event(job_data: Dict[str, Any]) -> bool:
    """Convenience function to publish search job completed event"""
    return await _get_default_service().publish_search_job_completed(job_data)


async def publish_search_failed_event(job_data: Dict[str, Any], error: str) -> bool:
    """Convenience function to publish search job failed event"""
    return await _get_default_service().publish_search_job_failed(job_data, error)