Skip to content

support for changing clustershell config directory via environment variable #191

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion lib/ClusterShell/CLI/Config.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@
from ClusterShell.CLI.Display import VERB_QUIET, VERB_STD, \
VERB_VERB, VERB_DEBUG, THREE_CHOICES

CLUSTERSHELL_CONFIG_DIR = os.environ.get("CLUSTERSHELL_CONFIG", "/etc/clustershell/")


class ClushConfigError(Exception):
"""Exception used by ClushConfig to report an error."""
Expand All @@ -53,6 +55,7 @@ def __init__(self, section, option, msg):
def __str__(self):
return "(Config %s.%s): %s" % (self.section, self.option, self.msg)


class ClushConfig(ConfigParser.ConfigParser, object):
"""Config class for clush (specialized ConfigParser)"""

Expand All @@ -77,7 +80,7 @@ def __init__(self, options, filename=None):
if filename:
files = [filename]
else:
files = ['/etc/clustershell/clush.conf',
files = [os.path.join(CLUSTERSHELL_CONFIG_DIR, 'clush.conf'),
os.path.expanduser('~/.clush.conf')]
self.read(files)

Expand Down
20 changes: 11 additions & 9 deletions lib/ClusterShell/NodeSet.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,13 +62,15 @@

import re
import sys
import os

import ClusterShell.NodeUtils as NodeUtils
from ClusterShell.RangeSet import RangeSet, RangeSetParseError


# Define default GroupResolver object used by NodeSet
DEF_GROUPS_CONFIG = "/etc/clustershell/groups.conf"
CLUSTERSHELL_CONFIG_DIR = os.environ.get("CLUSTERSHELL_CONFIG", "/etc/clustershell/")
DEF_GROUPS_CONFIG = os.path.join(CLUSTERSHELL_CONFIG_DIR, "groups.conf")
DEF_RESOLVER_STD_GROUP = NodeUtils.GroupResolverConfig(DEF_GROUPS_CONFIG)
# Standard group resolver
RESOLVER_STD_GROUP = DEF_RESOLVER_STD_GROUP
Expand Down Expand Up @@ -499,7 +501,7 @@ def intersection_update(self, other):
# intersect two nodes with no rangeset
tmp_ns._add(pat, None)

# Substitute
# Substitute
self._patterns = tmp_ns._patterns

def __iand__(self, other):
Expand Down Expand Up @@ -580,7 +582,7 @@ def symmetric_difference(self, other):
"""
s.symmetric_difference(t) returns the symmetric difference of
two nodesets as a new NodeSet.

(ie. all nodes that are in exactly one of the nodesets.)
"""
self_copy = self.copy()
Expand Down Expand Up @@ -687,7 +689,7 @@ def parse(self, nsobj, autostep):
raise NodeSetParseError(nsobj, str(exc))

raise TypeError("Unsupported NodeSet input %s" % type(nsobj))

def parse_string(self, nsstr, autostep):
"""
Parse provided string and return a NodeSetBase object.
Expand All @@ -711,18 +713,18 @@ def parse_string(self, nsstr, autostep):
getattr(nodeset, opc)(NodeSetBase(pat, rangeset, False))

return nodeset

def parse_string_single(self, nsstr, autostep):
"""Parse provided string and return a NodeSetBase object."""
pat, rangeset = self._scan_string_single(nsstr, autostep)
return NodeSetBase(pat, rangeset, False)

def parse_group(self, group, namespace=None, autostep=None):
"""Parse provided single group name (without @ prefix)."""
assert self.group_resolver is not None
nodestr = self.group_resolver.group_nodes(group, namespace)
return self.parse(",".join(nodestr), autostep)

def parse_group_string(self, nodegroup):
"""Parse provided group string and return a string."""
assert nodegroup[0] == '@'
Expand Down Expand Up @@ -786,7 +788,7 @@ def _scan_string_single(self, nsstr, autostep):
else:
# undefined pad means no node index
return pfx, None

def _scan_string(self, nsstr, autostep):
"""Parsing engine's string scanner method (iterator)."""
pat = nsstr.strip()
Expand Down Expand Up @@ -846,7 +848,7 @@ def _scan_string(self, nsstr, autostep):
pat = None # break next time
else:
node, pat = pat.split(self.OP_CODES[next_op_code], 1)

newpat, rset = self._scan_string_single(node, autostep)
yield op_code, newpat, rset

Expand Down
57 changes: 31 additions & 26 deletions lib/ClusterShell/Task.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,21 +36,21 @@
Simple example of use:

>>> from ClusterShell.Task import task_self
>>>
>>>
>>> # get task associated with calling thread
... task = task_self()
>>>
>>>
>>> # add a command to execute on distant nodes
... task.shell("/bin/uname -r", nodes="tiger[1-30,35]")
<ClusterShell.Worker.Ssh.WorkerSsh object at 0x7f41da71b890>
>>>
>>>
>>> # run task in calling thread
... task.resume()
>>>
>>>
>>> # get results
... for buf, nodelist in task.iter_buffers():
... print NodeSet.fromlist(nodelist), buf
...
...

"""

Expand All @@ -59,6 +59,7 @@
from operator import itemgetter
import socket
import sys
import os
import threading
from time import sleep
import traceback
Expand All @@ -80,6 +81,8 @@
from ClusterShell.Topology import TopologyParser, TopologyError
from ClusterShell.Propagation import PropagationTreeRouter, PropagationChannel

CLUSTERSHELL_CONFIG_DIR = os.environ.get("CLUSTERSHELL_CONFIG", "/etc/clustershell/")


class TaskException(Exception):
"""Base task exception."""
Expand Down Expand Up @@ -176,13 +179,15 @@ class Task(object):
node_retcode() and max_retcode() methods after command execution, or
listen for ev_hup() events in your event handler.
"""
topology_file = os.path.join(CLUSTERSHELL_CONFIG_DIR, 'topology.conf')

_std_default = { "stderr" : False,
"stdout_msgtree" : True,
"stderr_msgtree" : True,
"engine" : 'auto',
"port_qlimit" : 100,
"auto_tree" : False,
"topology_file" : "/etc/clustershell/topology.conf" }
"topology_file" : topology_file }

