Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
9f5cca9
[backend/frontend] Rework
Dimfacion Oct 17, 2025
efdd545
[backend] Batching inject trace execution
Dimfacion Oct 17, 2025
a855e0c
[backend] Batching Execution Traces
Dimfacion Nov 13, 2025
722267d
[backend] Batching Execution Traces
Dimfacion Nov 17, 2025
53bb1ad
[backend] Batching Execution Traces
Dimfacion Nov 17, 2025
f2dc738
[backend] Batching Execution Traces
Dimfacion Nov 17, 2025
9214173
[backend] Batching Execution Traces
Dimfacion Nov 17, 2025
eb2bf5d
[backend] Batching Execution Traces
Dimfacion Nov 18, 2025
2c76764
[backend] Batching Execution Traces
Dimfacion Nov 20, 2025
02a5a71
[backend] Batching Execution Traces
Dimfacion Nov 20, 2025
9e33f7f
[backend] feat(engine): Batching injects traces (#2895)
Dimfacion Nov 23, 2025
4b14622
[backend] feat(engine): Batching injects traces (#2895)
Dimfacion Nov 23, 2025
f6bee2c
[backend] feat(engine): Batching injects traces (#2895)
Dimfacion Nov 23, 2025
54b5a9b
[backend] feat(engine): Batching injects traces (#2895)
Dimfacion Nov 23, 2025
69ebb5d
[backend] feat(engine): Batching injects traces (#2895)
Dimfacion Nov 23, 2025
fcbdd65
[backend] feat(engine): Batching injects traces (#2895)
Dimfacion Nov 23, 2025
e469576
[backend] feat(engine): Batching injects traces (#2895)
Dimfacion Nov 23, 2025
4944505
[backend] feat(engine): Batching injects traces (#2895)
Dimfacion Nov 24, 2025
60dab13
[backend] feat(engine): Batching injects traces (#2895)
Dimfacion Nov 24, 2025
d18828d
[backend] feat(engine): Batching injects traces (#2895)
Dimfacion Nov 25, 2025
134b9b6
[backend] feat(engine): Batching injects traces (#2895)
Dimfacion Nov 25, 2025
263077f
[backend] feat(engine): Batching injects traces (#2895)
Dimfacion Nov 25, 2025
8312fbb
[backend] feat(engine): Batching injects traces (#2895)
Dimfacion Nov 25, 2025
a150613
[backend] feat(engine): Batching injects traces (#2895)
Dimfacion Nov 25, 2025
5220596
[backend] feat(engine): Batching injects traces (#2895)
Dimfacion Nov 25, 2025
2b04268
[backend] feat(engine): Batching injects traces (#2895)
Dimfacion Nov 25, 2025
cb5f227
[backend] feat(engine): Batching injects traces (#2895)
Dimfacion Nov 25, 2025
9c65d77
[backend] feat(engine): Batching injects traces (#2895)
Dimfacion Nov 26, 2025
d10d089
[backend] feat(engine): Batching injects traces (#2895)
Dimfacion Nov 26, 2025
ed7bddb
[backend] feat(engine): Batching injects traces (#2895)
Dimfacion Nov 26, 2025
b21c6f2
[backend] feat(engine): Batching injects traces (#2895)
Dimfacion Nov 26, 2025
5b152d0
[backend] feat(engine): Batching injects traces (#2895)
Dimfacion Nov 27, 2025
9c39c90
[backend] feat(engine): Batching injects traces (#2895)
Dimfacion Nov 27, 2025
eff6a6b
[backend] feat(engine): Batching injects traces (#2895)
Dimfacion Nov 27, 2025
dbf1efb
[backend] feat(engine): Batching injects traces (#2895)
Dimfacion Nov 27, 2025
0e56c4d
[backend] feat(engine): Batching injects traces (#2895)
Dimfacion Nov 28, 2025
4daa960
[backend] feat(engine): Batching injects traces (#2895)
Dimfacion Dec 2, 2025
556ef91
[backend] feat(engine): Batching injects traces (#2895)
Dimfacion Dec 2, 2025
83ad9d2
[backend] feat(engine): Batching injects traces (#2895)
Dimfacion Dec 3, 2025
02b6635
[backend] feat(engine): Batching injects traces (#2895)
Dimfacion Dec 3, 2025
7389f9c
[backend] feat(engine): Batching injects traces (#2895)
Dimfacion Dec 3, 2025
8cfdb14
[backend] feat(engine): Batching injects traces (#2895)
Dimfacion Dec 3, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions .drone.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ steps:
MINIO_ENDPOINT: minio
MINIO_PORT: 9000
ENGINE_URL: http://elastic:9200
OPENBAS_RABBITMQ_HOSTNAME: rabbitmq
commands:
- mvn spotless:check
- sleep 60
Expand Down Expand Up @@ -53,6 +54,7 @@ steps:
OPENAEV_ADMIN_PASSWORD: admin
OPENAEV_ADMIN_TOKEN: 0d17ce9a-f3a8-4c6d-9721-c98dc3dc023f
SPRING_PROFILES_ACTIVE: ci
OPENBAS_RABBITMQ_HOSTNAME: rabbitmq-e2e
commands:
- apt update && apt install -y gnupg
- curl -sS https://dl.yarnpkg.com/debian/pubkey.gpg | apt-key add -
Expand Down Expand Up @@ -177,6 +179,13 @@ services:
POSTGRES_USER: openaev
POSTGRES_PASSWORD: openaev
POSTGRES_DB: openaev
commands:
- docker-entrypoint.sh -c 'max_connections=500'
- name: rabbitmq
image: rabbitmq:4.1-management
environment:
RABBITMQ_DEFAULT_USER: guest
RABBITMQ_DEFAULT_PASS: guest
- name: minio-e2e
image: minio/minio:RELEASE.2025-06-13T11-33-47Z
environment:
Expand All @@ -189,6 +198,13 @@ services:
POSTGRES_USER: openaev
POSTGRES_PASSWORD: openaev
POSTGRES_DB: openaev
commands:
- docker-entrypoint.sh -c 'max_connections=500'
- name: rabbitmq-e2e
image: rabbitmq:4.1-management
environment:
RABBITMQ_DEFAULT_USER: guest
RABBITMQ_DEFAULT_PASS: guest
- name: elastic
image: docker.elastic.co/elasticsearch/elasticsearch:8.18.3
environment:
Expand Down
20 changes: 19 additions & 1 deletion openaev-api/src/main/java/io/openaev/config/CachingConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,41 @@

import com.github.benmanes.caffeine.cache.Caffeine;
import java.time.Duration;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cache.CacheManager;
import org.springframework.cache.annotation.CacheEvict;
import org.springframework.cache.annotation.EnableCaching;
import org.springframework.cache.caffeine.CaffeineCacheManager;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.Scheduled;

@Configuration
@EnableCaching
@Slf4j
public class CachingConfig {

@Bean
public CacheManager cacheManager() {
CaffeineCacheManager cacheManager = new CaffeineCacheManager("license");
/**
* Creating some cache : - license is for the EE license - global for global settings that do
* not need to be fetched from the DB everytime we need it (like features flags) - adminUsers is
* a low retention cache for users that are admin. This is useful when receiving a lot of calls.
* Execution traces for instance can receive several thousands a sec and not fetching the user
* everytime helps for the RBAC
*/
CaffeineCacheManager cacheManager = new CaffeineCacheManager("license", "global", "adminUsers");

cacheManager.setCaffeine(
Caffeine.newBuilder().expireAfterWrite(Duration.ofDays(1)).maximumSize(100));

return cacheManager;
}

/** Emptying the cache every second to avoid old data on the admin users being persisted */
@CacheEvict(value = "adminUsers", allEntries = true)
@Scheduled(fixedRateString = "1000")
public void emptyAdminUsersCache() {
log.info("emptying admin users cache");
}
}
Original file line number Diff line number Diff line change
@@ -1,17 +1,42 @@
package io.openaev.config;

import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;

@Configuration
@Slf4j
public class ThreadPoolTaskSchedulerConfig {

@Bean
public ThreadPoolTaskScheduler threadPoolTaskScheduler() {
ThreadPoolTaskScheduler threadPoolTaskScheduler = new ThreadPoolTaskScheduler();
threadPoolTaskScheduler.setPoolSize(20);
threadPoolTaskScheduler.setThreadNamePrefix("ThreadPoolTaskScheduler");
threadPoolTaskScheduler.setErrorHandler(
t -> log.error("Error during scheduled task : {}", t.getMessage(), t));
return threadPoolTaskScheduler;
}

/** Dedicated executor for stream events */
@Bean(name = "streamExecutor")
public Executor streamExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(5);
executor.setMaxPoolSize(10);
executor.setQueueCapacity(100);
executor.setThreadNamePrefix("Stream-");

// If we have more event to deal with than the available size in the waiting queue, we discard
// the oldest to prevent overloading the stream. This also helps a little preventing
// overloading the tab of a user connected when having a lot of events
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy());

executor.initialize();
return executor;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package io.openaev.migration;

import java.sql.Statement;
import org.flywaydb.core.api.migration.BaseJavaMigration;
import org.flywaydb.core.api.migration.Context;
import org.springframework.stereotype.Component;

@Component
public class V4_53__Convert_expectations_to_jsonb extends BaseJavaMigration {

@Override
public boolean canExecuteInTransaction() {
return false;
}

@Override
public void migrate(Context context) throws Exception {
try (Statement select = context.getConnection().createStatement()) {
select.execute(
"""
ALTER TABLE injects_expectations
ALTER COLUMN inject_expectation_signatures
TYPE jsonb
USING inject_expectation_signatures::jsonb;
-- Transform Foreign Key to deferrable key
ALTER TABLE execution_traces
DROP CONSTRAINT execution_traces_execution_agent_id_fkey,
DROP CONSTRAINT execution_traces_execution_inject_status_id_fkey,
DROP CONSTRAINT execution_traces_execution_inject_test_status_id_fkey;

ALTER TABLE execution_traces
ADD CONSTRAINT execution_traces_execution_inject_status_id_fkey
FOREIGN KEY (execution_inject_status_id)
REFERENCES injects_statuses(status_id)
ON DELETE CASCADE
DEFERRABLE INITIALLY DEFERRED,

ADD CONSTRAINT execution_traces_execution_inject_test_status_id_fkey
FOREIGN KEY (execution_inject_test_status_id)
REFERENCES injects_tests_statuses(status_id)
ON DELETE CASCADE
DEFERRABLE INITIALLY DEFERRED,

ADD CONSTRAINT execution_traces_execution_agent_id_fkey
FOREIGN KEY (execution_agent_id)
REFERENCES agents(agent_id)
ON DELETE CASCADE
DEFERRABLE INITIALLY DEFERRED;
""");
select.execute(
"""
CREATE INDEX CONCURRENTLY idx_injects_expectations_inject_agent
ON injects_expectations(inject_id, agent_id);
""");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.jetbrains.annotations.NotNull;
import org.springframework.dao.DataIntegrityViolationException;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

Expand Down Expand Up @@ -91,71 +90,25 @@ public void deleteFinding(@NotNull final String id) {
*/
public void buildFinding(
Inject inject, Asset asset, ContractOutputElement contractOutputElement, String finalValue) {
try {
Optional<Finding> optionalFinding =
findingRepository.findByInjectIdAndValueAndTypeAndKey(
inject.getId(),
finalValue,
contractOutputElement.getType(),
contractOutputElement.getKey());

Finding finding =
optionalFinding.orElseGet(
() -> {
Finding newFinding = new Finding();
newFinding.setInject(inject);
newFinding.setField(contractOutputElement.getKey());
newFinding.setType(contractOutputElement.getType());
newFinding.setValue(finalValue);
newFinding.setName(contractOutputElement.getName());
newFinding.setTags(new HashSet<>(contractOutputElement.getTags()));
return newFinding;
});

boolean isNewAsset =
finding.getAssets().stream().noneMatch(a -> a.getId().equals(asset.getId()));

if (isNewAsset) {
finding.getAssets().add(asset);
}

if (optionalFinding.isEmpty() || isNewAsset) {
findingRepository.save(finding);
}

} catch (DataIntegrityViolationException ex) {
log.info(
String.format(
"Race condition: finding already exists. Retrying ... %s ", ex.getMessage()),
ex);
// Re-fetch and try to add the asset
handleRaceCondition(inject, asset, contractOutputElement, finalValue);
}
}

private void handleRaceCondition(
Inject inject, Asset asset, ContractOutputElement contractOutputElement, String finalValue) {
Optional<Finding> retryFinding =
findingRepository.findByInjectIdAndValueAndTypeAndKey(
inject.getId(),
finalValue,
contractOutputElement.getType(),
contractOutputElement.getKey());

if (retryFinding.isPresent()) {
Finding existingFinding = retryFinding.get();
boolean isNewAsset =
existingFinding.getAssets().stream().noneMatch(a -> a.getId().equals(asset.getId()));
if (isNewAsset) {
existingFinding.getAssets().add(asset);
findingRepository.save(existingFinding);
}
} else {
log.warn("Retry failed: Finding still not found after race condition.");
}
String[] tagIds =
contractOutputElement.getTags().isEmpty()
? new String[0]
: contractOutputElement.getTags().stream().map(Tag::getId).toArray(String[]::new);

// Save or update the finding and add or update the list of assets and/or tags
findingRepository.saveCompleteFinding(
contractOutputElement.getKey(),
contractOutputElement.getType().name(),
finalValue,
new String[0],
inject.getId(),
contractOutputElement.getName(),
asset.getId(),
tagIds);
}

// -- Extract findings from strctured output : Here we compute the findings from structured output
// -- Extract findings from structured output : Here we compute the findings from structured
// output
// from ExecutionInjectInput sent by injectors
// This structured output is generated based on injectorcontract where we can find the node
// Outputs and with that the injector generate this structure output--
Expand Down
Loading