U
    k,hO                     @   s  d Z ddlZddlZddlZddlZddlZddlZddlZddlZddl	Z
ddlmZ ddlmZmZmZmZmZ ddlmZmZ ddlmZ ddlmZ ddlmZ dd	lmZ dd
lmZmZm Z m!Z!m"Z" ddl#m$Z$m%Z%m&Z&m'Z' ddl(m)Z) e*dZ+e,dddZ-eee.ef dddZ/e"ee. dddZ0e!ee. dddZ1dQeee! ee" ee. dddZ2dRee3e,ddd Z4ed!d"dd#dSe.e.e.d$d%d&Z5ee.d'd(d)Z6dTeee! ee" edd*d+Z7dUeee! ee" ee dd,d-Z8ee.d'd.d/Z9ee,d0d1d2Z:ee.d3d4d5Z;ed6d7 Z<dVeee3 e3d8d9d:Z=ed!d"d;d#e.d3d<d=Z>ed!d>d;d#e.ee. d?d@dAZ?ed!d>d;d#e.dBdCdDZ@e.e.eAdEdFdGZBee,ddHdIZCedJdK ZDee.dLdMdNZEedOdP ZFdS )Wz
Celery Tasks for Stream Processing

This module contains all stream processing functionality as Celery tasks
for better task management. All FFmpeg operations, HLS processing,
and stream management are handled as background tasks.
    N)Path)OptionalDictAnyListTuple)datetime	timedelta)shared_task)settings)timezone)ObjectDoesNotExist)StreamSessionChannel
HLSSegmentVideoConfigurationAudioConfiguration)JingleTemplateJingleDetectionAdBreakDetectionStatistics)NotificationServicezstream_processor.tasks)returnc                  C   s   zDt jddgdddd} | jdkr2td W dS td| j W n8 tk
r`   td	Y n t jk
r|   td
Y nX dS )z
    Validate that FFmpeg is available and accessible.
    
    Returns:
        bool: True if FFmpeg is available, raises exception otherwise
        
    Raises:
        RuntimeError: If FFmpeg is not found or not executable
    ffmpegz-versionT
   capture_outputtexttimeoutr   zFFmpeg validation successfulzFFmpeg test failed: z0FFmpeg not found in PATH. Please install FFmpeg.zFFmpeg validation timed outN)	
subprocessrun
returncodeloggerinfoRuntimeErrorstderrFileNotFoundErrorTimeoutExpired)result r)   7/var/www/html/StreamProcessor/src/apps/streams/tasks.py_validate_ffmpeg%   s    


r+   )channelr   c           	      C   s   t tjd | j }|d }|d }|d }||||d}| D ]v\}}z,|jddd |d td	|  W qB t	k
r } z"t
d
| d| d|   W 5 d}~X Y qBX qB|S )z
    Create the necessary output directories for a channel.
    
    Args:
        channel (Channel): Channel model instance
        
    Returns:
        Dict[str, Path]: Dictionary containing paths to created directories
    
OUTPUT_DIRhlslogsiframes)baser.   r/   r0   Tparentsexist_oki  zCreated directory: zFailed to create z directory : N)r   r   STREAM_CONFIGslugitemsmkdirchmodr"   r#   OSErrorerror)	r,   	base_pathhls_pathZ	logs_pathiframes_pathdirectoriesnamepather)   r)   r*   _create_output_directoriesB   s$    
rD   )audio_configr   c              
   C   sF   g }| j r|ddg |d| jdt| jdt| jd| jg |S )z
    Build audio encoding options from configuration.
    
    Args:
        audio_config (AudioConfiguration): Audio configuration object
        
    Returns:
        List[str]: Audio encoding options for FFmpeg
    -afloudnorm=I=-16:LRA=11:TP=-1-c:a-ar-ac-b:a)	normalizeextendcodecstrsample_ratechannelsbitrate)rE   optionsr)   r)   r*   _build_audio_optionsg   s    
    rT   )video_configr   c                 C   sl   d| j d| jd| jd| jd| jdt| jd| jd| jd	| jg}| j d
krh|	d| j
ddddddddg
 |S )z
    Build video encoding options from configuration.
    
    Args:
        video_config (VideoConfiguration): Video configuration object
        
    Returns:
        List[str]: Video encoding options for FFmpeg
    -c:v
-profile:v-level-s-aspectz-r-b:v-maxrate-bufsizeh264z-preset-crf20-sc_threshold0-g48-keyint_min)rN   profilelevel
resolutionaspect_ratiorO   
frame_ratemin_bitratemax_bitraterM   preset)rU   rS   r)   r)   r*   _build_video_options   sB             
     rn   )r,   rU   rE   r   c                 C   s6  t | }|d }dddddddd	d
| jddddg}|rF|t| n8|dddddttjd dttjd dtjd g
 |r|t| nX|dddddddtjd d tjd! d"tjd# d$tjd% d&tjd# d'd(d)d*d+d,d-d,g |d.dd/t| j	d0t| j
d1d2d3d4d5d6d7d2d8t|d9 t|d: g |S );a]  
    Build the FFmpeg command for stream capture.
    
    Args:
        channel (Channel): Channel configuration
        video_config (VideoConfiguration, optional): Video encoding settings
        audio_config (AudioConfiguration, optional): Audio encoding settings
        
    Returns:
        List[str]: FFmpeg command as list of arguments
    r.   r   z-hide_bannerz	-loglevelr#   z-user_agentz<Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36z-headerszReferer: http://www.radio2m.ma/-iz-mapz0:v:0z0:a:0rF   rG   rH   aacrI   SAMPLE_RATErJ   CHANNELSrK   BITRATErV   r^   rW   mainrX   z3.1rY   
RESOLUTIONrZ   ASPECT_RATIOr[   MIN_BITRATEr\   MAX_BITRATEr]   r_   r`   ra   rb   rc   rd   re   z-fz	-hls_timez-hls_list_sizez-hls_delete_threshold1z
-hls_flagsZdelete_segmentsz-hls_start_number_sourcer   z	-strftimez-hls_segment_filenamezindex_%Y_%m_%d_%H_%M_%S.ts
index.m3u8)rD   hls_urlrM   rT   rO   r   AUDIO_CONFIGrn   VIDEO_CONFIGsegment_durationmax_segments)r,   rU   rE   r@   r>   cmdr)   r)   r*   _build_ffmpeg_command   s               	                   

