Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 6 additions & 8 deletions src/translators/crate.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
NGSI_GEOJSON, NGSI_GEOPOINT, NGSI_TEXT, NGSI_STRUCTURED_VALUE, \
NGSI_LD_GEOMETRY, TIME_INDEX, METADATA_TABLE_NAME, FIWARE_SERVICEPATH
import logging
import sqlalchemy as sa
from sqlalchemy import create_engine
from .crate_geo_query import from_ngsi_query
from utils.cfgreader import EnvReader, StrVar, IntVar, FloatVar
from utils.connection_manager import ConnectionManager
Expand Down Expand Up @@ -47,14 +49,12 @@ def __init__(self, host, port=4200, db_name="ngsi-tsdb"):
super(CrateTranslator, self).__init__(host, port, db_name)
self.logger = logging.getLogger(__name__)
self.dbCacheName = 'crate'
self.ccm = None
self.connection = None
self.cursor = None

def setup(self):
url = "{}:{}".format(self.host, self.port)
self.ccm = ConnectionManager()
self.connection = self.ccm.get_connection('crate')
url = "crate://{}:{}".format(self.host, self.port)
self.engine = sa.create_engine(url, connect_args={"pool_size": 10})
# Added backoff_factor for retry interval between attempt of
# consecutive retries
backoff_factor = EnvReader(log=logging.getLogger(__name__).debug) \
Expand All @@ -63,12 +63,10 @@ def setup(self):
try:
self.connection = client.connect(
[url], error_trace=True, backoff_factor=backoff_factor)
self.ccm.set_connection('crate', self.connection)
except Exception as e:
self.logger.warning(str(e), exc_info=True)
raise e

self.cursor = self.connection.cursor()

# TODO this reduce queries to crate,
# but only within a single API call to QUANTUMLEAP
# we need to think if we want to cache this information
Expand All @@ -91,7 +89,7 @@ def sql_error_handler(self, exception):
if analyzer.is_aggregation_error():
return "AggrMethod cannot be applied"
if analyzer.is_transient_error():
self.ccm.reset_connection('crate')
self.engine.reset_connection('crate')
self.setup()

def get_db_version(self):
Expand Down
4 changes: 2 additions & 2 deletions src/translators/sql_translator.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from cache.factory import get_cache, is_cache_available
from translators.insert_splitter import to_insert_batches
from utils.connection_manager import Borg

# NGSI TYPES
# Based on Orion output because official docs don't say much about these :(
NGSI_DATETIME = 'DateTime'
Expand Down Expand Up @@ -1740,7 +1741,6 @@ def _execute_query_via_cache(self, tenant_name, key, stmt, parameters=None,
self.logger.warning("Caching not available, metadata data may "
"not be consistent: " + str(e),
exc_info=True)

self.cursor.execute(stmt, parameters)
res = self.cursor.fetchall()
if res and self.cache:
Expand Down Expand Up @@ -1785,7 +1785,7 @@ class QueryCacheManager(Borg):
cache = None

def __init__(self):
super(QueryCacheManager, self).__init__()
super(QueryCacheManager, self ).__init__()
if is_cache_available() and self.cache is None:
try:
self.cache = get_cache()
Expand Down
7 changes: 5 additions & 2 deletions src/translators/timescale.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@
import geocoding.slf.jsoncodec
from geocoding.slf.querytypes import SlfQuery
import geocoding.slf.wktcodec
import sqlalchemy as sa
from utils.cfgreader import *
from sqlalchemy import create_engine
from utils.connection_manager import ConnectionManager

# POSTGRES TYPES
Expand Down Expand Up @@ -88,8 +90,9 @@ def __init__(self, conn_data=PostgresConnectionData()):
self.dbCacheName = 'timescale'

def setup(self):
self.ccm = ConnectionManager()
self.connection = self.ccm.get_connection('timescale')
url = "timescale://{}:{}".format(self.host, self.port)
self.engine = sa.create_engine(url, connect_args={"pool_size": 10})
self.connection = self.engine.connect()
if self.connection is None:
try:
pg8000.paramstyle = "qmark"
Expand Down