Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,5 @@
# emacs, vim
*~
.*.swp

.claude/
6 changes: 2 additions & 4 deletions fabric/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,8 @@


class NetworkError(Exception):
# Must allow for calling with zero args/kwargs, since pickle is apparently
# stupid with exceptions and tries to call it as such when passed around in
# a multiprocessing.Queue.
def __init__(self, message=None, wrapped=None):
# Must allow for calling with zero args/kwargs for consistency.
def __init__(self, message, wrapped):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Signature change seems extraneous, so unless it's needed, I'd rather see it reverted to reduce the diff and chances of bugs. Seems reasonable to update the comment, though would like to see it be a little more informative (i.e. consistency with what)

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updating the comment without changing the code doesn't really make sense. The only reason this exists is a hack for pickling to work in a multiprocessing queue, which we removed entirely. I can clarify the comment.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 I'm inclined to leave the code as it was then, but if you're confident this works, then we can move forward with an updated comment

self.message = message
self.wrapped = wrapped

Expand Down
50 changes: 16 additions & 34 deletions fabric/job_queue.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,15 @@
"""
Sliding-window-based job/task queue class (& example of use.)

May use ``multiprocessing.Process`` or ``threading.Thread`` objects as queue
items, though within Fabric itself only ``Process`` objects are used/supported.
Uses ``threading.Thread`` objects as queue items for parallel task execution.
"""

import time
import queue as Queue
from collections import namedtuple

from fabric.network import ssh
from fabric.context_managers import settings


DoneProc = namedtuple('DoneProc', ['name', 'exitcode'])

class JobQueue(object):
"""
The goal of this class is to make a queue of processes to run, and go
Expand Down Expand Up @@ -147,14 +142,6 @@ def _advance_the_queue():
if self._debug:
print("Job queue found finished proc: %s." % job.name)
done = self._running.pop(id)
# might be a Process or a Thread
if hasattr(done, 'exitcode'):
proc = done
done = DoneProc(proc.name, proc.exitcode)
# multiprocessing.Process.close() added in Python-3.7
if hasattr(proc, 'close'):
proc.close()
del proc
self._completed.append(done)

if self._debug:
Expand All @@ -178,64 +165,59 @@ def _advance_the_queue():
# already have finished.
self._fill_results(results)

# Attach exit codes now that we're all done & have joined all jobs
for job in self._completed:
if hasattr(job, 'exitcode'):
results[job.name]['exit_code'] = job.exitcode

return results

def _fill_results(self, results):
    """
    Attempt to pull data off self._comms_queue and add to 'results' dict.
    If no data is available (i.e. the queue is empty), bail immediately.

    Exit codes are derived from the result type: exceptions map to 1,
    normal results map to 0.
    """
    while True:
        try:
            # Non-blocking get: drain only what is already queued; never
            # wait for still-running jobs to report in.
            datum = self._comms_queue.get_nowait()
            results[datum['name']]['results'] = datum['result']
            # Workers put the raised exception object itself on the queue
            # when a task fails, so the result's type tells us the outcome.
            if isinstance(datum['result'], BaseException):
                results[datum['name']]['exit_code'] = 1
            else:
                results[datum['name']]['exit_code'] = 0
        except Queue.Empty:
            # Queue drained -- nothing more to collect right now.
            break


# Sample

def try_using(parallel_type):
def sample_usage():
"""
This will run the queue through it's paces, and show a simple way of using
the job queue.
This will run the queue through its paces, and show a simple way of using
the job queue with threads.
"""
from threading import Thread

def print_number(number):
"""
Simple function to give a simple task to execute.
"""
print(number)

if parallel_type == "multiprocessing":
from multiprocessing import Process as Bucket

elif parallel_type == "threading":
from threading import Thread as Bucket

# Make a job_queue with a bubble of len 5, and have it print verbosely
queue = Queue.Queue()
jobs = JobQueue(5, queue)
jobs._debug = True

# Add 20 procs onto the stack
# Add 20 threads onto the stack
for x in range(20):
jobs.append(Bucket(
jobs.append(Thread(
target=print_number,
args=[x],
kwargs={},
))

# Close up the queue and then start it's execution
# Close up the queue and then start its execution
jobs.close()
jobs.run()


if __name__ == '__main__':
try_using("multiprocessing")
try_using("threading")
sample_usage()
15 changes: 14 additions & 1 deletion fabric/network.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
import paramiko as ssh

from fabric.auth import get_password, set_password
from fabric.utils import handle_prompt_abort, warn
from fabric.utils import _ThreadLocalDictMixin, handle_prompt_abort, warn
from fabric.exceptions import NetworkError


Expand Down Expand Up @@ -163,6 +163,19 @@ def __contains__(self, key):
return dict.__contains__(self, normalize_to_string(key))


class _ThreadLocalHostConnectionCache(_ThreadLocalDictMixin, HostConnectionCache):
    """
    ``HostConnectionCache`` with thread-local override support.

    Worker threads call ``_set_thread_local()`` to get their own empty
    connection cache so SSH connections are not shared across threads.
    """

    def _set_thread_local(self):
        """Install an empty per-thread ``HostConnectionCache``."""
        # NOTE(review): assumes _ThreadLocalDictMixin exposes a
        # ``_thread_local`` object (presumably a ``threading.local``) whose
        # ``_data`` attribute is the per-thread backing store -- confirm
        # against the mixin's definition in fabric.utils.
        self._thread_local._data = HostConnectionCache()


def ssh_config(host_string=None):
"""
Return ssh configuration dict for current env.host_string host value.
Expand Down
8 changes: 4 additions & 4 deletions fabric/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@

