把听播客的数小时转换成即时的深刻见解
作为一个对健康或商业主题播客情有独钟的爱好者,我常常感到有太多的节目要听,而时间却远远不够。像 Huberman Lab 这样的播客,一期节目通常能长达三四个小时。问题不仅是找出时间来听,更多时候,我对谈话中的某些特定部分感兴趣。正因如此,我利用 OpenAI 模型、LangChain 和 Streamlit 开发了一个简洁的应用程序。它能让我粘贴一个 YouTube 播客链接(比如 Huberman Lab 或 The Diary of a CEO),迅速为我提供节目摘要。之后,我还可以针对性地提问,得到立刻的答案。这样一来,我就能直接获取我所需的信息,无需观看整期节目。
认识 RAG
在深入研究代码之前,让我们先简单了解一下 RAG。它是一种结合了像 GPT-4 这类模型的文本生成能力和信息检索功能的方法,能够提供精确、与上下文相关的信息。大语言模型有时候可能无法接触到我们需要的特定数据。RAG 允许我们输入相关数据(比如播客的文字记录),这样模型就能够给出切题的答案。
其过程很简单:我们载入数据(比如播客的文字记录),将其分割成小片段,进行嵌入处理,然后存储在向量数据库中。
当我们提出查询时,模型会将我们的问题与这些数据片段进行匹配。挑选出最相关的文本,将其连同问题一起输入大语言模型,从而生成准确的答案。简而言之,RAG 通过检索相关信息并利用这些信息来回答我们的问题。
总的来说,这就是 RAG 的工作原理。它找到与你的查询相关的信息片段,然后结合这些信息生成答案。
应用程序的工作原理
这款应用程序非常容易上手:只需将 YouTube 播客的链接粘贴到 Streamlit 界面,输入你的 OpenAI 密钥,转瞬间,你就能得到一个播客摘要。接下来,你可以提出具体的问题,比如“推荐了哪些书?”或者“提到了哪些最佳睡眠建议?”。
代码主要分为三个部分:从 YouTube 获取数据、生成摘要与回答以及 Streamlit 前端显示。还有一个可选的第四部分,用于更高级的 RAG 应用,不过基础版的功能已经非常出色了。让我们来深入了解代码吧!
第一部分:与 YouTube 的互动
首先,我们的应用会连接到 YouTube,提取视频的标题和字幕 —— 这是理解播客内容的关键所在。
功能 1:获取视频标题
这个功能利用视频的 URL 来获取 YouTube 视频的标题。它发送网络请求,解析 HTML 以找到标题标签,并返回标题。这是我们了解每个视频内容的起点。
python复制代码import requests
from bs4 import BeautifulSoup
def get_youtube_video_title(video_url):
response = requests.get(video_url)
soup = BeautifulSoup(response.content, 'html.parser')
title = soup.find('meta', property='og:title')
return title['content'] if title else "Title not found"
功能 2:提取字幕
紧接着,我们会获取视频的字幕。我们使用获取标题的功能,并借助 LangChain 中的 YoutubeLoader 来加载字幕。这些文字记录将被输入我们的 RAG 系统,以生成相关的回答。
ini复制代码from langchain.document_loaders import YoutubeLoader
from langchain.schema import Document
def fetch_youtube_captions(video_url):
title = get_youtube_video_title(video_url)
loader = YoutubeLoader.from_youtube_url(video_url)
docs = loader.load()
if docs and len(docs) > 0:
intro_sentence = "This is the title of the video/transcription/conversation: "
title_content = intro_sentence + title
docs[0] = Document(page_content=title_content + "nn" + docs[0].page_content)
return docs
第二部分:数据处理与 AI 整合
核心应用逻辑:将对话转换为数据
这里是“变魔术”的地方。我们将播客内容分解成易于消化的小块,转换成适合 AI 处理的格式,并进行存储,以便我们随时检索和生成摘要。
环境搭建和全局变量设置
我们首先搭建编程环境并定义一些全局变量。其中包括初始化一个数据库和对话内容的存储系统。
python复制代码from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.embeddings.openai import OpenAIEmbeddings
from langchain.vectorstores import Chroma
from sklearn.cluster import KMeans
from langchain.chat_models import ChatOpenAI
from langchain.chains.summarize import load_summarize_chain
from langchain.schema import Document
import numpy as np
from langchain.chains import ConversationalRetrievalChain
import logging
from langchain.prompts import PromptTemplate
from langchain.memory import ConversationBufferMemory
logging.basicConfig(level=logging.INFO)
# Initialize global variables
global_chromadb = None
global_documents = None
global_short_documents = None
数据管理函数
这些函数对于管理应用数据至关重要。reset_globals 用于重置全局变量,init_chromadb 则用于用处理过的数据初始化数据库。
ini复制代码# Initialize the memory outside the function so it persists across different calls
conversation_memory = ConversationBufferMemory(
memory_key="chat_history",
max_len=50,
input_key="question",
output_key="answer",
return_messages=True,
)
# Function to reset global variables
def reset_globals():
global global_chromadb, global_documents, global_short_documents
global_chromadb = None
global_documents = None
global_short_documents = None
# Reset the conversation memory
if conversation_memory:
conversation_memory.clear()
def init_chromadb(openai_api_key):
global global_chromadb, global_short_documents
if global_chromadb is None and global_short_documents is not None:
global_chromadb = Chroma.from_documents(documents=global_short_documents, embedding=OpenAIEmbeddings(openai_api_key=openai_api_key))
处理字幕并生成摘要
process_and_cluster_captions: 这里,我们对 YouTube 字幕进行加工,为分析和生成回答做好准备。具体步骤包括:
-
初步数据检查: 我们先检查字幕的格式,确保它们适合后续处理。
-
切分字幕: 字幕被分割成更小的段落,分别用于制作摘要和回答问题。这样的切分对高效完成具体任务非常关键。
-
聚类归纳相关内容: 我们运用 KMeans 算法对制作摘要的段落进行聚类。这样做可以筛选出重复内容,仅保留播客中最有代表性的部分。从每个类别中选取一个段落,我们确保 AI 能接收到既多样化又精炼的信息,非常适合创建有意义的摘要。
-
全局存储: 分割好的字幕被全局存储,方便之后制作摘要和回答问题时随时访问。
def process_and_cluster_captions(captions, openai_api_key, num_clusters=12): global global_documents, global_short_documents logging.info(“正在处理和聚类字幕”)
ini复制代码# 记录字幕开头的500个字符,检查其格式 logging.info(f"接收到的字幕(前500个字符): {captions[0].page_content[:500]}") caption_content = captions[0].page_content # 确认字幕是字符串格式,以便处理 if not isinstance(caption_content, str): logging.error("字幕格式与预期不符") return [] # 为摘要创建较长的文本块 summary_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=0, separators=["nn", "n", " ", ""]) summary_docs = summary_splitter.create_documents([caption_content]) # 为问答创建较短的文本块 qa_splitter = RecursiveCharacterTextSplitter(chunk_size=200, chunk_overlap=0, separators=["nn", "n", " ", ""]) qa_docs = qa_splitter.create_documents([caption_content]) # 处理用于摘要的文本 summary_embeddings = OpenAIEmbeddings(openai_api_key=openai_api_key).embed_documents([x.page_content for x in summary_docs]) kmeans = KMeans(n_clusters=num_clusters, random_state=42).fit(summary_embeddings) closest_indices = [np.argmin(np.linalg.norm(summary_embeddings - center, axis=1)) for center in kmeans.cluster_centers_] representative_docs = [summary_docs[i] for i in closest_indices] # 全局存储文档 global_documents = summary_docs # 存储用于制作摘要的文档 global_short_documents = qa_docs # 存储用于问答的文档 init_chromadb(openai_api_key) # 利用较长的文本块初始化数据库 return representative_docs
generate_summary: 该函数使用处理过的字幕,借助 OpenAI 的 AI 模型生成视频的清晰且简洁的摘要。具体如下:
-
整合关键段落: 我们将聚类过程中挑选出的段落结合起来,形成连贯的叙述。这确保了摘要能够覆盖播客的不同层面。
-
AI 导向的摘要制作: 利用定制的提示语,我们指导 AI 制作一个简明且包含丰富信息的摘要。这一步骤对于引导 AI 聚焦于总体主题和重点内容、避免无关细节非常关键。
-
执行摘要链: AI 处理这些结合在一起的文本,生成摘要,让用户迅速把握播客的核心内容。
这种方法确保所生成的摘要既全面又精准,捕捉了播客的精华所在。
ini复制代码def generate_summary(representative_docs, openai_api_key, model_name):
logging.info("正在生成摘要")
llm4 = ChatOpenAI(model_name=model_name, temperature=0.2, openai_api_key=openai_api_key)
# 汇总文本以准备生成摘要
summary_text = "n".join([doc.page_content for doc in representative_docs])
summary_prompt_template = PromptTemplate(
template=(
"根据下面提供的文本,创建一个播客对话的简洁摘要。文本包含从对话不同部分挑选出的、具有代表性的节选。"
"你的任务是将这些节选综合成一个连贯且简洁的摘要。专注于播客中讨论的总体主题和主要观点。"
"摘要应该清晰完整地传达对话的关键话题和洞见,同时省略任何不必要的细节。它应该是引人入胜且易于阅读的,理想情况下为一到两段。尽可能保持简短"
"nn选定的播客节选:n{text}nn摘要:"
),
input_variables=["text"]
)
# 加载摘要链
summarize_chain = load_summarize_chain(llm=llm4, chain_type="stuff", prompt=summary_prompt_template)
# 执行摘要链
summary = summarize_chain.run([Document(page_content=summary_text)])
logging.info("摘要生成完成")
return summary
解答用户问题
answer_question: 这是我们 RAG(检索增强生成)系统的核心功能。此功能利用已处理的数据回答用户的特定问题,充分展现了结合检索的生成技术的能力。
-
数据库初始化: 它首先确认存储已处理段落的数据库是否准备就绪。如果尚未初始化,系统将配置数据库,确保能够高效地进行信息检索。
-
信息检索与回答生成: 接着,系统将用户的提问与从数据库中检索到的相关信息结合,进行智能匹配。这种精准检索确保了 AI 提供的答案既准确又切题。
-
生成具有情境感知的答案: AI 在掌握了问题和最匹配的上下文后,会生成一个简明直接的答案来回应用户的问题。
通过这一过程,用户能得到针对性的、信息丰富的答复,这大大提升了他们与播客内容的互动体验。
ini复制代码def answer_question(question, openai_api_key, model_name):
llm4 = ChatOpenAI(model_name=model_name, temperature=0, openai_api_key=openai_api_key)
global global_chromadb, global_short_documents
if global_chromadb is None and global_short_documents is not None:
init_chromadb(openai_api_key, documents=global_short_documents)
logging.info(f"Answering question: {question}")
chatTemplate = """
You are an AI assistant tasked with answering questions based on context from a podcast conversation. Use the provided context and relevant chat messages to answer. If unsure, say so. Keep your answer to three sentences or less, focusing on the most relevant information.
Chat Messages (if relevant): {chat_history}
Question: {question}
Context from Podcast: {context}
Answer:"""
QA_CHAIN_PROMPT = PromptTemplate(input_variables=["context", "question", "chat_history"],template=chatTemplate)
qa_chain = ConversationalRetrievalChain.from_llm(
llm=llm4,
chain_type="stuff",
retriever=global_chromadb.as_retriever(search_type="mmr", search_kwargs={"k":12}),
memory=conversation_memory,
#return_source_documents=True,
combine_docs_chain_kwargs={'prompt': QA_CHAIN_PROMPT},
)
# Log the current chat history
current_chat_history = conversation_memory.load_memory_variables({})
logging.info(f"Current Chat History: {current_chat_history}")
response = qa_chain({"question": question})
logging.info(f"this is the result: {response}")
output = response['answer']
return output
第 3 部分:Streamlit 用户界面
激活应用的灵魂
最终,我们采用 Streamlit 来打造一个互动性强的网络应用,它整合了所有的功能。用户可以输入 YouTube 视频链接,提出问题,并获得 AI 生成的摘要和答案。以下代码是我设计的界面版本,你可以直接采用或根据个人喜好进行调整。
python复制代码import streamlit as st
from youtuber import fetch_youtube_captions
from agent import process_and_cluster_captions, generate_summary, answer_question, reset_globals
# Set Streamlit page configuration with custom tab title
st.set_page_config(page_title="🏄GPTpod", page_icon="🏄", layout="wide")
def user_query(question, openai_api_key, model_name):
"""Process and display the query response."""
# Add the user's question to the conversation
st.session_state.conversation.append((f"{question}", "user-message"))
# Check if this query has been processed before
if question not in st.session_state.processed_questions:
# Process the query
answer = answer_question(question, openai_api_key, model_name)
if isinstance(answer, str):
st.session_state.conversation.append((f"{answer}", "grimoire-message"))
else:
st.session_state.conversation.append(("Could not find a proper answer.", "grimoire-message"))
st.rerun()
# Mark this question as processed
st.session_state.processed_questions.add(question)
# Initialize session state
if 'conversation' not in st.session_state:
st.session_state.conversation = []
st.session_state.asked_questions = set()
st.session_state.processed_questions = set()
# Sidebar for input and operations
with st.sidebar:
st.title("GPT Podcast Surfer🌊🏄🏼")
st.image("img.png")
# Expandable Instructions
with st.expander("🔍 How to use:", expanded=False):
st.markdown("""
- 🔐 **Enter your OpenAI API Key.**
- 📺 **Paste a YouTube URL.**
- 🏃♂️ **Click 'Run it' to process.**
- 🕵️♂️ **Ask questions in the chat.**
""")
# Model selection in the sidebar
model_choice = st.sidebar.selectbox("Choose Model:",
("GPT-4 Turbo", "GPT-3.5 Turbo"),
index=0) # Default to GPT-4 Turbo
# Map friendly names to actual model names
model_name_mapping = {
"GPT-4 Turbo": "gpt-4-1106-preview",
"GPT-3.5 Turbo": "gpt-3.5-turbo"
}
selected_model = model_name_mapping[model_choice]
st.session_state['selected_model'] = model_name_mapping[model_choice]
# Input for OpenAI API Key
openai_api_key = st.text_input("Enter your OpenAI API Key:", type="password")
# Save the API key in session state if it's entered
if openai_api_key:
st.session_state['openai_api_key'] = openai_api_key
youtube_url = st.text_input("Enter YouTube URL:")
# Button to trigger processing
if st.button("🚀Run it"):
if openai_api_key:
if youtube_url and 'processed_data' not in st.session_state:
reset_globals()
with st.spinner('👩🍳 GPT is cooking up your podcast... hang tight for a few secs🍳'):
captions = fetch_youtube_captions(youtube_url)
if captions:
representative_docs = process_and_cluster_captions(captions, st.session_state['openai_api_key'])
summary = generate_summary(representative_docs, st.session_state['openai_api_key'], selected_model)
st.session_state.processed_data = (representative_docs, summary)
if 'summary_displayed' not in st.session_state:
st.session_state.conversation.append((f"Here's a rundown of the conversation: {summary}", "summary-message"))
guiding_message = "Feel free to ask me anything else about it! :)"
st.session_state.conversation.append((guiding_message, "grimoire-message"))
st.session_state['summary_displayed'] = True
else:
st.error("Failed to fetch captions.")
else:
st.warning("Please add the OpenAI API key first.")
# Main app logic
for message, css_class in st.session_state.conversation:
role = "assistant" if css_class in ["grimoire-message", "summary-message", "suggestion-message"] else "user"
with st.chat_message(role):
st.markdown(message)
# Chat input field
if prompt := st.chat_input("Ask me anything about the podcast..."):
user_query(prompt, st.session_state.get('openai_api_key', ''), st.session_state.get('selected_model', 'gpt-4-1106-preview'))
这是用户界面的样子:
第 4 部分:增强结果的可选 RAG-Fusion 技术
RAG-Fusion 技术简介
RAG-Fusion 是我为了提升系统而加入的一个试验性功能。尽管这不是必须的,但它能在增加少量代码和稍微牺牲速度的基础上,提供更优的结果。其核心思想在于提高 AI 理解问题和提高回答准确度的能力。
为什么选择 RAG-Fusion?
-
弥补不足: 它通过生成和重新评估用户问题的多个版本,弥补了标准 RAG 的某些局限,确保更广泛和精准的搜索。
-
更优的搜索结果: 它结合了互惠排名融合技术和自定义向量评分方法,得到更全面和精确的答案。
RAG-Fusion 力图不只是解读用户所问的问题,还要洞悉用户真正意图提问的内容,深入挖掘那些常被忽略的深层见解。
RAG-Fusion 的工作流程
-
转换查询: 我们首先用语言模型将原始用户查询转换成几个相似但各有不同的问题。这种多角度的方法对于全面搜索非常关键。
-
增强的向量搜索: 这些新生成的查询将进行向量搜索,聚集多元化的结果。
-
智能重排: 通过互惠排名融合,我们对所有这些结果进行重组,把最相关的结果放在首位。
-
定制最终输出: 结合顶尖结果和新生成的查询,引导语言模型生成一个基于更广泛上下文的回答。
RAG-Fusion 的核心功能
-
reciprocal_rank_fusion: 这个功能会根据相关性分数重新排序搜索结果,确保优先考虑最佳答案。
-
generate_multiple_queries: 它能生成初始查询的多个变种,从而扩大搜索的范围。
-
answer_question: 这是将所有部分融合在一起的功能所在。首先,它生成多个查询,对每一个查询检索相应的文档,并进行互惠排名融合。然后它利用这些精炼后的结果和自定义数据库,引导 AI 制定出更加丰富和精确的回答。
ini复制代码def reciprocal_rank_fusion(results: list[list], k=60):
fused_scores = {}
for docs in results:
# the docs are returned in sorted order of relevance
for rank, doc in enumerate(docs):
doc_str = dumps(doc)
logging.info(f"Serialized Document: {doc_str}")
if doc_str not in fused_scores:
fused_scores[doc_str] = 0
previous_score = fused_scores[doc_str]
fused_scores[doc_str] += 1 / (rank + k)
reranked_results = [
loads(doc)
for doc, score in sorted(fused_scores.items(), key=lambda x: x[1], reverse=True)
]
logging.info(f"Reciprocal Rank Fusion applied. Reranked Results: {reranked_results[:10]}") # Log top 10 results
return reranked_results
def generate_multiple_queries(question, llm):
prompt = PromptTemplate(
input_variables=["question"],
template="""You are an AI language model assistant. Your task is to OUTPUT 4
different versions of the given user question to retrieve relevant documents from a vector
database. By generating multiple perspectives on the user question, your goal is to help
the user overcome some of the limitations of the distance-based similarity search.
Provide these alternative questions separated by newlines.
Original question: {question}""",
)
# Create a chain with the language model and the prompt
llm_chain = LLMChain(llm=llm, prompt=prompt, output_parser=StrOutputParser())
# Run the chain
response = llm_chain.run({"question": question})
queries = response.split("n")
logging.info(f"Generated Queries: {queries}")
return queries
def answer_question(question, openai_api_key, model_name):
llm4 = ChatOpenAI(model_name=model_name, temperature=0.1, openai_api_key=openai_api_key)
global global_chromadb, global_short_documents
if global_chromadb is None and global_short_documents is not None:
init_chromadb(openai_api_key, documents=global_short_documents)
logging.info(f"Answering question: {question}")
# Generate multiple queries
queries = generate_multiple_queries(question, llm4)
# Retrieve documents for each query
results = []
for query in queries:
retrieved_docs_with_scores = global_chromadb.similarity_search_with_score(query, k=8)
# Log the number of documents retrieved for each query and the first 3 docs
logging.info(f"Retrieved {len(retrieved_docs_with_scores)} documents for query '{query}': {retrieved_docs_with_scores[:3]}")
results.append(retrieved_docs_with_scores)
# Apply reciprocal rank fusion
reranked_results = reciprocal_rank_fusion(results)
logging.info(f"Number of reranked documents: {len(reranked_results)}")
#extract the Document object only
reranked_documents = [doc for doc, _ in reranked_results]
# Create a new Chroma instance with reranked results
custom_chromadb = Chroma.from_documents(documents=reranked_documents, embedding=OpenAIEmbeddings(openai_api_key=openai_api_key))
chatTemplate = """
You are an AI assistant tasked with answering questions based on context from a podcast conversation.
Use the provided context and relevant chat messages to answer. If unsure, say so. Keep your answer to four sentences or less, focusing on the most relevant information.
Chat Messages (if relevant): {chat_history}
Question: {question}
Context from Podcast: {context}
Answer:"""
QA_CHAIN_PROMPT = PromptTemplate(input_variables=["context", "question", "chat_history"],template=chatTemplate)
qa_chain = ConversationalRetrievalChain.from_llm(
llm=llm4,
chain_type="stuff",
retriever=custom_chromadb.as_retriever(search_type="similarity", search_kwargs={"k":10}),
memory=conversation_memory,
return_source_documents=True,
combine_docs_chain_kwargs={'prompt': QA_CHAIN_PROMPT},
)
# Log the current chat history
current_chat_history = conversation_memory.load_memory_variables({})
logging.info(f"Current Chat History: {current_chat_history}")
response = qa_chain({"question": question})
logging.info(f"Final response: {response}")
output = response['answer']
return output
总结 至此,我们完成了!以上是一份详细指南,教你如何打造一个互动性强、智能化的播客应用。欢迎在这里试用,并留下您宝贵的意见。
感谢您的陪伴,祝您编程愉快!:)