r   <   )sessionr   r   c              
   C   sl  t   }t   | |k r:z| jr0ztd| j }| sptd| j d d| _d| _|   W W dS |d }| rt	|dT}|
 }d	| krtd
| j d d| _d| _|   W 5 Q R  W W dS W 5 Q R X W nH ttfk
r. } z$td| j d|  W Y W dS d}~X Y nX t| jj| jj }|d }	|	d }
|
 rt|	d}t|dkrdd |D }|rtdt| d d| _|   tt| j W dS td t d W q tk
r6 } z8td|  d| _dt| | _|   W Y dS d}~X Y qX qtd| d d| _d | d| _|   dS )!a  
    Monitor a capture session for initial success.
    
    Args:
        session (StreamSession): Session to monitor
        timeout (int): Timeout in seconds for initial success check
        
    Returns:
        bool: True if capture appears successful, False otherwise
    /proc/zFFmpeg process z has terminatedfailed&FFmpeg process terminated unexpectedlyFcmdlinerr   Process z is not FFmpegzInvalid process IDzError checking process r5   Nr.   rz   *.tsr   c                 S   s   g | ]}|  jd kr|qS )r   )statst_size).0sr)   r)   r*   
<listcomp>7  s      z$_monitor_capture.<locals>.<listcomp>z$Stream capture appears successful - z valid segments foundactiveTz!Found segments but they are empty   zError monitoring capture: zMonitoring error: z#Capture monitoring timed out after z secondszMonitoring timeout after )time
process_idr   existsr"   r<   status
last_errorsaveopenreadlowerr;   IOErrorr,   output_directoryr7   listgloblenr#   monitor_segmentsdelayrO   idwarningsleep	Exception)r   r   
start_time	proc_pathZcmdline_pathfr   rC   Z
output_dirr>   Zplaylist_filesegmentsZvalid_segmentsr)   r)   r*   _monitor_capture  sf     

r   T   )bindmax_retriesdefault_retry_delay)
channel_idvideo_config_idaudio_config_idc              
   C   s  zt d|  tjj|d}|r4tjj|dnd}|rJtjj|dnd}t  |jj	dd}|
 st d|j d t }|jdd	|jd
