diff --git a/src/translators/crate.py b/src/translators/crate.py
index ff5379bf..06675eb3 100644
--- a/src/translators/crate.py
+++ b/src/translators/crate.py
@@ -12,6 +12,8 @@ NGSI_GEOJSON, NGSI_GEOPOINT, NGSI_TEXT, NGSI_STRUCTURED_VALUE, \
     NGSI_LD_GEOMETRY, TIME_INDEX, METADATA_TABLE_NAME, FIWARE_SERVICEPATH
 import logging
+import sqlalchemy as sa
+from sqlalchemy import create_engine
 from .crate_geo_query import from_ngsi_query
 from utils.cfgreader import EnvReader, StrVar, IntVar, FloatVar
 from utils.connection_manager import ConnectionManager
@@ -47,14 +49,12 @@ def __init__(self, host, port=4200, db_name="ngsi-tsdb"):
         super(CrateTranslator, self).__init__(host, port, db_name)
         self.logger = logging.getLogger(__name__)
         self.dbCacheName = 'crate'
-        self.ccm = None
         self.connection = None
         self.cursor = None
 
     def setup(self):
         url = "{}:{}".format(self.host, self.port)
-        self.ccm = ConnectionManager()
-        self.connection = self.ccm.get_connection('crate')
+        self.engine = sa.create_engine("crate://" + url, pool_size=10)
         # Added backoff_factor for retry interval between attempt of
         # consecutive retries
         backoff_factor = EnvReader(log=logging.getLogger(__name__).debug) \
@@ -63,12 +63,11 @@ def setup(self):
         try:
             self.connection = client.connect(
                 [url], error_trace=True, backoff_factor=backoff_factor)
-            self.ccm.set_connection('crate', self.connection)
         except Exception as e:
             self.logger.warning(str(e), exc_info=True)
             raise e
 
         self.cursor = self.connection.cursor()
         # TODO this reduce queries to crate,
         # but only within a single API call to QUANTUMLEAP
         # we need to think if we want to cache this information
@@ -91,7 +90,7 @@ def sql_error_handler(self, exception):
         if analyzer.is_aggregation_error():
             return "AggrMethod cannot be applied"
         if analyzer.is_transient_error():
-            self.ccm.reset_connection('crate')
+            self.engine.dispose()
             self.setup()
 
     def get_db_version(self):
diff --git
a/src/translators/sql_translator.py b/src/translators/sql_translator.py
index 9c29e37f..4a87885d 100644
--- a/src/translators/sql_translator.py
+++ b/src/translators/sql_translator.py
@@ -16,6 +16,7 @@
 from cache.factory import get_cache, is_cache_available
 from translators.insert_splitter import to_insert_batches
 from utils.connection_manager import Borg
+
 # NGSI TYPES
 # Based on Orion output because official docs don't say much about these :(
 NGSI_DATETIME = 'DateTime'
diff --git a/src/translators/timescale.py b/src/translators/timescale.py
index a574d919..89476ec1 100644
--- a/src/translators/timescale.py
+++ b/src/translators/timescale.py
@@ -14,7 +14,9 @@
 import geocoding.slf.jsoncodec
 from geocoding.slf.querytypes import SlfQuery
 import geocoding.slf.wktcodec
+import sqlalchemy as sa
 from utils.cfgreader import *
+from sqlalchemy import create_engine
 from utils.connection_manager import ConnectionManager
 
 # POSTGRES TYPES
@@ -88,8 +90,9 @@ def __init__(self, conn_data=PostgresConnectionData()):
         self.dbCacheName = 'timescale'
 
     def setup(self):
-        self.ccm = ConnectionManager()
-        self.connection = self.ccm.get_connection('timescale')
+        url = "postgresql+pg8000://{}:{}".format(self.host, self.port)
+        self.engine = sa.create_engine(url, pool_size=10)
+        self.connection = self.engine.connect()
         if self.connection is None:
             try:
                 pg8000.paramstyle = "qmark"