Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 58 additions & 38 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,14 @@ def _requiredLogstashJar(pathPrefix, jarSpec, flavorSpec = null) {
}
}

static OutputStreamFunneler outputStreamFunneler(File logFile) {
logFile.parentFile.mkdirs()
logFile.delete()
logFile.createNewFile()

return new OutputStreamFunneler(new LazyFileOutputStream(logFile))
}

// https://docs.github.com/en/repositories/working-with-files/using-files/downloading-source-code-archives#source-code-archive-urls
String githubArchivePath(repo, treeish="main", archiveFormat="zip") {
def pathFragment = {
Expand Down Expand Up @@ -203,8 +211,10 @@ task downloadElasticsearchSourceZip(type: Download) {
task unzipDownloadedElasticsearchSourceZip(dependsOn: downloadElasticsearchSourceZip, type: Copy) {
description "extracts Elasticsearch source from a downloaded zip file"

ext.location = "${buildDir}/elasticsearch-source/"

from zipTree(downloadElasticsearchSourceZip.dest)
into "${buildDir}/elasticsearch-source/"
into ext.location
eachFile {
// strip top-level directory
path = path.replaceFirst(/^.+?\//, "")
Expand All @@ -216,15 +226,14 @@ task buildElasticsearchLocalDistro(dependsOn: unzipDownloadedElasticsearchSource

def logFile = project.file("${buildDir}/elasticsearch-build.log")
doFirst {
def funneler = new OutputStreamFunneler(new LazyFileOutputStream(logFile))
def funneler = outputStreamFunneler(logFile)
standardOutput = funneler.funnelInstance
errorOutput = funneler.funnelInstance
}

def esSource = "${buildDir}/elasticsearch-source/"
def esSource = "${unzipDownloadedElasticsearchSourceZip.outputs.files.singleFile}"
def esBuildDir = "${esSource}/build"

inputs.dir esSource
outputs.dir esBuildDir

ext.buildRoot = esBuildDir
Expand All @@ -238,7 +247,7 @@ task buildElasticsearchLocalDistro(dependsOn: unzipDownloadedElasticsearchSource
ext.module = { moduleName -> localDistroResult.map { "${it}/modules/${moduleName}"} }

workingDir esSource
commandLine "./gradlew", "localDistro"
commandLine "./gradlew", "--stacktrace", "localDistro"

ignoreExitValue true // handled in doLast
doLast {
Expand All @@ -260,20 +269,22 @@ task buildElasticsearchLocalDistro(dependsOn: unzipDownloadedElasticsearchSource
task buildElasticsearchLogstashBridge(type: Exec) {
description "builds logstash-bridge lib module"

dependsOn buildElasticsearchLocalDistro
dependsOn unzipDownloadedElasticsearchSourceZip
dependsOn buildElasticsearchLocalDistro // mustRunAfter?

def logFile = project.file("${buildDir}/logstash-bridge-build.log")
doFirst {
def funneler = new OutputStreamFunneler(new LazyFileOutputStream(logFile))
def funneler = outputStreamFunneler(logFile)
standardOutput = funneler.funnelInstance
errorOutput = funneler.funnelInstance
}

def esSource = "${buildDir}/elasticsearch-source/"
def esSource = "${unzipDownloadedElasticsearchSourceZip.outputs.files.singleFile}"
def esBuildDir = "${esSource}/build"

inputs.dir esSource
outputs.dir "${esBuildDir}/libs/logstash-bridge"
inputs.dir "${esSource}/libs/logstash-bridge"

outputs.dir("${esSource}/libs/logstash-bridge/build/distributions")

ext.buildRoot = esBuildDir
workingDir esSource
Expand All @@ -295,6 +306,28 @@ task buildElasticsearchLogstashBridge(type: Exec) {
}
}

def ingestGeoipPluginShadeNamespace = "org.elasticsearch.ingest.geoip.shaded"

/**
* The StableBridge exposes GeoIP plugin internals, so it needs to relocate references to
* its bundled dependencies to match the shaded locations in our import of that plugin.
*/
task shadeElasticsearchStableBridge(type: com.github.jengelman.gradle.plugins.shadow.tasks.ShadowJar) {
description "Shades Maxmind dependencies"

dependsOn buildElasticsearchLogstashBridge

from(buildElasticsearchLogstashBridge)

archiveFileName = "logstash-stable-bridge-shaded.jar"
destinationDirectory = file("${buildDir}/shaded")

relocate('com.fasterxml.jackson', "${ingestGeoipPluginShadeNamespace}.com.fasterxml.jackson")
relocate('com.maxmind', "${ingestGeoipPluginShadeNamespace}.com.maxmind")

mergeServiceFiles()
}

task shadeElasticsearchIngestGeoIpModule(type: com.github.jengelman.gradle.plugins.shadow.tasks.ShadowJar) {
description "Shades embedded dependencies of the Elasticsearch Ingest GeoIP module"

Expand All @@ -305,15 +338,16 @@ task shadeElasticsearchIngestGeoIpModule(type: com.github.jengelman.gradle.plugi
archiveFileName = 'ingest-geoip-shaded.jar'
destinationDirectory = file("${buildDir}/shaded")

mergeServiceFiles()
relocate('com.fasterxml.jackson', "${ingestGeoipPluginShadeNamespace}.com.fasterxml.jackson")
relocate('com.maxmind', "${ingestGeoipPluginShadeNamespace}.com.maxmind")

String shadeNamespace = "org.elasticsearch.ingest.geoip.shaded"
relocate('com.fasterxml.jackson', "${shadeNamespace}.com.fasterxml.jackson")
relocate('com.maxmind', "${shadeNamespace}.com.maxmind")
mergeServiceFiles()

exclude '**/module-info.class'
}

def ingestGrokPluginShadeNamespace = "org.elasticsearch.grok.shaded"

task shadeElasticsearchGrokImplementation(type: com.github.jengelman.gradle.plugins.shadow.tasks.ShadowJar) {
description "Shades embedded dependencies of the Elasticsearch Grok implementation"

Expand All @@ -329,13 +363,16 @@ task shadeElasticsearchGrokImplementation(type: com.github.jengelman.gradle.plug
destinationDirectory = file("${buildDir}/shaded")

mergeServiceFiles()
String shadeNamespace = "org.elasticsearch.grok.shaded"
relocate('org.joni', "${shadeNamespace}.org.joni")
relocate('org.jcodings', "${shadeNamespace}.org.jcodings")
relocate('org.joni', "${ingestGrokPluginShadeNamespace}.org.joni")
relocate('org.jcodings', "${ingestGrokPluginShadeNamespace}.org.jcodings")

exclude '**/module-info.class'
}

/**
* The x-pack redact plugin reaches into the grok plugin's implementation, so
* they both need to point to the same relocated shaded components.
*/
task shadeElasticsearchRedactPlugin(type: com.github.jengelman.gradle.plugins.shadow.tasks.ShadowJar) {
description "Shades Elasticsearch Redact plugin to reference Grok's shaded dependencies"
dependsOn buildElasticsearchLocalDistro
Expand All @@ -347,37 +384,20 @@ task shadeElasticsearchRedactPlugin(type: com.github.jengelman.gradle.plugins.sh
destinationDirectory = file("${buildDir}/shaded")

// relocate elasticsearch-grok's dependencies to match
String shadeNamespace = "org.elasticsearch.grok.shaded"
relocate('org.joni', "${shadeNamespace}.org.joni")
relocate('org.jcodings', "${shadeNamespace}.org.jcodings")
relocate('org.joni', "${ingestGrokPluginShadeNamespace}.org.joni")
relocate('org.jcodings', "${ingestGrokPluginShadeNamespace}.org.jcodings")

exclude '**/module-info.class'
}

task shadeElasticsearchLogstashBridge(type: com.github.jengelman.gradle.plugins.shadow.tasks.ShadowJar) {
description "Shades the Elasticsearch logstash-bridge jar"

dependsOn buildElasticsearchLogstashBridge

from("${buildDir}/elasticsearch-source/libs/logstash-bridge/build/distributions") {
include "elasticsearch-logstash-bridge-*.jar"
}

archiveFileName = "elasticsearch-logstash-bridge-shaded.jar"
destinationDirectory = file("${buildDir}/shaded")

exclude '**/module-info.class'
}

task importMinimalElasticsearch() {
description "Imports minimal portions of Elasticsearch localDistro"

dependsOn buildElasticsearchLocalDistro
dependsOn buildElasticsearchLogstashBridge
dependsOn shadeElasticsearchStableBridge
dependsOn shadeElasticsearchIngestGeoIpModule
dependsOn shadeElasticsearchGrokImplementation
dependsOn shadeElasticsearchRedactPlugin
dependsOn shadeElasticsearchLogstashBridge

ext.jars = "${buildDir}/elasticsearch-minimal-jars"

Expand All @@ -396,7 +416,7 @@ task importMinimalElasticsearch() {
include jarPackageNamed("lucene-core")
include jarPackageNamed("lucene-analysis-common")
}
from(shadeElasticsearchLogstashBridge)
from(shadeElasticsearchStableBridge.outputs.files.singleFile)
from(shadeElasticsearchGrokImplementation)
from(buildElasticsearchLocalDistro.module("x-pack-core"))

Expand Down
3 changes: 2 additions & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
LOGSTASH_PATH=../../logstash
ELASTICSEARCH_TREEISH=main
ELASTICSEARCH_REPO=mashhurs/elasticsearch
ELASTICSEARCH_TREEISH=logstash-bridge-geoip-interfaces
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FYI: this will be removed once upstream PR is merged.

4 changes: 2 additions & 2 deletions lib/logstash/filters/elastic_integration.rb
Original file line number Diff line number Diff line change
Expand Up @@ -368,11 +368,11 @@ def _elasticsearch_rest_client(config, &builder_interceptor)

def initialize_event_processor!
java_import('co.elastic.logstash.filters.elasticintegration.EventProcessorBuilder')
java_import('co.elastic.logstash.filters.elasticintegration.geoip.GeoIpProcessorFactory')
java_import('org.elasticsearch.logstashbridge.geoip.GeoIpProcessorFactoryBridge')

@event_processor = EventProcessorBuilder.fromElasticsearch(@elasticsearch_rest_client, extract_immutable_config)
.setFilterMatchListener(method(:filter_matched_java).to_proc)
.addProcessor("geoip") { GeoIpProcessorFactory.new(@geoip_database_provider) }
.addProcessor("geoip") { GeoIpProcessorFactoryBridge::create(@geoip_database_provider) }
.build(@plugin_context)
rescue => exception
raise_config_error!("configuration did not produce an EventProcessor: #{exception}")
Expand Down
2 changes: 1 addition & 1 deletion settings.gradle
Original file line number Diff line number Diff line change
@@ -1 +1 @@
rootProject.name = 'logstash-filter-elastic_integration'
rootProject.name = 'logstash-filter-elastic_integration'
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
import org.elasticsearch.client.Response;
import org.elasticsearch.client.ResponseException;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.ingest.PipelineConfiguration;
import org.elasticsearch.logstashbridge.ingest.PipelineConfigurationBridge;

import java.util.Optional;

Expand All @@ -24,7 +24,7 @@
* that retrieves pipelines from Elasticsearch.
*/
public class ElasticsearchPipelineConfigurationResolver
extends AbstractSimpleResolver<String,PipelineConfiguration>
extends AbstractSimpleResolver<String, PipelineConfigurationBridge>
implements PipelineConfigurationResolver {
private final RestClient elasticsearchRestClient;
private final PipelineConfigurationFactory pipelineConfigurationFactory;
Expand All @@ -37,13 +37,13 @@ public ElasticsearchPipelineConfigurationResolver(final RestClient elasticsearch
}

@Override
public Optional<PipelineConfiguration> resolveSafely(String pipelineName) throws Exception {
public Optional<PipelineConfigurationBridge> resolveSafely(String pipelineName) throws Exception {
final Response response;
try {
final Request request = new Request("GET", URLEncodedUtils.formatSegments("_ingest", "pipeline", pipelineName));
response = elasticsearchRestClient.performRequest(request);
final String jsonEncodedPayload = EntityUtils.toString(response.getEntity());
final PipelineConfiguration pipelineConfiguration = pipelineConfigurationFactory.parseNamedObject(jsonEncodedPayload);
final PipelineConfigurationBridge pipelineConfiguration = pipelineConfigurationFactory.parseNamedObject(jsonEncodedPayload);
return Optional.of(pipelineConfiguration);
} catch (ResponseException re) {
if (re.getResponse().getStatusLine().getStatusCode() == 404) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,15 @@
import com.google.common.collect.Maps;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.support.RefCountingRunnable;
import org.elasticsearch.core.IOUtils;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.LogstashInternalBridge;
import org.elasticsearch.ingest.common.FailProcessorException;
import org.elasticsearch.logstashbridge.core.IOUtilsBridge;
import org.elasticsearch.logstashbridge.core.RefCountingRunnableBridge;
import org.elasticsearch.logstashbridge.ingest.IngestDocumentBridge;

import java.io.Closeable;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
Expand All @@ -32,7 +31,6 @@

import static co.elastic.logstash.filters.elasticintegration.util.EventUtil.eventAsMap;
import static co.elastic.logstash.filters.elasticintegration.util.EventUtil.serializeEventForLog;
import static org.elasticsearch.core.Strings.format;

/**
* An {@link EventProcessor} processes {@link Event}s by:
Expand Down Expand Up @@ -94,7 +92,7 @@ public Collection<Event> processEvents(final Collection<Event> incomingEvents) t
final CountDownLatch latch = new CountDownLatch(1);
final IntegrationBatch batch = new IntegrationBatch(incomingEvents);

try (RefCountingRunnable ref = new RefCountingRunnable(latch::countDown)) {
try (RefCountingRunnableBridge ref = RefCountingRunnableBridge.create(latch::countDown)) {
batch.eachRequest(ref::acquire, this::processRequest);
}

Expand Down Expand Up @@ -151,7 +149,7 @@ void processRequest(final IntegrationRequest request) {

final IngestPipeline ingestPipeline = loadedPipeline.get();
LOGGER.trace(() -> String.format("Using loaded pipeline `%s` (%s)", pipelineName, System.identityHashCode(ingestPipeline)));
final IngestDocument ingestDocument = eventMarshaller.toIngestDocument(request.event());
final IngestDocumentBridge ingestDocument = eventMarshaller.toIngestDocument(request.event());

resolvedIndexName.ifPresent(indexName -> {
ingestDocument.getMetadata().setIndex(indexName);
Expand All @@ -170,19 +168,18 @@ void processRequest(final IntegrationRequest request) {
}
}

private void executePipeline(final IngestDocument ingestDocument, final IngestPipeline ingestPipeline, final IntegrationRequest request) {
private void executePipeline(final IngestDocumentBridge ingestDocument, final IngestPipeline ingestPipeline, final IntegrationRequest request) {
final String pipelineName = ingestPipeline.getId();
final String originalIndex = ingestDocument.getMetadata().getIndex();
ingestPipeline.execute(ingestDocument, (resultIngestDocument, ingestPipelineException) -> {
// If no exception, then the original event is to be _replaced_ by the result
if (Objects.nonNull(ingestPipelineException)) {
// If we had an exception in the IngestPipeline, tag and emit the original Event
final Throwable unwrappedException = unwrapException(ingestPipelineException);
LOGGER.warn(() -> String.format("ingest pipeline `%s` failed", pipelineName), unwrappedException);
LOGGER.warn(() -> String.format("ingest pipeline `%s` failed", pipelineName), ingestPipelineException);
request.complete(incomingEvent -> {
annotateIngestPipelineFailure(incomingEvent, pipelineName, Map.of(
"message", unwrappedException.getMessage(),
"exception", unwrappedException.getClass().getName()
"message", ingestPipelineException.getMessage(),
"exception", ingestPipelineException.getClass().getName()
));
});
} else if (Objects.isNull(resultIngestDocument)) {
Expand All @@ -193,17 +190,17 @@ private void executePipeline(final IngestDocument ingestDocument, final IngestPi
} else {

final String newIndex = resultIngestDocument.getMetadata().getIndex();
if (!Objects.equals(originalIndex, newIndex) && LogstashInternalBridge.isReroute(resultIngestDocument)) {
LogstashInternalBridge.resetReroute(resultIngestDocument);
if (!Objects.equals(originalIndex, newIndex) && ingestDocument.isReroute()) {
ingestDocument.resetReroute();
boolean cycle = !resultIngestDocument.updateIndexHistory(newIndex);
if (cycle) {
request.complete(incomingEvent -> {
annotateIngestPipelineFailure(incomingEvent, pipelineName, Map.of("message", format(
"index cycle detected while processing pipeline [%s]: %s + %s",
pipelineName,
resultIngestDocument.getIndexHistory(),
newIndex
)));
annotateIngestPipelineFailure(incomingEvent, pipelineName, Map.of("message",
String.format(Locale.ROOT, "index cycle detected while processing pipeline [%s]: %s + %s",
pipelineName,
resultIngestDocument.getIndexHistory(),
newIndex)
));
});
return;
}
Expand All @@ -214,12 +211,14 @@ private void executePipeline(final IngestDocument ingestDocument, final IngestPi
final Optional<IngestPipeline> reroutePipeline = resolve(reroutePipelineName.get(), internalPipelineProvider);
if (reroutePipeline.isEmpty()) {
request.complete(incomingEvent -> {
annotateIngestPipelineFailure(incomingEvent, pipelineName, Map.of("message", format(
"reroute failed to load next pipeline [%s]: %s -> %s",
annotateIngestPipelineFailure(
incomingEvent,
pipelineName,
resultIngestDocument.getIndexHistory(),
reroutePipelineName.get()
)));
Map.of("message",
String.format(Locale.ROOT, "reroute failed to load next pipeline [%s]: %s -> %s",
pipelineName,
resultIngestDocument.getIndexHistory(),
reroutePipelineName.get())));
});
} else {
executePipeline(resultIngestDocument, reroutePipeline.get(), request);
Expand Down Expand Up @@ -252,11 +251,6 @@ static private void annotateIngestPipelineFailure(final Event event, final Strin
});
}

static private Throwable unwrapException(final Exception exception) {
if (exception.getCause() instanceof FailProcessorException) { return exception.getCause(); }
return exception;
}

static private String diff(final Event original, final Event changed) {
if (LOGGER.isTraceEnabled()) {
// dot notation less than ideal for LS-internal, but better than re-writing it ourselves.
Expand All @@ -277,6 +271,6 @@ static private <T,R> Optional<R> resolve(T resolvable, Resolver<T,R> resolver) {

@Override
public void close() throws IOException {
IOUtils.closeWhileHandlingException(this.resourcesToClose);
IOUtilsBridge.closeWhileHandlingException(this.resourcesToClose);
}
}
Loading