import os
import crypt
import logging
+from tempfile import mkstemp as tempfile_mkstemp
from StringIO import StringIO
from tasks.cephfs.cephfs_test_case import CephFSTestCase
from tasks.cephfs.fuse_mount import FuseMount
stdin=stdin)
return status.stdout.getvalue().strip()
+ def run_cephfs_shell_script(self, script, mount_x=None, stdin=None):
+ if mount_x is None:
+ mount_x = self.mount_a
+
+ scriptpath = tempfile_mkstemp(prefix='test-cephfs', text=True)[1]
+ with open(scriptpath, 'w') as scriptfile:
+ scriptfile.write(script)
+ # copy script to the machine running cephfs-shell.
+ mount_x.client_remote.put_file(scriptpath, scriptpath)
+ mount_x.run_shell('chmod 755 ' + scriptpath)
+
+ args = ["cephfs-shell", "-c", mount_x.config_path, '-b', scriptpath]
+ log.info('Running script \"' + scriptpath + '\"')
+ return mount_x.client_remote.run(args=args, stdout=StringIO(),
+ stderr=StringIO(), stdin=stdin)
+
def test_help(self):
"""
Test that help outputs commands.