Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions v1/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -558,6 +558,11 @@
<artifactId>protobuf-java</artifactId>
<version>4.33.2</version>
</dependency>
<dependency>
<groupId>com.google.apis</groupId>
<artifactId>google-api-services-cloudresourcemanager</artifactId>
<version>v3-rev20251103-2.0.0</version>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,27 @@
import com.google.cloud.teleport.metadata.TemplateParameter;
import com.google.cloud.teleport.metadata.TemplateParameter.TemplateEnumOption;
import com.google.cloud.teleport.spanner.ExportPipeline.ExportPipelineOptions;
import com.google.cloud.teleport.spanner.iam.IAMCheckResult;
import com.google.cloud.teleport.spanner.iam.IAMPermissionsChecker;
import com.google.cloud.teleport.spanner.iam.IAMRequirementsCreator;
import com.google.cloud.teleport.spanner.iam.IAMResourceRequirements;
import com.google.cloud.teleport.spanner.spannerio.SpannerConfig;
import java.io.IOException;
import java.security.GeneralSecurityException;
import java.util.Collections;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Dataflow template that exports a Cloud Spanner database to Avro files in GCS.
Expand Down Expand Up @@ -72,6 +82,7 @@
"In addition to the Identity and Access Management (IAM) roles necessary to run Dataflow jobs, you must also have the <a href=\"https://cloud.google.com/spanner/docs/export#iam\">appropriate IAM roles</a> for reading your Cloud Spanner data and writing to your Cloud Storage bucket."
})
public class ExportPipeline {
private static final Logger LOG = LoggerFactory.getLogger(ExportPipeline.class);

/** Options for Export pipeline. */
public interface ExportPipelineOptions extends PipelineOptions {
Expand Down Expand Up @@ -263,6 +274,7 @@ public static void main(String[] args) {
.withDatabaseId(options.getDatabaseId())
.withRpcPriority(options.getSpannerPriority())
.withDataBoostEnabled(options.getDataBoostEnabled());

p.begin()
.apply(
"Run Export",
Expand All @@ -275,6 +287,7 @@ public static void main(String[] args) {
options.getShouldExportRelatedTables(),
options.getShouldExportTimestampAsLogicalType(),
options.getAvroTempDirectory()));
validateRequiredPermissions(options);
PipelineResult result = p.run();
if (options.getWaitUntilFinish()
&&
Expand All @@ -285,4 +298,31 @@ public static void main(String[] args) {
result.waitUntilFinish();
}
}

private static void validateRequiredPermissions(ExportPipelineOptions options) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i see this code is duplicated across pipelines. would it make sense to push it down as an API in the IAM checker module?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It needs to be per-Template. For Import and export the code is same. But this will change when we shift to other templates.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could you pl help be understand better with an example, on how it might deviate? and may be leave a comment for future code readers

try {
IAMResourceRequirements spannerRequirements =
IAMRequirementsCreator.createSpannerResourceRequirement();

GcpOptions gcpOptions = options.as(GcpOptions.class);

IAMPermissionsChecker iamPermissionsChecker =
new IAMPermissionsChecker(gcpOptions.getProject(), gcpOptions);
IAMCheckResult missingPermission =
iamPermissionsChecker.check(Collections.singletonList(spannerRequirements));
if (missingPermission.isPermissionsAvailable()) {
return;
}
String errorString =
"For resource: "
+ missingPermission.getResourceName()
+ ", missing permissions: "
+ missingPermission.getMissingPermissions()
+ ";";
throw new RuntimeException(errorString);
} catch (GeneralSecurityException | IOException e) {
LOG.error("Error while validating permissions for spanner", e);
throw new RuntimeException(e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,27 @@
import com.google.cloud.teleport.metadata.TemplateParameter;
import com.google.cloud.teleport.metadata.TemplateParameter.TemplateEnumOption;
import com.google.cloud.teleport.spanner.ImportPipeline.Options;
import com.google.cloud.teleport.spanner.iam.IAMCheckResult;
import com.google.cloud.teleport.spanner.iam.IAMPermissionsChecker;
import com.google.cloud.teleport.spanner.iam.IAMRequirementsCreator;
import com.google.cloud.teleport.spanner.iam.IAMResourceRequirements;
import com.google.cloud.teleport.spanner.spannerio.SpannerConfig;
import java.io.IOException;
import java.security.GeneralSecurityException;
import java.util.Collections;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Avro to Cloud Spanner Import pipeline.
Expand All @@ -59,6 +69,7 @@
"The input Cloud Storage path must exist, and it must include a <a href=\"https://cloud.google.com/spanner/docs/import-non-spanner#create-export-json\">spanner-export.json</a> file that contains a JSON description of files to import."
})
public class ImportPipeline {
private static final Logger LOG = LoggerFactory.getLogger(ImportPipeline.class);

/** Options for {@link ImportPipeline}. */
public interface Options extends PipelineOptions {
Expand Down Expand Up @@ -240,7 +251,6 @@ public static void main(String[] args) {
.withInstanceId(options.getInstanceId())
.withDatabaseId(options.getDatabaseId())
.withRpcPriority(options.getSpannerPriority());

p.apply(
new ImportTransform(
spannerConfig,
Expand All @@ -253,6 +263,7 @@ public static void main(String[] args) {
options.getDdlCreationTimeoutInMinutes(),
options.getEarlyIndexCreateThreshold()));

validateRequiredPermissions(options);
PipelineResult result = p.run();

if (options.getWaitUntilFinish()
Expand All @@ -264,4 +275,30 @@ public static void main(String[] args) {
result.waitUntilFinish();
}
}

private static void validateRequiredPermissions(Options options) {
try {
IAMResourceRequirements spannerRequirements =
IAMRequirementsCreator.createSpannerResourceRequirement();
GcpOptions gcpOptions = options.as(GcpOptions.class);

IAMPermissionsChecker iamPermissionsChecker =
new IAMPermissionsChecker(gcpOptions.getProject(), gcpOptions);
IAMCheckResult missingPermission =
iamPermissionsChecker.check(Collections.singletonList(spannerRequirements));
if (missingPermission.isPermissionsAvailable()) {
return;
}
String errorString =
"For resource: "
+ missingPermission.getResourceName()
+ ", missing permissions: "
+ missingPermission.getMissingPermissions()
+ ";";
throw new RuntimeException(errorString);
} catch (GeneralSecurityException | IOException e) {
LOG.error("Error while validating permissions for spanner service", e);
throw new RuntimeException(e);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Copyright (C) 2025 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.teleport.spanner.iam;

import java.util.ArrayList;
import java.util.List;

/** Represents the result of an IAM permission check on a specific resource. */
public class IAMCheckResult {
private final String resourceName;
private final List<String> missingPermissions;

public IAMCheckResult(String resourceName, List<String> missingPermissions) {
this.resourceName = resourceName;
this.missingPermissions = new ArrayList<>(missingPermissions);
}

public String getResourceName() {
return resourceName;
}

public List<String> getMissingPermissions() {
return new ArrayList<>(missingPermissions);
}

public boolean isPermissionsAvailable() {
return missingPermissions.isEmpty();
}

@Override
public String toString() {
return "IAMCheckResult{"
+ "resourceName='"
+ resourceName
+ '\''
+ ", missingPermissions="
+ missingPermissions
+ ", success="
+ isPermissionsAvailable()
+ '}';
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
/*
* Copyright (C) 2025 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.teleport.spanner.iam;

import com.google.api.client.googleapis.auth.oauth2.GoogleCredential;
import com.google.api.client.googleapis.javanet.GoogleNetHttpTransport;
import com.google.api.client.http.HttpRequestInitializer;
import com.google.api.client.http.HttpTransport;
import com.google.api.client.json.JsonFactory;
import com.google.api.client.json.jackson2.JacksonFactory;
import com.google.api.services.cloudresourcemanager.v3.CloudResourceManager;
import com.google.api.services.cloudresourcemanager.v3.model.TestIamPermissionsRequest;
import com.google.api.services.cloudresourcemanager.v3.model.TestIamPermissionsResponse;
import com.google.auth.Credentials;
import com.google.auth.http.HttpCredentialsAdapter;
import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.security.GeneralSecurityException;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.beam.sdk.extensions.gcp.auth.NullCredentialInitializer;
import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** Utility to check IAM permissions for various GCP resources. */
public class IAMPermissionsChecker {
private static final Logger LOG = LoggerFactory.getLogger(IAMPermissionsChecker.class);
private final Credentials credential;
private static final String RESOURCE_NAME_FORMAT = "projects/%s";
private final String projectIdResource;

private final CloudResourceManager resourceManager;

public IAMPermissionsChecker(String projectId, GcpOptions gcpOptions)
throws GeneralSecurityException, IOException {
this.credential = gcpOptions.getGcpCredential();
this.projectIdResource = String.format("projects/%s", projectId);
resourceManager = createCloudResourceManagerService();
}

@VisibleForTesting
IAMPermissionsChecker(
String projectId, GcpOptions gcpOptions, CloudResourceManager resourceManager) {
this.credential = gcpOptions.getGcpCredential();
this.projectIdResource = String.format("projects/%s", projectId);
this.resourceManager = resourceManager;
}

/**
* Checks IAM permissions for a list of requirements. This api should be called once with all the
* requirements.
*
* @param requirements List of resources and required permissions.
* @return List of results, only missing permissions are included. Empty list indicate all the
* requirements are met.
*/
public IAMCheckResult check(List<IAMResourceRequirements> requirements) {
List<String> permissionList =
requirements.stream()
.map(IAMResourceRequirements::getPermissions)
.flatMap(Collection::stream)
.toList();
HashSet<String> grantedPermissions =
new HashSet<>(checkPermission(resourceManager, projectIdResource, permissionList));

List<String> missingPermissions =
permissionList.stream()
.filter(p -> !grantedPermissions.contains(p))
.collect(Collectors.toList());

return new IAMCheckResult(projectIdResource, missingPermissions);
}

private List<String> checkPermission(

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what is the difference between this and previous "check" method? their signature looks similar

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This require resource name. This was made from the perspective when we are checking permission at resource level.

Currently, there isn't much difference.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems like a helper method. In which case, it might make sense to call the other method from here (or vice versa) to not duplicate the flows.

CloudResourceManager resourceManager, String resourceName, List<String> permissions) {
try {

TestIamPermissionsRequest requestBody =
new TestIamPermissionsRequest().setPermissions(permissions);

TestIamPermissionsResponse testIamPermissionsResponse =
resourceManager.projects().testIamPermissions(resourceName, requestBody).execute();

List<String> granted = testIamPermissionsResponse.getPermissions();
return granted == null ? Collections.emptyList() : granted;
} catch (IOException e) {
LOG.error("Error checking permissions for resource {}", resourceName, e);
throw new RuntimeException("Failed to check project permissions", e);
}
}

private CloudResourceManager createCloudResourceManagerService()
throws IOException, GeneralSecurityException {
HttpTransport httpTransport = GoogleNetHttpTransport.newTrustedTransport();
JsonFactory jsonFactory = JacksonFactory.getDefaultInstance();
HttpRequestInitializer initializer = getHttpRequestInitializer(this.credential);
CloudResourceManager service =
new CloudResourceManager.Builder(httpTransport, jsonFactory, initializer)
.setApplicationName("service-accounts")
.build();
return service;
}

private static HttpRequestInitializer getHttpRequestInitializer(Credentials credential)
throws IOException {
if (credential == null) {
try {
return GoogleCredential.getApplicationDefault();
} catch (Exception e) {
return new NullCredentialInitializer();
}
} else {
return new HttpCredentialsAdapter(credential);
}
}
}
Loading
Loading