diff --git a/src/main/java/com/openlattice/shuttle/config/ArchiveConfig.java b/src/main/java/com/openlattice/shuttle/config/ArchiveConfig.java
new file mode 100644
index 00000000..e2321a68
--- /dev/null
+++ b/src/main/java/com/openlattice/shuttle/config/ArchiveConfig.java
@@ -0,0 +1,55 @@
+package com.openlattice.shuttle.config;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.Properties;
+
+/**
+ * @author Andrew Carter andrew@openlattice.com
+ *
+ * Contains connection details (jdbc and s3) for archiving.
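+ *
+ * The matching YAML block might look like this (all values illustrative):
+ *
+ *   hikariConfig:
+ *     jdbcUrl: "jdbc:postgresql://localhost:5432"
+ *     username: "archiver"
+ *     password: "example"
+ *   s3Bucket: "example-archive-bucket"
+ *   s3Region: "us-west-2"
+ *   accessKey: "AKIAEXAMPLE"
+ *   secretKey: "example-secret"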
+ */
+public class ArchiveConfig {
+    private final Properties hikariConfiguration;
+    private final String s3Bucket;
+    private final String s3Region;
+    private final String accessKey;
+    private final String secretKey;
+
+    @JsonCreator
+    ArchiveConfig(
+            @JsonProperty( "hikariConfig" ) Properties hikariConfiguration,
+            @JsonProperty( "s3Bucket" ) String s3Bucket,
+            @JsonProperty( "s3Region" ) String s3Region,
+            @JsonProperty( "accessKey" ) String accessKey,
+            @JsonProperty( "secretKey" ) String secretKey
+    ) {
+        this.hikariConfiguration = hikariConfiguration;
+        this.s3Bucket = s3Bucket;
+        this.s3Region = s3Region;
+        this.accessKey = accessKey;
+        this.secretKey = secretKey;
+    }
+
+    @JsonProperty( "hikariConfig" )
+    public Properties getHikariConfiguration() { return hikariConfiguration; }
+
+    @JsonProperty( "s3Bucket" )
+    public String getS3Bucket() { return s3Bucket; }
+
+    @JsonProperty( "s3Region" )
+    public String getS3Region() { return s3Region; }
+
+    @JsonProperty( "accessKey" )
+    public String getAccessKey() { return accessKey; }
+
+    @JsonProperty( "secretKey" )
+    public String getSecretKey() { return secretKey; }
+}
diff --git a/src/main/java/com/openlattice/shuttle/config/ArchiveYamlMapping.java b/src/main/java/com/openlattice/shuttle/config/ArchiveYamlMapping.java
new file mode 100644
index 00000000..da652c43
--- /dev/null
+++ b/src/main/java/com/openlattice/shuttle/config/ArchiveYamlMapping.java
@@ -0,0 +1,54 @@
+package com.openlattice.shuttle.config;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+/**
+ * @author Andrew Carter andrew@openlattice.com
+ *
+ * Archive configuration files are mapped to this class.
+ * This class contains all fields required to create an ArchiveService.
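+ *
+ * A complete archive config file might look like this (all values illustrative;
+ * the archiveParameters block is documented in {@link ArchiveConfig}):
+ *
+ *   archiveParameters:
+ *     hikariConfig:
+ *       jdbcUrl: "jdbc:postgresql://localhost:5432"
+ *     s3Bucket: "example-archive-bucket"
+ *     s3Region: "us-west-2"
+ *   dbName: "org_database"
+ *   schemaName: "openlattice"
+ *   sourceName: "example_table"
+ *   destinationName: "example_table"
+ *   dateField: "acquired_date"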
+ */
+public class ArchiveYamlMapping {
+    private final ArchiveConfig archiveConfig;
+    private final String dbName;
+    private final String schemaName;
+    private final String sourceName;
+    private final String destinationName;
+    private final String dateField;
+
+    @JsonCreator
+    public ArchiveYamlMapping(
+            @JsonProperty( "archiveParameters" ) ArchiveConfig archiveConfig,
+            @JsonProperty( "dbName" ) String dbName,
+            @JsonProperty( "schemaName" ) String schemaName,
+            @JsonProperty( "sourceName" ) String sourceName,
+            @JsonProperty( "destinationName" ) String destinationName,
+            @JsonProperty( "dateField" ) String dateField
+    ) {
+        this.archiveConfig = archiveConfig;
+        this.dbName = dbName;
+        this.schemaName = schemaName;
+        this.sourceName = sourceName;
+        this.destinationName = destinationName;
+        this.dateField = dateField;
+    }
+
+    @JsonProperty( "archiveParameters" )
+    public ArchiveConfig getArchiveConfig() { return archiveConfig; }
+
+    @JsonProperty( "dbName" )
+    public String getDbName() { return dbName; }
+
+    @JsonProperty( "schemaName" )
+    public String getSchemaName() { return schemaName; }
+
+    @JsonProperty( "sourceName" )
+    public String getSourceName() { return sourceName; }
+
+    @JsonProperty( "destinationName" )
+    public String getDestinationName() { return destinationName; }
+
+    @JsonProperty( "dateField" )
+    public String getDateField() { return dateField; }
+}
diff --git a/src/main/kotlin/com/openlattice/shuttle/ArchiveService.kt b/src/main/kotlin/com/openlattice/shuttle/ArchiveService.kt
new file mode 100644
index 00000000..969e09db
--- /dev/null
+++ b/src/main/kotlin/com/openlattice/shuttle/ArchiveService.kt
@@ -0,0 +1,253 @@
+package com.openlattice.shuttle
+
+import com.amazonaws.SdkClientException
+import com.amazonaws.auth.AWSStaticCredentialsProvider
+import com.amazonaws.auth.BasicAWSCredentials
+import com.amazonaws.regions.RegionUtils
+import com.amazonaws.services.s3.AmazonS3
+import com.amazonaws.services.s3.AmazonS3ClientBuilder
+import com.amazonaws.services.s3.model.ListObjectsRequest
+import com.amazonaws.services.s3.model.ObjectListing
+import com.openlattice.shuttle.config.ArchiveConfig
+import com.zaxxer.hikari.HikariConfig
+import com.zaxxer.hikari.HikariDataSource
+import org.postgresql.util.PSQLException
+import org.slf4j.LoggerFactory
+import org.springframework.stereotype.Service
+import java.sql.Connection
+import java.sql.ResultSet
+import java.sql.Statement
+import java.time.LocalDate
+
+const val DEFAULT_DAYS = 1
+const val NO_START_DATE = ""
+const val S3_DELIMITER = "/"
+const val S3_MARKER = ""
+const val S3_MAX_KEYS = 1000
+
+private val logger = LoggerFactory.getLogger(ArchiveService::class.java)
+
+/**
+ * @author Andrew Carter andrew@openlattice.com
+ */
+@Service
+class ArchiveService(
+    private val archiveConfig: ArchiveConfig,
+    private val dbName: String,
+    private val schemaName: String = "openlattice",
+    private val sourceName: String,
+    private val destinationName: String = sourceName,
+    private val dateField: String
+) {
+    private val s3Client: AmazonS3
+
+    init {
+        logger.info("Initializing ArchiveService...")
+        s3Client = AmazonS3ClientBuilder.standard().withCredentials(
+            AWSStaticCredentialsProvider(
+                BasicAWSCredentials(
+                    archiveConfig.accessKey,
+                    archiveConfig.secretKey
+                )
+            )
+        ).withRegion(RegionUtils.getRegion(archiveConfig.s3Region).name).build()
+    }
+
+    // archive data, overwriting any objects that already exist for the same prefix
+    fun mummify(startDate: String, days: Int) {
+        logger.info("Beginning mummification...")
+        connectToDatabase(dbName).use { connection ->
+            connection.createStatement().use { statement ->
+                generateAndExecuteSqlPerDay(statement, startDate, days, ::exportHandler)
+            }
+        }
+    }
+
+    // restore data
+    fun exhume(startDate: String, days: Int) {
+        logger.info("Exhuming data...")
+        connectToDatabase(dbName).use { connection ->
+            connection.createStatement().use { statement ->
+                generateAndExecuteSqlPerDay(statement, startDate, days, ::importHandler)
+            }
+        }
+    }
+
+    private fun generateAndExecuteSqlPerDay(
+        statement: Statement,
+        date: String,
+        days: Int,
+        sqlAndExecuteHandler: (statement: Statement, date: String) -> Unit
+    ) {
+        if (date == NO_START_DATE) {
+            // if no start date was provided, pass the empty string through
+            sqlAndExecuteHandler(statement, NO_START_DATE)
+        } else {
+            // convert to LocalDate for date arithmetic
+            val startDate = LocalDate.parse(date)
+            for (dayIndex in 0 until days) {
+                val currentDate = startDate.plusDays(dayIndex.toLong()).toString()
+                sqlAndExecuteHandler(statement, currentDate)
+            }
+        }
+    }
+
+    private fun exportHandler(statement: Statement, currentDate: String) {
+        isOverwrite(currentDate)
+        executeStatement(statement, exportSql(currentDate))
+        validateExport(currentDate)
+    }
+
+    private fun importHandler(statement: Statement, currentDate: String) {
+        val parts = countOfS3ObjectsWithPrefix(currentDate, ::sourcePrefix)
+        logger.info("Number of objects in s3 with prefix ${sourcePrefix(currentDate)}: $parts")
+        for (part in 0 until parts) {
+            // +1 to part to account for 0 vs 1 indexing
+            executeStatement(statement, importSql(currentDate, part + 1))
+        }
+        validateImport(statement, currentDate)
+    }
+
+    // generate sql to invoke an export using the aws_s3 postgres extension
+    private fun exportSql(date: String): String {
+        // avoid quoting hell in Postgres by using dollar-sign quotes ($exportQuery$)
+        return "SELECT * FROM aws_s3.query_export_to_s3(" +
+                "\$exportQuery\$ " +
+                "SELECT * FROM $schemaName.$sourceName " +
+                whereClauseByDate(date) +
+                " \$exportQuery\$," +
+                "aws_commons.create_s3_uri(\n" +
+                "   '${archiveConfig.s3Bucket}',\n" +
+                "   '${destinationPrefix(date)}',\n" +
+                "   '${archiveConfig.s3Region}'\n" +
+                "));"
+    }
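+
+    // With the illustrative config from ArchiveYamlMapping and the date
+    // "2021-01-01", the generated statement is shaped roughly like:
+    //
+    //   SELECT * FROM aws_s3.query_export_to_s3(
+    //       $exportQuery$ SELECT * FROM openlattice.example_table
+    //           WHERE DATE(acquired_date) = '2021-01-01' $exportQuery$,
+    //       aws_commons.create_s3_uri(
+    //           'example-archive-bucket',
+    //           'archive01/org_database/openlattice/example_table/example_table_2021-01-01',
+    //           'us-west-2'
+    //   ));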
" + + "Either there was a problem importing or there is no data in the source.") + } + } + + private fun isOverwrite(date: String) { + val count = countOfS3ObjectsWithPrefix(date, ::destinationPrefix) + if (count > 0) { + logger.info("Overwriting. Number of objects in s3 with prefix ${destinationPrefix(date)}: $count") + } else { + logger.info("Creating new objects. No objects exist in s3 with prefix ${destinationPrefix(date)}") + } + } + + private fun countOfS3ObjectsWithPrefix( + date: String, + prefix: (date: String) -> String + ): Int { + val objects: ObjectListing + val objectsRequest = ListObjectsRequest( + archiveConfig.s3Bucket, + prefix(date), + S3_MARKER, + S3_DELIMITER, + S3_MAX_KEYS + ) + try { + objects = s3Client.listObjects(objectsRequest) + } catch (e: SdkClientException) { + throw Exception(e) + } + if (objects.isTruncated) { + // TODO: Provide support for truncated / paginated result + throw Exception("Too many objects with prefix ${destinationPrefix(date)}. Truncated ObjectListing not supported.") + } + return objects.objectSummaries.size + } + + private fun destinationPrefix(date: String): String { + return "archive01/$dbName/$schemaName/$destinationName${dateSuffix(date)}" + } + + private fun sourcePrefix(date: String): String { + return "archive01/$dbName/$schemaName/$sourceName${dateSuffix(date)}" + } + + private fun dateSuffix(date: String): String { + return if (date == NO_START_DATE) { + "" + } else { + "/${sourceName}_$date" + } + } + + // if date not provided then don't include WHERE clause + private fun whereClauseByDate(date: String): String { + return if (date == NO_START_DATE) { + "" + } else { + "WHERE DATE($dateField) = '$date'" + } + } + + private fun connectToDatabase(dbName: String): Connection { + val config = HikariConfig(archiveConfig.hikariConfiguration) + // append org database name to the jdbc url + config.jdbcUrl = "${config.jdbcUrl.removeSuffix("/")}/$dbName" + try { + return HikariDataSource(config).connection + } catch(e: Exception) { + throw Error("Error connecting with HikariDataSource...", e) + } + } + + fun executeStatement(statement: Statement, sql: String): ResultSet { + logger.info("Executing query:\n $sql") + try { + val rs = statement.executeQuery(sql) + logger.info("Successfully executed query.\n") + return rs + } catch (e: PSQLException) { + throw Error("Unsuccessful execution of sql $sql", e) + } + } +} \ No newline at end of file diff --git a/src/main/kotlin/com/openlattice/shuttle/ShuttleCli.kt b/src/main/kotlin/com/openlattice/shuttle/ShuttleCli.kt index 6adbd32e..9b39ca86 100644 --- a/src/main/kotlin/com/openlattice/shuttle/ShuttleCli.kt +++ b/src/main/kotlin/com/openlattice/shuttle/ShuttleCli.kt @@ -39,7 +39,13 @@ import com.openlattice.shuttle.ShuttleCliOptions.Companion.TOKEN import com.openlattice.shuttle.ShuttleCliOptions.Companion.UPLOAD_SIZE import com.openlattice.shuttle.ShuttleCliOptions.Companion.USER import com.openlattice.shuttle.ShuttleCliOptions.Companion.XML +import com.openlattice.shuttle.ShuttleCliOptions.Companion.ARCHIVE +import com.openlattice.shuttle.ShuttleCliOptions.Companion.IMPORT +import com.openlattice.shuttle.ShuttleCliOptions.Companion.EXPORT +import com.openlattice.shuttle.ShuttleCliOptions.Companion.START_DATE +import com.openlattice.shuttle.ShuttleCliOptions.Companion.DAYS import com.openlattice.shuttle.config.IntegrationConfig +import com.openlattice.shuttle.config.ArchiveYamlMapping import com.openlattice.shuttle.payload.* import com.openlattice.shuttle.source.LocalFileOrigin import 
+
+    private fun dateSuffix(date: String): String {
+        return if (date == NO_START_DATE) {
+            ""
+        } else {
+            "/${sourceName}_$date"
+        }
+    }
+
+    // if no date was provided, don't include a WHERE clause
+    private fun whereClauseByDate(date: String): String {
+        return if (date == NO_START_DATE) {
+            ""
+        } else {
+            "WHERE DATE($dateField) = '$date'"
+        }
+    }
+
+    private fun connectToDatabase(dbName: String): Connection {
+        val config = HikariConfig(archiveConfig.hikariConfiguration)
+        // append the org database name to the jdbc url
+        config.jdbcUrl = "${config.jdbcUrl.removeSuffix("/")}/$dbName"
+        try {
+            return HikariDataSource(config).connection
+        } catch (e: Exception) {
+            throw IllegalStateException("Error connecting with HikariDataSource...", e)
+        }
+    }
+
+    fun executeStatement(statement: Statement, sql: String): ResultSet {
+        logger.info("Executing query:\n $sql")
+        try {
+            val rs = statement.executeQuery(sql)
+            logger.info("Successfully executed query.\n")
+            return rs
+        } catch (e: PSQLException) {
+            throw IllegalStateException("Unsuccessful execution of sql $sql", e)
+        }
+    }
+}
\ No newline at end of file
diff --git a/src/main/kotlin/com/openlattice/shuttle/ShuttleCli.kt b/src/main/kotlin/com/openlattice/shuttle/ShuttleCli.kt
index 6adbd32e..9b39ca86 100644
--- a/src/main/kotlin/com/openlattice/shuttle/ShuttleCli.kt
+++ b/src/main/kotlin/com/openlattice/shuttle/ShuttleCli.kt
@@ -39,7 +39,13 @@ import com.openlattice.shuttle.ShuttleCliOptions.Companion.TOKEN
 import com.openlattice.shuttle.ShuttleCliOptions.Companion.UPLOAD_SIZE
 import com.openlattice.shuttle.ShuttleCliOptions.Companion.USER
 import com.openlattice.shuttle.ShuttleCliOptions.Companion.XML
+import com.openlattice.shuttle.ShuttleCliOptions.Companion.ARCHIVE
+import com.openlattice.shuttle.ShuttleCliOptions.Companion.IMPORT
+import com.openlattice.shuttle.ShuttleCliOptions.Companion.EXPORT
+import com.openlattice.shuttle.ShuttleCliOptions.Companion.START_DATE
+import com.openlattice.shuttle.ShuttleCliOptions.Companion.DAYS
 import com.openlattice.shuttle.config.IntegrationConfig
+import com.openlattice.shuttle.config.ArchiveYamlMapping
 import com.openlattice.shuttle.payload.*
 import com.openlattice.shuttle.source.LocalFileOrigin
 import com.openlattice.shuttle.source.S3BucketOrigin
@@ -49,6 +55,7 @@ import org.slf4j.LoggerFactory
 import java.io.File
 import java.io.IOException
 import java.nio.file.Paths
+import java.time.LocalDate
 import java.util.*
 import kotlin.system.exitProcess
@@ -82,6 +89,68 @@ fun main(args: Array<String>) {
         return
     }
 
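+    // Archive mode: exports a table (or per-day slices of it) to s3, or restores
+    // it back, driven by a yaml config file mapped to ArchiveYamlMapping.
+    // Illustrative invocation (assuming a `shuttle` launcher; path and dates are
+    // examples only):
+    //   shuttle --archive export --config /path/to/archive.yaml --start-date 2021-01-01 --days 7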
+    if (cl.hasOption(ARCHIVE)) {
+        if (cl.hasOption(CONFIGURATION)) {
+
+            val archiveYamlMapping = try {
+                ObjectMappers.getYamlMapper()
+                        .readValue(File(cl.getOptionValue(CONFIGURATION)), ArchiveYamlMapping::class.java)
+            } catch (io: IOException) {
+                logger.error("IOException encountered converting yaml file into java objects", io)
+                exitProcess(1)
+            } catch (jp: JsonParseException) {
+                logger.error("Shuttle was unable to parse the yaml file", jp)
+                exitProcess(1)
+            } catch (jm: JsonMappingException) {
+                logger.error("Shuttle was unable to map the yaml objects into java objects", jm)
+                exitProcess(1)
+            }
+
+            val days = if (cl.hasOption(DAYS)) {
+                cl.getOptionValue(DAYS).toInt()
+            } else {
+                DEFAULT_DAYS
+            }
+
+            val startDate = if (cl.hasOption(START_DATE)) {
+                cl.getOptionValue(START_DATE)
+            } else {
+                logger.info("No start date provided.")
+                NO_START_DATE
+            }
+
+            val archiver = ArchiveService(
+                archiveYamlMapping.archiveConfig,
+                archiveYamlMapping.dbName,
+                archiveYamlMapping.schemaName,
+                archiveYamlMapping.sourceName,
+                archiveYamlMapping.destinationName,
+                archiveYamlMapping.dateField
+            )
+
+            when (cl.getOptionValue(ARCHIVE)) {
+                EXPORT -> archiver.mummify(startDate, days)
+                IMPORT -> archiver.exhume(startDate, days)
+                else -> {
+                    logger.error("Export or import option must be specified with archive. \n\t--archive import\n\t--archive export")
+                    exitProcess(1)
+                }
+            }
+        } else {
+            logger.error("Archive specified, but config yaml file not provided. \n\t--config /path/to/file.yaml")
+            exitProcess(1)
+        }
+        return
+    }
+
     if (cl.hasOption(SERVER)) {
         if (cl.hasOption(PROFILES)) {
             val shuttleServer = ShuttleServer()
diff --git a/src/main/kotlin/com/openlattice/shuttle/ShuttleCliOptions.kt b/src/main/kotlin/com/openlattice/shuttle/ShuttleCliOptions.kt
index a97dc4c5..0952de12 100644
--- a/src/main/kotlin/com/openlattice/shuttle/ShuttleCliOptions.kt
+++ b/src/main/kotlin/com/openlattice/shuttle/ShuttleCliOptions.kt
@@ -43,7 +43,7 @@ class ShuttleCliOptions {
         const val FROM_EMAIL = "from-email"
         const val FROM_EMAIL_PASSWORD = "from-email-password"
         const val HELP = "help"
-        const val LOCAL_ORIGIN_EXPECTED_ARGS_COUNT = 2
+
         const val NOTIFICATION_EMAILS = "notify-emails"
         const val PASSWORD = "password"
         const val PROFILES = "profiles"
@@ -51,17 +51,27 @@ class ShuttleCliOptions {
         const val S3 = "s3"
         const val S3_ORIGIN_MAXIMUM_ARGS_COUNT = 4
         const val S3_ORIGIN_MINIMUM_ARGS_COUNT = 3
+        const val LOCAL_ORIGIN_EXPECTED_ARGS_COUNT = 2
         const val SERVER = "server"
         const val SHUTTLE_CONFIG = "shuttle-config"
         const val SMTP_SERVER = "smtp-server"
         const val SMTP_SERVER_PORT = "smtp-server-port"
         const val SQL = "sql"
+
         const val THREADS = "threads"
+        const val ARCHIVE = "archive"
+        const val START_DATE = "start-date"
+        const val DAYS = "days"
+        const val EXPORT = "export"
+        const val IMPORT = "import"
+
         const val TOKEN = "token"
         const val UPLOAD_SIZE = "upload-size"
         const val USER = "user"
         const val XML = "xml"
+
         private val options = Options()
         private val clp = DefaultParser()
         private val hf = HelpFormatter()
@@ -111,12 +121,14 @@ class ShuttleCliOptions {
         private val dataOriginOption = Option.builder()
                 .longOpt(DATA_ORIGIN)
                 .hasArg(true)
-                .desc("Source location of the data to be integrated\n" +
-                        " Current options are:\n" +
-                        " S3 <bucket> <region> <keyPrefix>\n" +
-                        " local <path>")
+                .desc(
+                        "Source location of the data to be integrated\n" +
+                                " Current options are:\n" +
+                                " S3 <bucket> <region> <keyPrefix>\n" +
+                                " local <path>"
+                )
                 .argName("data origin")
-                .numberOfArgs( S3_ORIGIN_MAXIMUM_ARGS_COUNT)
+                .numberOfArgs(S3_ORIGIN_MAXIMUM_ARGS_COUNT)
                 .build()
 
         private val datasourceOption = Option.builder()
@@ -241,19 +253,43 @@ class ShuttleCliOptions {
                 .argName("Port used to connect to smtp server")
                 .build()
 
+        private val archiveOption = Option.builder()
+                .longOpt(ARCHIVE)
+                .hasArg(true)
+                .desc(
+                        "Archive or restore data between JDBC and s3.\n" +
+                                "\t--archive <\"export\" or \"import\"> --config <file> [--start-date <date> --days <n>]"
+                )
+                .argName("\"export\" or \"import\"")
+                .build()
+
+        private val startDateOption = Option.builder()
+                .longOpt(START_DATE)
+                .hasArg(true)
+                .desc("Indicates the date to begin archiving data (inclusive, beginning at 00:00)")
+                .argName("start date")
+                .build()
+
+        private val daysOption = Option.builder()
+                .longOpt(DAYS)
+                .hasArg(true)
+                .desc("Indicates the number of days from the start date to archive. Includes the start date.")
+                .argName("days")
+                .build()
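+
+        // Taken together, an archive run is selected as (illustrative values):
+        //   --archive export --config /path/to/archive.yaml --start-date 2021-01-01 --days 3
+        // which exports the three daily slices 2021-01-01 through 2021-01-03.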
Includes start date.") + .argName("days") + .build() + private val dataStoreOption = Option.builder() - .argName(DATA_STORE) - .longOpt(DATA_STORE) - .desc("target data store to integrate into") - .hasArg(true) - .build() + .argName(DATA_STORE) + .longOpt(DATA_STORE) + .desc("target data store to integrate into") + .hasArg(true) + .build() private val shuttleConfigOption = Option.builder() - .argName(SHUTTLE_CONFIG) - .longOpt(SHUTTLE_CONFIG) - .numberOfArgs(2) - .desc("S3 bucket and region containing shuttle.yaml") - .build() + .argName(SHUTTLE_CONFIG) + .longOpt(SHUTTLE_CONFIG) + .numberOfArgs(2) + .desc("S3 bucket and region containing shuttle.yaml") + .build() init { options @@ -278,9 +314,13 @@ class ShuttleCliOptions { .addOption(smtpServerPortOption) .addOption(threadsOption) .addOption(serverOption) + .addOption(archiveOption) + .addOption(startDateOption) + .addOption(daysOption) .addOption(dataStoreOption) .addOption(shuttleConfigOption) + options.addOptionGroup( OptionGroup() .addOption(sqlOption)