Skip to content

Commit e8128d0

Browse files
committed
Create output datasets from filter pipelines
Desktop only. Sets up some scaffolding for doing the same with transcoding pipelines in the future.
1 parent 69773b6 commit e8128d0

File tree

7 files changed

+105
-5
lines changed

7 files changed

+105
-5
lines changed

client/dive-common/constants.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,7 @@ const zipFileTypes = [
124124

125125
const stereoPipelineMarker = 'measurement';
126126
const multiCamPipelineMarkers = ['2-cam', '3-cam'];
127+
const pipelineCreatesDatasetMarkers = ['transcode', 'filter'];
127128

128129
const JsonMetaRegEx = /^.*\.?(meta|config)\.json$/;
129130

@@ -153,6 +154,7 @@ export {
153154
zipFileTypes,
154155
stereoPipelineMarker,
155156
multiCamPipelineMarkers,
157+
pipelineCreatesDatasetMarkers,
156158
JsonMetaRegEx,
157159
simplifyTrainingName,
158160
};

client/platform/desktop/backend/native/viame.ts

Lines changed: 44 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import {
1010
import { cleanString } from 'platform/desktop/sharedUtils';
1111
import { serialize } from 'platform/desktop/backend/serializers/viame';
1212
import { observeChild } from 'platform/desktop/backend/native/processManager';
13+
import sendToRenderer from 'platform/desktop/background';
1314

1415
import { MultiType, stereoPipelineMarker, multiCamPipelineMarkers } from 'dive-common/constants';
1516
import * as common from './common';
@@ -129,12 +130,27 @@ async function runPipeline(
129130
}
130131
}
131132

133+
let outputDir: string | undefined;
134+
const DIVE_OUTPUT_PROJECT_DIR = 'DIVE_Jobs_Output';
132135
if (runPipelineArgs.pipeline.type === 'filter') {
133-
command.push(`-s kwa_writer:output_directory="${npath.join(jobWorkDir, 'output')}"`);
134-
command.push(`-s image_writer:file_name_prefix="${jobWorkDir}/"`);
136+
if (runPipelineArgs.outputDatasetName) {
137+
outputDir = runPipelineArgs.outputDatasetName;
138+
} else {
139+
const timestamp = (new Date()).toISOString().replace(/[:.]/g, '-');
140+
outputDir = `${runPipelineArgs.pipeline.name}_${runPipelineArgs.datasetId}_${timestamp}`;
141+
}
142+
outputDir = `${npath.join(settings.dataPath, DIVE_OUTPUT_PROJECT_DIR, outputDir)}`;
143+
if (outputDir !== jobWorkDir) {
144+
await fs.mkdir(outputDir, { recursive: true });
145+
}
146+
command.push(`-s kwa_writer:output_directory="${outputDir}/"`);
147+
command.push(`-s image_writer:file_name_prefix="${outputDir}/"`);
135148
}
149+
150+
let transcodedFilename: string;
136151
if (runPipelineArgs.pipeline.type === 'transcode') {
137-
command.push(`-s video_writer:video_filename="${npath.join(jobWorkDir, `${datasetId}.mp4`)}"`);
152+
transcodedFilename = npath.join(jobWorkDir, `${datasetId}.mp4`);
153+
command.push(`-s video_writer:video_filename="${transcodedFilename}"`);
138154
}
139155

140156
if (requiresInput && !stereoOrMultiCam) {
@@ -211,6 +227,31 @@ async function runPipeline(
211227
});
212228
});
213229

230+
if (['filter', 'transcode'].includes(runPipelineArgs.pipeline.type)) {
231+
job.on('exit', async (code) => {
232+
if (runPipelineArgs.pipeline.type === 'transcode') {
233+
// TODO: work must be done to find the video file
234+
return;
235+
}
236+
if (code === 0) {
237+
// Ingest the output into a new dataset
238+
updater({
239+
...jobBase,
240+
body: ['Creating dataset from output...'],
241+
exitCode: code,
242+
endTime: new Date(),
243+
});
244+
// transcodedFilename will only be assigned for transcode pipelines
245+
const importSource = transcodedFilename || outputDir;
246+
if (importSource) {
247+
const importPayload = await common.beginMediaImport(importSource);
248+
const conversionJobArgs = await common.finalizeMediaImport(settings, importPayload);
249+
sendToRenderer('filter-complete', conversionJobArgs.meta);
250+
}
251+
}
252+
});
253+
}
254+
214255
return jobBase;
215256
}
216257

client/platform/desktop/background.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,3 +133,9 @@ if (process.platform === 'win32') {
133133
cleanup();
134134
});
135135
}
136+
137+
export default function sendToRenderer(channel: string, payload?: unknown) {
138+
if (win) {
139+
win.webContents.send(channel, payload);
140+
}
141+
}

client/platform/desktop/constants.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -170,6 +170,7 @@ export interface RunPipeline extends JobArgs {
170170
type: JobType.RunPipeline;
171171
datasetId: string;
172172
pipeline: Pipe;
173+
outputDatasetName?: string;
173174
}
174175

