|
| 1 | +import type { NextApiRequest, NextApiResponse } from 'next'; |
| 2 | +import { jsonRes } from '@fastgpt/service/common/response'; |
| 3 | +import { connectToDatabase } from '@/service/mongo'; |
| 4 | +import { authCert } from '@fastgpt/service/support/permission/auth/common'; |
| 5 | +import { addHours } from 'date-fns'; |
| 6 | +import { MongoDatasetCollection } from '@fastgpt/service/core/dataset/collection/schema'; |
| 7 | +import { MongoDataset } from '@fastgpt/service/core/dataset/schema'; |
| 8 | +import { delay, retryFn } from '@fastgpt/global/common/system/utils'; |
| 9 | +import { delCollection } from '@fastgpt/service/core/dataset/collection/controller'; |
| 10 | +import { mongoSessionRun } from '@fastgpt/service/common/mongo/sessionRun'; |
| 11 | +import { MongoDatasetDataText } from '@fastgpt/service/core/dataset/data/dataTextSchema'; |
| 12 | +import { MongoDatasetData } from '@fastgpt/service/core/dataset/data/schema'; |
| 13 | +import { DatasetCollectionSchemaType } from '@fastgpt/global/core/dataset/type'; |
| 14 | +import { MongoDatasetTraining } from '@fastgpt/service/core/dataset/training/schema'; |
| 15 | +import { deleteDatasetDataVector } from '@fastgpt/service/common/vectorStore/controller'; |
| 16 | + |
/**
 * Removes collections whose parent dataset no longer exists
 * (the dataset was deleted, but its collections were left behind).
 *
 * Collections are scanned page by page in `_id` order using a cursor
 * (`_id > lastId`) instead of `skip`. This function deletes documents while it
 * scans; with `skip`, every deletion shifts the remaining documents forward, so
 * the next page would jump over collections that were never checked.
 *
 * Error handling:
 * - a failure while cleaning one dataset is logged and the scan continues;
 * - a failure while loading a page is logged and the same page is retried
 *   after a one second delay (the cursor only advances after a page is done).
 */
const checkInvalidCollection = async () => {
  const batchSize = 1000;

  // `_id` of the last collection handled so far; undefined until the first page is done.
  let lastId: DatasetCollectionSchemaType['_id'] | undefined;
  let checked = 0;

  while (true) {
    try {
      const collections = await MongoDatasetCollection.find(
        lastId ? { _id: { $gt: lastId } } : {},
        '_id teamId datasetId fileId metadata'
      )
        .sort({ _id: 1 })
        .limit(batchSize)
        .lean();
      if (collections.length === 0) break;

      // Group the page by datasetId so each dataset is looked up only once.
      const groups = new Map<string, DatasetCollectionSchemaType[]>();
      for (const collection of collections) {
        const datasetId = String(collection.datasetId);
        const group = groups.get(datasetId);
        if (group) {
          group.push(collection);
        } else {
          groups.set(datasetId, [collection]);
        }
      }

      for (const [datasetId, group] of groups) {
        try {
          await retryFn(async () => {
            const datasetExists = await MongoDataset.findById(datasetId, '_id').lean();
            if (datasetExists) return;

            console.log('清理无效的知识库集合, datasetId', datasetId);
            await mongoSessionRun(async (session) => {
              return await delCollection({
                collections: group,
                delImg: true,
                delFile: true,
                session
              });
            });
          });
        } catch (error) {
          // Keep going: one broken dataset must not block the rest of the scan.
          console.log(error);
        }
      }

      // Advance the cursor past this page and count what was actually inspected.
      const last = collections[collections.length - 1];
      if (last) {
        lastId = last._id;
      }
      checked += collections.length;
      console.log(`检测集合完成:${checked}`);
    } catch (error) {
      console.log(error);
      await delay(1000);
    }
  }
};
| 83 | + |
/**
 * Purges data left behind by deleted collections
 * (the collection was removed, but its data rows were not).
 *
 * Steps:
 * 1. Aggregate the data rows into one entry per `collectionId`.
 * 2. Load the ids of every existing collection.
 * 3. For each entry whose collection is missing, delete its training records,
 *    data_text rows, vectors and finally the data rows themselves.
 *
 * A failure for one collection is logged and the loop moves on; a failure in
 * the setup queries is logged and ends the function.
 */
