Commits (67)
42594f9
testcommit, fork stuff
Snapple49 Feb 5, 2018
371a6a8
Revert "testcommit, fork stuff"
Snapple49 Feb 5, 2018
d8db906
added comment with stuff todo
Snapple49 Feb 5, 2018
96d28ad
Merge branch 'master' of https://github.com/HASTE-project/HarmonicIO
Snapple49 Feb 6, 2018
bcca87b
Merge branch 'master' of https://github.com/HASTE-project/HarmonicIO
Snapple49 Feb 7, 2018
7aebcf3
Merge branch 'master' of https://github.com/Snapple49/HarmonicIO
Snapple49 Feb 7, 2018
0f0d19d
added ClientManager to falcon api, added post request to queue a job
Snapple49 Feb 7, 2018
06b0cac
bugfix in POST
Snapple49 Feb 7, 2018
5f6245f
bugifx, added randrange
Snapple49 Feb 7, 2018
3372b84
bugifx, typo and testing req.stream read
Snapple49 Feb 7, 2018
e8663f9
bugifx
Snapple49 Feb 7, 2018
47a09bb
bugifx
Snapple49 Feb 7, 2018
69dacb7
bugifx, added json loads
Snapple49 Feb 7, 2018
42f7a7f
bugfix, str repr of tuple
Snapple49 Feb 8, 2018
51225a4
bugfix, fix urlencode
Snapple49 Feb 8, 2018
9c63391
bugfix
Snapple49 Feb 8, 2018
65c7795
urlencode fixed?
Snapple49 Feb 8, 2018
3a3ec83
fixed parameter read in docker create call
Snapple49 Feb 8, 2018
d204717
fixed missing import
Snapple49 Feb 8, 2018
f31fc73
fixed missing import
Snapple49 Feb 8, 2018
2b3de1d
debugging
Snapple49 Feb 8, 2018
e783d01
changed encoding to use json dumps
Snapple49 Feb 8, 2018
48d6eec
debugging
Snapple49 Feb 8, 2018
b2d87a9
json dumps fixing?
Snapple49 Feb 8, 2018
be65210
json typos
Snapple49 Feb 8, 2018
aa7f78e
added Jobs field to metadata
Snapple49 Feb 9, 2018
a834ec4
added job to metadata
Snapple49 Feb 12, 2018
e81ace6
working on creating job upon user request, search available local images
Snapple49 Feb 12, 2018
c188a65
created class for managing job queue
Snapple49 Feb 13, 2018
0604724
refactoring, added job status definiton class
Snapple49 Feb 13, 2018
15c8932
unsaved changes in meta table
Snapple49 Feb 13, 2018
cf52da2
I broke something, fixing
Snapple49 Feb 13, 2018
422cf2d
Maybe fixed broken? Added return statement to external metadata funct…
Snapple49 Feb 13, 2018
934beea
Maybe fixed broken? Imported jobqueue.JobQueue
Snapple49 Feb 13, 2018
cc5e0a8
Fixed typo in meta table update_job
Snapple49 Feb 13, 2018
2ed2602
Fixed typos in rest service
Snapple49 Feb 13, 2018
4211e43
solved issue that breaks master, JobQueue reserved name?
Snapple49 Feb 13, 2018
a714224
Created job queue, added queue manager thread to main, creating conta…
Snapple49 Feb 14, 2018
4325a3f
bugfix and refactors, actually started job queuer thread, start job p…
Snapple49 Feb 15, 2018
6de996a
worker updates local images correctly
Snapple49 Feb 16, 2018
e32f463
job queuer searches for appropriate worker node for putting container
Snapple49 Feb 16, 2018
0750c81
removed uneccessary TODO's, cleanup in Jobs in metatable
Snapple49 Feb 23, 2018
7705efe
located where to implement garbage collection, see TODO
Snapple49 Feb 23, 2018
37f7d01
small bugfixes, added api call to notify master that container is exi…
Snapple49 Feb 26, 2018
4252532
test
Snapple49 Feb 27, 2018
9e75722
test successful!
Snapple49 Feb 27, 2018
dd5c3c5
added garbage collector of exited containers to worker, adding short …
Snapple49 Feb 28, 2018
2c03ceb
bugfix, consumed response with sid when checking if worker started co…
Snapple49 Feb 28, 2018
8b274e7
added support to request several containers of same image name
Snapple49 Mar 1, 2018
8e7d0b5
changed queue to normal list in LookUpTable.__containers, working on …
Snapple49 Mar 5, 2018
be4fad0
fixed container removal bug
Snapple49 Mar 5, 2018
ad5411d
added support for requesting volatile jobs and this is managed when c…
Snapple49 Mar 6, 2018
c9a218f
bugfixes, setting up todo's
Snapple49 Mar 6, 2018
4cf31ba
made stream connector use daemon testing container, fixed bug in look…
Snapple49 Mar 7, 2018
ac3655b
added naive master-based upscaling depending on msg queue length
Snapple49 Mar 7, 2018
880209c
added comment describing parameters in master, temporary fix
Snapple49 Mar 12, 2018
afe565c
made worker port in master rest call follow config, working on contai…
Snapple49 Mar 15, 2018
23d4fcb
ports issue fixed(?), now job queuer tries to send requests to all av…
Snapple49 Mar 16, 2018
739936d
added json output option to master status verbose
Snapple49 Mar 26, 2018
950cc6e
added worker node port for rest service to container HDE data
Snapple49 Mar 26, 2018
1f09af0
added default value for volatility
Snapple49 Apr 3, 2018
c641e60
Update Readme.md
Snapple49 Apr 3, 2018
5f7091e
Update Readme.md
Snapple49 Apr 3, 2018
1843597
added option to enable/disable autoscaling in master config file
Snapple49 Apr 4, 2018
806d0c8
fixed typo and minutes to seconds conversion according to PR feedback
Snapple49 Apr 4, 2018
e5ed841
Update Readme.md
Snapple49 Apr 4, 2018
6d4ce4f
Create IRM_default_parameters.md
Snapple49 Jul 2, 2020
20 changes: 20 additions & 0 deletions Readme.md
@@ -10,6 +10,26 @@ Stream_Connector - client for sending tasks for distributed execution.

Forked from https://github.com/beirbear/HarmonicIO

## Update from Oliver:
* Autoscaling:

An important new feature is auto-scaling; to avoid breaking production it can be disabled. To enable or disable it, set the field "auto_scaling_enabled" to true/false in the master's configuration.json file.

* Hosting containers:
```
curl -X POST "http://<master_ip>:<master_port>/jobRequest?token=None&type=new_job" --data '{"c_name" : "<container_image>", "num" : <number_of_containers>, "volatile" : <true/false>}'
```
NOTE: spelling matters: `true` = volatile container, `false` = non-volatile container. The request responds with the ID of the container creation job.

* Polling status of container request:
```
curl "http://<master_ip>:<master_port>/jobRequest?token=None&type=poll_job&job_id=<job id, see above>"
```
This checks the status of the container hosting job with the provided ID: READY means all containers are started and running, INITIALIZING means not all have started yet, and FAILED means not all could be started (though some may still be available). A usage sketch follows after this list.

* Stream connector

Use it just as before.
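
A minimal Python sketch of the two calls above (not part of this PR): the image name and container count are placeholders, the master address and port are taken from the sample configuration.json further down, and the sketch assumes the job ID is returned in the response body as described above.

```
import json
import urllib.request

master = "http://192.168.1.17:8080"  # address/port from the sample configuration.json; replace with your master

# Queue a container hosting job (type=new_job); the response body is the job ID.
payload = json.dumps({"c_name": "my/container_image", "num": 2, "volatile": True}).encode("utf-8")
with urllib.request.urlopen(master + "/jobRequest?token=None&type=new_job", payload) as resp:
    job_id = resp.read().decode("utf-8")

# Poll the job (type=poll_job); prints the current status: READY, INITIALIZING or FAILED.
with urllib.request.urlopen(master + "/jobRequest?token=None&type=poll_job&job_id=" + job_id) as resp:
    print(resp.read().decode("utf-8"))
```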

## Quickstart

32 changes: 30 additions & 2 deletions harmonicIO/general/definition.py
@@ -23,6 +23,14 @@ class CTuple:
RT = 3


class JobStatus:
INIT = "INITIALIZING"
READY = "READY"
ACTIVE = "ACTIVE"
IDLE = "IDLE"
FAILED = "FAILED"


class Definition(object):
@staticmethod
def get_str_node_name():
@@ -85,8 +93,8 @@ def get_str_data_port_range():
return "node_data_port_range"

@staticmethod
def get_str_idle_time():
return "std_idle_time"
def get_str_container_idle_timeout():
return "container_idle_timeout"

@staticmethod
def get_str_token():
@@ -176,6 +184,10 @@ def get_str_stream_req():
def get_str_msg_query():
return "messagesQuery"

@staticmethod
def get_str_job_mgr():
return "jobRequest"

@staticmethod
def get_str_reg_func():
return "registeredFunctions"
@@ -188,6 +200,10 @@ def get_str_token():
def get_str_docker():
return "docker"

@staticmethod
def get_str_local_imgs():
return "local_images"

class Batch(object):
@staticmethod
def get_str_batch_addr():
@@ -281,6 +297,10 @@ def get_str_status():
def get_str_query():
return "query"

@staticmethod
def get_str_finished():
return "finished"

class HDE(object):

@staticmethod
@@ -291,6 +311,10 @@ def get_str_node_name():
def get_str_node_addr():
return "HDE_NODE_ADDR"

@staticmethod
def get_str_node_rest_port():
return "HDE_NODE_REST_PORT"

@staticmethod
def get_str_node_data_port():
return "HDE_NODE_DATA_PORT"
@@ -314,3 +338,7 @@ def get_str_std_idle_time():
@staticmethod
def get_str_token():
return "HDE_TOKEN"

@staticmethod
def get_str_idle_timeout():
return "HDE_IDLE_TIMEOUT"
1 change: 1 addition & 0 deletions harmonicIO/general/services.py
@@ -81,6 +81,7 @@ def get_machine_status(setting, role):
body[Definition.get_str_node_name()] = setting.get_node_name()
body[Definition.get_str_node_role()] = role
body[Definition.get_str_node_addr()] = setting.get_node_addr()
body[Definition.get_str_node_port()] = setting.get_node_port()
body[Definition.get_str_load1()] = load1
body[Definition.get_str_load5()] = load5
body[Definition.get_str_load15()] = load15
29 changes: 27 additions & 2 deletions harmonicIO/master/__main__.py
@@ -1,10 +1,30 @@
from harmonicIO.general.services import SysOut

from .jobqueue import JobManager

"""
Master entry point
"""

def run_queue_manager(manager):
"""
    Run the job queue manager thread(s).
    Several queuer threads can be started to manage a large number of queued jobs.
"""
import threading
for i in range(manager.queuer_threads):
manager_thread = threading.Thread(target=manager.job_queuer)
manager_thread.daemon = True
manager_thread.start()

SysOut.out_string("Job queue started")

if Setting.get_autoscaling():
supervisor_thread = threading.Thread(target=manager.queue_supervisor)
supervisor_thread.daemon = True
supervisor_thread.start()
SysOut.out_string("Autoscaling supervisor started")



def run_rest_service():
"""
@@ -64,4 +84,9 @@ def run_msg_service():

# Binding commander to the rest service and enable REST service
pool.submit(run_rest_service)


    # create a job manager: a queue manager that supervises the creation of containers requested by users and by auto-scaling
    jobManager = JobManager(30, 100, 5, 1) # 30-second check interval, 100 queued requests before scaling up, add 5 new containers, 1 job queuer thread

# Run job queue manager thread
pool.submit(run_queue_manager, jobManager)
5 changes: 3 additions & 2 deletions harmonicIO/master/configuration.json
@@ -1,7 +1,8 @@
{
"node_name": "PE Master",
"master_addr": "192.168.0.84",
"master_addr": "192.168.1.17",
"node_port": 8080,
"node_data_port_range": [8090,8090],
"std_idle_time": 5
"std_idle_time": 5,
"auto_scaling_enabled" : false
}
6 changes: 6 additions & 0 deletions harmonicIO/master/configuration.py
@@ -9,6 +9,7 @@ class Setting(object):
__node_data_port_stop = None
__std_idle_time = None
__token = "None"
__autoscaling = None

@staticmethod
def set_node_addr(addr=None):
@@ -58,6 +59,10 @@ def get_std_idle_time():
def get_token():
return Setting.__token

@staticmethod
def get_autoscaling():
return Setting.__autoscaling

@staticmethod
def read_cfg_from_file():
from harmonicIO.general.services import Services, SysOut
@@ -99,6 +104,7 @@ def read_cfg_from_file():
Setting.__node_data_port_start = cfg[Definition.get_str_data_port_range()][0]
Setting.__node_data_port_stop = cfg[Definition.get_str_data_port_range()][1]
Setting.__std_idle_time = cfg[Definition.get_str_idle_time()]
Setting.__autoscaling = cfg.get('auto_scaling_enabled')
SysOut.out_string("Load setting successful.")

try:
118 changes: 118 additions & 0 deletions harmonicIO/master/jobqueue.py
@@ -0,0 +1,118 @@
import queue
import json
from urllib.request import urlopen
from .meta_table import LookUpTable
from harmonicIO.general.definition import Definition, JobStatus
from harmonicIO.general.services import SysOut
import time
from .messaging_system import MessagesQueue

class JobManager:

def __init__(self, interval, threshold, increment, queuers):
self.__supervisor_interval = interval
self.__supervisor_increment = increment
self.__supervisor_threshold = threshold
self.queuer_threads = queuers


def find_available_worker(self, container):
candidates = []
workers = LookUpTable.Workers.verbose()
SysOut.debug_string("Found workers: " + str(workers))
if not workers:
return None

# loop through workers and make tuples of worker IP, load and if requested container is available locally
for worker in workers:

curr_worker = workers[worker]
if container in curr_worker[Definition.REST.get_str_local_imgs()]:
candidates.append(((curr_worker[Definition.get_str_node_addr()], curr_worker[Definition.get_str_node_port()]), curr_worker[Definition.get_str_load5()], True))
else:
candidates.append(((curr_worker[Definition.get_str_node_addr()], curr_worker[Definition.get_str_node_port()]), curr_worker[Definition.get_str_load5()], False))

candidates.sort(key=lambda x: (-x[2], x[1])) # sort candidate workers first on availability of image, then on load (avg load last 5 mins)
for candidate in list(candidates):
if not float(candidate[1]) < 0.5:
candidates.remove(candidate) # remove candidates with higher than 50% cpu load

return candidates

def start_job(self, target, job_data):
# send request to worker
worker_url = "http://{}:{}/docker?token=None&command=create".format(target[0], target[1])
req_data = bytes(json.dumps(job_data), 'utf-8')
resp = urlopen(worker_url, req_data) # NOTE: might need increase in timeout to allow download of large container images!!!

if resp.getcode() == 200: # container was created
sid = str(resp.read(), 'utf-8')
SysOut.debug_string("Received sid from container: " + sid)
return sid
return False

def job_queuer(self):
while True:
job_data = JobQueue.q.get()
num_of_conts = job_data.get('num')
job_sids = []
targets = self.find_available_worker(job_data.get(Definition.Container.get_str_con_image_name()))
SysOut.debug_string("Candidate workers: " + str(targets))
n = 0
while len(job_sids) < num_of_conts:
target = targets[n][0]
SysOut.debug_string("Attempting to send request to worker: " + str(target))
try:
sid = self.start_job(target, job_data)
if sid:
job_sids.append(sid)
else: # not sure how urllib handles a 400 response, but this needs to happen either in case of exception or sid = False
if n < len(targets)-1: # other candidates are available
n+= 1
continue
else:
job_data['job_status'] = JobStatus.FAILED
break

if len(job_sids) == num_of_conts:
job_data['job_status'] = JobStatus.READY
job_data[Definition.Container.Status.get_str_sid()] = job_sids #TODO: add this in metatable

except:
SysOut.debug_string("Response from worker threw exception!")
if n < len(targets)-1: # other candidates are available
SysOut.usr_string("We got to other candidates available!!!!!!! -------------------------------------")
n+= 1
continue
else:
job_data['job_status'] = JobStatus.FAILED
                        break # break makes it stop trying to create new containers as soon as one fails, is this desirable? Probably, as now it is unlikely that there is any hosting capability

## NOTE: can get really ugly, need to cleanup containers that started (rollback) OR let user know how many were started instead?? or retry failed ones?
LookUpTable.Jobs.update_job(job_data)
JobQueue.q.task_done()

def queue_supervisor(self):
"""
Thread that handles autoscaling
"""
while True:
            time.sleep(self.__supervisor_interval) ## NOTE: this is probably a very tunable parameter for later
msg_queue = MessagesQueue.verbose()
for container in msg_queue:
if int(msg_queue[container]) > self.__supervisor_threshold:
job_data = {
Definition.Container.get_str_con_image_name() : container,
'num' : self.__supervisor_increment,
'volatile' : True
}
JobQueue.queue_new_job(job_data)



class JobQueue:
q = queue.Queue()

@staticmethod
def queue_new_job(job_data):
JobQueue.q.put(job_data)
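
For orientation, a hypothetical usage sketch (not part of the diff): the shape of the job_data dictionary that job_queuer consumes, and how a caller such as the REST service might enqueue it. The image name and count are placeholders, and setting job_status before queuing is an assumption.

```
from harmonicIO.general.definition import Definition, JobStatus
from harmonicIO.master.jobqueue import JobQueue

# Hypothetical job request; the keys mirror those read by JobManager.job_queuer above.
job_data = {
    Definition.Container.get_str_con_image_name(): "my/container_image",  # image to host (placeholder)
    'num': 2,                      # number of containers requested
    'volatile': True,              # volatile containers are cleaned up once they exit
    'job_status': JobStatus.INIT,  # assumed initial status; job_queuer sets READY or FAILED
}

JobQueue.queue_new_job(job_data)   # a job_queuer thread dequeues this and contacts workers
```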