Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
214 changes: 90 additions & 124 deletions src/main/groovy/nextflow/slack/BotSlackSender.groovy

Large diffs are not rendered by default.

8 changes: 8 additions & 0 deletions src/main/groovy/nextflow/slack/SlackConfig.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,12 @@ class SlackConfig {
*/
final boolean validateOnStartup

/**
* If true, throw an exception (aborting the pipeline) when a Slack notification fails.
* Default: false (log warning and continue)
*/
final boolean failOnError

/**
* Configuration for workflow start notifications
*/
Expand Down Expand Up @@ -130,6 +136,7 @@ class SlackConfig {
this.botChannel = botConfig?.channel as String
this.useThreads = botConfig?.useThreads != null ? botConfig.useThreads as boolean : true
this.validateOnStartup = config.validateOnStartup != null ? config.validateOnStartup as boolean : true
this.failOnError = config.failOnError != null ? config.failOnError as boolean : false
this.onStart = new OnStartConfig(config.onStart as Map)
this.onComplete = new OnCompleteConfig(config.onComplete as Map)
this.onError = new OnErrorConfig(config.onError as Map)
Expand Down Expand Up @@ -234,6 +241,7 @@ class SlackConfig {
return "SlackConfig[enabled=${enabled}, " +
"webhook=${webhook ? '***configured***' : 'null'}, " +
"botToken=${botToken ? '***configured***' : 'null'}, " +
"failOnError=${failOnError}, " +
"onStart=${onStart}, onComplete=${onComplete}, onError=${onError}]"
}
}
107 changes: 58 additions & 49 deletions src/main/groovy/nextflow/slack/SlackExtension.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -56,20 +56,20 @@ class SlackExtension extends PluginExtensionPoint {
*/
@Function
void slackMessage(String text) {
try {
// Get the observer instance from factory
def observer = SlackFactory.observerInstance
// Get the observer instance from factory
def observer = SlackFactory.observerInstance

if (!observer) {
log.debug "Slack plugin: Observer not initialized, skipping message"
return
}
if (!observer) {
log.debug "Slack plugin: Observer not initialized, skipping message"
return
}

if (!observer.sender || !observer.messageBuilder) {
log.debug "Slack plugin: Not configured, skipping message"
return
}
if (!observer.sender || !observer.messageBuilder) {
log.debug "Slack plugin: Not configured, skipping message"
return
}

try {
// Get thread timestamp if threading is enabled
def threadTs = null
if (observer.config?.useThreads && observer.sender instanceof BotSlackSender) {
Expand All @@ -83,8 +83,11 @@ class SlackExtension extends PluginExtensionPoint {
log.debug "Slack plugin: Sent custom text message"

} catch (Exception e) {
log.error "Slack plugin: Error sending message: ${e.message}", e
// Don't propagate exception - never fail the workflow
def msg = "Slack plugin: Error sending message: ${e.message}"
log.error msg, e
if (observer.config?.failOnError) {
throw new RuntimeException(msg, e)
}
}
}

Expand All @@ -106,26 +109,26 @@ class SlackExtension extends PluginExtensionPoint {
*/
@Function
void slackMessage(Map options) {
try {
// Validate required parameters
if (!options.message) {
log.error "Slack plugin: 'message' parameter is required for rich messages"
return
}
// Validate required parameters
if (!options.message) {
log.error "Slack plugin: 'message' parameter is required for rich messages"
return
}

// Get the observer instance from factory
def observer = SlackFactory.observerInstance
// Get the observer instance from factory
def observer = SlackFactory.observerInstance

if (!observer) {
log.debug "Slack plugin: Observer not initialized, skipping message"
return
}
if (!observer) {
log.debug "Slack plugin: Observer not initialized, skipping message"
return
}

if (!observer.sender || !observer.messageBuilder) {
log.debug "Slack plugin: Not configured, skipping message"
return
}
if (!observer.sender || !observer.messageBuilder) {
log.debug "Slack plugin: Not configured, skipping message"
return
}

try {
// Get thread timestamp if threading is enabled
def threadTs = null
if (observer.config?.useThreads && observer.sender instanceof BotSlackSender) {
Expand All @@ -139,8 +142,11 @@ class SlackExtension extends PluginExtensionPoint {
log.debug "Slack plugin: Sent custom rich message"

} catch (Exception e) {
log.error "Slack plugin: Error sending rich message: ${e.message}", e
// Don't propagate exception - never fail the workflow
def msg = "Slack plugin: Error sending rich message: ${e.message}"
log.error msg, e
if (observer.config?.failOnError) {
throw new RuntimeException(msg, e)
}
}
}

Expand Down Expand Up @@ -176,26 +182,26 @@ class SlackExtension extends PluginExtensionPoint {
*/
@Function
void slackFileUpload(Map options) {
try {
// Validate required parameters
if (!options.file) {
log.error "Slack plugin: 'file' parameter is required for file upload"
return
}
// Validate required parameters
if (!options.file) {
log.error "Slack plugin: 'file' parameter is required for file upload"
return
}

// Get the observer instance from factory
def observer = SlackFactory.observerInstance
// Get the observer instance from factory
def observer = SlackFactory.observerInstance

if (!observer) {
log.debug "Slack plugin: Observer not initialized, skipping file upload"
return
}
if (!observer) {
log.debug "Slack plugin: Observer not initialized, skipping file upload"
return
}

if (!observer.sender) {
log.debug "Slack plugin: Not configured, skipping file upload"
return
}
if (!observer.sender) {
log.debug "Slack plugin: Not configured, skipping file upload"
return
}

try {
// Resolve file path
def file = options.file
Path path
Expand Down Expand Up @@ -223,8 +229,11 @@ class SlackExtension extends PluginExtensionPoint {
log.debug "Slack plugin: Uploaded file ${path.fileName}"

} catch (Exception e) {
log.error "Slack plugin: Error uploading file: ${e.message}", e
// Don't propagate exception - never fail the workflow
def msg = "Slack plugin: Error uploading file: ${e.message}"
log.error msg, e
if (observer.config?.failOnError) {
throw new RuntimeException(msg, e)
}
}
}
}
71 changes: 51 additions & 20 deletions src/main/groovy/nextflow/slack/SlackObserver.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -98,9 +98,13 @@ class SlackObserver implements TraceObserver {

// Send workflow started notification if enabled
if (config.onStart.enabled) {
def message = messageBuilder.buildWorkflowStartMessage()
sender.sendMessage(message)
log.debug "Slack plugin: Sent workflow start notification"
try {
def message = messageBuilder.buildWorkflowStartMessage()
sender.sendMessage(message)
log.debug "Slack plugin: Sent workflow start notification"
} catch (Exception e) {
handleNotificationError("send workflow start notification", e)
}
}

// Set up progress updates if enabled and using bot sender
Expand Down Expand Up @@ -193,13 +197,17 @@ class SlackObserver implements TraceObserver {
if (isSuccess) {
// Send completion message if enabled
if (config.onComplete.enabled) {
def threadTs = getThreadTsIfEnabled()
def message = messageBuilder.buildWorkflowCompleteMessage(threadTs)
sender.sendMessage(message)
log.debug "Slack plugin: Sent workflow complete notification"
try {
def threadTs = getThreadTsIfEnabled()
def message = messageBuilder.buildWorkflowCompleteMessage(threadTs)
sender.sendMessage(message)
log.debug "Slack plugin: Sent workflow complete notification"
} catch (Exception e) {
handleNotificationError("send workflow complete notification", e)
}

// Upload configured files
uploadConfiguredFiles(config.onComplete.files, threadTs)
uploadConfiguredFiles(config.onComplete.files, getThreadTsIfEnabled())
}

// Handle reactions independently of notification
Expand Down Expand Up @@ -229,14 +237,17 @@ class SlackObserver implements TraceObserver {
if (!isConfigured()) return

if (config.onError.enabled) {
// Get thread timestamp if threading is enabled and we're using bot sender
def threadTs = getThreadTsIfEnabled()
def message = messageBuilder.buildWorkflowErrorMessage(trace, threadTs)
sender.sendMessage(message)
log.debug "Slack plugin: Sent workflow error notification"
try {
def threadTs = getThreadTsIfEnabled()
def message = messageBuilder.buildWorkflowErrorMessage(trace, threadTs)
sender.sendMessage(message)
log.debug "Slack plugin: Sent workflow error notification"
} catch (Exception e) {
handleNotificationError("send workflow error notification", e)
}

// Upload configured files
uploadConfiguredFiles(config.onError.files, threadTs)
uploadConfiguredFiles(config.onError.files, getThreadTsIfEnabled())
}

if (startReactionAdded) {
Expand All @@ -246,7 +257,8 @@ class SlackObserver implements TraceObserver {
}

/**
* Upload files configured in the notification config
* Upload files configured in the notification config.
* Respects failOnError: throws on failure when enabled, logs and continues otherwise.
*/
private void uploadConfiguredFiles(List<String> files, String threadTs) {
if (!files) return
Expand All @@ -262,13 +274,14 @@ class SlackObserver implements TraceObserver {
log.debug "Slack plugin: Uploaded file ${filePath}"
}
catch (Exception e) {
log.warn "Slack plugin: Failed to upload file ${filePath}: ${e.message}"
handleNotificationError("upload file ${filePath}", e)
}
}
}

/**
* Add an emoji reaction to the start message if reactions are enabled
* Add an emoji reaction to the start message if reactions are enabled.
* Respects failOnError: throws on failure when enabled, logs and continues otherwise.
*/
private void addReactionIfEnabled(String emoji) {
if (!emoji) return
Expand All @@ -282,12 +295,13 @@ class SlackObserver implements TraceObserver {
}
}
catch (Exception e) {
log.debug "Slack plugin: Failed to add reaction: ${e.message}"
handleNotificationError("add reaction '${emoji}'", e)
}
}

/**
* Remove an emoji reaction from the start message if reactions are enabled
* Remove an emoji reaction from the start message if reactions are enabled.
* Respects failOnError: throws on failure when enabled, logs and continues otherwise.
*/
private void removeReactionIfEnabled(String emoji) {
if (!emoji) return
Expand All @@ -301,7 +315,7 @@ class SlackObserver implements TraceObserver {
}
}
catch (Exception e) {
log.debug "Slack plugin: Failed to remove reaction: ${e.message}"
handleNotificationError("remove reaction '${emoji}'", e)
}
}

Expand All @@ -315,6 +329,23 @@ class SlackObserver implements TraceObserver {
return null
}

/**
* Handle a notification error based on failOnError configuration.
* When failOnError is false (default): logs a warning and continues.
* When failOnError is true: logs a warning and throws to abort the pipeline.
*
* @param description Human-readable description of the failed operation
* @param e The exception that caused the failure
* @throws RuntimeException if failOnError is true
*/
private void handleNotificationError(String description, Exception e) {
def msg = "Slack plugin: Failed to ${description}: ${e.message}"
log.warn msg
if (config?.failOnError) {
throw new RuntimeException(msg, e)
}
}

/**
* Check if the observer is properly configured
*/
Expand Down
24 changes: 11 additions & 13 deletions src/main/groovy/nextflow/slack/WebhookSlackSender.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ import groovy.util.logging.Slf4j
class WebhookSlackSender implements SlackSender {

private final String webhookUrl
private final Set<String> loggedErrors = Collections.synchronizedSet(new HashSet<String>())

/**
* Create a new WebhookSlackSender with the given webhook URL
Expand All @@ -44,15 +43,18 @@ class WebhookSlackSender implements SlackSender {
}

/**
* Send a message to Slack webhook
* Send a message to Slack webhook.
* Throws a RuntimeException if the message cannot be delivered.
*
* @param message JSON message payload
* @throws RuntimeException if the webhook call fails
*/
@Override
void sendMessage(String message) {
HttpURLConnection connection = null
try {
def url = new URL(webhookUrl)
def connection = url.openConnection() as HttpURLConnection
connection = url.openConnection() as HttpURLConnection
connection.requestMethod = 'POST'
connection.doOutput = true
connection.setRequestProperty('Content-type', 'application/json')
Expand All @@ -65,18 +67,14 @@ class WebhookSlackSender implements SlackSender {
def responseCode = connection.responseCode
if (responseCode != 200) {
def errorBody = connection.errorStream?.text ?: ""
def errorMsg = "Slack webhook HTTP ${responseCode}: ${errorBody}".toString()
if (loggedErrors.add(errorMsg)) {
log.error errorMsg
}
throw new RuntimeException("Slack webhook HTTP ${responseCode}: ${errorBody}")
}

connection.disconnect()
} catch (RuntimeException e) {
throw e
} catch (Exception e) {
def errorMsg = "Slack plugin: Error sending message: ${e.message}".toString()
if (loggedErrors.add(errorMsg)) {
log.error errorMsg
}
throw new RuntimeException("Slack plugin: Error sending webhook message: ${e.message}", e)
} finally {
connection?.disconnect()
}
}

Expand Down
Loading
Loading