|j dt  dd t|||d}	|	rpt d|j  t }|jd|jt|	j|	j |
 r| nddd |
 r>t d|j d|  d nt d|j d dt|	j|j|	j dW S d|j d}
t |
 t }|jd|j|
t  dd t|
W n tjk
r   d| d}
t |
 t|
Y n tk
r } zt d|  | jj| jk rHt d | jjd!  d" | j|d#n>z.t }|jd|t|t  dd$d W n   Y nX  W 5 d}~X Y nX dS )%a2  
    Start stream capture for a specific channel.
    
    This task initiates the FFmpeg stream capture process and creates
    the necessary directory structure and configuration files.
    
    Args:
        channel_id (str): UUID of the channel to capture
        video_config_id (str, optional): UUID of video configuration to use
        audio_config_id (str, optional): UUID of audio configuration to use
        
    Returns:
        dict: Task result with session information
        
    Raises:
        Exception: If stream capture fails to start
    z*Starting stream capture task for channel: r   NT	is_activezChannel zC has no active jingle templates. Jingle detection will be disabled.system_alertZmissing_jingle_templatesz	Channel "zT" has no jingle templates. Please add jingle templates to enable ad break detection.)Z
alert_typechannel_namemessage	timestamptemplate_typecontext)r,   rU   rE   z0Stream capture started successfully for channel stream_startedr   )r   
session_idr   Zjingle_templates_countz%Jingle detection enabled for channel z with z
 templatesz&Jingle detection disabled for channel z - no templates found)successr   r   
started_atz"Stream capture failed for channel z after all retry attemptsstream_error)r   error_messager   Channel with ID 
 not foundzStream capture task failed: z&Retrying stream capture task (attempt    ))exc)r   r   r   Zretries_exhausted)r"   r#   r   objectsgetr   r   r+   jingle_templatesfilterr   r   rA   r   send_notificationr   now	isoformat_capture_with_retryrO   r   r   countr<   r   DoesNotExistrequestretriesr   retry)selfr   r   r   r,   rU   rE   r   notification_servicer   	error_msgrC   r)   r)   r*   start_stream_captureS  s    

 

	


	r   )r   c                 C   s  z^t jj| d}|j}td|j d}ttjj|dj	ddd}t
|d}d	}|D ]}|j}||krrq\d
}	d	}
d}|
|	k r|dkrzt|j}d	dl}d	dlm} d	dl}| }t| d d }||
d  |d	d d }tjj|||t||dd}W qW q~ tk
r } zdt|kr|
d7 }
|
|	k rb|d|
  W Y Pq~n&td| d|	 d|  W Y *qn td| d|  W Y qW 5 d}~X Y q~X q~|r\|d7 }td|  | jd7  _|jdgd q\|d	kr6td| d|   |jdkrRt j!| gd
d n|jdkrRt j!| gd
d | ||jd W S  t j"k
r   td!|  d" d#d$i Y S  tk
r } z*td%|  d|  d#t|i W Y S d}~X Y nX dS )&z
    Monitor and create database records for new HLS segments.
    
    Args:
        session_id (str): UUID of the stream session to monitor
    r   z/app/media/streams/z/hls)r   filenameT)flatr   r   r   N)r   i@B ii  i  )r   sequence_numberr   	file_pathdurationis_availablez.duplicate key value violates unique constraintr   g{Gz?z$Failed to create segment record for z after z
 retries: r5   zCreated HLS segment record: segments_processed)update_fieldszCreated z! new segment records for session )r   
processing)args	countdown)r   Znew_segmentstotal_segmentsSession z! not found for segment monitoringr<   Zsession_not_foundz&Error monitoring segments for session )#r   r   r   r,   r   r7   setr   r   values_listr   r   rA   floatr~   r   r   randomr   intr   randintcreaterO   r   r   r"   r<   r#   r   r   r   r   apply_asyncr   )r   r   r,   r>   Zexisting_segmentsZsegment_filesZnew_segments_createdZsegment_filer   r   retry_countZhls_segmentr   r   r   r   r   Ztimestamp_msr   rC   r)   r)   r*   r     s    





r   c              
   C   s  t d| j  tjj| ddgd }|rd}|jrtzd|j }tj	
