wuhp commited on
Commit
81d389f
·
verified ·
1 Parent(s): 3fd9086

Update extensions/youtube.py

Browse files
Files changed (1) hide show
  1. extensions/youtube.py +95 -33
extensions/youtube.py CHANGED
@@ -1,7 +1,7 @@
1
  """
2
- YouTube Video Summarization Extension
3
  Uses Gemini 2.5 Flash with native YouTube URL support
4
- Enhanced with caching, metadata, and smart query handling
5
  """
6
 
7
  from base_extension import BaseExtension
@@ -30,6 +30,10 @@ class YouTubeExtension(BaseExtension):
30
  def icon(self) -> str:
31
  return "📺"
32
 
 
 
 
 
33
  def get_system_context(self) -> str:
34
  return """
35
  You have access to YouTube video analysis using Gemini's native video understanding.
@@ -61,9 +65,33 @@ Note: This uses Gemini 2.5 Flash with thinking for high-quality analysis.
61
  def _get_default_state(self) -> Dict[str, Any]:
62
  return {
63
  "analyzed_videos": [],
64
- "cache": {}, # Cache for analysis results
65
  "cache_hits": 0,
66
- "cache_misses": 0
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
67
  }
68
 
69
  def get_tools(self) -> List[types.Tool]:
@@ -111,10 +139,17 @@ Note: This uses Gemini 2.5 Flash with thinking for high-quality analysis.
111
  }
112
  )
113
 
 
 
 
 
 
 
114
  return [types.Tool(function_declarations=[
115
  analyze_video,
116
  list_analyzed_videos,
117
- get_video_chapters
 
118
  ])]
119
 
120
  def _extract_video_id(self, url: str) -> Optional[str]:
@@ -133,7 +168,6 @@ Note: This uses Gemini 2.5 Flash with thinking for high-quality analysis.
133
 
134
  def _generate_cache_key(self, video_id: str, query: str) -> str:
135
  """Generate a cache key based on video ID and query intent"""
136
- # Normalize query to catch similar intents
137
  query_lower = query.lower()
138
 
139
  # Categorize queries into types for better cache hits
@@ -146,7 +180,6 @@ Note: This uses Gemini 2.5 Flash with thinking for high-quality analysis.
146
  elif any(word in query_lower for word in ['key point', 'main point', 'important']):
147
  query_type = "keypoints"
148
  else:
149
- # For specific questions, use a hash of the query
150
  query_type = hashlib.md5(query.encode()).hexdigest()[:8]
151
 
152
  return f"{video_id}_{query_type}"
@@ -178,18 +211,14 @@ Note: This uses Gemini 2.5 Flash with thinking for high-quality analysis.
178
  "has_chapters": any(word in analysis.lower() for word in ['chapter', 'section', 'part']),
179
  }
180
 
181
- # Try to extract duration if mentioned
182
  duration_match = re.search(r'(\d+)\s*(?:minute|min|hour|hr)', analysis.lower())
183
  if duration_match:
184
  metadata["estimated_duration"] = duration_match.group(0)
185
 
186
  return metadata
187
 
188
- def handle_tool_call(self, user_id: str, tool_name: str, args: Dict[str, Any]) -> Any:
189
- # Ensure state is initialized (user_id is actually the API key in this app)
190
- if user_id not in self.state:
191
- self.initialize_state(user_id)
192
-
193
  state = self.get_state(user_id)
194
 
195
  if tool_name == "analyze_youtube_video":
@@ -197,7 +226,6 @@ Note: This uses Gemini 2.5 Flash with thinking for high-quality analysis.
197
  query = args.get("query", "Provide a detailed transcript of this video with timestamps")
198
  force_refresh = args.get("force_refresh", False)
199
 
200
- # Validate and extract video ID
201
  video_id = self._extract_video_id(video_url)
202
  if not video_id:
203
  return {
@@ -205,7 +233,7 @@ Note: This uses Gemini 2.5 Flash with thinking for high-quality analysis.
205
  "error": "Invalid YouTube URL. Please provide a valid youtube.com or youtu.be URL."
206
  }
207
 
208
- # Check cache first (unless force_refresh)
209
  cache_key = self._generate_cache_key(video_id, query)
210
 
211
  if not force_refresh and cache_key in state.get("cache", {}):
@@ -221,22 +249,19 @@ Note: This uses Gemini 2.5 Flash with thinking for high-quality analysis.
221
  # Cache miss - perform analysis
222
  print(f"🔄 Cache MISS for {video_id} ({cache_key})")
223
  state["cache_misses"] = state.get("cache_misses", 0) + 1
 
224
 
225
  try:
226
  print(f"🎬 Analyzing YouTube video with Gemini: {video_url}")
227
 
228
- # Enhance query for better results
229
  enhanced_query = self._enhance_query(query)
230
  print(f"📝 Enhanced query: {enhanced_query[:100]}...")
231
 
232
- # Import genai client
233
  from google import genai
234
  from google.genai import types as genai_types
235
 
236
- # Create Gemini client (user_id is the API key in this app's architecture)
237
  client = genai.Client(api_key=user_id)
238
 
239
- # Create the content with YouTube URL
240
  contents = [
241
  genai_types.Part(
242
  file_data=genai_types.FileData(
@@ -246,7 +271,6 @@ Note: This uses Gemini 2.5 Flash with thinking for high-quality analysis.
246
  genai_types.Part(text=enhanced_query)
247
  ]
248
 
249
- # Configure with thinking and grounding
250
  config = genai_types.GenerateContentConfig(
251
  system_instruction="You are a YouTube video analyst. Extract transcripts with timestamps when requested. Provide detailed, accurate analysis of video content. Format timestamps as [MM:SS].",
252
  temperature=0.7,
@@ -259,14 +283,12 @@ Note: This uses Gemini 2.5 Flash with thinking for high-quality analysis.
259
 
260
  print(f"🤖 Calling Gemini 2.5 Flash with YouTube URL...")
261
 
262
- # Generate response
263
  response = client.models.generate_content(
264
  model="gemini-2.5-flash",
265
  contents=contents,
266
  config=config
267
  )
268
 
269
- # Extract response text
270
  result_text = ""
271
  if response.candidates and response.candidates[0].content:
272
  for part in response.candidates[0].content.parts:
@@ -275,7 +297,6 @@ Note: This uses Gemini 2.5 Flash with thinking for high-quality analysis.
275
 
276
  print(f"✅ Gemini analysis complete: {len(result_text)} characters")
277
 
278
- # Extract metadata
279
  metadata = self._extract_metadata_from_analysis(result_text, video_id)
280
 
281
  # Track analyzed video
@@ -287,12 +308,10 @@ Note: This uses Gemini 2.5 Flash with thinking for high-quality analysis.
287
  "analysis_length": len(result_text)
288
  }
289
 
290
- # Update or append video entry
291
  existing_videos = [v for v in state.get("analyzed_videos", []) if v["video_id"] != video_id]
292
  existing_videos.append(video_entry)
293
- state["analyzed_videos"] = existing_videos[-20:] # Keep last 20
294
 
295
- # Build result
296
  result = {
297
  "success": True,
298
  "video_url": video_url,
@@ -312,19 +331,31 @@ Note: This uses Gemini 2.5 Flash with thinking for high-quality analysis.
312
 
313
  # Limit cache size (keep last 50 entries)
314
  if len(state["cache"]) > 50:
315
- # Remove oldest entries
316
  cache_keys = list(state["cache"].keys())
317
  for old_key in cache_keys[:len(cache_keys)-50]:
318
  del state["cache"][old_key]
319
 
320
  self.update_state(user_id, state)
321
 
 
 
 
 
 
 
 
322
  return result
323
 
324
  except Exception as e:
325
  import traceback
326
  error_details = traceback.format_exc()
327
  print(f"❌ YouTube analysis error: {error_details}")
 
 
 
 
 
 
328
  return {
329
  "success": False,
330
  "error": f"Error analyzing video: {str(e)}",
@@ -340,7 +371,7 @@ Note: This uses Gemini 2.5 Flash with thinking for high-quality analysis.
340
 
341
  return {
342
  "count": len(videos),
343
- "videos": videos[-10:], # Last 10 videos
344
  "cache_stats": {
345
  "hits": cache_hits,
346
  "misses": cache_misses,
@@ -352,24 +383,55 @@ Note: This uses Gemini 2.5 Flash with thinking for high-quality analysis.
352
  elif tool_name == "get_video_chapters":
353
  video_url = args["video_url"]
354
 
355
- # Use the analyze function with chapter-specific query
356
  chapter_query = "Break down this video into main chapters/sections with timestamps. For each chapter provide: timestamp, title, and brief description of what's covered."
357
 
358
- return self.handle_tool_call(user_id, "analyze_youtube_video", {
359
  "video_url": video_url,
360
  "query": chapter_query
361
  })
362
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
363
  return {"error": f"Unknown tool: {tool_name}"}
364
 
365
  def on_enable(self, user_id: str) -> str:
366
  self.initialize_state(user_id)
367
- return "📺 YouTube Summarizer enabled! Share a YouTube URL and I'll analyze it using Gemini's native video understanding. Analysis results are cached for faster follow-up questions. No additional API key needed!"
368
 
369
  def on_disable(self, user_id: str) -> str:
370
- # Optionally clear cache on disable to save memory
371
  state = self.get_state(user_id)
372
  cache_size = len(state.get("cache", {}))
373
  state["cache"] = {}
374
  self.update_state(user_id, state)
375
- return f"📺 YouTube Summarizer disabled. Cleared {cache_size} cached analyses."
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
  """
