diff --git a/IRM_default_parameters.md b/IRM_default_parameters.md
new file mode 100644
index 0000000..521ad51
--- /dev/null
+++ b/IRM_default_parameters.md
@@ -0,0 +1,18 @@
+## IRM configuration parameters
+
+This table lists the parameters available for tuning the IRM components in HIO, with a description and the default value of each parameter.
+
+| Parameter name | Explanation | Default value |
+|:-:|:-------------|------:|
+| `packing_interval` | Interval in seconds between runs of the bin-packing algorithm | 1 |
+| `default_cpu_share` | Initial guess of the CPU share of container images not yet encountered | 0.125 |
+| `profiling_interval` | Interval in seconds between worker-profiler updates of queued container requests | 4 |
+| `predictor_interval` | Interval in seconds between predicting load and determining a scaling action | 1 |
+| `lower_rate_limit` | Lower positive threshold for the load predictor | 2 |
+| `upper_rate_limit` | Upper positive threshold for the load predictor | 5 |
+| `slowdown_rate` | Negative threshold for the load predictor | -2 |
+| `queue_size_limit` | Message-queue length limit for the load predictor | 10 |
+| `scaleup_waiting_time` | Cool-down time for load-predictor scale-up actions | 10 |
+| `large_scaleup_amount` | Large scale-up quantity for the load predictor | 2 |
+| `small_scaleup_amount` | Small scale-up quantity for the load predictor | 1 |
+| `container_request_TTL` | Initial time-to-live counter for container requests | 1 |
diff --git a/Readme.md b/Readme.md
index 81ce59d..5739acc 100644
--- a/Readme.md
+++ b/Readme.md
@@ -10,6 +10,26 @@ Stream_Connector - client for sending tasks for distributed execution.
 
 Forked from https://github.com/beirbear/HarmonicIO
 
+## Update from Oliver:
+* Autoscaling:
+
+An important added feature is auto-scaling; to avoid breaking production it can be disabled. To enable or disable it, set the field "auto_scaling_enabled" to true/false in the master's configuration.json file.
+
+* Hosting containers (a Python sketch of this call follows below):
+```
+curl -X POST "http://<master_addr>:<master_port>/jobRequest?token=None&type=new_job" --data '{"c_name" : <image_name>, "num" : <number_of_containers>, "volatile" : <true/false>}'
+```
+NOTE: spelling is important, `true` = volatile container, `false` = involatile container.
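The same request can be scripted. Below is a minimal Python sketch of the `new_job` call using only the standard library; the master address, port and image name are placeholder values, and the response is the short text produced by the master's job manager.

```python
import json
from urllib.request import urlopen

# Placeholder values for illustration only.
MASTER_ADDR = "192.168.1.17"
MASTER_PORT = 8080

def request_containers(image_name, num, volatile=True):
    """Ask the master to host `num` containers of `image_name` via /jobRequest."""
    url = "http://{}:{}/jobRequest?token=None&type=new_job".format(MASTER_ADDR, MASTER_PORT)
    payload = {"c_name": image_name, "num": num, "volatile": volatile}
    # Supplying a request body makes urlopen issue a POST.
    resp = urlopen(url, bytes(json.dumps(payload), "utf-8"))
    # The master answers with a short text that includes the job ID.
    return str(resp.read(), "utf-8")

if __name__ == "__main__":
    print(request_containers("ubuntu:latest", 2))
```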
+The request responds with an ID of the container creation job.
+
+* Polling status of container request:
+```
+curl "http://<master_addr>:<master_port>/jobRequest?token=None&type=poll_job&job_id=<job_id>"
+```
+This checks the status of the container-hosting job with the provided ID: READY means all containers are started and running, INITIALIZING means that not all of them have started yet, and FAILED means that not all could be started, although some may still be available.
+
+* Stream connector
+
+Use just as before.
 
 ## Quickstart
 
diff --git a/harmonicIO/general/definition.py b/harmonicIO/general/definition.py
index 20681ab..d011046 100644
--- a/harmonicIO/general/definition.py
+++ b/harmonicIO/general/definition.py
@@ -23,6 +23,14 @@ class CTuple:
     RT = 3
 
 
+class JobStatus:
+    INIT = "INITIALIZING"
+    READY = "READY"
+    ACTIVE = "ACTIVE"
+    IDLE = "IDLE"
+    FAILED = "FAILED"
+
+
 class Definition(object):
     @staticmethod
     def get_str_node_name():
@@ -85,8 +93,8 @@ def get_str_data_port_range():
         return "node_data_port_range"
 
     @staticmethod
-    def get_str_idle_time():
-        return "std_idle_time"
+    def get_str_container_idle_timeout():
+        return "container_idle_timeout"
 
     @staticmethod
     def get_str_token():
@@ -176,6 +184,10 @@ def get_str_stream_req():
         def get_str_msg_query():
             return "messagesQuery"
 
+        @staticmethod
+        def get_str_job_mgr():
+            return "jobRequest"
+
         @staticmethod
         def get_str_reg_func():
             return "registeredFunctions"
@@ -188,6 +200,10 @@ def get_str_token():
         def get_str_docker():
             return "docker"
 
+        @staticmethod
+        def get_str_local_imgs():
+            return "local_images"
+
         class Batch(object):
             @staticmethod
             def get_str_batch_addr():
@@ -281,6 +297,10 @@ def get_str_status():
         def get_str_query():
             return "query"
 
+        @staticmethod
+        def get_str_finished():
+            return "finished"
+
         class HDE(object):
             @staticmethod
             def get_str_node_name():
@@ -291,6 +311,10 @@ def get_str_node_name():
             def get_str_node_addr():
                 return "HDE_NODE_ADDR"
 
+            @staticmethod
+            def get_str_node_rest_port():
+                return "HDE_NODE_REST_PORT"
+
             @staticmethod
             def get_str_node_data_port():
                 return "HDE_NODE_DATA_PORT"
@@ -314,3 +338,7 @@ def get_str_std_idle_time():
             @staticmethod
             def get_str_token():
                 return "HDE_TOKEN"
+
+            @staticmethod
+            def get_str_idle_timeout():
+                return "HDE_IDLE_TIMEOUT"
diff --git a/harmonicIO/general/services.py b/harmonicIO/general/services.py
index c130b6c..1a9fb15 100644
--- a/harmonicIO/general/services.py
+++ b/harmonicIO/general/services.py
@@ -81,6 +81,7 @@ def get_machine_status(setting, role):
         body[Definition.get_str_node_name()] = setting.get_node_name()
         body[Definition.get_str_node_role()] = role
         body[Definition.get_str_node_addr()] = setting.get_node_addr()
+        body[Definition.get_str_node_port()] = setting.get_node_port()
         body[Definition.get_str_load1()] = load1
         body[Definition.get_str_load5()] = load5
         body[Definition.get_str_load15()] = load15
diff --git a/harmonicIO/master/__main__.py b/harmonicIO/master/__main__.py
index 71f2f2b..e1e25ba 100644
--- a/harmonicIO/master/__main__.py
+++ b/harmonicIO/master/__main__.py
@@ -1,10 +1,30 @@
 from harmonicIO.general.services import SysOut
-
+from .jobqueue import JobManager
 
 """
 Master entry point
 """
 
+
+def run_queue_manager(manager):
+    """
+    Run the job queue manager threads.
+    There can be several managers to handle a large number of queued jobs.
+    """
+    import threading
+    for i in range(manager.queuer_threads):
+        manager_thread = threading.Thread(target=manager.job_queuer)
+        manager_thread.daemon = True
+        manager_thread.start()
+
+    SysOut.out_string("Job queue started")
+
+    if Setting.get_autoscaling():
+        supervisor_thread = threading.Thread(target=manager.queue_supervisor)
+        supervisor_thread.daemon
= True + supervisor_thread.start() + SysOut.out_string("Autoscaling supervisor started") + + def run_rest_service(): """ @@ -64,4 +84,9 @@ def run_msg_service(): # Binding commander to the rest service and enable REST service pool.submit(run_rest_service) - + + # create a job manager which is a queue manager supervising the creation of containers, both via user and auto-scaling + jobManager = JobManager(30, 100, 5, 1) # 30 seconds interval between checking, 100 requests in queue before increase, add 5 new containers, 1 thread for queue supervisor + + # Run job queue manager thread + pool.submit(run_queue_manager, jobManager) diff --git a/harmonicIO/master/configuration.json b/harmonicIO/master/configuration.json index 5a43bfc..746dcdc 100644 --- a/harmonicIO/master/configuration.json +++ b/harmonicIO/master/configuration.json @@ -1,7 +1,8 @@ { "node_name": "PE Master", - "master_addr": "192.168.0.84", + "master_addr": "192.168.1.17", "node_port": 8080, "node_data_port_range": [8090,8090], - "std_idle_time": 5 + "std_idle_time": 5, + "auto_scaling_enabled" : false } diff --git a/harmonicIO/master/configuration.py b/harmonicIO/master/configuration.py index 4096761..e27625a 100644 --- a/harmonicIO/master/configuration.py +++ b/harmonicIO/master/configuration.py @@ -9,6 +9,7 @@ class Setting(object): __node_data_port_stop = None __std_idle_time = None __token = "None" + __autoscaling = None @staticmethod def set_node_addr(addr=None): @@ -58,6 +59,10 @@ def get_std_idle_time(): def get_token(): return Setting.__token + @staticmethod + def get_autoscaling(): + return Setting.__autoscaling + @staticmethod def read_cfg_from_file(): from harmonicIO.general.services import Services, SysOut @@ -99,6 +104,7 @@ def read_cfg_from_file(): Setting.__node_data_port_start = cfg[Definition.get_str_data_port_range()][0] Setting.__node_data_port_stop = cfg[Definition.get_str_data_port_range()][1] Setting.__std_idle_time = cfg[Definition.get_str_idle_time()] + Setting.__autoscaling = cfg.get('auto_scaling_enabled') SysOut.out_string("Load setting successful.") try: diff --git a/harmonicIO/master/jobqueue.py b/harmonicIO/master/jobqueue.py new file mode 100644 index 0000000..c6e7730 --- /dev/null +++ b/harmonicIO/master/jobqueue.py @@ -0,0 +1,118 @@ +import queue +import json +from urllib.request import urlopen +from .meta_table import LookUpTable +from harmonicIO.general.definition import Definition, JobStatus +from harmonicIO.general.services import SysOut +import time +from .messaging_system import MessagesQueue + +class JobManager: + + def __init__(self, interval, threshold, increment, queuers): + self.__supervisor_interval = interval + self.__supervisor_increment = increment + self.__supervisor_threshold = threshold + self.queuer_threads = queuers + + + def find_available_worker(self, container): + candidates = [] + workers = LookUpTable.Workers.verbose() + SysOut.debug_string("Found workers: " + str(workers)) + if not workers: + return None + + # loop through workers and make tuples of worker IP, load and if requested container is available locally + for worker in workers: + + curr_worker = workers[worker] + if container in curr_worker[Definition.REST.get_str_local_imgs()]: + candidates.append(((curr_worker[Definition.get_str_node_addr()], curr_worker[Definition.get_str_node_port()]), curr_worker[Definition.get_str_load5()], True)) + else: + candidates.append(((curr_worker[Definition.get_str_node_addr()], curr_worker[Definition.get_str_node_port()]), curr_worker[Definition.get_str_load5()], False)) + + 
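Stepping back from the worker-selection loop for a moment: the autoscaling switch shown above is read with `cfg.get('auto_scaling_enabled')`, so a configuration file without the key simply leaves the supervisor off. A small standalone sketch of that gating logic follows; the file name and the manager object are illustrative only.

```python
import json
import threading

def start_supervisor_if_enabled(config_path, manager):
    """Start the autoscaling supervisor thread only if the master config enables it."""
    with open(config_path) as f:
        cfg = json.load(f)

    # cfg.get() returns None when the key is missing, so older configuration
    # files without "auto_scaling_enabled" keep autoscaling disabled.
    if cfg.get("auto_scaling_enabled"):
        supervisor = threading.Thread(target=manager.queue_supervisor)
        supervisor.daemon = True
        supervisor.start()
        return True
    return False
```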
candidates.sort(key=lambda x: (-x[2], x[1])) # sort candidate workers first on availability of image, then on load (avg load last 5 mins) + for candidate in list(candidates): + if not float(candidate[1]) < 0.5: + candidates.remove(candidate) # remove candidates with higher than 50% cpu load + + return candidates + + def start_job(self, target, job_data): + # send request to worker + worker_url = "http://{}:{}/docker?token=None&command=create".format(target[0], target[1]) + req_data = bytes(json.dumps(job_data), 'utf-8') + resp = urlopen(worker_url, req_data) # NOTE: might need increase in timeout to allow download of large container images!!! + + if resp.getcode() == 200: # container was created + sid = str(resp.read(), 'utf-8') + SysOut.debug_string("Received sid from container: " + sid) + return sid + return False + + def job_queuer(self): + while True: + job_data = JobQueue.q.get() + num_of_conts = job_data.get('num') + job_sids = [] + targets = self.find_available_worker(job_data.get(Definition.Container.get_str_con_image_name())) + SysOut.debug_string("Candidate workers: " + str(targets)) + n = 0 + while len(job_sids) < num_of_conts: + target = targets[n][0] + SysOut.debug_string("Attempting to send request to worker: " + str(target)) + try: + sid = self.start_job(target, job_data) + if sid: + job_sids.append(sid) + else: # not sure how urllib handles a 400 response, but this needs to happen either in case of exception or sid = False + if n < len(targets)-1: # other candidates are available + n+= 1 + continue + else: + job_data['job_status'] = JobStatus.FAILED + break + + if len(job_sids) == num_of_conts: + job_data['job_status'] = JobStatus.READY + job_data[Definition.Container.Status.get_str_sid()] = job_sids #TODO: add this in metatable + + except: + SysOut.debug_string("Response from worker threw exception!") + if n < len(targets)-1: # other candidates are available + SysOut.usr_string("We got to other candidates available!!!!!!! -------------------------------------") + n+= 1 + continue + else: + job_data['job_status'] = JobStatus.FAILED + break # break makes it stop trying to create new containers as soon as one fails, is this desireable? Probaby as now it is unlikely that there is any hosting capability + + ## NOTE: can get really ugly, need to cleanup containers that started (rollback) OR let user know how many were started instead?? or retry failed ones? 
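The selection policy used by `find_available_worker` above (prefer workers that already hold the image, then sort by 5-minute load, and drop anything at or above 50% load) can be tried in isolation with made-up worker tuples:

```python
# Illustrative data only: ((address, port), 5-minute load average, image available locally)
candidates = [
    (("10.0.0.1", 8081), 0.30, False),
    (("10.0.0.2", 8081), 0.10, True),
    (("10.0.0.3", 8081), 0.70, True),   # above the load cut-off, will be dropped
    (("10.0.0.4", 8081), 0.20, True),
]

# Workers that already have the image come first (-True sorts before -False), then lowest load.
candidates.sort(key=lambda x: (-x[2], x[1]))

# Keep only workers below 50% CPU load, mirroring the filter in find_available_worker.
candidates = [c for c in candidates if float(c[1]) < 0.5]

print(candidates)
# [(('10.0.0.2', 8081), 0.1, True), (('10.0.0.4', 8081), 0.2, True), (('10.0.0.1', 8081), 0.3, False)]
```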
+ LookUpTable.Jobs.update_job(job_data) + JobQueue.q.task_done() + + def queue_supervisor(self): + """ + Thread that handles autoscaling + """ + while True: + time.sleep(self.__supervisor_interval) ## NOTE: this is probably a very tuneable parameter for later + msg_queue = MessagesQueue.verbose() + for container in msg_queue: + if int(msg_queue[container]) > self.__supervisor_threshold: + job_data = { + Definition.Container.get_str_con_image_name() : container, + 'num' : self.__supervisor_increment, + 'volatile' : True + } + JobQueue.queue_new_job(job_data) + + + +class JobQueue: + q = queue.Queue() + + @staticmethod + def queue_new_job(job_data): + JobQueue.q.put(job_data) \ No newline at end of file diff --git a/harmonicIO/master/meta_table.py b/harmonicIO/master/meta_table.py index 9b685b2..cac7b6b 100644 --- a/harmonicIO/master/meta_table.py +++ b/harmonicIO/master/meta_table.py @@ -1,5 +1,5 @@ import queue -from harmonicIO.general.services import Services +from harmonicIO.general.services import Services, SysOut from harmonicIO.general.definition import Definition, CTuple @@ -25,6 +25,7 @@ def add_worker(dict_input): @staticmethod def del_worker(worker_addr): + # TODO: implement actual worker termination? del LookUpTable.Workers.__workers[worker_addr] class Containers(object): @@ -37,35 +38,61 @@ def get_container_object(req): ret[Definition.REST.Batch.get_str_batch_port()] = int(req.params[Definition.REST.Batch.get_str_batch_port()]) ret[Definition.REST.Batch.get_str_batch_status()] = int(req.params[Definition.REST.Batch.get_str_batch_status()]) ret[Definition.Container.get_str_con_image_name()] = req.params[Definition.Container.get_str_con_image_name()].strip() - ret[Definition.get_str_last_update()] = Services.get_current_timestamp() - + ret[Definition.Container.Status.get_str_sid()] = req.params[Definition.Container.Status.get_str_sid()] + return ret @staticmethod def verbose(): - ret = dict() - for key, value in LookUpTable.Containers.__containers.items(): - ret[key] = list(value.queue) - - return ret + return LookUpTable.Containers.__containers @staticmethod def update_container(dict_input): - if dict_input[Definition.Container.get_str_con_image_name()] not in LookUpTable.Containers.__containers: - LookUpTable.Containers.__containers[dict_input[Definition.Container.get_str_con_image_name()]] = queue.Queue() - LookUpTable.Containers.__containers[dict_input[Definition.Container.get_str_con_image_name()]].put(dict_input) + def cont_in_table(dict_input): + conts = LookUpTable.Containers.__containers[dict_input[Definition.Container.get_str_con_image_name()]] + for cont in conts: + if dict_input.get(Definition.Container.Status.get_str_sid()) == cont.get(Definition.Container.Status.get_str_sid()): + return cont + return None + + if dict_input[Definition.Container.get_str_con_image_name()] not in LookUpTable.Containers.__containers: # no containers for this image exist + new_cont = [dict_input] + LookUpTable.Containers.__containers[dict_input[Definition.Container.get_str_con_image_name()]] = new_cont + else: + cont = cont_in_table(dict_input) + if not cont: # this specific container is not already in table + LookUpTable.Containers.__containers[dict_input[Definition.Container.get_str_con_image_name()]].append(dict_input) + else: # container was already in table, update timestamp + cont[Definition.get_str_last_update()] = Services.get_current_timestamp() @staticmethod def get_candidate_container(image_name): if image_name not in LookUpTable.Containers.__containers: return None - if 
len(LookUpTable.Containers.__containers[image_name].queue) > 0: - return LookUpTable.Containers.__containers[image_name].get() + if len(LookUpTable.Containers.__containers[image_name]) > 0: + return LookUpTable.Containers.__containers[image_name].pop() return None + @staticmethod + def del_container(container_name, short_id): + conts = LookUpTable.Containers.__containers.get(container_name) + if not conts: + return False + else: + # conts is list of containers with same c_name + + # List filter code based on: https://stackoverflow.com/questions/1235618/python-remove-dictionary-from-list + # Removes item with specified short_id from list + conts[:] = [con for con in conts if con.get(Definition.Container.Status.get_str_sid()) != short_id] + + return True + + + + class Tuples(object): __tuples = {} @@ -94,6 +121,52 @@ def add_tuple_info(tuple_info): def verbose(): return LookUpTable.Tuples.__tuples + class Jobs(object): + __jobs = {} + + # create new job from request dictionary + @staticmethod + def new_job(request): + new_item = {} + new_id = request.get('job_id') + if not new_id: + SysOut.warn_string("Couldn't create job, no ID provided!") + return False + + if new_id in LookUpTable.Jobs.__jobs: + SysOut.warn_string("Job already exists in system, can't create!") + return False + + new_item['job_id'] = new_id + new_item['job_status'] = request.get('job_status') + new_item[Definition.Container.get_str_con_image_name()] = request.get(Definition.Container.get_str_con_image_name()) + new_item['user_token'] = request.get(Definition.get_str_token()) + new_item['volatile'] = request.get('volatile') + LookUpTable.Jobs.__jobs[new_id] = new_item + + return True + + @staticmethod + def update_job(request): + job_id = request.get('job_id') + if not job_id in LookUpTable.Jobs.__jobs: + SysOut.warn_string("Couldn't update job, no existing job matching ID!") + return False + + tkn = request.get(Definition.get_str_token()) + if not tkn == LookUpTable.Jobs.__jobs[job_id]['user_token']: + SysOut.warn_string("Incorrect token, refusing update.") + return False + + old_job = LookUpTable.Jobs.__jobs[job_id] + old_job['job_status'] = request.get('job_status') + + return True + + @staticmethod + def verbose(): + return LookUpTable.Jobs.__jobs + @staticmethod def update_worker(dict_input): LookUpTable.Workers.add_worker(dict_input) @@ -102,12 +175,28 @@ def update_worker(dict_input): def get_candidate_container(image_name): return LookUpTable.Containers.get_candidate_container(image_name) + @staticmethod + def new_job(request): + return LookUpTable.Jobs.new_job(request) + + @staticmethod + def update_job(request): + return LookUpTable.Jobs.update_job(request) + + @staticmethod + def poll_id(id): + return id in LookUpTable.Jobs.verbose() + + @staticmethod + def remove_container(c_name, csid): + return LookUpTable.Containers.del_container(c_name, csid) + @staticmethod def verbose(): ret = dict() ret['WORKERS'] = LookUpTable.Workers.verbose() ret['CONTAINERS'] = LookUpTable.Containers.verbose() ret['TUPLES'] = LookUpTable.Tuples.verbose() + ret['JOBS'] = LookUpTable.Jobs.verbose() return ret - diff --git a/harmonicIO/master/rest_service.py b/harmonicIO/master/rest_service.py index 373b091..9c5d744 100644 --- a/harmonicIO/master/rest_service.py +++ b/harmonicIO/master/rest_service.py @@ -1,10 +1,21 @@ import falcon from .configuration import Setting -from harmonicIO.general.definition import Definition, CStatus, CRole +from harmonicIO.general.definition import Definition, CStatus, CRole, JobStatus from .messaging_system 
import MessagesQueue from harmonicIO.general.services import SysOut, Services as LService from .meta_table import LookUpTable +from urllib.request import urlopen +from urllib3.request import urlencode + +import json +from .jobqueue import JobQueue + +def format_response_string(res, http_code, msg): + res.body = msg + '\n' + res.status = http_code + res.content_type = "String" + return res class RequestStatus(object): @@ -16,22 +27,15 @@ def on_get(self, req, res): GET: /status?token={None} """ if not Definition.get_str_token() in req.params: - res.body = "Token is required." - res.content_type = "String" - res.status = falcon.HTTP_401 + format_response_string(res, falcon.HTTP_401, "Token is required") return if req.params[Definition.get_str_token()] == Setting.get_token(): - result = LService.get_machine_status(Setting, CRole.MASTER) - - res.body = str(result) - res.content_type = "String" - res.status = falcon.HTTP_200 + format_response_string(res, falcon.HTTP_200, str(result)) + else: - res.body = "Invalid token ID." - res.content_type = "String" - res.status = falcon.HTTP_401 + format_response_string(res, falcon.HTTP_401,"Invalid token ID") def on_put(self, req, res): """ @@ -43,9 +47,23 @@ def on_put(self, req, res): res.status = falcon.HTTP_401 return + if Definition.Docker.get_str_finished() in req.params: + # a container is shutting down, update containers + # TODO: add some kind of safety mechanism to really make sure no new requests have been sent to this container before acknowledging removal? + if LookUpTable.remove_container( + req.params.get(Definition.Container.get_str_con_image_name()), + req.params.get(Definition.Docker.get_str_finished()) + ): + format_response_string(res, falcon.HTTP_200, "Container successfully removed") + # NOTE: container will terminate as soon as it reads this response! + else: + format_response_string(res, falcon.HTTP_400, "Could not remove container from table!") + # NOTE: container will continue as before when it reads this response! 
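The shutdown notification handled here is a plain PUT with the exiting container's short ID and image name as query parameters. A worker-side sketch of the call (the helper added to the worker's REST service later in this diff does essentially the same) could look as follows, with the master address passed in as a placeholder:

```python
from urllib.request import Request, urlopen
from urllib.error import HTTPError

def notify_container_finished(master_addr, master_port, image_name, short_id):
    """Tell the master a container has exited so it can be dropped from the lookup table."""
    url = "http://{}:{}/status?token=None&finished={}&c_name={}".format(
        master_addr, master_port, short_id, image_name)
    try:
        resp = urlopen(Request(url=url, method="PUT"))
        return resp.getcode() == 200   # 200: master removed the container from its table
    except HTTPError:
        return False                   # 400: removal failed, the container should keep running
```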
+ return + + if req.params[Definition.get_str_token()] == Setting.get_token(): - raw = str(req.stream.read(), 'UTF-8') - data = eval(raw) + data = json.loads(str(req.stream.read(req.content_length or 0), 'utf-8')) LookUpTable.update_worker(data) SysOut.debug_string("Update worker status ({0})".format(data[Definition.get_str_node_name()])) @@ -58,6 +76,7 @@ def on_put(self, req, res): res.content_type = "String" res.status = falcon.HTTP_401 + return class MessageStreaming(object): def __init__(self): @@ -142,7 +161,8 @@ def on_post(self, req, res): if Definition.REST.Batch.get_str_batch_addr() in req.params and \ Definition.REST.Batch.get_str_batch_port() in req.params and \ Definition.REST.Batch.get_str_batch_status() in req.params and \ - Definition.Container.get_str_con_image_name() in req.params: + Definition.Container.get_str_con_image_name() in req.params and \ + Definition.Container.Status.get_str_sid() in req.params: # Check for data type if req.params[Definition.REST.Batch.get_str_batch_port()].isdigit() and \ @@ -224,6 +244,8 @@ def on_get(self, req, res): if req.params[Definition.MessagesQueue.get_str_command()] == "verbose": data = LookUpTable.verbose() data['MSG'] = MessagesQueue.verbose() + if req.params.get('format') == 'JSON': + data = json.dumps(data) res.body = str(data) res.content_type = "String" @@ -237,6 +259,68 @@ def on_get(self, req, res): res.content_type = "String" res.status = falcon.HTTP_200 +class JobManager(object): + """ + JobManager is about taking requests from clients to set up containers + + Provides a post request to let master allocate containers, and get requests to check the status of this. + + """ + def __init__(self): + pass + + def on_get(self, req, res): + # check token and request type is provided + if not Definition.get_str_token() in req.params: + format_response_string(res, falcon.HTTP_401, "Token required.") + return + + if not "type" in req.params: + format_response_string(res, falcon.HTTP_406, "Command not specified.") + return + + # user wants to know if containers are ready for provided job ID + if req.params['type'] == "poll_job": + id = req.params.get('job_id') + if not id in LookUpTable.Jobs.verbose(): + format_response_string(res, falcon.HTTP_404, "Specified job not available.") + return + + jobs = LookUpTable.Jobs.verbose() + stat = str(jobs[id].get('job_status')) + format_response_string(res, falcon.HTTP_200, ("Job status: " + stat)) + + return + + def on_post(self, req, res): + # check token and request type is provided + req_raw = (str(req.stream.read(req.content_length or 0), 'utf-8')) # create dict of body data if they exist + req_data = json.loads(req_raw) + if not Definition.get_str_token() in req.params: + res.body = "Token is required." + res.content_type = "String" + res.status = falcon.HTTP_401 + return + + if not "type" in req.params: + res.body = "No command specified." 
+ res.content_type = "String" + res.status = falcon.HTTP_406 + return + + # request to create new job - create ID for job, add to lookup table, queue creation of the job + if req.params['type'] == 'new_job': + job = new_job(req_data) # attempt to create new job from provided parameters + if not job: + SysOut.err_string("New job could not be added!") + format_response_string(res, falcon.HTTP_500, "Could not create job.") + return + job_status = job.get('job_status') + format_response_string(res, falcon.HTTP_200, "Job request received, container status: {}\nJob ID: {}".format(job_status, job.get('job_id'))) + return + + return + class RESTService(object): def __init__(self): # Initialize REST Services @@ -252,6 +336,9 @@ def __init__(self): # Add route for msg query api.add_route('/' + Definition.REST.get_str_msg_query(), MessagesQuery()) + # Add route for job manager + api.add_route('/' + Definition.REST.get_str_job_mgr(), JobManager()) + # Establishing a REST server self.__server = make_server(Setting.get_node_addr(), Setting.get_node_port(), api) @@ -260,6 +347,29 @@ def run(self): self.__server.serve_forever() +def new_job(job_params): + ### below ID randomizer from: https://stackoverflow.com/questions/2257441/random-string-generation-with-upper-case-letters-and-digits-in-python + def rand_id(N): + from random import SystemRandom + import string + return ''.join(SystemRandom().choice(string.ascii_uppercase + string.digits) for _ in range(N)) + ### + + # create job ID, make sure ID is new + job_id = rand_id(5) + while LookUpTable.poll_id(job_id): + job_id = rand_id(5) + + # add job to table + job_params['job_id'] = job_id + job_params['job_status'] = JobStatus.INIT + if not LookUpTable.Jobs.new_job(job_params): + return None + + # queue creation + JobQueue.queue_new_job(job_params) + + return job_params def get_html_form(worker, msg, containers, tuples): html = """ diff --git a/harmonicIO/master/server_socket.py b/harmonicIO/master/server_socket.py index afeefd3..57f94d8 100644 --- a/harmonicIO/master/server_socket.py +++ b/harmonicIO/master/server_socket.py @@ -1,6 +1,6 @@ import socketserver from .messaging_system import MessagesQueue - +from harmonicIO.general.services import SysOut class ThreadedTCPServer(socketserver.ThreadingMixIn, socketserver.TCPServer): """ @@ -42,4 +42,4 @@ def handle(self): except: from harmonicIO.general.services import Services - Services.e_print("Insufficient memory for storing g object.") + SysOut.err_string("Insufficient memory for storing g object.") diff --git a/harmonicIO/stream_connector/__main__.py b/harmonicIO/stream_connector/__main__.py index 8e4e5a1..d9be424 100755 --- a/harmonicIO/stream_connector/__main__.py +++ b/harmonicIO/stream_connector/__main__.py @@ -4,13 +4,12 @@ # Example program # The use case number can be defined by varying the number in use case variable MASTER_DATA = { - "MASTER_ADDR": "192.168.0.137", + "MASTER_ADDR": "192.168.1.5", "MASTER_PORT": 8080 } PROCC_DATA = { - "batch_hist": "beirbear/test:batch_hist", - "batch_sum": "beirbear/test:batch_sum", + "daemon_test": "snapple49/hio-daemondev:test", "OS": "ubuntu" } @@ -38,15 +37,14 @@ def read_data_from_file(path): # Define data to test d_list = { - 'batch_hist': read_data_from_file('stream_connector/lena512.bmp'), - 'batch_sum': read_data_from_file('stream_connector/str_array.txt') + 'daemon_test': read_data_from_file('harmonicIO/stream_connector/lena512.bmp') } # Generate a sample stream order stream_order = [0] * ITEM_NUMBER import random for i in range(ITEM_NUMBER): - 
stream_order[i] = (i, 'batch_sum' if (random.randrange(1, 100) % len(d_list)) == 0 else 'batch_hist') + stream_order[i] = (i, 'daemon_test' if (random.randrange(1, 100) % len(d_list)) == 0 else 'daemon_test') return stream_order, d_list diff --git a/harmonicIO/worker/__main__.py b/harmonicIO/worker/__main__.py index d5df5fe..ae8b6f0 100644 --- a/harmonicIO/worker/__main__.py +++ b/harmonicIO/worker/__main__.py @@ -6,6 +6,8 @@ from .configuration import Setting from harmonicIO.general.services import SysOut, Services from harmonicIO.general.definition import Definition, CRole +from .garbage_collector import GarbageCollector +import json def run_rest_service(): @@ -17,6 +19,15 @@ def run_rest_service(): rest.run() +def start_gc_thread(): + garbage_collector = GarbageCollector(10) + gc_thread = threading.Thread(garbage_collector.collect_exited_containers()) + gc_thread.daemon = True + gc_thread.start() + + SysOut.out_string("Garbage collector started") + + def update_worker_status(): """ Update the worker status to the master as well as container info. @@ -27,15 +38,18 @@ def update_worker_status(): Get machine status by calling a unix command and fetch for load average """ - s_content = Services.get_machine_status(Setting, CRole.WORKER) - s_content[Definition.REST.get_str_docker()] = DockerService.get_containers_status() + content = Services.get_machine_status(Setting, CRole.WORKER) + content[Definition.REST.get_str_docker()] = DockerService.get_containers_status() + content[Definition.REST.get_str_local_imgs()] = DockerService.get_local_images() + + s_content = bytes(json.dumps(content), 'utf-8') html = urllib3.PoolManager() try: r = html.request('PUT', Definition.Master.get_str_check_master(Setting.get_master_addr(), Setting.get_master_port(), Setting.get_token()), - body=str(s_content)) + body=s_content) if r.status != 200: SysOut.err_string("Cannot update worker status to the master!") @@ -83,3 +97,6 @@ def update_worker_status(): # Update the worker status pool.submit(update_worker_status) + + # Start garbage collector thread + pool.submit(start_gc_thread) \ No newline at end of file diff --git a/harmonicIO/worker/configuration.json b/harmonicIO/worker/configuration.json index b177066..81a1405 100644 --- a/harmonicIO/worker/configuration.json +++ b/harmonicIO/worker/configuration.json @@ -1,10 +1,11 @@ { "node_name": "PE Worker", "node_port": 8081, - "node_internal_addr": "192.168.0.84", + "node_internal_addr": "192.168.1.5", "node_external_addr": "None", - "master_addr": "192.168.0.84", + "master_addr": "192.168.1.5", "master_port": 8080, "node_data_port_range": [9000, 9010], - "std_idle_time": 5 + "std_idle_time": 5, + "container_idle_timeout": 60 } diff --git a/harmonicIO/worker/configuration.py b/harmonicIO/worker/configuration.py index 2946ddc..f434122 100644 --- a/harmonicIO/worker/configuration.py +++ b/harmonicIO/worker/configuration.py @@ -12,6 +12,7 @@ class Setting(object): __master_port = None __node_external_addr = None __node_internal_addr = None + __container_idle_timeout = None @staticmethod def set_node_addr(addr=None): @@ -87,6 +88,10 @@ def get_min_worker(): @staticmethod def get_node_external_addr(): return Setting.__node_external_addr + + @staticmethod + def get_container_idle_timeout(): + return Setting.__container_idle_timeout @staticmethod def read_cfg_from_file(): @@ -107,7 +112,8 @@ def read_cfg_from_file(): Definition.get_str_idle_time() in cfg and \ Definition.get_str_master_addr() in cfg and \ Definition.get_str_master_port() in cfg and \ - 
Definition.get_str_node_external_addr() in cfg and \ + Definition.get_str_container_idle_timeout() in cfg and \ + Definition.get_str_node_internal_addr() in cfg and \ Definition.get_str_node_internal_addr(): # Check port number is int or not if not isinstance(cfg[Definition.get_str_node_port()], int): @@ -137,6 +143,7 @@ def read_cfg_from_file(): Setting.__master_addr = cfg[Definition.get_str_master_addr()].strip() Setting.__master_port = cfg[Definition.get_str_master_port()] Setting.__node_external_addr = cfg[Definition.get_str_node_external_addr()].strip().lower() + Setting.__container_idle_timeout = cfg[Definition.get_str_container_idle_timeout()] # Check for auto node name if Setting.__node_name.lower() == "auto": diff --git a/harmonicIO/worker/docker_master.py b/harmonicIO/worker/docker_master.py index 6ce4346..ab79694 100644 --- a/harmonicIO/worker/docker_master.py +++ b/harmonicIO/worker/docker_master.py @@ -4,6 +4,8 @@ from harmonicIO.general.definition import CStatus, Definition from harmonicIO.general.services import SysOut +from docker.errors import APIError +from requests.exceptions import HTTPError class ChannelStatus(object): def __init__(self, port): @@ -53,6 +55,13 @@ def __get_available_port(self): return None + def __update_ports(self): + for port in self.__ports: + if port.is_port_open(): + port.status = CStatus.BUSY + else: + port.status = CStatus.AVAILABLE + def get_containers_status(self): def get_container_status(input): @@ -63,31 +72,54 @@ def get_container_status(input): return res res = [] - for item in self.__client.containers.list(): + for item in self.__client.containers.list(all=True): res.append(get_container_status(item)) # To print all logs: #print(item.logs(stdout=True, stderr=True)) return res - def run_container(self, container_name): + def get_local_images(self): + # get a list of all tags of all locally available images on this machine + imgs = self.__client.images.list() + local_imgs = [] + for img in imgs: + local_imgs += img.tags + + return local_imgs + + def delete_container(self, cont_shortid): + # remove a container from the worker by provided short id, only removes exited containers + try: + self.__client.containers.get(cont_shortid).remove() + return True + except (ApiError, HTTPError) as e: + SysOut.err_string("Could not remove requested container, exception:\n{}".format(e)) + return False + + + def run_container(self, container_name, volatile=False): def get_ports_setting(expose, ports): return {str(expose) + '/tcp': ports} - def get_env_setting(expose, a_port): + def get_env_setting(expose, a_port, volatile): ret = dict() ret[Definition.Docker.HDE.get_str_node_name()] = container_name ret[Definition.Docker.HDE.get_str_node_addr()] = Setting.get_node_addr() + ret[Definition.Docker.HDE.get_str_node_rest_port()] = Setting.get_node_port() ret[Definition.Docker.HDE.get_str_node_data_port()] = expose ret[Definition.Docker.HDE.get_str_node_forward_port()] = a_port ret[Definition.Docker.HDE.get_str_master_addr()] = Setting.get_master_addr() ret[Definition.Docker.HDE.get_str_master_port()] = Setting.get_master_port() ret[Definition.Docker.HDE.get_str_std_idle_time()] = Setting.get_std_idle_time() ret[Definition.Docker.HDE.get_str_token()] = Setting.get_token() - + if volatile: + ret[Definition.Docker.HDE.get_str_idle_timeout()] = Setting.get_container_idle_timeout() return ret + self.__update_ports() + port = self.__get_available_port() expose_port = 80 @@ -101,7 +133,7 @@ def get_env_setting(expose, a_port): stderr=True, stdout=True, 
ports=get_ports_setting(expose_port, port), - environment=get_env_setting(expose_port, port)) + environment=get_env_setting(expose_port, port, volatile)) import time time.sleep(1) print('..created container, logs:') @@ -110,7 +142,8 @@ def get_env_setting(expose, a_port): if res: SysOut.out_string("Container " + container_name + " is created!") SysOut.out_string("Container " + container_name + " is " + res.status + " ") - return True + # return short id of container + return res.short_id else: SysOut.out_string("Container " + container_name + " cannot be created!") return False diff --git a/harmonicIO/worker/docker_service.py b/harmonicIO/worker/docker_service.py index 557f81e..0fc7098 100644 --- a/harmonicIO/worker/docker_service.py +++ b/harmonicIO/worker/docker_service.py @@ -9,9 +9,17 @@ def init(): DockerService.__docker_master = DockerMaster() @staticmethod - def create_container(container_name): - return DockerService.__docker_master.run_container(container_name) + def create_container(container_name, volatile=False): + return DockerService.__docker_master.run_container(container_name, volatile) @staticmethod def get_containers_status(): return DockerService.__docker_master.get_containers_status() + + @staticmethod + def get_local_images(): + return DockerService.__docker_master.get_local_images() + + @staticmethod + def delete_container(csid): + return DockerService.__docker_master.delete_container(csid) diff --git a/harmonicIO/worker/garbage_collector.py b/harmonicIO/worker/garbage_collector.py new file mode 100644 index 0000000..66f42a0 --- /dev/null +++ b/harmonicIO/worker/garbage_collector.py @@ -0,0 +1,29 @@ +from .docker_service import DockerService +from harmonicIO.general.definition import Definition +from harmonicIO.general.services import SysOut + +from time import sleep +class GarbageCollector(): + + # interval between garbage collections in seconds + gc_run_interval = 300 + + def __init__(self, run_interval=300): + self.gc_run_interval = run_interval + + + def collect_exited_containers(self): + while True: + sleep(self.gc_run_interval) + + exited_containers = [] + current_containers = DockerService.get_containers_status() + for cont in current_containers: + # find exited containers + if cont.get(Definition.Container.Status.get_str_status()) == 'exited': + exited_containers.append(cont.get(Definition.Container.Status.get_str_sid())) + + for sid in exited_containers: + if not DockerService.delete_container(sid): + SysOut.debug_string("Could not delete target container: {}".format(sid)) + \ No newline at end of file diff --git a/harmonicIO/worker/rest_service.py b/harmonicIO/worker/rest_service.py index d4e4854..17910fa 100644 --- a/harmonicIO/worker/rest_service.py +++ b/harmonicIO/worker/rest_service.py @@ -3,6 +3,34 @@ from harmonicIO.general.services import SysOut, Services from .docker_service import DockerService from harmonicIO.general.definition import Definition, CRole +import json + + +# function that sends request to master to notify exiting of a container +def notify_master_container_finished(container, csid): + from urllib.request import urlopen, Request + from urllib.error import HTTPError + + notify_url = "http://{}:{}/{}?token=None&{}={}&{}={}".format( + Setting.get_master_addr(), + Setting.get_master_port(), + Definition.REST.get_str_status(), + Definition.Docker.get_str_finished(), + csid, + Definition.Container.get_str_con_image_name(), + container + ) + try: + req = Request(url=notify_url, method='PUT') + resp = urlopen(req) + + if resp.getcode() == 
200: + # container was removed on master + return True + except HTTPError as e: + SysOut.err_string(e.msg) + return False + class ContainerService(object): @@ -31,6 +59,25 @@ def on_get(self, req, res): res.body = str(body) res.content_type = "String" res.status = falcon.HTTP_200 + return + + # Container is exiting, notify master to update + if req.params[Definition.Docker.get_str_command()] == Definition.Docker.get_str_finished(): + res.content_type = "String" + short_id = req.params.get(Definition.Container.Status.get_str_sid()) + name = req.params.get(Definition.Container.get_str_con_image_name()) + if short_id and name: + if not notify_master_container_finished(name, short_id): + res.body = "Could not find requested container running." + res.status = falcon.HTTP_404 + else: + res.body = "ACK: Container terminated, master notified." + res.status = falcon.HTTP_200 + else: + res.body = "Container short id required" + res.status = falcon.HTTP_400 + + def on_post(self, req, res): """ @@ -52,19 +99,22 @@ def on_post(self, req, res): POST: docker?token=None&command=create """ if req.params[Definition.Docker.get_str_command()] == Definition.Docker.get_str_create(): - # Unpack the posted data - raw = str(req.stream.read(), 'UTF-8') - data = eval(raw) + raw = req.stream.read(req.content_length or 0) + data = json.loads(str(raw, 'utf-8')) # create dict of body data if it exists if not data[Definition.Container.get_str_con_image_name()]: res.body = "Required parameters are not supplied!" res.content_type = "String" res.status = falcon.HTTP_401 - result = DockerService.create_container(data[Definition.Container.get_str_con_image_name()]) + volatile = False + if data.get('volatile'): + volatile = True # only set to true if user has actually provided the 'volatile' : true data in request + + result = DockerService.create_container(data[Definition.Container.get_str_con_image_name()], volatile) if result: - res.body = "Okay" + res.body = "{}".format(result) res.content_type = "String" res.status = falcon.HTTP_200 return @@ -92,6 +142,7 @@ def on_get(self, req, res): if req.params[Definition.get_str_token()] == Setting.get_token(): s_content = Services.get_machine_status(Setting, CRole.WORKER) s_content[Definition.REST.get_str_docker()] = DockerService.get_containers_status() + s_content[Definition.REST.get_str_local_imgs()] = DockerService.get_local_images() res.body = str(s_content)
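To close the loop from the client's side, the job created through `/jobRequest?type=new_job` can be polled until it reaches a terminal status. The sketch below relies only on the `poll_job` endpoint and the READY/FAILED strings introduced in this patch; the address, port and job ID are placeholder values.

```python
import time
from urllib.request import urlopen

def wait_for_job(master_addr, master_port, job_id, interval=2, timeout=120):
    """Poll /jobRequest until the job is READY (True) or FAILED (False)."""
    url = "http://{}:{}/jobRequest?token=None&type=poll_job&job_id={}".format(
        master_addr, master_port, job_id)
    deadline = time.time() + timeout
    while time.time() < deadline:
        # The master replies with e.g. "Job status: READY" (HTTP 404 if the ID is unknown).
        body = str(urlopen(url).read(), "utf-8")
        if "READY" in body:
            return True
        if "FAILED" in body:
            return False
        time.sleep(interval)
    raise TimeoutError("Job {} did not reach a terminal state in time".format(job_id))

# Example with placeholder values:
# wait_for_job("192.168.1.17", 8080, "AB12C")
```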