Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions presto-docs/src/main/sphinx/connector/elasticsearch.rst
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ Property Name Description
``elasticsearch.max-http-connections`` Maximum number of persistent HTTP connections to Elasticsearch.
``elasticsearch.http-thread-count`` Number of threads handling HTTP connections to Elasticsearch.
``elasticsearch.ignore-publish-address`` Whether to ignore the published address and use the configured address.
``case-sensitive-name-matching`` Enable case-sensitive identifier support for schema and column names for the connector.
When disabled, names are matched case-insensitively using lowercase normalization.
Default is ``false``.
============================================= ==============================================================================

``elasticsearch.host``
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ public enum Security
private boolean ignorePublishAddress;
private boolean verifyHostnames = true;
private Security security;
private boolean caseSensitiveNameMatching;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion: Default value for caseSensitiveNameMatching is not set.

Initialize caseSensitiveNameMatching to false to clarify its default behavior and prevent ambiguity.

Suggested change
private boolean caseSensitiveNameMatching;
private boolean caseSensitiveNameMatching = false;


@NotNull
public String getHost()
Expand Down Expand Up @@ -323,4 +324,17 @@ public ElasticsearchConfig setSecurity(Security security)
this.security = security;
return this;
}

/**
 * Returns the current value of the case-sensitive name matching flag.
 *
 * @return the value last passed to {@code setCaseSensitiveNameMatching}, or
 *         {@code false} if the setter was never invoked
 */
public boolean isCaseSensitiveNameMatching()
{
return caseSensitiveNameMatching;
}

/**
 * Sets the case-sensitive name matching flag. Bound to the
 * {@code case-sensitive-name-matching} configuration property.
 *
 * @param caseSensitiveNameMatching the new flag value
 * @return this config instance, so setter calls can be chained
 */
