Skip to content

Commit 8d1ef20

Browse files
committed
feat: ES6 java 客户端示例
1 parent 2f93f74 commit 8d1ef20

File tree

9 files changed

+581
-0
lines changed

9 files changed

+581
-0
lines changed
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://maven.apache.org/POM/4.0.0"
3+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
4+
<modelVersion>4.0.0</modelVersion>
5+
6+
<groupId>io.github.dunwu</groupId>
7+
<artifactId>javadb-elasticsearch6</artifactId>
8+
<version>1.0.0</version>
9+
<packaging>jar</packaging>
10+
11+
<properties>
12+
<java.version>1.8</java.version>
13+
<maven.compiler.source>${java.version}</maven.compiler.source>
14+
<maven.compiler.target>${java.version}</maven.compiler.target>
15+
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
16+
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
17+
</properties>
18+
19+
<dependencies>
20+
<dependency>
21+
<groupId>org.elasticsearch.client</groupId>
22+
<artifactId>elasticsearch-rest-high-level-client</artifactId>
23+
<version>6.4.3</version>
24+
</dependency>
25+
<dependency>
26+
<groupId>org.projectlombok</groupId>
27+
<artifactId>lombok</artifactId>
28+
<version>1.18.22</version>
29+
</dependency>
30+
<dependency>
31+
<groupId>cn.hutool</groupId>
32+
<artifactId>hutool-all</artifactId>
33+
<version>5.7.20</version>
34+
</dependency>
35+
36+
<dependency>
37+
<groupId>ch.qos.logback</groupId>
38+
<artifactId>logback-classic</artifactId>
39+
<version>1.2.10</version>
40+
</dependency>
41+
<dependency>
42+
<groupId>org.apache.logging.log4j</groupId>
43+
<artifactId>log4j-to-slf4j</artifactId>
44+
<version>2.17.1</version>
45+
</dependency>
46+
</dependencies>
47+
48+
</project>
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
package io.github.dunwu.javadb.elasticsearch;
2+
3+
import io.github.dunwu.javadb.elasticsearch.entity.User;
4+
import io.github.dunwu.javadb.elasticsearch.mapper.UserEsMapper;
5+
6+
import java.io.IOException;
7+
import java.util.Arrays;
8+
import java.util.List;
9+
10+
11+
public class Demo {
12+
public static void main(String[] args) throws IOException, InterruptedException {
13+
UserEsMapper mapper = new UserEsMapper();
14+
15+
System.out.println("索引是否存在:" + mapper.isIndexExists());
16+
17+
User jack = User.builder().id(1L).username("jack").age(18).build();
18+
User tom = User.builder().id(2L).username("tom").age(20).build();
19+
List<User> users = Arrays.asList(jack, tom);
20+
21+
System.out.println("批量插入:" + mapper.batchInsert(users));
22+
System.out.println("根据ID查询:" + mapper.getById("1").toString());
23+
System.out.println("根据ID查询:" + mapper.pojoById("2").toString());
24+
System.out.println("根据ID批量查询:" + mapper.pojoListByIds(Arrays.asList("1", "2")).toString());
25+
26+
Thread.sleep(1000);
27+
System.out.println("根据ID批量删除:" + mapper.deleteByIds(Arrays.asList("1", "2")));
28+
}
29+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
package io.github.dunwu.javadb.elasticsearch.entity;
2+
3+
/**
4+
* ES 实体接口
5+
*
6+
* @author <a href="mailto:[email protected]">Zhang Peng</a>
7+
* @since 2023-06-28
8+
*/
9+
public interface EsEntity {
10+
Long getId();
11+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
package io.github.dunwu.javadb.elasticsearch.entity;
2+
3+
import lombok.Builder;
4+
import lombok.Data;
5+
6+
/**
7+
* 用户实体
8+
*
9+
* @author <a href="mailto:[email protected]">Zhang Peng</a>
10+
* @since 2023-06-28
11+
*/
12+
@Data
13+
@Builder
14+
public class User implements EsEntity {
15+
private Long id;
16+
private String username;
17+
private String password;
18+
private Integer age;
19+
private String email;
20+
21+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,250 @@
1+
package io.github.dunwu.javadb.elasticsearch.mapper;
2+
3+
import cn.hutool.core.bean.BeanUtil;
4+
import cn.hutool.core.bean.copier.CopyOptions;
5+
import cn.hutool.core.collection.CollectionUtil;
6+
import cn.hutool.core.io.IoUtil;
7+
import cn.hutool.core.lang.Assert;
8+
import io.github.dunwu.javadb.elasticsearch.entity.EsEntity;
9+
import io.github.dunwu.javadb.elasticsearch.util.ElasticsearchUtil;
10+
import lombok.extern.slf4j.Slf4j;
11+
import org.elasticsearch.action.ActionListener;
12+
import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
13+
import org.elasticsearch.action.bulk.BackoffPolicy;
14+
import org.elasticsearch.action.bulk.BulkProcessor;
15+
import org.elasticsearch.action.bulk.BulkRequest;
16+
import org.elasticsearch.action.bulk.BulkResponse;
17+
import org.elasticsearch.action.delete.DeleteRequest;
18+
import org.elasticsearch.action.get.MultiGetItemResponse;
19+
import org.elasticsearch.action.get.MultiGetRequest;
20+
import org.elasticsearch.action.get.MultiGetResponse;
21+
import org.elasticsearch.action.index.IndexRequest;
22+
import org.elasticsearch.action.index.IndexResponse;
23+
import org.elasticsearch.action.search.SearchRequest;
24+
import org.elasticsearch.action.search.SearchResponse;
25+
import org.elasticsearch.client.IndicesClient;
26+
import org.elasticsearch.client.RequestOptions;
27+
import org.elasticsearch.client.Requests;
28+
import org.elasticsearch.client.RestHighLevelClient;
29+
import org.elasticsearch.common.unit.ByteSizeUnit;
30+
import org.elasticsearch.common.unit.ByteSizeValue;
31+
import org.elasticsearch.common.unit.TimeValue;
32+
import org.elasticsearch.common.xcontent.XContentBuilder;
33+
import org.elasticsearch.common.xcontent.XContentFactory;
34+
import org.elasticsearch.index.query.QueryBuilder;
35+
import org.elasticsearch.index.query.QueryBuilders;
36+
import org.elasticsearch.search.builder.SearchSourceBuilder;
37+
38+
import java.io.Closeable;
39+
import java.io.IOException;
40+
import java.util.*;
41+
import java.util.concurrent.TimeUnit;
42+
import java.util.function.BiConsumer;
43+
44+
/**
45+
* ES Mapper 基础类
46+
*
47+
* @author <a href="mailto:[email protected]">Zhang Peng</a>
48+
* @date 2023-06-27
49+
*/
50+
@Slf4j
51+
public abstract class BaseEsMapper<T extends EsEntity> implements EsMapper<T>, Closeable {
52+
53+
public static final String HOSTS = "127.0.0.1:9200";
54+
55+
private BulkProcessor bulkProcessor;
56+
57+
private final RestHighLevelClient restHighLevelClient = ElasticsearchUtil.newRestHighLevelClient(HOSTS);
58+
59+
@Override
60+
public RestHighLevelClient getClient() throws IOException {
61+
Assert.notNull(restHighLevelClient, () -> new IOException("【ES】not connected."));
62+
return restHighLevelClient;
63+
}
64+
65+
66+
@Override
67+
public synchronized BulkProcessor getBulkProcessor() {
68+
if (bulkProcessor == null) {
69+
bulkProcessor = newAsyncBulkProcessor();
70+
}
71+
return bulkProcessor;
72+
}
73+
74+
@Override
75+
public boolean isIndexExists() throws IOException {
76+
IndicesClient indicesClient = getClient().indices();
77+
GetIndexRequest request = new GetIndexRequest();
78+
request.indices(getIndexAlias());
79+
return indicesClient.exists(request, RequestOptions.DEFAULT);
80+
}
81+
82+
@Override
83+
public SearchResponse getById(String id) throws IOException {
84+
SearchRequest searchRequest = Requests.searchRequest(getIndexAlias());
85+
QueryBuilder queryBuilder = QueryBuilders.idsQuery().addIds(id);
86+
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
87+
sourceBuilder.query(queryBuilder);
88+
searchRequest.source(sourceBuilder);
89+
return getClient().search(searchRequest, RequestOptions.DEFAULT);
90+
}
91+
92+
@Override
93+
public T pojoById(String id) throws IOException {
94+
SearchResponse response = getById(id);
95+
if (response == null) {
96+
return null;
97+
}
98+
List<T> list = ElasticsearchUtil.toPojoList(response, getEntityClass());
99+
if (CollectionUtil.isEmpty(list)) {
100+
return null;
101+
}
102+
return list.get(0);
103+
}
104+
105+
@Override
106+
public List<T> pojoListByIds(Collection<String> ids) throws IOException {
107+
108+
if (CollectionUtil.isEmpty(ids)) {
109+
return null;
110+
}
111+
112+
MultiGetRequest request = new MultiGetRequest();
113+
for (String id : ids) {
114+
request.add(new MultiGetRequest.Item(getIndexAlias(), getIndexType(), id));
115+
}
116+
117+
MultiGetResponse multiGetResponse = getClient().mget(request, RequestOptions.DEFAULT);
118+
if (null == multiGetResponse || multiGetResponse.getResponses() == null || multiGetResponse.getResponses().length <= 0) {
119+
return new ArrayList<>();
120+
}
121+
122+
List<T> list = new ArrayList<>();
123+
for (MultiGetItemResponse itemResponse : multiGetResponse.getResponses()) {
124+
if (itemResponse.isFailed()) {
125+
log.error("通过id获取文档失败", itemResponse.getFailure().getFailure());
126+
} else {
127+
T entity = ElasticsearchUtil.toPojo(itemResponse.getResponse(), getEntityClass());
128+
if (entity != null) {
129+
list.add(entity);
130+
}
131+
}
132+
}
133+
return list;
134+
}
135+
136+
@Override
137+
public String insert(T entity) throws IOException {
138+
Map<String, Object> map = new HashMap<>();
139+
BeanUtil.beanToMap(entity, map, CopyOptions.create().ignoreError());
140+
XContentBuilder builder = XContentFactory.jsonBuilder();
141+
builder.startObject();
142+
for (Map.Entry<String, Object> entry : map.entrySet()) {
143+
String key = entry.getKey();
144+
Object value = entry.getValue();
145+
builder.field(key, value);
146+
}
147+
builder.endObject();
148+
149+
IndexRequest request = Requests.indexRequest(getIndexAlias()).type(getIndexType()).source(builder);
150+
if (entity.getId() != null) {
151+
request.id(entity.getId().toString());
152+
}
153+
154+
IndexResponse response = getClient().index(request, RequestOptions.DEFAULT);
155+
if (response == null) {
156+
return null;
157+
}
158+
return response.getId();
159+
}
160+
161+
@Override
162+
public boolean batchInsert(Collection<T> list) throws IOException {
163+
164+
if (CollectionUtil.isEmpty(list)) {
165+
return true;
166+
}
167+
168+
BulkRequest bulkRequest = new BulkRequest();
169+
for (T entity : list) {
170+
Map<String, Object> map = ElasticsearchUtil.toMap(entity);
171+
IndexRequest request = Requests.indexRequest(getIndexAlias()).type(getIndexType()).source(map);
172+
if (entity.getId() != null) {
173+
request.id(entity.getId().toString());
174+
}
175+
bulkRequest.add(request);
176+
}
177+
178+
BulkResponse response = getClient().bulk(bulkRequest, RequestOptions.DEFAULT);
179+
return !(response == null || response.hasFailures());
180+
}
181+
182+
@Override
183+
public boolean deleteById(String id) throws IOException {
184+
return deleteByIds(Collections.singleton(id));
185+
}
186+
187+
@Override
188+
public boolean deleteByIds(Collection<String> ids) throws IOException {
189+
190+
if (CollectionUtil.isEmpty(ids)) {
191+
return true;
192+
}
193+
194+
BulkRequest bulkRequest = new BulkRequest();
195+
ids.forEach(id -> {
196+
DeleteRequest deleteRequest = Requests.deleteRequest(getIndexAlias()).type(getIndexType()).id(id);
197+
bulkRequest.add(deleteRequest);
198+
});
199+
200+
BulkResponse response = getClient().bulk(bulkRequest, RequestOptions.DEFAULT);
201+
return response != null && !response.hasFailures();
202+
}
203+
204+
private BulkProcessor newAsyncBulkProcessor() {
205+
BulkProcessor.Listener listener = new BulkProcessor.Listener() {
206+
@Override
207+
public void beforeBulk(long executionId, BulkRequest request) {
208+
}
209+
210+
@Override
211+
public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
212+
if (response.hasFailures()) {
213+
log.error("Bulk [{}] executed with failures,response = {}", executionId, response.buildFailureMessage());
214+
}
215+
}
216+
217+
@Override
218+
public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
219+
}
220+
};
221+
BiConsumer<BulkRequest, ActionListener<BulkResponse>> bulkConsumer = (request, bulkListener) -> restHighLevelClient.bulkAsync(request, RequestOptions.DEFAULT, bulkListener);
222+
bulkProcessor = BulkProcessor.builder(bulkConsumer, listener)
223+
// 1000条数据请求执行一次bulk
224+
.setBulkActions(1000)
225+
// 5mb的数据刷新一次bulk
226+
.setBulkSize(new ByteSizeValue(5L, ByteSizeUnit.MB))
227+
// 并发请求数量, 0不并发, 1并发允许执行
228+
.setConcurrentRequests(2)
229+
// 固定1s必须刷新一次
230+
.setFlushInterval(TimeValue.timeValueMillis(1000L))
231+
// 重试3次,间隔100ms
232+
.setBackoffPolicy(BackoffPolicy.constantBackoff(TimeValue.timeValueMillis(200L), 3)).build();
233+
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
234+
try {
235+
bulkProcessor.flush();
236+
bulkProcessor.awaitClose(30, TimeUnit.SECONDS);
237+
} catch (Exception e) {
238+
log.error("Failed to close bulkProcessor", e);
239+
}
240+
log.info("bulkProcessor closed!");
241+
}));
242+
return bulkProcessor;
243+
}
244+
245+
@Override
246+
public void close() {
247+
IoUtil.close(restHighLevelClient);
248+
}
249+
250+
}

0 commit comments

Comments
 (0)