From a1ea248f6a6300477a6ac0e0c4bf11cdb963ae70 Mon Sep 17 00:00:00 2001 From: and-carter Date: Thu, 4 Nov 2021 18:37:45 -0700 Subject: [PATCH 1/8] WIP add archiving --- .../shuttle/config/ArchiveConfig.java | 51 ++++++++ .../shuttle/config/ArchiveYamlMapping.java | 46 +++++++ .../shuttle/ArchiveConfiguration.kt | 8 ++ .../com/openlattice/shuttle/ArchiveService.kt | 113 ++++++++++++++++++ .../com/openlattice/shuttle/ShuttleCli.kt | 52 ++++++++ .../openlattice/shuttle/ShuttleCliOptions.kt | 49 ++++++++ 6 files changed, 319 insertions(+) create mode 100644 src/main/java/com/openlattice/shuttle/config/ArchiveConfig.java create mode 100644 src/main/java/com/openlattice/shuttle/config/ArchiveYamlMapping.java create mode 100644 src/main/kotlin/com/openlattice/shuttle/ArchiveConfiguration.kt create mode 100644 src/main/kotlin/com/openlattice/shuttle/ArchiveService.kt diff --git a/src/main/java/com/openlattice/shuttle/config/ArchiveConfig.java b/src/main/java/com/openlattice/shuttle/config/ArchiveConfig.java new file mode 100644 index 00000000..dd7d6b11 --- /dev/null +++ b/src/main/java/com/openlattice/shuttle/config/ArchiveConfig.java @@ -0,0 +1,51 @@ +package com.openlattice.shuttle.config; + +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.zaxxer.hikari.HikariConfig; +import com.zaxxer.hikari.HikariDataSource; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Properties; + +import static com.google.common.base.Preconditions.checkNotNull; + +/** + * @author Andrew Carter andrew@openlattice.com + */ +public class ArchiveConfig { + private static final Logger logger = LoggerFactory.getLogger( ArchiveYamlMapping.class ); + private final Properties hikariConfiguration; + private final String s3Bucket; + private final String s3Region; + + ArchiveConfig( + @JsonProperty( "hikariConfig" ) Properties hikariConfiguration, + @JsonProperty( "s3Bucket" ) String s3Bucket, + @JsonProperty( "s3Region" ) String s3Region + ) { + this.hikariConfiguration = hikariConfiguration; + this.s3Bucket = s3Bucket; + this.s3Region = s3Region; + } + + @JsonProperty( "hikariConfig" ) + public Properties getHikariConfiguration() { return hikariConfiguration; } + + @JsonProperty( "s3Bucket" ) + public String getS3Bucket() { return s3Bucket; } + + @JsonProperty( "s3Region" ) + public String getS3Region() { return s3Region; } + + + @JsonIgnore + public HikariDataSource getHikariDataSource(String name ) { + Properties properties = checkNotNull( (Properties) hikariConfiguration.get( name ), "Hikari configuration does not exist."); + + HikariConfig hc = new HikariConfig( properties ); + logger.info( "JDBC URL = {}", hc.getJdbcUrl() ); + return new HikariDataSource( hc ); + } +} diff --git a/src/main/java/com/openlattice/shuttle/config/ArchiveYamlMapping.java b/src/main/java/com/openlattice/shuttle/config/ArchiveYamlMapping.java new file mode 100644 index 00000000..20f02fd7 --- /dev/null +++ b/src/main/java/com/openlattice/shuttle/config/ArchiveYamlMapping.java @@ -0,0 +1,46 @@ +package com.openlattice.shuttle.config; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.openlattice.shuttle.ArchiveService; + +/** + * @author Andrew Carter andrew@openlattice.com + */ +public class ArchiveYamlMapping { + private final ArchiveConfig archiveConfig; + private final String dbName; + private final String schemaName; + private final String sourceName; + private final String destinationName; + + @JsonCreator + public ArchiveYamlMapping( + @JsonProperty( "archiveParameters" ) ArchiveConfig archiveConfig, + @JsonProperty( "dbName" ) String dbName, + @JsonProperty( "schemaName" ) String schemaName, + @JsonProperty( "sourceName" ) String sourceName, + @JsonProperty( "destinationName" ) String destinationName + ) { + this.archiveConfig = archiveConfig; + this.dbName = dbName; + this.schemaName = schemaName; + this.sourceName = sourceName; + this.destinationName = destinationName; + } + + @JsonProperty( "archiveConfig" ) + public ArchiveConfig getArchiveConfig() { return archiveConfig; } + + @JsonProperty( "dbName" ) + public String getDbName() { return dbName; } + + @JsonProperty("schemaName") + public String getSchemaName() { return schemaName; } + + @JsonProperty( "sourceName" ) + public String getSourceName() { return sourceName; } + + @JsonProperty( "destinationName" ) + public String getDestinationName() { return destinationName; } +} diff --git a/src/main/kotlin/com/openlattice/shuttle/ArchiveConfiguration.kt b/src/main/kotlin/com/openlattice/shuttle/ArchiveConfiguration.kt new file mode 100644 index 00000000..60779fe5 --- /dev/null +++ b/src/main/kotlin/com/openlattice/shuttle/ArchiveConfiguration.kt @@ -0,0 +1,8 @@ +package com.openlattice.shuttle + +/** + * @author Andrew Carter andrew@openlattice.com + */ +class ArchiveConfiguration { + +} \ No newline at end of file diff --git a/src/main/kotlin/com/openlattice/shuttle/ArchiveService.kt b/src/main/kotlin/com/openlattice/shuttle/ArchiveService.kt new file mode 100644 index 00000000..8e6a2032 --- /dev/null +++ b/src/main/kotlin/com/openlattice/shuttle/ArchiveService.kt @@ -0,0 +1,113 @@ +package com.openlattice.shuttle + +import com.openlattice.organizations.JdbcConnectionParameters +import com.openlattice.shuttle.config.ArchiveConfig +import com.zaxxer.hikari.HikariConfig +import com.zaxxer.hikari.HikariDataSource +import org.joda.time.Days +import org.slf4j.LoggerFactory + +import org.springframework.stereotype.Service +import java.util.* +import java.time.temporal.ChronoUnit.DAYS +import java.sql.Connection +import java.sql.Statement +import java.time.LocalDate + +/** + * @author Andrew Carter andrew@openlattice.com + */ + +//Export +//1. Accept parameters in shuttle cli +//2. Validate parameters +//3. Pass parameters to new class +//4. New class creates chunks by day or specified duration +//5. Ensure that s3 is reachable? +//6. Connect to atlas +//7. Execute query per day - updating file path and export query per step +//8. Check if succeeded per step +//9. Write finished chechpoint +//10. Validate write succeed by querying s3 to ensure row count matches (per step?) + +private val logger = LoggerFactory.getLogger(ArchiveService::class.java) + +@Service +class ArchiveService( + private val archiveConfig: ArchiveConfig, + private val dbName: String, + private val schemaName: String = "openlattice", + private val sourceName: String, + private val destinationName: String = sourceName +) { + init { + logger.info("Initiating ArchiveService...") + } + + fun mummify(startDate: LocalDate, days: Int) { + logger.info("Beginning mummification...") + connectAsSuperuser(dbName).use { connection -> + connection.createStatement().use { statement -> + generateAndExecuteSqlPerDay(statement, startDate, days, ::exportSql) + } + } + } + + fun exhume(startDate: LocalDate, days: Int) { + logger.info("Exhuming data...") + connectAsSuperuser(dbName).use { connection -> + connection.createStatement().use { statement -> + generateAndExecuteSqlPerDay(statement, startDate, days, ::importSql) + } + } + } + + fun generateAndExecuteSqlPerDay( + statement: Statement, + startDate: LocalDate, + days: Int, + sqlGenerator: (input: String) -> String + ) { + for (index in 0 until days) { + val currentDate = startDate.plusDays(index.toLong()).toString() + val sql = sqlGenerator(currentDate) + logger.info("Executing query:\n $sql") + if (statement.execute(sql)) { + logger.info("Successfully executed query of $currentDate from $sourceName to $destinationName") + } + } + } + + fun connectAsSuperuser(dbName: String): Connection { + val config = HikariConfig(archiveConfig.hikariConfiguration) + config.jdbcUrl = "${config.jdbcUrl.removeSuffix("/")}/$dbName" + + return HikariDataSource(config).connection + } + + fun exportSql( + date: String, + ): String { + return "SELECT * FROM aws_s3.query_export_to_s3(" + + "'SELECT * FROM $schemaName.$sourceName', " + + "aws_commons.create_s3_uri(\n" + + " '${archiveConfig.s3Bucket},\n" + + " '$dbName/$schemaName/$destinationName-$date',\n" + + " '${archiveConfig.s3Region}'\n" + + "));" + } + + fun importSql( + date: String, + ): String { + return " SELECT aws_s3.table_import_from_s3(" + + "'$destinationName',\n" + + "'', ''," + + "aws_commons.create_s3_uri(\n" + + " '${archiveConfig.s3Bucket}',\n" + + " '$dbName/$schemaName/$sourceName-$date',\n" + + " '${archiveConfig.s3Region}'\n" + + "));" + } + +} \ No newline at end of file diff --git a/src/main/kotlin/com/openlattice/shuttle/ShuttleCli.kt b/src/main/kotlin/com/openlattice/shuttle/ShuttleCli.kt index f9be0a81..ddee391f 100644 --- a/src/main/kotlin/com/openlattice/shuttle/ShuttleCli.kt +++ b/src/main/kotlin/com/openlattice/shuttle/ShuttleCli.kt @@ -38,7 +38,13 @@ import com.openlattice.shuttle.ShuttleCliOptions.Companion.TOKEN import com.openlattice.shuttle.ShuttleCliOptions.Companion.UPLOAD_SIZE import com.openlattice.shuttle.ShuttleCliOptions.Companion.USER import com.openlattice.shuttle.ShuttleCliOptions.Companion.XML +import com.openlattice.shuttle.ShuttleCliOptions.Companion.ARCHIVE +import com.openlattice.shuttle.ShuttleCliOptions.Companion.IMPORT +import com.openlattice.shuttle.ShuttleCliOptions.Companion.EXPORT +import com.openlattice.shuttle.ShuttleCliOptions.Companion.START_DATE +import com.openlattice.shuttle.ShuttleCliOptions.Companion.DAYS import com.openlattice.shuttle.config.IntegrationConfig +import com.openlattice.shuttle.config.ArchiveYamlMapping import com.openlattice.shuttle.payload.* import com.openlattice.shuttle.source.LocalFileOrigin import com.openlattice.shuttle.source.S3BucketOrigin @@ -47,6 +53,7 @@ import org.slf4j.LoggerFactory import java.io.File import java.io.IOException import java.nio.file.Paths +import java.time.LocalDate import java.util.* import java.util.function.Supplier import kotlin.system.exitProcess @@ -81,6 +88,51 @@ fun main(args: Array) { return } + if (cl.hasOption(ARCHIVE)) { + if (cl.hasOption(CONFIGURATION) && cl.hasOption(START_DATE)) { + + val archiveYamlMapping = try { + ObjectMappers.getYamlMapper() + .readValue(File(cl.getOptionValue(CONFIGURATION)), ArchiveYamlMapping::class.java) + } catch (io: IOException) { + logger.info("IOException encountered converting yaml file into java objects", io) + exitProcess(1) + } catch (jp: JsonParseException) { + logger.info("Shuttle was unable to parse the yaml file", jp) + exitProcess(1) + } catch (jm: JsonMappingException) { + logger.info( "Shuttle was unable to map the yaml objects into java objects", jm) + exitProcess(1) + } + + val days = if (cl.hasOption(DAYS)) { + cl.getOptionValue(DAYS).toInt() + } else { + 1 + } + val archiver = ArchiveService( + archiveYamlMapping.archiveConfig, + archiveYamlMapping.dbName, + archiveYamlMapping.schemaName, + archiveYamlMapping.sourceName, + archiveYamlMapping.destinationName + ) + + if (cl.hasOption(EXPORT)) { + archiver.mummify( + LocalDate.parse(cl.getOptionValue(START_DATE)), + days + ) + } else if (cl.hasOption(IMPORT)) { + archiver.exhume( + LocalDate.parse(cl.getOptionValue(START_DATE)), + days + ) + } else { printErrorHelpAndExit("Export or import option must be specified.") } + } else { printErrorHelpAndExit("Archive specified but either config or start_date missing.") } + return + } + if (cl.hasOption(SERVER)) { if (cl.hasOption(PROFILES)) { val shuttleServer = ShuttleServer() diff --git a/src/main/kotlin/com/openlattice/shuttle/ShuttleCliOptions.kt b/src/main/kotlin/com/openlattice/shuttle/ShuttleCliOptions.kt index 74fa0508..af17fa37 100644 --- a/src/main/kotlin/com/openlattice/shuttle/ShuttleCliOptions.kt +++ b/src/main/kotlin/com/openlattice/shuttle/ShuttleCliOptions.kt @@ -60,6 +60,11 @@ class ShuttleCliOptions { const val S3_ORIGIN_MAXIMUM_ARGS_COUNT = 4 const val S3_ORIGIN_MINIMUM_ARGS_COUNT = 3 const val LOCAL_ORIGIN_EXPECTED_ARGS_COUNT = 2 + const val ARCHIVE = "archive" + const val START_DATE = "start-date" + const val DAYS = "days" + const val EXPORT = "export" + const val IMPORT = "import" private val options = Options() private val clp = DefaultParser() @@ -248,6 +253,41 @@ class ShuttleCliOptions { .argName("Port used to connect to smtp server") .build() + private val archiveOption = Option.builder() + .longOpt(ARCHIVE) + .hasArg(false) + .desc("Archive or import data between jdbc and s3") + .argName("archive") + .build() + + private val startDateOption = Option.builder() + .longOpt(START_DATE) + .hasArg(true) + .desc("Indicate date to begin archiving data (inclusive beginning at 00:00)") + .argName("start date") + .build() + + private val daysOption = Option.builder() + .longOpt(DAYS) + .hasArg(true) + .desc("Indicates number of days from startDate to archive. Includes start date.") + .argName("days") + .build() + + private val exportOption = Option.builder() + .longOpt(EXPORT) + .hasArg(false) + .desc("Indicates export for archival") + .argName("export-archive") + .build() + + private val importOption = Option.builder() + .longOpt(IMPORT) + .hasArg(false) + .desc("Indicates import for archival") + .argName("import-archive") + .build() + init { options .addOption(helpOption) @@ -272,6 +312,15 @@ class ShuttleCliOptions { .addOption(postgresOption) .addOption(threadsOption) .addOption(serverOption) + .addOption(archiveOption) + .addOption(startDateOption) + .addOption(daysOption) + + options.addOptionGroup( + OptionGroup() + .addOption(importOption) + .addOption(exportOption) + ) options.addOptionGroup( OptionGroup() From 316f78b56af046e89352e703a0844c0ff0eedd54 Mon Sep 17 00:00:00 2001 From: and-carter Date: Fri, 5 Nov 2021 10:41:42 -0700 Subject: [PATCH 2/8] clean up and comments --- .../shuttle/config/ArchiveConfig.java | 11 ++-------- .../shuttle/config/ArchiveYamlMapping.java | 4 +++- .../shuttle/ArchiveConfiguration.kt | 8 -------- .../com/openlattice/shuttle/ArchiveService.kt | 20 ++++++++++++++----- .../com/openlattice/shuttle/ShuttleCli.kt | 2 +- 5 files changed, 21 insertions(+), 24 deletions(-) delete mode 100644 src/main/kotlin/com/openlattice/shuttle/ArchiveConfiguration.kt diff --git a/src/main/java/com/openlattice/shuttle/config/ArchiveConfig.java b/src/main/java/com/openlattice/shuttle/config/ArchiveConfig.java index dd7d6b11..99cf0a68 100644 --- a/src/main/java/com/openlattice/shuttle/config/ArchiveConfig.java +++ b/src/main/java/com/openlattice/shuttle/config/ArchiveConfig.java @@ -13,6 +13,8 @@ /** * @author Andrew Carter andrew@openlattice.com + * + * Contains connection details (jdbc and s3) for archiving. */ public class ArchiveConfig { private static final Logger logger = LoggerFactory.getLogger( ArchiveYamlMapping.class ); @@ -39,13 +41,4 @@ public class ArchiveConfig { @JsonProperty( "s3Region" ) public String getS3Region() { return s3Region; } - - @JsonIgnore - public HikariDataSource getHikariDataSource(String name ) { - Properties properties = checkNotNull( (Properties) hikariConfiguration.get( name ), "Hikari configuration does not exist."); - - HikariConfig hc = new HikariConfig( properties ); - logger.info( "JDBC URL = {}", hc.getJdbcUrl() ); - return new HikariDataSource( hc ); - } } diff --git a/src/main/java/com/openlattice/shuttle/config/ArchiveYamlMapping.java b/src/main/java/com/openlattice/shuttle/config/ArchiveYamlMapping.java index 20f02fd7..067356c4 100644 --- a/src/main/java/com/openlattice/shuttle/config/ArchiveYamlMapping.java +++ b/src/main/java/com/openlattice/shuttle/config/ArchiveYamlMapping.java @@ -2,10 +2,12 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; -import com.openlattice.shuttle.ArchiveService; /** * @author Andrew Carter andrew@openlattice.com + * + * Archive configuration files are mapped to this class. + * This class contains all fields required to create an ArchiveService. */ public class ArchiveYamlMapping { private final ArchiveConfig archiveConfig; diff --git a/src/main/kotlin/com/openlattice/shuttle/ArchiveConfiguration.kt b/src/main/kotlin/com/openlattice/shuttle/ArchiveConfiguration.kt deleted file mode 100644 index 60779fe5..00000000 --- a/src/main/kotlin/com/openlattice/shuttle/ArchiveConfiguration.kt +++ /dev/null @@ -1,8 +0,0 @@ -package com.openlattice.shuttle - -/** - * @author Andrew Carter andrew@openlattice.com - */ -class ArchiveConfiguration { - -} \ No newline at end of file diff --git a/src/main/kotlin/com/openlattice/shuttle/ArchiveService.kt b/src/main/kotlin/com/openlattice/shuttle/ArchiveService.kt index 8e6a2032..a6c86a64 100644 --- a/src/main/kotlin/com/openlattice/shuttle/ArchiveService.kt +++ b/src/main/kotlin/com/openlattice/shuttle/ArchiveService.kt @@ -5,6 +5,7 @@ import com.openlattice.shuttle.config.ArchiveConfig import com.zaxxer.hikari.HikariConfig import com.zaxxer.hikari.HikariDataSource import org.joda.time.Days +import org.postgresql.util.PSQLException import org.slf4j.LoggerFactory import org.springframework.stereotype.Service @@ -14,6 +15,8 @@ import java.sql.Connection import java.sql.Statement import java.time.LocalDate +const val DEFAULT_DAYS = 1 + /** * @author Andrew Carter andrew@openlattice.com */ @@ -44,18 +47,20 @@ class ArchiveService( logger.info("Initiating ArchiveService...") } + // archives data fun mummify(startDate: LocalDate, days: Int) { logger.info("Beginning mummification...") - connectAsSuperuser(dbName).use { connection -> + connectToDatabase(dbName).use { connection -> connection.createStatement().use { statement -> generateAndExecuteSqlPerDay(statement, startDate, days, ::exportSql) } } } + // restores data fun exhume(startDate: LocalDate, days: Int) { logger.info("Exhuming data...") - connectAsSuperuser(dbName).use { connection -> + connectToDatabase(dbName).use { connection -> connection.createStatement().use { statement -> generateAndExecuteSqlPerDay(statement, startDate, days, ::importSql) } @@ -72,19 +77,24 @@ class ArchiveService( val currentDate = startDate.plusDays(index.toLong()).toString() val sql = sqlGenerator(currentDate) logger.info("Executing query:\n $sql") - if (statement.execute(sql)) { + try { + statement.execute(sql) logger.info("Successfully executed query of $currentDate from $sourceName to $destinationName") + } catch (e: PSQLException) { + throw Error("Unsuccessful sql execution", e) } } } - fun connectAsSuperuser(dbName: String): Connection { + fun connectToDatabase(dbName: String): Connection { val config = HikariConfig(archiveConfig.hikariConfiguration) + // append org database name to the jdbc url config.jdbcUrl = "${config.jdbcUrl.removeSuffix("/")}/$dbName" return HikariDataSource(config).connection } + // generates sql to invoke an export using aws_s3 postgres extension fun exportSql( date: String, ): String { @@ -97,6 +107,7 @@ class ArchiveService( "));" } + // generates sql to invoke an import using aws_s3 postgres extension fun importSql( date: String, ): String { @@ -109,5 +120,4 @@ class ArchiveService( " '${archiveConfig.s3Region}'\n" + "));" } - } \ No newline at end of file diff --git a/src/main/kotlin/com/openlattice/shuttle/ShuttleCli.kt b/src/main/kotlin/com/openlattice/shuttle/ShuttleCli.kt index ddee391f..07f09824 100644 --- a/src/main/kotlin/com/openlattice/shuttle/ShuttleCli.kt +++ b/src/main/kotlin/com/openlattice/shuttle/ShuttleCli.kt @@ -108,7 +108,7 @@ fun main(args: Array) { val days = if (cl.hasOption(DAYS)) { cl.getOptionValue(DAYS).toInt() } else { - 1 + DEFAULT_DAYS } val archiver = ArchiveService( archiveYamlMapping.archiveConfig, From 8cd4fbadbc237c44f7bf194268c47e86200e21da Mon Sep 17 00:00:00 2001 From: and-carter Date: Fri, 5 Nov 2021 10:42:44 -0700 Subject: [PATCH 3/8] cleanup --- .../kotlin/com/openlattice/shuttle/ArchiveService.kt | 12 ------------ 1 file changed, 12 deletions(-) diff --git a/src/main/kotlin/com/openlattice/shuttle/ArchiveService.kt b/src/main/kotlin/com/openlattice/shuttle/ArchiveService.kt index a6c86a64..6bac1c24 100644 --- a/src/main/kotlin/com/openlattice/shuttle/ArchiveService.kt +++ b/src/main/kotlin/com/openlattice/shuttle/ArchiveService.kt @@ -21,18 +21,6 @@ const val DEFAULT_DAYS = 1 * @author Andrew Carter andrew@openlattice.com */ -//Export -//1. Accept parameters in shuttle cli -//2. Validate parameters -//3. Pass parameters to new class -//4. New class creates chunks by day or specified duration -//5. Ensure that s3 is reachable? -//6. Connect to atlas -//7. Execute query per day - updating file path and export query per step -//8. Check if succeeded per step -//9. Write finished chechpoint -//10. Validate write succeed by querying s3 to ensure row count matches (per step?) - private val logger = LoggerFactory.getLogger(ArchiveService::class.java) @Service From 0052e209663528318d8e1a4b7910dbbe659eccd3 Mon Sep 17 00:00:00 2001 From: and-carter Date: Tue, 9 Nov 2021 16:20:55 -0800 Subject: [PATCH 4/8] WIP add multipart support and validation --- .../shuttle/config/ArchiveYamlMapping.java | 8 +- .../com/openlattice/shuttle/ArchiveService.kt | 162 ++++++++++++++---- .../com/openlattice/shuttle/ShuttleCli.kt | 3 +- 3 files changed, 138 insertions(+), 35 deletions(-) diff --git a/src/main/java/com/openlattice/shuttle/config/ArchiveYamlMapping.java b/src/main/java/com/openlattice/shuttle/config/ArchiveYamlMapping.java index 067356c4..da652c43 100644 --- a/src/main/java/com/openlattice/shuttle/config/ArchiveYamlMapping.java +++ b/src/main/java/com/openlattice/shuttle/config/ArchiveYamlMapping.java @@ -15,6 +15,7 @@ public class ArchiveYamlMapping { private final String schemaName; private final String sourceName; private final String destinationName; + private final String dateField; @JsonCreator public ArchiveYamlMapping( @@ -22,13 +23,15 @@ public ArchiveYamlMapping( @JsonProperty( "dbName" ) String dbName, @JsonProperty( "schemaName" ) String schemaName, @JsonProperty( "sourceName" ) String sourceName, - @JsonProperty( "destinationName" ) String destinationName + @JsonProperty( "destinationName" ) String destinationName, + @JsonProperty( "dateField" ) String dateField ) { this.archiveConfig = archiveConfig; this.dbName = dbName; this.schemaName = schemaName; this.sourceName = sourceName; this.destinationName = destinationName; + this.dateField = dateField; } @JsonProperty( "archiveConfig" ) @@ -45,4 +48,7 @@ public ArchiveYamlMapping( @JsonProperty( "destinationName" ) public String getDestinationName() { return destinationName; } + + @JsonProperty( "dateField" ) + public String getDateField() { return dateField; } } diff --git a/src/main/kotlin/com/openlattice/shuttle/ArchiveService.kt b/src/main/kotlin/com/openlattice/shuttle/ArchiveService.kt index 6bac1c24..e2b8ad25 100644 --- a/src/main/kotlin/com/openlattice/shuttle/ArchiveService.kt +++ b/src/main/kotlin/com/openlattice/shuttle/ArchiveService.kt @@ -1,17 +1,21 @@ package com.openlattice.shuttle -import com.openlattice.organizations.JdbcConnectionParameters +import com.amazonaws.SdkClientException +import com.amazonaws.auth.AWSStaticCredentialsProvider +import com.amazonaws.auth.BasicAWSCredentials +import com.amazonaws.regions.RegionUtils +import com.amazonaws.services.s3.AmazonS3 +import com.amazonaws.services.s3.AmazonS3ClientBuilder +import com.amazonaws.services.s3.model.ObjectListing import com.openlattice.shuttle.config.ArchiveConfig import com.zaxxer.hikari.HikariConfig import com.zaxxer.hikari.HikariDataSource -import org.joda.time.Days import org.postgresql.util.PSQLException import org.slf4j.LoggerFactory import org.springframework.stereotype.Service -import java.util.* -import java.time.temporal.ChronoUnit.DAYS import java.sql.Connection +import java.sql.ResultSet import java.sql.Statement import java.time.LocalDate @@ -22,6 +26,10 @@ const val DEFAULT_DAYS = 1 */ private val logger = LoggerFactory.getLogger(ArchiveService::class.java) +private val s3Credentials = BasicAWSCredentials( + "***", + "***" +) @Service class ArchiveService( @@ -29,83 +37,171 @@ class ArchiveService( private val dbName: String, private val schemaName: String = "openlattice", private val sourceName: String, - private val destinationName: String = sourceName + private val destinationName: String = sourceName, + private val dateField: String // TODO: Add to sql statements once ready to test ) { + private val s3Client: AmazonS3 + init { logger.info("Initiating ArchiveService...") + s3Client = AmazonS3ClientBuilder.standard().withCredentials( + AWSStaticCredentialsProvider(s3Credentials) + ).withRegion(RegionUtils.getRegion(archiveConfig.s3Region).name).build() } - // archives data + // archive data + // overwrite if already exists fun mummify(startDate: LocalDate, days: Int) { logger.info("Beginning mummification...") connectToDatabase(dbName).use { connection -> connection.createStatement().use { statement -> - generateAndExecuteSqlPerDay(statement, startDate, days, ::exportSql) + generateAndExecuteSqlPerDay(statement, startDate, days, ::exportHandler) } } } - // restores data + // restore data fun exhume(startDate: LocalDate, days: Int) { logger.info("Exhuming data...") connectToDatabase(dbName).use { connection -> connection.createStatement().use { statement -> - generateAndExecuteSqlPerDay(statement, startDate, days, ::importSql) + generateAndExecuteSqlPerDay(statement, startDate, days, ::importHandler) } } } - fun generateAndExecuteSqlPerDay( + private fun generateAndExecuteSqlPerDay( statement: Statement, startDate: LocalDate, days: Int, - sqlGenerator: (input: String) -> String + sqlAndExecuteHandler: (statement: Statement, date: String) -> Unit ) { - for (index in 0 until days) { - val currentDate = startDate.plusDays(index.toLong()).toString() - val sql = sqlGenerator(currentDate) - logger.info("Executing query:\n $sql") - try { - statement.execute(sql) - logger.info("Successfully executed query of $currentDate from $sourceName to $destinationName") - } catch (e: PSQLException) { - throw Error("Unsuccessful sql execution", e) - } + for (dayIndex in 0 until days) { + val currentDate = startDate.plusDays(dayIndex.toLong()).toString() + sqlAndExecuteHandler(statement, currentDate) } } - fun connectToDatabase(dbName: String): Connection { - val config = HikariConfig(archiveConfig.hikariConfiguration) - // append org database name to the jdbc url - config.jdbcUrl = "${config.jdbcUrl.removeSuffix("/")}/$dbName" + private fun exportHandler(statement: Statement, currentDate: String) { + isOverwrite(currentDate) + executeStatement(statement, exportSql(currentDate)) + validateExport(currentDate) + } - return HikariDataSource(config).connection + private fun importHandler(statement: Statement, currentDate: String) { + val parts = countOfS3ObjectsWithPrefix(currentDate) + for(part in 0 until parts) { + // add one to part to account for 0 vs 1 indexing + executeStatement(statement, importSql(currentDate, part + 1)) + } + validateImport(statement,currentDate) } - // generates sql to invoke an export using aws_s3 postgres extension - fun exportSql( + // generate sql to invoke an export using aws_s3 postgres extension + private fun exportSql( date: String, ): String { return "SELECT * FROM aws_s3.query_export_to_s3(" + - "'SELECT * FROM $schemaName.$sourceName', " + + "'SELECT * FROM $schemaName.$sourceName " + + "WHERE $dateField = ''$date'' '," + "aws_commons.create_s3_uri(\n" + - " '${archiveConfig.s3Bucket},\n" + + " '${archiveConfig.s3Bucket}',\n" + " '$dbName/$schemaName/$destinationName-$date',\n" + " '${archiveConfig.s3Region}'\n" + "));" } - // generates sql to invoke an import using aws_s3 postgres extension - fun importSql( + // generate sql to invoke an import using aws_s3 postgres extension + private fun importSql( date: String, + part: Int ): String { + val partString = if (part > 1) "_part$part" else "" return " SELECT aws_s3.table_import_from_s3(" + "'$destinationName',\n" + "'', ''," + "aws_commons.create_s3_uri(\n" + " '${archiveConfig.s3Bucket}',\n" + - " '$dbName/$schemaName/$sourceName-$date',\n" + + " '$dbName/$schemaName/$sourceName-$date$partString',\n" + " '${archiveConfig.s3Region}'\n" + "));" } + + private fun validateExport(date: String) { + if (countOfS3ObjectsWithPrefix(date) > 0) { + logger.info("Export validation succeeded. Data written to s3.") + } else { + logger.error("Export validation failed: no data written was written to s3. " + + "Either there was a problem exporting or there is no data in the source table.") + } + } + + private fun validateImport(statement: Statement, date: String) { + val query = "SELECT count(*) count " + + "FROM $destinationName " + + "WHERE $dateField = '$date';" + val resultSet = executeStatement(statement, query) + resultSet.next() + val numRowsWritten = resultSet.getInt(1) + if (numRowsWritten > 0) { + logger.info("Import validation succeeded. $numRowsWritten rows found in $destinationName for $date.") + } else { + logger.error("Import validation failed: no data written was written to $destinationName. " + + "Either there was a problem importing or there is no data in the source.") + } + } + + // TODO: Refactor into init to establish the connection asap? + private fun connectToDatabase(dbName: String): Connection { + val config = HikariConfig(archiveConfig.hikariConfiguration) + // append org database name to the jdbc url + config.jdbcUrl = "${config.jdbcUrl.removeSuffix("/")}/$dbName" + try { + return HikariDataSource(config).connection + } catch(e: Exception) { + throw Error("Error connecting with HikariDataSource...", e) + } + } + + fun executeStatement(statement: Statement, sql: String): ResultSet { + logger.info("Executing query:\n $sql") + try { + val rs = statement.executeQuery(sql) + logger.info("Successfully executed query.") + return rs + } catch (e: PSQLException) { + throw Error("Unsuccessful sql execution of $sql", e) + } + } + + private fun isOverwrite(date: String) { + val count = countOfS3ObjectsWithPrefix(date) + if (count > 0) { + logger.info("Overwriting. Number of existing objects in s3 with prefix ${prefix(date)}: $count") + } else { + logger.info("Creating new objects. No objects exist in s3 with prefix ${prefix(date)}") + } + } + + private fun countOfS3ObjectsWithPrefix(date: String): Int { + val objects: ObjectListing + try { + objects = s3Client.listObjects(archiveConfig.s3Bucket, prefix(date)) + } catch (e: SdkClientException) { + throw Exception(e) + } + if (objects.isTruncated) { + // TODO: Provide support for truncated result + throw Exception("Too many objects with prefix ${prefix()}. Truncated ObjectListing not supported.") + } + return objects.objectSummaries.size + } + + private fun prefix(): String { + return "$dbName/$schemaName/$sourceName" + } + + private fun prefix(date: String): String { + return "$dbName/$schemaName/$sourceName-$date" + } } \ No newline at end of file diff --git a/src/main/kotlin/com/openlattice/shuttle/ShuttleCli.kt b/src/main/kotlin/com/openlattice/shuttle/ShuttleCli.kt index 07f09824..0d082279 100644 --- a/src/main/kotlin/com/openlattice/shuttle/ShuttleCli.kt +++ b/src/main/kotlin/com/openlattice/shuttle/ShuttleCli.kt @@ -115,7 +115,8 @@ fun main(args: Array) { archiveYamlMapping.dbName, archiveYamlMapping.schemaName, archiveYamlMapping.sourceName, - archiveYamlMapping.destinationName + archiveYamlMapping.destinationName, + archiveYamlMapping.dateField ) if (cl.hasOption(EXPORT)) { From d9b5e50d16e5e63b42f15f4916fd6f30c3de1e15 Mon Sep 17 00:00:00 2001 From: and-carter Date: Wed, 10 Nov 2021 10:16:31 -0800 Subject: [PATCH 5/8] clean up --- .../com/openlattice/shuttle/config/ArchiveConfig.java | 1 - .../kotlin/com/openlattice/shuttle/ArchiveService.kt | 9 +++++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/src/main/java/com/openlattice/shuttle/config/ArchiveConfig.java b/src/main/java/com/openlattice/shuttle/config/ArchiveConfig.java index 99cf0a68..7af39150 100644 --- a/src/main/java/com/openlattice/shuttle/config/ArchiveConfig.java +++ b/src/main/java/com/openlattice/shuttle/config/ArchiveConfig.java @@ -17,7 +17,6 @@ * Contains connection details (jdbc and s3) for archiving. */ public class ArchiveConfig { - private static final Logger logger = LoggerFactory.getLogger( ArchiveYamlMapping.class ); private final Properties hikariConfiguration; private final String s3Bucket; private final String s3Region; diff --git a/src/main/kotlin/com/openlattice/shuttle/ArchiveService.kt b/src/main/kotlin/com/openlattice/shuttle/ArchiveService.kt index e2b8ad25..643e3568 100644 --- a/src/main/kotlin/com/openlattice/shuttle/ArchiveService.kt +++ b/src/main/kotlin/com/openlattice/shuttle/ArchiveService.kt @@ -38,7 +38,7 @@ class ArchiveService( private val schemaName: String = "openlattice", private val sourceName: String, private val destinationName: String = sourceName, - private val dateField: String // TODO: Add to sql statements once ready to test + private val dateField: String ) { private val s3Client: AmazonS3 @@ -91,7 +91,7 @@ class ArchiveService( private fun importHandler(statement: Statement, currentDate: String) { val parts = countOfS3ObjectsWithPrefix(currentDate) for(part in 0 until parts) { - // add one to part to account for 0 vs 1 indexing + // +1 to part to account for 0 vs 1 indexing executeStatement(statement, importSql(currentDate, part + 1)) } validateImport(statement,currentDate) @@ -140,9 +140,11 @@ class ArchiveService( val query = "SELECT count(*) count " + "FROM $destinationName " + "WHERE $dateField = '$date';" + val resultSet = executeStatement(statement, query) resultSet.next() val numRowsWritten = resultSet.getInt(1) + if (numRowsWritten > 0) { logger.info("Import validation succeeded. $numRowsWritten rows found in $destinationName for $date.") } else { @@ -151,7 +153,6 @@ class ArchiveService( } } - // TODO: Refactor into init to establish the connection asap? private fun connectToDatabase(dbName: String): Connection { val config = HikariConfig(archiveConfig.hikariConfiguration) // append org database name to the jdbc url @@ -191,7 +192,7 @@ class ArchiveService( throw Exception(e) } if (objects.isTruncated) { - // TODO: Provide support for truncated result + // TODO: Provide support for truncated / paginated result throw Exception("Too many objects with prefix ${prefix()}. Truncated ObjectListing not supported.") } return objects.objectSummaries.size From f17ddb58a36bb9971151bd6de7843758e8571281 Mon Sep 17 00:00:00 2001 From: and-carter Date: Wed, 10 Nov 2021 14:53:18 -0800 Subject: [PATCH 6/8] allow archiving by date(s) or entire table --- .../com/openlattice/shuttle/ArchiveService.kt | 121 ++++++++++++------ .../com/openlattice/shuttle/ShuttleCli.kt | 30 ++++- .../openlattice/shuttle/ShuttleCliOptions.kt | 27 +--- 3 files changed, 106 insertions(+), 72 deletions(-) diff --git a/src/main/kotlin/com/openlattice/shuttle/ArchiveService.kt b/src/main/kotlin/com/openlattice/shuttle/ArchiveService.kt index 643e3568..910f07b0 100644 --- a/src/main/kotlin/com/openlattice/shuttle/ArchiveService.kt +++ b/src/main/kotlin/com/openlattice/shuttle/ArchiveService.kt @@ -6,6 +6,7 @@ import com.amazonaws.auth.BasicAWSCredentials import com.amazonaws.regions.RegionUtils import com.amazonaws.services.s3.AmazonS3 import com.amazonaws.services.s3.AmazonS3ClientBuilder +import com.amazonaws.services.s3.model.ListObjectsRequest import com.amazonaws.services.s3.model.ObjectListing import com.openlattice.shuttle.config.ArchiveConfig import com.zaxxer.hikari.HikariConfig @@ -20,6 +21,7 @@ import java.sql.Statement import java.time.LocalDate const val DEFAULT_DAYS = 1 +const val NO_START_DATE = "" /** * @author Andrew Carter andrew@openlattice.com @@ -51,7 +53,7 @@ class ArchiveService( // archive data // overwrite if already exists - fun mummify(startDate: LocalDate, days: Int) { + fun mummify(startDate: String, days: Int) { logger.info("Beginning mummification...") connectToDatabase(dbName).use { connection -> connection.createStatement().use { statement -> @@ -61,7 +63,7 @@ class ArchiveService( } // restore data - fun exhume(startDate: LocalDate, days: Int) { + fun exhume(startDate: String, days: Int) { logger.info("Exhuming data...") connectToDatabase(dbName).use { connection -> connection.createStatement().use { statement -> @@ -72,13 +74,20 @@ class ArchiveService( private fun generateAndExecuteSqlPerDay( statement: Statement, - startDate: LocalDate, + date: String, days: Int, sqlAndExecuteHandler: (statement: Statement, date: String) -> Unit ) { - for (dayIndex in 0 until days) { - val currentDate = startDate.plusDays(dayIndex.toLong()).toString() - sqlAndExecuteHandler(statement, currentDate) + if (date == NO_START_DATE) { + // if no start date provided, pass empty string + sqlAndExecuteHandler(statement, NO_START_DATE) + } else { + // convert to date to LocalDate, so we can add days in loop + val startDate = LocalDate.parse(date) + for (dayIndex in 0 until days) { + val currentDate = startDate.plusDays(dayIndex.toLong()).toString() + sqlAndExecuteHandler(statement, currentDate) + } } } @@ -101,12 +110,16 @@ class ArchiveService( private fun exportSql( date: String, ): String { + + // avoid quoting hell in Postgres by using dollar-sign quotes ($exportQuery$) return "SELECT * FROM aws_s3.query_export_to_s3(" + - "'SELECT * FROM $schemaName.$sourceName " + - "WHERE $dateField = ''$date'' '," + + "\$exportQuery\$" + + "SELECT * FROM $schemaName.$sourceName " + + whereClauseByDate(date) + + "\$exportQuery\$," + "aws_commons.create_s3_uri(\n" + " '${archiveConfig.s3Bucket}',\n" + - " '$dbName/$schemaName/$destinationName-$date',\n" + + " '${destinationPrefix(date)}',\n" + " '${archiveConfig.s3Region}'\n" + "));" } @@ -122,7 +135,7 @@ class ArchiveService( "'', ''," + "aws_commons.create_s3_uri(\n" + " '${archiveConfig.s3Bucket}',\n" + - " '$dbName/$schemaName/$sourceName-$date$partString',\n" + + " '${sourcePrefix(date)}$partString',\n" + " '${archiveConfig.s3Region}'\n" + "));" } @@ -139,70 +152,94 @@ class ArchiveService( private fun validateImport(statement: Statement, date: String) { val query = "SELECT count(*) count " + "FROM $destinationName " + - "WHERE $dateField = '$date';" + "${whereClauseByDate(date)};" val resultSet = executeStatement(statement, query) resultSet.next() val numRowsWritten = resultSet.getInt(1) if (numRowsWritten > 0) { - logger.info("Import validation succeeded. $numRowsWritten rows found in $destinationName for $date.") + logger.info("Import validation succeeded. $numRowsWritten rows found of $destinationName $date.") } else { logger.error("Import validation failed: no data written was written to $destinationName. " + "Either there was a problem importing or there is no data in the source.") } } - private fun connectToDatabase(dbName: String): Connection { - val config = HikariConfig(archiveConfig.hikariConfiguration) - // append org database name to the jdbc url - config.jdbcUrl = "${config.jdbcUrl.removeSuffix("/")}/$dbName" - try { - return HikariDataSource(config).connection - } catch(e: Exception) { - throw Error("Error connecting with HikariDataSource...", e) - } - } - - fun executeStatement(statement: Statement, sql: String): ResultSet { - logger.info("Executing query:\n $sql") - try { - val rs = statement.executeQuery(sql) - logger.info("Successfully executed query.") - return rs - } catch (e: PSQLException) { - throw Error("Unsuccessful sql execution of $sql", e) - } - } - private fun isOverwrite(date: String) { val count = countOfS3ObjectsWithPrefix(date) if (count > 0) { - logger.info("Overwriting. Number of existing objects in s3 with prefix ${prefix(date)}: $count") + logger.info("Overwriting. Number of existing objects in s3 with prefix ${destinationPrefix(date)}: $count") } else { - logger.info("Creating new objects. No objects exist in s3 with prefix ${prefix(date)}") + logger.info("Creating new objects. No objects exist in s3 with prefix ${destinationPrefix(date)}") } } private fun countOfS3ObjectsWithPrefix(date: String): Int { val objects: ObjectListing + val objectsRequest = ListObjectsRequest( + archiveConfig.s3Bucket, + destinationPrefix(date), + "", + "/", + 1000 + ) try { - objects = s3Client.listObjects(archiveConfig.s3Bucket, prefix(date)) + objects = s3Client.listObjects(objectsRequest) } catch (e: SdkClientException) { throw Exception(e) } if (objects.isTruncated) { // TODO: Provide support for truncated / paginated result - throw Exception("Too many objects with prefix ${prefix()}. Truncated ObjectListing not supported.") + throw Exception("Too many objects with prefix ${destinationPrefix(date)}. Truncated ObjectListing not supported.") } return objects.objectSummaries.size } - private fun prefix(): String { - return "$dbName/$schemaName/$sourceName" + private fun destinationPrefix(date: String): String { + return "archive01/$dbName/$schemaName/$destinationName${dateSuffix(date)}" + } + + private fun sourcePrefix(date: String): String { + return "archive01/$dbName/$schemaName/$sourceName${dateSuffix(date)}" } - private fun prefix(date: String): String { - return "$dbName/$schemaName/$sourceName-$date" + private fun dateSuffix(date: String): String { + return if (date == NO_START_DATE) { + "" + } else { + "/${sourceName}_$date" + } + } + + // if date not provided then don't include WHERE clause + private fun whereClauseByDate(date: String): String { + return if (date == NO_START_DATE) { + "" + } else { + "WHERE DATE($dateField) = '$date' " + } + } + + private fun connectToDatabase(dbName: String): Connection { + val config = HikariConfig(archiveConfig.hikariConfiguration) + // append org database name to the jdbc url + config.jdbcUrl = "${config.jdbcUrl.removeSuffix("/")}/$dbName" + try { + return HikariDataSource(config).connection + } catch(e: Exception) { + throw Error("Error connecting with HikariDataSource...", e) + } + } + + fun executeStatement(statement: Statement, sql: String): ResultSet { + logger.info("Executing query:\n $sql") + try { + val rs = statement.executeQuery(sql) + logger.info("Successfully executed query.") + return rs + } catch (e: PSQLException) { + throw Error("Unsuccessful sql execution of $sql", e) + } } } \ No newline at end of file diff --git a/src/main/kotlin/com/openlattice/shuttle/ShuttleCli.kt b/src/main/kotlin/com/openlattice/shuttle/ShuttleCli.kt index 0d082279..c41765f5 100644 --- a/src/main/kotlin/com/openlattice/shuttle/ShuttleCli.kt +++ b/src/main/kotlin/com/openlattice/shuttle/ShuttleCli.kt @@ -89,7 +89,7 @@ fun main(args: Array) { } if (cl.hasOption(ARCHIVE)) { - if (cl.hasOption(CONFIGURATION) && cl.hasOption(START_DATE)) { + if (cl.hasOption(CONFIGURATION)) { val archiveYamlMapping = try { ObjectMappers.getYamlMapper() @@ -110,6 +110,14 @@ fun main(args: Array) { } else { DEFAULT_DAYS } + + val startDate = if (cl.hasOption(START_DATE)) { + cl.getOptionValue(START_DATE) + } else { + logger.info("No start date provided.") + NO_START_DATE + } + val archiver = ArchiveService( archiveYamlMapping.archiveConfig, archiveYamlMapping.dbName, @@ -119,18 +127,26 @@ fun main(args: Array) { archiveYamlMapping.dateField ) - if (cl.hasOption(EXPORT)) { + val archiveOption = cl.getOptionValue(ARCHIVE) + + if (archiveOption.equals(EXPORT)) { archiver.mummify( - LocalDate.parse(cl.getOptionValue(START_DATE)), + startDate, days ) - } else if (cl.hasOption(IMPORT)) { + } else if (archiveOption.equals(IMPORT)) { archiver.exhume( - LocalDate.parse(cl.getOptionValue(START_DATE)), + startDate, days ) - } else { printErrorHelpAndExit("Export or import option must be specified.") } - } else { printErrorHelpAndExit("Archive specified but either config or start_date missing.") } + } else { + logger.error("Export or import option must be specified with archive. \n\t--archive import\n\t--archive export") + exitProcess(1) + } + } else { + logger.error("Archive specified, but config yaml file not provided. \n\t--config /path/to/file.yaml") + exitProcess(1) + } return } diff --git a/src/main/kotlin/com/openlattice/shuttle/ShuttleCliOptions.kt b/src/main/kotlin/com/openlattice/shuttle/ShuttleCliOptions.kt index af17fa37..e6103751 100644 --- a/src/main/kotlin/com/openlattice/shuttle/ShuttleCliOptions.kt +++ b/src/main/kotlin/com/openlattice/shuttle/ShuttleCliOptions.kt @@ -255,9 +255,10 @@ class ShuttleCliOptions { private val archiveOption = Option.builder() .longOpt(ARCHIVE) - .hasArg(false) - .desc("Archive or import data between jdbc and s3") - .argName("archive") + .hasArg(true) + .desc("Archive or restore data between JDBC and s3.\n" + + "\t--archive <\"export\" or \"import\"> --config [--start-date --days]") + .argName("\"export\" or \"import\"") .build() private val startDateOption = Option.builder() @@ -274,20 +275,6 @@ class ShuttleCliOptions { .argName("days") .build() - private val exportOption = Option.builder() - .longOpt(EXPORT) - .hasArg(false) - .desc("Indicates export for archival") - .argName("export-archive") - .build() - - private val importOption = Option.builder() - .longOpt(IMPORT) - .hasArg(false) - .desc("Indicates import for archival") - .argName("import-archive") - .build() - init { options .addOption(helpOption) @@ -316,12 +303,6 @@ class ShuttleCliOptions { .addOption(startDateOption) .addOption(daysOption) - options.addOptionGroup( - OptionGroup() - .addOption(importOption) - .addOption(exportOption) - ) - options.addOptionGroup( OptionGroup() .addOption(sqlOption) From 7ddcf8c1f6f1362e718ea77a3a45d278356907c1 Mon Sep 17 00:00:00 2001 From: and-carter Date: Fri, 12 Nov 2021 09:22:53 -0800 Subject: [PATCH 7/8] fix s3 prefixes --- .../com/openlattice/shuttle/ArchiveService.kt | 30 +++++++++++-------- 1 file changed, 17 insertions(+), 13 deletions(-) diff --git a/src/main/kotlin/com/openlattice/shuttle/ArchiveService.kt b/src/main/kotlin/com/openlattice/shuttle/ArchiveService.kt index 910f07b0..f090d084 100644 --- a/src/main/kotlin/com/openlattice/shuttle/ArchiveService.kt +++ b/src/main/kotlin/com/openlattice/shuttle/ArchiveService.kt @@ -79,10 +79,10 @@ class ArchiveService( sqlAndExecuteHandler: (statement: Statement, date: String) -> Unit ) { if (date == NO_START_DATE) { - // if no start date provided, pass empty string + // if start date not provided, pass empty string sqlAndExecuteHandler(statement, NO_START_DATE) } else { - // convert to date to LocalDate, so we can add days in loop + // convert to date to LocalDate for date arithmetic val startDate = LocalDate.parse(date) for (dayIndex in 0 until days) { val currentDate = startDate.plusDays(dayIndex.toLong()).toString() @@ -98,7 +98,8 @@ class ArchiveService( } private fun importHandler(statement: Statement, currentDate: String) { - val parts = countOfS3ObjectsWithPrefix(currentDate) + val parts = countOfS3ObjectsWithPrefix(currentDate, ::sourcePrefix) + logger.info("Number of objects in s3 with prefix ${sourcePrefix(currentDate)}: $parts") for(part in 0 until parts) { // +1 to part to account for 0 vs 1 indexing executeStatement(statement, importSql(currentDate, part + 1)) @@ -113,10 +114,10 @@ class ArchiveService( // avoid quoting hell in Postgres by using dollar-sign quotes ($exportQuery$) return "SELECT * FROM aws_s3.query_export_to_s3(" + - "\$exportQuery\$" + + "\$exportQuery\$ " + "SELECT * FROM $schemaName.$sourceName " + whereClauseByDate(date) + - "\$exportQuery\$," + + " \$exportQuery\$," + "aws_commons.create_s3_uri(\n" + " '${archiveConfig.s3Bucket}',\n" + " '${destinationPrefix(date)}',\n" + @@ -141,7 +142,7 @@ class ArchiveService( } private fun validateExport(date: String) { - if (countOfS3ObjectsWithPrefix(date) > 0) { + if (countOfS3ObjectsWithPrefix(date, ::destinationPrefix) > 0) { logger.info("Export validation succeeded. Data written to s3.") } else { logger.error("Export validation failed: no data written was written to s3. " + @@ -167,19 +168,22 @@ class ArchiveService( } private fun isOverwrite(date: String) { - val count = countOfS3ObjectsWithPrefix(date) + val count = countOfS3ObjectsWithPrefix(date, ::destinationPrefix) if (count > 0) { - logger.info("Overwriting. Number of existing objects in s3 with prefix ${destinationPrefix(date)}: $count") + logger.info("Overwriting. Number of objects in s3 with prefix ${destinationPrefix(date)}: $count") } else { logger.info("Creating new objects. No objects exist in s3 with prefix ${destinationPrefix(date)}") } } - private fun countOfS3ObjectsWithPrefix(date: String): Int { + private fun countOfS3ObjectsWithPrefix( + date: String, + prefix: (date: String) -> String + ): Int { val objects: ObjectListing val objectsRequest = ListObjectsRequest( archiveConfig.s3Bucket, - destinationPrefix(date), + prefix(date), "", "/", 1000 @@ -217,7 +221,7 @@ class ArchiveService( return if (date == NO_START_DATE) { "" } else { - "WHERE DATE($dateField) = '$date' " + "WHERE DATE($dateField) = '$date'" } } @@ -236,10 +240,10 @@ class ArchiveService( logger.info("Executing query:\n $sql") try { val rs = statement.executeQuery(sql) - logger.info("Successfully executed query.") + logger.info("Successfully executed query.\n") return rs } catch (e: PSQLException) { - throw Error("Unsuccessful sql execution of $sql", e) + throw Error("Unsuccessful execution of sql $sql", e) } } } \ No newline at end of file From 15c9c98f1e926d2ee3c9b95852426a0e3d7e24ae Mon Sep 17 00:00:00 2001 From: and-carter Date: Fri, 12 Nov 2021 11:30:03 -0800 Subject: [PATCH 8/8] move aws creds to config --- .../shuttle/config/ArchiveConfig.java | 16 +++++++++++++-- .../com/openlattice/shuttle/ArchiveService.kt | 20 +++++++++++-------- 2 files changed, 26 insertions(+), 10 deletions(-) diff --git a/src/main/java/com/openlattice/shuttle/config/ArchiveConfig.java b/src/main/java/com/openlattice/shuttle/config/ArchiveConfig.java index 7af39150..e2321a68 100644 --- a/src/main/java/com/openlattice/shuttle/config/ArchiveConfig.java +++ b/src/main/java/com/openlattice/shuttle/config/ArchiveConfig.java @@ -20,15 +20,21 @@ public class ArchiveConfig { private final Properties hikariConfiguration; private final String s3Bucket; private final String s3Region; + private final String accessKey; + private final String secretKey; ArchiveConfig( @JsonProperty( "hikariConfig" ) Properties hikariConfiguration, @JsonProperty( "s3Bucket" ) String s3Bucket, - @JsonProperty( "s3Region" ) String s3Region - ) { + @JsonProperty( "s3Region" ) String s3Region, + @JsonProperty( "accessKey" ) String accessKey, + @JsonProperty( "secretKey" ) String secretKey + ) { this.hikariConfiguration = hikariConfiguration; this.s3Bucket = s3Bucket; this.s3Region = s3Region; + this.accessKey = accessKey; + this.secretKey = secretKey; } @JsonProperty( "hikariConfig" ) @@ -40,4 +46,10 @@ public class ArchiveConfig { @JsonProperty( "s3Region" ) public String getS3Region() { return s3Region; } + @JsonProperty( "accessKey ") + public String getAccessKey() { return accessKey; } + + @JsonProperty( "secretKey" ) + public String getSecretKey() { return secretKey; } + } diff --git a/src/main/kotlin/com/openlattice/shuttle/ArchiveService.kt b/src/main/kotlin/com/openlattice/shuttle/ArchiveService.kt index f090d084..969e09db 100644 --- a/src/main/kotlin/com/openlattice/shuttle/ArchiveService.kt +++ b/src/main/kotlin/com/openlattice/shuttle/ArchiveService.kt @@ -22,16 +22,15 @@ import java.time.LocalDate const val DEFAULT_DAYS = 1 const val NO_START_DATE = "" +const val S3_DELIMITER = "/" +const val S3_MARKER = "" +const val S3_MAX_KEYS = 1000 /** * @author Andrew Carter andrew@openlattice.com */ private val logger = LoggerFactory.getLogger(ArchiveService::class.java) -private val s3Credentials = BasicAWSCredentials( - "***", - "***" -) @Service class ArchiveService( @@ -47,7 +46,12 @@ class ArchiveService( init { logger.info("Initiating ArchiveService...") s3Client = AmazonS3ClientBuilder.standard().withCredentials( - AWSStaticCredentialsProvider(s3Credentials) + AWSStaticCredentialsProvider( + BasicAWSCredentials ( + archiveConfig.accessKey, + archiveConfig.secretKey + ) + ) ).withRegion(RegionUtils.getRegion(archiveConfig.s3Region).name).build() } @@ -184,9 +188,9 @@ class ArchiveService( val objectsRequest = ListObjectsRequest( archiveConfig.s3Bucket, prefix(date), - "", - "/", - 1000 + S3_MARKER, + S3_DELIMITER, + S3_MAX_KEYS ) try { objects = s3Client.listObjects(objectsRequest)