import unittest
import platform
import logging
-import shlex
from unittest import suite, loader
-from teuthology.orchestra.run import Raw, quote
+from teuthology.orchestra.run import Raw, quote, PIPE
from teuthology.orchestra.daemon import DaemonGroup
from teuthology.orchestra.remote import Remote
from teuthology.config import config as teuth_config
# as long as the input buffer is "small"
if isinstance(stdin, str):
subproc.stdin.write(stdin.encode())
+ elif stdin == subprocess.PIPE or stdin == PIPE:
+ pass
else:
subproc.stdin.write(stdin)
# methods to work though.
self.pools = {}
- def find_remote(self, daemon_type, daemon_id):
- """
- daemon_type like 'mds', 'osd'
- daemon_id like 'a', '0'
- """
- return LocalRemote()
-
- def run_ceph_w(self, watch_channel=None):
- """
- :param watch_channel: Specifies the channel to be watched.
- This can be 'cluster', 'audit', ...
- :type watch_channel: str
- """
+ # NOTE: These variables are being overriden here so that parent class
+ # can pick it up.
+ self.cephadm = False
+ self.rook = False
+ self.testdir = None
+ self.run_cluster_cmd_prefix = [CEPH_CMD]
# XXX: Ceph API test CI job crashes because "ceph -w" process launched
# by run_ceph_w() crashes when shell is set to True.
# See https://tracker.ceph.com/issues/49644.
# it necessary to pass "shell" parameter to run() method. This leads
# to incompatibility with the method teuthology.orchestra.run's run()
# since it doesn't accept "shell" as parameter.
- args = ['exec', 'sudo', CEPH_CMD, "-w"]
- if watch_channel is not None:
- args.append("--watch-channel")
- args.append(watch_channel)
- proc = self.controller.run(args=args, wait=False, stdout=StringIO())
- return proc
+ self.run_ceph_w_prefix = ['exec', 'sudo', CEPH_CMD]
- def run_cluster_cmd(self, **kwargs):
- """
- Run a Ceph command and the object representing the process for the
- command.
-
- Accepts arguments same as teuthology.orchestra.remote.run().
- """
- if isinstance(kwargs['args'], str):
- kwargs['args'] = shlex.split(kwargs['args'])
- kwargs['args'] = [CEPH_CMD] + list(kwargs['args'])
- return self.controller.run(**kwargs)
-
- def raw_cluster_cmd(self, *args, **kwargs) -> str:
- """
- args like ["osd", "dump"}
- return stdout string
- """
- if kwargs.get('args') is None and args:
- kwargs['args'] = args
- kwargs['stdout'] = kwargs.pop('stdout', StringIO())
- return self.run_cluster_cmd(**kwargs).stdout.getvalue()
-
- def raw_cluster_cmd_result(self, *args, **kwargs):
+ def find_remote(self, daemon_type, daemon_id):
"""
- like raw_cluster_cmd but don't check status, just return rc
+ daemon_type like 'mds', 'osd'
+ daemon_id like 'a', '0'
"""
- if kwargs.get('args') is None and args:
- kwargs['args'] = args
- kwargs['check_status'] = False
- return self.run_cluster_cmd(**kwargs).exitstatus
+ return LocalRemote()
def admin_socket(self, daemon_type, daemon_id, command, check_status=True,
timeout=None, stdout=None):