diff --git a/prt.py b/prt.py index ae39332..9e3b58d 100644 --- a/prt.py +++ b/prt.py @@ -19,6 +19,7 @@ import urllib import urllib2 import uuid +import Queue from distutils.spawn import find_executable @@ -90,6 +91,8 @@ def colored(msg, *args): NEW_TRANSCODER_NAME = "plex_transcoder" ORIGINAL_TRANSCODER_NAME = "Plex Transcoder" +SEGMENTS_PER_NODE = 5 + REMOTE_ARGS = ("%(env)s;" "cd %(working_dir)s;" "%(command)s %(args)s") @@ -128,6 +131,7 @@ def printf(message, *args, **kwargs): sys.stdout.write(colored(message % args, color, attrs=attrs)) sys.stdout.flush() + def get_auth_token(): url = "https://plex.tv/users/sign_in.json" payload = urllib.urlencode({ @@ -248,6 +252,7 @@ def overwrite_transcoder_after_upgrade(): print "Transcoder hasn't been previously installed, please use install option" sys.exit(1) + def build_env(host=None): # TODO: This really should be done in a way that is specific to the target # in the case that the target is a different architecture than the host @@ -303,6 +308,29 @@ def transcode_local(): if output and is_debug: log.debug(output.strip('\n')) + +def get_available_remote_servers(): + config = get_config() + servers = config["servers"] + available_servers = {} + + for hostname, host in servers.items(): + log.debug("Getting load for host '%s'" % hostname) + host["load"] = get_system_load_remote(hostname, host["port"], host["user"]) + + if not host["load"]: + # If no load is returned, then it is likely that the host + # is offline or unreachable + log.debug("Couldn't get load for host '%s'" % hostname) + continue + + available_servers[hostname] = host + + log.debug("Available servers : %s\n" % available_servers) + + return available_servers + + def transcode_remote(): setup_logging() @@ -356,53 +384,74 @@ def transcode_remote(): except Exception, e: log.error("Error retreiving host list via '%s': %s" % (config["servers_script"], str(e))) - hostname, host = None, None - - # Let's try to load-balance - min_load = None - for hostname, host in servers.items(): - - log.debug("Getting load for host '%s'" % hostname) - load = get_system_load_remote(hostname, host["port"], host["user"]) - - if not load: - # If no load is returned, then it is likely that the host - # is offline or unreachable - log.debug("Couldn't get load for host '%s'" % hostname) + command = command.replace("127.0.0.1", config["ipaddress"]).split(' ') + segment_time = int(command[command.index("-segment_time") + 1]) + ss = int(command[command.index("-ss") + 1]) + start_segment = int(command[command.index("-segment_start_number") + 1]) + q = Queue.Queue() + init = True + just_finished = False + transcoding_servers = [] + consecutive_errors = 0 + log.info("Initializing distributed trancode %s" % command) + log.info("Segment time: %s" % segment_time) + + while (consecutive_errors < 5) and (q.empty() is False or init is True or just_finished is True): + log.info("Fetching available servers") + available_servers = get_available_remote_servers() + just_finished = False + + for hostname, host in available_servers.items(): + log.info("Checking server %s" % hostname) + if hostname in transcoding_servers: + log.info("Server already transcoding a segment") + continue + + log.info("Starting trancoder with segment %s" % str(start_segment)) + proc = process_segment(host, hostname, start_segment, segment_time, ss, command) + transcoding_servers.append(hostname) + start_segment+=SEGMENTS_PER_NODE + ss+=segment_time*SEGMENTS_PER_NODE + q.put((proc, hostname)) + + if init: + log.info("Distributed transcode initialized") + init = False continue - log.debug("Log for '%s': %s" % (hostname, str(load))) + proc, hostname = q.get() + log.info("Checking if %s finished transcode" % hostname) + code = proc.poll() - # XXX: Use more that just 1-minute load? - if min_load is None or min_load[1] > load[0]: - min_load = (hostname, load[0],) - - if min_load is None: - log.info("No hosts found...using local") - return transcode_local() + if code is None: + q.put((proc, hostname)) + else: + log.info("%s finished transcode" % hostname) + just_finished = True + if code == 1: + consecutive_errors += 1 + log.info("Transcode returned an error (%s)" % str(consecutive_errors)) + else: + consecutive_errors = 0 - # Select lowest-load host - log.info("Host with minimum load is '%s'" % min_load[0]) - hostname, host = min_load[0], servers[min_load[0]] + transcoding_servers.remove(hostname) - log.info("Using transcode host '%s'" % hostname) + time.sleep(.300) - # Remap the 127.0.0.1 reference to the proper address - command = command.replace("127.0.0.1", config["ipaddress"]) + log.info("Transcode finished") - # - # TODO: Remap file-path to PMS URLs - # - args = ["ssh", "%s@%s" % (host["user"], hostname), "-p", host["port"]] + [command] +def process_segment (host, hostname, segment, time, ss, command): + command = ["ssh", "%s@%s" % (host["user"], hostname), "-p", host["port"]] + command + cmd = [] - log.info("Launching transcode_remote with args %s\n" % args) + command[command.index("-segment_start_number") + 1] = str(segment) + command[command.index("-ss") + 1] = str(ss) + command.insert(command.index("-i"), "-t") + command.insert(command.index("-i"), str(int(command[command.index("-segment_time") + 1]) * SEGMENTS_PER_NODE)) # Spawn the process - proc = subprocess.Popen(args) - proc.wait() - - log.info("Transcode stopped on host '%s'" % hostname) + return subprocess.Popen(command) def re_get(regex, string, group=0, default=None): @@ -415,6 +464,7 @@ def re_get(regex, string, group=0, default=None): return match.groups() return default + def et_get(node, attrib, default=None): if node is not None: return node.attrib.get(attrib, default) @@ -437,6 +487,7 @@ def get_plex_sessions(auth_token=None): } return sessions + def get_sessions(): sessions = {} @@ -487,6 +538,7 @@ def get_sessions(): sessions[m.groups()[0]] = data return sessions + def check_config(): """ Run through various diagnostic checks to see if things are configured @@ -597,13 +649,13 @@ def usage(): print "Usage:\n" print " %s [options]\n" % os.path.basename(sys.argv[0]) print ( - "Options:\n\n" - " usage, help, -h, ? Show usage page\n" - " get_load Show the load of the system\n" - " get_cluster_load Show the load of all systems in the cluster\n" - " install Install PRT for the first time and then sets up configuration\n" - " overwrite Fix PRT after PMS has had a version update breaking PRT\n" - " add_host Add an extra host to the list of slaves PRT is to use\n" + "Options:\n\n" + " usage, help, -h, ? Show usage page\n" + " get_load Show the load of the system\n" + " get_cluster_load Show the load of all systems in the cluster\n" + " install Install PRT for the first time and then sets up configuration\n" + " overwrite Fix PRT after PMS has had a version update breaking PRT\n" + " add_host Add an extra host to the list of slaves PRT is to use\n" " remove_host Removes a host from the list of slaves PRT is to use\n" " sessions Display current sessions\n" " check_config Checks the current configuration for errors\n") @@ -707,4 +759,3 @@ def main(): else: usage() sys.exit(-1) -