diff --git a/carto/analysis.py b/carto/analysis.py new file mode 100644 index 0000000..5e9e120 --- /dev/null +++ b/carto/analysis.py @@ -0,0 +1,729 @@ +from datetime import datetime +import time +import re +import os +import json +from .color import colored, Color +import base64 + +AFW_API_URL = 'api/{api_version}/analysis' + +HEADERS = { + 'content-type': 'application/json', + 'accept': 'application/json' +} + + +class AnalysisClient(object): + def __init__(self, auth_client, api_version='v4'): + self.auth_client = auth_client + self.api_url = AFW_API_URL.format(api_version=api_version) + + self.api_key = getattr(self.auth_client, 'api_key', None) + self.username = getattr(self.auth_client, 'username', None) + self.base_url = self.auth_client.base_url + + def _url(self, *parts): + return '/'.join((self.api_url,) + parts) + + def list_jobs(self): + res = self.auth_client.send(self._url('jobs'), 'GET', headers=HEADERS) + return self.auth_client.get_response_data(res, True) + + def create_job(self, job_definition): + url = self._url('jobs') + res = self.auth_client.send(url, 'POST', json=job_definition, headers=HEADERS) + return self.auth_client.get_response_data(res, True) + + def read_job(self, job_id): + url = self._url('jobs', job_id) + res = self.auth_client.send(url, 'GET', headers=HEADERS) + return self.auth_client.get_response_data(res, True) + + def schedule_job( + self, job_id, job_params, schedule=None, on_changed_table=None + ): + url = self._url('jobs', job_id, 'schedule') + params = {} + if schedule: + params['schedule'] = schedule + if on_changed_table: + params['on_changed_table'] = on_changed_table + res = self.auth_client.send(url, 'POST', params=params, json=job_params, headers=HEADERS) + return self.auth_client.get_response_data(res, True) + + def list_schedules(self, job_id): + url = self._url('jobs', job_id, 'schedules') + res = self.auth_client.send(url, 'GET', headers=HEADERS) + return self.auth_client.get_response_data(res, True) + + def schedule_status(self, job_id, schedule_id): + url = self._url('jobs', job_id, 'schedule', schedule_id) + res = self.auth_client.send(url, 'GET', headers=HEADERS) + return self.auth_client.get_response_data(res, True) + + def schedule_executions(self, job_id, schedule_id): + url = self._url('jobs', job_id, 'schedule', schedule_id, 'executions') + res = self.auth_client.send(url, 'GET', headers=HEADERS) + return self.auth_client.get_response_data(res, True) + + def execution_log( + self, job_id, schedule_id, execution_id, type='stdout', offset=None + ): + url = self._url( + 'jobs', job_id, + 'schedule', schedule_id, + 'execution', execution_id, + 'log' + ) + res = self.auth_client.send(url, 'GET', params={'type': type, 'offset': offset}, headers=HEADERS) + return self.auth_client.get_response_data(res, True) + + def execution_logs(self, job_id, schedule_id, execution_id): + url = self._url( + 'jobs', job_id, + 'schedule', schedule_id, + 'execution', execution_id, + 'debug' + ) + res = self.auth_client.send(url, 'GET', headers=HEADERS) + logs = self.auth_client.get_response_data(res, False) + if logs is not None: + logs = logs.decode() + return logs + + def remove_job(self, job_id): + url = self._url('jobs', job_id) + res = self.auth_client.send(url, 'DELETE', headers=HEADERS) + return self.auth_client.get_response_data(res, True) + + def stop_schedule(self, job_id, schedule_id): + url = self._url('jobs', job_id, 'schedule', schedule_id) + res = self.auth_client.send(url, 'DELETE', headers=HEADERS) + return 
self.auth_client.get_response_data(res, True) + + def stop_schedules(self, job_id): + url = self._url('jobs', job_id, 'schedules') + res = self.auth_client.send(url, 'DELETE', headers=HEADERS) + return self.auth_client.get_response_data(res, True) + + +TASK_GROUP = 'analysis' # we have always one task group named analysis + + +class DefaultOutput: + + @staticmethod + def puts(text): + print(text) + + @staticmethod + def clear(): + os.system('clear') + + +def _color_logs(logs): + if logs and Color.enabled: + logs = re.sub(r'^EVENTS$', Color.COLORS['blue'], logs, flags=re.MULTILINE) + logs = re.sub(r'^STDOUT$', Color.END + Color.COLORS['yellow'], logs, flags=re.MULTILINE) + logs = re.sub(r'^STDERR$', Color.END + Color.COLORS['red'], logs, flags=re.MULTILINE) + logs = logs + Color.END + return logs + + +def _status_color(status): + if status == 'purged': + color = ['blink', 'gray'] + elif status == 'pending': + color = 'pink' + elif status == 'dead': + color = 'orange' + elif status == 'running': + color = ['bold', 'blue'] + else: + color = [] + return color + + +class SessionIds: + + def __init__(self, file=None): + self._tokens = {} + self._counters = {} + self._ids_from_tokens = {} + self._file = file + self._load() + + def _load(self): + if self._file and os.path.isfile(self._file): + with open(self._file, "r") as input: + data = json.load(input) + self._tokens = data['tokens'] + self._counters = data['counters'] + self._ids_from_tokens = data['ids'] + + def _save(self): + if self._file: + data = { + 'tokens': self._tokens, + 'counters': self._counters, + 'ids': self._ids_from_tokens + } + with open(self._file, "w") as output: + output.write(json.dumps(data)) + + def job_token(self, job): + return self._session_token(job.name(), [job.job_id]) + + def schedule_token(self, job_id, schedule_id): + job_token = self._tokens.get(job_id) + return self._session_token(job_token + '/', [job_id, schedule_id]) + + def execution_token(self, job_id, schedule_id, execution_id): + schedule_token = self._tokens.get(schedule_id) + # assert + # [job_id, schedule_id] == self.__ids_from_tokens(schedule_token) + return self._session_token( + schedule_token + '/', [job_id, schedule_id, execution_id] + ) + + def _session_token(self, prefix, ids): + id = ids[-1] + found_id = self._tokens.get(id) + if found_id: + return found_id + count = self._counters.get(prefix, 0) + 1 + self._counters[prefix] = count + if len(ids) == 1: + # job identifier + if count > 1: + token = '_'.join([prefix, str(count)]) + else: + token = prefix + else: + token = prefix + str(count) + self._tokens[id] = token + self._ids_from_tokens[token] = ids + self._save() + return token + + def ids(self, token): + return self._ids_from_tokens.get(token) + + +class AnalysisContext: + """Analysis objects hold a context used to access the AAPI. + + They contain both the AAPI interface and parameters that define + how to use it when updating the state of AAPI objects. 
+ + Attributes: + aapi (AApi): AApi REST client interface + minimum_update_time (float): Minimum time in seconds to wait before + updating the state of an object + autosleep: When requesting an early update sleep till the minimum + time (rather than skipping the update) + """ + + def __init__( + self, + auth_client, + api_version='v4', + minimum_update_time=1, + autosleep=True, + persist_tokens=None, + output=DefaultOutput + ): + """ + Args: + api_config: dictionary with AAPI configuration + (url_base, user, api_key) + minimum_update_time + autosleep + """ + self.api = AnalysisClient(auth_client, api_version) + self.minimum_update_time = minimum_update_time + self.autosleep = autosleep + self.tokens = SessionIds(persist_tokens) + self.output = output + + def create_job(self, job_definition): + result = self.api.create_job(job_definition) + return self.job(result['job_id']) + + def jobs(self): + return [self.job(j['job_id']) for j in self.api.list_jobs()] + + def job(self, job_id): + return Job(self, job_id) + + def job_by_name(self, name): + return next(job for job in self.jobs() if job.name() == name) + + def job_by_token(self, token): + return self.job(*self.tokens.ids(token)) + + def schedule_by_token(self, schedule_token): + return Schedule( + self, *self.tokens.ids(schedule_token) + ) + + def execution_by_token(self, execution_token): + # workaround: executions only reachable through + # schedules ATM + job_id, schedule_id, execution_id = self.tokens.ids(execution_token) + schedule = Schedule(self, job_id, schedule_id) + return schedule.execution(execution_id) + + +class Updatable: + """Base class for updatable AAPI Objects + """ + + def __init__(self, context): + self._context = context + self.update_time = None + + def _time_since_update(self): + return time.time() - (self.update_time or 0) + + def _updated_now(self): + self.update_time = time.time() + + def _need_to_update(self): + t = self._time_since_update() + wait = self.update_time and t < self._context.minimum_update_time + if wait: + if self._context.autosleep: + time.sleep(self._context.minimum_update_time - t) + else: + return False + return True + + +class Job(Updatable): + def __init__(self, context, job_id): + super().__init__(context) + self.job_id = job_id + self.definition = None + self.schedule_ids = None + self.update() + + def update(self): + if self._need_to_update(): + self.definition = self._context.api.read_job(self.job_id) + self.token = self._context.tokens.job_token(self) + self.schedule_ids = [ + s['schedule_id'] + for s in self._context.api.list_schedules(self.job_id) + ] + + def name(self): + return self.definition['name'] + + def version(self): + return self.definition['version'] + + def user(self): + return self.definition['user'] + + def image(self): + return self.definition['image'] + + def schedule(self, schedule_id): + return Schedule(self._context, self.job_id, schedule_id) + + def schedule_by_token(self, schedule_token): + return Schedule( + self._context, *self._context.ids(schedule_token) + ) + + def schedules(self): + return [self.schedule(id) for id in self.schedule_ids] + + def info(self, details=False): + info = '{token} {version} ({user})'.format( + token=colored('bold', self.token), + version=self.version(), + user=self.user() + ) + if (details): + info += ' ' + self.details() + return info + + def details(self): + return 'image: {image}'.format(image=self.image()) + + def execute(self, params, schedule=None, table=None): + schedule = self._context.api.schedule_job( + self.job_id, + 
job_params=params, + schedule=schedule, + on_changed_table=table + ) + schedule_id = schedule['schedule_id'] + return self.schedule(schedule_id) + + def execute_and_follow(self, params, schedule=None, table=None): + schedule = self.execute(params, schedule, table) + schedule.follow() + return schedule + + def execute_and_get_output(self, params, schedule=None, table=None, verbose=False, decodeJson=False): + schedule = self.execute(params, schedule, table) + if not (schedule.is_periodic() or schedule.is_triggered()): + execution = schedule.follow(not verbose) + return execution and execution.output(decodeJson) + return schedule + + def remove(self): + return self._context.api.remove_job(self.job_id) + + def stop_schedules(self): + return self._context.api.stop_schedules(self.job_id) + + def __str__(self): + return '{} {}'.format(super().__repr__(), self.name()) + + __repr__ = __str__ + + +class Schedule(Updatable): + """ + Attributes: + status + children: + ['Pending'] ['Dead'] ['Running'] + summary: + ['Complete'] ['Starting'] ['Failed'] + ['Lost'] ['Queued'] ['Running'] + executions + """ + + def __init__(self, context, job_id, schedule_id): + super().__init__(context) + self.job_id = job_id + self.schedule_id = schedule_id + self._raw_status = None + self._set_raw_status(self._raw_status) + self._executions = None + self.update() + + def _set_raw_status(self, status): + self._raw_status = status or {} + self.status = self._raw_status.get('status') + if (self.status and self.status != 'purged'): + self.time = _timefromtimestamp(self._raw_status.get('time', 0)) + self.schedule = self._raw_status.get('schedule') + s = self._raw_status.get('summary', {}) + self.children = s.get('Children') + self.summary = s.get('Summary', {}).get(TASK_GROUP) + else: + self.time = None + self.schedule = None + self.children = None + self.summary = None + self.definition = self._raw_status.get('definition', {}) + self.type = self.definition.get('type') + self.table = self.definition.get('table') + self.scheduling_time = self.definition.get('scheduling_time') + if not self.time and self.scheduling_time: + self.time = _timefromiso(self.scheduling_time) + + def _fetch_executions(self): + if self._executions_pending: + self._executions_pending = False + execs = self._context.api.schedule_executions( + self.job_id, self.schedule_id + ) + # TODO: is this necessary? + # (does Nomad API guarantee the result order? ) + execs.sort(key=lambda e: e['time']) + self._executions = [ + Execution( + self._context, self.job_id, self.schedule_id, execution + ) + for execution in execs + ] + + def update(self): + if self._need_to_update(): + self.token = self._context.tokens.schedule_token( + self.job_id, self.schedule_id + ) + new_raw_status = self._context.api.schedule_status( + self.job_id, self.schedule_id + ) + if new_raw_status != self._raw_status: # FIXME: ok?? 
+ self._set_raw_status(new_raw_status) + self._executions_pending = True + self._updated_now() + return True + self._updated_now() + return False + + def is_periodic(self): + return self.schedule is not None + + def is_triggered(self): + return self.type == 'trigger' + + def info(self): + # TODO: identify schedule by user, time of creation + # option to include job name + # note: user will only be available when adding db metadata to api + # time could be obtained from "SubmitTime":1548327600000512493 + return self._info_prefix() + self._info_suffix() + + def _info_prefix(self): + return '{id} {t}'.format( + id=colored('bold', self.token), + t=_timetoiso(self.time) + ) + + def _info_suffix(self): + if self.is_periodic(): + return ' ({s})'.format(s=colored('pink', self.schedule)) + elif self.is_triggered(): + return ' (on changed {t})'.format(t=colored('pink', self.table)) + else: + return '' + + def show(self, details=False): + color = _status_color(self.status) + self._context.output.puts( + self._info_prefix() + + ' : ' + colored(color, self.status) + + ' : ' + self.summary_info() + + self._info_suffix() + ) + if details: + self._context.output.puts(self.details()) + + def details(self): + return '' + + def summary_info(self): + if self.summary is None: + return '' + if self.type == 'now': + return 'C:{c} S:{s} F:{f} L:{l} Q:{q} R:{r}'.format( + c=colored('green', self.summary['Complete']), + s=colored('blue', self.summary['Starting']), + f=colored('red', self.summary['Failed']), + l=colored('orange', self.summary['Lost']), # noqa: E741 + q=colored('pink', self.summary['Queued']), + r=colored('yellow', self.summary['Running']) + ) + else: + # For 'periodic', 'trigger', the summary is empty; + # it would be interesting to show the summary of the + # dispatch children jobs as part of the corresponding + # execution status. 
+            return 'P:{p} R:{r} D:{d}'.format(
+                p=colored(_status_color('pending'), self.children['Pending']),
+                r=colored(_status_color('running'), self.children['Running']),
+                d=colored(_status_color('dead'), self.children['Dead'])
+            )
+
+    def follow(self, silent=False):
+        # TODO: if self.is_periodic() filter execs for most recent cron point
+        last_exec = None
+        while self.status != 'dead':
+            self.update()
+            if not silent:
+                self._context.output.clear()
+                self.show()
+            last_exec_running = False
+            for execution in self.executions():
+                last_exec = execution
+                last_exec_running = execution.running()
+                if not silent:
+                    execution.show(with_logs=last_exec_running)
+        if last_exec and not last_exec_running and not silent:
+            self._context.output.puts(last_exec.logs())
+        return last_exec
+
+    def executions(self):
+        self._fetch_executions()
+        return self._executions
+
+    def execution_by_token(self, execution_token):
+        return self.execution(self._context.tokens.ids(execution_token)[-1])
+
+    def execution(self, execution_id):
+        return next(
+            exe
+            for exe in self.executions()
+            if exe.execution_id == execution_id
+        )
+
+    def stop(self):
+        return self._context.api.stop_schedule(self.job_id, self.schedule_id)
+
+    def last_outputs(
+        self, n=1, only_finished=True, only_successful=False,
+        decodeJson=False, filter=None
+    ):
+        self.update()
+
+        def select(execution):
+            selected = not only_finished or execution.finished()
+            selected = selected and (
+                not only_successful or execution.success()
+            )
+            selected = selected and (not filter or filter(execution))
+            return selected
+
+        execs = self.executions()
+        if execs:
+            execs = [execution for execution in execs if select(execution)]
+            execs.sort(key=lambda e: e.finish_time(), reverse=True)
+            if len(execs) > 0:
+                return [execution.output(decodeJson) for execution in execs]
+        return []
+
+
+def _timefromiso(txt):
+    if not txt:
+        return None
+    # truncate fractional seconds to milliseconds so fromisoformat accepts them
+    txt = re.sub(r'\.(\d\d\d)\d+', r'.\1', txt).replace("Z", "+00:00")
+    return datetime.fromisoformat(txt)
+
+
+def _timetoiso(t):
+    if t is None:
+        return ''
+    return t.isoformat(sep=' ', timespec='seconds')
+
+
+def _timefromtimestamp(value):
+    # convert nanosecond epoch timestamps to datetime
+    return datetime.utcfromtimestamp(value/1000000000.0)
+
+
+EVENT_TIME = re.compile(r'\bTime:(\d+)\b')
+
+
+class Execution:
+
+    def __init__(self, context, job_id, schedule_id, execution):
+        self._context = context
+        self.job_id = job_id
+        self.schedule_id = schedule_id
+        self.execution_id = execution['execution_id']
+        self._raw_data = execution
+        # Time of execution creation
+        self._create_t = _timefromtimestamp(execution['time'])
+        e = execution['execution']
+        self._state = e['state']  # pending -> running -> dead
+        self._failed = None
+        self._start_t = None
+        self._finish_t = None
+        if self._state != 'pending':
+            self._failed = e['failed']
+            self._start_t = e['started_at'] and _timefromiso(e['started_at'])
+            if self._state == 'dead':
+                self._finish_t = (
+                    e['finished_at'] and _timefromiso(e['finished_at'])
+                )
+        self.token = self._context.tokens.execution_token(
+            self.job_id, self.schedule_id, self.execution_id
+        )
+
+    def pending(self):
+        return self._state == 'pending' or self._state is None
+
+    def running(self):
+        return self._state == 'running'
+
+    def finished(self):
+        return self._state == 'dead'
+
+    def success(self):
+        return self.finished() and not self._failed
+
+    def failed(self):
+        return self.finished() and self._failed
+
+    def allocation_time(self):
+        return self._create_t
+
+    def start_time(self):
+        return self._start_t
+
+    def finish_time(self):
+        return self._finish_t
+
+    def info(self, indent=''):
+        msg = (
+            indent +
+            colored('bold', self.token) +
+            ' ' +
+            _timetoiso(self.allocation_time()) +
+            ' : '
+        )
+        if self.pending():
+            msg += colored('blue', 'pending')
+        elif self.running():
+            msg += (
+                colored('yellow', 'running') +
+                ' since {t}'.format(t=_timetoiso(self.start_time()))
+            )
+        elif self.success():
+            msg += (
+                colored('green', 'success') +
+                ' (ran from {t1} to {t2})'.format(
+                    t1=_timetoiso(self.start_time()),
+                    t2=_timetoiso(self.finish_time())
+                )
+            )
+        elif self.failed():
+            msg += colored('red', 'error') + ' (ran from {t1} to {t2})'.format(
+                t1=_timetoiso(self.start_time()),
+                t2=_timetoiso(self.finish_time())
+            )
+        else:
+            msg = str(self._raw_data) or '???'
+        return msg
+
+    def logs(self):
+        logs = self._context.api.execution_logs(
+            self.job_id,
+            self.schedule_id,
+            self.execution_id
+        )
+
+        def repltime(match):
+            return _timetoiso(_timefromtimestamp(int(match.group(1))))
+
+        if logs:
+            logs = re.sub(EVENT_TIME, repltime, logs)
+        return _color_logs(logs)
+
+    def output(self, decodeJson=False):
+        log = self._context.api.execution_log(
+            self.job_id,
+            self.schedule_id,
+            self.execution_id,
+            type='stdout'
+        )
+        result = None
+        if log:
+            result = base64.b64decode(log.get('Data', ''))
+            if decodeJson:
+                result = json.loads(result.decode())
+        return result
+
+    def show(self, with_logs=True):
+        self._context.output.puts(self.info())
+        if with_logs:
+            self._context.output.puts(self.logs())
diff --git a/carto/color.py b/carto/color.py
new file mode 100644
index 0000000..aa04f31
--- /dev/null
+++ b/carto/color.py
@@ -0,0 +1,43 @@
+import re
+
+
+class Color:
+    COLORS = {
+        'pink': '\033[95m',
+        'blue': '\033[94m',
+        'bblue': "\033[1;34m",
+        'red': '\033[31m',
+        'bred': "\033[1;31m",
+        'green': '\033[92m',
+        'bgreen': "\033[0;32m",
+        'yellow': '\033[93m',
+        'orange': '\033[91m',
+        'gray': '\033[37m',
+        'reverse': "\033[;7m",
+        'bold': '\033[1m',
+        'blink': '\033[5m',
+        'underline': '\033[4m'
+    }
+    CLEAR_SCREEN = '\033[2J'
+    END = '\033[0m'
+    enabled = True
+
+
+def enable_colors(mode):
+    Color.enabled = mode
+
+
+def colored(attrib, text):
+    if not Color.enabled:
+        return text
+    if not isinstance(attrib, list):
+        attrib = [attrib]
+    attrib = ''.join([Color.COLORS[a] for a in attrib])
+    return attrib + str(text) + Color.END
+
+
+ANSI_ESCAPE = re.compile(r'\x1B\[[0-?]*[ -/]*[@-~]')
+
+
+def uncolored(text):
+    return ANSI_ESCAPE.sub('', text)
diff --git a/docs/guides/09-analysis-api.md b/docs/guides/09-analysis-api.md
new file mode 100644
index 0000000..98a788f
--- /dev/null
+++ b/docs/guides/09-analysis-api.md
@@ -0,0 +1,171 @@
+## Analysis API
+
+The Analysis API client lets you define, execute and monitor analyses in a simple way, with some basic command-line output.
+
+Short identifiers, valid for the duration of the session, can be used instead of the actual (long) identifiers.
+
+The output of commands that follow the execution of an analysis is updated periodically at the configured frequency.
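+
+The `AnalysisContext` is built on top of a regular carto-python authentication client. A minimal sketch, assuming API-key authentication (the user name and API key below are placeholders):
+
+```python
+from carto.auth import APIKeyAuthClient
+
+# Hypothetical credentials: replace with your own CARTO user URL and API key
+auth_client = APIKeyAuthClient(
+    base_url='https://YOUR-USER-NAME.carto.com/',
+    api_key='YOUR-API-KEY'
+)
+```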
+
+Example 1: list analysis jobs
+
+```python
+from carto.analysis import AnalysisContext
+
+analysis = AnalysisContext(
+    auth_client,
+    minimum_update_time=4,  # Update frequency: 4 seconds
+    autosleep=True  # Early updates sleep until minimum_update_time has elapsed (instead of being skipped)
+)
+
+# List analysis jobs
+for job in analysis.jobs():
+    print(job.info())
+```
+
+Output:
+
+```
+bigquery-example 1.0.0 (rtorre-analysis)
+kmeans 1.0.0 (jgoizueta-analysis)
+kmeans-r 1.0.0 (rtorre-analysis)
+```
+
+Create a new analysis job:
+
+```python
+job = analysis.create_job(
+    {
+        "name": "kmeans-test",
+        "image": "rafatower/kmeans:v1.0.0",
+        "version": "1.0.0",
+        "description": "KMeans clusters",
+        "parameters": ["n_clusters", "dataset", "columns"]
+    }
+)
+print(job.info())
+```
+
+Output:
+
+```
+kmeans 1.0.0 (YOUR-USER-NAME)
+```
+
+Example 2: execute the job and show its execution progress
+
+```python
+params = {
+    "n_clusters": 3,
+    "dataset": "brooklyn_poverty",
+    "columns": ["poverty_per_pop"]
+}
+schedule = job.execute_and_follow(params)
+```
+
+Output (final state):
+```
+kmeans/1 2019-01-30 10:25:32 : dead : C:1 S:0 F:0 L:0 Q:0 R:0
+kmeans/1/1 2019-01-30 10:25:32 : success (ran from 2019-01-30 10:25:33+00:00 to 2019-01-30 10:25:41+00:00)
+EVENTS
+Task received by client Time:1548843932611818957
+Building Task Directory Time:1548843932612050357
+Task started by client Time:1548843933127955118
+Exit Code: 0 Time:1548843941822685581
+
+STDOUT
+Config: {u'conn': u'postgresql://development_cartodb_user_21247c38-f9b5-4cdb-9f11-f21e8edca58e:6ea8860cdae0549b96c88bd00a02bee21c557a88development_cartodb_user_21247c38-f9b5-4cdb-9f11-f21e8edca58e@10.0.32.28:9432/cartodb_dev_user_21247c38-f9b5-4cdb-9f11-f21e8edca58e_db', u'n_clusters': 3, u'columns': [u'poverty_per_pop'], u'dataset': u'brooklyn_poverty'}
+Output written to table brooklyn_poverty_kmeans_out
+```
+
+Note that short identifiers are used for schedules (`kmeans/1`) and executions (`kmeans/1/1`). These tokens are valid for the duration of the session.
+
+Example 3: show a schedule's info and list its executions
+
+```python
+schedule.show()
+for execution in schedule.executions():
+    print(execution.info())
+```
+
+Output:
+
+```
+kmeans/3 2019-01-30 10:25:32 : dead : C:1 S:0 F:0 L:0 Q:0 R:0
+kmeans/3/1 2019-01-30 10:25:32 : success (ran from 2019-01-30 10:25:33+00:00 to 2019-01-30 10:25:41+00:00)
+```
+
+The first line shows the (short) id of the schedule we created, its state, and a summary of:
+* `C` number of complete executions
+* `S` number of starting executions
+* `F` number of failed executions
+* `L` number of lost executions
+* `Q` number of queued executions
+* `R` number of running executions
+
+The second line shows information about the only (complete) execution.
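+
+Once an execution has finished, its standard output can also be retrieved programmatically through the `Execution.output` and `Schedule.last_outputs` methods. A minimal sketch (it assumes the schedule above has finished executions; pass `decodeJson=True` only if the analysis writes JSON to stdout):
+
+```python
+# Raw stdout (bytes) of the last execution of the schedule
+raw = schedule.executions()[-1].output()
+
+# Outputs of all finished executions of the schedule, most recent first
+outputs = schedule.last_outputs()
+```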
+
+Example 4: show the last execution of a schedule
+
+```python
+execution = schedule.executions()[-1]
+execution.show()
+```
+
+Output:
+
+```
+kmeans/3/1 2019-01-30 10:25:32 : success (ran from 2019-01-30 10:25:33+00:00 to 2019-01-30 10:25:41+00:00)
+EVENTS
+Task received by client Time:1548843932611818957
+Building Task Directory Time:1548843932612050357
+Task started by client Time:1548843933127955118
+Exit Code: 0 Time:1548843941822685581
+
+STDOUT
+Config: {u'conn': u'postgresql://development_cartodb_user_21247c38-f9b5-4cdb-9f11-f21e8edca58e:6ea8860cdae0549b96c88bd00a02bee21c557a88development_cartodb_user_21247c38-f9b5-4cdb-9f11-f21e8edca58e@10.0.32.28:9432/cartodb_dev_user_21247c38-f9b5-4cdb-9f11-f21e8edca58e_db', u'n_clusters': 3, u'columns': [u'poverty_per_pop'], u'dataset': u'brooklyn_poverty'}
+Output written to table brooklyn_poverty_kmeans_out
+```
+
+Example 5: select a job by name and list its schedules
+
+```python
+job = analysis.job_by_name('kmeans')
+for schedule in job.schedules():
+    schedule.show()
+```
+
+Output:
+
+```
+kmeans/1 2019-01-25 14:47:45 : running : C:0 S:0 F:0 L:0 Q:0 R:0 (52 * * * * *)
+kmeans/2 2019-01-30 10:02:40 : dead : C:1 S:0 F:0 L:0 Q:0 R:0
+```
+
+Note that short ids are used here; the summary numbers are as explained in Example 3.
+
+## Synchronous output
+
+If an analysis follows the convention of returning its output value through standard output
+(standard error can be used for logging in that case), then you can execute the analysis synchronously
+and obtain its result as follows.
+
+```python
+job = analysis.job_by_token('carto-geocoder')
+result = job.execute_and_get_output({
+    "dataset": "accidentesbicicletas_2018",
+    "street": "regexp_replace(trim(lugar_accidente), ' NUM$', '') || ' ' || n",
+    "city": "'Madrid'",
+    "country": "'Spain'",
+    "log_level": "DEBUG",
+    "dry": True
+}, decodeJson=True)
+print(result)
+```
+
+Output:
+```
+{'total_rows': 667, 'required_quota': 667, 'previously_geocoded': 0, 'previously_failed': 0, 'records_with_geometry': 266}
+```
+
+For non-JSON output simply omit the `decodeJson` argument (`result = job.execute_and_get_output(params)`) and you'll get a raw bytes string.
+To see the execution progress, as with the *follow* methods, add a `verbose=True` argument.
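+
+## Periodic and triggered execution
+
+Besides immediate execution, a job can be scheduled periodically by passing a cron-style `schedule` expression, or triggered whenever a table changes by passing a `table` name, and the resulting schedules can be stopped later on. A sketch, assuming a hypothetical table name and cron expression (check the exact cron syntax supported by your backend):
+
+```python
+# Run the analysis every hour (the cron expression format is an assumption)
+periodic = job.execute(params, schedule='0 0 * * * *')
+
+# Run the analysis whenever the given table changes (hypothetical table name)
+triggered = job.execute(params, table='YOUR-USER-NAME.brooklyn_poverty')
+
+# Schedules can be stopped individually or all at once,
+# and the job definition itself can be removed
+periodic.stop()
+job.stop_schedules()
+job.remove()
+```
+
+Periodic and triggered schedules show a `P`/`R`/`D` summary (pending, running and dead dispatched executions) instead of the counters described in Example 3.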
diff --git a/tests/mocks.py b/tests/mocks.py index e961410..4ba1cbf 100644 --- a/tests/mocks.py +++ b/tests/mocks.py @@ -26,6 +26,7 @@ def __init__(self): self.requests = { "test_sql": { "url": "https://mock.carto.com/api/v2/sql?q=select+%2A+from+tornados&api_key=mockmockmock", + "method": "get", "text": '{ \ "rows": [ \ "a" \ @@ -33,15 +34,70 @@ def __init__(self): "total_rows": 1, \ "time": 1 \ }' + }, + "test_analysis_1": { + "method": "get", + "url": "https://mock.carto.com/api/v4/analysis/jobs?api_key=mockmockmock", + "text": "[{},{}]" + }, + "test_analysis_2": { + "method": "post", + "url": "https://mock.carto.com/api/v4/analysis/jobs", + "text": '{"job_id":"abc"}' + }, + "test_analysis_3": { + "method": "get", + "url": "https://mock.carto.com/api/v4/analysis/jobs/abc", + "text": '{"job_id":"abc"}' + }, + "test_analysis_4": { + "method": "post", + "url": "https://mock.carto.com/api/v4/analysis/jobs/abc/schedule", + "text": '{"schedule_id":"xyz"}' + }, + "test_analysis_5": { + "method": "get", + "url": "https://mock.carto.com/api/v4/analysis/jobs/abc/schedules", + "text": '[{"schedule_id":"xyz"},{"schedule_id":"xyz"}]' + }, + "test_analysis_6": { + "method": "get", + "url": "https://mock.carto.com/api/v4/analysis/jobs/abc/schedule/xyz", + "text": '{"schedule_id":"xyz","status":"dead"}' + }, + "test_analysis_7": { + "method": "get", + "url": "https://mock.carto.com/api/v4/analysis/jobs/abc/schedule/xyz/executions", + "text": '[{},{"execution_id":"xxx"}]' + }, + "test_analysis_8": { + "method": "get", + "url": "https://mock.carto.com/api/v4/analysis/jobs/abc/schedule/xyz/execution/xxx/debug", + "text": '["log"]' + }, + "test_analysis_9": { + "method": "get", + "url": "https://mock.carto.com/api/v4/analysis/jobs/abc/schedule/xyz/execution/xxx/log", + "text": '[]' + }, + "test_analysis_10": { + "method": "delete", + "url": "https://mock.carto.com/api/v4/analysis/jobs/abc/schedules", + "text": '{"schedule_ids":[{},{}]}' + }, + "test_analysis_11": { + "method": "delete", + "url": "https://mock.carto.com/api/v4/analysis/jobs/abc", + "text": '{"stopped":2,"schedule_ids":[{},{}]}' } } with requests_mock.Mocker() as m: for method_name in self.requests: - self.get(method_name, m) + self.mock(method_name, m) self.mocker = m - def get(self, method_name, mocker): + def mock(self, method_name, mocker): """ Returns a mock request for a given `method_name` @@ -55,7 +111,7 @@ def get(self, method_name, mocker): try: if method_name in self.requests: r = self.requests[method_name] - mocker.get(r['url'], text=r['text']) + mocker.register_uri(r['method'], r['url'], text=r['text']) else: raise CartoException('method_name not found: ' + method_name) except Exception as e: diff --git a/tests/test_analysis.py b/tests/test_analysis.py new file mode 100644 index 0000000..45de909 --- /dev/null +++ b/tests/test_analysis.py @@ -0,0 +1,54 @@ +from carto.analysis import AnalysisClient + + +def test_analysis(api_key_auth_client_usr, mock_requests): + analysis = AnalysisClient(api_key_auth_client_usr) + with mock_requests.mocker: + jobs = analysis.list_jobs() + assert len(jobs) > 0 + + job_def = { + "name": "kmeans-test", + "image": "rafatower/kmeans:v1.0.0", + "version": "1.0.0", + "description": "KMeans clusters", + "parameters": ["n_clusters", "dataset", "columns"] + } + result = analysis.create_job(job_def) + job_id = result.get('job_id') + assert job_id is not None + + result = analysis.read_job(job_id) + assert result.get('job_id') == job_id + + job_params = { + "n_clusters": 3, + "dataset": "brooklyn_poverty", + 
"columns": ["poverty_per_pop"] + } + result = analysis.schedule_job(job_id, job_params, on_changed_table='jgoizueta.brooklyn_poverty') + assert result.get('schedule_id') is not None + + result = analysis.schedule_job(job_id, job_params) + schedule_id = result.get('schedule_id') + assert schedule_id is not None + + result = analysis.list_schedules(job_id) + assert len(result) == 2 + + dead = False + while not dead: + result = analysis.schedule_status(job_id, schedule_id) + dead = result['status'] == 'dead' + result = analysis.schedule_executions(job_id, schedule_id) + if dead: + exec_id = result[-1]['execution_id'] + logs = analysis.execution_logs(job_id, schedule_id, exec_id) + assert len(logs) > 0 + result = analysis.execution_log(job_id, schedule_id, exec_id) + + result = analysis.stop_schedules(job_id) + assert len(result.get('schedule_ids')) == 2 + result = analysis.remove_job(job_id) + assert len(result.get('schedule_ids')) == 2 + assert result.get('stopped') == 2