diff --git a/.gitignore b/.gitignore index 31698e7c..df6cd897 100644 --- a/.gitignore +++ b/.gitignore @@ -2,7 +2,6 @@ *.log.* *.pyc .ipynb_* -examon-cache/ -examon-cache/* +examon-cache build site diff --git a/Dockerfile b/Dockerfile index 3c8c8d63..e9e41253 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,52 +1,35 @@ -FROM examonhpc/examon:0.2.0 +FROM examonhpc/examon:0.3.2 ENV EXAMON_HOME /etc/examon_deploy/examon -# Create a backup of the existing sources.list -RUN mv /etc/apt/sources.list /etc/apt/sources.list.backup - -# Create a new sources.list file -RUN touch /etc/apt/sources.list - -# Debian strech moved to archived -RUN echo "deb https://debian.mirror.garr.it/debian-archive/ stretch main" > /etc/apt/sources.list - - -# Install dependencies -RUN apt-get update && apt-get install -y \ - apt-transport-https \ - ca-certificates \ - libffi-dev \ - build-essential \ - libssl-dev \ - python-dev \ - && rm -rf /var/lib/apt/lists/* - -# copy app +# Copy app ADD ./publishers/random_pub ${EXAMON_HOME}/publishers/random_pub -ADD ./lib/examon-common $EXAMON_HOME/lib/examon-common ADD ./docker/examon/supervisor.conf /etc/supervisor/conf.d/supervisor.conf ADD ./scripts/examon.conf $EXAMON_HOME/scripts/examon.conf ADD ./web $EXAMON_HOME/web -# install -RUN pip --trusted-host pypi.python.org install --upgrade pip==20.1.1 +# Venvs +WORKDIR $EXAMON_HOME/scripts +RUN virtualenv -p $(which python) py3_env + ENV PIP $EXAMON_HOME/scripts/ve/bin/pip +ENV S_PIP $EXAMON_HOME/scripts/py3_env/bin/pip +# Install WORKDIR $EXAMON_HOME/lib/examon-common -RUN $PIP install . -RUN pip install . +RUN $S_PIP install . +# Random publisher WORKDIR $EXAMON_HOME/publishers/random_pub RUN $PIP install -r requirements.txt +# Web WORKDIR $EXAMON_HOME/web -RUN virtualenv flask -RUN flask/bin/pip --trusted-host pypi.python.org install --upgrade pip==20.1.1 -RUN CASS_DRIVER_BUILD_CONCURRENCY=8 flask/bin/pip --trusted-host pypi.python.org install -r ./examon-server/requirements.txt +RUN virtualenv -p $(which python) flask +RUN CASS_DRIVER_BUILD_CONCURRENCY=8 flask/bin/pip install -r ./examon-server/requirements.txt WORKDIR $EXAMON_HOME/scripts -EXPOSE 1883 9001 +EXPOSE 1883 5000 9001 CMD ["./frontend_ctl.sh", "start"] diff --git a/README.md b/README.md index 69aa7fe2..3948cdd3 100644 --- a/README.md +++ b/README.md @@ -32,7 +32,7 @@ It can be disabled as described in the [Enable/disable plugins](#enabledisable-t ## Prerequisites Since Cassandra is the component that requires the majority of resources, you can find more details about the suggested hardware configuration of the system that will host the services here: -[Hardware Configuration](https://cassandra.apache.org/doc/latest/operating/hardware.html#:~:text=While%20Cassandra%20can%20be%20made,at%20least%2032GB%20of%20RAM) +[Hardware Configuration](https://cassandra.apache.org/doc/latest/cassandra/managing/operating/hardware.html) To install all the services needed by ExaMon we will use Docker and Docker Compose: diff --git a/VERSION b/VERSION index 1e66a616..01e994d3 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -v0.3.1 \ No newline at end of file +v0.4.0 \ No newline at end of file diff --git a/docker-compose.yml b/docker-compose.yml index b1da1f8b..98fe5c06 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,4 +1,3 @@ -version: "3.8" name: "examon" networks: @@ -11,11 +10,8 @@ services: examon: build: context: . 
- image: examonhpc/examon:0.2.2 + image: examonhpc/examon:latest volumes: - - type: bind - source: ./lib/examon-common - target: /etc/examon_deploy/examon/lib/examon-common - type: bind source: ./web/examon-server target: /etc/examon_deploy/examon/web/examon-server diff --git a/docker/examon/supervisor.conf b/docker/examon/supervisor.conf index 34a623c9..4efbbe72 100644 --- a/docker/examon/supervisor.conf +++ b/docker/examon/supervisor.conf @@ -23,7 +23,7 @@ startsec=2 [program:random_pub] directory=/etc/examon_deploy/examon/publishers/random_pub -command=python ./random_pub.py run +command=/etc/examon_deploy/examon/scripts/ve/bin/python random_pub.py run autostart=true autorestart=true stderr_logfile=/var/log/random_pub.log diff --git a/docker/kairosdb/Dockerfile b/docker/kairosdb/Dockerfile index 16ab7c67..761753da 100644 --- a/docker/kairosdb/Dockerfile +++ b/docker/kairosdb/Dockerfile @@ -2,15 +2,6 @@ FROM adoptopenjdk:8-jre-hotspot-focal -# Create a backup of the existing sources.list -RUN mv /etc/apt/sources.list /etc/apt/sources.list.backup - -# Create a new sources.list file -RUN touch /etc/apt/sources.list - -# Add the new server repository for focal packages -RUN echo "deb https://ubuntu.mirror.garr.it/ubuntu/ focal main" > /etc/apt/sources.list - RUN set -eux; \ apt-get update; \ apt-get install -y --no-install-recommends \ diff --git a/docs/Administrators/Getting_started.md b/docs/Administrators/Getting_started.md index 2ac5fa47..cf9aba62 100644 --- a/docs/Administrators/Getting_started.md +++ b/docs/Administrators/Getting_started.md @@ -9,7 +9,7 @@ This setup will install all server-side components of the ExaMon framework: ## Prerequisites Since Cassandra is the component that requires the majority of resources, you can find more details about the suggested hardware configuration of the system that will host the services here: -[Hardware Configuration](https://cassandra.apache.org/doc/latest/operating/hardware.html#:~:text=While%20Cassandra%20can%20be%20made,at%20least%2032GB%20of%20RAM) +[Hardware Configuration](https://cassandra.apache.org/doc/latest/cassandra/managing/operating/hardware.html) To install all the services needed by ExaMon we will use Docker and Docker Compose: diff --git a/docs/Plugins/examon_pub.ipynb b/docs/Plugins/examon_pub.ipynb index 28681d2f..3e0eadca 100644 --- a/docs/Plugins/examon_pub.ipynb +++ b/docs/Plugins/examon_pub.ipynb @@ -6,12 +6,12 @@ "metadata": {}, "source": [ "# Example plugin\n", - "This notebook shows how to create a simple Examon publisher using Python (v3)\n", + "This notebook shows how to create a simple Examon publisher using Python.\n", "\n", "## Install \n", - "Install the publisher library.\n", + "Install the latest Examon publisher library.\n", "\n", - "NOTE: This is a development release so the final API may be different in future versions." + "https://github.com/ExamonHPC/examon-common\n" ] }, { @@ -23,7 +23,7 @@ }, "outputs": [], "source": [ - "! python -m pip install --upgrade https://github.com/fbeneventi/releases/releases/latest/download/examon-common-py3.zip" + "! python -m pip install --upgrade git+https://github.com/ExamonHPC/examon-common.git@master" ] }, { diff --git a/docs/contributing.md b/docs/contributing.md new file mode 100644 index 00000000..04800515 --- /dev/null +++ b/docs/contributing.md @@ -0,0 +1,88 @@ +# Contributing to ExaMon + +First off, thank you for considering contributing to our project! + +## How Can I Contribute? 
+ +### Reporting Bugs + +Before creating bug reports, please check the issue list as you might find out that you don't need to create one. When you are creating a bug report, please include as many details as possible: + +* Use a clear and descriptive title +* Describe the exact steps which reproduce the problem +* Provide specific examples to demonstrate the steps +* Describe the behavior you observed after following the steps +* Explain which behavior you expected to see instead and why +* Include screenshots if possible + +### Suggesting Enhancements + +If you have a suggestion for the project, we'd love to hear about it. Please include: + +* A clear and detailed explanation of the feature +* The motivation behind this feature +* Any alternative solutions you've considered +* If applicable, examples from other projects + +### Pull Request Process + +1. Fork the repository and create your branch from `master` +2. If you've added code that should be tested, add tests +3. Ensure the test suite passes +4. Update the documentation if needed +5. Issue that pull request! + +#### Pull Request Guidelines + +* Follow our coding standards (see below) +* Include relevant issue numbers in your PR description +* Update the README.md with details of changes if applicable +* The PR must pass all CI/CD checks [TBD] +* Wait for review from maintainers + +### Development Setup + +1. Fork and clone the repo +2. Create a branch: `git checkout -b my-branch-name` + +### Coding Standards + +* Use consistent code formatting +* Write clear commit messages following [Conventional Commits](https://www.conventionalcommits.org/) or at least the basic specification as in the [Commit Messages](#commit-messages) section. +* Comment your code where necessary +* Write tests for new features +* Keep the code simple and maintainable + +### Commit Messages + +Basic specification example: + +``` +type(scope): description +[optional body] +[optional footer] +``` + +The type should be one of the following: + +| Type | Description | +|------|-------------| +| add | Introduces a new feature or functionality | +| fix | Patches a bug or resolves an issue | +| change | Modifies existing functionality or behavior | +| remove | Deletes or deprecates functionality | +| merge | Combines branches or resolves conflicts | +| doc | Updates documentation or comments | + + +### First Time Contributors + +Looking for work? Check out our issues labeled `good first issue` or `help wanted`. + +## License + +By contributing, you agree that your contributions will be licensed under the same license that covers the project. + +## Questions? + +Don't hesitate to contact the project maintainers if you have any questions! diff --git a/docs/credits.md b/docs/credits.md index d8bf5f66..07758b23 100644 --- a/docs/credits.md +++ b/docs/credits.md @@ -5,5 +5,5 @@ This work is supported by the EU FETHPC projects: - [MULTITHERMAN (g.a. 291125)](https://cordis.europa.eu/project/id/291125) - [ANTAREX (g.a. 671623)](https://antarex.fe.up.pt/) - [IOTWINS (g.a. 857191)](https://www.iotwins.eu/) -- [REGALE (g.a. 956560)](https://regale-project.eu/) +- [REGALE (g.a. 956560)](https://regale-project.eu/) - [The Italian Ministry of Enterprises and Made in Italy ("MIMIT")](https://www.mimit.gov.it/en/) - [GRAPH MASSIVIZER (g.a.
101093202)](https://graph-massivizer.eu/) diff --git a/docs/overrides/home.html b/docs/overrides/home.html index 26c5c391..b3632c4f 100644 --- a/docs/overrides/home.html +++ b/docs/overrides/home.html @@ -317,101 +317,359 @@ .logo-grid { - flex-wrap: wrap; /* Allow flex items to wrap to the next row */ - justify-content: center; /* Center items horizontally within the container */ - } + flex-wrap: wrap; + justify-content: center; + align-items: center; + gap: 1rem; + } .logo-grid a { - flex: 1; /* Distribute available space equally among items */ - text-align: center; /* Center the text within each item */ - margin: 10px; /* Add some spacing between items */ - } + flex: 1; + text-align: center; + margin: 10px; + display: flex; + justify-content: center; + align-items: center; + } - /* Add media query using em units for max-width */ - @media screen and (max-width: 76.25em) { + .logo-grid img { + max-width: 100%; + height: auto; + object-fit: contain; + } - .top-hr { - display: grid; - } - /* Change the order of flex items within .second-row */ - .second-row .feature-item:nth-child(1) { - order: 2; /* Change order for the first .feature-item in the second-row */ - } + /* Modern Footer Styles - Material Theme Compatible */ + .modern-footer { + background-color: var(--md-footer-bg-color); + color: var(--md-footer-fg-color); + margin-top: 4rem; + border-top: 1px solid var(--md-default-fg-color--lightest); + } - .second-row .feature-item:nth-child(2) { - order: 1; /* Change order for the second .feature-item in the second-row */ - } + .footer-main { + padding: 3rem 0 2rem; + } - /* Disable Menu drawer: broken in hompage */ - .md-header__button:not([hidden]) { - display: none; - } + .footer-content { + display: grid; + grid-template-columns: 2fr 1fr 1fr 1fr; + gap: 2rem; + max-width: 1200px; + margin: 0 auto; + padding: 0 1rem; + } - .md-sidebar { - display: none; - } - } + .footer-section { + display: flex; + flex-direction: column; + } + .footer-brand { + max-width: 300px; + } - /* footer */ + .footer-logo { + max-height: 60px; + width: auto; + margin-bottom: 1rem; + filter: brightness(0.9); + object-fit: contain; + max-width: 100%; + } - footer { - background-color: #000000de; - color: #fff; - padding: 20px 0; + /* Dark mode logo adjustment */ + [data-md-color-scheme="slate"] .footer-logo { + filter: brightness(1.2); } - .footer-content { - display: flex; - justify-content: space-between; - align-items: center; - max-width: 1200px; - margin: 0 auto; + .footer-description { + color: var(--md-footer-fg-color--light); + line-height: 1.6; + margin-bottom: 1.5rem; + font-size: 0.9rem; + } + + .footer-title { + color: var(--md-footer-fg-color); + font-size: 1.1rem; + font-weight: 600; + margin-bottom: 1rem; + position: relative; + text-align: left; } - .logo-section img { - max-height: 50px; - margin-right: 20px; + .footer-title::after { + content: ''; + position: absolute; + bottom: -0.5rem; + left: 0; + width: 2rem; + height: 2px; + background: var(--md-primary-fg-color); + border-radius: 1px; } - nav ul { + .footer-links { list-style: none; + padding: 0; + margin: 0; + } + + .footer-links li { + margin-bottom: 0.7rem; + } + + .footer-links a { + color: var(--md-footer-fg-color--light); + text-decoration: none; + font-size: 0.9rem; + transition: all 0.3s ease; + position: relative; + } + + .footer-links a:hover { + color: var(--md-footer-fg-color); + padding-left: 0; + } + + .footer-social { display: flex; + gap: 1rem; + margin-top: 1rem; + } + + .social-link { + display: flex; + align-items: 
center; justify-content: center; - flex-wrap: wrap; /* Allow flex items to wrap to the next row */ + width: 2.5rem; + height: 2.5rem; + /* background: var(--md-default-fg-color--lightest); */ + background: var(--md-footer-fg-color--light); + border-radius: 50%; + color: var(--md-footer-fg-color--lighter); + text-decoration: none; + transition: all 0.3s ease; + flex-shrink: 0; } - nav li { - margin: 0 10px; + .social-link .twemoji { + width: 1.2rem; + height: 1.2rem; + color: var(--md-footer-fg-color--lighter); + display: flex; + align-items: center; + justify-content: center; + flex-shrink: 0; + } + + .social-link .twemoji svg { + width: 100%; + height: 100%; + object-fit: contain; + } + + .social-link:hover { + background: var(--md-primary-fg-color); + color: var(--md-primary-bg-color); + transform: translateY(-2px); + box-shadow: 0 4px 12px var(--md-shadow-z2); + } + + .social-link:hover .twemoji { + color: var(--md-primary-bg-color); } - nav a { + .footer-bottom { + background: var(--md-default-bg-color--lighter); + border-top: 1px solid var(--md-default-fg-color--lightest); + padding: 1.5rem 0; + } + + /* Dark mode footer bottom adjustment */ + [data-md-color-scheme="slate"] .footer-bottom { + background: var(--md-default-bg-color--light); + } + + .footer-bottom-content { + display: flex; + justify-content: space-between; + align-items: center; + max-width: 1200px; + margin: 0 auto; + padding: 0 1rem; + } + + .footer-copyright p { + margin: 0.2rem 0; color: var(--md-footer-fg-color--lighter); + font-size: 0.85rem; + } + + .footer-copyright a { + color: var(--md-footer-fg-color--light); text-decoration: none; - font-size: 0.8rem; - transition: all 0.3s ease-in-out; + transition: color 0.3s ease; } - nav a:hover { - color: #0088cc; + .footer-copyright a:hover { + color: var(--md-footer-fg-color); } - .social-media { + .footer-legal { display: flex; + align-items: center; + gap: 0.5rem; + } + + .footer-legal a { + color: var(--md-footer-fg-color--lighter); + text-decoration: none; + font-size: 0.85rem; + transition: color 0.3s ease; } - .social-icon { - color: #fff; - font-size: 1.5rem; - margin-right: 10px; - transition: all 0.3s ease-in-out; + .footer-legal a:hover { + color: var(--md-footer-fg-color); } - .social-icon:hover { - color: #0088cc; + .separator { + color: var(--md-footer-fg-color--lightest); + font-size: 0.85rem; } + /* --- Responsive Design --- */ + @media screen and (max-width: 1024px) { + /* -- Tablet and Mobile -- */ + + /* Hide navigation */ + .md-sidebar--primary { + display: none; + } + + .top-hr { + display: flex; + flex-direction: column; + gap: 2rem; + margin-right: auto; + margin-left: auto; + padding: 0 .2rem; + max-width: 61rem; + } + + .second-row .feature-item:nth-child(1) { order: 2; } + .second-row .feature-item:nth-child(2) { order: 1; } + + .top-hr .feature-item { + margin-bottom: 1.5rem; + text-align: center; + } + + .feature-item img { + max-width: 100%; + height: auto; + object-fit: contain; + max-height: 400px; + } + + .logo-grid { + display: flex; + flex-direction: column; + gap: 2rem; + width: 100%; + } + + .logo-grid a { + flex: none; + margin: 0; + width: 60%; + max-width: 250px; + } + + .logo-grid img { + width: 100%; + max-height: 160px; + height: auto; + } + + .footer-content { + grid-template-columns: 1fr 1fr; + } + + .footer-brand { + grid-column: 1 / -1; + max-width: 100%; + display: flex; + flex-direction: column; + align-items: center; + } + + .footer-brand .footer-title { + text-align: center; + } + + .footer-brand .footer-title::after { 
+ left: 50%; + transform: translateX(-50%); + } + } + + @media screen and (max-width: 768px) { + /* -- Mobile Only -- */ + + .top-hr { + display: grid; + } + + .footer-content { + grid-template-columns: 1fr; + } + + .footer-section { + text-align: center; + } + + .footer-title { + text-align: center; + } + + .footer-title::after { + left: 50%; + transform: translateX(-50%); + } + + .footer-brand { + text-align: center; + } + + .footer-logo { + max-height: 50px; + } + + .footer-bottom-content { + flex-direction: column; + gap: 1rem; + text-align: center; + } + + .footer-social { + justify-content: center; + flex-wrap: wrap; + } + + .logo-grid a { + width: 80%; + max-width: 300px; + } + + .md-header__button:not([hidden]) { + display: none; + } + .md-sidebar { + display: none; + } + } @@ -577,11 +835,6 @@

Partners

-
-
-
-
-

European Projects

@@ -606,53 +859,77 @@

European Projects

{% block footer %} - +{% endblock %} \ No newline at end of file diff --git a/lib/examon-common/.gitignore b/lib/examon-common/.gitignore deleted file mode 100644 index bf054b85..00000000 --- a/lib/examon-common/.gitignore +++ /dev/null @@ -1,2 +0,0 @@ -*.pyc -./examon/version.py diff --git a/lib/examon-common/README.rst b/lib/examon-common/README.rst deleted file mode 100644 index ffdce31a..00000000 --- a/lib/examon-common/README.rst +++ /dev/null @@ -1,3 +0,0 @@ -Examon common utilities package -=============================== -v0.2.3 diff --git a/lib/examon-common/examon/__init__.py b/lib/examon-common/examon/__init__.py deleted file mode 100644 index 1f8fc77d..00000000 --- a/lib/examon-common/examon/__init__.py +++ /dev/null @@ -1,5 +0,0 @@ - -import logging -from logging import NullHandler - -logging.getLogger(__name__).addHandler(NullHandler()) \ No newline at end of file diff --git a/lib/examon-common/examon/db/kairosdb.py b/lib/examon-common/examon/db/kairosdb.py deleted file mode 100644 index 8a375579..00000000 --- a/lib/examon-common/examon/db/kairosdb.py +++ /dev/null @@ -1,77 +0,0 @@ - -import sys -import zlib -import gzip -import json -import requests -import StringIO -import logging - - -class KairosDB: - """ - KairosDB REST client - """ - def __init__(self, server, port, user=None, password=None): - self.server = server - self.port = port - self.user = user - self.password = password - self.s = requests.Session() - if self.password: - self.s.auth = (self.user, self.password) - #self.s.headers.update({'x-test': 'true'}) - self.logger = logging.getLogger(__name__) - self.apis = {} - self.api_server = "http://" + self.server + ":" + self.port - self.apis['post_metrics'] = self.api_server + "/api/v1/datapoints" - self.apis['post_query'] = self.api_server + "/api/v1/datapoints/query" - - def _compress(self, payload): - s = StringIO.StringIO() - with gzip.GzipFile(fileobj=s, mode='w') as g: - g.write(payload) - return s.getvalue() - - def put_metrics(self, metrics, comp=True): - headers = {} - response = None - if comp: - headers = {'content-type': 'application/gzip'} - payload = self._compress(json.dumps(metrics)) - else: - payload = json.dumps(metrics) - try: - self.logger.debug("Inserting %d metrics" % len(metrics)) - response = self.s.post(self.apis['post_metrics'], payload, headers=headers) - response.raise_for_status() - - # # DEBUG: send one metric at time - # for m in metrics: - # pay = [m] - # try: - # response = self.s.post(self.apis['post_metrics'], json.dumps([m]), headers=headers) - # response.raise_for_status() - # except: - # self.logger.error("Exception in post()", exc_info=True) - # self.logger.error("Request payload: %s" % (json.dumps(pay, indent=4))) - # self.logger.error("Reason %s" % (response.text)) - - except: - #e = sys.exc_info()[0] - #logger.error("[%s] Exception in post(): %s", "KairosDB", e) - #self.logger.error("Exception in post()", exc_info=True) - self.logger.exception("Exception in post()") - #if response: - # self.logger.error("Reason %s" % (response.text)) - #self.logger.error("Request payload: %s" % (json.dumps(pay, indent=4))) - #print "[%s] Exception in post(): %s" % ("KairosDB", e,) - #print "[%s] Reason: " % ("KairosDB",) - #print response.text - #sys.exit(1) - - def query_metrics(self, query): - self.logger.debug("query metrics: %s" % repr(query)) - headers = {'Accept-Encoding': 'gzip, deflate'} - response = self.s.post(self.apis['post_query'], data=json.dumps(query), headers=headers) - return response.json() diff --git 
a/lib/examon-common/examon/plugin/__init__.py b/lib/examon-common/examon/plugin/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/lib/examon-common/examon/plugin/examonapp.py b/lib/examon-common/examon/plugin/examonapp.py deleted file mode 100644 index 18d2d974..00000000 --- a/lib/examon-common/examon/plugin/examonapp.py +++ /dev/null @@ -1,73 +0,0 @@ -import os -import sys -import signal -import logging -import collections -from logging.handlers import RotatingFileHandler - -from examon.utils.executor import Executor -from examon.utils.config import Config -from examon.utils.daemon import Daemon - -#import multiprocessing_logging as mp_logging -from concurrent_log_handler import ConcurrentRotatingFileHandler - -class ExamonApp(Executor): - def __init__(self, executor='Daemon', configfilename=None): - if configfilename == None: - self.configfilename = os.path.splitext(os.path.basename(sys.argv[0]))[0] - else: - self.configfilename = configfilename - self.cfg = Config(self.configfilename + '.conf') - self.conf = self.cfg.get_defaults() - self.pidfile = None - self.daemon = None - self.runmode = 'run' - self.logger = logging.getLogger('examon') - super(ExamonApp, self).__init__(executor) - - def parse_opt(self): - self.conf = self.cfg.get_conf() - self.runmode = self.conf['runmode'] - self.pidfile = self.conf['PID_FILENAME'] - self.daemon = Daemon(self.pidfile, signal.SIGINT) - - def examon_tags(self): - return collections.OrderedDict() - - def set_logging(self): - LOGFILE_SIZE_B = int(self.conf['LOGFILE_SIZE_B']) - LOG_LEVEL = getattr(logging, self.conf['LOG_LEVEL'].upper(), None) - #logger = logging.getLogger('examon') - #handler = RotatingFileHandler(self.conf['LOG_FILENAME'], mode='a', maxBytes=LOGFILE_SIZE_B, backupCount=2) - handler = ConcurrentRotatingFileHandler(self.conf['LOG_FILENAME'], mode='a', maxBytes=LOGFILE_SIZE_B, backupCount=2) - #log_formatter = logging.Formatter(fmt='%(asctime)s - %(name)s - %(levelname)s - [%(filename)s] - [%(processName)s] %(message)s', datefmt='%m/%d/%Y %I:%M:%S %p') - log_formatter = logging.Formatter(fmt='%(levelname)s - %(asctime)s - [%(processName)s] - [%(filename)s] - %(name)s - %(message)s', datefmt='%m/%d/%Y %I:%M:%S %p') - handler.setFormatter(log_formatter) - self.logger.addHandler(handler) - self.logger.setLevel(LOG_LEVEL) - # if run print logs also to stdout - if self.runmode == 'run': - handler = logging.StreamHandler(sys.stdout) - handler.setFormatter(log_formatter) - self.logger.addHandler(handler) - #mp_logging.install_mp_handler() - - def run(self): - self.set_logging() - if ('stop' == self.runmode): - print " Terminating daemon..." - self.logger.info("Terminating daemon...") - self.daemon.stop() - sys.exit(0) - elif self.runmode in ['run','start','restart']: - if self.runmode == 'start': - print "Daemonize.." - self.daemon.start() - elif self.runmode == 'restart': - print "Restarting Daemon.." - self.daemon.restart() - else: - pass - print "Starting jobs..." 
- self.exec_par() diff --git a/lib/examon-common/examon/plugin/sensorreader.py b/lib/examon-common/examon/plugin/sensorreader.py deleted file mode 100644 index 2bf06be1..00000000 --- a/lib/examon-common/examon/plugin/sensorreader.py +++ /dev/null @@ -1,117 +0,0 @@ -import os -import sys -import copy -import time -import json -import logging -import collections -import thread - -from threading import Timer -from examon.db.kairosdb import KairosDB -from examon.transport.mqtt import Mqtt - - -def timeout_handler(): - logger = logging.getLogger(__name__) - logger.error('Timeout in main loop, exiting..') - logger.debug('Process PID: %d' % os.getpid()) - #sys.exit(1) - #thread.interrupt_main() - os._exit(1) - -class SensorReader: - """ - Examon Sensor adapter - """ - def __init__(self, conf, sensor): - self.conf = copy.deepcopy(conf) - self.sensor = sensor - self.tags = collections.OrderedDict() - self.read_data = None - self.dest_client = None - self.comp = self.conf['COMPRESS'] - self.logger = logging.getLogger(__name__) - - # if self.conf['OUT_PROTOCOL'] == 'kairosdb': - # self.dest_client = KairosDB(self.conf['K_SERVERS'], self.conf['K_PORT'], self.conf['K_USER'], self.conf['K_PASSWORD']) - # elif self.conf['OUT_PROTOCOL'] == 'mqtt': - # # TODO: add MQTT format in conf - # self.dest_client = Mqtt(self.conf['MQTT_BROKER'], self.conf['MQTT_PORT'], format=self.conf['MQTT_FORMAT'], outtopic=self.conf['MQTT_TOPIC']) - # self.dest_client.run() - - def add_tag_v(self, v): - """Sanitize tag values""" - if (v is not None) and (v is not u'') and (v is not 'None'): - ret = v.replace(' ','_').replace('/','_').replace('+','_').replace('#','_') - else: - ret = '_' - return ret - - def add_payload_v(self, v): - """Sanitize payload values""" - if (v is not None) and (v is not u'') and (v is not 'None'): - if isinstance(v, basestring): - ret = v.replace(';','_') - else: - ret = v - else: - ret = '_' - return ret - - def add_tags(self, tags): - self.tags = copy.deepcopy(tags) - - def get_tags(self): - return copy.deepcopy(self.tags) - - def run(self): - if not self.read_data: - raise Exception("'read_data' must be implemented!") - - if self.conf['OUT_PROTOCOL'] == 'kairosdb': - self.dest_client = KairosDB(self.conf['K_SERVERS'], self.conf['K_PORT'], self.conf['K_USER'], self.conf['K_PASSWORD']) - elif self.conf['OUT_PROTOCOL'] == 'mqtt': - # TODO: add MQTT format in conf - self.dest_client = Mqtt(self.conf['MQTT_BROKER'], self.conf['MQTT_PORT'], username=self.conf['MQTT_USER'], password=self.conf['MQTT_PASSWORD'], format=self.conf['MQTT_FORMAT'], outtopic=self.conf['MQTT_TOPIC'], dryrun=self.conf['DRY_RUN']) - self.dest_client.run() - - TS = float(self.conf['TS']) - - while True: - try: - self.logger.debug("Start timeout timer") - timeout_timer = Timer(10*TS, timeout_handler) #timeout after 3*sampling time - timeout_timer.start() - - t0 = time.time() - #if self.read_data: - worker_id, payload = self.read_data(self) - t1 = time.time() - #print "Retrieved and processed %d nodes in %f seconds" % (len(res),(t1-t0),) - self.logger.info("Worker [%s] - Retrieved and processed %d metrics in %f seconds" % (worker_id, len(payload),(t1-t0),)) - #print json.dumps(res) - #sys.exit(0) - t0 = time.time() - self.dest_client.put_metrics(payload, comp=self.comp) - t1 = time.time() - #print json.dumps(payload[0:3], indent=4) - # print "Worker %s:...............insert: %d sensors, time: %f sec, insert_rate %f sens/sec" % (worker_id, \ - # len(payload),\ - # (t1-t0),\ - # len(payload)/(t1-t0), ) - self.logger.debug("Worker [%s] - 
Insert: %d sensors, time: %f sec, insert_rate: %f sens/sec" % (worker_id, \ - len(payload),\ - (t1-t0),\ - len(payload)/(t1-t0), )) - except Exception: - self.logger.exception('Uncaught exception in main loop!') - self.logger.debug("Cancel timeout timer") - timeout_timer.cancel() - continue - - self.logger.debug("Cancel timeout timer") - timeout_timer.cancel() - - self.logger.debug("Start new loop") - time.sleep(TS - (time.time() % TS)) diff --git a/lib/examon-common/examon/transport/__init__.py b/lib/examon-common/examon/transport/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/lib/examon-common/examon/transport/mqtt.py b/lib/examon-common/examon/transport/mqtt.py deleted file mode 100644 index 3ad0efcc..00000000 --- a/lib/examon-common/examon/transport/mqtt.py +++ /dev/null @@ -1,189 +0,0 @@ -# -*- coding: utf-8 -*- -""" - Mqtt.py - MQTT protocol handler - - Copyright (c) 2014, francesco.beneventi@unibo.it - -""" - -import sys -import zlib -import gzip -import json -import struct -import StringIO -import logging -import paho.mqtt.client as mosquitto - - - -class Mqtt(object): - """ - MQTT client - """ - def __init__(self, brokerip, brokerport, username=None, password=None, format='csv', intopic=None, outtopic=None, qos=0, retain=False, dryrun=False): - self.brokerip = brokerip - self.brokerport = brokerport - self.intopic = intopic - self.outtopic = outtopic - self.qos = qos - self.retain = retain - self.dryrun = dryrun - self.client = mosquitto.Mosquitto() - if username: - self.client.username_pw_set(username, password) - self.client.on_connect = self.on_connect - self.client.on_message = self.on_message - self.client.on_log = self.on_log - self.status = 1 # ok - self.logger = logging.getLogger(__name__) - - # set msg format: default = 'csv' - if format == 'csv': - self.put_metrics = self._put_metrics_csv - elif format == 'json': - self.put_metrics = self._put_metrics_json - elif format == 'bulk': - self.put_metrics = self._put_metrics_json_bulk - - - def process(self, client, msg): - """ - Stream processing method. Override - """ - pass - - def on_log(self, client, userdata, level, buff): - self.logger.debug('MQTT logs: %s' % (buff)) - - def on_connect(self, client, userdata, flags, rc): # paho - #def on_connect(self, client, userdata, rc): - """ - On connect callback - """ - if int(rc) != 0: - self.logger.error('Error in connect. Result code: %s' % (str(rc))) - self.logger.info('Closing the MQTT connection') - self.client.disconnect() - self.status = 0 # error - else: - self.logger.info("Connected with result code %s" % (str(rc))) - if self.intopic: - self.logger.info("Subscribing to: %s" % (self.intopic)) - self.client.subscribe(self.intopic) - - # The callback for when a PUBLISH message is received from the server. - def on_message(self, client, userdata, msg): - """ - On message callback - """ - self.process(client,msg) - - def _compress(self, payload): - """ - Compress payload. TODO: replace with blosc - """ - s = StringIO.StringIO() - with gzip.GzipFile(fileobj=s, mode='w') as g: - g.write(payload) - return s.getvalue() - - def _put_metrics_csv(self, metrics, comp=False): - """ - One value per message: csv. - Topic is a / sequence obtained from metric['tags'] dict - Payload is a string cat ; - """ - if not self.status: - self.logger.error('Bad client status. 
Exit.') - sys.exit(1) - - for metric in metrics: - # build value - payload = str(metric['value']).encode('utf-8') - # skip if no value - if payload == '': - continue - payload += (";%.3f" % ((metric['timestamp'])/1000)) - payload = str(payload) - if comp: - payload = self._compress(payload) # TODO: find a better way. This manage both strings and floats - # build topic - topic = '/'.join([(val).replace('/','_').encode('utf-8') for pair in metric['tags'].items() for val in pair]) - topic += '/' + (metric['name']).encode('utf-8') - # sanitize - topic = topic.replace(' ','_').replace('+','_').replace('#','_') - topic = (topic).decode('utf-8') - # publish - self.logger.debug('[MqttPub] Topic: %s - Payload: %s' % (topic,str(payload))) - self._publish(topic, payload) - - - def _put_metrics_json(self, metrics, comp=False): - """ - One value per message: json. - Topic is a pre-defined value (outtopic) - Payload is the json obtained from metric - """ - if not self.status: - self.logger.error('Bad client status. Exit.') - sys.exit(1) - - for metric in metrics: - # build topic - topic = self.outtopic - # build value - if comp: - payload = self._compress(json.dumps(metric)) - else: - payload = json.dumps(metric) - # publish - self.logger.debug('[MqttPub] Topic: %s - Payload: %s' % (topic,json.dumps(metric))) - self._publish(topic, payload) - - - def _put_metrics_json_bulk(self, metrics, comp=True): - """ - Multiple values per message. - Topic is a pre-defined value (outtopic) - Payload is the (compressed) list of metrics - """ - if not self.status: - self.logger.error('Bad client status. Exit.') - sys.exit(1) - - # build topic - topic = self.outtopic - # build value - if comp: - payload = self._compress(json.dumps(metrics)) - else: - payload = json.dumps(metrics) - # publish - self.logger.debug('[MqttPub] Topic: %s - Payload: %s' % (topic,json.dumps(metrics))) - self._publish(topic, payload) - - def _publish(self, topic, payload): - if not self.dryrun: - try: - self.client.publish(topic, payload=payload, qos=self.qos, retain=self.retain) - except: - self.logger.exception('Exception in MQTT publish. 
Exit.') - sys.exit(1) - - def run(self): - """ - Connect and start MQTT FSM - """ - rc = -1 - self.logger.info('Connecting to MQTT server: %s:%s' % (self.brokerip,self.brokerport)) - try: - rc = self.client.connect(self.brokerip, port=int(self.brokerport)) - self.logger.debug('Connect rc: %d' % (rc)) - if rc != 0: - raise - except: - self.logger.exception('Exception in MQTT connect, rc: %d' % (rc)) - sys.exit(1) - self.logger.info('MQTT started') - self.client.loop_start() diff --git a/lib/examon-common/examon/utils/__init__.py b/lib/examon-common/examon/utils/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/lib/examon-common/examon/utils/config.py b/lib/examon-common/examon/utils/config.py deleted file mode 100644 index ffa98d11..00000000 --- a/lib/examon-common/examon/utils/config.py +++ /dev/null @@ -1,48 +0,0 @@ - -import argparse -import ConfigParser - - -class Config: - def __init__(self, configfile): - self.configfile = configfile - self.defaults = {} - self.parser = argparse.ArgumentParser() - # default args - self.parser.add_argument('runmode', choices=['run','start','restart','stop'], help='Run mode') - self.parser.add_argument('-b', dest='MQTT_BROKER', help='IP address of the MQTT broker') - self.parser.add_argument('-p', dest='MQTT_PORT', help='Port of the MQTT broker') - self.parser.add_argument('-t', dest='MQTT_TOPIC', help='MQTT topic') - self.parser.add_argument('-s', dest='TS', help='Sampling time (seconds)') - self.parser.add_argument('-x', dest='PID_FILENAME', help='pid filename') - self.parser.add_argument('-l', dest='LOG_FILENAME', help='log filename') - self.parser.add_argument('-d', dest='OUT_PROTOCOL', choices=['mqtt','kairosdb'], default='mqtt', help='select where to send data (default: mqtt)') - self.parser.add_argument('-f', dest='MQTT_FORMAT', choices=['csv','json','bulk'], default='csv', help='MQTT payload format (default: csv)') - self.parser.add_argument('--compress', dest='COMPRESS', action='store_true', default=False, help='enable payload compression (default: False)') - #self.parser.add_argument('--version', action='version', version=version) - self.parser.add_argument('--kairosdb-server', dest='K_SERVERS', help='kairosdb servers') - self.parser.add_argument('--kairosdb-port', dest='K_PORT', help='kairosdb port') - self.parser.add_argument('--kairosdb-user', dest='K_USER', help='kairosdb username') - self.parser.add_argument('--kairosdb-password', dest='K_PASSWORD', help='kairosdb password') - self.parser.add_argument('--logfile-size', dest='LOGFILE_SIZE_B', default=5*1024*1024, help='log file size (max) in bytes') - self.parser.add_argument('--loglevel', dest='LOG_LEVEL', choices=['DEBUG','INFO','WARNING','ERROR','CRITICAL'], default='INFO', help='log level') - self.parser.add_argument('--dry-run', dest='DRY_RUN', action='store_true', default=False, help='Data is not sent to the broker if True (default: False)') - self.parser.add_argument('--mqtt-user', dest='MQTT_USER', help='MQTT username', default=None) - self.parser.add_argument('--mqtt-password', dest='MQTT_PASSWORD', help='MQTT password', default=None) - - def get_defaults(self): - config = ConfigParser.RawConfigParser(allow_no_value=True) - config.optionxform = str #preserve caps - config.read(self.configfile) - for section in config.sections(): - self.defaults.update(dict(config.items(section))) - return self.defaults - - def update_optparser(self, parser): - self.parser = parser - - def get_conf(self): - args = vars(self.parser.parse_args()) - conf = self.get_defaults() 
- conf.update({k: v for k, v in args.items() if v is not None}) - return conf diff --git a/lib/examon-common/examon/utils/daemon.py b/lib/examon-common/examon/utils/daemon.py deleted file mode 100644 index 05a6d9a5..00000000 --- a/lib/examon-common/examon/utils/daemon.py +++ /dev/null @@ -1,150 +0,0 @@ -import os -import sys -import signal -import atexit -import time - -from signal import SIGTERM - -class Daemon: - """ - A generic daemon class. - - Usage: subclass the Daemon class and override the run() method - """ - def __init__(self, pidfile, sig=signal.SIGTERM, stdin='/dev/null', stdout='/dev/null', stderr='/dev/null'): - self.stdin = stdin - self.stdout = stdout - self.stderr = stderr - self.pidfile = pidfile - self.sig = sig - - def daemonize(self): - """ - do the UNIX double-fork magic, see Stevens' "Advanced - Programming in the UNIX Environment" for details (ISBN 0201563177) - http://www.erlenstar.demon.co.uk/unix/faq_2.html#SEC16 - """ - try: - pid = os.fork() - if pid > 0: - # exit first parent - sys.exit(0) - except OSError, e: - sys.stderr.write("fork #1 failed: %d (%s)\n" % (e.errno, e.strerror)) - sys.exit(1) - - # decouple from parent environment - #os.chdir("/") - os.setsid() - os.umask(0) - - # do second fork - try: - pid = os.fork() - if pid > 0: - # exit from second parent - sys.exit(0) - except OSError, e: - sys.stderr.write("fork #2 failed: %d (%s)\n" % (e.errno, e.strerror)) - sys.exit(1) - - # redirect standard file descriptors - sys.stdout.flush() - sys.stderr.flush() - si = file(self.stdin, 'r') - so = file(self.stdout, 'a+') - se = file(self.stderr, 'a+', 0) - os.dup2(si.fileno(), sys.stdin.fileno()) - os.dup2(so.fileno(), sys.stdout.fileno()) - os.dup2(se.fileno(), sys.stderr.fileno()) - - # write pidfile - atexit.register(self.delpid) - pid = str(os.getpid()) - #file(self.pidfile,'w+').write("%s\n" % pid) - with open(self.pidfile,'w+') as fp: - fp.write("%s\n" % pid) - - def delpid(self): - os.remove(self.pidfile) - - def check_pid(self, pid): - """ Check For the existence of a unix pid. """ - try: - os.kill(pid, 0) - except OSError: - return False - else: - return True - - def start(self): - """ - Start the daemon - """ - # Check for a pidfile to see if the daemon already runs - try: - pf = file(self.pidfile,'r') - pid = int(pf.read().strip()) - pf.close() - except IOError: - pid = None - - if pid: - if self.check_pid(pid): - #message = "pidfile %s already exist. Daemon already running?\n" - message = "pidfile %s already exist and Daemon is running. Do nothing\n" - sys.stderr.write(message % self.pidfile) - sys.exit(1) - else: - message = "pidfile %s already exist and Daemon is NOT running. Restarting...\n" - sys.stderr.write(message % self.pidfile) - self.stop() - - # Start the daemon - self.daemonize() - self.run() - - def stop(self): - """ - Stop the daemon - """ - # Get the pid from the pidfile - try: - pf = file(self.pidfile,'r') - pid = int(pf.read().strip()) - pf.close() - except IOError: - pid = None - - if not pid: - message = "pidfile %s does not exist. 
Daemon not running?\n" - sys.stderr.write(message % self.pidfile) - return # not an error in a restart - - # Try killing the daemon process - try: - while True: - os.kill(pid, self.sig) - time.sleep(0.1) - except OSError, err: - err = str(err) - if err.find("No such process") > 0: - if os.path.exists(self.pidfile): - os.remove(self.pidfile) - else: - print str(err) - sys.exit(1) - - def restart(self): - """ - Restart the daemon - """ - self.stop() - self.start() - - def run(self): - """ - You should override this method when you subclass Daemon. It will be called after the process has been - daemonized by start() or restart(). - """ \ No newline at end of file diff --git a/lib/examon-common/examon/utils/executor.py b/lib/examon-common/examon/utils/executor.py deleted file mode 100644 index 4273b7f4..00000000 --- a/lib/examon-common/examon/utils/executor.py +++ /dev/null @@ -1,73 +0,0 @@ - -import sys -import time -import copy -import logging -from concurrent.futures import ThreadPoolExecutor, as_completed, ProcessPoolExecutor -from multiprocessing import Process - -class Executor(object): - """ - Execute concurrent workers - """ - def __init__(self, executor='ProcessPool', keepalivesec=60): - self.executor = executor - self.workers = [] - self.keepalivesec = keepalivesec - self.logger = logging.getLogger('examon') - - - def add_worker(self, *args): - self.workers.append(copy.deepcopy(args)) - - - def exec_par(self): - if self.executor == 'ProcessPool': - with ProcessPoolExecutor() as pexecutor: - pfutures = [pexecutor.submit(*worker) for worker in self.workers] - results = [r.result() for r in as_completed(pfutures)] - return results - if self.executor == 'Daemon': - daemons = [] - for worker in self.workers: - if len(worker) > 1: - d = Process(target=worker[0], args=worker[1:]) - else: - d = Process(target=worker[0]) - daemons.append({'d': d, 'worker': worker}) - d.daemon = True - d.start() - try: - ''' - Auto-restart on failure. - Check every keepalivesec seconds if the worker is alive, otherwise - we recreate it. - ''' - n_worker = len(self.workers) - if self.keepalivesec > 0: - while 1: - alive_workers = 0 - time.sleep(self.keepalivesec) - for d in daemons: - if d['d'].is_alive() == False: - self.logger.warning("Process [%s], died. Try to restart..." % (d['d'].name)) - if len(d['worker']) > 1: - d_ = Process(target=d['worker'][0], args=d['worker'][1:]) - else: - d_ = Process(target=d['worker'][0]) - d['d'] = d_ - d_.daemon = True - d_.start() - time.sleep(1) - if d_.is_alive() == True: - alive_workers +=1 - else: - alive_workers +=1 - self.logger.info("%d/%d workers alive" % (alive_workers, n_worker)) - - for d in daemons: - d['d'].join() - print "Workers job finished!" - sys.exit(0) - except KeyboardInterrupt: - print "Interrupted.." 
\ No newline at end of file diff --git a/lib/examon-common/setup.py b/lib/examon-common/setup.py deleted file mode 100644 index ba38f3b6..00000000 --- a/lib/examon-common/setup.py +++ /dev/null @@ -1,20 +0,0 @@ -# -*- coding: utf-8 -*- -from setuptools import setup - -setup(name='examon-common', - version='v0.2.3', - description='Examon common utilities', - url='http://github.com/fbeneventi/examon-common', - author='Francesco Beneventi', - author_email='francesco.beneventi@unibo.it', - license='MIT', - packages=['examon', 'examon.plugin', 'examon.utils', 'examon.db', 'examon.transport'], - install_requires=[ - 'requests == 2.21.0', - 'paho-mqtt == 1.4.0', - 'futures == 3.2.0', - 'setuptools == 40.6.3', - 'concurrent-log-handler == 0.9.16', - 'portalocker == 1.7.1' - ], - zip_safe=False) diff --git a/mkdocs.yml b/mkdocs.yml index f0638760..dde70b3d 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -71,6 +71,7 @@ nav: - Credits: "credits.md" - Contact Us: "contactus.md" - Publications: "publications.md" + - Contributing: "contributing.md" - Blog: - blog/index.md diff --git a/lib/examon-common/examon/db/__init__.py b/web/.gitkeep similarity index 100% rename from lib/examon-common/examon/db/__init__.py rename to web/.gitkeep diff --git a/web/examon-server/.gitignore b/web/examon-server/.gitignore index 83658ec5..88471e20 100644 --- a/web/examon-server/.gitignore +++ b/web/examon-server/.gitignore @@ -1,2 +1,6 @@ *.pyc *.log +flask +server.conf +*.bak +__pycache__ diff --git a/web/examon-server/doc.md b/web/examon-server/doc.md new file mode 100644 index 00000000..4ad147da --- /dev/null +++ b/web/examon-server/doc.md @@ -0,0 +1,154 @@ +# Examon API Documentation + +## Base URL + +http://172.16.47.112:5000/api/v1 + +## Authentication + +The authentication method is basic auth, using the provided username and password. + +## Endpoint: POST /examon/jobs/query + +This endpoint is used to query job information in the ExaMon system. The job information is stored in a table having the schema of the SLURM [job response API](https://slurm.schedmd.com/archive/slurm-22.05.10/rest_api.html#v0.0.38_job_response_properties) for the corresponding SLURM version. +In addition to the standard SLURM job properties, ExaMon adds the `energy` column to the table. It is a stringified JSON object with the following properties: + +- `job_id`: The ID of the job. +- `data_quality_(%)`: The data quality as a percentage. +- `version`: The version of the energy schema. +- `total_energy_consumption`: The total energy consumption for the job. +- `message`: A message about the data quality and score. +- `unit`: The unit of the energy consumption, such as "Wh". + +This endpoint supports pagination. To handle large tables, you can define a time window with the `tstart` and `tstop` properties to limit the amount of data returned by each request. + +### Request + +The request should be a JSON object with the following properties: + +- `tags`: An object where each key is a tag name and the value is an array of tag values. For example, to filter by job_id, you would include `"tags": {"job_id": ["5296"]}`. +- `time_zone`: A string representing the time zone, such as "Europe/Rome". +- `aggrby`: This field is currently not used and should be set to null. +- `metrics`: An array of strings representing the job table name, such as `["job_info_E4red"]`. +- `limit`: Limits the number of rows in the table to a given value.
+- `tstart`: A number representing the initial bound of the time window used to filter the data, in milliseconds since the Unix epoch (mandatory). +- `tstop`: A number representing the final bound of the time window used to filter the data, in milliseconds since the Unix epoch (optional). +- `groupby`: An array of objects, each with a `name` and `tags` property. The `name` should be "tag" and the `tags` should be an array of strings representing the selected columns in the table. Use `"*"` as a wildcard to select all columns of the table. + +### Response + +The response is a serialized Pandas DataFrame in the `records` format: a JSON array of objects, one per table row, where each object has a `{Column_Name: Value}` key for each column specified in the `groupby` field of the request. + +### Example + +In this example, we query for the `energy` column of the job table `job_info_E4red` corresponding to the job_id `5326`. + + +Request: + +```json +{ + "tags": { + "job_id": ["5326"] + }, + "time_zone": "Europe/Rome", + "aggrby": null, + "metrics": ["job_info_E4red"], + "limit": null, + "tstart": 1000, + "tstop": null, + "groupby": [ + { + "name": "tag", + "tags": ["energy"] + } + ] +} +``` + + +Response: + +```json +[ + { + "energy": "{\"job_id\": 5326, \"data_quality_(%)\": 100, \"version\": \"v0.1\", \"total_energy_consumption\": 6.388100000752343, \"message\": \"Missing nodes (%): 0.000000; Quality score (%): 100.000000; \", \"unit\": \"Wh\"}" + } +] +``` + +Notes: + +- In this example we filter by job_id; to extend the database search to the full table, it is suggested to use a low value (>0) for the mandatory tstart property. + + +### Full Example + +This is a full example using Python. + +```python +import json +import requests +from requests.auth import HTTPBasicAuth + +# Endpoint +api_url = 'http://172.16.47.112:5000/api/v1/examon/jobs/query' + +# Replace these values with your actual username and password +username = '' +password = '' + +# JSON data to be sent in the request body +data = { + "tags": { + "job_id": [ + "5326" + ] + }, + "time_zone": "Europe/Rome", + "aggrby": None, + "metrics": [ + "job_info_E4red" + ], + "limit": None, + "tstart": 1000, + "tstop": None, + "groupby": [ + { + "name": "tag", + "tags": [ + "energy" + ] + } + ] +} + +headers = { + 'Content-Type': 'application/json', + 'Accept-Encoding': 'gzip, deflate' +} + +# The payload must be double-encoded as follows (see the note at the end of this diff): +json_data = json.dumps(json.dumps(data)).encode("utf-8") + +# Set up basic authentication +auth = HTTPBasicAuth(username, password) + +# Send POST request with JSON content and basic authentication +response = requests.post(api_url, data=json_data, auth=auth, headers=headers) + +# Check the response status code +if response.status_code == 200: + print("Request successful. Response:") + print(response.json()) +else: + print(f"Request failed with status code {response.status_code}. Response content:") + print(response.text) +``` + +Response: + +``` +Request successful.
Response: +[{"energy":"{\"job_id\": 5326, \"data_quality_(%)\": 100, \"version\": \"v0.1\", \"total_energy_consumption\": 6.388100000752343, \"message\": \"Missing nodes (%): 0.000000; Quality score (%): 100.000000; \", \"unit\": \"Wh\"}"}] +``` \ No newline at end of file diff --git a/web/examon-server/example_server.conf b/web/examon-server/example_server.conf index 784742e3..b4744189 100644 --- a/web/examon-server/example_server.conf +++ b/web/examon-server/example_server.conf @@ -6,3 +6,7 @@ CASSANDRA_USER = '' CASSANDRA_PASSW = '' EXAMON_SERVER_HOST = 0.0.0.0 EXAMON_SERVER_PORT = 5000 +THREADS_NUM = 8 +SCHEDULER_TYPE = SLURM +CACHE_TYPE = simple +CACHE_TIMEOUT = 18000 \ No newline at end of file diff --git a/web/examon-server/requirements.txt b/web/examon-server/requirements.txt index d9fe025c..8639ad1e 100644 --- a/web/examon-server/requirements.txt +++ b/web/examon-server/requirements.txt @@ -1,20 +1,8 @@ -cassandra-driver==3.19.0 -certifi==2019.9.11 -chardet==3.0.4 -Click==7.0 -Flask==1.1.1 -Flask-HTTPAuth==3.3.0 -futures==3.3.0 -idna==2.8 -itsdangerous==1.1.0 -Jinja2==2.10.1 -MarkupSafe==1.1.1 -numpy==1.16.5 -pandas>=0.24.2 -python-dateutil==2.8.0 -pytz==2019.2 -requests==2.22.0 -six==1.12.0 -urllib3==1.25.6 -waitress==1.3.1 -Werkzeug==0.16.0 +cassandra-driver==3.29.2 +Flask==3.1.0 +Flask-HTTPAuth==4.8.0 +Flask-Caching +pandas==2.2.3 +pytz==2025.1 +requests==2.32.3 +waitress==3.0.2 diff --git a/web/examon-server/server.py b/web/examon-server/server.py index 5dc43252..b6d418cd 100644 --- a/web/examon-server/server.py +++ b/web/examon-server/server.py @@ -1,11 +1,13 @@ #!flask/bin/python from flask import Flask, jsonify, request, abort, Response -from cassandra.cluster import Cluster +from cassandra.cluster import Cluster, OperationTimedOut, NoHostAvailable from cassandra.auth import PlainTextAuthProvider +from cassandra.policies import ExponentialReconnectionPolicy from cassandra.query import dict_factory from cassandra.util import OrderedMapSerializedKey +import os import sys import json import pandas as pd @@ -20,65 +22,95 @@ import logging from logging.handlers import RotatingFileHandler -import ConfigParser +import configparser +from flask_caching import Cache +import time -LOGFILE_SIZE_B = 5*1024*1024 + +LOGFILE_SIZE_B = 5 * 1024 * 1024 LOG_LEVEL = logging.INFO LOGFILE = 'server.log' app = Flask(__name__, - static_url_path='', - static_folder='static/docs/html') + static_url_path='', + static_folder='static/docs/html') # enable gzipped responses gzip = Gzip(app) # enable basic auth auth = HTTPBasicAuth() -""" #conf = json.load(open('conf.json')) -c_auth = PlainTextAuthProvider(username=CASSANDRA_USER, password=CASSANDRA_PASSW) -cluster = Cluster(contact_points=(CASSANDRA_IP,), auth_provider = c_auth) -session = cluster.connect(CASSANDRA_KEY_SPACE) -queries = {} """ +# Configure Flask-Caching +cache = Cache(app, config={ + 'CACHE_TYPE': 'simple', # Use in-memory cache + 'CACHE_DEFAULT_TIMEOUT': 18000 # 300 minutes +}) @auth.verify_password +@cache.memoize() def verify_password(username, password): ret = req.get(AUTH_URL, auth=RHTTPBasicAuth(username, password)) - logger.info('USER: %s ret: %s' % (username,str(ret.status_code),)) + logger.info('USER: %s ret: %s', username, str(ret.status_code)) if ret.status_code == 200: return True - else: - return False + return False + def get_prep_query(session, stmt): global queries query = queries.get(stmt) if query is None: - query = session.prepare(stmt) - queries[stmt]=query + query = session.prepare(stmt) + queries[stmt] = query return 
query def pandas_factory(colnames, rows): - # Convert tuple items of 'rows' into list (elements of tuples cannot be replaced) rows = [list(i) for i in rows] # Convert only 'OrderedMapSerializedKey' type list elements into dict for idx_row, i_row in enumerate(rows): for idx_value, i_value in enumerate(i_row): - if type(i_value) is OrderedMapSerializedKey: + if isinstance(i_value, OrderedMapSerializedKey): rows[idx_row][idx_value] = dict(rows[idx_row][idx_value]) - return [pd.DataFrame(rows, columns=colnames)] + df = pd.DataFrame(rows, columns=colnames) + df = localize_timestamps(df, 'UTC') + return [df] - -def get_jobs(stmt): +def get_jobs(stmt, fetch_size=20000, max_retries=3): # Set a default fetch size + """Execute query with automatic retry on connection failures.""" + df = pd.DataFrame() - for page in session.execute(stmt, timeout=120.0): - df = df.append(page, ignore_index=True) - return df + last_exception = None + + for attempt in range(max_retries): + try: + # Set the fetch size for the query + statement = session.execute(stmt, timeout=120.0) + statement.fetch_size = fetch_size + for page in statement: + df = pd.concat([df, pd.DataFrame(page)], ignore_index=True) + logger.info('QUERYBUILDER: Number of records: %s', str(len(df))) + return df + + except (OperationTimedOut, NoHostAvailable) as e: + last_exception = e + logger.warning(f'Connection issue during query execution (attempt {attempt + 1}/{max_retries}): {e}') + + if attempt < max_retries - 1: + # Short delay before retry to allow driver reconnection + time.sleep(1) + continue + else: + logger.error(f'Query failed after {max_retries} attempts due to connection issues') + raise + except Exception as e: + # For non-connection related errors, don't retry + logger.error(f'Query failed with non-connection error: {e}') + raise def qb_get_tables(query): @@ -87,10 +119,11 @@ def qb_get_tables(query): tables = ','.join(query['metrics']) return tables + def qb_get_columns(query): columns = '*' if query['groupby']: - if type(query['groupby']) == list: + if isinstance(query['groupby'], list): if len(query['groupby'][0]['tags']) > 0: columns = ','.join(query['groupby'][0]['tags']) else: @@ -102,43 +135,63 @@ def qb_get_columns(query): columns = '*' return columns + +def qb_get_aggrby(query): + aggrby = '' + if query.get('aggrby') and len(query['aggrby']) > 0: + aggrby = str(query['aggrby'][0]['name']) + return aggrby + + def qb_get_where(query): _where = '' if query['tags'] and (len(query['tags']) > 0): - for k,v in query['tags'].iteritems(): + for k, v in query['tags'].items(): for i in v: _where += ' AND ' - if k.lower() in ['user_id','job_id']: - _where += "{} = {}".format(str(k),str(i)) - elif k == 'node': - _where += "cpus_alloc_layout CONTAINS KEY '{}'".format(str(i)) - else: - _where += "{} = '{}'".format(str(k),str(i)) + # Different handling based on scheduler + if SCHEDULER_TYPE == 'SLURM': + if k in ['user_id', 'job_id']: + _where += "{} = {}".format(str(k), str(i)) + elif k == 'node': + _where += "cpus_alloc_layout CONTAINS KEY '{}'".format(str(i)) + else: + _where += "{} = '{}'".format(str(k), str(i)) + else: # PBS + if k.lower() in ['user_id', 'exit_status']: + _where += "{} = {}".format(str(k), str(i)) + elif k == 'node': + _where += "cpus_alloc_layout CONTAINS KEY '{}'".format(str(i)) + else: + _where += "{} = '{}'".format(str(k), str(i)) return _where + def qb_get_tstart(query): tstart = '' if query['tstart']: tstart = query['tstart'] return tstart + def qb_get_tstop(query): tstop = '' if query['tstop']: tstop = 
query['tstop'] return tstop + def qb_get_limit(query): limit = '' if query['limit']: limit = str(query['limit']) return limit + def query_builder(query): - """build a cassandra query. + """Build a cassandra query. Receive a serialized Query object and return a CQL query statement - """ cass_query = 'SELECT ' if qb_get_columns(query): @@ -149,11 +202,35 @@ def query_builder(query): if qb_get_tstart(query): tstart = qb_get_tstart(query) cass_query += ' WHERE ' - cass_query += '(start_time, end_time) >= ' + "({},{})".format(tstart, tstart) + + aggrby = qb_get_aggrby(query) + if SCHEDULER_TYPE == 'SLURM': + if aggrby == 'started': + cass_query += 'start_time >= ' + "{}".format(tstart) # return started job between dates + else: + cass_query += '(start_time, end_time) >= ' + "({},{})".format(tstart, tstart) + else: # PBS + if aggrby == 'started': + cass_query += 'stime >= ' + "{}".format(tstart) # return started job between dates + else: + cass_query += '(stime, mtime) >= ' + "({},{})".format(tstart, tstart) # executed job between dates + if qb_get_tstop(query): tstop = qb_get_tstop(query) cass_query += ' AND ' - cass_query += '(start_time, end_time) <= ' + "({},{})".format(tstop, tstop) + + aggrby = qb_get_aggrby(query) + if SCHEDULER_TYPE == 'SLURM': + if aggrby == 'started': + cass_query += 'start_time <= ' + "{}".format(tstop) # return started job between dates + else: + cass_query += '(start_time, end_time) <= ' + "({},{})".format(tstop, tstop) + else: # PBS + if aggrby == 'started': + cass_query += 'stime <= ' + "{}".format(tstop) # return started job between dates + else: + cass_query += '(stime, mtime) <= ' + "({},{})".format(tstop, tstop) # executed job between dates + if qb_get_where(query): cass_query += qb_get_where(query) if qb_get_limit(query): @@ -162,19 +239,30 @@ def query_builder(query): cass_query += ' ALLOW FILTERING' return cass_query +def localize_timestamps(df, tz): + for col in df.columns: + if pd.api.types.is_datetime64_any_dtype(df[col]): + # If the column is naive, localize it to UTC + if df[col].dt.tz is None: + df[col] = df[col].dt.tz_localize(tz) + # If the column has a timezone, convert it to UTC + else: + df[col] = df[col].dt.tz_convert(tz) + return df + @app.route('/') @auth.login_required def index(): return "Examon Server" - + @app.route('/docs') @app.route('/') @auth.login_required def serve_sphinx_docs(path='index.html'): return app.send_static_file(path) - + @app.route('/api/v1/examon/jobs/query', methods=['POST']) @auth.login_required def get_jobs_test(): @@ -182,20 +270,28 @@ def get_jobs_test(): logger.error('QUERY: No payload. 
@@ -149,11 +202,35 @@ def query_builder(query):
     if qb_get_tstart(query):
         tstart = qb_get_tstart(query)
         cass_query += ' WHERE '
-        cass_query += '(start_time, end_time) >= ' + "({},{})".format(tstart, tstart)
+
+        aggrby = qb_get_aggrby(query)
+        if SCHEDULER_TYPE == 'SLURM':
+            if aggrby == 'started':
+                cass_query += 'start_time >= ' + "{}".format(tstart)  # return started job between dates
+            else:
+                cass_query += '(start_time, end_time) >= ' + "({},{})".format(tstart, tstart)
+        else:  # PBS
+            if aggrby == 'started':
+                cass_query += 'stime >= ' + "{}".format(tstart)  # return started job between dates
+            else:
+                cass_query += '(stime, mtime) >= ' + "({},{})".format(tstart, tstart)  # executed job between dates
+
     if qb_get_tstop(query):
         tstop = qb_get_tstop(query)
         cass_query += ' AND '
-        cass_query += '(start_time, end_time) <= ' + "({},{})".format(tstop, tstop)
+
+        aggrby = qb_get_aggrby(query)
+        if SCHEDULER_TYPE == 'SLURM':
+            if aggrby == 'started':
+                cass_query += 'start_time <= ' + "{}".format(tstop)  # return started job between dates
+            else:
+                cass_query += '(start_time, end_time) <= ' + "({},{})".format(tstop, tstop)
+        else:  # PBS
+            if aggrby == 'started':
+                cass_query += 'stime <= ' + "{}".format(tstop)  # return started job between dates
+            else:
+                cass_query += '(stime, mtime) <= ' + "({},{})".format(tstop, tstop)  # executed job between dates
+
     if qb_get_where(query):
         cass_query += qb_get_where(query)
     if qb_get_limit(query):
@@ -162,19 +239,30 @@ def query_builder(query):
     cass_query += ' ALLOW FILTERING'
     return cass_query
 
+def localize_timestamps(df, tz):
+    for col in df.columns:
+        if pd.api.types.is_datetime64_any_dtype(df[col]):
+            # If the column is naive, localize it to UTC
+            if df[col].dt.tz is None:
+                df[col] = df[col].dt.tz_localize(tz)
+            # If the column has a timezone, convert it to UTC
+            else:
+                df[col] = df[col].dt.tz_convert(tz)
+    return df
+
 @app.route('/')
 @auth.login_required
 def index():
     return "Examon Server"
-    
+
 @app.route('/docs')
 @app.route('/<path:path>')
 @auth.login_required
 def serve_sphinx_docs(path='index.html'):
     return app.send_static_file(path)
-    
+
 @app.route('/api/v1/examon/jobs/query', methods=['POST'])
 @auth.login_required
 def get_jobs_test():
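
The `localize_timestamps()` helper added in this hunk applies the standard pandas idiom for normalizing datetime columns: `tz_localize` attaches a timezone to naive values, while `tz_convert` converts values that already carry one. A self-contained check of that idiom:

```python
import pandas as pd

naive = pd.Series(pd.to_datetime(["2024-01-01 12:00:00"]))  # tz-naive
aware = naive.dt.tz_localize("UTC")  # naive values: attach the timezone
utc = aware.dt.tz_convert("UTC")     # aware values: convert instead
print(utc.dt.tz)                     # UTC
```
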
+ + """ + delay = initial_delay + last_exception = None + + for attempt in range(max_retries): + try: + logger.info(f"Attempting to connect to Cassandra (attempt {attempt + 1}/{max_retries})...") + + c_auth = PlainTextAuthProvider(username=cassandra_user, password=cassandra_passw) + cluster = Cluster(contact_points=(cassandra_ip,), auth_provider=c_auth, reconnection_policy=ExponentialReconnectionPolicy(base_delay=1, max_delay=60)) + session = cluster.connect(cassandra_keyspace) + + logger.info("Successfully connected to Cassandra") + return cluster, session + + except Exception as e: + last_exception = e + logger.warning(f"Failed to connect to Cassandra (attempt {attempt + 1}/{max_retries}): {e}") + + if attempt < max_retries - 1: # Don't sleep on the last attempt + logger.info(f"Retrying in {delay} seconds...") + time.sleep(delay) + + # Exponential backoff with max limit + delay = min(delay * 2, max_delay) + + # If we get here, all retries failed + logger.error(f"Failed to connect to Cassandra after {max_retries} attempts") + raise last_exception + + if __name__ == '__main__': + # logging + logger = logging.getLogger("waitress") + handler = RotatingFileHandler(LOGFILE, mode='a', maxBytes=LOGFILE_SIZE_B, backupCount=2) + log_formatter = logging.Formatter(fmt='%(asctime)s - %(name)s - %(levelname)s - %(message)s', + datefmt='%m/%d/%Y %I:%M:%S %p') + handler.setFormatter(log_formatter) + logger.addHandler(handler) + logger.setLevel(LOG_LEVEL) + handler = logging.StreamHandler(sys.stdout) + handler.setFormatter(log_formatter) + logger.addHandler(handler) + + # Check if configuration file exists + if not os.path.isfile('server.conf'): + logger.error("Configuration file 'server.conf' not found. Please install the workload scheduler plugin and configure the server.conf file.") + sys.exit(1) # Load Config. 
@@ -207,33 +303,98 @@ def get_jobs_test_v2():
     if not request.json:
         logger.error('QUERY: No payload. Response: 400')
         abort(400)
     query = request.json
-    logger.info('QUERY: Received query: %s' % (query,))
+    logger.info('QUERY: Received query: %s', query)
     try:
         stmt = query_builder(query)
-        logger.info('QUERYBUILDER: %s' % (stmt,))
+        logger.info('QUERYBUILDER: %s', stmt)
         df_ = get_jobs(stmt)
-        logger.info('QUERYBUILDER: Number of records: %s' % (str(len(df_)),))
-        #logger.info('energy type: %s' % (df_['energy'].dtype,))
         if 'energy' in df_:
             df_['energy'] = df_['energy'].apply(lambda x: json.loads(x) if not pd.isnull(x) else {})
         df_json = df_.to_json(date_format='iso', orient='records')
     except Exception as e:
-        logger.error('QUERY: %s' % (stmt,))
+        logger.error('QUERY: %s', stmt)
         import traceback
-        print traceback.format_exc()
+        print(traceback.format_exc())
+
+        # Check if it's a connection-related error
+        if isinstance(e, (OperationTimedOut, NoHostAvailable)):
+            logger.error('CASSANDRA CONNECTION: %s', str(e))
+            return jsonify({'error': 'Database temporarily unavailable, please try again later'}), 503
+
+        # Handle other errors
         if hasattr(e, 'message'):
-            logger.error('CASSANDRA: %s' % (e.message,))
-            return jsonify(e.message), 400
-        abort(400)
+            logger.error('CASSANDRA: %s', e.message)
+            return jsonify({'error': e.message}), 400
+        else:
+            logger.error('QUERY: Unexpected error: %s', str(e))
+            return jsonify({'error': 'Query execution failed'}), 400
     logger.info('QUERY: response: 200')
+    logger.debug('QUERY: response: %s', json.dumps(df_json, indent=4))
     #return jsonify(json.loads(df_json)), 200
     return Response(df_json, mimetype='application/json')
 
+
+def connect_to_cassandra_with_retry(cassandra_ip, cassandra_user, cassandra_passw, cassandra_keyspace, max_retries=30, initial_delay=1, max_delay=60):
+    """Connect to Cassandra with retry logic and exponential backoff."""
+    delay = initial_delay
+    last_exception = None
+
+    for attempt in range(max_retries):
+        try:
+            logger.info(f"Attempting to connect to Cassandra (attempt {attempt + 1}/{max_retries})...")
+
+            c_auth = PlainTextAuthProvider(username=cassandra_user, password=cassandra_passw)
+            cluster = Cluster(contact_points=(cassandra_ip,), auth_provider=c_auth, reconnection_policy=ExponentialReconnectionPolicy(base_delay=1, max_delay=60))
+            session = cluster.connect(cassandra_keyspace)
+
+            logger.info("Successfully connected to Cassandra")
+            return cluster, session
+
+        except Exception as e:
+            last_exception = e
+            logger.warning(f"Failed to connect to Cassandra (attempt {attempt + 1}/{max_retries}): {e}")
+
+            if attempt < max_retries - 1:  # Don't sleep on the last attempt
+                logger.info(f"Retrying in {delay} seconds...")
+                time.sleep(delay)
+
+                # Exponential backoff with max limit
+                delay = min(delay * 2, max_delay)
+
+    # If we get here, all retries failed
+    logger.error(f"Failed to connect to Cassandra after {max_retries} attempts")
+    raise last_exception
+
+
 if __name__ == '__main__':
+    # logging
+    logger = logging.getLogger("waitress")
+    handler = RotatingFileHandler(LOGFILE, mode='a', maxBytes=LOGFILE_SIZE_B, backupCount=2)
+    log_formatter = logging.Formatter(fmt='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
+                                      datefmt='%m/%d/%Y %I:%M:%S %p')
+    handler.setFormatter(log_formatter)
+    logger.addHandler(handler)
+    logger.setLevel(LOG_LEVEL)
+    handler = logging.StreamHandler(sys.stdout)
+    handler.setFormatter(log_formatter)
+    logger.addHandler(handler)
+
+    # Check if configuration file exists
+    if not os.path.isfile('server.conf'):
+        logger.error("Configuration file 'server.conf' not found. Please install the workload scheduler plugin and configure the server.conf file.")
+        sys.exit(1)
 
     # Load Config.
-    config = ConfigParser.RawConfigParser()
+    config = configparser.RawConfigParser()
     config.read('server.conf')
+
+    # Check if keyspace is defined
+    if not config.has_option('Server', 'CASSANDRA_KEY_SPACE') or not config.get('Server', 'CASSANDRA_KEY_SPACE'):
+        logger.error("CASSANDRA_KEY_SPACE is not defined in the configuration file.")
+        sys.exit(1)
+
     AUTH_URL = config.get('Server', 'AUTH_URL')
     CASSANDRA_IP = config.get('Server', 'CASSANDRA_IP')
     CASSANDRA_KEY_SPACE = config.get('Server', 'CASSANDRA_KEY_SPACE')
@@ -241,25 +402,24 @@ def get_jobs_test_v2():
     CASSANDRA_PASSW = config.get('Server', 'CASSANDRA_PASSW')
     EXAMON_SERVER_HOST = config.get('Server', 'EXAMON_SERVER_HOST')
     EXAMON_SERVER_PORT = int(config.get('Server', 'EXAMON_SERVER_PORT'))
+    THREADS_NUM = int(config.get('Server', 'THREADS_NUM'))
+
-    # logging
-    logger = logging.getLogger("waitress")
-    handler = RotatingFileHandler(LOGFILE, mode='a', maxBytes=LOGFILE_SIZE_B, backupCount=2)
-    log_formatter = logging.Formatter(fmt='%(asctime)s - %(name)s - %(levelname)s - %(message)s', datefmt='%m/%d/%Y %I:%M:%S %p')
-    handler.setFormatter(log_formatter)
-    logger.addHandler(handler)
-    logger.setLevel(LOG_LEVEL)
-    handler = logging.StreamHandler(sys.stdout)
-    handler.setFormatter(log_formatter)
-    logger.addHandler(handler)
-    #conf = json.load(open('conf.json'))
-    c_auth = PlainTextAuthProvider(username=CASSANDRA_USER, password=CASSANDRA_PASSW)
-    cluster = Cluster(contact_points=(CASSANDRA_IP,), auth_provider = c_auth)
-    session = cluster.connect(CASSANDRA_KEY_SPACE)
+    # Load scheduler type (SLURM or PBS)
+    SCHEDULER_TYPE = str(config.get('Server', 'SCHEDULER_TYPE', fallback='SLURM'))
+    logger.info("Starting examon server with scheduler type: %s", SCHEDULER_TYPE)
+
+    # Connect to Cassandra with retry logic
+    cluster, session = connect_to_cassandra_with_retry(
+        cassandra_ip=CASSANDRA_IP,
+        cassandra_user=CASSANDRA_USER,
+        cassandra_passw=CASSANDRA_PASSW,
+        cassandra_keyspace=CASSANDRA_KEY_SPACE
+    )
     queries = {}
     # setup cassandra row factory
     session.row_factory = pandas_factory
     # run
-    serve(app, host=EXAMON_SERVER_HOST, port=EXAMON_SERVER_PORT, threads=8)
+    serve(app, host=EXAMON_SERVER_HOST, port=EXAMON_SERVER_PORT, threads=THREADS_NUM)
diff --git a/web/examon-server/static/docs/html/downloads/examon-cli-stable.zip b/web/examon-server/static/docs/html/downloads/examon-cli-stable.zip
index 9dfd98eb..e422aff5 100644
Binary files a/web/examon-server/static/docs/html/downloads/examon-cli-stable.zip and b/web/examon-server/static/docs/html/downloads/examon-cli-stable.zip differ
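
For completeness, the `__main__` block above reads all of its options from a `[Server]` section in `server.conf`. A sketch of a matching file; only the option names come from the code, every value is a placeholder:

```ini
[Server]
AUTH_URL = http://localhost:8080/auth
CASSANDRA_IP = 127.0.0.1
CASSANDRA_KEY_SPACE = examon_keyspace
CASSANDRA_USER = cassandra
CASSANDRA_PASSW = changeme
EXAMON_SERVER_HOST = 0.0.0.0
EXAMON_SERVER_PORT = 5000
THREADS_NUM = 8
SCHEDULER_TYPE = SLURM
```
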