175176
export interface ExportTrainedPipeline extends JobArgs {

client/platform/desktop/frontend/api.ts

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,16 @@ async function runPipeline(itemId: string, pipeline: Pipe): Promise<void> {
100100
gpuJobQueue.enqueue(args);
101101
}
102102

103+
async function runPipelineWithOutput(itemId: string, pipeline: Pipe, outputDatasetName: string): Promise<void> {
104+
const args: RunPipeline = {
105+
type: JobType.RunPipeline,
106+
pipeline,
107+
datasetId: itemId,
108+
outputDatasetName,
109+
};
110+
gpuJobQueue.enqueue(args);
111+
}
112+
103113
async function exportTrainedPipeline(path: string, pipeline: Pipe): Promise<void> {
104114
const args: ExportTrainedPipeline = {
105115
type: JobType.ExportTrainedPipeline,
@@ -277,4 +287,5 @@ export {
277287
openLink,
278288
nvidiaSmi,
279289
cancelJob,
290+
runPipelineWithOutput,
280291
};

client/platform/desktop/frontend/components/MultiPipeline.vue

Lines changed: 36 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,12 @@ import {
99
import { DataTableHeader } from 'vuetify';
1010
import { useRouter } from 'vue-router/composables';
1111
import { Pipe, Pipelines, useApi } from 'dive-common/apispec';
12+
import { runPipelineWithOutput } from 'platform/desktop/frontend/api';
1213
import {
1314
itemsPerPageOptions,
1415
stereoPipelineMarker,
1516
multiCamPipelineMarkers,
17+
pipelineCreatesDatasetMarkers,
1618
MultiType,
1719
} from 'dive-common/constants';
1820
import { usePrompt } from 'dive-common/vue-utilities/prompt-service';
@@ -77,6 +79,23 @@ const stagedDatasetHeaders: DataTableHeader[] = headersTmpl.concat([
7779
width: 80,
7880
},
7981
]);
82+
const createNewDatasetHeaders: DataTableHeader[] = headersTmpl.concat([
83+
{
84+
text: 'Output Dataset Name',
85+
value: 'output',
86+
sortable: false,
87+
},
88+
{
89+
text: 'Remove',
90+
value: 'remove',
91+
sortable: false,
92+
width: 80,
93+
},
94+
]);
95+
function computeOutputDatasetName(item: JsonMetaCache) {
96+
const timeStamp = (new Date()).toISOString().replace(/[:.]/g, '-');
97+
return `${selectedPipeline.value?.name}_${item.name}_${timeStamp}`;
98+
}
8099
function getAvailableItems(): JsonMetaCache[] {
81100
if (!selectedPipelineType.value || !selectedPipeline.value) {
82101
return [];
@@ -108,7 +127,16 @@ function toggleStaged(item: JsonMetaCache) {
108127
async function runPipelineForDatasets() {
109128
if (selectedPipeline.value !== null) {
110129
const results = await Promise.allSettled(
111-
stagedDatasetIds.value.map((datasetId: string) => runPipeline(datasetId, selectedPipeline.value!)),
130+
stagedDatasetIds.value.map((datasetId: string) => {
131+
if (['transcode', 'filter'].includes(selectedPipeline.value?.type || '')) {
132+
const datasetMeta = availableItems.value.find((item: JsonMetaCache) => item.id === datasetId);
133+
if (!datasetMeta) {
134+
throw new Error(`Attempted to run pipeline on nonexistant dataset ${datasetId}`);
135+
}
136+
return runPipelineWithOutput(datasetId, selectedPipeline.value!, computeOutputDatasetName(datasetMeta));
137+
}
138+
return runPipeline(datasetId, selectedPipeline.value!);
139+
}),
112140
);
113141
const failed = results
114142
.map((result, i) => ({ result, datasetId: stagedDatasetIds.value[i] }))
@@ -178,7 +206,10 @@ onBeforeMount(async () => {
178206
<v-card-title>Datasets staged for selected pipeline</v-card-title>
179207
<v-data-table
180208
dense
181-
v-bind="{ headers: stagedDatasetHeaders, items: stagedDatasets }"
209+
v-bind="{
210+
headers: pipelineCreatesDatasetMarkers.includes(selectedPipelineType || '') ? createNewDatasetHeaders : stagedDatasetHeaders,
211+
items: stagedDatasets,
212+
}"
182213
:items-per-page.sync="clientSettings.rowsPerPage"
183214
hide-default-footer
184215
:hide-default-header="stagedDatasets.length === 0"
@@ -193,6 +224,9 @@ onBeforeMount(async () => {
193224
<v-icon>mdi-minus</v-icon>
194225
</v-btn>
195226
</template>
227+
<template #[`item.output`]="{ item }">
228+
<b>{{ computeOutputDatasetName(item) }}</b>
229+
</template>
196230
</v-data-table>
197231
</div>
198232
<v-row class="mt-7">

client/platform/desktop/frontend/store/jobs.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,11 @@ import {
1515
JobType,
1616
RunPipeline,
1717
RunTraining,
18+
JsonMeta,
1819
} from 'platform/desktop/constants';
1920
import AsyncGpuJobQueue from './queues/asyncGpuJobQueue';
2021
import AsyncCpuJobQueue from './queues/asyncCpuJobQueue';
22+
import { setRecents } from './dataset';
2123

2224
interface DesktopJobHistory {
2325
job: DesktopJob;
@@ -94,6 +96,9 @@ function init() {
9496
...args, body: ['Job cancelled by user'], exitCode: cancelledJobExitCode, endTime: new Date(), cancelledJob: true,
9597
});
9698
});
99+
ipcRenderer.on('filter-complete', (event, args: JsonMeta) => {
100+
setRecents(args);
101+
});
97102
}
98103

99104
init();

0 commit comments

Comments
 (0)