KAFKA-470: Add stateless field decryption and add MongoDB FLE support #204

Open
mdb-gtenrreiro wants to merge 30 commits into master from KAFKA-470
@mdb-gtenrreiro
Collaborator

No description provided.

Implement two encryption features for Oracle-to-MongoDB migration:

1. Pluggable Field Decryption
   - New FieldValueTransformer interface for custom decryption logic
   - FieldValueTransformPostProcessor recursively transforms configured fields
   - SampleAesFieldValueTransformer as reference implementation
   - Configuration: field.value.transformer, field.value.transformer.fields, etc.
   - Auto-registers in PostProcessor chain when configured

2. Client-Side Field Level Encryption (CS-FLE)
   - Enable MongoDB native CS-FLE in Sink Connector
   - MongoSinkTask builds AutoEncryptionSettings from configuration
   - Configuration: csfle.enabled, csfle.key.vault.namespace, csfle.local.master.key, csfle.schema.map
   - Added mongodb-crypt:5.6.4 dependency
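
The recursive field transformation described in item 1 can be illustrated with a minimal sketch. It is not the PR's `FieldValueTransformPostProcessor`: the class name is hypothetical, plain `Map`/`List` values stand in for BSON documents, and matching target fields by key name at any depth is an assumption.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.UnaryOperator;

// Hypothetical stand-in for the post-processor; not the connector's code.
final class RecursiveFieldTransformSketch {

  // Returns a transformed copy. String values whose key is in targetFields are
  // passed through the transformer, at any nesting depth (an assumption).
  static Object transform(Object node, Set<String> targetFields, UnaryOperator<String> transformer) {
    if (node instanceof Map) {
      Map<String, Object> out = new LinkedHashMap<>();
      for (Map.Entry<?, ?> entry : ((Map<?, ?>) node).entrySet()) {
        String key = String.valueOf(entry.getKey());
        Object value = entry.getValue();
        if (targetFields.contains(key) && value instanceof String) {
          out.put(key, transformer.apply((String) value));
        } else {
          out.put(key, transform(value, targetFields, transformer));
        }
      }
      return out;
    }
    if (node instanceof List) {
      List<Object> out = new ArrayList<>();
      for (Object item : (List<?>) node) {
        out.add(transform(item, targetFields, transformer));
      }
      return out;
    }
    return node; // scalars and nulls pass through unchanged
  }

  public static void main(String[] args) {
    Map<String, Object> address = new LinkedHashMap<>();
    address.put("ssn", "enc:123");
    Map<String, Object> doc = new LinkedHashMap<>();
    doc.put("name", "enc:alice");
    doc.put("address", address);
    // Fake "decryption" that strips a 4-character prefix, for illustration only.
    System.out.println(transform(doc, Collections.singleton("ssn"), s -> s.substring(4)));
  }
}
```

Returning a copy rather than mutating the input keeps the original document intact if the transformer throws partway through.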

Test Coverage:
- 28 new tests (all passing, 645 total)
- 15 tests for FieldValueTransformPostProcessor
- 8 tests for SampleAesFieldValueTransformer
- 8 tests for CS-FLE configuration
- 3 integration tests for PostProcessor chain

Demo:
- Comprehensive demo in demos/field-decryption-and-csfle/
- Automated run-demo.sh script
- Shows Oracle AES decryption + MongoDB CS-FLE re-encryption
- Uses Confluent Platform images for x86_64 compatibility
- Includes detailed README and implementation summary

Files Modified:
- build.gradle.kts
- MongoSinkConfig.java
- MongoSinkTopicConfig.java
- MongoSinkTask.java
- PostProcessors.java
- MongoSinkConfigTest.java

Files Added:
- FieldValueTransformer.java (interface)
- FieldValueTransformPostProcessor.java
- SampleAesFieldValueTransformer.java
- FieldValueTransformPostProcessorTest.java
- SampleAesFieldValueTransformerTest.java
- MongoSinkTaskCsfleTest.java
- demos/field-decryption-and-csfle/ (complete demo)
@mdb-gtenrreiro mdb-gtenrreiro requested a review from a team as a code owner March 10, 2026 13:42
@mdb-gtenrreiro mdb-gtenrreiro marked this pull request as draft March 10, 2026 13:42
@mdb-gtenrreiro mdb-gtenrreiro marked this pull request as ready for review March 12, 2026 00:43
CSFLE_KEY_VAULT_NAMESPACE_DISPLAY);
configDef.define(
CSFLE_LOCAL_MASTER_KEY_CONFIG,
Type.STRING,
Collaborator

Let's use Type.PASSWORD here instead

From MSK's docs:

To prevent secrets from appearing in connector log files, a plugin developer must use the Kafka Connect enum constant ConfigDef.Type.PASSWORD to define sensitive properties. When a property is type ConfigDef.Type.PASSWORD, Kafka Connect excludes its value from connector logs even if the value is sent as plaintext.
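
The idea behind a password-typed property can be sketched without any Kafka dependency: wrap the secret in a type whose string form is masked, so accidental logging never prints the value. The class below is a hypothetical stand-in, not Kafka Connect's own type, and the property names are taken from the PR description.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical wrapper that hides its value whenever it is formatted as a string.
final class MaskedSecretSketch {
  private final String value;

  MaskedSecretSketch(String value) {
    this.value = Objects.requireNonNull(value, "value");
  }

  // Explicit accessor: the only way to read the real secret.
  String value() {
    return value;
  }

  // Logging frameworks and string concatenation call toString(), so mask it.
  @Override
  public String toString() {
    return "[hidden]";
  }

  public static void main(String[] args) {
    Map<String, Object> config = new LinkedHashMap<>();
    config.put("csfle.enabled", true);
    config.put("csfle.local.master.key", new MaskedSecretSketch("base64-master-key"));
    // Printing the whole config map does not reveal the key.
    System.out.println(config);
  }
}
```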

Collaborator Author

Changed

@@ -0,0 +1,484 @@
Design Proposal - MongoDB Kafka Sink Connector
Collaborator

do we still need this file?

Collaborator Author

removed

build.gradle.kts Outdated
Comment on lines +122 to +123
options.isDebug = true
options.debugOptions.debugLevel = "source,lines,vars"
Collaborator

let's remove these

Collaborator Author

removed

implementation("org.apache.avro:avro:${project.extra["avroVersion"]}")

mongoAndAvroDependencies("org.mongodb:mongodb-driver-sync:${project.extra["mongodbDriverVersion"]}")
mongoAndAvroDependencies("org.mongodb:mongodb-crypt:1.11.0")
Collaborator

what's the reason for using an older version? Can we use the latest here?

Collaborator Author

1.11.0 seems to be the latest version that is compatible with MongoDB driver 4.7

Collaborator

@arahmanan left a comment

still going through this 😅 Sharing some more comments for now

@@ -0,0 +1,123 @@
# Implementation Summary
Collaborator

seems like an outdated duplicate of the README. Should we remove it?

Collaborator Author

I removed that file and added an implementation section to the README.md. Not sure we need that but it seemed helpful.

info "Cleanup commands:"
echo " curl -X DELETE $CONNECT_URL/connectors/enc-source"
echo " curl -X DELETE $CONNECT_URL/connectors/csfle-sink"
echo " cd demo && docker compose down -v"
\ No newline at end of file
Collaborator

[opt] newline at the end of this file

Collaborator Author

added a new line

}

@VisibleForTesting(otherwise = PRIVATE)
static AutoEncryptionSettings buildAutoEncryptionSettings(final MongoSinkConfig sinkConfig) {
Collaborator

can we add an integration test that confirms we're encrypting the correct fields? Maybe the demos can be turned into integration tests

Collaborator Author

@mdb-gtenrreiro Mar 16, 2026

Added tests for this; I removed the demos.

I personally think demo examples can be helpful for complex functionality like this, but I replaced them with tests to address the review feedback.

super(config);
this.transformer = createTransformer(config);
String fieldsCsv = config.getString(FIELD_VALUE_TRANSFORMER_FIELDS_CONFIG);
this.targetFields = new HashSet<>(Arrays.asList(fieldsCsv.split("\\s*,\\s*")));
Collaborator

can we add a unit test to make sure this works properly with more than 2 fields and subfields? It might actually be best to have an integration test that confirms we properly support encrypted subfields
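
One detail worth covering in such a test: the `\\s*,\\s*` split in the snippet strips whitespace around commas but not at the very start or end of the value, and an empty value yields a single empty entry. A minimal sketch of a stricter parser follows; the class name is hypothetical and treating dotted names as subfield paths is an assumption.

```java
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical helper; the PR's post-processor splits the CSV inline instead.
final class FieldListParserSketch {

  // Splits on commas, trims every entry, and drops blanks so that
  // " a , b.c ,, d " yields exactly [a, b.c, d].
  static Set<String> parse(String fieldsCsv) {
    return Arrays.stream(fieldsCsv.split(","))
        .map(String::trim)
        .filter(name -> !name.isEmpty())
        .collect(Collectors.toCollection(LinkedHashSet::new));
  }

  public static void main(String[] args) {
    // More than two fields, including dotted subfield names.
    System.out.println(parse(" name , address.ssn ,, card.number "));
    // The regex from the diff keeps the leading space on the first entry.
    System.out.println("[" + " a,b".split("\\s*,\\s*")[0] + "]");
  }
}
```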

Collaborator Author

Added tests

this.transformer = createTransformer(config);
String fieldsCsv = config.getString(FIELD_VALUE_TRANSFORMER_FIELDS_CONFIG);
this.targetFields = new HashSet<>(Arrays.asList(fieldsCsv.split("\\s*,\\s*")));
this.failOnError = config.getBoolean(FIELD_VALUE_TRANSFORMER_FAIL_ON_ERROR_CONFIG);
Collaborator

we already have mongo.errors.tolerance (docs). Should we use that instead?
We should also test this out with a dead letter queue setup and make sure we aren't adding unencrypted data to the DLQ. E.g. what happens if we successfully decrypt the first field and fail to decrypt the second one? Would we end up sending an unencrypted field to the DLQ?
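
One way to rule out the partial-decryption case raised here is to stage all transformed values and write them back only if every field succeeded. The sketch below is an illustration under assumptions, not the PR's implementation: the class and the prefix-stripping "decryptor" are hypothetical, and only top-level fields are handled for brevity.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.UnaryOperator;

// Hypothetical all-or-nothing transform over a flat document.
final class AllOrNothingTransformSketch {

  // Stages results in a side map; the document is modified only after every
  // target field has been transformed without an exception.
  static void transformInPlace(Map<String, Object> doc, Set<String> fields, UnaryOperator<String> transformer) {
    Map<String, Object> staged = new LinkedHashMap<>();
    for (String field : fields) {
      Object value = doc.get(field);
      if (value instanceof String) {
        staged.put(field, transformer.apply((String) value)); // may throw; doc is still untouched
      }
    }
    doc.putAll(staged);
  }

  // Fake decryptor for the demo: accepts only values carrying an "enc:" prefix.
  static String stripPrefix(String value) {
    if (!value.startsWith("enc:")) {
      throw new IllegalArgumentException("not decryptable");
    }
    return value.substring(4);
  }

  public static void main(String[] args) {
    Map<String, Object> doc = new LinkedHashMap<>();
    doc.put("first", "enc:ok");
    doc.put("second", "corrupted");
    Set<String> fields = new LinkedHashSet<>(Arrays.asList("first", "second"));
    try {
      transformInPlace(doc, fields, AllOrNothingTransformSketch::stripPrefix);
    } catch (IllegalArgumentException e) {
      // The record that would go to the DLQ still holds the original ciphertext.
      System.out.println(doc);
    }
  }
}
```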

Collaborator Author

You are right about the "tolerance" config. I removed the redundant configuration change.

The messages going to the DLQ were already free from unencrypted data. I added a new integration test to make sure no data is leaked.

.keyVaultNamespace(keyVaultNamespace)
.kmsProviders(kmsProviders)
.extraOptions(extraOptions)
.bypassQueryAnalysis(true);
Collaborator

Disables automatic analysis of outgoing commands. Specify true to use explicit encryption without the Automatic Encryption Shared Library.

Does this mean we need to explicitly encrypt? I guess the integration tests I suggested adding will confirm if this is an issue.

Collaborator Author

It had to be explicit before, but I've now made it configurable. When:

bypassQueryAnalysis=false

automatic encryption works correctly, and when:

bypassQueryAnalysis=true

automatic encryption is DISABLED and explicit encryption is needed.

- Consolidated implementation details into the main demo README
- Added architecture details for Field Decryption and CS-FLE
- Included configuration properties reference
- Added known limitations section
- Removed redundant IMPLEMENTATION-SUMMARY.md file
private String encryptAes(final String plaintext) {
try {
SecretKeySpec secretKey = new SecretKeySpec(AES_KEY.getBytes(StandardCharsets.UTF_8), "AES");
Cipher cipher = Cipher.getInstance("AES/ECB/PKCS5Padding");
@semgrep-code-mongodb bot Mar 16, 2026

Use of AES with ECB mode detected. ECB doesn't provide message confidentiality and is not semantically secure so should not be used. Instead, use a strong, secure cipher: Cipher.getInstance("AES/CBC/PKCS7PADDING"). See https://owasp.org/www-community/Using_the_Java_Cryptographic_Extensions for more information.

🎉 Removed in commit 196e587 🎉

Collaborator Author

/other This is test-only code used to simulate legacy AES encryption for integration testing and demos. The ECB mode is intentionally used to demonstrate decrypting data from legacy systems that may use insecure encryption. This code is never used in production - it only exists in integration tests and demo scripts to create sample encrypted data that the connector then decrypts.

private String encryptAes(final String plaintext) {
try {
SecretKeySpec secretKey = new SecretKeySpec(AES_KEY.getBytes(StandardCharsets.UTF_8), "AES");
Cipher cipher = Cipher.getInstance("AES/ECB/PKCS5Padding");
@semgrep-code-mongodb bot Mar 16, 2026

Cipher in ECB mode is detected. ECB mode produces the same output for the same input each time which allows an attacker to intercept and replay the data. Further, ECB mode does not provide any integrity checking. See https://find-sec-bugs.github.io/bugs.htm#CIPHER_INTEGRITY.

🚀 Removed in commit 196e587 🚀
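
The determinism the bot describes is easy to reproduce with the JDK alone. The sketch below encrypts the same value twice under AES/ECB and compares the results; the class name and the 16-byte key are illustrative only.

```java
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import java.util.Arrays;
import javax.crypto.Cipher;
import javax.crypto.spec.SecretKeySpec;

// Demonstration only: shows why ECB leaks equality of plaintexts.
final class EcbDeterminismSketch {

  static byte[] encryptEcb(byte[] key, String plaintext) {
    try {
      Cipher cipher = Cipher.getInstance("AES/ECB/PKCS5Padding");
      cipher.init(Cipher.ENCRYPT_MODE, new SecretKeySpec(key, "AES"));
      return cipher.doFinal(plaintext.getBytes(StandardCharsets.UTF_8));
    } catch (GeneralSecurityException e) {
      throw new IllegalStateException(e);
    }
  }

  public static void main(String[] args) {
    byte[] key = "0123456789abcdef".getBytes(StandardCharsets.UTF_8); // demo-only key
    byte[] first = encryptEcb(key, "4111-1111-1111-1111");
    byte[] second = encryptEcb(key, "4111-1111-1111-1111");
    // Same key and same input give byte-identical ciphertext under ECB.
    System.out.println(Arrays.equals(first, second)); // prints "true"
  }
}
```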

Collaborator Author

/other This is test-only code used to simulate legacy AES encryption for integration testing and demos. The ECB mode is intentionally used to demonstrate decrypting data from legacy systems that may use insecure encryption. This code is never used in production - it only exists in integration tests and demo scripts to create sample encrypted data that the connector then decrypts.

step "Demo Complete!"

info "To stop the demo:"
echo " cd $SCRIPT_DIR && docker compose down -v"
Collaborator

I like the demo experience but I'm not sure this should live in this repo, perhaps we could look into moving this into the kafka-edu repo or maybe a net new repo dedicated to demos.

Collaborator Author

yeah you might be right. Let me ping Product and see what they want.

}
});
// Clean up any stale JMX MBeans from previous test runs
cleanUpMBeans();
Collaborator

I'm guessing this is a drive-by that addresses flaky tests?

Collaborator Author

Yes, I had issues with flakiness and this seems to quiet it down

Comment on lines +128 to +135
public static final String CSFLE_BYPASS_QUERY_ANALYSIS_CONFIG = "csfle.bypass.query.analysis";
private static final String CSFLE_BYPASS_QUERY_ANALYSIS_DOC =
"Whether to bypass automatic query analysis for CS-FLE. "
+ "When true, automatic encryption based on schema maps is disabled and explicit encryption must be used. "
+ "When false, automatic encryption is enabled but requires mongocryptd or crypt_shared library. "
+ "Default is true to avoid external dependencies.";
private static final boolean CSFLE_BYPASS_QUERY_ANALYSIS_DEFAULT = true;
private static final String CSFLE_BYPASS_QUERY_ANALYSIS_DISPLAY = "CS-FLE bypass query analysis";
Collaborator

I'm a bit confused about this logic, and I didn't see a test that covers this being true either. Could you add one? That may shed some light on this.

Collaborator Author

Added tests here:

@DisplayName("Ensure bypass query analysis disables automatic encryption")

and here:

@DisplayName("test bypass query analysis defaults to true")


public static final String AES_KEY_CONFIG = "field.value.transformer.aes.key";
public static final String AES_ALGORITHM_CONFIG = "field.value.transformer.aes.algorithm";
private static final String DEFAULT_ALGORITHM = "AES/ECB/PKCS5Padding";
Collaborator

The bot detected this in the tests but I think it is probably worth defaulting to a more secure algorithm due to the concerns around ECB that the bot highlighted.

Even though this is a sample it appears to be fully functional so it could still be used technically.
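
As an illustration of what a more secure default could look like, here is a minimal AES-GCM sketch using only the JDK. It is not part of the PR: the class name, the 12-byte random IV prepended to the ciphertext, and the Base64 wire format are all assumptions.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import java.security.SecureRandom;
import java.util.Base64;
import javax.crypto.Cipher;
import javax.crypto.spec.GCMParameterSpec;
import javax.crypto.spec.SecretKeySpec;

// Hypothetical authenticated-encryption helper; the wire format is an assumption.
final class AesGcmSketch {
  private static final int IV_BYTES = 12;  // commonly recommended IV size for GCM
  private static final int TAG_BITS = 128; // authentication tag length
  private static final SecureRandom RANDOM = new SecureRandom();

  // Output layout: Base64(iv || ciphertext-with-tag).
  static String encrypt(byte[] key, String plaintext) {
    try {
      byte[] iv = new byte[IV_BYTES];
      RANDOM.nextBytes(iv); // a fresh IV per message is essential for GCM
      Cipher cipher = Cipher.getInstance("AES/GCM/NoPadding");
      cipher.init(Cipher.ENCRYPT_MODE, new SecretKeySpec(key, "AES"), new GCMParameterSpec(TAG_BITS, iv));
      byte[] ciphertext = cipher.doFinal(plaintext.getBytes(StandardCharsets.UTF_8));
      byte[] out = ByteBuffer.allocate(iv.length + ciphertext.length).put(iv).put(ciphertext).array();
      return Base64.getEncoder().encodeToString(out);
    } catch (GeneralSecurityException e) {
      throw new IllegalStateException(e);
    }
  }

  static String decrypt(byte[] key, String encoded) {
    try {
      byte[] in = Base64.getDecoder().decode(encoded);
      Cipher cipher = Cipher.getInstance("AES/GCM/NoPadding");
      cipher.init(Cipher.DECRYPT_MODE, new SecretKeySpec(key, "AES"),
          new GCMParameterSpec(TAG_BITS, in, 0, IV_BYTES));
      // doFinal verifies the tag and throws if the data was tampered with.
      byte[] plaintext = cipher.doFinal(in, IV_BYTES, in.length - IV_BYTES);
      return new String(plaintext, StandardCharsets.UTF_8);
    } catch (GeneralSecurityException e) {
      throw new IllegalStateException(e);
    }
  }

  public static void main(String[] args) {
    byte[] key = "0123456789abcdef".getBytes(StandardCharsets.UTF_8); // demo-only key
    String encrypted = encrypt(key, "4111-1111-1111-1111");
    System.out.println(decrypt(key, encrypted));
  }
}
```

A safer default only helps when the source system produced data in that format; legacy ECB data would still need the algorithm configured explicitly.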

"The key vault namespace for CS-FLE in the format 'database.collection'. "
+ "Example: 'encryption.__keyVault'.";
private static final String CSFLE_KEY_VAULT_NAMESPACE_DEFAULT = "";
private static final String CSFLE_KEY_VAULT_NAMESPACE_DISPLAY = "CS-FLE Key vault namespace";
Collaborator

We should add validation to make sure these are set up as expected when csfle.enabled is true.
Something like:

  if ("true".equalsIgnoreCase(props.getOrDefault(CSFLE_ENABLED_CONFIG, "false"))) {
      if (props.getOrDefault(CSFLE_KEY_VAULT_NAMESPACE_CONFIG, "").isEmpty()) {
          results.get(CSFLE_KEY_VAULT_NAMESPACE_CONFIG)
              .addErrorMessage("csfle.key.vault.namespace must be set when csfle.enabled=true");
      }
      if (props.getOrDefault(CSFLE_LOCAL_MASTER_KEY_CONFIG, "").isEmpty()) {
          results.get(CSFLE_LOCAL_MASTER_KEY_CONFIG)
              .addErrorMessage("csfle.local.master.key must be set when csfle.enabled=true");
      }
  }
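
The same cross-field rule can be exercised without Kafka Connect on the classpath. The sketch below is a dependency-free stand-in: a plain map of errors replaces the `results` object from the suggestion, and the extra 'database.collection' format check is an assumption based on the config's documentation string.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical, dependency-free version of the suggested cross-field validation.
final class CsfleValidationSketch {

  // Returns property name -> error message; an empty map means the config is acceptable.
  static Map<String, String> validate(Map<String, String> props) {
    Map<String, String> errors = new LinkedHashMap<>();
    if (!Boolean.parseBoolean(props.getOrDefault("csfle.enabled", "false"))) {
      return errors; // CS-FLE is off, so the dependent properties are optional
    }
    for (String required : new String[] {"csfle.key.vault.namespace", "csfle.local.master.key"}) {
      if (props.getOrDefault(required, "").trim().isEmpty()) {
        errors.put(required, required + " must be set when csfle.enabled=true");
      }
    }
    // Assumed extra check: the namespace should look like 'database.collection'.
    String namespace = props.getOrDefault("csfle.key.vault.namespace", "").trim();
    int dot = namespace.indexOf('.');
    if (!namespace.isEmpty() && (dot <= 0 || dot == namespace.length() - 1)) {
      errors.put("csfle.key.vault.namespace", "expected the format 'database.collection'");
    }
    return errors;
  }

  public static void main(String[] args) {
    Map<String, String> props = new LinkedHashMap<>();
    props.put("csfle.enabled", "true");
    // Both dependent properties are missing, so two errors are reported.
    validate(props).forEach((name, message) -> System.out.println(name + ": " + message));
  }
}
```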

Collaborator Author

Added extra validation

.extraOptions(extraOptions);

if (bypassQueryAnalysis) {
autoEncryptionBuilder.bypassQueryAnalysis(true);
Collaborator

why do we need to set this twice?

e.g. set already here

    if (bypassQueryAnalysis) {
      extraOptions.put("bypassQueryAnalysis", true);
    }

Collaborator Author

Nice catch, removed the redundant one

Comment on lines +92 to +94
byte[] encryptedBytes = Base64.getDecoder().decode(encrypted);
Cipher cipher = Cipher.getInstance(algorithm);
cipher.init(Cipher.DECRYPT_MODE, secretKey);
Collaborator

This runs for each document, shouldn't this be in the init?
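
A minimal sketch of hoisting the cipher setup into initialization is shown below. It assumes an IV-less mode such as the sample's ECB default and single-threaded use, since `Cipher` instances are not thread-safe; the class and the `encryptForDemo` helper are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import java.util.Base64;
import javax.crypto.Cipher;
import javax.crypto.spec.SecretKeySpec;

// Hypothetical decryptor that builds and initializes its Cipher once.
final class ReusableDecryptorSketch {
  private final SecretKeySpec key;
  private final Cipher cipher;

  ReusableDecryptorSketch(byte[] keyBytes, String algorithm) {
    this.key = new SecretKeySpec(keyBytes, "AES");
    this.cipher = newCipher(algorithm);
    init();
  }

  private static Cipher newCipher(String algorithm) {
    try {
      return Cipher.getInstance(algorithm);
    } catch (GeneralSecurityException e) {
      throw new IllegalArgumentException("unsupported algorithm: " + algorithm, e);
    }
  }

  private void init() {
    try {
      cipher.init(Cipher.DECRYPT_MODE, key);
    } catch (GeneralSecurityException e) {
      throw new IllegalStateException(e);
    }
  }

  // doFinal resets the cipher to its initialized state, so it can be reused per value.
  String decrypt(String encrypted) {
    try {
      byte[] plaintext = cipher.doFinal(Base64.getDecoder().decode(encrypted));
      return new String(plaintext, StandardCharsets.UTF_8);
    } catch (GeneralSecurityException e) {
      init(); // after a failure the cipher may need re-initialization
      throw new IllegalStateException("decryption failed", e);
    }
  }

  // Demo-only helper that produces input for decrypt().
  static String encryptForDemo(byte[] keyBytes, String algorithm, String plaintext) {
    try {
      Cipher encryptor = Cipher.getInstance(algorithm);
      encryptor.init(Cipher.ENCRYPT_MODE, new SecretKeySpec(keyBytes, "AES"));
      return Base64.getEncoder().encodeToString(encryptor.doFinal(plaintext.getBytes(StandardCharsets.UTF_8)));
    } catch (GeneralSecurityException e) {
      throw new IllegalStateException(e);
    }
  }

  public static void main(String[] args) {
    byte[] keyBytes = "0123456789abcdef".getBytes(StandardCharsets.UTF_8); // demo-only key
    String algorithm = "AES/ECB/PKCS5Padding"; // the sample transformer's default in this PR
    ReusableDecryptorSketch decryptor = new ReusableDecryptorSketch(keyBytes, algorithm);
    System.out.println(decryptor.decrypt(encryptForDemo(keyBytes, algorithm, "first")));
    System.out.println(decryptor.decrypt(encryptForDemo(keyBytes, algorithm, "second")));
  }
}
```

Modes that take an IV per message would still need an `init` call for each value, so only the `Cipher.getInstance` lookup and key construction could be hoisted there.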

@Calvinnix Calvinnix removed the request for review from mdb-vpurohit March 16, 2026 20:54
3 participants