|rzft| ddL}| }d	| krd
}t d|j d| j  |W  5 Q R  W W S W 5 Q R X W n ttfk
r   Y nX |st d|j d|j d d|_t |_d|_|  W nX tk
rp } z8t d|  d|_t |_d| |_|  W 5 d}~X Y nX n2t d|j d d|_t |_d|_|  tjj| ||dt d}	znt| ||}
t dd|
  tj|
tjtjd
dd
d}t |j!|	_d|	_|	  t d|j!  |	W S  tk
r } zDd|	_t ||	_t |	_|	  t d|  t"d| W 5 d}~X Y nX dS ) a  
    Start stream capture for a channel.
    
    Args:
        channel (Channel): Channel to capture
        video_config (VideoConfiguration, optional): Video encoding settings
        audio_config (AudioConfiguration, optional): Audio encoding settings
        
    Returns:
        StreamSession: Created session object
        
    Raises:
        RuntimeError: If capture fails to start
    z%Starting stream capture for channel: r   r   )r,   
status__inFr   z/cmdliner   r   TzFound existing active session z for channel zExisting session z has dead process (PID: z), marking as failedr   r   zError checking process status: zProcess check failed: Nz% has no process ID, marking as failedzNo process ID recordedpending)r,   rU   rE   r   r   zFFmpeg command:  r   )stdoutr%   r   bufsizeuniversal_newlinesz!Stream capture started with PID: z Failed to start stream capture: zStream capture failed: )#r"   r#   rA   r   r   r   firstr   osrB   r   r   r   r   r   r   r;   r   r   r   r   ended_atr   r   r   r<   r   r   joinr   PopenPIPErO   pidr$   )r,   rU   rE   Zexisting_sessionZprocess_runningr   r   r   rC   r   r   processr)   r)   r*   _start_captureD  s    
"





r   c              
   C   s  d}| j }| j}||k rzdtd|d  d|  t| ||}t|rN|W S |d7 }||k rztd| d t| W q t	k
r } zJt
d|d  d|  |d7 }||k rtd	| d t| W 5 d
}~X Y qX qt
d| d| j  d
S )az  
    Start stream capture with automatic retry logic.
    
    Args:
        channel (Channel): Channel to capture
        video_config (VideoConfiguration, optional): Video encoding settings
        audio_config (AudioConfiguration, optional): Audio encoding settings
        
    Returns:
        Optional[StreamSession]: Session if successful, None if all retries failed
    r   zStream capture attempt r   /zCapture failed, retrying in z seconds...zCapture attempt z	 failed: zRetrying in NzAll z capture attempts failed for )retry_attemptsretry_intervalr"   r#   r   r   r   r   r   r   r<   rA   )r,   rU   rE   r   r   r   r   rC   r)   r)   r*   r     s2    

r   c              
   C   s0  zt d|   tjj| d}t|}|rt d|   t }|jd|jj	t
|j| rjt
| nd|jdd dt
|j|jj	|jr|j nd	d
W S d|  }t | t|W nh tjk
r   d|  d}t | t|Y n6 tk
r* } zt d|   W 5 d	}~X Y nX d	S )z
    Stop an active stream capture session.
    
    Args:
        session_id (str): UUID of the stream session to stop
        
    Returns:
        dict: Task result with session information
    z%Stopping stream capture for session: r   z0Stream capture stopped successfully for session stream_stoppedUnknown)r   r   r   r   r   TN)r   r   r   r   z*Failed to stop stream capture for session zStream session with ID r   z!Stop stream capture task failed: )r"   r#   r   r   r   _stop_capturer   r   r,   rA   rO   r   r   r   r   r   r<   r   r   )r   r   r   r   r   rC   r)   r)   r*   stop_stream_capture  s<    


r  )r   r   c              
   C   s  t d| j  z<| jr.ztt| jtj t d| j  t	
d z<tt| jd tt| jtj t d| j  W n( tk
r   t d| j d Y nX W nb tk
r   t d| j d Y n< tk
r } zt d	| j d
|  W 5 d}~X Y nX t d| j  d| _t | _d| _|   W dS  tk
r } z*t d|  | d|  W Y dS d}~X Y nX dS )z
    Stop an active stream capture session.
    
    Args:
        session (StreamSession): Session to stop
        
    Returns:
        bool: True if successfully stopped, False otherwise
    z!Stopping stream capture session: zSent SIGTERM to process PID:    r   zForce killed process PID: zProcess PID z terminated gracefullyz not found (already terminated)zError terminating process r5   NzTerminated process PID: 	completedTz Failed to stop capture session: zStop failed: F)r"   r#   r   r   r   killr   signalSIGTERMr   r   SIGKILLProcessLookupErrorr   r   r   r   r   r   r   r<   	add_error)r   rC   r)   r)   r*   r  $  s6    

