3131import java .io .UncheckedIOException ;
3232import java .util .HashMap ;
3333import java .util .Map ;
34+ import java .util .Objects ;
3435import java .util .concurrent .atomic .AtomicBoolean ;
3536
3637import org .opensearch .OpenSearchWrapperException ;
@@ -156,7 +157,7 @@ public void initMLIndexIfAbsent(MLIndex index, ActionListener<Boolean> listener)
156157 String indexName = index .getIndexName ();
157158 String mapping = index .getMapping ();
158159 Integer version = index .getVersion ();
159- initIndexIfAbsent (indexName , mapping , version , listener );
160+ initIndexIfAbsent (indexName , mapping , version , listener , true );
160161 }
161162
162163 private String getMapping (String mappingPath ) {
@@ -174,20 +175,29 @@ private String getMapping(String mappingPath) {
174175
175176 public void createSessionMemoryDataIndex (String indexName , MemoryConfiguration configuration , ActionListener <Boolean > listener ) {
176177 String indexMappings = getMapping (ML_MEMORY_SESSION_INDEX_MAPPING_PATH );
177- Map <String , Object > indexSettings = configuration .getMemoryIndexMapping (SESSION_INDEX );
178- initIndexIfAbsent (indexName , StringUtils .toJson (indexMappings ), indexSettings , 1 , listener );
178+ Map <String , Object > indexSettings = configuration .getMemoryIndexMapping (SESSION_INDEX ) == null
179+ || configuration .getMemoryIndexMapping (SESSION_INDEX ).isEmpty ()
180+ ? ALL_NODES_REPLICA_INDEX_SETTINGS
181+ : configuration .getMemoryIndexMapping (SESSION_INDEX );
182+ initIndexIfAbsent (indexName , StringUtils .toJson (indexMappings ), indexSettings , 1 , listener , configuration .isUseSystemIndex ());
179183 }
180184
181185 public void createWorkingMemoryDataIndex (String indexName , MemoryConfiguration configuration , ActionListener <Boolean > listener ) {
182186 String indexMappings = getMapping (ML_WORKING_MEMORY_INDEX_MAPPING_PATH );
183- Map <String , Object > indexSettings = configuration .getMemoryIndexMapping (WORKING_MEMORY_INDEX );
184- initIndexIfAbsent (indexName , StringUtils .toJson (indexMappings ), indexSettings , 1 , listener );
187+ Map <String , Object > indexSettings = configuration .getMemoryIndexMapping (WORKING_MEMORY_INDEX ) == null
188+ || configuration .getMemoryIndexMapping (WORKING_MEMORY_INDEX ).isEmpty ()
189+ ? ALL_NODES_REPLICA_INDEX_SETTINGS
190+ : configuration .getMemoryIndexMapping (WORKING_MEMORY_INDEX );
191+ initIndexIfAbsent (indexName , StringUtils .toJson (indexMappings ), indexSettings , 1 , listener , configuration .isUseSystemIndex ());
185192 }
186193
187194 public void createLongTermMemoryHistoryIndex (String indexName , MemoryConfiguration configuration , ActionListener <Boolean > listener ) {
188195 String indexMappings = getMapping (ML_LONG_MEMORY_HISTORY_INDEX_MAPPING_PATH );
189- Map <String , Object > indexSettings = configuration .getMemoryIndexMapping (LONG_TERM_MEMORY_HISTORY_INDEX );
190- initIndexIfAbsent (indexName , StringUtils .toJson (indexMappings ), indexSettings , 1 , listener );
196+ Map <String , Object > indexSettings = configuration .getMemoryIndexMapping (LONG_TERM_MEMORY_HISTORY_INDEX ) == null
197+ || configuration .getMemoryIndexMapping (LONG_TERM_MEMORY_HISTORY_INDEX ).isEmpty ()
198+ ? ALL_NODES_REPLICA_INDEX_SETTINGS
199+ : configuration .getMemoryIndexMapping (LONG_TERM_MEMORY_HISTORY_INDEX );
200+ initIndexIfAbsent (indexName , StringUtils .toJson (indexMappings ), indexSettings , 1 , listener , configuration .isUseSystemIndex ());
191201 }
192202
193203 /**
@@ -268,32 +278,86 @@ public void createLongTermMemoryIndex(
268278 if (!memoryConfig .getIndexSettings ().isEmpty () && memoryConfig .getIndexSettings ().containsKey (LONG_TERM_MEMORY_INDEX )) {
269279 Map <String , Object > configuredIndexSettings = memoryConfig .getMemoryIndexMapping (LONG_TERM_MEMORY_INDEX );
270280 indexSettings .putAll (configuredIndexSettings );
281+ } else {
282+ indexSettings .putAll (ALL_NODES_REPLICA_INDEX_SETTINGS );
271283 }
272284
273285 // Initialize index with mapping and settings
274- initIndexIfAbsent (indexName , indexMappings , indexSettings , 1 , listener );
286+ initIndexIfAbsent (indexName , StringUtils . toJson ( indexMappings ) , indexSettings , 1 , listener , memoryConfig . isUseSystemIndex () );
275287 } catch (Exception e ) {
276288 log .error ("Failed to create long-term memory index" , e );
277289 listener .onFailure (e );
278290 }
279291 }
280292
281- public void initIndexWithMappingFileIfAbsent (String indexName , String mappingPath , Integer version , ActionListener <Boolean > listener ) {
282- String mapping = getMapping (mappingPath );
283- initIndexIfAbsent (indexName , mapping , version , listener );
293+ public void initIndexIfAbsent (
294+ String indexName ,
295+ String mapping ,
296+ Integer version ,
297+ ActionListener <Boolean > listener ,
298+ Boolean isSystemIndex
299+ ) {
300+ initIndexIfAbsent (indexName , mapping , null , version , listener , isSystemIndex );
284301 }
285302
286- public void initIndexIfAbsent (String indexName , String mapping , Integer version , ActionListener <Boolean > listener ) {
287- initIndexIfAbsent (indexName , mapping , null , version , listener );
303+ public void initIndexIfAbsent (
304+ String indexName ,
305+ String mapping ,
306+ Map <String , Object > indexSettings ,
307+ Integer version ,
308+ ActionListener <Boolean > listener ,
309+ Boolean isSystemIndex
310+ ) {
311+ if (isSystemIndex ) {
312+ initIndexWithoutThreadContext (indexName , mapping , indexSettings , version , listener );
313+ } else {
314+ initIndexWithThreadContext (indexName , mapping , indexSettings , version , listener );
315+ }
288316 }
289317
290- public void initIndexIfAbsent (
318+ public void initIndexWithThreadContext (
319+ String indexName ,
320+ String mapping ,
321+ Map <String , Object > indexSettings ,
322+ Integer version ,
323+ ActionListener <Boolean > listener
324+ ) {
325+ log .info ("Using initIndexWithThreadContext method to create index: {}" , indexName );
326+ try {
327+ ActionListener <CreateIndexResponse > actionListener = ActionListener .wrap (r -> {
328+ if (r .isAcknowledged ()) {
329+ log .info ("create index:{}" , indexName );
330+ listener .onResponse (true );
331+ } else {
332+ listener .onResponse (false );
333+ }
334+ }, e -> {
335+ if (e instanceof ResourceAlreadyExistsException
336+ || (e instanceof OpenSearchWrapperException && e .getCause () instanceof ResourceAlreadyExistsException )) {
337+ log .info ("Skip creating the Index:{} that is already created by another parallel request" , indexName );
338+ listener .onResponse (true );
339+ } else {
340+ log .error ("Failed to create index {}" , indexName , e );
341+ listener .onFailure (e );
342+ }
343+ });
344+ CreateIndexRequest request = new CreateIndexRequest (indexName ).mapping (mapping , XContentType .JSON );
345+ request .settings (Objects .requireNonNullElse (indexSettings , DEFAULT_INDEX_SETTINGS ));
346+ client .admin ().indices ().create (request , actionListener );
347+ } catch (Exception e ) {
348+ log .error ("Failed to init index {}" , indexName , e );
349+ listener .onFailure (e );
350+ }
351+ }
352+
353+ public void initIndexWithoutThreadContext (
291354 String indexName ,
292355 String mapping ,
293356 Map <String , Object > indexSettings ,
294357 Integer version ,
295358 ActionListener <Boolean > listener
296359 ) {
360+ log .info ("Using initIndexWithoutThreadContext method to create index: {}" , indexName );
297361 try (ThreadContext .StoredContext threadContext = client .threadPool ().getThreadContext ().stashContext ()) {
298362 ActionListener <Boolean > internalListener = ActionListener .runBefore (listener , () -> threadContext .restore ());
299363 if (!MLIndicesHandler .doesMultiTenantIndexExist (clusterService , mlFeatureEnabledSetting .isMultiTenancyEnabled (), indexName )) {
0 commit comments