import paramiko as ssh

from fabric.network import HostConnectionCache
from fabric.network import HostConnectionCache, _ThreadLocalHostConnectionCache
from fabric.version import get_version
from fabric.utils import _AliasDict, _AttributeDict
from fabric.utils import _AliasDict, _AttributeDict, _ThreadLocalAttributeDict


#
Expand Down Expand Up @@ -351,7 +351,7 @@ def _rc_path():
# Most default values are specified in `env_options` above, in the interests of
# preserving DRY: anything in here is generally not settable via the command
# line.
env = _AttributeDict({
env = _ThreadLocalAttributeDict({
'abort_exception': None,
'again_prompt': 'Sorry, try again.',
'all_hosts': [],
Expand Down Expand Up @@ -429,7 +429,7 @@ def _rc_path():
# Host connection dict/cache
#

connections = HostConnectionCache()
connections = _ThreadLocalHostConnectionCache()


def _open_session():
Expand Down
56 changes: 28 additions & 28 deletions fabric/tasks.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import inspect
import queue
import sys
import textwrap
import threading

from fabric import state
from fabric.utils import abort, warn, error
Expand Down Expand Up @@ -182,17 +184,16 @@ def _is_network_error_ignored():
return not state.env.use_exceptions_for['network'] and state.env.skip_bad_hosts


def _parallel_wrap(task, args, kwargs, queue, name, env):
def _parallel_wrap(task, args, kwargs, task_queue, name, env):
# Wrap in another callable that:
# * expands the env it's given to ensure parallel, linewise, etc are
# all set correctly and explicitly
# * nukes the connection cache to prevent shared-access problems
# * installs a thread-local copy of env for isolation
# * gives this thread its own connection cache
# * knows how to send the tasks' return value back over a Queue
# * captures exceptions raised by the task
state.env.update(env)
state.env._set_thread_local(env)
state.connections._set_thread_local()
try:
state.connections.clear()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is no longer needed?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, because the connections object is now a thread-local object. This was previously reset to prevent shared-access problems, because state was copied between processes.

queue.put({'name': name, 'result': task.run(*args, **kwargs)})
task_queue.put({'name': name, 'result': task.run(*args, **kwargs)})
except BaseException as e: # We really do want to capture everything
# SystemExit implies use of abort(), which prints its own
# traceback, host info etc -- so we don't want to double up
Expand All @@ -202,14 +203,16 @@ def _parallel_wrap(task, args, kwargs, queue, name, env):
if type(e) is not SystemExit:
if not (isinstance(e, NetworkError) and _is_network_error_ignored()):
sys.stderr.write("!!! Parallel execution exception under host %r:\n" % name)
queue.put({'name': name, 'result': e})
# Here, anything -- unexpected exceptions, or abort()
# driven SystemExits -- will bubble up and terminate the
# child process.
if not (isinstance(e, NetworkError) and _is_network_error_ignored()):
raise

def _execute(task, host, my_env, args, kwargs, jobs, queue, multiprocessing):
task_queue.put({'name': name, 'result': e})
# NOTE: We intentionally do NOT re-raise here. With threads (unlike
# processes), re-raising would trigger threading.excepthook and
# duplicate the traceback on stderr.
finally:
disconnect_all()
state.connections._clear_thread_local()
state.env._clear_thread_local()

def _execute(task, host, my_env, args, kwargs, jobs, task_queue):
"""
Primary single-host work body of execute()
"""
Expand All @@ -220,22 +223,23 @@ def _execute(task, host, my_env, args, kwargs, jobs, queue, multiprocessing):
local_env = to_dict(host)
local_env.update(my_env)
# Set a few more env flags for parallelism
if queue is not None:
if task_queue is not None:
local_env.update({'parallel': True, 'linewise': True})
# Handle parallel execution
if queue is not None: # Since queue is only set for parallel
if task_queue is not None: # Since queue is only set for parallel
name = local_env['host_string']

# Stuff into Process wrapper
# Stuff into Thread wrapper
kwarg_dict = {
'task': task,
'args': args,
'kwargs': kwargs,
'queue': queue,
'task_queue': task_queue,
'name': name,
'env': local_env,
}
p = multiprocessing.Process(target=_parallel_wrap, kwargs=kwarg_dict)
p = threading.Thread(target=_parallel_wrap, kwargs=kwarg_dict)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The name `p` is no longer applicable; consider something more relevant (and unabbreviated) like `thread`.

p.daemon = True
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm, I think this can result in background threads sticking around even if the main process is killed, is that right? Are you sure we want this?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

interesting point, i'll think about this a little more

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the main process waits for all the threads to finish, so I don't think there is a concern about it killing threads prematurely. If somebody ^C's the main process, we actively want it to kill threads.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In daemon mode the main process does not wait for all threads to finish; it kills them all immediately if the main process terminates, so orphan threads cannot be created, which is what we want. The only difference between the current fork() setup and the thread replacement is that with fork() the subprocesses receive the same signal as the parent, so ctrl+c is propagated down to all of the children and they can also handle the signal, whereas here the threads are just killed. Both stop execution, but the current implementation allows cleanup code such as finally blocks to run, whereas the new implementation does not.

This can be fixed by using stop events, but that may slow shut down for interrupts if a thread is blocked on i/o and would require a little more rewriting here. That may be preferred, though.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Interesting 🤔 do we actually do any signal handling in anything we expect to run in parallel? I can't think of anything. If we do, then we should probably handle it in this PR. If not, then I agree events may still be better, but we can defer it to a follow-up

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Signal handling would only be for ctrl+c; otherwise we do not do any.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right, do we do any for ^C for tasks we run in parallel? The only thing I can think of is deploy prep, which runs in serial.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No we don't do any today

# Name/id is host string
p.name = name
# Add to queue
Expand Down Expand Up @@ -324,17 +328,13 @@ def execute(task, *args, **kwargs):

parallel = requires_parallel(task)
if parallel:
import multiprocessing
ctx = multiprocessing.get_context('fork')
# Set up job queue for parallel cases
queue = ctx.Queue()
task_queue = queue.Queue()
else:
ctx = None
queue = None
task_queue = None

# Get pool size for this task
pool_size = task.get_pool_size(my_env['all_hosts'], state.env.pool_size)
jobs = JobQueue(pool_size, queue)
jobs = JobQueue(pool_size, task_queue)
if state.output.debug:
jobs._debug = True

Expand All @@ -344,7 +344,7 @@ def execute(task, *args, **kwargs):
for host in my_env['all_hosts']:
try:
results[host] = _execute(
task, host, my_env, args, new_kwargs, jobs, queue, ctx,
task, host, my_env, args, new_kwargs, jobs, task_queue,
)
except NetworkError as e:
results[host] = e
Expand Down
Loading