*
r  )
segment_idc              
   C   s   z<t d|   tjj| d}t|  dt|jddW S  tj	k
rp   d|  d}t 
| t|Y nf tk
r } zHt 
d|  z$tjj| d}|jd|  W n   Y nX  W 5 d	}~X Y nX d	S )
a]  
    Process a newly created HLS segment for jingle detection.
    
    This task is triggered when a new segment is detected and performs
    jingle detection and ad break analysis.
    
    Args:
        segment_id (str): UUID of the HLS segment to process
        
    Returns:
        dict: Processing results including any detections found
    zProcessing new segment: r   T)r   r  Zjingle_detection_triggeredzHLS segment with ID r   z Segment processing task failed: zSegment processing failed: N)r"   r#   r   r   r   extract_iframes_from_segmentr   rO   r   r   r<   r   r   r  )r  segmentr   rC   r)   r)   r*   process_new_segment[  s(    

r  c                  C   s`  z"t d tjjddgd} d}d}g }| D ]}z:|jrt |j }| dkr|j	dkrd|j
 d}|| t | |d	7 }W q0|jrhdd
l}z~|jdd|jgddd}|jdkrd|j d|j
 d}|| t | d|_t |_d|_|  |d	7 }W W q0W nF |jk
rf   d|j
 }|| t | |d	7 }Y W q0Y nX |d	7 }W q0 tk
r }	 z2d|j
 d|	 }|| t | |d	7 }W 5 d
}	~	X Y q0X q0|r|dkrt }
|
jd|||t  dd t d| d| d d|||dW S  tk
rZ }	 zt d|	   W 5 d
}	~	X Y nX d
S ) z
    Periodic task to check the health of active stream sessions.
    
    This task monitors active streams for issues and sends alerts
    if problems are detected.
    
    Returns:
        dict: Health check results
    zRunning stream health checkr   r   )r   r   i,  r   z  has no segments after 5 minutesr   NZpsz-pTr   )r   r   r   z for session r   r   zProcess terminated unexpectedlyz!Health check timeout for session zHealth check error for session r5   health_check)healthy_sessionsunhealthy_sessionsissuesr   r   zStream health check completed: z
 healthy, z
 unhealthy)r   r  r  r  zStream health check failed: )r"   debugr   r   r   r   r   r   total_secondsr   r   appendr   r   r   r    r!   r   r   r   r   r'   r   r<   r   r   r   r#   )active_sessionsZhealthy_countZunhealthy_countr  r   Ztime_since_startZissuer   r(   rC   r   r)   r)   r*   check_stream_health  s    















r  )r,   
keep_countr   c                 C   s  |dkr| j }zt|  }t|ddd d}d}t||kr|d|  }|D ]b}z$|  |d7 }td|  W qT t	k
r } zt
d	| d
|  W 5 d}~X Y qTX qT|dkrtd| d| j  |W S  tk
r } ztd|  W Y dS d}~X Y nX dS )z
    Clean up old segment files beyond the configured limit.
    
    Args:
        channel (Channel): Channel to clean up
        keep_count (int, optional): Number of segments to keep
        
    Returns:
        int: Number of segments deleted
    Nr   c                 S   s
   |   jS )N)r   st_mtime)xr)   r)   r*   <lambda>      z'_cleanup_old_segments.<locals>.<lambda>)keyr   r   zDeleted segment: zFailed to delete segment r5   Cleaned up z old segments for zError in cleanup task: )r   r   get_hls_pathsortedr   r   unlinkr"   r  r;   r   r#   rA   r   r<   )r,   r  r>   r   deleted_countZsegments_to_deleter  rC   r)   r)   r*   _cleanup_old_segments  s*    *r$     c                 C   sP  zt jj|d}|jj}tj|jsFt	
d|j  dddW S tj|jdkrvt	d|j  dd	dW S |jjd
d}| st	d|j d z.ttjd |j }|d }|jd
d
d W nb ttfk
r0 } z>t	
d|  ttjd |j }|d }|jd
d
d W 5 d}~X Y nX t|jj}|| d }	ddd|jddddddddt|	g}
ztj|
d
d
d d!}|jdkr*t	d"|j  || d# }ddd|jdd$dddd%t|g}tj|d
d
d d!}|jdkrt	
d&|j  dd'|j dW W S |}	t	d(|  W n4 tjk
r`   t	
d)|  dd*d Y W S X t|	 d+d,}t!t""|}g }|D ]}zttj|d-krddl#}|$|}|dk	r|j%d dkr|j%d. dkr|&| n
t'| n
t'| W nB t|j
fk
rB   zt'| W n tk
r<   Y nX Y nX qt	d/t(| d0|  |r| rt)*|| n|rt	d/t(| d1 d2t(|||jd3W S  t j+k
r   t	
d4|  dd5d Y S  t,k
rJ } zJt	
d6| d7|  | j-j.| j/k r(| j0d8|d9dt|d W Y S d}~X Y nX dS ):z
    Extract I-frames from an HLS segment for jingle detection.
    
    Args:
        segment_id (str): ID of the HLS segment to process
        
    Returns:
        dict: Task result with extracted frame paths
    r   zSegment file does not exist: r<   Zsegment_file_not_foundr   r<   r   zSegment file is empty: r   Zempty_segment_fileTr   z No jingle templates for channel z3, but extracting iframes for user to create jinglesr-   r0   r2   z#Failed to create output directory: streamsNz_%d.pngr   z-yro   z-vfzselect=gt(scene\,0.10)z-vsyncZvfrz