const checkInvalidData = async () => {
  try {
    const groupedData = (await MongoDatasetData.aggregate([
      {
        $group: {
          _id: '$collectionId',
          teamId: { $first: '$teamId' },
          datasetId: { $first: '$datasetId' },
          collectionId: { $first: '$collectionId' }
        }
      }
    ])) as {
      _id: string;
      teamId: string;
      datasetId: string;
      collectionId: string;
    }[];
    console.log('Total data collections length', groupedData.length);

    // Fetch every collection id in one query.
    const collections = await MongoDatasetCollection.find({}, '_id').lean();
    console.log('Total collection length', collections.length);
    const existingCollectionIds = new Set<string>(collections.map((item) => String(item._id)));

    // Remove the content of each missing collection, one collection at a time.
    for (const group of groupedData) {
      if (existingCollectionIds.has(String(group.collectionId))) continue;

      try {
        console.log('清理无效的知识库集合内容, collectionId', group.collectionId);

        // Builds a fresh filter per call so no query shares an object with another.
        const scope = () => ({
          teamId: group.teamId,
          datasetId: group.datasetId,
          collectionId: group.collectionId
        });

        await retryFn(async () => {
          await MongoDatasetTraining.deleteMany(scope());
          await MongoDatasetDataText.deleteMany(scope());
          await deleteDatasetDataVector({
            teamId: group.teamId,
            datasetIds: [group.datasetId],
            collectionIds: [group.collectionId]
          });
          // The data rows go last: they are what this scan is driven by.
          await MongoDatasetData.deleteMany(scope());
        });
      } catch (error) {
        console.log(error);
      }
    }

    console.log(`检测集合完成`);
  } catch (error) {
    console.log(error);
  }
};
| 149 | + |
/**
 * Removes data_text rows whose data row no longer exists
 * (the data was deleted, but its data_text entry was not).
 *
 * Every `dataId` found in data_text is compared with the `_id`s of the data
 * rows; ids that are missing there are deleted from data_text.
 *
 * The delete runs in fixed-size chunks so a large number of orphans does not
 * produce one oversized `$in` query. Errors are logged rather than thrown.
 */
const checkInvalidDataText = async () => {
  // Upper bound on the number of ids sent in a single `$in` delete.
  const deleteBatchSize = 5000;

  try {
    // All dataIds referenced by data_text.
    const dataTexts = await MongoDatasetDataText.find({}, 'dataId').lean();
    const dataIds = dataTexts.map((item) => String(item.dataId));
    console.log('Total data_text dataIds:', dataIds.length);

    // All ids that exist as data rows.
    const datas = await MongoDatasetData.find({}, '_id').lean();
    const existingDataIds = new Set(datas.map((item) => String(item._id)));
    console.log('Total data length:', datas.length);

    // Referenced by data_text but missing from data; deduplicated, so the
    // count below is the number of distinct orphaned dataIds.
    const orphanIds = [...new Set(dataIds.filter((id) => !existingDataIds.has(id)))];
    console.log('Total unExists dataIds:', orphanIds.length);

    for (let offset = 0; offset < orphanIds.length; offset += deleteBatchSize) {
      await MongoDatasetDataText.deleteMany({
        dataId: { $in: orphanIds.slice(offset, offset + deleteBatchSize) }
      });
    }
  } catch (error) {
    // Previously swallowed silently; log it like the other clean-up steps do.
    console.log(error);
  }
};
| 171 | + |
/**
 * API route that starts a clean-up of orphaned dataset records.
 *
 * After connecting to the database and passing `authCert` (called with
 * `authRoot: true` — presumably limits the route to the root credential;
 * confirm against `authCert`), three steps run one after another in the
 * background:
 * 1. collections whose dataset is gone,
 * 2. data whose collection is gone,
 * 3. data_text rows whose data is gone.
 *
 * The response is sent as soon as the job has been started, so `success` only
 * means "started"; progress and failures appear in the server log.
 *
 * Note: `start` / `end` used to be read from the request body but were never
 * used. The body is now ignored, so a request without a body no longer fails.
 */
export default async function handler(req: NextApiRequest, res: NextApiResponse) {
  try {
    await connectToDatabase();
    await authCert({ req, authRoot: true });

    // Fire and forget: the clean-up can run far longer than an HTTP request.
    void (async () => {
      try {
        console.log('清理无效的集合');
        await checkInvalidCollection();
        console.log('清理无效的数据');
        await checkInvalidData();
        console.log('清理无效的data_text');
        await checkInvalidDataText();
      } catch (error) {
        // Include the error itself so the failure can be diagnosed from the log.
        console.log('执行脏数据清理任务出错了', error);
      }
    })();

    jsonRes(res, {
      message: 'success'
    });
  } catch (error) {
    console.log(error);

    jsonRes(res, {
      code: 500,
      error
    });
  }
}
0 commit comments