_std_info = { "debug" : False,
"print_debug" : _task_print_debug,
Expand All @@ -209,7 +214,7 @@ class tasksyncmethod(object):
"""Class encapsulating a function that checks if the calling
task is running or is the current task, and allowing it to be
used as a decorator making the wrapped task method thread-safe."""

def __call__(self, f):
def taskfunc(*args, **kwargs):
# pull out the class instance
Expand All @@ -218,7 +223,7 @@ def taskfunc(*args, **kwargs):
if task._is_task_self():
return f(task, *fargs, **kwargs)
elif task._dispatch_port:
# no, safely call the task method by message
# no, safely call the task method by message
# through the task special dispatch port
task._dispatch_port.msg_send((f, fargs, kwargs))
else:
Expand Down Expand Up @@ -261,7 +266,7 @@ def wait_check(self, release_lock=None):
self._cond.wait()
finally:
self._cond.release()

def notify_all(self):
"""Signal all threads waiting for condition."""
self._cond.acquire()
Expand Down Expand Up @@ -385,7 +390,7 @@ def _setexcepthook(self, hook):
# arguments method (very similar of what you can do with
# sys.excepthook)."""
excepthook = property(_getexcepthook, _setexcepthook)

def _thread_start(self):
"""Task-managed thread entry point"""
while not self._quit:
Expand Down Expand Up @@ -451,7 +456,7 @@ def set_default(self, default_key, value):
Set task value for specified key in the dictionary "default".
Users may store their own task-specific key, value pairs
using this method and retrieve them with default().

Task default_keys are:
- "stderr": Boolean value indicating whether to enable
stdout/stderr separation when using task.shell(), if not
Expand Down Expand Up @@ -491,7 +496,7 @@ def set_info(self, info_key, value):
pairs can be passed to the engine and/or workers.
Users may store their own task-specific info key, value pairs
using this method and retrieve them with info().

The following example changes the fanout value to 128:
>>> task.set_info('fanout', 128)

Expand Down Expand Up @@ -670,7 +675,7 @@ def timer(self, fire, handler, interval=-1.0, autoclose=False):
can also have their next firing time manually adjusted.

The mandatory parameter `fire' sets the firing delay in seconds.

The optional parameter `interval' sets the firing interval of
the timer. If not specified, the timer fires once and then is
automatically invalidated.
Expand All @@ -691,7 +696,7 @@ def timer(self, fire, handler, interval=-1.0, autoclose=False):
"""
assert fire >= 0.0, \
"timer's relative fire time must be a positive floating number"

timer = EngineTimer(fire, interval, autoclose, handler)
# The following method may be sent through msg port (async
# call) if called from another task.
Expand Down Expand Up @@ -786,7 +791,7 @@ def run(self, command=None, **kwargs):

>>> task.run("hostname", nodes="foo")

Without argument, it starts all outstanding actions.
Without argument, it starts all outstanding actions.
It behaves like Task.resume().

>>> task.shell("hostname", nodes="foo")
Expand Down Expand Up @@ -840,7 +845,7 @@ def _suspend_wait(self):
self._suspend_lock.acquire()
self._suspended = False
self._suspend_lock.release()

def suspend(self):
"""
Suspend task execution. This method may be called from another
Expand All @@ -857,7 +862,7 @@ def suspend(self):

# wait for stopped task
self._run_lock.acquire() # run_lock ownership transfer

# get result: are we really suspended or just stopped?
result = True
self._suspend_lock.acquire()
Expand Down Expand Up @@ -988,7 +993,7 @@ def _rc_set(self, source, rc, override=True):

# store source by rc
self._d_rc_sources.setdefault(rc, set()).add(source)

# update max rc
if rc > self._max_rc:
self._max_rc = rc
Expand Down Expand Up @@ -1035,13 +1040,13 @@ def _call_tree_matcher(self, tree_match_func, match_keys=None, worker=None):
match = None
# Call tree matcher function (items or walk)
return tree_match_func(match, itemgetter(1))

def _rc_by_source(self, source):
"""
Get a return code by its source (worker, key).
"""
return self._d_source_rc[source]

def _rc_iter_by_key(self, key):
"""
Return an iterator over return codes for the given key.
Expand Down Expand Up @@ -1121,7 +1126,7 @@ def key_buffer(self, key):
raise TaskMsgTreeError("stdout_msgtree not set")
select_key = lambda k: k[1] == key
return "".join(imap(str, msgtree.messages(select_key)))

node_buffer = key_buffer

def key_error(self, key):
Expand All @@ -1136,7 +1141,7 @@ def key_error(self, key):
raise TaskMsgTreeError("stderr_msgtree not set")
select_key = lambda k: k[1] == key
return "".join(imap(str, errtree.messages(select_key)))

node_error = key_error

def key_retcode(self, key):
Expand All @@ -1150,7 +1155,7 @@ def key_retcode(self, key):
if not codes:
raise KeyError(key)
return max(codes)

node_retcode = key_retcode

def max_retcode(self):
Expand Down Expand Up @@ -1195,7 +1200,7 @@ def iter_errors(self, match_keys=None):
if errtree is None:
raise TaskMsgTreeError("stderr_msgtree not set")
return self._call_tree_matcher(errtree.walk, match_keys)

def iter_retcodes(self, match_keys=None):
"""
Iterate over return codes, returns a tuple (rc, keys).
Expand Down Expand Up @@ -1274,7 +1279,7 @@ def pchannel(self, gateway, metaworker): #gw_invoke_cmd):
else:
worker = self.pwrks[gateway]
chan = worker.eh

if metaworker not in self.pmwkrs:
mw = self.pmwkrs[metaworker] = set()
else:
Expand All @@ -1295,7 +1300,7 @@ def _pchannel_release(self, metaworker):
#print >>sys.stderr, "worker abort"
worker.eh._close()
#worker.abort()


def task_self():
"""
Expand Down