基于稀疏文本信息摘要的分块

基于稀疏文本信息摘要的分块#

背景#

从长文本(可能是播客音频的文字稿、长篇的访谈文本或分析报告)中,完整的提取出文章中的关键信息。

方案#

先对文本基于讨论的话题进行初步的分块,然后使用大模型对每个分块进行总结

问题#

现有的文本分块模式基本上是服务于 RAG 来做的,需要考虑召回的效果等,因此天然的限制了每个块的大小。

但我的需求是将文本基于所讨论的主要话题进行分块,并且每个块不能包含多个话题,因为不相关信息会损害总结的效果,并且一篇文章讨论的主要话题可能就只有四到五个,所以每个块的大小最终会到 2k-8k tokens 甚至更大。

解决方案 1#

Kamradt 提出的 sematic chunking,其原理是使用正则表达式先对句子进行分割,利用嵌入来聚合语义相似的文本块,并通过监控嵌入距离的显着变化来识别分割点。

缺陷是考虑的 embedding 结果是句子级别的(对单一句子进行 embedding,缺少上下文信息),因此分块结果很容易断在同一话题的句子转折处。

解决方案 2#

使用 \n 将正文切割成段落,从每个段落提取摘要,对摘要进行类似上述 sematic chunking 的处理。

"""
此处代码无法正常运行,后续会有补充,只是作为示例进行演示,最终版本为可运行的核心代码片段
"""

# # 方案二的代码

# # \n 分段落 


#     def split_sentences(text: str) -> List[Dict]:
#         """将文本分割成句子"""
#         sentences = text.split('\n')
#         # 过滤掉空字符串
#         sentences = [s.strip() for s in sentences if s.strip()]
#         return [{'sentence': s, 'index': i} for i, s in enumerate(sentences)]

# # 写摘要 

#     def summarize_sentences(sentences: List[Dict]) -> List[Dict]:
#         """为每个句子生成一句话摘要"""
#         client = OpenAI()

#         for sentence in sentences:
#             try:
#                 prompt = f"请用一句话总结以下内容,要求简洁且包含关键信息:\n{sentence['sentence']}"
                    
#                 completion = client.chat.completions.create(
#                     model='Qwen/Qwen2.5-7B-Instruct',
#                     messages=[
#                         {"role": "user", "content": prompt}
#                     ],
#                     stream=True,
#                 )
                
#                 print(f"正在生成句子摘要: {sentence['sentence']}\n摘要内容: ", end="", flush=True)
#                 summary = ""
#                 for chunk in completion:
#                     content = chunk.choices[0].delta.content or ""
#                     print(content, end="", flush=True)
#                     summary += content
#                 print("\n")
#                 sentence['summary'] = summary.strip()

                    
#     return sentences

# # combine 摘要 

#     def combine_sentences(sentences: List[Dict]) -> List[Dict]:
#             """组合相邻句子的摘要"""
#             for i in range(len(sentences)):
#                 combined_summary = ''
#                 for j in range(i - buffer_size, i + buffer_size + 1):
#                     if 0 <= j < len(sentences):
#                         combined_summary += sentences[j]['summary'] + ' '
#                 sentences[i]['combined_summary'] = combined_summary.strip()
#                 print(f"Sentence {i+1} combined summary: {sentences[i]['combined_summary']}")
#         return sentences

# # embedding 

#     def get_embeddings(sentences: List[Dict]) -> List[Dict]:
#             """获取句子的嵌入向量"""
#             combined_sentences = [s['combined_summary'] for s in sentences]
#             embedded_sentences = embedding_function(combined_sentences)
#             return embedded_sentences

# # 计算相似度

#     def calculate_distances(embeddings: List) -> List[float]:
#             """计算相邻句子嵌入向量的余弦距离"""
#             distances = []
#             for i in range(len(embeddings) - 1):
#                 embedding_current = embeddings[i]
#                 embedding_next = embeddings[i + 1]
#                 similarity = cosine_similarity([embedding_current], [embedding_next])[0][0]
#                 distance = 1 - similarity
#                 distances.append(distance)
#             return distances

# # 基于阈值的分块

#     def get_filtered_breakpoints(distances, min_chunk_size=3, percentile_threshold=90):
#         """
#         根据句子间距离获取过滤后的断点索引。
        
#         Args:
#             distances: 句子间的距离列表
#             min_chunk_size: 每个分组最少包含的句子数,默认为3
#             percentile_threshold: 用于确定断点的百分位数阈值,默认为90
            
#         Returns:
#             filtered_breakpoints: 过滤后的断点索引列表
#         """
#         # 使用百分位数确定距离阈值,用于识别异常值(断点)
#         breakpoint_distance_threshold = np.percentile(distances, percentile_threshold)

#         # 获取初始断点,这些断点是句间距离大于阈值的点的索引
#         initial_breakpoints = [i for i, x in enumerate(distances) if x > breakpoint_distance_threshold]

#         # 过滤断点,确保每个分组至少包含min_chunk_size个句子
#         filtered_breakpoints = []
#         start_idx = 0
#         for bp in initial_breakpoints:
#             # 检查分组大小是否至少为min_chunk_size个句子
#             if bp - start_idx >= min_chunk_size - 1:
#                 filtered_breakpoints.append(bp)
#                 start_idx = bp + 1

#         # 检查最后一个分组
#         if len(distances) - start_idx < min_chunk_size:
#             # 如果最后一个分组太小,则移除最后一个断点
#             if filtered_breakpoints:
#                 filtered_breakpoints.pop()

#         return filtered_breakpoints
'\n此处代码无法正常运行,后续会有补充,只是作为示例进行演示,最终版本为可运行的核心代码片段\n'