Skip to content

Commit 0687108

Browse files
DarshitChanpuradbwiddisowaiskazi19
authored
Onboards flow-framework plugin to resource-sharing and access control framework (#1251)
* Onboards flow-framework plugin to resource-sharing and access control framework Signed-off-by: Darshit Chanpura <[email protected]> * Adds javadoc and a changelog entry Signed-off-by: Darshit Chanpura <[email protected]> * Adds tests for resource sharing flow and adds a CI job to run resource-sharing tests Signed-off-by: Darshit Chanpura <[email protected]> # Conflicts: # .github/workflows/test_security.yml * Explicitly set security spi version to 3.4 Signed-off-by: Darshit Chanpura <[email protected]> * Refactors test to use in house Recipient class Signed-off-by: Darshit Chanpura <[email protected]> * Address PR comments around renaming and version Signed-off-by: Darshit Chanpura <[email protected]> * Constant for awaitility version Signed-off-by: Darshit Chanpura <[email protected]> * Fix typo in awaitilityVersion Co-authored-by: Owais Kazi <[email protected]> Signed-off-by: Daniel Widdis <[email protected]> * Update ResourceProvider to use anonymous class Signed-off-by: Daniel Widdis <[email protected]> * Fix javadocs Signed-off-by: Daniel Widdis <[email protected]> * Wrap checked exception from PluginClient Signed-off-by: Daniel Widdis <[email protected]> * Fix tests with param index and eq rather than raw strings Signed-off-by: Daniel Widdis <[email protected]> * Properly skip ResourceSharingApiIT Signed-off-by: Daniel Widdis <[email protected]> * Add unit test for ResourceSharingExtension Signed-off-by: Daniel Widdis <[email protected]> * Updates template to include `all_shared_principals` and marks relevant action-requests as DocRequests Signed-off-by: Darshit Chanpura <[email protected]> * Fix checkstyle errors Signed-off-by: Darshit Chanpura <[email protected]> * Fix early .onResponse return and add all_shared_principals to workflow state reset call Signed-off-by: Darshit Chanpura <[email protected]> * Add implementation of type method Signed-off-by: Darshit Chanpura <[email protected]> * Completes FlowFrameworkSecureRestApiIT Signed-off-by: Darshit Chanpura <[email protected]> * Fixes javadoc CI and addresses flakyness in search test when resource sharing is enab;ed Signed-off-by: Darshit Chanpura <[email protected]> * Fixes unit tests Signed-off-by: Darshit Chanpura <[email protected]> * Updates coverage and adds missing builder method call in Template class Signed-off-by: Darshit Chanpura <[email protected]> * Fix test to wait for async state deletion attempt Signed-off-by: Daniel Widdis <[email protected]> --------- Signed-off-by: Darshit Chanpura <[email protected]> Signed-off-by: Daniel Widdis <[email protected]> Co-authored-by: Daniel Widdis <[email protected]> Co-authored-by: Owais Kazi <[email protected]>
1 parent 60c3da9 commit 0687108

File tree

51 files changed

+1852
-220
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

51 files changed

+1852
-220
lines changed

.github/workflows/test_security.yml

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ jobs:
2020
strategy:
2121
matrix:
2222
java: [21, 25]
23+
resource_sharing_flag: [ false, true ]
2324

2425
name: Run Security Integration Tests on Linux
2526
runs-on: ubuntu-latest
@@ -45,4 +46,8 @@ jobs:
4546
# switching the user, as OpenSearch cluster can only be started as root/Administrator on linux-deb/linux-rpm/windows-zip.
4647
run: |
4748
chown -R 1000:1000 `pwd`
48-
su `id -un 1000` -c "whoami && java -version && ./gradlew integTest -Dsecurity.enabled=true"
49+
su `id -un 1000` -c "whoami && java -version && ./gradlew integTest \
50+
-Dsecurity.enabled=true \
51+
-Dhttps=true \
52+
-Dresource_sharing.enabled=${{ matrix.resource_sharing_flag }} \
53+
--tests '*IT'"

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.1.0/)
55

