From 3bc834f33535fad5b4442236e343b128fb42a264 Mon Sep 17 00:00:00 2001 From: Pratick Chokhani Date: Mon, 8 Dec 2025 12:59:33 +0530 Subject: [PATCH 1/2] chore: IAM Validation added for fail-safe. --- v1/pom.xml | 5 + .../teleport/spanner/ExportPipeline.java | 30 ++++ .../teleport/spanner/ImportPipeline.java | 29 +++- .../teleport/spanner/iam/IAMCheckResult.java | 55 +++++++ .../spanner/iam/IAMPermissionsChecker.java | 135 ++++++++++++++++++ .../spanner/iam/IAMRequirementsCreator.java | 45 ++++++ .../spanner/iam/IAMResourceRequirements.java | 40 ++++++ .../teleport/spanner/iam/package-info.java | 18 +++ .../iam/IAMPermissionsCheckerTest.java | 113 +++++++++++++++ 9 files changed, 469 insertions(+), 1 deletion(-) create mode 100644 v1/src/main/java/com/google/cloud/teleport/spanner/iam/IAMCheckResult.java create mode 100644 v1/src/main/java/com/google/cloud/teleport/spanner/iam/IAMPermissionsChecker.java create mode 100644 v1/src/main/java/com/google/cloud/teleport/spanner/iam/IAMRequirementsCreator.java create mode 100644 v1/src/main/java/com/google/cloud/teleport/spanner/iam/IAMResourceRequirements.java create mode 100644 v1/src/main/java/com/google/cloud/teleport/spanner/iam/package-info.java create mode 100644 v1/src/test/java/com/google/cloud/teleport/spanner/iam/IAMPermissionsCheckerTest.java diff --git a/v1/pom.xml b/v1/pom.xml index 708e1df2d7..f04cae15fd 100644 --- a/v1/pom.xml +++ b/v1/pom.xml @@ -558,6 +558,11 @@ protobuf-java 4.33.2 + + com.google.apis + google-api-services-cloudresourcemanager + v3-rev20251103-2.0.0 + diff --git a/v1/src/main/java/com/google/cloud/teleport/spanner/ExportPipeline.java b/v1/src/main/java/com/google/cloud/teleport/spanner/ExportPipeline.java index a3a52d0d92..15c488bc29 100644 --- a/v1/src/main/java/com/google/cloud/teleport/spanner/ExportPipeline.java +++ b/v1/src/main/java/com/google/cloud/teleport/spanner/ExportPipeline.java @@ -23,10 +23,16 @@ import com.google.cloud.teleport.metadata.TemplateParameter; import com.google.cloud.teleport.metadata.TemplateParameter.TemplateEnumOption; import com.google.cloud.teleport.spanner.ExportPipeline.ExportPipelineOptions; +import com.google.cloud.teleport.spanner.iam.IAMCheckResult; +import com.google.cloud.teleport.spanner.iam.IAMPermissionsChecker; +import com.google.cloud.teleport.spanner.iam.IAMRequirementsCreator; +import com.google.cloud.teleport.spanner.iam.IAMResourceRequirements; import com.google.cloud.teleport.spanner.spannerio.SpannerConfig; +import java.util.Collections; import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.PipelineResult; +import org.apache.beam.sdk.extensions.gcp.options.GcpOptions; import org.apache.beam.sdk.options.Default; import org.apache.beam.sdk.options.Description; import org.apache.beam.sdk.options.PipelineOptions; @@ -263,6 +269,7 @@ public static void main(String[] args) { .withDatabaseId(options.getDatabaseId()) .withRpcPriority(options.getSpannerPriority()) .withDataBoostEnabled(options.getDataBoostEnabled()); + p.begin() .apply( "Run Export", @@ -275,6 +282,7 @@ public static void main(String[] args) { options.getShouldExportRelatedTables(), options.getShouldExportTimestampAsLogicalType(), options.getAvroTempDirectory())); + validateRequiredPermissions(options); PipelineResult result = p.run(); if (options.getWaitUntilFinish() && @@ -285,4 +293,26 @@ public static void main(String[] args) { result.waitUntilFinish(); } } + + private static void validateRequiredPermissions(ExportPipelineOptions options) { + IAMResourceRequirements spannerRequirements = + IAMRequirementsCreator.createSpannerResourceRequirement(); + + GcpOptions gcpOptions = options.as(GcpOptions.class); + + IAMPermissionsChecker iamPermissionsChecker = + new IAMPermissionsChecker(gcpOptions.getProject(), gcpOptions); + IAMCheckResult missingPermission = + iamPermissionsChecker.check(Collections.singletonList(spannerRequirements)); + if (missingPermission.isSuccess()) { + return; + } + String errorString = + "For resource: " + + missingPermission.getResourceName() + + ", missing permissions: " + + missingPermission.getMissingPermissions() + + ";"; + throw new RuntimeException(errorString); + } } diff --git a/v1/src/main/java/com/google/cloud/teleport/spanner/ImportPipeline.java b/v1/src/main/java/com/google/cloud/teleport/spanner/ImportPipeline.java index 04c016d344..9f9fbfcda3 100644 --- a/v1/src/main/java/com/google/cloud/teleport/spanner/ImportPipeline.java +++ b/v1/src/main/java/com/google/cloud/teleport/spanner/ImportPipeline.java @@ -23,10 +23,16 @@ import com.google.cloud.teleport.metadata.TemplateParameter; import com.google.cloud.teleport.metadata.TemplateParameter.TemplateEnumOption; import com.google.cloud.teleport.spanner.ImportPipeline.Options; +import com.google.cloud.teleport.spanner.iam.IAMCheckResult; +import com.google.cloud.teleport.spanner.iam.IAMPermissionsChecker; +import com.google.cloud.teleport.spanner.iam.IAMRequirementsCreator; +import com.google.cloud.teleport.spanner.iam.IAMResourceRequirements; import com.google.cloud.teleport.spanner.spannerio.SpannerConfig; +import java.util.Collections; import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.PipelineResult; +import org.apache.beam.sdk.extensions.gcp.options.GcpOptions; import org.apache.beam.sdk.options.Default; import org.apache.beam.sdk.options.Description; import org.apache.beam.sdk.options.PipelineOptions; @@ -240,7 +246,6 @@ public static void main(String[] args) { .withInstanceId(options.getInstanceId()) .withDatabaseId(options.getDatabaseId()) .withRpcPriority(options.getSpannerPriority()); - p.apply( new ImportTransform( spannerConfig, @@ -253,6 +258,7 @@ public static void main(String[] args) { options.getDdlCreationTimeoutInMinutes(), options.getEarlyIndexCreateThreshold())); + validateRequiredPermissions(options); PipelineResult result = p.run(); if (options.getWaitUntilFinish() @@ -264,4 +270,25 @@ public static void main(String[] args) { result.waitUntilFinish(); } } + + private static void validateRequiredPermissions(Options options) { + IAMResourceRequirements spannerRequirements = + IAMRequirementsCreator.createSpannerResourceRequirement(); + GcpOptions gcpOptions = options.as(GcpOptions.class); + + IAMPermissionsChecker iamPermissionsChecker = + new IAMPermissionsChecker(gcpOptions.getProject(), gcpOptions); + IAMCheckResult missingPermission = + iamPermissionsChecker.check(Collections.singletonList(spannerRequirements)); + if (missingPermission.isSuccess()) { + return; + } + String errorString = + "For resource: " + + missingPermission.getResourceName() + + ", missing permissions: " + + missingPermission.getMissingPermissions() + + ";"; + throw new RuntimeException(errorString); + } } diff --git a/v1/src/main/java/com/google/cloud/teleport/spanner/iam/IAMCheckResult.java b/v1/src/main/java/com/google/cloud/teleport/spanner/iam/IAMCheckResult.java new file mode 100644 index 0000000000..49e9cc74f8 --- /dev/null +++ b/v1/src/main/java/com/google/cloud/teleport/spanner/iam/IAMCheckResult.java @@ -0,0 +1,55 @@ +/* + * Copyright (C) 2025 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package com.google.cloud.teleport.spanner.iam; + +import java.util.ArrayList; +import java.util.List; + +/** Represents the result of an IAM permission check on a specific resource. */ +public class IAMCheckResult { + private final String resourceName; + private final List missingPermissions; + + public IAMCheckResult(String resourceName, List missingPermissions) { + this.resourceName = resourceName; + this.missingPermissions = new ArrayList<>(missingPermissions); + } + + public String getResourceName() { + return resourceName; + } + + public List getMissingPermissions() { + return new ArrayList<>(missingPermissions); + } + + public boolean isSuccess() { + return missingPermissions.isEmpty(); + } + + @Override + public String toString() { + return "IAMCheckResult{" + + "resourceName='" + + resourceName + + '\'' + + ", missingPermissions=" + + missingPermissions + + ", success=" + + isSuccess() + + '}'; + } +} diff --git a/v1/src/main/java/com/google/cloud/teleport/spanner/iam/IAMPermissionsChecker.java b/v1/src/main/java/com/google/cloud/teleport/spanner/iam/IAMPermissionsChecker.java new file mode 100644 index 0000000000..caeb40285c --- /dev/null +++ b/v1/src/main/java/com/google/cloud/teleport/spanner/iam/IAMPermissionsChecker.java @@ -0,0 +1,135 @@ +/* + * Copyright (C) 2025 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package com.google.cloud.teleport.spanner.iam; + +import com.google.api.client.googleapis.auth.oauth2.GoogleCredential; +import com.google.api.client.googleapis.javanet.GoogleNetHttpTransport; +import com.google.api.client.http.HttpRequestInitializer; +import com.google.api.client.http.HttpTransport; +import com.google.api.client.json.JsonFactory; +import com.google.api.client.json.jackson2.JacksonFactory; +import com.google.api.services.cloudresourcemanager.v3.CloudResourceManager; +import com.google.api.services.cloudresourcemanager.v3.model.TestIamPermissionsRequest; +import com.google.api.services.cloudresourcemanager.v3.model.TestIamPermissionsResponse; +import com.google.auth.Credentials; +import com.google.auth.http.HttpCredentialsAdapter; +import com.google.common.annotations.VisibleForTesting; +import java.io.IOException; +import java.security.GeneralSecurityException; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.stream.Collectors; +import org.apache.beam.sdk.extensions.gcp.auth.NullCredentialInitializer; +import org.apache.beam.sdk.extensions.gcp.options.GcpOptions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Utility to check IAM permissions for various GCP resources. */ +public class IAMPermissionsChecker { + private static final Logger LOG = LoggerFactory.getLogger(IAMPermissionsChecker.class); + private final Credentials credential; + private static final String RESOURCE_NAME_FORMAT = "projects/%s"; + private final String projectIdResource; + + @VisibleForTesting static CloudResourceManager resourceManagerForTesting; + + public IAMPermissionsChecker(String projectId, GcpOptions gcpOptions) { + this.credential = gcpOptions.getGcpCredential(); + this.projectIdResource = String.format("projects/%s", projectId); + } + + @VisibleForTesting + static void setResourceManagerForTesting(CloudResourceManager resourceManager) { + resourceManagerForTesting = resourceManager; + } + + /** + * Checks IAM permissions for a list of requirements. + * + * @param requirements List of resources and required permissions. + * @return List of results, only missing permissions are included. Empty list indicate all the + * requirements are met. + */ + public IAMCheckResult check(List requirements) { + try { + CloudResourceManager resourceManager = createCloudResourceManagerService(); + + List permissionList = + requirements.stream() + .map(IAMResourceRequirements::getPermissions) + .flatMap(Collection::stream) + .toList(); + HashSet grantedPermissions = + new HashSet<>(checkPermission(resourceManager, projectIdResource, permissionList)); + + List missingPermissions = + permissionList.stream() + .filter(p -> !grantedPermissions.contains(p)) + .collect(Collectors.toList()); + + return new IAMCheckResult(projectIdResource, missingPermissions); + } catch (IOException | GeneralSecurityException e) { + throw new RuntimeException(e); + } + } + + private List checkPermission( + CloudResourceManager resourceManager, String resourceName, List permissions) { + try { + + TestIamPermissionsRequest requestBody = + new TestIamPermissionsRequest().setPermissions(permissions); + + TestIamPermissionsResponse testIamPermissionsResponse = + resourceManager.projects().testIamPermissions(resourceName, requestBody).execute(); + + List granted = testIamPermissionsResponse.getPermissions(); + return granted == null ? Collections.emptyList() : granted; + } catch (IOException e) { + throw new RuntimeException("Failed to check project permissions", e); + } + } + + private CloudResourceManager createCloudResourceManagerService() + throws IOException, GeneralSecurityException { + if (resourceManagerForTesting != null) { + return resourceManagerForTesting; + } + HttpTransport httpTransport = GoogleNetHttpTransport.newTrustedTransport(); + JsonFactory jsonFactory = JacksonFactory.getDefaultInstance(); + HttpRequestInitializer initializer = getHttpRequestInitializer(this.credential); + CloudResourceManager service = + new CloudResourceManager.Builder(httpTransport, jsonFactory, initializer) + .setApplicationName("service-accounts") + .build(); + return service; + } + + private static HttpRequestInitializer getHttpRequestInitializer(Credentials credential) + throws IOException { + if (credential == null) { + try { + return GoogleCredential.getApplicationDefault(); + } catch (Exception e) { + return new NullCredentialInitializer(); + } + } else { + return new HttpCredentialsAdapter(credential); + } + } +} diff --git a/v1/src/main/java/com/google/cloud/teleport/spanner/iam/IAMRequirementsCreator.java b/v1/src/main/java/com/google/cloud/teleport/spanner/iam/IAMRequirementsCreator.java new file mode 100644 index 0000000000..1180c2fb63 --- /dev/null +++ b/v1/src/main/java/com/google/cloud/teleport/spanner/iam/IAMRequirementsCreator.java @@ -0,0 +1,45 @@ +/* + * Copyright (C) 2025 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package com.google.cloud.teleport.spanner.iam; + +import com.google.common.collect.ImmutableList; +import java.util.List; + +public class IAMRequirementsCreator { + private static final List SPANNER_PERMISSIONS = + ImmutableList.of( + "spanner.databases.beginOrRollbackReadWriteTransaction", + "spanner.databases.beginPartitionedDmlTransaction", + "spanner.databases.beginReadOnlyTransaction", + "spanner.databases.create", + "spanner.databases.drop", + "spanner.databases.get", + "spanner.databases.getDdl", + "spanner.databases.list", + "spanner.databases.partitionQuery", + "spanner.databases.partitionRead", + "spanner.databases.read", + "spanner.databases.select", + "spanner.databases.update", + "spanner.databases.updateDdl", + "spanner.databases.write", + "spanner.instances.get", + "spanner.instances.list"); + + public static IAMResourceRequirements createSpannerResourceRequirement() { + return new IAMResourceRequirements(SPANNER_PERMISSIONS); + } +} diff --git a/v1/src/main/java/com/google/cloud/teleport/spanner/iam/IAMResourceRequirements.java b/v1/src/main/java/com/google/cloud/teleport/spanner/iam/IAMResourceRequirements.java new file mode 100644 index 0000000000..5211d2a206 --- /dev/null +++ b/v1/src/main/java/com/google/cloud/teleport/spanner/iam/IAMResourceRequirements.java @@ -0,0 +1,40 @@ +/* + * Copyright (C) 2025 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package com.google.cloud.teleport.spanner.iam; + +import java.util.ArrayList; +import java.util.List; + +/** Represents the IAM permissions required on a specific GCP resource. */ +public class IAMResourceRequirements { + private final List permissions; + + public IAMResourceRequirements(List permissions) { + if (permissions == null || permissions.isEmpty()) { + throw new IllegalArgumentException("Permissions list must not be empty"); + } + this.permissions = new ArrayList<>(permissions); + } + + public List getPermissions() { + return new ArrayList<>(permissions); + } + + @Override + public String toString() { + return getClass().getSimpleName() + "{" + "permissions=" + permissions + '}'; + } +} diff --git a/v1/src/main/java/com/google/cloud/teleport/spanner/iam/package-info.java b/v1/src/main/java/com/google/cloud/teleport/spanner/iam/package-info.java new file mode 100644 index 0000000000..e65e4143f0 --- /dev/null +++ b/v1/src/main/java/com/google/cloud/teleport/spanner/iam/package-info.java @@ -0,0 +1,18 @@ +/* + * Copyright (C) 2020 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +/** IAM validatory Utility classes for templates. */ +package com.google.cloud.teleport.spanner.iam; diff --git a/v1/src/test/java/com/google/cloud/teleport/spanner/iam/IAMPermissionsCheckerTest.java b/v1/src/test/java/com/google/cloud/teleport/spanner/iam/IAMPermissionsCheckerTest.java new file mode 100644 index 0000000000..fd8c18ea76 --- /dev/null +++ b/v1/src/test/java/com/google/cloud/teleport/spanner/iam/IAMPermissionsCheckerTest.java @@ -0,0 +1,113 @@ +/* + * Copyright (C) 2025 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package com.google.cloud.teleport.spanner.iam; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.when; + +import com.google.api.services.cloudresourcemanager.v3.CloudResourceManager; +import com.google.api.services.cloudresourcemanager.v3.model.TestIamPermissionsResponse; +import com.google.auth.Credentials; +import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import org.apache.beam.sdk.extensions.gcp.options.GcpOptions; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.mockito.Answers; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + +/** Tests for IAMPermissionsChecker. */ +@RunWith(JUnit4.class) +public class IAMPermissionsCheckerTest { + + @Mock(answer = Answers.RETURNS_DEEP_STUBS) + private CloudResourceManager mockResourceManager; + + @Mock private GcpOptions mockGcpOptions; + @Mock private Credentials mockCredentials; + + private IAMPermissionsChecker checker; + + @Before + public void setUp() throws IOException { + MockitoAnnotations.initMocks(this); + when(mockGcpOptions.getGcpCredential()).thenReturn(mockCredentials); + checker = new IAMPermissionsChecker("test-project", mockGcpOptions); + + // Mock the CloudResourceManager service creation + IAMPermissionsChecker.setResourceManagerForTesting(mockResourceManager); + } + + @Test + public void testCheck_allPermissionsGranted() throws IOException { + // Arrange + List requiredPermissions = Arrays.asList("p1", "p2"); + IAMResourceRequirements requirements = new IAMResourceRequirements(requiredPermissions); + TestIamPermissionsResponse response = + new TestIamPermissionsResponse().setPermissions(requiredPermissions); + when(mockResourceManager.projects().testIamPermissions(anyString(), any()).execute()) + .thenReturn(response); + + // Act + IAMCheckResult result = checker.check(Collections.singletonList(requirements)); + + // Assert + assertTrue(result.isSuccess()); + assertTrue(result.getMissingPermissions().isEmpty()); + } + + @Test + public void testCheck_somePermissionsMissing() throws IOException { + // Arrange + List requiredPermissions = Arrays.asList("p1", "p2", "p3"); + List grantedPermissions = Arrays.asList("p1", "p3"); + IAMResourceRequirements requirements = new IAMResourceRequirements(requiredPermissions); + TestIamPermissionsResponse response = + new TestIamPermissionsResponse().setPermissions(grantedPermissions); + when(mockResourceManager.projects().testIamPermissions(anyString(), any()).execute()) + .thenReturn(response); + + // Act + IAMCheckResult result = checker.check(Collections.singletonList(requirements)); + + // Assert + assertEquals(Arrays.asList("p2"), result.getMissingPermissions()); + } + + @Test + public void testCheck_noPermissionsGranted() throws IOException { + // Arrange + List requiredPermissions = Arrays.asList("p1", "p2"); + IAMResourceRequirements requirements = new IAMResourceRequirements(requiredPermissions); + TestIamPermissionsResponse response = new TestIamPermissionsResponse(); // No permissions + when(mockResourceManager.projects().testIamPermissions(anyString(), any()).execute()) + .thenReturn(response); + + // Act + IAMCheckResult result = checker.check(Collections.singletonList(requirements)); + + // Assert + assertEquals(requiredPermissions, result.getMissingPermissions()); + } +} From 92d29c7b6eb30a84ae0c8710edef01e8ff7596e4 Mon Sep 17 00:00:00 2001 From: Pratick Chokhani Date: Mon, 15 Dec 2025 10:39:21 +0530 Subject: [PATCH 2/2] chore: Fixed constructor argument. --- .../teleport/spanner/ExportPipeline.java | 46 ++++++++++------- .../teleport/spanner/ImportPipeline.java | 44 +++++++++------- .../teleport/spanner/iam/IAMCheckResult.java | 4 +- .../spanner/iam/IAMPermissionsChecker.java | 50 +++++++++---------- .../spanner/iam/IAMRequirementsCreator.java | 9 +++- .../spanner/iam/IAMResourceRequirements.java | 5 +- .../iam/IAMPermissionsCheckerTest.java | 7 +-- 7 files changed, 94 insertions(+), 71 deletions(-) diff --git a/v1/src/main/java/com/google/cloud/teleport/spanner/ExportPipeline.java b/v1/src/main/java/com/google/cloud/teleport/spanner/ExportPipeline.java index 15c488bc29..42cdb7a238 100644 --- a/v1/src/main/java/com/google/cloud/teleport/spanner/ExportPipeline.java +++ b/v1/src/main/java/com/google/cloud/teleport/spanner/ExportPipeline.java @@ -28,6 +28,8 @@ import com.google.cloud.teleport.spanner.iam.IAMRequirementsCreator; import com.google.cloud.teleport.spanner.iam.IAMResourceRequirements; import com.google.cloud.teleport.spanner.spannerio.SpannerConfig; +import java.io.IOException; +import java.security.GeneralSecurityException; import java.util.Collections; import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions; import org.apache.beam.sdk.Pipeline; @@ -40,6 +42,8 @@ import org.apache.beam.sdk.options.ValueProvider; import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider; import org.apache.beam.sdk.transforms.SerializableFunction; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Dataflow template that exports a Cloud Spanner database to Avro files in GCS. @@ -78,6 +82,7 @@ "In addition to the Identity and Access Management (IAM) roles necessary to run Dataflow jobs, you must also have the appropriate IAM roles for reading your Cloud Spanner data and writing to your Cloud Storage bucket." }) public class ExportPipeline { + private static final Logger LOG = LoggerFactory.getLogger(ExportPipeline.class); /** Options for Export pipeline. */ public interface ExportPipelineOptions extends PipelineOptions { @@ -295,24 +300,29 @@ public static void main(String[] args) { } private static void validateRequiredPermissions(ExportPipelineOptions options) { - IAMResourceRequirements spannerRequirements = - IAMRequirementsCreator.createSpannerResourceRequirement(); - - GcpOptions gcpOptions = options.as(GcpOptions.class); - - IAMPermissionsChecker iamPermissionsChecker = - new IAMPermissionsChecker(gcpOptions.getProject(), gcpOptions); - IAMCheckResult missingPermission = - iamPermissionsChecker.check(Collections.singletonList(spannerRequirements)); - if (missingPermission.isSuccess()) { - return; + try { + IAMResourceRequirements spannerRequirements = + IAMRequirementsCreator.createSpannerResourceRequirement(); + + GcpOptions gcpOptions = options.as(GcpOptions.class); + + IAMPermissionsChecker iamPermissionsChecker = + new IAMPermissionsChecker(gcpOptions.getProject(), gcpOptions); + IAMCheckResult missingPermission = + iamPermissionsChecker.check(Collections.singletonList(spannerRequirements)); + if (missingPermission.isPermissionsAvailable()) { + return; + } + String errorString = + "For resource: " + + missingPermission.getResourceName() + + ", missing permissions: " + + missingPermission.getMissingPermissions() + + ";"; + throw new RuntimeException(errorString); + } catch (GeneralSecurityException | IOException e) { + LOG.error("Error while validating permissions for spanner", e); + throw new RuntimeException(e); } - String errorString = - "For resource: " - + missingPermission.getResourceName() - + ", missing permissions: " - + missingPermission.getMissingPermissions() - + ";"; - throw new RuntimeException(errorString); } } diff --git a/v1/src/main/java/com/google/cloud/teleport/spanner/ImportPipeline.java b/v1/src/main/java/com/google/cloud/teleport/spanner/ImportPipeline.java index 9f9fbfcda3..d4f4ea9d01 100644 --- a/v1/src/main/java/com/google/cloud/teleport/spanner/ImportPipeline.java +++ b/v1/src/main/java/com/google/cloud/teleport/spanner/ImportPipeline.java @@ -28,6 +28,8 @@ import com.google.cloud.teleport.spanner.iam.IAMRequirementsCreator; import com.google.cloud.teleport.spanner.iam.IAMResourceRequirements; import com.google.cloud.teleport.spanner.spannerio.SpannerConfig; +import java.io.IOException; +import java.security.GeneralSecurityException; import java.util.Collections; import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions; import org.apache.beam.sdk.Pipeline; @@ -40,6 +42,8 @@ import org.apache.beam.sdk.options.ValueProvider; import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider; import org.apache.beam.sdk.transforms.SerializableFunction; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Avro to Cloud Spanner Import pipeline. @@ -65,6 +69,7 @@ "The input Cloud Storage path must exist, and it must include a spanner-export.json file that contains a JSON description of files to import." }) public class ImportPipeline { + private static final Logger LOG = LoggerFactory.getLogger(ImportPipeline.class); /** Options for {@link ImportPipeline}. */ public interface Options extends PipelineOptions { @@ -272,23 +277,28 @@ public static void main(String[] args) { } private static void validateRequiredPermissions(Options options) { - IAMResourceRequirements spannerRequirements = - IAMRequirementsCreator.createSpannerResourceRequirement(); - GcpOptions gcpOptions = options.as(GcpOptions.class); - - IAMPermissionsChecker iamPermissionsChecker = - new IAMPermissionsChecker(gcpOptions.getProject(), gcpOptions); - IAMCheckResult missingPermission = - iamPermissionsChecker.check(Collections.singletonList(spannerRequirements)); - if (missingPermission.isSuccess()) { - return; + try { + IAMResourceRequirements spannerRequirements = + IAMRequirementsCreator.createSpannerResourceRequirement(); + GcpOptions gcpOptions = options.as(GcpOptions.class); + + IAMPermissionsChecker iamPermissionsChecker = + new IAMPermissionsChecker(gcpOptions.getProject(), gcpOptions); + IAMCheckResult missingPermission = + iamPermissionsChecker.check(Collections.singletonList(spannerRequirements)); + if (missingPermission.isPermissionsAvailable()) { + return; + } + String errorString = + "For resource: " + + missingPermission.getResourceName() + + ", missing permissions: " + + missingPermission.getMissingPermissions() + + ";"; + throw new RuntimeException(errorString); + } catch (GeneralSecurityException | IOException e) { + LOG.error("Error while validating permissions for spanner service", e); + throw new RuntimeException(e); } - String errorString = - "For resource: " - + missingPermission.getResourceName() - + ", missing permissions: " - + missingPermission.getMissingPermissions() - + ";"; - throw new RuntimeException(errorString); } } diff --git a/v1/src/main/java/com/google/cloud/teleport/spanner/iam/IAMCheckResult.java b/v1/src/main/java/com/google/cloud/teleport/spanner/iam/IAMCheckResult.java index 49e9cc74f8..509c3904f1 100644 --- a/v1/src/main/java/com/google/cloud/teleport/spanner/iam/IAMCheckResult.java +++ b/v1/src/main/java/com/google/cloud/teleport/spanner/iam/IAMCheckResult.java @@ -36,7 +36,7 @@ public List getMissingPermissions() { return new ArrayList<>(missingPermissions); } - public boolean isSuccess() { + public boolean isPermissionsAvailable() { return missingPermissions.isEmpty(); } @@ -49,7 +49,7 @@ public String toString() { + ", missingPermissions=" + missingPermissions + ", success=" - + isSuccess() + + isPermissionsAvailable() + '}'; } } diff --git a/v1/src/main/java/com/google/cloud/teleport/spanner/iam/IAMPermissionsChecker.java b/v1/src/main/java/com/google/cloud/teleport/spanner/iam/IAMPermissionsChecker.java index caeb40285c..5b0b9a9131 100644 --- a/v1/src/main/java/com/google/cloud/teleport/spanner/iam/IAMPermissionsChecker.java +++ b/v1/src/main/java/com/google/cloud/teleport/spanner/iam/IAMPermissionsChecker.java @@ -46,46 +46,46 @@ public class IAMPermissionsChecker { private static final String RESOURCE_NAME_FORMAT = "projects/%s"; private final String projectIdResource; - @VisibleForTesting static CloudResourceManager resourceManagerForTesting; + private final CloudResourceManager resourceManager; - public IAMPermissionsChecker(String projectId, GcpOptions gcpOptions) { + public IAMPermissionsChecker(String projectId, GcpOptions gcpOptions) + throws GeneralSecurityException, IOException { this.credential = gcpOptions.getGcpCredential(); this.projectIdResource = String.format("projects/%s", projectId); + resourceManager = createCloudResourceManagerService(); } @VisibleForTesting - static void setResourceManagerForTesting(CloudResourceManager resourceManager) { - resourceManagerForTesting = resourceManager; + IAMPermissionsChecker( + String projectId, GcpOptions gcpOptions, CloudResourceManager resourceManager) { + this.credential = gcpOptions.getGcpCredential(); + this.projectIdResource = String.format("projects/%s", projectId); + this.resourceManager = resourceManager; } /** - * Checks IAM permissions for a list of requirements. + * Checks IAM permissions for a list of requirements. This api should be called once with all the + * requirements. * * @param requirements List of resources and required permissions. * @return List of results, only missing permissions are included. Empty list indicate all the * requirements are met. */ public IAMCheckResult check(List requirements) { - try { - CloudResourceManager resourceManager = createCloudResourceManagerService(); + List permissionList = + requirements.stream() + .map(IAMResourceRequirements::getPermissions) + .flatMap(Collection::stream) + .toList(); + HashSet grantedPermissions = + new HashSet<>(checkPermission(resourceManager, projectIdResource, permissionList)); - List permissionList = - requirements.stream() - .map(IAMResourceRequirements::getPermissions) - .flatMap(Collection::stream) - .toList(); - HashSet grantedPermissions = - new HashSet<>(checkPermission(resourceManager, projectIdResource, permissionList)); + List missingPermissions = + permissionList.stream() + .filter(p -> !grantedPermissions.contains(p)) + .collect(Collectors.toList()); - List missingPermissions = - permissionList.stream() - .filter(p -> !grantedPermissions.contains(p)) - .collect(Collectors.toList()); - - return new IAMCheckResult(projectIdResource, missingPermissions); - } catch (IOException | GeneralSecurityException e) { - throw new RuntimeException(e); - } + return new IAMCheckResult(projectIdResource, missingPermissions); } private List checkPermission( @@ -101,15 +101,13 @@ private List checkPermission( List granted = testIamPermissionsResponse.getPermissions(); return granted == null ? Collections.emptyList() : granted; } catch (IOException e) { + LOG.error("Error checking permissions for resource {}", resourceName, e); throw new RuntimeException("Failed to check project permissions", e); } } private CloudResourceManager createCloudResourceManagerService() throws IOException, GeneralSecurityException { - if (resourceManagerForTesting != null) { - return resourceManagerForTesting; - } HttpTransport httpTransport = GoogleNetHttpTransport.newTrustedTransport(); JsonFactory jsonFactory = JacksonFactory.getDefaultInstance(); HttpRequestInitializer initializer = getHttpRequestInitializer(this.credential); diff --git a/v1/src/main/java/com/google/cloud/teleport/spanner/iam/IAMRequirementsCreator.java b/v1/src/main/java/com/google/cloud/teleport/spanner/iam/IAMRequirementsCreator.java index 1180c2fb63..8cb35d0c92 100644 --- a/v1/src/main/java/com/google/cloud/teleport/spanner/iam/IAMRequirementsCreator.java +++ b/v1/src/main/java/com/google/cloud/teleport/spanner/iam/IAMRequirementsCreator.java @@ -18,8 +18,13 @@ import com.google.common.collect.ImmutableList; import java.util.List; +/** + * The goal of this class contain the list of permissions required by various templates. This will + * help in maintaining centralised lists for all templates. + */ public class IAMRequirementsCreator { - private static final List SPANNER_PERMISSIONS = + /** Default permissions required by templates. */ + private static final List DEFAULT_SPANNER_PERMISSIONS = ImmutableList.of( "spanner.databases.beginOrRollbackReadWriteTransaction", "spanner.databases.beginPartitionedDmlTransaction", @@ -40,6 +45,6 @@ public class IAMRequirementsCreator { "spanner.instances.list"); public static IAMResourceRequirements createSpannerResourceRequirement() { - return new IAMResourceRequirements(SPANNER_PERMISSIONS); + return new IAMResourceRequirements(DEFAULT_SPANNER_PERMISSIONS); } } diff --git a/v1/src/main/java/com/google/cloud/teleport/spanner/iam/IAMResourceRequirements.java b/v1/src/main/java/com/google/cloud/teleport/spanner/iam/IAMResourceRequirements.java index 5211d2a206..9a9d512e45 100644 --- a/v1/src/main/java/com/google/cloud/teleport/spanner/iam/IAMResourceRequirements.java +++ b/v1/src/main/java/com/google/cloud/teleport/spanner/iam/IAMResourceRequirements.java @@ -18,7 +18,10 @@ import java.util.ArrayList; import java.util.List; -/** Represents the IAM permissions required on a specific GCP resource. */ +/** + * Represents the IAM permissions required on a specific GCP resource. This can be expanded to + * contain resource name when permission specific to resources need to be validated. + */ public class IAMResourceRequirements { private final List permissions; diff --git a/v1/src/test/java/com/google/cloud/teleport/spanner/iam/IAMPermissionsCheckerTest.java b/v1/src/test/java/com/google/cloud/teleport/spanner/iam/IAMPermissionsCheckerTest.java index fd8c18ea76..a1219f5428 100644 --- a/v1/src/test/java/com/google/cloud/teleport/spanner/iam/IAMPermissionsCheckerTest.java +++ b/v1/src/test/java/com/google/cloud/teleport/spanner/iam/IAMPermissionsCheckerTest.java @@ -53,10 +53,7 @@ public class IAMPermissionsCheckerTest { public void setUp() throws IOException { MockitoAnnotations.initMocks(this); when(mockGcpOptions.getGcpCredential()).thenReturn(mockCredentials); - checker = new IAMPermissionsChecker("test-project", mockGcpOptions); - - // Mock the CloudResourceManager service creation - IAMPermissionsChecker.setResourceManagerForTesting(mockResourceManager); + checker = new IAMPermissionsChecker("test-project", mockGcpOptions, mockResourceManager); } @Test @@ -73,7 +70,7 @@ public void testCheck_allPermissionsGranted() throws IOException { IAMCheckResult result = checker.check(Collections.singletonList(requirements)); // Assert - assertTrue(result.isSuccess()); + assertTrue(result.isPermissionsAvailable()); assertTrue(result.getMissingPermissions().isEmpty()); }