2
+ Enhanced YouTube Video Summarization Extension
3
  Uses Gemini 2.5 Flash with native YouTube URL support
4
+ Now with better caching, state management, and orchestrator integration
5
  """
6
 
7
  from base_extension import BaseExtension
 
30
  def icon(self) -> str:
31
  return "📺"
32
 
33
+ @property
34
+ def version(self) -> str:
35
+ return "2.0.0"
36
+
37
  def get_system_context(self) -> str:
38
  return """
39
  You have access to YouTube video analysis using Gemini's native video understanding.
 
65
  def _get_default_state(self) -> Dict[str, Any]:
66
  return {
67
  "analyzed_videos": [],
68
+ "cache": {},
69
  "cache_hits": 0,
70
+ "cache_misses": 0,
71
+ "total_analyses": 0,
72
+ "created_at": datetime.datetime.now().isoformat(),
73
+ "last_updated": datetime.datetime.now().isoformat()
74
+ }
75
+
76
+ def get_state_summary(self, user_id: str) -> Optional[str]:
77
+ """Provide state summary for system prompt"""
78
+ state = self.get_state(user_id)
79
+ video_count = len(state.get("analyzed_videos", []))
80
+ if video_count > 0:
81
+ return f"{video_count} videos analyzed"
82
+ return None
83
+
84
+ def get_metrics(self, user_id: str) -> Dict[str, Any]:
85
+ """Provide usage metrics"""
86
+ state = self.get_state(user_id)
87
+ total_requests = state.get("cache_hits", 0) + state.get("cache_misses", 0)
88
+ hit_rate = (state.get("cache_hits", 0) / total_requests * 100) if total_requests > 0 else 0
89
+
90
+ return {
91
+ "total_analyses": state.get("total_analyses", 0),
92
+ "videos_analyzed": len(state.get("analyzed_videos", [])),
93
+ "cache_hit_rate": f"{hit_rate:.1f}%",
94
+ "cached_items": len(state.get("cache", {}))
95
  }
