Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ dependencies {
implementation 'org.postgresql:postgresql' // PostgreSQL Driver
//object storage dependency
implementation 'io.micronaut.objectstorage:micronaut-object-storage-aws'
implementation 'io.micronaut.objectstorage:micronaut-object-storage-local'
// include sts to allow the use of service account role - https://stackoverflow.com/a/73306570
// this sts dependency is require by micronaut-aws-parameter-store,
// not directly used by the app, for this reason keeping `runtimeOnly`
Expand Down
60 changes: 60 additions & 0 deletions src/main/groovy/io/seqera/wave/configuration/BuildConfig.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,11 @@ import javax.annotation.Nullable
import javax.annotation.PostConstruct

import groovy.transform.CompileStatic
import groovy.transform.Memoized
import groovy.util.logging.Slf4j
import io.micronaut.context.annotation.Value
import io.seqera.wave.api.SubmitContainerTokenRequest
import io.seqera.wave.util.BucketTokenizer
import jakarta.inject.Singleton
/**
* Model Wave build config settings
Expand Down Expand Up @@ -105,6 +107,28 @@ class BuildConfig {
@Value('${wave.build.max-container-file-size:10000}')
int maxContainerFileSize

/**
* The path where build logs locks files are stored. Can be either
* a S3 path e.g. {@code s3://some-bucket/data/path} or a local file system
* path e.g. {@code /some/data/path}
*/
@Value('${wave.build.logs.path}')
String logsPath

/**
* The path where Conda locks files are stored. Can be either
* a S3 path e.g. {@code s3://some-bucket/data/path} or a local file system
* path e.g. {@code /some/data/path}
*/
@Value('${wave.build.locks.path}')
String locksPath

/**
* Max length allowed for build logs download
*/
@Value('${wave.build.logs.maxLength:100000}')
long maxLength

@PostConstruct
private void init() {
log.info("Builder config: " +
Expand All @@ -116,6 +140,8 @@ class BuildConfig {
"build-workspace=${buildWorkspace}; " +
"build-timeout=${defaultTimeout}; " +
"build-trusted-timeout=${trustedTimeout}; " +
"build-logs-path=${logsPath}; " +
"build-locks-path=${locksPath}; " +
"status-delay=${statusDelay}; " +
"status-duration=${statusDuration}; " +
"failure-duration=${getFailureDuration()}; " +
Expand All @@ -137,4 +163,38 @@ class BuildConfig {
? trustedTimeout
: defaultTimeout
}

/**
* The file name prefix applied when storing a build logs file into an object storage.
* For example having {@link #logsPath} as {@code s3://bucket-name/foo/bar} the
* value returned by this method is {@code foo/bar}.
*
* When using a local path the prefix is {@code null}.
*
* @return the log file name prefix
*/
@Memoized
String getLogsPrefix() {
if( !logsPath )
return null
final store = BucketTokenizer.from(logsPath)
return store.scheme ? store.getKey() : null
}

/**
* The file name prefix applied when storing a Conda lock file into an object storage.
* For example having {@link #logsPath} as {@code s3://bucket-name/foo/bar} the
* value returned by this method is {@code foo/bar}.
*
* When using a local path the prefix is {@code null}.
*
* @return the log file name prefix
*/
@Memoized
String getLocksPrefix() {
if( !locksPath )
return null
final store = BucketTokenizer.from(locksPath)
return store.scheme ? store.getKey() : null
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,23 @@

package io.seqera.wave.service.aws

import java.nio.file.Path

import groovy.transform.CompileStatic
import groovy.util.logging.Slf4j
import io.micronaut.context.ApplicationContext
import io.micronaut.context.annotation.Factory
import io.micronaut.context.annotation.Requires
import io.micronaut.context.annotation.Value
import io.micronaut.inject.qualifiers.Qualifiers
import io.micronaut.objectstorage.InputStreamMapper
import io.micronaut.objectstorage.ObjectStorageOperations
import io.micronaut.objectstorage.aws.AwsS3Configuration
import io.micronaut.objectstorage.aws.AwsS3Operations
import io.micronaut.objectstorage.local.LocalStorageConfiguration
import io.micronaut.objectstorage.local.LocalStorageOperations
import io.seqera.wave.configuration.BuildConfig
import io.seqera.wave.util.BucketTokenizer
import jakarta.annotation.Nullable
import jakarta.inject.Inject
import jakarta.inject.Named
import jakarta.inject.Singleton
import software.amazon.awssdk.services.s3.S3Client
Expand All @@ -38,16 +46,61 @@ import software.amazon.awssdk.services.s3.S3Client
@Factory
@CompileStatic
@Slf4j
@Requires(property = 'wave.build.logs.bucket')
class ObjectStorageOperationsFactory {

@Value('${wave.build.logs.bucket}')
String storageBucket
public static final String BUILD_LOGS = "build-logs"

public static final String BUILD_LOCKS = "build-locks"

@Inject
private ApplicationContext context

@Inject
@Nullable
private BuildConfig buildConfig

@Singleton
@Named(BUILD_LOGS)
ObjectStorageOperations<?, ?, ?> createLogsStorageOps() {
if( !buildConfig )
throw new IllegalStateException("Build configuration is not defined")
return create0(BUILD_LOGS, buildConfig.logsPath, "wave.build.logs.path")
}

@Singleton
@Named("build-logs")
ObjectStorageOperations<?, ?, ?> awsStorageOperations(@Named("DefaultS3Client") S3Client s3Client, InputStreamMapper inputStreamMapper) {
AwsS3Configuration configuration = new AwsS3Configuration('build-logs')
@Named(BUILD_LOCKS)
ObjectStorageOperations<?, ?, ?> createLocksStorageOpts() {
if( !buildConfig )
throw new IllegalStateException("Build configuration is not defined")
return create0(BUILD_LOCKS, buildConfig.locksPath, "wave.build.locks.path")
}

protected ObjectStorageOperations<?, ?, ?> create0(String scope, String path, String setting) {
if( !path )
throw new IllegalStateException("Missing config setting '${setting}' in the wave config")
final store = BucketTokenizer.from(path)
if( !store.scheme ) {
return localFactory(scope, path)
}
if( store.scheme=='s3' ) {
return awsFactory(scope, store.bucket)
}
throw new IllegalArgumentException("Unsupported storage scheme: '${store.scheme}' - offending setting '${setting}': ${path}" )
}

protected ObjectStorageOperations<?, ?, ?> localFactory(String scope, String storageBucket) {
log.debug "Using local ObjectStorageOperations scope='${scope}'; storageBucket='${storageBucket}'"
final localPath = Path.of(storageBucket)
LocalStorageConfiguration configuration = new LocalStorageConfiguration(scope)
configuration.setPath(localPath)
return new LocalStorageOperations(configuration)
}

protected ObjectStorageOperations<?, ?, ?> awsFactory(String scope, String storageBucket) {
log.debug "Using AWS S3 ObjectStorageOperations scope='${scope}'; storageBucket='${storageBucket}'"
final s3Client = context.getBean(S3Client, Qualifiers.byName("DefaultS3Client"))
final inputStreamMapper = context.getBean(InputStreamMapper)
AwsS3Configuration configuration = new AwsS3Configuration(scope)
configuration.setBucket(storageBucket)
return new AwsS3Operations(configuration, s3Client, inputStreamMapper)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,27 +21,26 @@ package io.seqera.wave.service.logs
import java.util.concurrent.CompletableFuture
import java.util.concurrent.ExecutorService

import io.micronaut.core.annotation.Nullable

import groovy.transform.CompileStatic
import groovy.util.logging.Slf4j
import io.micronaut.context.annotation.Requires
import io.micronaut.context.annotation.Value
import io.micronaut.http.server.types.files.StreamedFile
import io.micronaut.objectstorage.ObjectStorageEntry
import io.micronaut.objectstorage.ObjectStorageOperations
import io.micronaut.objectstorage.request.UploadRequest
import io.micronaut.runtime.event.annotation.EventListener
import io.micronaut.scheduling.TaskExecutors
import io.seqera.wave.configuration.BuildConfig
import io.seqera.wave.service.builder.BuildEvent
import io.seqera.wave.service.builder.BuildRequest
import io.seqera.wave.service.persistence.PersistenceService
import jakarta.annotation.PostConstruct
import jakarta.inject.Inject
import jakarta.inject.Named
import jakarta.inject.Singleton
import org.apache.commons.io.input.BoundedInputStream
import static org.apache.commons.lang3.StringUtils.strip

import static io.seqera.wave.service.aws.ObjectStorageOperationsFactory.*

/**
* Implements Service to manage logs from an Object store
*
Expand All @@ -50,50 +49,38 @@ import static org.apache.commons.lang3.StringUtils.strip
@Slf4j
@Singleton
@CompileStatic
@Requires(property = 'wave.build.logs.bucket')
@Requires(property = 'wave.build')
class BuildLogServiceImpl implements BuildLogService {

private static final String CONDA_LOCK_START = ">> CONDA_LOCK_START"

private static final String CONDA_LOCK_END = "<< CONDA_LOCK_END"

@Inject
@Named('build-logs')
private ObjectStorageOperations<?, ?, ?> objectStorageOperations
@Named(BUILD_LOGS)
private ObjectStorageOperations<?, ?, ?> logsStoreOps

@Inject
private PersistenceService persistenceService
@Named(BUILD_LOCKS)
private ObjectStorageOperations<?, ?, ?> locksStoreOps

@Nullable
@Value('${wave.build.logs.prefix}')
private String prefix

@Value('${wave.build.logs.bucket}')
private String bucket

@Value('${wave.build.logs.maxLength:100000}')
private long maxLength
@Inject
private PersistenceService persistenceService

@Nullable
@Value('${wave.build.logs.conda-lock-prefix}')
private String condaLockPrefix
@Inject
private BuildConfig buildConfig

@Inject
@Named(TaskExecutors.BLOCKING)
private ExecutorService ioExecutor

@PostConstruct
private void init() {
log.info "Creating Build log service bucket=$bucket; logs prefix=$prefix; maxLength: ${maxLength}; condaLock prefix=$condaLockPrefix"
}

protected String logKey(String buildId) {
if( !buildId )
return null
if( !prefix )
return buildId + '.log'
final base = strip(prefix, '/')
return "${base}/${buildId}.log"
final prefix = buildConfig?.logsPrefix
return prefix
? "${prefix}/${buildId}.log"
: buildId + '.log'
}

@EventListener
Expand All @@ -105,12 +92,11 @@ class BuildLogServiceImpl implements BuildLogService {

@Override
void storeLog(String buildId, String content) {

try {
final String logs = removeCondaLockFile(content)
log.debug "Storing logs for buildId: $buildId"
final uploadRequest = UploadRequest.fromBytes(logs.bytes, logKey(buildId))
objectStorageOperations.upload(uploadRequest)
logsStoreOps.upload(uploadRequest)
// check if needed to store the conda lock
final condaLock = content.contains(CONDA_LOCK_START)
if ( condaLock )
Expand All @@ -128,7 +114,7 @@ class BuildLogServiceImpl implements BuildLogService {

private StreamedFile fetchLogStream0(String buildId) {
if( !buildId ) return null
final Optional<ObjectStorageEntry<?>> result = objectStorageOperations.retrieve(logKey(buildId))
final Optional<ObjectStorageEntry<?>> result = logsStoreOps.retrieve(logKey(buildId))
return result.isPresent() ? result.get().toStreamedFile() : null
}

Expand All @@ -137,8 +123,8 @@ class BuildLogServiceImpl implements BuildLogService {
final result = fetchLogStream(buildId)
if( !result )
return null
final logs = new BoundedInputStream(result.getInputStream(), maxLength).getText()
return new BuildLog(logs, logs.length()>=maxLength)
final logs = new BoundedInputStream(result.getInputStream(), buildConfig.maxLength).getText()
return new BuildLog(logs, logs.length()>=buildConfig.maxLength)
}

protected static removeCondaLockFile(String logs) {
Expand All @@ -165,7 +151,7 @@ class BuildLogServiceImpl implements BuildLogService {
if ( condaLock ){
log.debug "Storing conda lock for buildId: $buildId"
final uploadRequest = UploadRequest.fromBytes(condaLock.bytes, condaLockKey(buildId))
objectStorageOperations.upload(uploadRequest)
locksStoreOps.upload(uploadRequest)
}
}
catch (Exception e) {
Expand All @@ -176,10 +162,10 @@ class BuildLogServiceImpl implements BuildLogService {
protected String condaLockKey(String buildId) {
if( !buildId )
return null
if( !condaLockPrefix )
return buildId + '.lock'
final base = strip(condaLockPrefix, '/')
return "${base}/${buildId}.lock"
final prefix = buildConfig?.locksPrefix
return prefix
? "${prefix}/${buildId}.lock"
: buildId + '.lock'
}

@Override
Expand All @@ -188,13 +174,12 @@ class BuildLogServiceImpl implements BuildLogService {
if( !result )
return null
return result.getInputStream().getText()

}

@Override
StreamedFile fetchCondaLockStream(String buildId) {
if( !buildId ) return null
final Optional<ObjectStorageEntry<?>> result = objectStorageOperations.retrieve(condaLockKey(buildId))
final Optional<ObjectStorageEntry<?>> result = locksStoreOps.retrieve(condaLockKey(buildId))
return result.isPresent() ? result.get().toStreamedFile() : null
}

Expand Down
6 changes: 3 additions & 3 deletions src/main/resources/application-buildlogs-aws-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
wave:
build:
logs:
bucket: "nextflow-ci"
prefix: 'wave-build/logs'
conda-lock-prefix: 'wave-build/conda-locks'
path: "s3://nextflow-ci/test/l1"
locks:
path: "s3://nextflow-ci"
...
8 changes: 0 additions & 8 deletions src/main/resources/application-buildlogs-local.yml

This file was deleted.

4 changes: 4 additions & 0 deletions src/main/resources/application-dev.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@ wave:
duration: '30s'
build:
workspace: 'build-workspace'
logs:
path: "${PWD}/build-workspace/logs"
locks:
path: "${PWD}/build-workspace/locks"
metrics:
enabled: true
accounts:
Expand Down
Loading
Loading