diff --git a/v1/pom.xml b/v1/pom.xml index 708e1df2d7..f04cae15fd 100644 --- a/v1/pom.xml +++ b/v1/pom.xml @@ -558,6 +558,11 @@ protobuf-java 4.33.2 + + com.google.apis + google-api-services-cloudresourcemanager + v3-rev20251103-2.0.0 + diff --git a/v1/src/main/java/com/google/cloud/teleport/spanner/ExportPipeline.java b/v1/src/main/java/com/google/cloud/teleport/spanner/ExportPipeline.java index a3a52d0d92..42cdb7a238 100644 --- a/v1/src/main/java/com/google/cloud/teleport/spanner/ExportPipeline.java +++ b/v1/src/main/java/com/google/cloud/teleport/spanner/ExportPipeline.java @@ -23,10 +23,18 @@ import com.google.cloud.teleport.metadata.TemplateParameter; import com.google.cloud.teleport.metadata.TemplateParameter.TemplateEnumOption; import com.google.cloud.teleport.spanner.ExportPipeline.ExportPipelineOptions; +import com.google.cloud.teleport.spanner.iam.IAMCheckResult; +import com.google.cloud.teleport.spanner.iam.IAMPermissionsChecker; +import com.google.cloud.teleport.spanner.iam.IAMRequirementsCreator; +import com.google.cloud.teleport.spanner.iam.IAMResourceRequirements; import com.google.cloud.teleport.spanner.spannerio.SpannerConfig; +import java.io.IOException; +import java.security.GeneralSecurityException; +import java.util.Collections; import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.PipelineResult; +import org.apache.beam.sdk.extensions.gcp.options.GcpOptions; import org.apache.beam.sdk.options.Default; import org.apache.beam.sdk.options.Description; import org.apache.beam.sdk.options.PipelineOptions; @@ -34,6 +42,8 @@ import org.apache.beam.sdk.options.ValueProvider; import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider; import org.apache.beam.sdk.transforms.SerializableFunction; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Dataflow template that exports a Cloud Spanner database to Avro files in GCS. @@ -72,6 +82,7 @@ "In addition to the Identity and Access Management (IAM) roles necessary to run Dataflow jobs, you must also have the appropriate IAM roles for reading your Cloud Spanner data and writing to your Cloud Storage bucket." }) public class ExportPipeline { + private static final Logger LOG = LoggerFactory.getLogger(ExportPipeline.class); /** Options for Export pipeline. */ public interface ExportPipelineOptions extends PipelineOptions { @@ -263,6 +274,7 @@ public static void main(String[] args) { .withDatabaseId(options.getDatabaseId()) .withRpcPriority(options.getSpannerPriority()) .withDataBoostEnabled(options.getDataBoostEnabled()); + p.begin() .apply( "Run Export", @@ -275,6 +287,7 @@ public static void main(String[] args) { options.getShouldExportRelatedTables(), options.getShouldExportTimestampAsLogicalType(), options.getAvroTempDirectory())); + validateRequiredPermissions(options); PipelineResult result = p.run(); if (options.getWaitUntilFinish() && @@ -285,4 +298,31 @@ public static void main(String[] args) { result.waitUntilFinish(); } } + + private static void validateRequiredPermissions(ExportPipelineOptions options) { + try { + IAMResourceRequirements spannerRequirements = + IAMRequirementsCreator.createSpannerResourceRequirement(); + + GcpOptions gcpOptions = options.as(GcpOptions.class); + + IAMPermissionsChecker iamPermissionsChecker = + new IAMPermissionsChecker(gcpOptions.getProject(), gcpOptions); + IAMCheckResult missingPermission = + iamPermissionsChecker.check(Collections.singletonList(spannerRequirements)); + if (missingPermission.isPermissionsAvailable()) { + return; + } + String errorString = + "For resource: " + + missingPermission.getResourceName() + + ", missing permissions: " + + missingPermission.getMissingPermissions() + + ";"; + throw new RuntimeException(errorString); + } catch (GeneralSecurityException | IOException e) { + LOG.error("Error while validating permissions for spanner", e); + throw new RuntimeException(e); + } + } } diff --git a/v1/src/main/java/com/google/cloud/teleport/spanner/ImportPipeline.java b/v1/src/main/java/com/google/cloud/teleport/spanner/ImportPipeline.java index 04c016d344..d4f4ea9d01 100644 --- a/v1/src/main/java/com/google/cloud/teleport/spanner/ImportPipeline.java +++ b/v1/src/main/java/com/google/cloud/teleport/spanner/ImportPipeline.java @@ -23,10 +23,18 @@ import com.google.cloud.teleport.metadata.TemplateParameter; import com.google.cloud.teleport.metadata.TemplateParameter.TemplateEnumOption; import com.google.cloud.teleport.spanner.ImportPipeline.Options; +import com.google.cloud.teleport.spanner.iam.IAMCheckResult; +import com.google.cloud.teleport.spanner.iam.IAMPermissionsChecker; +import com.google.cloud.teleport.spanner.iam.IAMRequirementsCreator; +import com.google.cloud.teleport.spanner.iam.IAMResourceRequirements; import com.google.cloud.teleport.spanner.spannerio.SpannerConfig; +import java.io.IOException; +import java.security.GeneralSecurityException; +import java.util.Collections; import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.PipelineResult; +import org.apache.beam.sdk.extensions.gcp.options.GcpOptions; import org.apache.beam.sdk.options.Default; import org.apache.beam.sdk.options.Description; import org.apache.beam.sdk.options.PipelineOptions; @@ -34,6 +42,8 @@ import org.apache.beam.sdk.options.ValueProvider; import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider; import org.apache.beam.sdk.transforms.SerializableFunction; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Avro to Cloud Spanner Import pipeline. @@ -59,6 +69,7 @@ "The input Cloud Storage path must exist, and it must include a spanner-export.json file that contains a JSON description of files to import." }) public class ImportPipeline { + private static final Logger LOG = LoggerFactory.getLogger(ImportPipeline.class); /** Options for {@link ImportPipeline}. */ public interface Options extends PipelineOptions { @@ -240,7 +251,6 @@ public static void main(String[] args) { .withInstanceId(options.getInstanceId()) .withDatabaseId(options.getDatabaseId()) .withRpcPriority(options.getSpannerPriority()); - p.apply( new ImportTransform( spannerConfig, @@ -253,6 +263,7 @@ public static void main(String[] args) { options.getDdlCreationTimeoutInMinutes(), options.getEarlyIndexCreateThreshold())); + validateRequiredPermissions(options); PipelineResult result = p.run(); if (options.getWaitUntilFinish() @@ -264,4 +275,30 @@ public static void main(String[] args) { result.waitUntilFinish(); } } + + private static void validateRequiredPermissions(Options options) { + try { + IAMResourceRequirements spannerRequirements = + IAMRequirementsCreator.createSpannerResourceRequirement(); + GcpOptions gcpOptions = options.as(GcpOptions.class); + + IAMPermissionsChecker iamPermissionsChecker = + new IAMPermissionsChecker(gcpOptions.getProject(), gcpOptions); + IAMCheckResult missingPermission = + iamPermissionsChecker.check(Collections.singletonList(spannerRequirements)); + if (missingPermission.isPermissionsAvailable()) { + return; + } + String errorString = + "For resource: " + + missingPermission.getResourceName() + + ", missing permissions: " + + missingPermission.getMissingPermissions() + + ";"; + throw new RuntimeException(errorString); + } catch (GeneralSecurityException | IOException e) { + LOG.error("Error while validating permissions for spanner service", e); + throw new RuntimeException(e); + } + } } diff --git a/v1/src/main/java/com/google/cloud/teleport/spanner/iam/IAMCheckResult.java b/v1/src/main/java/com/google/cloud/teleport/spanner/iam/IAMCheckResult.java new file mode 100644 index 0000000000..509c3904f1 --- /dev/null +++ b/v1/src/main/java/com/google/cloud/teleport/spanner/iam/IAMCheckResult.java @@ -0,0 +1,55 @@ +/* + * Copyright (C) 2025 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package com.google.cloud.teleport.spanner.iam; + +import java.util.ArrayList; +import java.util.List; + +/** Represents the result of an IAM permission check on a specific resource. */ +public class IAMCheckResult { + private final String resourceName; + private final List missingPermissions; + + public IAMCheckResult(String resourceName, List missingPermissions) { + this.resourceName = resourceName; + this.missingPermissions = new ArrayList<>(missingPermissions); + } + + public String getResourceName() { + return resourceName; + } + + public List getMissingPermissions() { + return new ArrayList<>(missingPermissions); + } + + public boolean isPermissionsAvailable() { + return missingPermissions.isEmpty(); + } + + @Override + public String toString() { + return "IAMCheckResult{" + + "resourceName='" + + resourceName + + '\'' + + ", missingPermissions=" + + missingPermissions + + ", success=" + + isPermissionsAvailable() + + '}'; + } +} diff --git a/v1/src/main/java/com/google/cloud/teleport/spanner/iam/IAMPermissionsChecker.java b/v1/src/main/java/com/google/cloud/teleport/spanner/iam/IAMPermissionsChecker.java new file mode 100644 index 0000000000..5b0b9a9131 --- /dev/null +++ b/v1/src/main/java/com/google/cloud/teleport/spanner/iam/IAMPermissionsChecker.java @@ -0,0 +1,133 @@ +/* + * Copyright (C) 2025 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package com.google.cloud.teleport.spanner.iam; + +import com.google.api.client.googleapis.auth.oauth2.GoogleCredential; +import com.google.api.client.googleapis.javanet.GoogleNetHttpTransport; +import com.google.api.client.http.HttpRequestInitializer; +import com.google.api.client.http.HttpTransport; +import com.google.api.client.json.JsonFactory; +import com.google.api.client.json.jackson2.JacksonFactory; +import com.google.api.services.cloudresourcemanager.v3.CloudResourceManager; +import com.google.api.services.cloudresourcemanager.v3.model.TestIamPermissionsRequest; +import com.google.api.services.cloudresourcemanager.v3.model.TestIamPermissionsResponse; +import com.google.auth.Credentials; +import com.google.auth.http.HttpCredentialsAdapter; +import com.google.common.annotations.VisibleForTesting; +import java.io.IOException; +import java.security.GeneralSecurityException; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.stream.Collectors; +import org.apache.beam.sdk.extensions.gcp.auth.NullCredentialInitializer; +import org.apache.beam.sdk.extensions.gcp.options.GcpOptions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Utility to check IAM permissions for various GCP resources. */ +public class IAMPermissionsChecker { + private static final Logger LOG = LoggerFactory.getLogger(IAMPermissionsChecker.class); + private final Credentials credential; + private static final String RESOURCE_NAME_FORMAT = "projects/%s"; + private final String projectIdResource; + + private final CloudResourceManager resourceManager; + + public IAMPermissionsChecker(String projectId, GcpOptions gcpOptions) + throws GeneralSecurityException, IOException { + this.credential = gcpOptions.getGcpCredential(); + this.projectIdResource = String.format("projects/%s", projectId); + resourceManager = createCloudResourceManagerService(); + } + + @VisibleForTesting + IAMPermissionsChecker( + String projectId, GcpOptions gcpOptions, CloudResourceManager resourceManager) { + this.credential = gcpOptions.getGcpCredential(); + this.projectIdResource = String.format("projects/%s", projectId); + this.resourceManager = resourceManager; + } + + /** + * Checks IAM permissions for a list of requirements. This api should be called once with all the + * requirements. + * + * @param requirements List of resources and required permissions. + * @return List of results, only missing permissions are included. Empty list indicate all the + * requirements are met. + */ + public IAMCheckResult check(List requirements) { + List permissionList = + requirements.stream() + .map(IAMResourceRequirements::getPermissions) + .flatMap(Collection::stream) + .toList(); + HashSet grantedPermissions = + new HashSet<>(checkPermission(resourceManager, projectIdResource, permissionList)); + + List missingPermissions = + permissionList.stream() + .filter(p -> !grantedPermissions.contains(p)) + .collect(Collectors.toList()); + + return new IAMCheckResult(projectIdResource, missingPermissions); + } + + private List checkPermission( + CloudResourceManager resourceManager, String resourceName, List permissions) { + try { + + TestIamPermissionsRequest requestBody = + new TestIamPermissionsRequest().setPermissions(permissions); + + TestIamPermissionsResponse testIamPermissionsResponse = + resourceManager.projects().testIamPermissions(resourceName, requestBody).execute(); + + List granted = testIamPermissionsResponse.getPermissions(); + return granted == null ? Collections.emptyList() : granted; + } catch (IOException e) { + LOG.error("Error checking permissions for resource {}", resourceName, e); + throw new RuntimeException("Failed to check project permissions", e); + } + } + + private CloudResourceManager createCloudResourceManagerService() + throws IOException, GeneralSecurityException { + HttpTransport httpTransport = GoogleNetHttpTransport.newTrustedTransport(); + JsonFactory jsonFactory = JacksonFactory.getDefaultInstance(); + HttpRequestInitializer initializer = getHttpRequestInitializer(this.credential); + CloudResourceManager service = + new CloudResourceManager.Builder(httpTransport, jsonFactory, initializer) + .setApplicationName("service-accounts") + .build(); + return service; + } + + private static HttpRequestInitializer getHttpRequestInitializer(Credentials credential) + throws IOException { + if (credential == null) { + try { + return GoogleCredential.getApplicationDefault(); + } catch (Exception e) { + return new NullCredentialInitializer(); + } + } else { + return new HttpCredentialsAdapter(credential); + } + } +} diff --git a/v1/src/main/java/com/google/cloud/teleport/spanner/iam/IAMRequirementsCreator.java b/v1/src/main/java/com/google/cloud/teleport/spanner/iam/IAMRequirementsCreator.java new file mode 100644 index 0000000000..8cb35d0c92 --- /dev/null +++ b/v1/src/main/java/com/google/cloud/teleport/spanner/iam/IAMRequirementsCreator.java @@ -0,0 +1,50 @@ +/* + * Copyright (C) 2025 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package com.google.cloud.teleport.spanner.iam; + +import com.google.common.collect.ImmutableList; +import java.util.List; + +/** + * The goal of this class contain the list of permissions required by various templates. This will + * help in maintaining centralised lists for all templates. + */ +public class IAMRequirementsCreator { + /** Default permissions required by templates. */ + private static final List DEFAULT_SPANNER_PERMISSIONS = + ImmutableList.of( + "spanner.databases.beginOrRollbackReadWriteTransaction", + "spanner.databases.beginPartitionedDmlTransaction", + "spanner.databases.beginReadOnlyTransaction", + "spanner.databases.create", + "spanner.databases.drop", + "spanner.databases.get", + "spanner.databases.getDdl", + "spanner.databases.list", + "spanner.databases.partitionQuery", + "spanner.databases.partitionRead", + "spanner.databases.read", + "spanner.databases.select", + "spanner.databases.update", + "spanner.databases.updateDdl", + "spanner.databases.write", + "spanner.instances.get", + "spanner.instances.list"); + + public static IAMResourceRequirements createSpannerResourceRequirement() { + return new IAMResourceRequirements(DEFAULT_SPANNER_PERMISSIONS); + } +} diff --git a/v1/src/main/java/com/google/cloud/teleport/spanner/iam/IAMResourceRequirements.java b/v1/src/main/java/com/google/cloud/teleport/spanner/iam/IAMResourceRequirements.java new file mode 100644 index 0000000000..9a9d512e45 --- /dev/null +++ b/v1/src/main/java/com/google/cloud/teleport/spanner/iam/IAMResourceRequirements.java @@ -0,0 +1,43 @@ +/* + * Copyright (C) 2025 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package com.google.cloud.teleport.spanner.iam; + +import java.util.ArrayList; +import java.util.List; + +/** + * Represents the IAM permissions required on a specific GCP resource. This can be expanded to + * contain resource name when permission specific to resources need to be validated. + */ +public class IAMResourceRequirements { + private final List permissions; + + public IAMResourceRequirements(List permissions) { + if (permissions == null || permissions.isEmpty()) { + throw new IllegalArgumentException("Permissions list must not be empty"); + } + this.permissions = new ArrayList<>(permissions); + } + + public List getPermissions() { + return new ArrayList<>(permissions); + } + + @Override + public String toString() { + return getClass().getSimpleName() + "{" + "permissions=" + permissions + '}'; + } +} diff --git a/v1/src/main/java/com/google/cloud/teleport/spanner/iam/package-info.java b/v1/src/main/java/com/google/cloud/teleport/spanner/iam/package-info.java new file mode 100644 index 0000000000..e65e4143f0 --- /dev/null +++ b/v1/src/main/java/com/google/cloud/teleport/spanner/iam/package-info.java @@ -0,0 +1,18 @@ +/* + * Copyright (C) 2020 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +/** IAM validatory Utility classes for templates. */ +package com.google.cloud.teleport.spanner.iam; diff --git a/v1/src/test/java/com/google/cloud/teleport/spanner/iam/IAMPermissionsCheckerTest.java b/v1/src/test/java/com/google/cloud/teleport/spanner/iam/IAMPermissionsCheckerTest.java new file mode 100644 index 0000000000..a1219f5428 --- /dev/null +++ b/v1/src/test/java/com/google/cloud/teleport/spanner/iam/IAMPermissionsCheckerTest.java @@ -0,0 +1,110 @@ +/* + * Copyright (C) 2025 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package com.google.cloud.teleport.spanner.iam; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.when; + +import com.google.api.services.cloudresourcemanager.v3.CloudResourceManager; +import com.google.api.services.cloudresourcemanager.v3.model.TestIamPermissionsResponse; +import com.google.auth.Credentials; +import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import org.apache.beam.sdk.extensions.gcp.options.GcpOptions; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.mockito.Answers; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + +/** Tests for IAMPermissionsChecker. */ +@RunWith(JUnit4.class) +public class IAMPermissionsCheckerTest { + + @Mock(answer = Answers.RETURNS_DEEP_STUBS) + private CloudResourceManager mockResourceManager; + + @Mock private GcpOptions mockGcpOptions; + @Mock private Credentials mockCredentials; + + private IAMPermissionsChecker checker; + + @Before + public void setUp() throws IOException { + MockitoAnnotations.initMocks(this); + when(mockGcpOptions.getGcpCredential()).thenReturn(mockCredentials); + checker = new IAMPermissionsChecker("test-project", mockGcpOptions, mockResourceManager); + } + + @Test + public void testCheck_allPermissionsGranted() throws IOException { + // Arrange + List requiredPermissions = Arrays.asList("p1", "p2"); + IAMResourceRequirements requirements = new IAMResourceRequirements(requiredPermissions); + TestIamPermissionsResponse response = + new TestIamPermissionsResponse().setPermissions(requiredPermissions); + when(mockResourceManager.projects().testIamPermissions(anyString(), any()).execute()) + .thenReturn(response); + + // Act + IAMCheckResult result = checker.check(Collections.singletonList(requirements)); + + // Assert + assertTrue(result.isPermissionsAvailable()); + assertTrue(result.getMissingPermissions().isEmpty()); + } + + @Test + public void testCheck_somePermissionsMissing() throws IOException { + // Arrange + List requiredPermissions = Arrays.asList("p1", "p2", "p3"); + List grantedPermissions = Arrays.asList("p1", "p3"); + IAMResourceRequirements requirements = new IAMResourceRequirements(requiredPermissions); + TestIamPermissionsResponse response = + new TestIamPermissionsResponse().setPermissions(grantedPermissions); + when(mockResourceManager.projects().testIamPermissions(anyString(), any()).execute()) + .thenReturn(response); + + // Act + IAMCheckResult result = checker.check(Collections.singletonList(requirements)); + + // Assert + assertEquals(Arrays.asList("p2"), result.getMissingPermissions()); + } + + @Test + public void testCheck_noPermissionsGranted() throws IOException { + // Arrange + List requiredPermissions = Arrays.asList("p1", "p2"); + IAMResourceRequirements requirements = new IAMResourceRequirements(requiredPermissions); + TestIamPermissionsResponse response = new TestIamPermissionsResponse(); // No permissions + when(mockResourceManager.projects().testIamPermissions(anyString(), any()).execute()) + .thenReturn(response); + + // Act + IAMCheckResult result = checker.check(Collections.singletonList(requirements)); + + // Assert + assertEquals(requiredPermissions, result.getMissingPermissions()); + } +}