96
 
97
  def get_tools(self) -> List[types.Tool]:
 
139
  }
140
  )
141
 
142
+ clear_cache = types.FunctionDeclaration(
143
+ name="clear_youtube_cache",
144
+ description="Clear the YouTube analysis cache to free up memory",
145
+ parameters={"type": "object", "properties": {}}
146
+ )
147
+
148
  return [types.Tool(function_declarations=[
149
  analyze_video,
150
  list_analyzed_videos,
151
+ get_video_chapters,
152
+ clear_cache
153
  ])]
154
 
155
  def _extract_video_id(self, url: str) -> Optional[str]:
 
168
 
169
  def _generate_cache_key(self, video_id: str, query: str) -> str:
170
  """Generate a cache key based on video ID and query intent"""
 
171
  query_lower = query.lower()
172
 
173
  # Categorize queries into types for better cache hits
 
180
  elif any(word in query_lower for word in ['key point', 'main point', 'important']):
181
  query_type = "keypoints"
182
  else:
 
183
  query_type = hashlib.md5(query.encode()).hexdigest()[:8]
184
 
185
  return f"{video_id}_{query_type}"
 
211
  "has_chapters": any(word in analysis.lower() for word in ['chapter', 'section', 'part']),
212
  }
213
 
 
214
  duration_match = re.search(r'(\d+)\s*(?:minute|min|hour|hr)', analysis.lower())