-frame_ptstruez-q:v2Z   r   z"Primary iframe extraction failed: z_iframe_%d.pngzselect=eq(pict_type\,I)3z Both extraction methods failed: zffmpeg_failed: z0Used alternative I-frame extraction for segment zFFmpeg timeout for segment Zffmpeg_timeoutz%d*i   r   z
Extracted z valid frames from segment z. iframes but no jingle templates for detectionr   )r   Zframes_extractedframe_pathsZsegment_processedzSegment not found: Zsegment_not_foundz&Error extracting iframes from segment r5   r   r   r   )1r   r   r   r   r,   r   rB   r   r   r"   r<   getsizer   r   r   r#   rA   r   r   r6   r7   r9   KeyErrorr;   
MEDIA_ROOTstemrO   r   r    r!   r%   r'   replacer!  r   cv2imreadshaper  remover   detect_jingles_in_framesr   r   r   r   r   r   r   )r   r  r  r,   r   r=   r?   rC   Z
video_nameZoutput_patternr   r(   Zalt_output_patternZalt_cmdZ
alt_resultZframe_patternZframe_filesZvalid_frames
frame_pathr4  imgr)   r)   r*   r    s           
     	
*r  r  )r  r-  c                 C   s  z6t jj|d}|jj}|jjdd}g }|D ]}|D ]}| sFq8t|j	|}	|j
p`tjd }
|	|
k r8tjj|j||d|	 |d|	d|
dd	}|| td
|j d|jdd t|j t }|jd|j|j|j|j dd q8q0|D ]*}zt| W q tk
r    Y qX qdt|t|dW S  tk
r } zDt d|  | j!j"| j#k rz| j$d|ddt%|d W Y S d}~X Y nX dS )z
    Detect jingles by comparing extracted frames with templates.
    
    Args:
        segment_id (str): ID of the HLS segment
        frame_paths (List[str]): List of extracted frame file paths
        
    Returns:
        dict: Detection results
    r   Tr   SIMILARITY_THRESHOLD      ?        Zopencv_comparison)Zsimilarity_scoreZdetection_methodZthreshold_used)r   r  templateconfidence_scorer9  frame_timestampmetadatazJingle detected: z (confidence: z.3fr   jingle_detected)r   Zjingle_name
confidencer   r   r   )r   Zdetections_countZframes_processedz#Error detecting jingles in frames: r   r.  r<   r&  N)&r   r   r   r   r,   r   r   image_exists_compare_images
image_pathsimilarity_thresholdr   JINGLE_CONFIGr   r   r  r"   r#   rA   r?  process_ad_breakr   r   r   r   detection_timer   r   r7  r;   r   r   r<   r   r   r   r   rO   )r   r  r-  r  r,   r   
detectionsr9  r>  Z
similarity	threshold	detectionr   rC   r)   r)   r*   r8    sd    

r8  )detection_idc              
   C   s  zht jj|d}|j}tjj|dd }|dkrftjj||jj	d||j
dd}td|j  n|j
|j  }tjd	 }tjd
 }||  kr|krn nr||_|j
|_t||_d|_|  td|j d|dd t }	|	jd|jj||j |j dd nJ||_|j
|_d|_d|_d|_d|_|  td|j d|dd dddW S  tk
r }
 zJtd| d|
  | jj | j!k r| j"d|
ddt#|
d W Y S d}
~
X Y nX dS )z
    Process a jingle detection to create or update ad breaks.
    
    Args:
        detection_id (str): ID of the jingle detection
        
    Returns:
        dict: Ad break processing result
    r   T)r   Zend_time__isnullNGlobalr   )r   r   regionstart_detectionr   r   zStarted new ad break: MIN_AD_BREAK_DURATIONMAX_AD_BREAK_DURATIONr  zCompleted ad break: z (duration: z.1fzs)Zad_break_detected)r   r   r   end_timer   zReset ad break: z (invalid duration: r   )r   Zad_break_processedz(Error processing ad break for detection r5   r   r.  r<   r&  )$r   r   r   r   r   r   r   r   r,   r7   rJ  r"   r#   r   r   r  r   rH  end_detectionrT  r   duration_secondsr   r   r   r   rA   r   rQ  r   r<   r   r   r   r   rO   )r   rN  rM  r   Zactive_adbreakZad_breakr   Zmin_durationZmax_durationr   rC   r)   r)   r*   rI    sd    



rI  )image_1_pathimage_2_pathr   c                 C   s  zt j| rt j|s8td|  d|  W dS t| }t|}|dks\|dkrxtd|  d|  W dS t|jd |jd t|jd |jd  }}|d	k s|d	k rtd
| d|  W dS t	|||f}t	|||f}t
|tj}t
|tj}	z"ddlm}
 |
||	}d| }W n tk
rJ   d}Y nX t|gdgddgddg}t|	gdgddgddg}t||tj}|jd |	jd kr|jd |	jd krt|	|tj}t|}nt||	tj}t|}d| }t|t|	t d }t|d d}|dk	rV|d |d  |d  |d  }n|d |d  |d  }tdtd|}|W S  tk
r } z&td|  d| d|  W Y dS d}~X Y nX dS )aA  
    Compare two images and return a similarity score using multiple methods.
    
    Args:
        image_1_path (str): Path to the first image (template)
        image_2_path (str): Path to the second image (frame)
        
    Returns:
        float: Similarity score (0.0 = identical, 1.0 = completely different)
    zImage files not found: z or r<  Nz&Failed to load images for comparison: z, r   r   2   z*Images too small for reliable comparison: r  )structural_similarity   r  g     o@g?g333333?g?g?r=  zError comparing images z and r5   )r   rB   r   r"   r   r4  r5  minr6  resizeZcvtColorZCOLOR_BGR2GRAYZskimage.metricsrZ  ImportErrorZcalcHistZcompareHistZHISTCMP_BHATTACHARYYAZmatchTemplateZTM_CCOEFF_NORMEDnpmaxZmeanZastyper   r   r<   )rW  rX  Zimage_1Zimage_2heightwidthZimage_1_resizedZimage_2_resizedZgray_1Zgray_2ZssimZ
ssim_scoreZ	ssim_diffZhist_1Zhist_2Z	hist_diffZtemplate_resultZtemplate_matchZtemplate_diffZmseZmse_normalizedZcombined_diffrC   r)   r)   r*   rE  G  sT    

.

,

"rE  c              
   C   s   z`t |  }|d }ddddddg}t|d}|d	| W 5 Q R X td