66
## [Unreleased 3.3](https://github.com/opensearch-project/flow-framework/compare/3.2...HEAD)
77
### Features
8+
- Onboards flow-framework plugin to resource-sharing and access control framework ([#1251](https://github.com/opensearch-project/flow-framework/pull/1251))
9+
810
### Enhancements
911
### Bug Fixes
1012
- Pre-create ML Commons indices for Tenant Aware tests ([#1217](https://github.com/opensearch-project/flow-framework/pull/1217))

build.gradle

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ buildscript {
4848
parssonVersion = "1.1.7"
4949
swaggerVersion = "2.1.35"
5050
swaggerCoreVersion = "2.2.40"
51+
awaitilityVersion= "4.3.0"
5152
}
5253

5354
repositories {
@@ -95,6 +96,7 @@ opensearchplugin {
9596
classname = "${projectPath}.${pathToPlugin}.${pluginClassName}"
9697
licenseFile = rootProject.file('LICENSE')
9798
noticeFile = rootProject.file('NOTICE')
99+
extendedPlugins = ['opensearch-security;optional=true']
98100
}
99101

100102
dependencyLicenses.enabled = false
@@ -193,6 +195,9 @@ configurations {
193195
}
194196

195197
dependencies {
198+
// For resource access control
199+
compileOnly("org.opensearch:opensearch-security-spi:${opensearch_build}")
200+
196201
implementation("org.opensearch:opensearch:${opensearch_version}")
197202
api("org.opensearch:opensearch-ml-client:${opensearch_build}")
198203
// json, jsonpath, and commons-text are required by MLClient but must be provided by calling plugins
@@ -252,6 +257,8 @@ dependencies {
252257
testImplementation("org.junit.jupiter:junit-jupiter:${junitJupiterVersion}")
253258
testImplementation("com.fasterxml.jackson.datatype:jackson-datatype-jsr310:${versions.jackson_databind}")
254259

260+
testImplementation("org.awaitility:awaitility:${awaitilityVersion}")
261+
255262
// ZipArchive dependencies used for integration tests
256263
zipArchive("org.opensearch.plugin:opensearch-job-scheduler:${opensearch_build}")
257264
zipArchive("org.opensearch.plugin:opensearch-ml-plugin:${opensearch_build}")
@@ -354,6 +361,8 @@ integTest {
354361
systemProperty('user', user)
355362
systemProperty('password', password)
356363

364+
systemProperty "resource_sharing.enabled", System.getProperty("resource_sharing.enabled")
365+
357366
// Only tenant aware test if set
358367
if (System.getProperty("tests.rest.tenantaware") == "true") {
359368
filter {
@@ -382,13 +391,15 @@ integTest {
382391
if (System.getProperty("https") == null || System.getProperty("https") == "false") {
383392
filter {
384393
excludeTestsMatching "org.opensearch.flowframework.rest.FlowFrameworkSecureRestApiIT"
394+
excludeTestsMatching "org.opensearch.flowframework.rest.FlowFrameworkResourceSharingRestApiIT"
385395
}
386396
}
387397

388398
// Include only secure integration tests in security enabled clusters
389399
if (System.getProperty("https") != null && System.getProperty("https") == "true") {
390400
filter {
391401
includeTestsMatching "org.opensearch.flowframework.rest.FlowFrameworkSecureRestApiIT"
402+
includeTestsMatching "org.opensearch.flowframework.rest.FlowFrameworkResourceSharingRestApiIT"
392403
excludeTestsMatching "org.opensearch.flowframework.rest.FlowFrameworkRestApiIT"
393404
excludeTestsMatching "org.opensearch.flowframework.rest.*TenantAwareIT"
394405
}
@@ -513,6 +524,10 @@ testClusters.integTest {
513524
'".plugins-flow-framework-state"' +
514525
']'
515526
)
527+
if (System.getProperty("resource_sharing.enabled") == "true") {
528+
setting("plugins.security.experimental.resource_sharing.enabled", "true")
529+
setting("plugins.security.experimental.resource_sharing.protected_types", "[\"workflow\", \"workflow_state\"]")
530+
}
516531
setSecure(true)
517532
}
518533

@@ -545,7 +560,13 @@ testClusters.integTest {
545560
if (System.getProperty("opensearch.debug") != null) {
546561
def debugPort = 5005
547562
nodes.forEach { node ->
548-
node.jvmArgs("-agentlib:jdwp=transport=dt_socket,server=n,suspend=y,address=*:${debugPort}")
563+
// server=n,suspend=y -> node tries to connect to a debugger and hence test runs fails with
564+
// Exec output and error:
565+
// | Output for ./bin/opensearch-plugin:ERROR: transport error 202: connect failed: Connection refused
566+
// | ERROR: JDWP Transport dt_socket failed to initialize, TRANSPORT_INIT(510)
567+
// | JDWP exit error AGENT_ERROR_TRANSPORT_INIT(197): No transports initialized [src/jdk.jdwp.agent/share/native/libjdwp/debugInit.c:700].
568+
// So instead, we listen to a debugger by saying server=y and suspend=n
569+
node.jvmArgs("-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=*:${debugPort}")
549570
debugPort += 1
550571
}
551572
}
@@ -573,6 +594,8 @@ task integTestRemote(type: RestIntegTestTask) {
573594
systemProperty 'cluster.number_of_nodes', "${_numNodes}"
574595
systemProperty 'tests.security.manager', 'false'
575596

597+
systemProperty "resource_sharing.enabled", System.getProperty("resource_sharing.enabled")
598+
576599
// Run tests with remote cluster only if rest case is defined
577600
if (System.getProperty("tests.rest.cluster") != null) {
578601
filter {
@@ -584,6 +607,7 @@ task integTestRemote(type: RestIntegTestTask) {
584607
if (System.getProperty("https") == null || System.getProperty("https") == "false") {
585608
filter {
586609
excludeTestsMatching "org.opensearch.flowframework.rest.FlowFrameworkSecureRestApiIT"
610+
excludeTestsMatching "org.opensearch.flowframework.rest.FlowFrameworkResourceSharingRestApiIT"
587611
}
588612
}
589613

@@ -592,6 +616,7 @@ task integTestRemote(type: RestIntegTestTask) {
592616
filter {
593617
includeTestsMatching "org.opensearch.flowframework.rest.FlowFrameworkSecureRestApiIT"
594618
excludeTestsMatching "org.opensearch.flowframework.rest.FlowFrameworkRestApiIT"
619+
includeTestsMatching "org.opensearch.flowframework.rest.FlowFrameworkResourceSharingRestApiIT"
595620
}
596621
}
597622
}

src/main/java/org/opensearch/flowframework/FlowFrameworkPlugin.java

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,11 +57,14 @@
5757
import org.opensearch.flowframework.transport.SearchWorkflowTransportAction;
5858
import org.opensearch.flowframework.transport.handler.SearchHandler;
5959
import org.opensearch.flowframework.util.EncryptorUtils;
60+
import org.opensearch.flowframework.util.PluginClient;
6061
import org.opensearch.flowframework.workflow.WorkflowProcessSorter;
6162
import org.opensearch.flowframework.workflow.WorkflowStepFactory;
63+
import org.opensearch.identity.PluginSubject;
6264
import org.opensearch.indices.SystemIndexDescriptor;
6365
import org.opensearch.ml.client.MachineLearningNodeClient;
6466
import org.opensearch.plugins.ActionPlugin;
67+
import org.opensearch.plugins.IdentityAwarePlugin;
6568
import org.opensearch.plugins.Plugin;
6669
import org.opensearch.plugins.SystemIndexPlugin;
6770
import org.opensearch.remote.metadata.client.SdkClient;
@@ -116,10 +119,12 @@
116119
/**
117120
* An OpenSearch plugin that enables builders to innovate AI apps on OpenSearch.
118121
*/
119-
public class FlowFrameworkPlugin extends Plugin implements ActionPlugin, SystemIndexPlugin {
122+
public class FlowFrameworkPlugin extends Plugin implements ActionPlugin, SystemIndexPlugin, IdentityAwarePlugin {
120123

121124
private FlowFrameworkSettings flowFrameworkSettings;
122125

126+
private PluginClient pluginClient;
127+
123128
/**
124129
* Instantiate this plugin.
125130
*/
@@ -143,8 +148,11 @@ public Collection<Object> createComponents(
143148
flowFrameworkSettings = new FlowFrameworkSettings(clusterService, settings);
144149
MachineLearningNodeClient mlClient = new MachineLearningNodeClient(client);
145150
boolean multiTenancyEnabled = FLOW_FRAMEWORK_MULTI_TENANCY_ENABLED.get(settings);
151+
152+
this.pluginClient = new PluginClient(client);
153+
146154
SdkClient sdkClient = SdkClientFactory.createSdkClient(
147-
client,
155+
pluginClient,
148156
xContentRegistry,
149157
// Here we assume remote metadata client is only used with tenant awareness.
150158
// This may change in the future allowing more options for this map
@@ -296,4 +304,11 @@ public Collection<SystemIndexDescriptor> getSystemIndexDescriptors(Settings sett
296304
);
297305
}
298306

307+
@Override
308+
public void assignSubject(PluginSubject pluginSubject) {
309+
if (this.pluginClient != null) {
310+
this.pluginClient.setSubject(pluginSubject);
311+
}
312+
}
313+
299314
}
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*
5+
* The OpenSearch Contributors require contributions made to
6+
* this file be licensed under the Apache-2.0 license or a
7+
* compatible open source license.
8+
*/
9+
package org.opensearch.flowframework;
10+
11+
import org.opensearch.flowframework.common.CommonValue;
12+
import org.opensearch.flowframework.util.ResourceSharingClientAccessor;
13+
import org.opensearch.security.spi.resources.ResourceProvider;
14+
import org.opensearch.security.spi.resources.ResourceSharingExtension;
15+
import org.opensearch.security.spi.resources.client.ResourceSharingClient;
16+
17+
import java.util.Set;
18+
19+
import static org.opensearch.flowframework.common.CommonValue.GLOBAL_CONTEXT_INDEX;
20+
import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_STATE_INDEX;
21+
22+
/**
23+
* Implementation for sharing resources that require access control.
24+
*/
25+
public class FlowFrameworkResourceSharingExtension implements ResourceSharingExtension {
26+
@Override
27+
public Set<ResourceProvider> getResourceProviders() {
28+
return Set.of(new ResourceProvider() {
29+
@Override
30+
public String resourceType() {
31+
return CommonValue.WORKFLOW_RESOURCE_TYPE;
32+
}
33+
34+
@Override
35+
public String resourceIndexName() {
36+
return GLOBAL_CONTEXT_INDEX;
37+
}
38+
}, new ResourceProvider() {
39+
@Override
40+
public String resourceType() {
41+
return CommonValue.WORKFLOW_STATE_RESOURCE_TYPE;
42+
}
43+
44+
@Override
45+
public String resourceIndexName() {
46+
return WORKFLOW_STATE_INDEX;
47+
}
48+
});
49+
}
50+
51+
@Override
52+
public void assignResourceSharingClient(ResourceSharingClient resourceSharingClient) {
53+
ResourceSharingClientAccessor.getInstance().setResourceSharingClient(resourceSharingClient);
54+
}
55+
}

src/main/java/org/opensearch/flowframework/common/CommonValue.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,8 @@ private CommonValue() {}
5151
public static final String CREATE_TIME = "create_time";
5252
/** The template field name for the user who created the workflow **/
5353
public static final String USER_FIELD = "user";
54+
/** The template field name for the entities whom this workflow is shared with **/
55+
public static final String ALL_SHARED_PRINCIPALS_FIELD = "all_shared_principals";
5456
/** The created time field */
5557
public static final String CREATED_TIME = "created_time";
5658
/** The last updated time field */
@@ -264,4 +266,10 @@ private CommonValue() {}
264266
*/
265267
/** Version 2.19.0 */
266268
public static final Version VERSION_2_19_0 = Version.fromString("2.19.0");
269+
270+
/*
271+
* Constants associated with resource-sharing
272+
*/
273+
public static final String WORKFLOW_STATE_RESOURCE_TYPE = "workflow_state";
274+
public static final String WORKFLOW_RESOURCE_TYPE = "workflow";
267275
}

src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -410,9 +410,16 @@ public void initializeConfigIndex(String tenantId, ActionListener<Boolean> liste
410410
* @param workflowId the workflowId, corresponds to document ID
411411
* @param tenantId the tenant id
412412
* @param user passes the user that created the workflow
413+
* @param allSharedPrincipals the entities this workflow is shared with
413414
* @param listener action listener
414415
*/
415-
public void putInitialStateToWorkflowState(String workflowId, String tenantId, User user, ActionListener<IndexResponse> listener) {
416+
public void putInitialStateToWorkflowState(
417+
String workflowId,
418+
String tenantId,
419+
User user,
420+
List<String> allSharedPrincipals,
421+
ActionListener<IndexResponse> listener
422+
) {
416423
WorkflowState state = WorkflowState.builder()
417424
.workflowId(workflowId)
418425
.state(State.NOT_STARTED.name())
@@ -421,6 +428,7 @@ public void putInitialStateToWorkflowState(String workflowId, String tenantId, U
421428
.resourcesCreated(Collections.emptyList())
422429
.userOutputs(Collections.emptyMap())
423430
.tenantId(tenantId)
431+
.allSharedPrincipals(allSharedPrincipals)
424432
.build();
425433
initWorkflowStateIndexIfAbsent(ActionListener.wrap(indexCreated -> {
426434
if (!indexCreated) {

0 commit comments

Comments
 (0)