215
  if duration_match:
216
  metadata["estimated_duration"] = duration_match.group(0)
217
 
218
  return metadata
219
 
220
+ def _execute_tool(self, user_id: str, tool_name: str, args: Dict[str, Any]) -> Any:
221
+ """Execute tool logic"""
 
 
 
222
  state = self.get_state(user_id)
223
 
224
  if tool_name == "analyze_youtube_video":
 
226
  query = args.get("query", "Provide a detailed transcript of this video with timestamps")
227
  force_refresh = args.get("force_refresh", False)
228
 
 
229
  video_id = self._extract_video_id(video_url)
230
  if not video_id:
231
  return {
 
233
  "error": "Invalid YouTube URL. Please provide a valid youtube.com or youtu.be URL."
234
  }
235
 
236
+ # Check cache
237
  cache_key = self._generate_cache_key(video_id, query)
238
 
239
  if not force_refresh and cache_key in state.get("cache", {}):
 
249
  # Cache miss - perform analysis
250
  print(f"🔄 Cache MISS for {video_id} ({cache_key})")
251
  state["cache_misses"] = state.get("cache_misses", 0) + 1
252
+ state["total_analyses"] = state.get("total_analyses", 0) + 1
253
 
254
  try:
255
  print(f"🎬 Analyzing YouTube video with Gemini: {video_url}")
256
 
 
257
  enhanced_query = self._enhance_query(query)
258
  print(f"📝 Enhanced query: {enhanced_query[:100]}...")
259
 
 
260
  from google import genai
261
  from google.genai import types as genai_types
262
 
 
263
  client = genai.Client(api_key=user_id)