|  W dS  tk
r } ztd|  W Y dS d}~X Y nX dS )z
    Create a master HLS playlist file for a channel.
    
    Args:
        channel (Channel): Channel to create playlist for
        
    Returns:
        bool: True if playlist was created successfully
    zmaster.m3u8z#EXTM3Uz#EXT-X-VERSION:3z#EXT-X-INDEPENDENT-SEGMENTSzz#EXT-X-MEDIA:TYPE=CLOSED-CAPTIONS,GROUP-ID="CC",LANGUAGE="eng",NAME="english",DEFAULT=YES,AUTOSELECT=YES,INSTREAM-ID="CC1"zy#EXT-X-STREAM-INF:BANDWIDTH=3405600,RESOLUTION=1280x720,CODECS="avc1.4d401f,mp4a.40.2",FRAME-RATE=25,CLOSED-CAPTIONS="CC"rz   w
zCreated master playlist: Tz"Failed to create master playlist: FN)	r   r   r   writer   r"   r#   r   r<   )r,   r>   Zmaster_pathcontentr   rC   r)   r)   r*   _create_master_playlist  s"    
	rg  c            	      C   s@  zt d tjjdd} d}| D ]|}z4t|}||7 }|dkrZt d| d|j  W q$ tk
r } z$t 	d|j d|  W Y q$W 5 d	}~X Y q$X q$dd
l
m} t |dd }tjj|dd}| }|  t d| d| d d||dW S  tk
r: } zt 	d|   W 5 d	}~X Y nX d	S )z
    Periodic task to clean up old segment files and maintain disk space.
    
    This task removes old segment files beyond the configured retention
    limits for each channel.
    
    Returns:
        dict: Cleanup results
    zRunning segment cleanup taskTr   r   r  z segments for channel z'Failed to cleanup segments for channel r5   N)r	      )daysF)Zprocessed_at__ltr   zSegment cleanup completed: z files, z DB records)r   Zfiles_deletedZdb_records_deletedzSegment cleanup task failed: )r"   r  r   r   r   r$  r#   rA   r   r<   r   r	   r   r   r   r   delete)	rQ   Ztotal_deletedr,   r#  rC   r	   Zcutoff_timeZold_segmentsZdb_deleted_countr)   r)   r*   cleanup_old_segments  s:    
