1616
1717package io .fabric8 .elasticsearch .plugin ;
1818
19+ import java .io .IOException ;
1920import java .util .HashSet ;
2021import java .util .Iterator ;
2122import java .util .Map ;
2223import java .util .Set ;
2324import java .util .concurrent .Callable ;
25+ import java .util .concurrent .ExecutionException ;
2426
2527import org .apache .commons .lang .StringUtils ;
2628import org .apache .logging .log4j .Logger ;
2729import org .elasticsearch .ElasticsearchException ;
30+
2831import org .elasticsearch .action .admin .indices .alias .IndicesAliasesRequestBuilder ;
2932import org .elasticsearch .action .admin .indices .alias .IndicesAliasesResponse ;
3033import org .elasticsearch .action .admin .indices .alias .get .GetAliasesRequestBuilder ;
3134import org .elasticsearch .action .admin .indices .alias .get .GetAliasesResponse ;
35+ import org .elasticsearch .action .admin .indices .create .CreateIndexRequestBuilder ;
36+ import org .elasticsearch .action .admin .indices .create .CreateIndexResponse ;
3237import org .elasticsearch .action .admin .indices .exists .indices .IndicesExistsRequestBuilder ;
3338import org .elasticsearch .action .admin .indices .exists .indices .IndicesExistsResponse ;
39+ import org .elasticsearch .action .admin .indices .get .GetIndexRequestBuilder ;
3440import org .elasticsearch .action .admin .indices .get .GetIndexResponse ;
41+ import org .elasticsearch .action .admin .indices .refresh .RefreshRequestBuilder ;
42+ import org .elasticsearch .action .admin .indices .refresh .RefreshResponse ;
43+ import org .elasticsearch .action .admin .indices .settings .put .UpdateSettingsRequestBuilder ;
44+ import org .elasticsearch .action .admin .indices .settings .put .UpdateSettingsResponse ;
3545import org .elasticsearch .action .delete .DeleteResponse ;
3646import org .elasticsearch .action .get .GetRequestBuilder ;
3747import org .elasticsearch .action .get .GetResponse ;
3848import org .elasticsearch .action .index .IndexRequestBuilder ;
3949import org .elasticsearch .action .index .IndexResponse ;
4050import org .elasticsearch .action .search .SearchResponse ;
51+ import org .elasticsearch .action .update .UpdateRequestBuilder ;
4152import org .elasticsearch .action .update .UpdateResponse ;
4253import org .elasticsearch .client .Client ;
4354import org .elasticsearch .common .logging .Loggers ;
55+ import org .elasticsearch .common .settings .Settings ;
4456import org .elasticsearch .common .util .concurrent .ThreadContext ;
4557import org .elasticsearch .common .util .concurrent .ThreadContext .StoredContext ;
4658import org .elasticsearch .common .xcontent .XContentType ;
@@ -75,10 +87,10 @@ public Object call() throws Exception {
7587 }
7688 return null ;
7789 }
78-
90+
7991 });
8092 }
81-
93+
8294 public void updateDocument (String index , String type , String id , String source ) {
8395 execute (new Callable <Object >() {
8496
@@ -88,35 +100,33 @@ public Object call() throws Exception {
88100 LOGGER .debug ("Updating Document: '{}/{}/{}' source: '{}'" , index , type , id , source );
89101 }
90102 addCommonHeaders ();
91- UpdateResponse response = client .prepareUpdate (index , type , id )
92- .setDoc (source , XContentType .JSON )
93- .setDocAsUpsert (true )
94- .get ();
95-
103+ UpdateResponse response = client .prepareUpdate (index , type , id ).setDoc (source , XContentType .JSON )
104+ .setDocAsUpsert (true ).get ();
105+
96106 if (LOGGER .isDebugEnabled ()) {
97107 LOGGER .debug ("Document Updated: '{}'" , response .status ());
98108 }
99109 return null ;
100110 }
101111 });
102112 }
103-
113+
104114 public SearchResponse search (String index , String type ) {
105- return search (new String []{ index }, new String [] {type });
115+ return search (new String [] { index }, new String [] { type });
106116 }
107-
108- public SearchResponse search (String [] indicies , String [] types ) {
117+
118+ public SearchResponse search (String [] indicies , String [] types ) {
109119 return execute (new Callable <SearchResponse >() {
110120
111121 @ Override
112122 public SearchResponse call () throws Exception {
113123 addCommonHeaders ();
114124 return client .prepareSearch (indicies ).setTypes (types ).get ();
115125 }
116-
126+
117127 });
118128 }
119-
129+
120130 public GetIndexResponse getIndex (String ... indicies ) {
121131 return execute (new Callable <GetIndexResponse >() {
122132 @ Override
@@ -126,20 +136,31 @@ public GetIndexResponse call() throws Exception {
126136 }
127137 });
128138 }
129-
139+
130140 public GetResponse getDocument (String index , String type , String id ) {
131141 return execute (new Callable <GetResponse >() {
132142
133143 @ Override
134144 public GetResponse call () throws Exception {
135145 addCommonHeaders ();
136- return client
137- .prepareGet (index , type , id )
138- .get ();
146+ return client .prepareGet (index , type , id ).get ();
139147 }
140148 });
141149 }
142-
150+
151+ public UpdateResponse update (String index , String type , String id , String source ) {
152+
153+ LOGGER .debug ("UPDATE: '{}/{}/{}' source: '{}'" , index , type , id , source );
154+
155+ UpdateRequestBuilder builder = client .prepareUpdate (index , type , id ).setDoc (source , XContentType .JSON )
156+ .setDocAsUpsert (true );
157+ addCommonHeaders ();
158+ UpdateResponse response = builder .get ();
159+
160+ LOGGER .debug ("Created with update? '{}'" , response .status ());
161+ return response ;
162+ }
163+
143164 public IndexResponse createDocument (String index , String type , String id , String source ) {
144165 return execute (new Callable <IndexResponse >() {
145166
@@ -154,6 +175,46 @@ public IndexResponse call() throws Exception {
154175 });
155176 }
156177
178+ public GetIndexResponse getIndices (String ... indices ) throws InterruptedException , ExecutionException {
179+ if (LOGGER .isTraceEnabled ()) {
180+ LOGGER .trace ("Getting indices '{}'" , StringUtils .join (indices , ", " ));
181+ }
182+ GetIndexRequestBuilder builder = client .admin ().indices ().prepareGetIndex ().setIndices (indices );
183+ addCommonHeaders ();
184+ return builder .get ();
185+ }
186+
187+ public CreateIndexResponse copyIndex (final String index , final String target , Settings settings , String ... types )
188+ throws InterruptedException , ExecutionException , IOException {
189+ LOGGER .trace ("Copying {} index to {} for types {}" , index , target , types );
190+ GetIndexResponse response = getIndices (index );
191+ CreateIndexRequestBuilder builder = client .admin ().indices ().prepareCreate (target );
192+ if (settings != null ) {
193+ builder .setSettings (settings );
194+ }
195+ for (String type : types ) {
196+ builder .addMapping (type , response .mappings ().get (index ).get (type ).getSourceAsMap ());
197+ }
198+ addCommonHeaders ();
199+ return builder .get ();
200+ }
201+
202+ public UpdateSettingsResponse updateSettings (final String index , Settings settings ) {
203+ UpdateSettingsRequestBuilder builder = client .admin ().indices ().prepareUpdateSettings (index )
204+ .setSettings (settings );
205+ addCommonHeaders ();
206+ return builder .get ();
207+ }
208+
209+ public RefreshResponse refreshIndices (String ... indices ) {
210+ RefreshRequestBuilder builder = client .admin ().indices ().prepareRefresh (indices );
211+ addCommonHeaders ();
212+ RefreshResponse response = builder .get ();
213+ LOGGER .debug ("Refreshed '{}' successfully on {} of {} shards" , indices , response .getSuccessfulShards (),
214+ response .getTotalShards ());
215+ return response ;
216+ }
217+
157218 public boolean indexExists (final String index ) {
158219 return execute (new Callable <Boolean >() {
159220 @ Override
@@ -168,16 +229,13 @@ public Boolean call() throws Exception {
168229 }
169230 });
170231 }
171-
232+
172233 public boolean documentExists (final String index , final String type , final String id ) {
173234 return execute (new Callable <Boolean >() {
174235 @ Override
175236 public Boolean call () throws Exception {
176237 LOGGER .trace ("Checking for existence of document: '{}/{}/{}'" , index , type , id );
177- GetRequestBuilder builder = client .prepareGet ()
178- .setIndex (index )
179- .setType (type )
180- .setId (id );
238+ GetRequestBuilder builder = client .prepareGet ().setIndex (index ).setType (type ).setId (id );
181239 addCommonHeaders ();
182240 GetResponse response = builder .get ();
183241 final boolean exists = response .isExists ();
@@ -186,14 +244,15 @@ public Boolean call() throws Exception {
186244 }
187245 });
188246 }
189-
247+
190248 /**
191249 * Retrieve the set of indices for a given alias
192250 *
193- * @param alias The alias to lookup
251+ * @param alias
252+ * The alias to lookup
194253 * @return The set of indices to the given alias
195254 */
196- public Set <String > getIndicesForAlias (String alias ){
255+ public Set <String > getIndicesForAlias (String alias ) {
197256 return execute (new Callable <Set <String >>() {
198257
199258 @ Override
@@ -221,7 +280,7 @@ public Set<String> call() throws Exception {
221280 * @return true if the request was acknowledged
222281 */
223282 public boolean alias (Map <String , String > aliases ) {
224- return execute (new Callable <Boolean >(){
283+ return execute (new Callable <Boolean >() {
225284
226285 @ Override
227286 public Boolean call () throws Exception {
@@ -245,11 +304,11 @@ public Boolean call() throws Exception {
245304 }
246305
247306 private void addCommonHeaders () {
248- if (StringUtils .isBlank (threadContext .getTransient (ConfigConstants .SG_CHANNEL_TYPE ))) {
307+ if (StringUtils .isBlank (threadContext .getTransient (ConfigConstants .SG_CHANNEL_TYPE ))) {
249308 threadContext .putTransient (ConfigConstants .SG_CHANNEL_TYPE , "direct" );
250309 }
251310 }
252-
311+
253312 private <T > T execute (Callable <T > callable ) {
254313 try (StoredContext context = threadContext .stashContext ()) {
255314 threadContext .putTransient (ConfigConstants .SG_CHANNEL_TYPE , "direct" );
@@ -258,5 +317,4 @@ private <T> T execute(Callable<T> callable) {
258317 throw new ElasticsearchException (e );
259318 }
260319 }
261-
262320}
0 commit comments