
Commit a171c7b

perf: buffer;fix: back up split (#4913)
* perf: buffer
* fix: back up split
* fix: app limit
* doc
1 parent 802de11 commit a171c7b

11 files changed: +208 additions, −93 deletions

docSite/content/zh-cn/docs/development/upgrading/4911.md

Lines changed: 4 additions & 2 deletions

```diff
@@ -14,8 +14,10 @@ weight: 789
 
 ## ⚙️ Optimizations
 
-
+1. The raw-text cache now uses GridFS storage, raising the size limit.
 
 ## 🐛 Fixes
 
-1. In workflows, system tools declared globally by an administrator could not be version-managed.
+1. In workflows, system tools declared globally by an administrator could not be version-managed.
+2. Fixed broken context when an interactive node runs before a tool-call node.
+3. Fixed backup import failing to split texts shorter than 1,000 characters.
```
packages/service/common/buffer/rawText/controller.ts

Lines changed: 139 additions & 0 deletions (new file)

```ts
import { retryFn } from '@fastgpt/global/common/system/utils';
import { connectionMongo } from '../../mongo';
import { MongoRawTextBufferSchema, bucketName } from './schema';
import { addLog } from '../../system/log';

const getGridBucket = () => {
  return new connectionMongo.mongo.GridFSBucket(connectionMongo.connection.db!, {
    bucketName: bucketName
  });
};

export const addRawTextBuffer = async ({
  sourceId,
  sourceName,
  text,
  expiredTime
}: {
  sourceId: string;
  sourceName: string;
  text: string;
  expiredTime: Date;
}) => {
  const gridBucket = getGridBucket();
  const metadata = {
    sourceId,
    sourceName,
    expiredTime
  };

  const buffer = Buffer.from(text);

  const fileSize = buffer.length;
  // Chunk size: as large as possible, but at most 14MB and at least 128KB
  const chunkSizeBytes = (() => {
    // Ideal chunk size: file size ÷ target chunk count (10), with each chunk capped at 14MB
    const idealChunkSize = Math.min(Math.ceil(fileSize / 10), 14 * 1024 * 1024);

    // Ensure the chunk size is at least 128KB
    const minChunkSize = 128 * 1024; // 128KB

    // Take the larger of the ideal and minimum chunk sizes
    let chunkSize = Math.max(idealChunkSize, minChunkSize);

    // Round up to the nearest multiple of 64KB to keep chunks aligned
    chunkSize = Math.ceil(chunkSize / (64 * 1024)) * (64 * 1024);

    return chunkSize;
  })();

  const uploadStream = gridBucket.openUploadStream(sourceId, {
    metadata,
    chunkSizeBytes
  });

  return retryFn(async () => {
    return new Promise((resolve, reject) => {
      uploadStream.end(buffer);
      uploadStream.on('finish', () => {
        resolve(uploadStream.id);
      });
      uploadStream.on('error', (error) => {
        addLog.error('addRawTextBuffer error', error);
        resolve('');
      });
    });
  });
};

export const getRawTextBuffer = async (sourceId: string) => {
  const gridBucket = getGridBucket();

  return retryFn(async () => {
    const bufferData = await MongoRawTextBufferSchema.findOne(
      {
        'metadata.sourceId': sourceId
      },
      '_id metadata'
    ).lean();
    if (!bufferData) {
      return null;
    }

    // Read file content
    const downloadStream = gridBucket.openDownloadStream(bufferData._id);
    const chunks: Buffer[] = [];

    return new Promise<{
      text: string;
      sourceName: string;
    } | null>((resolve, reject) => {
      downloadStream.on('data', (chunk) => {
        chunks.push(chunk);
      });

      downloadStream.on('end', () => {
        const buffer = Buffer.concat(chunks);
        const text = buffer.toString('utf8');
        resolve({
          text,
          sourceName: bufferData.metadata?.sourceName || ''
        });
      });

      downloadStream.on('error', (error) => {
        addLog.error('getRawTextBuffer error', error);
        resolve(null);
      });
    });
  });
};

export const deleteRawTextBuffer = async (sourceId: string): Promise<boolean> => {
  const gridBucket = getGridBucket();

  return retryFn(async () => {
    const buffer = await MongoRawTextBufferSchema.findOne({ 'metadata.sourceId': sourceId });
    if (!buffer) {
      return false;
    }

    await gridBucket.delete(buffer._id);
    return true;
  });
};

export const updateRawTextBufferExpiredTime = async ({
  sourceId,
  expiredTime
}: {
  sourceId: string;
  expiredTime: Date;
}) => {
  return retryFn(async () => {
    return MongoRawTextBufferSchema.updateOne(
      { 'metadata.sourceId': sourceId },
      { $set: { 'metadata.expiredTime': expiredTime } }
    );
  });
};
```
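
The chunk-size heuristic above is easiest to sanity-check in isolation. A standalone sketch (the same arithmetic as the commit, copied out for spot-checking only):

```ts
// Standalone copy of the chunkSizeBytes heuristic above, for verification.
const computeChunkSize = (fileSize: number): number => {
  // Ideal: ~10 chunks per file, capped at 14MB per chunk
  const idealChunkSize = Math.min(Math.ceil(fileSize / 10), 14 * 1024 * 1024);
  // Floor of 128KB, then round up to a 64KB multiple
  const chunkSize = Math.max(idealChunkSize, 128 * 1024);
  return Math.ceil(chunkSize / (64 * 1024)) * (64 * 1024);
};

console.log(computeChunkSize(100 * 1024)); // 131072   — small files hit the 128KB floor
console.log(computeChunkSize(10 * 1024 * 1024)); // 1048576  — ≈ fileSize / 10
console.log(computeChunkSize(500 * 1024 * 1024)); // 14680064 — capped at 14MB
```

So files up to roughly 140MB split into about 10 chunks; beyond that, the 14MB cap simply yields more chunks.
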
packages/service/common/buffer/rawText/schema.ts

Lines changed: 17 additions & 28 deletions

```diff
@@ -1,33 +1,22 @@
-import { getMongoModel, Schema } from '../../mongo';
-import { type RawTextBufferSchemaType } from './type';
+import { getMongoModel, type Types, Schema } from '../../mongo';
 
-export const collectionName = 'buffer_rawtexts';
+export const bucketName = 'buffer_rawtext';
 
 const RawTextBufferSchema = new Schema({
-  sourceId: {
-    type: String,
-    required: true
-  },
-  rawText: {
-    type: String,
-    default: ''
-  },
-  createTime: {
-    type: Date,
-    default: () => new Date()
-  },
-  metadata: Object
+  metadata: {
+    sourceId: { type: String, required: true },
+    sourceName: { type: String, required: true },
+    expiredTime: { type: Date, required: true }
+  }
 });
+RawTextBufferSchema.index({ 'metadata.sourceId': 'hashed' });
+RawTextBufferSchema.index({ 'metadata.expiredTime': -1 });
 
-try {
-  RawTextBufferSchema.index({ sourceId: 1 });
-  // 20 minutes
-  RawTextBufferSchema.index({ createTime: 1 }, { expireAfterSeconds: 20 * 60 });
-} catch (error) {
-  console.log(error);
-}
-
-export const MongoRawTextBuffer = getMongoModel<RawTextBufferSchemaType>(
-  collectionName,
-  RawTextBufferSchema
-);
+export const MongoRawTextBufferSchema = getMongoModel<{
+  _id: Types.ObjectId;
+  metadata: {
+    sourceId: string;
+    sourceName: string;
+    expiredTime: Date;
+  };
+}>(`${bucketName}.files`, RawTextBufferSchema);
```
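
For orientation (standard GridFS layout, not something this diff adds): `GridFSBucket` writes file descriptors to `<bucketName>.files` and binary content to `<bucketName>.chunks`, which is why the model above binds to `${bucketName}.files` — Mongoose queries the very documents the bucket writes. Note also that the old TTL index (`expireAfterSeconds`) is gone; the new `metadata.expiredTime` index only supports lookups, so expiry presumably relies on a separate cleanup job. A small sketch under those assumptions:

```ts
import { connectionMongo } from '../../mongo';

// Sketch: inspect the two collections behind the 'buffer_rawtext' bucket.
// (Assumes the same connectionMongo helper used elsewhere in this package.)
const inspectBufferBucket = async () => {
  const db = connectionMongo.connection.db!;
  // File descriptors: { _id, length, chunkSize, uploadDate, filename, metadata }
  const files = await db.collection('buffer_rawtext.files').find().limit(5).toArray();
  // Binary chunks: { _id, files_id, n, data }
  const chunkCount = await db.collection('buffer_rawtext.chunks').countDocuments();
  console.log(files.length, chunkCount);
};
```
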

packages/service/common/buffer/rawText/type.d.ts

Lines changed: 0 additions & 8 deletions
This file was deleted.

packages/service/common/file/gridfs/controller.ts

Lines changed: 12 additions & 17 deletions

```diff
@@ -6,13 +6,13 @@ import { type DatasetFileSchema } from '@fastgpt/global/core/dataset/type';
 import { MongoChatFileSchema, MongoDatasetFileSchema } from './schema';
 import { detectFileEncoding, detectFileEncodingByPath } from '@fastgpt/global/common/file/tools';
 import { CommonErrEnum } from '@fastgpt/global/common/error/code/common';
-import { MongoRawTextBuffer } from '../../buffer/rawText/schema';
 import { readRawContentByFileBuffer } from '../read/utils';
 import { gridFsStream2Buffer, stream2Encoding } from './utils';
 import { addLog } from '../../system/log';
-import { readFromSecondary } from '../../mongo/utils';
 import { parseFileExtensionFromUrl } from '@fastgpt/global/common/string/tools';
 import { Readable } from 'stream';
+import { addRawTextBuffer, getRawTextBuffer } from '../../buffer/rawText/controller';
+import { addMinutes } from 'date-fns';
 
 export function getGFSCollection(bucket: `${BucketNameEnum}`) {
   MongoDatasetFileSchema;
@@ -225,13 +225,11 @@ export const readFileContentFromMongo = async ({
 }> => {
   const bufferId = `${fileId}-${customPdfParse}`;
   // read buffer
-  const fileBuffer = await MongoRawTextBuffer.findOne({ sourceId: bufferId }, undefined, {
-    ...readFromSecondary
-  }).lean();
+  const fileBuffer = await getRawTextBuffer(bufferId);
   if (fileBuffer) {
     return {
-      rawText: fileBuffer.rawText,
-      filename: fileBuffer.metadata?.filename || ''
+      rawText: fileBuffer.text,
+      filename: fileBuffer?.sourceName
     };
   }
@@ -265,16 +263,13 @@ export const readFileContentFromMongo = async ({
     }
   });
 
-  // < 14M
-  if (fileBuffers.length < 14 * 1024 * 1024 && rawText.trim()) {
-    MongoRawTextBuffer.create({
-      sourceId: bufferId,
-      rawText,
-      metadata: {
-        filename: file.filename
-      }
-    });
-  }
+  // Add buffer
+  addRawTextBuffer({
+    sourceId: bufferId,
+    sourceName: file.filename,
+    text: rawText,
+    expiredTime: addMinutes(new Date(), 20)
+  });
 
   return {
     rawText,
```
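
Note that the rewritten hot path calls `addRawTextBuffer` without `await`: caching is fire-and-forget, and since the upload handler resolves `''` instead of rejecting on stream errors, a GridFS hiccup can never fail the read. A minimal sketch of that pattern (hypothetical helper names, not commit code):

```ts
// Fire-and-forget cache fill: a cache failure must never fail the read path.
// getCache / putCache / readFromSource are hypothetical stand-ins.
declare function getCache(id: string): Promise<string | null>;
declare function putCache(id: string, value: string): Promise<unknown>;
declare function readFromSource(id: string): Promise<string>;

export const readWithCache = async (id: string): Promise<string> => {
  const cached = await getCache(id);
  if (cached !== null) return cached; // fast path: buffer hit

  const fresh = await readFromSource(id); // slow path: parse from source
  void putCache(id, fresh); // deliberately not awaited
  return fresh;
};
```
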
packages/service/common/file/gridfs/schema.ts

Lines changed: 9 additions & 9 deletions

```diff
@@ -1,16 +1,16 @@
 import { Schema, getMongoModel } from '../../mongo';
 
-const DatasetFileSchema = new Schema({});
-const ChatFileSchema = new Schema({});
+const DatasetFileSchema = new Schema({
+  metadata: Object
+});
+const ChatFileSchema = new Schema({
+  metadata: Object
+});
 
-try {
-  DatasetFileSchema.index({ uploadDate: -1 });
+DatasetFileSchema.index({ uploadDate: -1 });
 
-  ChatFileSchema.index({ uploadDate: -1 });
-  ChatFileSchema.index({ 'metadata.chatId': 1 });
-} catch (error) {
-  console.log(error);
-}
+ChatFileSchema.index({ uploadDate: -1 });
+ChatFileSchema.index({ 'metadata.chatId': 1 });
 
 export const MongoDatasetFileSchema = getMongoModel('dataset.files', DatasetFileSchema);
 export const MongoChatFileSchema = getMongoModel('chat.files', ChatFileSchema);
```

packages/service/core/dataset/collection/controller.ts

Lines changed: 4 additions & 1 deletion

```diff
@@ -77,7 +77,10 @@ export const createCollectionAndInsertData = async ({
   const chunkSplitter = computeChunkSplitter(createCollectionParams);
   const paragraphChunkDeep = computeParagraphChunkDeep(createCollectionParams);
 
-  if (trainingType === DatasetCollectionDataProcessModeEnum.qa) {
+  if (
+    trainingType === DatasetCollectionDataProcessModeEnum.qa ||
+    trainingType === DatasetCollectionDataProcessModeEnum.backup
+  ) {
     delete createCollectionParams.chunkTriggerType;
     delete createCollectionParams.chunkTriggerMinSize;
     delete createCollectionParams.dataEnhanceCollectionName;
```
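
Chunk-trigger settings are irrelevant for QA and backup processing, so the guard now strips them for both modes before the collection is created. A reduced sketch of the idea (the enum members `qa` and `backup` come from the diff; the rest of the enum and the params type are assumptions):

```ts
// Sketch: strip chunk-trigger params for processing modes that don't chunk by trigger.
enum DatasetCollectionDataProcessModeEnum {
  chunk = 'chunk', // assumed default mode, not shown in the diff
  qa = 'qa',
  backup = 'backup'
}

type CreateCollectionParams = {
  chunkTriggerType?: string;
  chunkTriggerMinSize?: number;
  dataEnhanceCollectionName?: string;
};

const stripChunkTriggerParams = (
  trainingType: DatasetCollectionDataProcessModeEnum,
  params: CreateCollectionParams
): CreateCollectionParams => {
  if (
    trainingType === DatasetCollectionDataProcessModeEnum.qa ||
    trainingType === DatasetCollectionDataProcessModeEnum.backup
  ) {
    delete params.chunkTriggerType;
    delete params.chunkTriggerMinSize;
    delete params.dataEnhanceCollectionName;
  }
  return params;
};
```
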

packages/service/core/dataset/read.ts

Lines changed: 4 additions & 4 deletions

```diff
@@ -218,6 +218,10 @@ export const rawText2Chunks = ({
     };
   };
 
+  if (backupParse) {
+    return parseDatasetBackup2Chunks(rawText).chunks;
+  }
+
   // Chunk condition
   // 1. Max-size trigger: chunking only kicks in once the text exceeds the max size (default: model max * 0.7)
   if (chunkTriggerType === ChunkTriggerConfigTypeEnum.maxSize) {
@@ -240,10 +244,6 @@ export const rawText2Chunks = ({
     }
   }
 
-  if (backupParse) {
-    return parseDatasetBackup2Chunks(rawText).chunks;
-  }
-
   const { chunks } = splitText2Chunks({
     text: rawText,
     chunkSize,
```
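
The hoist above is the "shorter than 1,000 characters" fix from the changelog: previously the chunk-trigger early returns ran first, so a short backup file came back as a single raw chunk before `parseDatasetBackup2Chunks` ever ran. A simplified sketch of the two orderings (hypothetical helpers; the 1,000-character threshold is illustrative, standing in for the real chunk-trigger conditions):

```ts
// Simplified control flow; parseBackup / splitText stand in for the real helpers.
declare function parseBackup(rawText: string): string[];
declare function splitText(rawText: string): string[];

// Before: short texts short-circuited ahead of backup parsing.
const before = (rawText: string, backupParse: boolean): string[] => {
  if (rawText.length < 1000) return [rawText]; // backup rows were never split
  if (backupParse) return parseBackup(rawText);
  return splitText(rawText);
};

// After: backup parsing is checked first, so text size never matters for backups.
const after = (rawText: string, backupParse: boolean): string[] => {
  if (backupParse) return parseBackup(rawText);
  if (rawText.length < 1000) return [rawText];
  return splitText(rawText);
};
```
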

packages/service/core/workflow/dispatch/tools/readFiles.ts

Lines changed: 11 additions & 18 deletions

```diff
@@ -5,15 +5,15 @@ import { NodeOutputKeyEnum } from '@fastgpt/global/core/workflow/constants';
 import { type DispatchNodeResultType } from '@fastgpt/global/core/workflow/runtime/type';
 import axios from 'axios';
 import { serverRequestBaseUrl } from '../../../../common/api/serverRequest';
-import { MongoRawTextBuffer } from '../../../../common/buffer/rawText/schema';
-import { readFromSecondary } from '../../../../common/mongo/utils';
 import { getErrText } from '@fastgpt/global/common/error/utils';
 import { detectFileEncoding, parseUrlToFileType } from '@fastgpt/global/common/file/tools';
 import { readRawContentByFileBuffer } from '../../../../common/file/read/utils';
 import { ChatRoleEnum } from '@fastgpt/global/core/chat/constants';
 import { type ChatItemType, type UserChatItemValueItemType } from '@fastgpt/global/core/chat/type';
 import { parseFileExtensionFromUrl } from '@fastgpt/global/common/string/tools';
 import { addLog } from '../../../../common/system/log';
+import { addRawTextBuffer, getRawTextBuffer } from '../../../../common/buffer/rawText/controller';
+import { addMinutes } from 'date-fns';
 
 type Props = ModuleDispatchProps<{
   [NodeInputKeyEnum.fileUrlList]: string[];
@@ -158,14 +158,12 @@ export const getFileContentFromLinks = async ({
     parseUrlList
       .map(async (url) => {
         // Get from buffer
-        const fileBuffer = await MongoRawTextBuffer.findOne({ sourceId: url }, undefined, {
-          ...readFromSecondary
-        }).lean();
+        const fileBuffer = await getRawTextBuffer(url);
         if (fileBuffer) {
           return formatResponseObject({
-            filename: fileBuffer.metadata?.filename || url,
+            filename: fileBuffer.sourceName || url,
             url,
-            content: fileBuffer.rawText
+            content: fileBuffer.text
           });
         }
@@ -220,17 +218,12 @@ export const getFileContentFromLinks = async ({
         });
 
         // Add to buffer
-        try {
-          if (buffer.length < 14 * 1024 * 1024 && rawText.trim()) {
-            MongoRawTextBuffer.create({
-              sourceId: url,
-              rawText,
-              metadata: {
-                filename: filename
-              }
-            });
-          }
-        } catch (error) {}
+        addRawTextBuffer({
+          sourceId: url,
+          sourceName: filename,
+          text: rawText,
+          expiredTime: addMinutes(new Date(), 20)
+        });
 
         return formatResponseObject({ filename, url, content: rawText });
       } catch (error) {
```
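
Both call sites now share the same small buffer API. A hedged usage sketch (import path as in the file above; the URL, filename, and text are placeholders):

```ts
import { addMinutes } from 'date-fns';
import {
  addRawTextBuffer,
  getRawTextBuffer
} from '../../../../common/buffer/rawText/controller';

// Cache parsed text for 20 minutes, then read it back by sourceId.
const demo = async (url: string, filename: string, rawText: string) => {
  await addRawTextBuffer({
    sourceId: url,
    sourceName: filename,
    text: rawText,
    expiredTime: addMinutes(new Date(), 20)
  });

  const hit = await getRawTextBuffer(url);
  if (hit) {
    console.log(hit.sourceName, hit.text.length);
  }
};
```
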