rk  )r   c              
   C   s   zLt d|   tjj| d}t|}|rBdt|j|jdW S t	dW nf tj
k
r   d|  d}t | t	|Y n4 t	k
r } zt d|   W 5 d	}~X Y nX d	S )
z
    Create HLS playlists for a channel.
    
    Args:
        channel_id (str): UUID of the channel
        
    Returns:
        dict: Task result
    z Creating playlists for channel: r   T)r   r   r   z Failed to create master playlistr   r   zPlaylist creation task failed: N)r"   r#   r   r   r   rg  rO   r   rA   r   r   r<   )r   r,   r   r   rC   r)   r)   r*   create_playlist_for_channel  s"    
rl  c               
   C   s   zt d tjjdd } tjjt  d }ddl	m
} |jjt  d }ddl	m} |jjt  d	 }t }|jd
d| |||t  dd d| |||dW S  tk
r } zt d|   W 5 d}~X Y nX dS )z
    Send periodic status updates about system operation.
    
    This task provides regular status reports about stream processing
    activity and system health.
    
    Returns:
        dict: Status report data
    z!Generating periodic status reportr   )r   )Zstarted_at__dater   )r   )Zdetection_time__date)r   )Zstart_time__dater   Zperiodic_report)Zstatus_typer  sessions_todaydetections_todayad_breaks_todayr   r   T)r   r  rm  rn  ro  zPeriodic status task failed: N)r"   r  r   r   r   r   r   r   dateapps.jingles.modelsr   r   r   r   r   r   r<   )r  Ztotal_sessions_todayr   rn  r   ro  r   rC   r)   r)   r*   send_periodic_status  sD    







rr  )NN)r   )NN)NN)NN)N)G__doc__r   r4  r   r   r  r   loggingr   numpyr_  pathlibr   typingr   r   r   r   r   r   r	   celeryr
   django.confr   django.utilsr   django.core.exceptionsr   Zapps.streams.modelsr   r   r   r   r   rq  r   r   r   r   Zapps.notifications.servicesr   	getLoggerr"   boolr+   rO   rD   rT   rn   r   r   r   r   r   r   r   r  r  r  r  r$  r  r8  rI  r   rE  rg  rk  rl  rr  r)   r)   r)   r*   <module>   s   
%%  ^P p  t  6771
c( SQV#
8%