1
1
import logging
2
-
3
- from elasticsearch import Elasticsearch
4
- from elasticsearch .exceptions import NotFoundError
5
-
6
2
from mdps_ds_lib .lib .aws .es_abstract import ESAbstract , DEFAULT_TYPE
7
3
8
4
LOGGER = logging .getLogger (__name__ )
9
5
10
6
11
- class ESMiddleware (ESAbstract ):
12
-
7
+ class ESMiddlewareAbstract (ESAbstract ):
13
8
def __init__ (self , index , base_url , port = 443 , use_ssl = True ) -> None :
14
9
if any ([k is None for k in [index , base_url ]]):
15
10
raise ValueError (f'index or base_url is None' )
16
11
self .__index = index
17
- base_url = base_url .replace ('https://' , '' ) # hide https
18
- # https://elasticsearch-py.readthedocs.io/en/v7.13.4/api.html#elasticsearch.Elasticsearch
19
- self ._engine = Elasticsearch (hosts = [{'host' : base_url , 'port' : port , 'use_ssl' : use_ssl }])
12
+ self ._engine = None
20
13
21
14
def __validate_index (self , index ):
22
15
if index is not None :
@@ -73,9 +66,11 @@ def create_index(self, index_name, index_body):
73
66
def get_index_mapping (self , index_name ):
74
67
try :
75
68
result = self ._engine .indices .get_mapping (index = index_name )
76
- except NotFoundError as e :
77
- return None
78
- return result
69
+ except Exception as e :
70
+ if e .error == 'index_not_found_exception' :
71
+ return None
72
+ raise e
73
+ return result [index_name ]
79
74
80
75
def has_index (self , index_name ):
81
76
result = self ._engine .indices .exists (index = index_name )
@@ -84,18 +79,21 @@ def has_index(self, index_name):
84
79
def swap_index_for_alias (self , alias_name , old_index_name , new_index_name ):
85
80
try :
86
81
temp_result = self ._engine .indices .delete_alias (index = old_index_name , name = alias_name )
87
- except NotFoundError as ee :
88
- LOGGER .exception (f'error while unlinking { old_index_name } from { alias_name } ' )
89
- temp_result = {}
82
+ except Exception as ee :
83
+ if 'NotFoundError' in str (ee ):
84
+ return {}
85
+ raise ee
90
86
result = self .create_alias (new_index_name , alias_name )
91
87
return result
92
88
93
89
def get_alias (self , alias_name ):
94
90
# /Users/wphyo/anaconda3/envs/cumulus_py_3.9/lib/python3.9/site-packages/elasticsearch-7.13.4-py3.9.egg/elasticsearch/client/indices.py
95
91
try :
96
92
result = self ._engine .indices .get_alias (name = alias_name )
97
- except NotFoundError as ee :
98
- return {}
93
+ except Exception as ee :
94
+ if 'NotFoundError' in str (ee ):
95
+ return {}
96
+ raise ee
99
97
return result
100
98
101
99
def create_alias (self , index_name , alias_name ):
@@ -124,7 +122,7 @@ def index_many(self, docs=None, doc_ids=None, doc_dict=None, index=None):
124
122
body = body , doc_type = DEFAULT_TYPE )
125
123
LOGGER .info ('indexed. result: {}' .format (index_result ))
126
124
return self .__check_errors_for_bulk (index_result )
127
- except :
125
+ except Exception as e :
128
126
LOGGER .exception ('cannot add indices with ids: {} for index: {}' .format (list (doc_dict .keys ()), index ))
129
127
return doc_dict
130
128
return
@@ -136,9 +134,9 @@ def index_one(self, doc, doc_id, index=None):
136
134
body = doc , doc_type = DEFAULT_TYPE , id = doc_id )
137
135
LOGGER .info ('indexed. result: {}' .format (index_result ))
138
136
pass
139
- except Exception as e :
137
+ except :
140
138
LOGGER .exception ('cannot add a new index with id: {} for index: {}' .format (doc_id , index ))
141
- raise e
139
+ return None
142
140
return self
143
141
144
142
def update_many (self , docs = None , doc_ids = None , doc_dict = None , index = None ):
@@ -208,13 +206,13 @@ def query_with_scroll(self, dsl, querying_index=None):
208
206
first_batch ['hits' ]['hits' ].extend (scrolled_result ['hits' ]['hits' ])
209
207
return first_batch
210
208
211
- def query (self , dsl , querying_index = None ):
209
+ def delete_by_query (self , dsl , querying_index = None ):
212
210
index = self .__validate_index (querying_index )
213
- return self ._engine .search (body = dsl , index = index )
211
+ return self ._engine .delete_by_query (body = dsl , index = index , conflicts = 'proceed' , request_timeout = 120 )
214
212
215
- def delete_by_query (self , dsl , querying_index = None ):
213
+ def query (self , dsl , querying_index = None ):
216
214
index = self .__validate_index (querying_index )
217
- return self ._engine .delete_by_query (body = dsl , index = index )
215
+ return self ._engine .search (body = dsl , index = index )
218
216
219
217
def __is_querying_next_page (self , targeted_size : int , current_size : int , total_size : int ):
220
218
if targeted_size < 0 :
0 commit comments