@Config("case-sensitive-name-matching")
@ConfigDescription("Enable case-sensitive matching. When disabled, names are matched case-insensitively using lowercase normalization.")
public ElasticsearchConfig setCaseSensitiveNameMatching(boolean caseSensitiveNameMatching)
{
this.caseSensitiveNameMatching = caseSensitiveNameMatching;
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ public class ElasticsearchMetadata
private final ElasticsearchClient client;
private final String schemaName;
private final Type ipAddressType;
private final boolean caseSensitiveNameMatching;

@Inject
public ElasticsearchMetadata(TypeManager typeManager, ElasticsearchClient client, ElasticsearchConfig config)
Expand All @@ -95,6 +96,7 @@ public ElasticsearchMetadata(TypeManager typeManager, ElasticsearchClient client
this.client = requireNonNull(client, "client is null");
requireNonNull(config, "config is null");
this.schemaName = config.getDefaultSchema();
this.caseSensitiveNameMatching = config.isCaseSensitiveNameMatching();

Type jsonType = typeManager.getType(new TypeSignature(StandardTypes.JSON));
queryResultColumnMetadata = ColumnMetadata.builder()
Expand Down Expand Up @@ -203,21 +205,21 @@ private InternalTableMetadata makeInternalTableMetadata(ConnectorSession session
private InternalTableMetadata makeInternalTableMetadata(ConnectorSession session, String schema, String tableName)
{
IndexMetadata metadata = client.getIndexMetadata(tableName);
List<IndexMetadata.Field> fields = getColumnFields(metadata);
List<IndexMetadata.Field> fields = getColumnFields(session, metadata);
return new InternalTableMetadata(new SchemaTableName(schema, tableName), makeColumnMetadata(session, fields), makeColumnHandles(fields));
}

private List<IndexMetadata.Field> getColumnFields(IndexMetadata metadata)
private List<IndexMetadata.Field> getColumnFields(ConnectorSession session, IndexMetadata metadata)
{
ImmutableList.Builder<IndexMetadata.Field> result = ImmutableList.builder();

Map<String, Long> counts = metadata.getSchema()
.getFields().stream()
.collect(Collectors.groupingBy(f -> f.getName().toLowerCase(ENGLISH), Collectors.counting()));
.collect(Collectors.groupingBy(f -> normalizeIdentifier(session, f.getName()), Collectors.counting()));

for (IndexMetadata.Field field : metadata.getSchema().getFields()) {
Type type = toPrestoType(field);
if (type == null || counts.get(field.getName().toLowerCase(ENGLISH)) > 1) {
if (type == null || counts.get(normalizeIdentifier(session, field.getName())) > 1) {
continue;
}
result.add(field);
Expand Down Expand Up @@ -423,6 +425,12 @@ public Map<SchemaTableName, List<ColumnMetadata>> listTableColumns(ConnectorSess
.collect(toImmutableMap(ConnectorTableMetadata::getTable, ConnectorTableMetadata::getColumns));
}

/**
 * Normalizes an identifier according to the connector's name matching mode.
 *
 * @param session the session issuing the request; not consulted by this implementation
 * @param identifier the identifier to normalize
 * @return {@code identifier} unchanged when case-sensitive name matching is enabled,
 *         otherwise {@code identifier} lowercased with the {@code ENGLISH} locale
 */
@Override
public String normalizeIdentifier(ConnectorSession session, String identifier)
{
    if (caseSensitiveNameMatching) {
        // Case-sensitive mode: hand the identifier back exactly as received.
        return identifier;
    }
    return identifier.toLowerCase(ENGLISH);
}

private static class InternalTableMetadata
{
private final SchemaTableName tableName;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,21 +95,23 @@ private static void installElasticsearchPlugin(
Map<String, String> extraConnectorProperties)
{
queryRunner.installPlugin(new ElasticsearchPlugin(factory));
Map<String, String> config = ImmutableMap.<String, String>builder()
ImmutableMap.Builder<String, String> config = ImmutableMap.<String, String>builder()
.put("elasticsearch.host", address.getHost())
.put("elasticsearch.port", Integer.toString(address.getPort()))
// Node discovery relies on the publish_address exposed via the Elasticsearch API
// This doesn't work well within a docker environment that maps ES's port to a random public port
.put("elasticsearch.ignore-publish-address", "true")
.put("elasticsearch.default-schema-name", TPCH_SCHEMA)
.put("elasticsearch.scroll-size", "1000")
.put("elasticsearch.scroll-timeout", "1m")
.put("elasticsearch.max-hits", "1000000")
.put("elasticsearch.request-timeout", "2m")
.putAll(extraConnectorProperties)
.build();
.putAll(extraConnectorProperties);
if (!extraConnectorProperties.containsKey("elasticsearch.default-schema-name")) {
config.put("elasticsearch.default-schema-name", TPCH_SCHEMA);
}
Map<String, String> newconfig = config.build();

queryRunner.createCatalog("elasticsearch", "elasticsearch", config);
queryRunner.createCatalog("elasticsearch", "elasticsearch", newconfig);
}

private static void loadTpchTopic(RestHighLevelClient client, TestingPrestoClient prestoClient, TpchTable<?> table)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ public void testDefaults()
.setTruststorePassword(null)
.setVerifyHostnames(true)
.setIgnorePublishAddress(false)
.setSecurity(null));
.setSecurity(null)
.setCaseSensitiveNameMatching(false));
}

@Test
Expand All @@ -79,6 +80,7 @@ public void testExplicitPropertyMappings()
.put("elasticsearch.tls.verify-hostnames", "false")
.put("elasticsearch.ignore-publish-address", "true")
.put("elasticsearch.security", "AWS")
.put("case-sensitive-name-matching", "true")
.build();

ElasticsearchConfig expected = new ElasticsearchConfig()
Expand All @@ -101,7 +103,8 @@ public void testExplicitPropertyMappings()
.setTruststorePassword("truststore-password")
.setVerifyHostnames(false)
.setIgnorePublishAddress(true)
.setSecurity(AWS);
.setSecurity(AWS)
.setCaseSensitiveNameMatching(true);

assertFullMapping(properties, expected);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.elasticsearch;

import com.facebook.presto.testing.MaterializedResult;
import com.facebook.presto.testing.QueryRunner;
import com.facebook.presto.tests.AbstractTestQueryFramework;
import com.google.common.collect.ImmutableMap;
import com.google.common.net.HostAndPort;
import io.airlift.tpch.TpchTable;
import org.apache.http.HttpHost;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.testng.annotations.AfterClass;
import org.testng.annotations.Test;

import java.io.IOException;
import java.util.Map;

import static com.facebook.presto.common.type.VarcharType.VARCHAR;
import static com.facebook.presto.elasticsearch.ElasticsearchQueryRunner.createElasticsearchQueryRunner;
import static com.facebook.presto.testing.MaterializedResult.resultBuilder;
import static com.facebook.presto.tests.QueryAssertions.assertContains;
import static org.elasticsearch.client.RequestOptions.DEFAULT;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;

/**
 * Integration tests for the Elasticsearch connector running with
 * {@code case-sensitive-name-matching=true} and a mixed-case default schema name
 * ({@code MySchema}).
 *
 * <p>Each test indexes a document whose field names differ only by case and checks
 * that the connector exposes them with their original casing.
 */
@Test
public class TestElasticsearchMixedCaseTest
        extends AbstractTestQueryFramework
{
    private static final String ELASTICSEARCH_IMAGE = "docker.elastic.co/elasticsearch/elasticsearch:7.17.27";

    private ElasticsearchServer elasticsearch;
    private RestHighLevelClient client;

    /**
     * Starts an Elasticsearch server, opens a REST client against it and builds a
     * query runner whose catalog enables case-sensitive name matching.
     */
    @Override
    protected QueryRunner createQueryRunner()
            throws Exception
    {
        elasticsearch = new ElasticsearchServer(ELASTICSEARCH_IMAGE, ImmutableMap.of());
        HostAndPort address = elasticsearch.getAddress();
        client = new RestHighLevelClient(RestClient.builder(new HttpHost(address.getHost(), address.getPort())));

        return createElasticsearchQueryRunner(
                address,
                TpchTable.getTables(),
                ImmutableMap.of(),
                ImmutableMap.of("case-sensitive-name-matching", "true", "elasticsearch.default-schema-name", "MySchema"));
    }

    /**
     * Releases the server and the REST client.
     *
     * <p>Because this runs with {@code alwaysRun = true}, it may be invoked after a
     * failed or partial {@link #createQueryRunner()}, so either field can still be
     * {@code null}. The client is closed in a {@code finally} block so that a failure
     * while stopping the server does not leak it.
     */
    @AfterClass(alwaysRun = true)
    public final void destroy()
            throws IOException
    {
        try {
            if (elasticsearch != null) {
                elasticsearch.stop();
            }
        }
        finally {
            if (client != null) {
                client.close();
            }
        }
    }

    /**
     * Indexes a single document and refreshes immediately so that it is visible to
     * the query issued right afterwards.
     */
    private void index(String index, Map<String, Object> document)
            throws IOException
    {
        client.index(new IndexRequest(index, "_doc")
                .source(document)
                .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE), DEFAULT);
    }

    @Test
    public void testShowColumns()
            throws IOException
    {
        String indexName = "mixed_case";
        index(indexName, ImmutableMap.<String, Object>builder()
                .put("NAME", "JOHN")
                .put("Profession", "Developer")
                .put("id", 2)
                .put("name", "john")
                .build());

        // "NAME" and "name" differ only by case; both are expected to be listed.
        MaterializedResult actual = computeActual("SHOW columns FROM MySchema.mixed_case");
        assertEquals(actual.getMaterializedRows().get(0).getField(0), "NAME");
        assertEquals(actual.getMaterializedRows().get(1).getField(0), "Profession");
        assertEquals(actual.getMaterializedRows().get(2).getField(0), "id");
        assertEquals(actual.getMaterializedRows().get(3).getField(0), "name");
    }

    @Test
    public void testSelect()
            throws IOException
    {
        String indexName = "mixed_case_select";
        index(indexName, ImmutableMap.<String, Object>builder()
                .put("NAME", "JOHN")
                .put("Profession", "Developer")
                .put("name", "john")
                .build());

        MaterializedResult actualRow = computeActual("SELECT * from MySchema.mixed_case_select");
        MaterializedResult expectedRow = resultBuilder(getSession(), VARCHAR, VARCHAR, VARCHAR)
                .row("JOHN", "Developer", "john")
                .build();
        // Keep the full MaterializedResult.equals comparison, but report both sides on failure.
        assertTrue(actualRow.equals(expectedRow), "expected " + expectedRow + " but got " + actualRow);
    }

    @Test
    public void testSchema()
    {
        MaterializedResult actualRow = computeActual("SHOW schemas from elasticsearch");
        MaterializedResult expectedRow = resultBuilder(getSession(), VARCHAR)
                .row("MySchema")
                .build();
        // The configured schema name is expected with its original casing.
        assertContains(actualRow, expectedRow);
    }
}
Loading