264
 
 
265
  contents = [
266
  genai_types.Part(
267
  file_data=genai_types.FileData(
 
271
  genai_types.Part(text=enhanced_query)
272
  ]
273
 
 
274
  config = genai_types.GenerateContentConfig(
275
  system_instruction="You are a YouTube video analyst. Extract transcripts with timestamps when requested. Provide detailed, accurate analysis of video content. Format timestamps as [MM:SS].",
276
  temperature=0.7,
 
283
 
284
  print(f"🤖 Calling Gemini 2.5 Flash with YouTube URL...")
285
 
 
286
  response = client.models.generate_content(
287
  model="gemini-2.5-flash",
288
  contents=contents,
289
  config=config
290
  )
291
 
 
292
  result_text = ""
293
  if response.candidates and response.candidates[0].content:
294
  for part in response.candidates[0].content.parts:
 
297
 
298
  print(f"✅ Gemini analysis complete: {len(result_text)} characters")
299
 
 
300
  metadata = self._extract_metadata_from_analysis(result_text, video_id)
301
 
302
  # Track analyzed video
 
308
  "analysis_length": len(result_text)
309
  }
310
 
 
311
  existing_videos = [v for v in state.get("analyzed_videos", []) if v["video_id"] != video_id]
312
  existing_videos.append(video_entry)
313
+ state["analyzed_videos"] = existing_videos[-20:]
314
 
 
315
  result = {
316
  "success": True,
317
  "video_url": video_url,
 
331
 
332
  # Limit cache size (keep last 50 entries)
333
  if len(state["cache"]) > 50:
 
334
  cache_keys = list(state["cache"].keys())
335
  for old_key in cache_keys[:len(cache_keys)-50]:
336
  del state["cache"][old_key]
337
 
338
  self.update_state(user_id, state)
339
 
340
+ # Log activity
341
+ self.log_activity(user_id, "video_analyzed", {
342
+ "video_id": video_id,
343
+ "query_type": cache_key.split('_', 1)[1],
344
+ "analysis_length": len(result_text)
345
+ })
346
+
347
  return result
348
 
349
  except Exception as e:
350
  import traceback
351
  error_details = traceback.format_exc()
352
  print(f"❌ YouTube analysis error: {error_details}")
353
+
354
+ self.log_activity(user_id, "analysis_failed", {
355
+ "video_id": video_id,
356
+ "error": str(e)
357
+ })
358
+
359
  return {
360
  "success": False,
361
  "error": f"Error analyzing video: {str(e)}",
 
371
 
372
  return {
373
  "count": len(videos),
374
+ "videos": videos[-10:],
375
  "cache_stats": {
376
  "hits": cache_hits,
377
  "misses": cache_misses,
 
383
  elif tool_name == "get_video_chapters":
384
  video_url = args["video_url"]
385
 
 
386
  chapter_query = "Break down this video into main chapters/sections with timestamps. For each chapter provide: timestamp, title, and brief description of what's covered."
387
 
388
+ return self._execute_tool(user_id, "analyze_youtube_video", {
389
  "video_url": video_url,
390
  "query": chapter_query
391
  })
392
 
393
+ elif tool_name == "clear_youtube_cache":
394
+ cache_size = len(state.get("cache", {}))
395
+ state["cache"] = {}
396
+ state["cache_hits"] = 0
397
+ state["cache_misses"] = 0
398
+ self.update_state(user_id, state)
399
+
400
+ self.log_activity(user_id, "cache_cleared", {"items_cleared": cache_size})
401
+
402
+ return {
403
+ "success": True,
404
+ "message": f"Cleared {cache_size} cached analyses",
405
+ "items_cleared": cache_size
406
+ }
407
+
408
  return {"error": f"Unknown tool: {tool_name}"}
409
 
410
  def on_enable(self, user_id: str) -> str:
411
  self.initialize_state(user_id)
412
+ return "📺 YouTube Summarizer enabled! Share a YouTube URL and I'll analyze it using Gemini's native video understanding. Analysis results are cached for faster follow-up questions!"
413
 
414
  def on_disable(self, user_id: str) -> str:
 
415
  state = self.get_state(user_id)
416
  cache_size = len(state.get("cache", {}))
417
  state["cache"] = {}
418
  self.update_state(user_id, state)
419
+ return f"📺 YouTube Summarizer disabled. Cleared {cache_size} cached analyses."
420
+
421
+ def health_check(self, user_id: str) -> Dict[str, Any]:
422
+ """Check extension health"""
423
+ state = self.get_state(user_id)
424
+ issues = []
425
+
426
+ # Check cache size
427
+ cache_size = len(state.get("cache", {}))
428
+ if cache_size > 100:
429
+ issues.append(f"Large cache size ({cache_size} items) - consider clearing")
430
+
431
+ return {
432
+ "healthy": len(issues) == 0,
433
+ "extension": self.name,
434
+ "version": self.version,
435
+ "cache_size": cache_size,
436
+ "issues": issues if issues else None
437
+ }