基于稀疏文本信息摘要的分块#
背景#
从长文本(可能是播客音频的文字稿、长篇的访谈文本或分析报告)中,完整的提取出文章中的关键信息。
方案#
先对文本基于讨论的话题进行初步的分块,然后使用大模型对每个分块进行总结
问题#
现有的文本分块模式基本上是服务于 RAG 来做的,需要考虑召回的效果等,因此天然的限制了每个块的大小。
但我的需求是将文本基于所讨论的主要话题进行分块,并且每个块不能包含多个话题,因为不相关信息会损害总结的效果,并且一篇文章讨论的主要话题可能就只有四到五个,所以每个块的大小最终会到 2k-8k tokens 甚至更大。
解决方案 1#
Kamradt 提出的 sematic chunking,其原理是使用正则表达式先对句子进行分割,利用嵌入来聚合语义相似的文本块,并通过监控嵌入距离的显着变化来识别分割点。
缺陷是考虑的 embedding 结果是句子级别的(对单一句子进行 embedding,缺少上下文信息),因此分块结果很容易断在同一话题的句子转折处。
解决方案 2#
使用 \n 将正文切割成段落,从每个段落提取摘要,对摘要进行类似上述 sematic chunking 的处理。
"""
此处代码无法正常运行,后续会有补充,只是作为示例进行演示,最终版本为可运行的核心代码片段
"""
# # 方案二的代码
# # \n 分段落
# def split_sentences(text: str) -> List[Dict]:
# """将文本分割成句子"""
# sentences = text.split('\n')
# # 过滤掉空字符串
# sentences = [s.strip() for s in sentences if s.strip()]
# return [{'sentence': s, 'index': i} for i, s in enumerate(sentences)]
# # 写摘要
# def summarize_sentences(sentences: List[Dict]) -> List[Dict]:
# """为每个句子生成一句话摘要"""
# client = OpenAI()
# for sentence in sentences:
# try:
# prompt = f"请用一句话总结以下内容,要求简洁且包含关键信息:\n{sentence['sentence']}"
# completion = client.chat.completions.create(
# model='Qwen/Qwen2.5-7B-Instruct',
# messages=[
# {"role": "user", "content": prompt}
# ],
# stream=True,
# )
# print(f"正在生成句子摘要: {sentence['sentence']}\n摘要内容: ", end="", flush=True)
# summary = ""
# for chunk in completion:
# content = chunk.choices[0].delta.content or ""
# print(content, end="", flush=True)
# summary += content
# print("\n")
# sentence['summary'] = summary.strip()
# return sentences
# # combine 摘要
# def combine_sentences(sentences: List[Dict]) -> List[Dict]:
# """组合相邻句子的摘要"""
# for i in range(len(sentences)):
# combined_summary = ''
# for j in range(i - buffer_size, i + buffer_size + 1):
# if 0 <= j < len(sentences):
# combined_summary += sentences[j]['summary'] + ' '
# sentences[i]['combined_summary'] = combined_summary.strip()
# print(f"Sentence {i+1} combined summary: {sentences[i]['combined_summary']}")
# return sentences
# # embedding
# def get_embeddings(sentences: List[Dict]) -> List[Dict]:
# """获取句子的嵌入向量"""
# combined_sentences = [s['combined_summary'] for s in sentences]
# embedded_sentences = embedding_function(combined_sentences)
# return embedded_sentences
# # 计算相似度
# def calculate_distances(embeddings: List) -> List[float]:
# """计算相邻句子嵌入向量的余弦距离"""
# distances = []
# for i in range(len(embeddings) - 1):
# embedding_current = embeddings[i]
# embedding_next = embeddings[i + 1]
# similarity = cosine_similarity([embedding_current], [embedding_next])[0][0]
# distance = 1 - similarity
# distances.append(distance)
# return distances
# # 基于阈值的分块
# def get_filtered_breakpoints(distances, min_chunk_size=3, percentile_threshold=90):
# """
# 根据句子间距离获取过滤后的断点索引。
# Args:
# distances: 句子间的距离列表
# min_chunk_size: 每个分组最少包含的句子数,默认为3
# percentile_threshold: 用于确定断点的百分位数阈值,默认为90
# Returns:
# filtered_breakpoints: 过滤后的断点索引列表
# """
# # 使用百分位数确定距离阈值,用于识别异常值(断点)
# breakpoint_distance_threshold = np.percentile(distances, percentile_threshold)
# # 获取初始断点,这些断点是句间距离大于阈值的点的索引
# initial_breakpoints = [i for i, x in enumerate(distances) if x > breakpoint_distance_threshold]
# # 过滤断点,确保每个分组至少包含min_chunk_size个句子
# filtered_breakpoints = []
# start_idx = 0
# for bp in initial_breakpoints:
# # 检查分组大小是否至少为min_chunk_size个句子
# if bp - start_idx >= min_chunk_size - 1:
# filtered_breakpoints.append(bp)
# start_idx = bp + 1
# # 检查最后一个分组
# if len(distances) - start_idx < min_chunk_size:
# # 如果最后一个分组太小,则移除最后一个断点
# if filtered_breakpoints:
# filtered_breakpoints.pop()
# return filtered_breakpoints
'\n此处代码无法正常运行,后续会有补充,只是作为示例进行演示,最终版本为可运行的核心代码片段\n'