From: Kotresh HR Date: Fri, 22 Jan 2021 07:48:30 +0000 (+0530) Subject: Revert "*: remove legacy ceph_volume_client.py library" X-Git-Tag: v16.2.0~245^2 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=refs%2Fpull%2F39014%2Fhead;p=ceph.git Revert "*: remove legacy ceph_volume_client.py library" This reverts commit a3db265ad5b3ff899d8b71164a7f9ea5a426b19e. The ceph_volume_client library is still used by manila, which is not yet fully equipped to use mgr/volumes and expects to use ceph_volume_client for one more release. Hence this commit is reverted for the Pacific release. Note that this is a Pacific-only patch, so no corresponding master patch is available. Fixes: https://tracker.ceph.com/issues/48923 Signed-off-by: Kotresh HR --- diff --git a/.github/labeler.yml b/.github/labeler.yml index 7fef72d0baac..0821fe9645ea 100644 --- a/.github/labeler.yml +++ b/.github/labeler.yml @@ -164,6 +164,7 @@ cephfs: - src/mds/** - src/mon/MDSMonitor.* - src/mon/FSCommands.* + - src/pybind/ceph_volume_client.py - src/pybind/cephfs/** - src/pybind/mgr/mds_autoscaler/** - src/pybind/mgr/status/** diff --git a/PendingReleaseNotes b/PendingReleaseNotes index c981c82942a6..733991975521 100644 --- a/PendingReleaseNotes +++ b/PendingReleaseNotes @@ -54,12 +54,6 @@ * MGR: progress module can now be turned on/off, using the commands: ``ceph progress on`` and ``ceph progress off``. - -* The ceph_volume_client.py library used for manipulating legacy "volumes" in - CephFS is removed. All remaining users should use the "fs volume" interface - exposed by the ceph-mgr: - https://docs.ceph.com/en/latest/cephfs/fs-volumes/ - * An AWS-compliant API: "GetTopicAttributes" was added to replace the existing "GetTopic" API. The new API should be used to fetch information about topics used for bucket notifications.
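For context, manila drives the restored library in much the same way as the tests and the fs/upgrade/volume_client workunit below. The following minimal sketch shows that create/authorize/cleanup flow; the conf path, share name, guest ID and tenant ID are illustrative only, not taken from manila:

    from ceph_volume_client import CephFSVolumeClient, VolumePath

    # Connect as the "manila" auth ID using a local ceph.conf (illustrative path).
    vc = CephFSVolumeClient("manila", "/etc/ceph/ceph.conf", "ceph")
    vc.connect()
    try:
        # A volume is addressed by (group_id, volume_id); the group may be None.
        vp = VolumePath(None, "share_1")
        created = vc.create_volume(vp, 10 * 1024 * 1024)  # size in bytes
        print(created['mount_path'])                      # path the guest mounts

        # Authorize a guest auth ID; the returned key goes into the guest keyring.
        auth = vc.authorize(vp, "guest", readonly=False, tenant_id="tenant1")
        print(auth['auth_key'])

        # Teardown: revoke access, evict the guest, then delete and purge.
        vc.deauthorize(vp, "guest")
        vc.evict("guest", volume_path=vp)
        vc.delete_volume(vp)
        vc.purge_volume(vp)
    finally:
        vc.disconnect()

New deployments should still target the mgr "fs volume"/"fs subvolume" interface; the library is only kept so that existing manila deployments keep working for one more release.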
diff --git a/ceph.spec.in b/ceph.spec.in index de228df1ef39..c11b1b76c27d 100644 --- a/ceph.spec.in +++ b/ceph.spec.in @@ -2202,6 +2202,8 @@ fi %files -n python%{python3_pkgversion}-cephfs %{python3_sitearch}/cephfs.cpython*.so %{python3_sitearch}/cephfs-*.egg-info +%{python3_sitelib}/ceph_volume_client.py +%{python3_sitelib}/__pycache__/ceph_volume_client.cpython*.py* %files -n python%{python3_pkgversion}-ceph-argparse %{python3_sitelib}/ceph_argparse.py diff --git a/debian/python3-cephfs.install b/debian/python3-cephfs.install index 9ac75f5366b4..6eb8836707f8 100644 --- a/debian/python3-cephfs.install +++ b/debian/python3-cephfs.install @@ -1,2 +1,3 @@ +usr/lib/python3*/dist-packages/ceph_volume_client.py usr/lib/python3*/dist-packages/cephfs-*.egg-info usr/lib/python3*/dist-packages/cephfs.cpython*.so diff --git a/qa/suites/fs/upgrade/volumes/.qa b/qa/suites/fs/upgrade/volumes/.qa new file mode 120000 index 000000000000..a602a0353e75 --- /dev/null +++ b/qa/suites/fs/upgrade/volumes/.qa @@ -0,0 +1 @@ +../.qa/ \ No newline at end of file diff --git a/qa/suites/fs/upgrade/volumes/import-legacy/% b/qa/suites/fs/upgrade/volumes/import-legacy/% new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/qa/suites/fs/upgrade/volumes/import-legacy/.qa b/qa/suites/fs/upgrade/volumes/import-legacy/.qa new file mode 120000 index 000000000000..a602a0353e75 --- /dev/null +++ b/qa/suites/fs/upgrade/volumes/import-legacy/.qa @@ -0,0 +1 @@ +../.qa/ \ No newline at end of file diff --git a/qa/suites/fs/upgrade/volumes/import-legacy/bluestore-bitmap.yaml b/qa/suites/fs/upgrade/volumes/import-legacy/bluestore-bitmap.yaml new file mode 120000 index 000000000000..17ad98e799ee --- /dev/null +++ b/qa/suites/fs/upgrade/volumes/import-legacy/bluestore-bitmap.yaml @@ -0,0 +1 @@ +../../../../../cephfs/objectstore-ec/bluestore-bitmap.yaml \ No newline at end of file diff --git a/qa/suites/fs/upgrade/volumes/import-legacy/clusters/.qa b/qa/suites/fs/upgrade/volumes/import-legacy/clusters/.qa new file mode 120000 index 000000000000..a602a0353e75 --- /dev/null +++ b/qa/suites/fs/upgrade/volumes/import-legacy/clusters/.qa @@ -0,0 +1 @@ +../.qa/ \ No newline at end of file diff --git a/qa/suites/fs/upgrade/volumes/import-legacy/clusters/1-mds-2-client-micro.yaml b/qa/suites/fs/upgrade/volumes/import-legacy/clusters/1-mds-2-client-micro.yaml new file mode 100644 index 000000000000..9b443f7d2bf0 --- /dev/null +++ b/qa/suites/fs/upgrade/volumes/import-legacy/clusters/1-mds-2-client-micro.yaml @@ -0,0 +1,7 @@ +roles: +- [mon.a, mon.b, mon.c, mgr.x, mgr.y, mds.a, mds.b, mds.c, osd.0, osd.1, osd.2, osd.3] +- [client.0, client.1] +openstack: +- volumes: # attached to each instance + count: 4 + size: 10 # GB diff --git a/qa/suites/fs/upgrade/volumes/import-legacy/conf b/qa/suites/fs/upgrade/volumes/import-legacy/conf new file mode 120000 index 000000000000..6d47129847fa --- /dev/null +++ b/qa/suites/fs/upgrade/volumes/import-legacy/conf @@ -0,0 +1 @@ +.qa/cephfs/conf/ \ No newline at end of file diff --git a/qa/suites/fs/upgrade/volumes/import-legacy/overrides/+ b/qa/suites/fs/upgrade/volumes/import-legacy/overrides/+ new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/qa/suites/fs/upgrade/volumes/import-legacy/overrides/.qa b/qa/suites/fs/upgrade/volumes/import-legacy/overrides/.qa new file mode 120000 index 000000000000..a602a0353e75 --- /dev/null +++ b/qa/suites/fs/upgrade/volumes/import-legacy/overrides/.qa @@ -0,0 +1 @@ +../.qa/ \ No newline at end of file diff --git 
a/qa/suites/fs/upgrade/volumes/import-legacy/overrides/frag_enable.yaml b/qa/suites/fs/upgrade/volumes/import-legacy/overrides/frag_enable.yaml new file mode 120000 index 000000000000..34a39a368cf1 --- /dev/null +++ b/qa/suites/fs/upgrade/volumes/import-legacy/overrides/frag_enable.yaml @@ -0,0 +1 @@ +.qa/cephfs/overrides/frag_enable.yaml \ No newline at end of file diff --git a/qa/suites/fs/upgrade/volumes/import-legacy/overrides/pg-warn.yaml b/qa/suites/fs/upgrade/volumes/import-legacy/overrides/pg-warn.yaml new file mode 100644 index 000000000000..4ae54a40d319 --- /dev/null +++ b/qa/suites/fs/upgrade/volumes/import-legacy/overrides/pg-warn.yaml @@ -0,0 +1,5 @@ +overrides: + ceph: + conf: + global: + mon pg warn min per osd: 0 diff --git a/qa/suites/fs/upgrade/volumes/import-legacy/overrides/whitelist_health.yaml b/qa/suites/fs/upgrade/volumes/import-legacy/overrides/whitelist_health.yaml new file mode 120000 index 000000000000..74f39a49b27e --- /dev/null +++ b/qa/suites/fs/upgrade/volumes/import-legacy/overrides/whitelist_health.yaml @@ -0,0 +1 @@ +.qa/cephfs/overrides/whitelist_health.yaml \ No newline at end of file diff --git a/qa/suites/fs/upgrade/volumes/import-legacy/overrides/whitelist_wrongly_marked_down.yaml b/qa/suites/fs/upgrade/volumes/import-legacy/overrides/whitelist_wrongly_marked_down.yaml new file mode 120000 index 000000000000..b4528c0f8c09 --- /dev/null +++ b/qa/suites/fs/upgrade/volumes/import-legacy/overrides/whitelist_wrongly_marked_down.yaml @@ -0,0 +1 @@ +.qa/cephfs/overrides/whitelist_wrongly_marked_down.yaml \ No newline at end of file diff --git a/qa/suites/fs/upgrade/volumes/import-legacy/tasks/% b/qa/suites/fs/upgrade/volumes/import-legacy/tasks/% new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/qa/suites/fs/upgrade/volumes/import-legacy/tasks/.qa b/qa/suites/fs/upgrade/volumes/import-legacy/tasks/.qa new file mode 120000 index 000000000000..a602a0353e75 --- /dev/null +++ b/qa/suites/fs/upgrade/volumes/import-legacy/tasks/.qa @@ -0,0 +1 @@ +../.qa/ \ No newline at end of file diff --git a/qa/suites/fs/upgrade/volumes/import-legacy/tasks/0-nautilus.yaml b/qa/suites/fs/upgrade/volumes/import-legacy/tasks/0-nautilus.yaml new file mode 100644 index 000000000000..462163e3b085 --- /dev/null +++ b/qa/suites/fs/upgrade/volumes/import-legacy/tasks/0-nautilus.yaml @@ -0,0 +1,39 @@ +meta: +- desc: | + install ceph/nautilus latest +tasks: +- install: + branch: nautilus #tag: v13.2.8 + exclude_packages: + - librados3 + - ceph-mgr-dashboard + - ceph-mgr-diskprediction-local + - ceph-mgr-rook + - ceph-mgr-cephadm + - cephadm + extra_packages: ['librados2'] +- print: "**** done installing nautilus" +- ceph: + log-ignorelist: + - overall HEALTH_ + - \(FS_ + - \(MDS_ + - \(OSD_ + - \(MON_DOWN\) + - \(CACHE_POOL_ + - \(POOL_ + - \(MGR_DOWN\) + - \(PG_ + - \(SMALLER_PGP_NUM\) + - Monitor daemon marked osd + - Behind on trimming + - Manager daemon + conf: + global: + mon warn on pool no app: false + ms bind msgr2: false +- exec: + osd.0: + - ceph osd require-osd-release nautilus + - ceph osd set-require-min-compat-client nautilus +- print: "**** done ceph" diff --git a/qa/suites/fs/upgrade/volumes/import-legacy/tasks/1-client.yaml b/qa/suites/fs/upgrade/volumes/import-legacy/tasks/1-client.yaml new file mode 100644 index 000000000000..82731071f1d1 --- /dev/null +++ b/qa/suites/fs/upgrade/volumes/import-legacy/tasks/1-client.yaml @@ -0,0 +1,33 @@ +tasks: +- workunit: + clients: + client.0: + - fs/upgrade/volume_client + env: + ACTION: create +- print: "**** 
fs/volume_client create" +- ceph-fuse: + client.0: + mount_path: /volumes/_nogroup/vol_isolated + mountpoint: mnt.0 + auth_id: vol_data_isolated + client.1: + mount_path: /volumes/_nogroup/vol_default + mountpoint: mnt.1 + auth_id: vol_default +- print: "**** ceph-fuse vol_isolated" +- workunit: + clients: + client.0: + - fs/upgrade/volume_client + env: + ACTION: populate + cleanup: false +- workunit: + clients: + client.1: + - fs/upgrade/volume_client + env: + ACTION: populate + cleanup: false +- print: "**** fs/volume_client populate" diff --git a/qa/suites/fs/upgrade/volumes/import-legacy/tasks/2-upgrade.yaml b/qa/suites/fs/upgrade/volumes/import-legacy/tasks/2-upgrade.yaml new file mode 100644 index 000000000000..488dbf7efc0c --- /dev/null +++ b/qa/suites/fs/upgrade/volumes/import-legacy/tasks/2-upgrade.yaml @@ -0,0 +1,47 @@ +overrides: + ceph: + log-ignorelist: + - scrub mismatch + - ScrubResult + - wrongly marked + - \(POOL_APP_NOT_ENABLED\) + - \(SLOW_OPS\) + - overall HEALTH_ + - \(MON_MSGR2_NOT_ENABLED\) + - slow request + conf: + global: + bluestore warn on legacy statfs: false + bluestore warn on no per pool omap: false + mon: + mon warn on osd down out interval zero: false + +tasks: +- mds_pre_upgrade: +- print: "**** done mds pre-upgrade sequence" +- install.upgrade: + mon.a: +- print: "**** done install.upgrade both hosts" +- ceph.restart: + daemons: [mon.*, mgr.*] + mon-health-to-clog: false + wait-for-healthy: false +- ceph.healthy: +- ceph.restart: + daemons: [osd.*] + wait-for-healthy: false + wait-for-osds-up: true +- ceph.stop: [mds.*] +- ceph.restart: + daemons: [mds.*] + wait-for-healthy: false + wait-for-osds-up: true +- exec: + mon.a: + - ceph versions + - ceph osd dump -f json-pretty + - ceph osd require-osd-release octopus + - for f in `ceph osd pool ls` ; do ceph osd pool set $f pg_autoscale_mode off ; done + #- ceph osd set-require-min-compat-client nautilus +- ceph.healthy: +- print: "**** done ceph.restart" diff --git a/qa/suites/fs/upgrade/volumes/import-legacy/tasks/3-verify.yaml b/qa/suites/fs/upgrade/volumes/import-legacy/tasks/3-verify.yaml new file mode 100644 index 000000000000..e14b48383a74 --- /dev/null +++ b/qa/suites/fs/upgrade/volumes/import-legacy/tasks/3-verify.yaml @@ -0,0 +1,25 @@ +overrides: + ceph: + log-ignorelist: + - missing required features +tasks: +- exec: + mon.a: + - ceph fs dump --format=json-pretty + - ceph fs volume ls + - ceph fs subvolume ls cephfs +- workunit: + clients: + client.0: + - fs/upgrade/volume_client + env: + ACTION: verify + cleanup: false +- workunit: + clients: + client.1: + - fs/upgrade/volume_client + env: + ACTION: verify + cleanup: false +- print: "**** fs/volume_client verify" diff --git a/qa/suites/fs/upgrade/volumes/import-legacy/ubuntu_18.04.yaml b/qa/suites/fs/upgrade/volumes/import-legacy/ubuntu_18.04.yaml new file mode 120000 index 000000000000..cfb85f10ef59 --- /dev/null +++ b/qa/suites/fs/upgrade/volumes/import-legacy/ubuntu_18.04.yaml @@ -0,0 +1 @@ +.qa/distros/all/ubuntu_18.04.yaml \ No newline at end of file diff --git a/qa/suites/fs/volumes/tasks/volume-client.yaml b/qa/suites/fs/volumes/tasks/volume-client.yaml new file mode 100644 index 000000000000..04ee276572cd --- /dev/null +++ b/qa/suites/fs/volumes/tasks/volume-client.yaml @@ -0,0 +1,9 @@ +overrides: + ceph: + log-ignorelist: + - MON_DOWN +tasks: + - cephfs_test_runner: + fail_on_skip: false + modules: + - tasks.cephfs.test_volume_client diff --git a/qa/tasks/cephfs/test_volume_client.py b/qa/tasks/cephfs/test_volume_client.py new file mode 
100644 index 000000000000..f889421d9fc8 --- /dev/null +++ b/qa/tasks/cephfs/test_volume_client.py @@ -0,0 +1,1299 @@ +import json +import logging +import os +from textwrap import dedent +from tasks.cephfs.cephfs_test_case import CephFSTestCase +from tasks.cephfs.fuse_mount import FuseMount +from teuthology.exceptions import CommandFailedError + +log = logging.getLogger(__name__) + + +class TestVolumeClient(CephFSTestCase): + # One for looking at the global filesystem, one for being + # the VolumeClient, two for mounting the created shares + CLIENTS_REQUIRED = 4 + + def setUp(self): + CephFSTestCase.setUp(self) + + def _volume_client_python(self, client, script, vol_prefix=None, ns_prefix=None): + # Can't dedent this *and* the script we pass in, because they might have different + # levels of indentation to begin with, so leave this string zero-indented + if vol_prefix: + vol_prefix = "\"" + vol_prefix + "\"" + if ns_prefix: + ns_prefix = "\"" + ns_prefix + "\"" + return client.run_python(""" +from __future__ import print_function +from ceph_volume_client import CephFSVolumeClient, VolumePath +from sys import version_info as sys_version_info +from rados import OSError as rados_OSError +import logging +log = logging.getLogger("ceph_volume_client") +log.addHandler(logging.StreamHandler()) +log.setLevel(logging.DEBUG) +vc = CephFSVolumeClient("manila", "{conf_path}", "ceph", {vol_prefix}, {ns_prefix}) +vc.connect() +{payload} +vc.disconnect() + """.format(payload=script, conf_path=client.config_path, + vol_prefix=vol_prefix, ns_prefix=ns_prefix)) + + def _configure_vc_auth(self, mount, id_name): + """ + Set up auth credentials for the VolumeClient user + """ + out = self.fs.mon_manager.raw_cluster_cmd( + "auth", "get-or-create", "client.{name}".format(name=id_name), + "mds", "allow *", + "osd", "allow rw", + "mon", "allow *" + ) + mount.client_id = id_name + mount.client_remote.write_file(mount.get_keyring_path(), + out, sudo=True) + self.set_conf("client.{name}".format(name=id_name), "keyring", mount.get_keyring_path()) + + def _configure_guest_auth(self, volumeclient_mount, guest_mount, + guest_entity, cephfs_mntpt, + namespace_prefix=None, readonly=False, + tenant_id=None, allow_existing_id=False): + """ + Set up auth credentials for the guest client to mount a volume. + + :param volumeclient_mount: mount used as the handle for driving + volumeclient. + :param guest_mount: mount used by the guest client. + :param guest_entity: auth ID used by the guest client. + :param cephfs_mntpt: path of the volume. + :param namespace_prefix: name prefix of the RADOS namespace, which + is used for the volume's layout. + :param readonly: defaults to False. If set to 'True' only read-only + mount access is granted to the guest. + :param tenant_id: (OpenStack) tenant ID of the guest client. + """ + + head, volume_id = os.path.split(cephfs_mntpt) + head, group_id = os.path.split(head) + head, volume_prefix = os.path.split(head) + volume_prefix = "/" + volume_prefix + + # Authorize the guest client's auth ID to mount the volume. 
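+ # The embedded script prints auth_result['auth_key']; run_python() captures the
+ # script's stdout, so that key comes back as the return value of this call.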
+ key = self._volume_client_python(volumeclient_mount, dedent(""" + vp = VolumePath("{group_id}", "{volume_id}") + auth_result = vc.authorize(vp, "{guest_entity}", readonly={readonly}, + tenant_id="{tenant_id}", + allow_existing_id="{allow_existing_id}") + print(auth_result['auth_key']) + """.format( + group_id=group_id, + volume_id=volume_id, + guest_entity=guest_entity, + readonly=readonly, + tenant_id=tenant_id, + allow_existing_id=allow_existing_id)), volume_prefix, namespace_prefix + ) + + # CephFSVolumeClient's authorize() does not return the secret + # key to a caller who isn't multi-tenant aware. Explicitly + # query the key for such a client. + if not tenant_id: + key = self.fs.mon_manager.raw_cluster_cmd( + "auth", "get-key", "client.{name}".format(name=guest_entity), + ) + + # The guest auth ID should exist. + existing_ids = [a['entity'] for a in self.auth_list()] + self.assertIn("client.{0}".format(guest_entity), existing_ids) + + # Create keyring file for the guest client. + keyring_txt = dedent(""" + [client.{guest_entity}] + key = {key} + + """.format( + guest_entity=guest_entity, + key=key + )) + guest_mount.client_id = guest_entity + guest_mount.client_remote.write_file(guest_mount.get_keyring_path(), + keyring_txt, sudo=True) + + # Add a guest client section to the ceph config file. + self.set_conf("client.{0}".format(guest_entity), "client quota", "True") + self.set_conf("client.{0}".format(guest_entity), "debug client", "20") + self.set_conf("client.{0}".format(guest_entity), "debug objecter", "20") + self.set_conf("client.{0}".format(guest_entity), + "keyring", guest_mount.get_keyring_path()) + + def test_default_prefix(self): + group_id = "grpid" + volume_id = "volid" + DEFAULT_VOL_PREFIX = "volumes" + DEFAULT_NS_PREFIX = "fsvolumens_" + + self.mount_b.umount_wait() + self._configure_vc_auth(self.mount_b, "manila") + + #create a volume with default prefix + self._volume_client_python(self.mount_b, dedent(""" + vp = VolumePath("{group_id}", "{volume_id}") + vc.create_volume(vp, 10, data_isolated=True) + """.format( + group_id=group_id, + volume_id=volume_id, + ))) + + # The dir should be created + self.mount_a.stat(os.path.join(DEFAULT_VOL_PREFIX, group_id, volume_id)) + + #namespace should be set + ns_in_attr = self.mount_a.getfattr(os.path.join(DEFAULT_VOL_PREFIX, group_id, volume_id), "ceph.dir.layout.pool_namespace") + namespace = "{0}{1}".format(DEFAULT_NS_PREFIX, volume_id) + self.assertEqual(namespace, ns_in_attr) + + + def test_lifecycle(self): + """ + General smoke test for create, extend, destroy + """ + + # I'm going to use mount_c later as a guest for mounting the created + # shares + self.mounts[2].umount_wait() + + # I'm going to leave mount_b unmounted and just use it as a handle for + # driving volumeclient. It's a little hacky but we don't have a more + # general concept for librados/libcephfs clients as opposed to full + # blown mounting clients. 
+ self.mount_b.umount_wait() + self._configure_vc_auth(self.mount_b, "manila") + + guest_entity = "guest" + group_id = "grpid" + volume_id = "volid" + + volume_prefix = "/myprefix" + namespace_prefix = "mynsprefix_" + + # Create a 100MB volume + volume_size = 100 + cephfs_mntpt = self._volume_client_python(self.mount_b, dedent(""" + vp = VolumePath("{group_id}", "{volume_id}") + create_result = vc.create_volume(vp, 1024*1024*{volume_size}) + print(create_result['mount_path']) + """.format( + group_id=group_id, + volume_id=volume_id, + volume_size=volume_size + )), volume_prefix, namespace_prefix) + + # The dir should be created + self.mount_a.stat(os.path.join("myprefix", group_id, volume_id)) + + # Authorize and configure credentials for the guest to mount the + # the volume. + self._configure_guest_auth(self.mount_b, self.mounts[2], guest_entity, + cephfs_mntpt, namespace_prefix) + self.mounts[2].mount_wait(cephfs_mntpt=cephfs_mntpt) + + # The kernel client doesn't have the quota-based df behaviour, + # or quotas at all, so only exercise the client behaviour when + # running fuse. + if isinstance(self.mounts[2], FuseMount): + # df should see volume size, same as the quota set on volume's dir + self.assertEqual(self.mounts[2].df()['total'], + volume_size * 1024 * 1024) + self.assertEqual( + self.mount_a.getfattr( + os.path.join(volume_prefix.strip("/"), group_id, volume_id), + "ceph.quota.max_bytes"), + "%s" % (volume_size * 1024 * 1024)) + + # df granularity is 4MB block so have to write at least that much + data_bin_mb = 4 + self.mounts[2].write_n_mb("data.bin", data_bin_mb) + + # Write something outside volume to check this space usage is + # not reported in the volume's DF. + other_bin_mb = 8 + self.mount_a.write_n_mb("other.bin", other_bin_mb) + + # global: df should see all the writes (data + other). 
This is a > + # rather than a == because the global spaced used includes all pools + def check_df(): + used = self.mount_a.df()['used'] + return used >= (other_bin_mb * 1024 * 1024) + + self.wait_until_true(check_df, timeout=30) + + # Hack: do a metadata IO to kick rstats + self.mounts[2].run_shell(["touch", "foo"]) + + # volume: df should see the data_bin_mb consumed from quota, same + # as the rbytes for the volume's dir + self.wait_until_equal( + lambda: self.mounts[2].df()['used'], + data_bin_mb * 1024 * 1024, timeout=60) + self.wait_until_equal( + lambda: self.mount_a.getfattr( + os.path.join(volume_prefix.strip("/"), group_id, volume_id), + "ceph.dir.rbytes"), + "%s" % (data_bin_mb * 1024 * 1024), timeout=60) + + # sync so that file data are persist to rados + self.mounts[2].run_shell(["sync"]) + + # Our data should stay in particular rados namespace + pool_name = self.mount_a.getfattr(os.path.join("myprefix", group_id, volume_id), "ceph.dir.layout.pool") + namespace = "{0}{1}".format(namespace_prefix, volume_id) + ns_in_attr = self.mount_a.getfattr(os.path.join("myprefix", group_id, volume_id), "ceph.dir.layout.pool_namespace") + self.assertEqual(namespace, ns_in_attr) + + objects_in_ns = set(self.fs.rados(["ls"], pool=pool_name, namespace=namespace).split("\n")) + self.assertNotEqual(objects_in_ns, set()) + + # De-authorize the guest + self._volume_client_python(self.mount_b, dedent(""" + vp = VolumePath("{group_id}", "{volume_id}") + vc.deauthorize(vp, "{guest_entity}") + vc.evict("{guest_entity}") + """.format( + group_id=group_id, + volume_id=volume_id, + guest_entity=guest_entity + )), volume_prefix, namespace_prefix) + + # Once deauthorized, the client should be unable to do any more metadata ops + # The way that the client currently behaves here is to block (it acts like + # it has lost network, because there is nothing to tell it that is messages + # are being dropped because it's identity is gone) + background = self.mounts[2].write_n_mb("rogue.bin", 1, wait=False) + try: + background.wait() + except CommandFailedError: + # command failed with EBLOCKLISTED? 
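+ # A blocklisted ceph-fuse client shows up here as a "transport endpoint shutdown"
+ # error on stderr; anything else is an unexpected failure and is re-raised.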
+ if "transport endpoint shutdown" in background.stderr.getvalue(): + pass + else: + raise + + # After deauthorisation, the client ID should be gone (this was the only + # volume it was authorised for) + self.assertNotIn("client.{0}".format(guest_entity), [e['entity'] for e in self.auth_list()]) + + # Clean up the dead mount (ceph-fuse's behaviour here is a bit undefined) + self.mounts[2].umount_wait() + + self._volume_client_python(self.mount_b, dedent(""" + vp = VolumePath("{group_id}", "{volume_id}") + vc.delete_volume(vp) + vc.purge_volume(vp) + """.format( + group_id=group_id, + volume_id=volume_id, + )), volume_prefix, namespace_prefix) + + def test_idempotency(self): + """ + That the volumeclient interface works when calling everything twice + """ + self.mount_b.umount_wait() + self._configure_vc_auth(self.mount_b, "manila") + + guest_entity = "guest" + group_id = "grpid" + volume_id = "volid" + self._volume_client_python(self.mount_b, dedent(""" + vp = VolumePath("{group_id}", "{volume_id}") + vc.create_volume(vp, 10) + vc.create_volume(vp, 10) + vc.authorize(vp, "{guest_entity}") + vc.authorize(vp, "{guest_entity}") + vc.deauthorize(vp, "{guest_entity}") + vc.deauthorize(vp, "{guest_entity}") + vc.delete_volume(vp) + vc.delete_volume(vp) + vc.purge_volume(vp) + vc.purge_volume(vp) + + vc.create_volume(vp, 10, data_isolated=True) + vc.create_volume(vp, 10, data_isolated=True) + vc.authorize(vp, "{guest_entity}") + vc.authorize(vp, "{guest_entity}") + vc.deauthorize(vp, "{guest_entity}") + vc.deauthorize(vp, "{guest_entity}") + vc.evict("{guest_entity}") + vc.evict("{guest_entity}") + vc.delete_volume(vp, data_isolated=True) + vc.delete_volume(vp, data_isolated=True) + vc.purge_volume(vp, data_isolated=True) + vc.purge_volume(vp, data_isolated=True) + + vc.create_volume(vp, 10, namespace_isolated=False) + vc.create_volume(vp, 10, namespace_isolated=False) + vc.authorize(vp, "{guest_entity}") + vc.authorize(vp, "{guest_entity}") + vc.deauthorize(vp, "{guest_entity}") + vc.deauthorize(vp, "{guest_entity}") + vc.evict("{guest_entity}") + vc.evict("{guest_entity}") + vc.delete_volume(vp) + vc.delete_volume(vp) + vc.purge_volume(vp) + vc.purge_volume(vp) + """.format( + group_id=group_id, + volume_id=volume_id, + guest_entity=guest_entity + ))) + + def test_data_isolated(self): + """ + That data isolated shares get their own pool + :return: + """ + + self.mount_b.umount_wait() + self._configure_vc_auth(self.mount_b, "manila") + + pools_a = json.loads(self.fs.mon_manager.raw_cluster_cmd("osd", "dump", "--format=json-pretty"))['pools'] + + group_id = "grpid" + volume_id = "volid" + self._volume_client_python(self.mount_b, dedent(""" + vp = VolumePath("{group_id}", "{volume_id}") + vc.create_volume(vp, data_isolated=True) + """.format( + group_id=group_id, + volume_id=volume_id, + ))) + + pools_b = json.loads(self.fs.mon_manager.raw_cluster_cmd("osd", "dump", "--format=json-pretty"))['pools'] + + # Should have created one new pool + new_pools = set(p['pool_name'] for p in pools_b) - set([p['pool_name'] for p in pools_a]) + self.assertEqual(len(new_pools), 1) + + def test_15303(self): + """ + Reproducer for #15303 "Client holds incorrect complete flag on dir + after losing caps" (http://tracker.ceph.com/issues/15303) + """ + for m in self.mounts: + m.umount_wait() + + # Create a dir on mount A + self.mount_a.mount_wait() + self.mount_a.run_shell(["mkdir", "parent1"]) + self.mount_a.run_shell(["mkdir", "parent2"]) + self.mount_a.run_shell(["mkdir", "parent1/mydir"]) + + # Put some files in it 
from mount B + self.mount_b.mount_wait() + self.mount_b.run_shell(["touch", "parent1/mydir/afile"]) + self.mount_b.umount_wait() + + # List the dir's contents on mount A + self.assertListEqual(self.mount_a.ls("parent1/mydir"), + ["afile"]) + + def test_evict_client(self): + """ + That a volume client can be evicted based on its auth ID and the volume + path it has mounted. + """ + + if not isinstance(self.mount_a, FuseMount): + self.skipTest("Requires FUSE client to inject client metadata") + + # mounts[1] would be used as handle for driving VolumeClient. mounts[2] + # and mounts[3] would be used as guests to mount the volumes/shares. + + for i in range(1, 4): + self.mounts[i].umount_wait() + + volumeclient_mount = self.mounts[1] + self._configure_vc_auth(volumeclient_mount, "manila") + guest_mounts = (self.mounts[2], self.mounts[3]) + + guest_entity = "guest" + group_id = "grpid" + cephfs_mntpts = [] + volume_ids = [] + + # Create two volumes. Authorize 'guest' auth ID to mount the two + # volumes. Mount the two volumes. Write data to the volumes. + for i in range(2): + # Create volume. + volume_ids.append("volid_{0}".format(str(i))) + cephfs_mntpts.append( + self._volume_client_python(volumeclient_mount, dedent(""" + vp = VolumePath("{group_id}", "{volume_id}") + create_result = vc.create_volume(vp, 10 * 1024 * 1024) + print(create_result['mount_path']) + """.format( + group_id=group_id, + volume_id=volume_ids[i] + )))) + + # Authorize 'guest' auth ID to mount the volume. + self._configure_guest_auth(volumeclient_mount, guest_mounts[i], + guest_entity, cephfs_mntpts[i]) + + # Mount the volume. + guest_mounts[i].mountpoint_dir_name = 'mnt.{id}.{suffix}'.format( + id=guest_entity, suffix=str(i)) + guest_mounts[i].mount_wait(cephfs_mntpt=cephfs_mntpts[i]) + guest_mounts[i].write_n_mb("data.bin", 1) + + + # Evict client, guest_mounts[0], using auth ID 'guest' and has mounted + # one volume. + self._volume_client_python(self.mount_b, dedent(""" + vp = VolumePath("{group_id}", "{volume_id}") + vc.deauthorize(vp, "{guest_entity}") + vc.evict("{guest_entity}", volume_path=vp) + """.format( + group_id=group_id, + volume_id=volume_ids[0], + guest_entity=guest_entity + ))) + + # Evicted guest client, guest_mounts[0], should not be able to do + # anymore metadata ops. It should start failing all operations + # when it sees that its own address is in the blocklist. + try: + guest_mounts[0].write_n_mb("rogue.bin", 1) + except CommandFailedError: + pass + else: + raise RuntimeError("post-eviction write should have failed!") + + # The blocklisted guest client should now be unmountable + guest_mounts[0].umount_wait() + + # Guest client, guest_mounts[1], using the same auth ID 'guest', but + # has mounted the other volume, should be able to use its volume + # unaffected. + guest_mounts[1].write_n_mb("data.bin.1", 1) + + # Cleanup. + for i in range(2): + self._volume_client_python(volumeclient_mount, dedent(""" + vp = VolumePath("{group_id}", "{volume_id}") + vc.deauthorize(vp, "{guest_entity}") + vc.delete_volume(vp) + vc.purge_volume(vp) + """.format( + group_id=group_id, + volume_id=volume_ids[i], + guest_entity=guest_entity + ))) + + + def test_purge(self): + """ + Reproducer for #15266, exception trying to purge volumes that + contain non-ascii filenames. + + Additionally test any other purge corner cases here. + """ + # I'm going to leave mount_b unmounted and just use it as a handle for + # driving volumeclient. 
It's a little hacky but we don't have a more + # general concept for librados/libcephfs clients as opposed to full + # blown mounting clients. + self.mount_b.umount_wait() + self._configure_vc_auth(self.mount_b, "manila") + + group_id = "grpid" + # Use a unicode volume ID (like Manila), to reproduce #15266 + volume_id = u"volid" + + # Create + cephfs_mntpt = self._volume_client_python(self.mount_b, dedent(""" + vp = VolumePath("{group_id}", u"{volume_id}") + create_result = vc.create_volume(vp, 10) + print(create_result['mount_path']) + """.format( + group_id=group_id, + volume_id=volume_id + ))) + + # Strip leading "/" + cephfs_mntpt = cephfs_mntpt[1:] + + # A file with non-ascii characters + self.mount_a.run_shell(["touch", os.path.join(cephfs_mntpt, u"b\u00F6b")]) + + # A file with no permissions to do anything + self.mount_a.run_shell(["touch", os.path.join(cephfs_mntpt, "noperms")]) + self.mount_a.run_shell(["chmod", "0000", os.path.join(cephfs_mntpt, "noperms")]) + + self._volume_client_python(self.mount_b, dedent(""" + vp = VolumePath("{group_id}", u"{volume_id}") + vc.delete_volume(vp) + vc.purge_volume(vp) + """.format( + group_id=group_id, + volume_id=volume_id + ))) + + # Check it's really gone + self.assertEqual(self.mount_a.ls("volumes/_deleting"), []) + self.assertEqual(self.mount_a.ls("volumes/"), ["_deleting", group_id]) + + def test_readonly_authorization(self): + """ + That guest clients can be restricted to read-only mounts of volumes. + """ + + volumeclient_mount = self.mounts[1] + guest_mount = self.mounts[2] + volumeclient_mount.umount_wait() + guest_mount.umount_wait() + + # Configure volumeclient_mount as the handle for driving volumeclient. + self._configure_vc_auth(volumeclient_mount, "manila") + + guest_entity = "guest" + group_id = "grpid" + volume_id = "volid" + + # Create a volume. + cephfs_mntpt = self._volume_client_python(volumeclient_mount, dedent(""" + vp = VolumePath("{group_id}", "{volume_id}") + create_result = vc.create_volume(vp, 1024*1024*10) + print(create_result['mount_path']) + """.format( + group_id=group_id, + volume_id=volume_id, + ))) + + # Authorize and configure credentials for the guest to mount the + # the volume with read-write access. + self._configure_guest_auth(volumeclient_mount, guest_mount, + guest_entity, cephfs_mntpt, readonly=False) + + # Mount the volume, and write to it. + guest_mount.mount_wait(cephfs_mntpt=cephfs_mntpt) + guest_mount.write_n_mb("data.bin", 1) + + # Change the guest auth ID's authorization to read-only mount access. + self._volume_client_python(volumeclient_mount, dedent(""" + vp = VolumePath("{group_id}", "{volume_id}") + vc.deauthorize(vp, "{guest_entity}") + """.format( + group_id=group_id, + volume_id=volume_id, + guest_entity=guest_entity + ))) + self._configure_guest_auth(volumeclient_mount, guest_mount, guest_entity, + cephfs_mntpt, readonly=True) + + # The effect of the change in access level to read-only is not + # immediate. The guest sees the change only after a remount of + # the volume. + guest_mount.umount_wait() + guest_mount.mount_wait(cephfs_mntpt=cephfs_mntpt) + + # Read existing content of the volume. + self.assertListEqual(guest_mount.ls(guest_mount.mountpoint), ["data.bin"]) + # Cannot write into read-only volume. + try: + guest_mount.write_n_mb("rogue.bin", 1) + except CommandFailedError: + pass + + def test_get_authorized_ids(self): + """ + That for a volume, the authorized IDs and their access levels + can be obtained using CephFSVolumeClient's get_authorized_ids(). 
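+ get_authorized_ids() returns a list of (auth_id, access_level) tuples, e.g.
+ [('guest1', 'rw'), ('guest2', 'r')], or None when no IDs are authorized.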
+ """ + volumeclient_mount = self.mounts[1] + volumeclient_mount.umount_wait() + + # Configure volumeclient_mount as the handle for driving volumeclient. + self._configure_vc_auth(volumeclient_mount, "manila") + + group_id = "grpid" + volume_id = "volid" + guest_entity_1 = "guest1" + guest_entity_2 = "guest2" + + log.info("print(group ID: {0})".format(group_id)) + + # Create a volume. + auths = self._volume_client_python(volumeclient_mount, dedent(""" + vp = VolumePath("{group_id}", "{volume_id}") + vc.create_volume(vp, 1024*1024*10) + auths = vc.get_authorized_ids(vp) + print(auths) + """.format( + group_id=group_id, + volume_id=volume_id, + ))) + # Check the list of authorized IDs for the volume. + self.assertEqual('None', auths) + + # Allow two auth IDs access to the volume. + auths = self._volume_client_python(volumeclient_mount, dedent(""" + vp = VolumePath("{group_id}", "{volume_id}") + vc.authorize(vp, "{guest_entity_1}", readonly=False) + vc.authorize(vp, "{guest_entity_2}", readonly=True) + auths = vc.get_authorized_ids(vp) + print(auths) + """.format( + group_id=group_id, + volume_id=volume_id, + guest_entity_1=guest_entity_1, + guest_entity_2=guest_entity_2, + ))) + # Check the list of authorized IDs and their access levels. + expected_result = [('guest1', 'rw'), ('guest2', 'r')] + self.assertCountEqual(str(expected_result), auths) + + # Disallow both the auth IDs' access to the volume. + auths = self._volume_client_python(volumeclient_mount, dedent(""" + vp = VolumePath("{group_id}", "{volume_id}") + vc.deauthorize(vp, "{guest_entity_1}") + vc.deauthorize(vp, "{guest_entity_2}") + auths = vc.get_authorized_ids(vp) + print(auths) + """.format( + group_id=group_id, + volume_id=volume_id, + guest_entity_1=guest_entity_1, + guest_entity_2=guest_entity_2, + ))) + # Check the list of authorized IDs for the volume. + self.assertEqual('None', auths) + + def test_multitenant_volumes(self): + """ + That volume access can be restricted to a tenant. + + That metadata used to enforce tenant isolation of + volumes is stored as a two-way mapping between auth + IDs and volumes that they're authorized to access. + """ + volumeclient_mount = self.mounts[1] + volumeclient_mount.umount_wait() + + # Configure volumeclient_mount as the handle for driving volumeclient. + self._configure_vc_auth(volumeclient_mount, "manila") + + group_id = "groupid" + volume_id = "volumeid" + + # Guest clients belonging to different tenants, but using the same + # auth ID. + auth_id = "guest" + guestclient_1 = { + "auth_id": auth_id, + "tenant_id": "tenant1", + } + guestclient_2 = { + "auth_id": auth_id, + "tenant_id": "tenant2", + } + + # Create a volume. + self._volume_client_python(volumeclient_mount, dedent(""" + vp = VolumePath("{group_id}", "{volume_id}") + vc.create_volume(vp, 1024*1024*10) + """.format( + group_id=group_id, + volume_id=volume_id, + ))) + + # Check that volume metadata file is created on volume creation. + vol_metadata_filename = "_{0}:{1}.meta".format(group_id, volume_id) + self.assertIn(vol_metadata_filename, self.mounts[0].ls("volumes")) + + # Authorize 'guestclient_1', using auth ID 'guest' and belonging to + # 'tenant1', with 'rw' access to the volume. 
+ self._volume_client_python(volumeclient_mount, dedent(""" + vp = VolumePath("{group_id}", "{volume_id}") + vc.authorize(vp, "{auth_id}", tenant_id="{tenant_id}") + """.format( + group_id=group_id, + volume_id=volume_id, + auth_id=guestclient_1["auth_id"], + tenant_id=guestclient_1["tenant_id"] + ))) + + # Check that auth metadata file for auth ID 'guest', is + # created on authorizing 'guest' access to the volume. + auth_metadata_filename = "${0}.meta".format(guestclient_1["auth_id"]) + self.assertIn(auth_metadata_filename, self.mounts[0].ls("volumes")) + + # Verify that the auth metadata file stores the tenant ID that the + # auth ID belongs to, the auth ID's authorized access levels + # for different volumes, versioning details, etc. + expected_auth_metadata = { + "version": 2, + "compat_version": 1, + "dirty": False, + "tenant_id": "tenant1", + "volumes": { + "groupid/volumeid": { + "dirty": False, + "access_level": "rw" + } + } + } + + auth_metadata = self._volume_client_python(volumeclient_mount, dedent(""" + import json + vp = VolumePath("{group_id}", "{volume_id}") + auth_metadata = vc._auth_metadata_get("{auth_id}") + print(json.dumps(auth_metadata)) + """.format( + group_id=group_id, + volume_id=volume_id, + auth_id=guestclient_1["auth_id"], + ))) + auth_metadata = json.loads(auth_metadata) + + self.assertGreaterEqual(auth_metadata["version"], expected_auth_metadata["version"]) + del expected_auth_metadata["version"] + del auth_metadata["version"] + self.assertEqual(expected_auth_metadata, auth_metadata) + + # Verify that the volume metadata file stores info about auth IDs + # and their access levels to the volume, versioning details, etc. + expected_vol_metadata = { + "version": 2, + "compat_version": 1, + "auths": { + "guest": { + "dirty": False, + "access_level": "rw" + } + } + } + + vol_metadata = self._volume_client_python(volumeclient_mount, dedent(""" + import json + vp = VolumePath("{group_id}", "{volume_id}") + volume_metadata = vc._volume_metadata_get(vp) + print(json.dumps(volume_metadata)) + """.format( + group_id=group_id, + volume_id=volume_id, + ))) + vol_metadata = json.loads(vol_metadata) + + self.assertGreaterEqual(vol_metadata["version"], expected_vol_metadata["version"]) + del expected_vol_metadata["version"] + del vol_metadata["version"] + self.assertEqual(expected_vol_metadata, vol_metadata) + + # Cannot authorize 'guestclient_2' to access the volume. + # It uses auth ID 'guest', which has already been used by a + # 'guestclient_1' belonging to an another tenant for accessing + # the volume. + with self.assertRaises(CommandFailedError): + self._volume_client_python(volumeclient_mount, dedent(""" + vp = VolumePath("{group_id}", "{volume_id}") + vc.authorize(vp, "{auth_id}", tenant_id="{tenant_id}") + """.format( + group_id=group_id, + volume_id=volume_id, + auth_id=guestclient_2["auth_id"], + tenant_id=guestclient_2["tenant_id"] + ))) + + # Check that auth metadata file is cleaned up on removing + # auth ID's only access to a volume. + self._volume_client_python(volumeclient_mount, dedent(""" + vp = VolumePath("{group_id}", "{volume_id}") + vc.deauthorize(vp, "{guest_entity}") + """.format( + group_id=group_id, + volume_id=volume_id, + guest_entity=guestclient_1["auth_id"] + ))) + + self.assertNotIn(auth_metadata_filename, self.mounts[0].ls("volumes")) + + # Check that volume metadata file is cleaned up on volume deletion. 
+ self._volume_client_python(volumeclient_mount, dedent(""" + vp = VolumePath("{group_id}", "{volume_id}") + vc.delete_volume(vp) + """.format( + group_id=group_id, + volume_id=volume_id, + ))) + self.assertNotIn(vol_metadata_filename, self.mounts[0].ls("volumes")) + + def test_authorize_auth_id_not_created_by_ceph_volume_client(self): + """ + If the auth_id already exists and is not created by + ceph_volume_client, it's not allowed to authorize + the auth-id by default. + """ + volumeclient_mount = self.mounts[1] + volumeclient_mount.umount_wait() + + # Configure volumeclient_mount as the handle for driving volumeclient. + self._configure_vc_auth(volumeclient_mount, "manila") + + group_id = "groupid" + volume_id = "volumeid" + + # Create auth_id + self.fs.mon_manager.raw_cluster_cmd( + "auth", "get-or-create", "client.guest1", + "mds", "allow *", + "osd", "allow rw", + "mon", "allow *" + ) + + auth_id = "guest1" + guestclient_1 = { + "auth_id": auth_id, + "tenant_id": "tenant1", + } + + # Create a volume. + self._volume_client_python(volumeclient_mount, dedent(""" + vp = VolumePath("{group_id}", "{volume_id}") + vc.create_volume(vp, 1024*1024*10) + """.format( + group_id=group_id, + volume_id=volume_id, + ))) + + # Cannot authorize 'guestclient_1' to access the volume. + # It uses auth ID 'guest1', which already exists and not + # created by ceph_volume_client + with self.assertRaises(CommandFailedError): + self._volume_client_python(volumeclient_mount, dedent(""" + vp = VolumePath("{group_id}", "{volume_id}") + vc.authorize(vp, "{auth_id}", tenant_id="{tenant_id}") + """.format( + group_id=group_id, + volume_id=volume_id, + auth_id=guestclient_1["auth_id"], + tenant_id=guestclient_1["tenant_id"] + ))) + + # Delete volume + self._volume_client_python(volumeclient_mount, dedent(""" + vp = VolumePath("{group_id}", "{volume_id}") + vc.delete_volume(vp) + """.format( + group_id=group_id, + volume_id=volume_id, + ))) + + def test_authorize_allow_existing_id_option(self): + """ + If the auth_id already exists and is not created by + ceph_volume_client, it's not allowed to authorize + the auth-id by default but is allowed with option + allow_existing_id. + """ + volumeclient_mount = self.mounts[1] + volumeclient_mount.umount_wait() + + # Configure volumeclient_mount as the handle for driving volumeclient. + self._configure_vc_auth(volumeclient_mount, "manila") + + group_id = "groupid" + volume_id = "volumeid" + + # Create auth_id + self.fs.mon_manager.raw_cluster_cmd( + "auth", "get-or-create", "client.guest1", + "mds", "allow *", + "osd", "allow rw", + "mon", "allow *" + ) + + auth_id = "guest1" + guestclient_1 = { + "auth_id": auth_id, + "tenant_id": "tenant1", + } + + # Create a volume. + self._volume_client_python(volumeclient_mount, dedent(""" + vp = VolumePath("{group_id}", "{volume_id}") + vc.create_volume(vp, 1024*1024*10) + """.format( + group_id=group_id, + volume_id=volume_id, + ))) + + # Cannot authorize 'guestclient_1' to access the volume + # by default, which already exists and not created by + # ceph_volume_client but is allowed with option 'allow_existing_id'. 
+ self._volume_client_python(volumeclient_mount, dedent(""" + vp = VolumePath("{group_id}", "{volume_id}") + vc.authorize(vp, "{auth_id}", tenant_id="{tenant_id}", + allow_existing_id="{allow_existing_id}") + """.format( + group_id=group_id, + volume_id=volume_id, + auth_id=guestclient_1["auth_id"], + tenant_id=guestclient_1["tenant_id"], + allow_existing_id=True + ))) + + # Delete volume + self._volume_client_python(volumeclient_mount, dedent(""" + vp = VolumePath("{group_id}", "{volume_id}") + vc.delete_volume(vp) + """.format( + group_id=group_id, + volume_id=volume_id, + ))) + + def test_deauthorize_auth_id_after_out_of_band_update(self): + """ + If the auth_id authorized by ceph_volume_client is updated + out of band, the auth_id should not be deleted after a + deauthorize. It should only remove caps associated it. + """ + volumeclient_mount = self.mounts[1] + volumeclient_mount.umount_wait() + + # Configure volumeclient_mount as the handle for driving volumeclient. + self._configure_vc_auth(volumeclient_mount, "manila") + + group_id = "groupid" + volume_id = "volumeid" + + + auth_id = "guest1" + guestclient_1 = { + "auth_id": auth_id, + "tenant_id": "tenant1", + } + + # Create a volume. + self._volume_client_python(volumeclient_mount, dedent(""" + vp = VolumePath("{group_id}", "{volume_id}") + vc.create_volume(vp, 1024*1024*10) + """.format( + group_id=group_id, + volume_id=volume_id, + ))) + + # Authorize 'guestclient_1' to access the volume. + self._volume_client_python(volumeclient_mount, dedent(""" + vp = VolumePath("{group_id}", "{volume_id}") + vc.authorize(vp, "{auth_id}", tenant_id="{tenant_id}") + """.format( + group_id=group_id, + volume_id=volume_id, + auth_id=guestclient_1["auth_id"], + tenant_id=guestclient_1["tenant_id"] + ))) + + # Update caps for guestclient_1 out of band + out = self.fs.mon_manager.raw_cluster_cmd( + "auth", "caps", "client.guest1", + "mds", "allow rw path=/volumes/groupid, allow rw path=/volumes/groupid/volumeid", + "osd", "allow rw pool=cephfs_data namespace=fsvolumens_volumeid", + "mon", "allow r", + "mgr", "allow *" + ) + + # Deauthorize guestclient_1 + self._volume_client_python(volumeclient_mount, dedent(""" + vp = VolumePath("{group_id}", "{volume_id}") + vc.deauthorize(vp, "{guest_entity}") + """.format( + group_id=group_id, + volume_id=volume_id, + guest_entity=guestclient_1["auth_id"] + ))) + + # Validate the caps of guestclient_1 after deauthorize. It should not have deleted + # guestclient_1. The mgr and mds caps should be present which was updated out of band. + out = json.loads(self.fs.mon_manager.raw_cluster_cmd("auth", "get", "client.guest1", "--format=json-pretty")) + + self.assertEqual("client.guest1", out[0]["entity"]) + self.assertEqual("allow rw path=/volumes/groupid", out[0]["caps"]["mds"]) + self.assertEqual("allow *", out[0]["caps"]["mgr"]) + self.assertNotIn("osd", out[0]["caps"]) + + # Delete volume + self._volume_client_python(volumeclient_mount, dedent(""" + vp = VolumePath("{group_id}", "{volume_id}") + vc.delete_volume(vp) + """.format( + group_id=group_id, + volume_id=volume_id, + ))) + + def test_recover_metadata(self): + """ + That volume client can recover from partial auth updates using + metadata files, which store auth info and its update status info. + """ + volumeclient_mount = self.mounts[1] + volumeclient_mount.umount_wait() + + # Configure volumeclient_mount as the handle for driving volumeclient. 
+ self._configure_vc_auth(volumeclient_mount, "manila") + + group_id = "groupid" + volume_id = "volumeid" + + guestclient = { + "auth_id": "guest", + "tenant_id": "tenant", + } + + # Create a volume. + self._volume_client_python(volumeclient_mount, dedent(""" + vp = VolumePath("{group_id}", "{volume_id}") + vc.create_volume(vp, 1024*1024*10) + """.format( + group_id=group_id, + volume_id=volume_id, + ))) + + # Authorize 'guestclient' access to the volume. + self._volume_client_python(volumeclient_mount, dedent(""" + vp = VolumePath("{group_id}", "{volume_id}") + vc.authorize(vp, "{auth_id}", tenant_id="{tenant_id}") + """.format( + group_id=group_id, + volume_id=volume_id, + auth_id=guestclient["auth_id"], + tenant_id=guestclient["tenant_id"] + ))) + + # Check that auth metadata file for auth ID 'guest' is created. + auth_metadata_filename = "${0}.meta".format(guestclient["auth_id"]) + self.assertIn(auth_metadata_filename, self.mounts[0].ls("volumes")) + + # Induce partial auth update state by modifying the auth metadata file, + # and then run recovery procedure. + self._volume_client_python(volumeclient_mount, dedent(""" + vp = VolumePath("{group_id}", "{volume_id}") + auth_metadata = vc._auth_metadata_get("{auth_id}") + auth_metadata['dirty'] = True + vc._auth_metadata_set("{auth_id}", auth_metadata) + vc.recover() + """.format( + group_id=group_id, + volume_id=volume_id, + auth_id=guestclient["auth_id"], + ))) + + def test_put_object(self): + vc_mount = self.mounts[1] + vc_mount.umount_wait() + self._configure_vc_auth(vc_mount, "manila") + + obj_data = 'test data' + obj_name = 'test_vc_obj_1' + pool_name = self.fs.get_data_pool_names()[0] + + self._volume_client_python(vc_mount, dedent(""" + vc.put_object("{pool_name}", "{obj_name}", b"{obj_data}") + """.format( + pool_name = pool_name, + obj_name = obj_name, + obj_data = obj_data + ))) + + read_data = self.fs.rados(['get', obj_name, '-'], pool=pool_name) + self.assertEqual(obj_data, read_data) + + def test_get_object(self): + vc_mount = self.mounts[1] + vc_mount.umount_wait() + self._configure_vc_auth(vc_mount, "manila") + + obj_data = 'test_data' + obj_name = 'test_vc_ob_2' + pool_name = self.fs.get_data_pool_names()[0] + + self.fs.rados(['put', obj_name, '-'], pool=pool_name, stdin_data=obj_data) + + self._volume_client_python(vc_mount, dedent(""" + data_read = vc.get_object("{pool_name}", "{obj_name}") + assert data_read == b"{obj_data}" + """.format( + pool_name = pool_name, + obj_name = obj_name, + obj_data = obj_data + ))) + + def test_put_object_versioned(self): + vc_mount = self.mounts[1] + vc_mount.umount_wait() + self._configure_vc_auth(vc_mount, "manila") + + obj_data = 'test_data' + obj_name = 'test_vc_obj' + pool_name = self.fs.get_data_pool_names()[0] + self.fs.rados(['put', obj_name, '-'], pool=pool_name, stdin_data=obj_data) + + self._volume_client_python(vc_mount, dedent(""" + data, version_before = vc.get_object_and_version("{pool_name}", "{obj_name}") + + if sys_version_info.major < 3: + data = data + 'modification1' + elif sys_version_info.major > 3: + data = str.encode(data.decode() + 'modification1') + + vc.put_object_versioned("{pool_name}", "{obj_name}", data, version_before) + data, version_after = vc.get_object_and_version("{pool_name}", "{obj_name}") + assert version_after == version_before + 1 + """).format(pool_name=pool_name, obj_name=obj_name)) + + def test_version_check_for_put_object_versioned(self): + vc_mount = self.mounts[1] + vc_mount.umount_wait() + self._configure_vc_auth(vc_mount, "manila") + + 
obj_data = 'test_data' + obj_name = 'test_vc_ob_2' + pool_name = self.fs.get_data_pool_names()[0] + self.fs.rados(['put', obj_name, '-'], pool=pool_name, stdin_data=obj_data) + + # Test if put_object_versioned() crosschecks the version of the + # given object. Being a negative test, an exception is expected. + expected_exception = 'rados_OSError' + output = self._volume_client_python(vc_mount, dedent(""" + data, version = vc.get_object_and_version("{pool_name}", "{obj_name}") + + if sys_version_info.major < 3: + data = data + 'm1' + elif sys_version_info.major > 3: + data = str.encode(data.decode('utf-8') + 'm1') + + vc.put_object("{pool_name}", "{obj_name}", data) + + if sys_version_info.major < 3: + data = data + 'm2' + elif sys_version_info.major > 3: + data = str.encode(data.decode('utf-8') + 'm2') + + try: + vc.put_object_versioned("{pool_name}", "{obj_name}", data, version) + except {expected_exception}: + print('{expected_exception} raised') + """).format(pool_name=pool_name, obj_name=obj_name, + expected_exception=expected_exception)) + self.assertEqual(expected_exception + ' raised', output) + + + def test_delete_object(self): + vc_mount = self.mounts[1] + vc_mount.umount_wait() + self._configure_vc_auth(vc_mount, "manila") + + obj_data = 'test data' + obj_name = 'test_vc_obj_3' + pool_name = self.fs.get_data_pool_names()[0] + + self.fs.rados(['put', obj_name, '-'], pool=pool_name, stdin_data=obj_data) + + self._volume_client_python(vc_mount, dedent(""" + data_read = vc.delete_object("{pool_name}", "{obj_name}") + """.format( + pool_name = pool_name, + obj_name = obj_name, + ))) + + with self.assertRaises(CommandFailedError): + self.fs.rados(['stat', obj_name], pool=pool_name) + + # Check idempotency -- no error raised trying to delete non-existent + # object + self._volume_client_python(vc_mount, dedent(""" + data_read = vc.delete_object("{pool_name}", "{obj_name}") + """.format( + pool_name = pool_name, + obj_name = obj_name, + ))) + + def test_21501(self): + """ + Reproducer for #21501 "ceph_volume_client: sets invalid caps for + existing IDs with no caps" (http://tracker.ceph.com/issues/21501) + """ + + vc_mount = self.mounts[1] + vc_mount.umount_wait() + + # Configure vc_mount as the handle for driving volumeclient + self._configure_vc_auth(vc_mount, "manila") + + # Create a volume + group_id = "grpid" + volume_id = "volid" + cephfs_mntpt = self._volume_client_python(vc_mount, dedent(""" + vp = VolumePath("{group_id}", "{volume_id}") + create_result = vc.create_volume(vp, 1024*1024*10) + print(create_result['mount_path']) + """.format( + group_id=group_id, + volume_id=volume_id + ))) + + # Create an auth ID with no caps + guest_id = '21501' + self.fs.mon_manager.raw_cluster_cmd_result( + 'auth', 'get-or-create', 'client.{0}'.format(guest_id)) + + guest_mount = self.mounts[2] + guest_mount.umount_wait() +# Set auth caps for the auth ID using the volumeclient + self._configure_guest_auth(vc_mount, guest_mount, guest_id, cephfs_mntpt, + allow_existing_id=True) + + # Mount the volume in the guest using the auth ID to assert that the + # auth caps are valid + guest_mount.mount_wait(cephfs_mntpt=cephfs_mntpt) + + def test_volume_without_namespace_isolation(self): + """ + That volume client can create volumes that do not have separate RADOS + namespace layouts. 
+ """ + vc_mount = self.mounts[1] + vc_mount.umount_wait() + + # Configure vc_mount as the handle for driving volumeclient + self._configure_vc_auth(vc_mount, "manila") + + # Create a volume + volume_prefix = "/myprefix" + group_id = "grpid" + volume_id = "volid" + self._volume_client_python(vc_mount, dedent(""" + vp = VolumePath("{group_id}", "{volume_id}") + create_result = vc.create_volume(vp, 1024*1024*10, namespace_isolated=False) + print(create_result['mount_path']) + """.format( + group_id=group_id, + volume_id=volume_id + )), volume_prefix) + + # The CephFS volume should be created + self.mounts[0].stat(os.path.join("myprefix", group_id, volume_id)) + vol_namespace = self.mounts[0].getfattr( + os.path.join("myprefix", group_id, volume_id), + "ceph.dir.layout.pool_namespace") + assert not vol_namespace + + self._volume_client_python(vc_mount, dedent(""" + vp = VolumePath("{group_id}", "{volume_id}") + vc.delete_volume(vp) + vc.purge_volume(vp) + """.format( + group_id=group_id, + volume_id=volume_id, + )), volume_prefix) diff --git a/qa/workunits/fs/upgrade/volume_client b/qa/workunits/fs/upgrade/volume_client new file mode 100755 index 000000000000..b3b6dd3e1cfe --- /dev/null +++ b/qa/workunits/fs/upgrade/volume_client @@ -0,0 +1,110 @@ +#!/bin/bash + +set -ex + +PYTHON="python3" + +function run_payload { + local payload="$1" + sudo "$PYTHON" <&2 + sudo touch -- "$keyring" + sudo ceph-authtool "$keyring" --import-keyring "$T" + rm -f -- "$T" +} + +function conf_keys { + local client="$1" + ls /etc/ceph >&2 + ceph auth get-or-create "client.manila" mds 'allow *' osd 'allow rw' mon 'allow *' | import_key "$client" /etc/ceph/ceph.keyring +} + +function create_data_isolated { + local PAYLOAD=' +vp = VolumePath(None, "vol_isolated") +vc.create_volume(vp, (1<<33), data_isolated=True) +auth_result = vc.authorize(vp, "vol_data_isolated", tenant_id="test") +print("[client.vol_data_isolated]\n\tkey = ", auth_result["auth_key"]) +' + + run_payload "$PAYLOAD" | import_key "vol_data_isolated" +} + +function create_default { + local PAYLOAD=' +vp = VolumePath(None, "vol_default") +vc.create_volume(vp, (1<<33)) +auth_result = vc.authorize(vp, "vol_default", tenant_id="test") +print("[client.vol_default]\n\tkey = ", auth_result["auth_key"]) +' + run_payload "$PAYLOAD" | import_key "vol_default" +} + +function create { + create_data_isolated + create_default +} + +function populate { + pwd + df -h . + ls -l + cp -a /usr/bin . 
+} + +function verify_data_isolated { + ceph fs subvolume getpath cephfs vol_isolated + stat bin + ls bin | tail +} + +function verify_default { + ceph fs subvolume getpath cephfs vol_default + stat bin + ls bin | tail +} + +function verify { + diff <(ceph fs subvolume ls cephfs | jq -cS 'sort_by(.name)' | tee /dev/stderr) <(printf '[{"name":"vol_isolated"},{"name":"vol_default"}]' | jq -cS 'sort_by(.name)') + verify_data_isolated + verify_default +} + +function main { + if [ "$1" = create ]; then + conf_keys + create + elif [ "$1" = populate ]; then + populate + elif [ "$1" = verify ]; then + # verify (sub)volumes still exist and are configured correctly + verify + else + exit 1 + fi +} + +main "$ACTION" diff --git a/src/pybind/CMakeLists.txt b/src/pybind/CMakeLists.txt index 4ff35011058e..86fcc6becade 100644 --- a/src/pybind/CMakeLists.txt +++ b/src/pybind/CMakeLists.txt @@ -39,6 +39,7 @@ execute_process( install(FILES ${CMAKE_CURRENT_SOURCE_DIR}/ceph_argparse.py ${CMAKE_CURRENT_SOURCE_DIR}/ceph_daemon.py + ${CMAKE_CURRENT_SOURCE_DIR}/ceph_volume_client.py DESTINATION ${PYTHON3_INSTDIR}) if(WITH_MGR) diff --git a/src/pybind/ceph_volume_client.py b/src/pybind/ceph_volume_client.py new file mode 100644 index 000000000000..b748f5d85f78 --- /dev/null +++ b/src/pybind/ceph_volume_client.py @@ -0,0 +1,1551 @@ +""" +Copyright (C) 2015 Red Hat, Inc. + +LGPL-2.1 or LGPL-3.0. See file COPYING. +""" + +from contextlib import contextmanager +import errno +import fcntl +import json +import logging +import os +import re +import struct +import sys +import threading +import time +import uuid + +from ceph_argparse import json_command + +import cephfs +import rados + +def to_bytes(param): + ''' + Helper method that returns byte representation of the given parameter. + ''' + if isinstance(param, str): + return param.encode('utf-8') + elif param is None: + return param + else: + return str(param).encode('utf-8') + +class RadosError(Exception): + """ + Something went wrong talking to Ceph with librados + """ + pass + + +RADOS_TIMEOUT = 10 + +log = logging.getLogger(__name__) + +# Reserved volume group name which we use in paths for volumes +# that are not assigned to a group (i.e. created with group=None) +NO_GROUP_NAME = "_nogroup" + +# Filename extensions for meta files. +META_FILE_EXT = ".meta" + +class VolumePath(object): + """ + Identify a volume's path as group->volume + The Volume ID is a unique identifier, but this is a much more + helpful thing to pass around. + """ + def __init__(self, group_id, volume_id): + self.group_id = group_id + self.volume_id = volume_id + assert self.group_id != NO_GROUP_NAME + assert self.volume_id != "" and self.volume_id is not None + + def __str__(self): + return "{0}/{1}".format(self.group_id, self.volume_id) + + +class ClusterTimeout(Exception): + """ + Exception indicating that we timed out trying to talk to the Ceph cluster, + either to the mons, or to any individual daemon that the mons indicate ought + to be up but isn't responding to us. + """ + pass + + +class ClusterError(Exception): + """ + Exception indicating that the cluster returned an error to a command that + we thought should be successful based on our last knowledge of the cluster + state. 
+ """ + def __init__(self, action, result_code, result_str): + self._action = action + self._result_code = result_code + self._result_str = result_str + + def __str__(self): + return "Error {0} (\"{1}\") while {2}".format( + self._result_code, self._result_str, self._action) + + +class RankEvicter(threading.Thread): + """ + Thread for evicting client(s) from a particular MDS daemon instance. + + This is more complex than simply sending a command, because we have to + handle cases where MDS daemons might not be fully up yet, and/or might + be transiently unresponsive to commands. + """ + class GidGone(Exception): + pass + + POLL_PERIOD = 5 + + def __init__(self, volume_client, client_spec, rank, gid, mds_map, ready_timeout): + """ + :param client_spec: list of strings, used as filter arguments to "session evict" + pass ["id=123"] to evict a single client with session id 123. + """ + self.rank = rank + self.gid = gid + self._mds_map = mds_map + self._client_spec = client_spec + self._volume_client = volume_client + self._ready_timeout = ready_timeout + self._ready_waited = 0 + + self.success = False + self.exception = None + + super(RankEvicter, self).__init__() + + def _ready_to_evict(self): + if self._mds_map['up'].get("mds_{0}".format(self.rank), None) != self.gid: + log.info("Evicting {0} from {1}/{2}: rank no longer associated with gid, done.".format( + self._client_spec, self.rank, self.gid + )) + raise RankEvicter.GidGone() + + info = self._mds_map['info']["gid_{0}".format(self.gid)] + log.debug("_ready_to_evict: state={0}".format(info['state'])) + return info['state'] in ["up:active", "up:clientreplay"] + + def _wait_for_ready(self): + """ + Wait for that MDS rank to reach an active or clientreplay state, and + not be laggy. + """ + while not self._ready_to_evict(): + if self._ready_waited > self._ready_timeout: + raise ClusterTimeout() + + time.sleep(self.POLL_PERIOD) + self._ready_waited += self.POLL_PERIOD + + self._mds_map = self._volume_client.get_mds_map() + + def _evict(self): + """ + Run the eviction procedure. Return true on success, false on errors. + """ + + # Wait til the MDS is believed by the mon to be available for commands + try: + self._wait_for_ready() + except self.GidGone: + return True + + # Then send it an evict + ret = errno.ETIMEDOUT + while ret == errno.ETIMEDOUT: + log.debug("mds_command: {0}, {1}".format( + "%s" % self.gid, ["session", "evict"] + self._client_spec + )) + ret, outb, outs = self._volume_client.fs.mds_command( + "%s" % self.gid, + [json.dumps({ + "prefix": "session evict", + "filters": self._client_spec + })], "") + log.debug("mds_command: complete {0} {1}".format(ret, outs)) + + # If we get a clean response, great, it's gone from that rank. + if ret == 0: + return True + elif ret == errno.ETIMEDOUT: + # Oh no, the MDS went laggy (that's how libcephfs knows to emit this error) + self._mds_map = self._volume_client.get_mds_map() + try: + self._wait_for_ready() + except self.GidGone: + return True + else: + raise ClusterError("Sending evict to mds.{0}".format(self.gid), ret, outs) + + def run(self): + try: + self._evict() + except Exception as e: + self.success = False + self.exception = e + else: + self.success = True + + +class EvictionError(Exception): + pass + + +class CephFSVolumeClientError(Exception): + """ + Something went wrong talking to Ceph using CephFSVolumeClient. 
+ """ + pass + + +CEPHFSVOLUMECLIENT_VERSION_HISTORY = """ + + CephFSVolumeClient Version History: + + * 1 - Initial version + * 2 - Added get_object, put_object, delete_object methods to CephFSVolumeClient + * 3 - Allow volumes to be created without RADOS namespace isolation + * 4 - Added get_object_and_version, put_object_versioned method to CephFSVolumeClient + * 5 - Disallow authorize API for users not created by CephFSVolumeClient +""" + + +class CephFSVolumeClient(object): + """ + Combine libcephfs and librados interfaces to implement a + 'Volume' concept implemented as a cephfs directory and + client capabilities which restrict mount access to this + directory. + + Additionally, volumes may be in a 'Group'. Conveniently, + volumes are a lot like manila shares, and groups are a lot + like manila consistency groups. + + Refer to volumes with VolumePath, which specifies the + volume and group IDs (both strings). The group ID may + be None. + + In general, functions in this class are allowed raise rados.Error + or cephfs.Error exceptions in unexpected situations. + """ + + # Current version + version = 5 + + # Where shall we create our volumes? + POOL_PREFIX = "fsvolume_" + DEFAULT_VOL_PREFIX = "/volumes" + DEFAULT_NS_PREFIX = "fsvolumens_" + + def __init__(self, auth_id=None, conf_path=None, cluster_name=None, + volume_prefix=None, pool_ns_prefix=None, rados=None, + fs_name=None): + """ + Either set all three of ``auth_id``, ``conf_path`` and + ``cluster_name`` (rados constructed on connect), or + set ``rados`` (existing rados instance). + """ + self.fs = None + self.fs_name = fs_name + self.connected = False + + self.conf_path = conf_path + self.cluster_name = cluster_name + self.auth_id = auth_id + + self.rados = rados + if self.rados: + # Using an externally owned rados, so we won't tear it down + # on disconnect + self.own_rados = False + else: + # self.rados will be constructed in connect + self.own_rados = True + + self.volume_prefix = volume_prefix if volume_prefix else self.DEFAULT_VOL_PREFIX + self.pool_ns_prefix = pool_ns_prefix if pool_ns_prefix else self.DEFAULT_NS_PREFIX + # For flock'ing in cephfs, I want a unique ID to distinguish me + # from any other manila-share services that are loading this module. + # We could use pid, but that's unnecessary weak: generate a + # UUID + self._id = struct.unpack(">Q", uuid.uuid1().bytes[0:8])[0] + + # TODO: version the on-disk structures + + def recover(self): + # Scan all auth keys to see if they're dirty: if they are, they have + # state that might not have propagated to Ceph or to the related + # volumes yet. + + # Important: we *always* acquire locks in the order auth->volume + # That means a volume can never be dirty without the auth key + # we're updating it with being dirty at the same time. + + # First list the auth IDs that have potentially dirty on-disk metadata + log.debug("Recovering from partial auth updates (if any)...") + + try: + dir_handle = self.fs.opendir(self.volume_prefix) + except cephfs.ObjectNotFound: + log.debug("Nothing to recover. No auth meta files.") + return + + d = self.fs.readdir(dir_handle) + auth_ids = [] + + if not d: + log.debug("Nothing to recover. No auth meta files.") + + while d: + # Identify auth IDs from auth meta filenames. 
The auth meta files + # are named as, "$" + regex = "^\$(.*){0}$".format(re.escape(META_FILE_EXT)) + match = re.search(regex, d.d_name.decode(encoding='utf-8')) + if match: + auth_ids.append(match.group(1)) + + d = self.fs.readdir(dir_handle) + + self.fs.closedir(dir_handle) + + # Key points based on ordering: + # * Anything added in VMeta is already added in AMeta + # * Anything added in Ceph is already added in VMeta + # * Anything removed in VMeta is already removed in Ceph + # * Anything removed in AMeta is already removed in VMeta + + # Deauthorization: because I only update metadata AFTER the + # update of the next level down, I have the same ordering of + # -> things which exist in the AMeta should also exist + # in the VMeta, should also exist in Ceph, and the same + # recovery procedure that gets me consistent after crashes + # during authorization will also work during deauthorization + + # Now for each auth ID, check for dirty flag and apply updates + # if dirty flag is found + for auth_id in auth_ids: + with self._auth_lock(auth_id): + auth_meta = self._auth_metadata_get(auth_id) + if not auth_meta or not auth_meta['volumes']: + # Clean up auth meta file + self.fs.unlink(self._auth_metadata_path(auth_id)) + continue + if not auth_meta['dirty']: + continue + self._recover_auth_meta(auth_id, auth_meta) + + log.debug("Recovered from partial auth updates (if any).") + + def _recover_auth_meta(self, auth_id, auth_meta): + """ + Call me after locking the auth meta file. + """ + remove_volumes = [] + + for volume, volume_data in auth_meta['volumes'].items(): + if not volume_data['dirty']: + continue + + (group_id, volume_id) = volume.split('/') + group_id = group_id if group_id != 'None' else None + volume_path = VolumePath(group_id, volume_id) + access_level = volume_data['access_level'] + + with self._volume_lock(volume_path): + vol_meta = self._volume_metadata_get(volume_path) + + # No VMeta update indicates that there was no auth update + # in Ceph either. So it's safe to remove corresponding + # partial update in AMeta. + if not vol_meta or auth_id not in vol_meta['auths']: + remove_volumes.append(volume) + continue + + want_auth = { + 'access_level': access_level, + 'dirty': False, + } + # VMeta update looks clean. Ceph auth update must have been + # clean. + if vol_meta['auths'][auth_id] == want_auth: + continue + + readonly = access_level == 'r' + client_entity = "client.{0}".format(auth_id) + try: + existing_caps = self._rados_command( + 'auth get', + { + 'entity': client_entity + } + ) + # FIXME: rados raising Error instead of ObjectNotFound in auth get failure + except rados.Error: + existing_caps = None + self._authorize_volume(volume_path, auth_id, readonly, existing_caps) + + # Recovered from partial auth updates for the auth ID's access + # to a volume. + auth_meta['volumes'][volume]['dirty'] = False + self._auth_metadata_set(auth_id, auth_meta) + + for volume in remove_volumes: + del auth_meta['volumes'][volume] + + if not auth_meta['volumes']: + # Clean up auth meta file + self.fs.unlink(self._auth_metadata_path(auth_id)) + return + + # Recovered from all partial auth updates for the auth ID. + auth_meta['dirty'] = False + self._auth_metadata_set(auth_id, auth_meta) + + def get_mds_map(self): + fs_map = self._rados_command("fs dump", {}) + return fs_map['filesystems'][0]['mdsmap'] + + def evict(self, auth_id, timeout=30, volume_path=None): + """ + Evict all clients based on the authorization ID and optionally based on + the volume path mounted. 
Assumes that the authorization key has been + revoked prior to calling this function. + + This operation can throw an exception if the mon cluster is unresponsive, or + any individual MDS daemon is unresponsive for longer than the timeout passed in. + """ + + client_spec = ["auth_name={0}".format(auth_id), ] + if volume_path: + client_spec.append("client_metadata.root={0}". + format(self._get_path(volume_path))) + + log.info("evict clients with {0}".format(', '.join(client_spec))) + + mds_map = self.get_mds_map() + up = {} + for name, gid in mds_map['up'].items(): + # Quirk of the MDSMap JSON dump: keys in the up dict are like "mds_0" + assert name.startswith("mds_") + up[int(name[4:])] = gid + + # For all MDS ranks held by a daemon + # Do the parallelism in python instead of using "tell mds.*", because + # the latter doesn't give us per-mds output + threads = [] + for rank, gid in up.items(): + thread = RankEvicter(self, client_spec, rank, gid, mds_map, + timeout) + thread.start() + threads.append(thread) + + for t in threads: + t.join() + + log.info("evict: joined all") + + for t in threads: + if not t.success: + msg = ("Failed to evict client with {0} from mds {1}/{2}: {3}". + format(', '.join(client_spec), t.rank, t.gid, t.exception) + ) + log.error(msg) + raise EvictionError(msg) + + def _get_path(self, volume_path): + """ + Determine the path within CephFS where this volume will live + :return: absolute path (string) + """ + return os.path.join( + self.volume_prefix, + volume_path.group_id if volume_path.group_id is not None else NO_GROUP_NAME, + volume_path.volume_id) + + def _get_group_path(self, group_id): + if group_id is None: + raise ValueError("group_id may not be None") + + return os.path.join( + self.volume_prefix, + group_id + ) + + def _connect(self, premount_evict): + log.debug("Connecting to cephfs...") + self.fs = cephfs.LibCephFS(rados_inst=self.rados) + log.debug("CephFS initializing...") + self.fs.init() + if premount_evict is not None: + log.debug("Premount eviction of {0} starting".format(premount_evict)) + self.evict(premount_evict) + log.debug("Premount eviction of {0} completes".format(premount_evict)) + log.debug("CephFS mounting...") + self.fs.mount(filesystem_name=to_bytes(self.fs_name)) + log.debug("Connection to cephfs complete") + + # Recover from partial auth updates due to a previous + # crash. + self.recover() + + def connect(self, premount_evict = None): + """ + + :param premount_evict: Optional auth_id to evict before mounting the filesystem: callers + may want to use this to specify their own auth ID if they expect + to be a unique instance and don't want to wait for caps to time + out after failure of another instance of themselves. 
+ """ + if self.own_rados: + log.debug("Configuring to RADOS with config {0}...".format(self.conf_path)) + self.rados = rados.Rados( + name="client.{0}".format(self.auth_id), + clustername=self.cluster_name, + conffile=self.conf_path, + conf={} + ) + if self.rados.state != "connected": + log.debug("Connecting to RADOS...") + self.rados.connect() + log.debug("Connection to RADOS complete") + self._connect(premount_evict) + + def get_mon_addrs(self): + log.info("get_mon_addrs") + result = [] + mon_map = self._rados_command("mon dump") + for mon in mon_map['mons']: + ip_port = mon['addr'].split("/")[0] + result.append(ip_port) + + return result + + def disconnect(self): + log.info("disconnect") + if self.fs: + log.debug("Disconnecting cephfs...") + self.fs.shutdown() + self.fs = None + log.debug("Disconnecting cephfs complete") + + if self.rados and self.own_rados: + log.debug("Disconnecting rados...") + self.rados.shutdown() + self.rados = None + log.debug("Disconnecting rados complete") + + def __enter__(self): + self.connect() + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + self.disconnect() + + def __del__(self): + self.disconnect() + + def _get_pool_id(self, osd_map, pool_name): + # Maybe borrow the OSDMap wrapper class from calamari if more helpers + # like this are needed. + for pool in osd_map['pools']: + if pool['pool_name'] == pool_name: + return pool['pool'] + + return None + + def _create_volume_pool(self, pool_name): + """ + Idempotently create a pool for use as a CephFS data pool, with the given name + + :return The ID of the created pool + """ + osd_map = self._rados_command('osd dump', {}) + + existing_id = self._get_pool_id(osd_map, pool_name) + if existing_id is not None: + log.info("Pool {0} already exists".format(pool_name)) + return existing_id + + self._rados_command( + 'osd pool create', + { + 'pool': pool_name, + } + ) + + osd_map = self._rados_command('osd dump', {}) + pool_id = self._get_pool_id(osd_map, pool_name) + + if pool_id is None: + # If the pool isn't there, that's either a ceph bug, or it's some outside influence + # removing it right after we created it. + log.error("OSD map doesn't contain expected pool '{0}':\n{1}".format( + pool_name, json.dumps(osd_map, indent=2) + )) + raise RuntimeError("Pool '{0}' not present in map after creation".format(pool_name)) + else: + return pool_id + + def create_group(self, group_id, mode=0o755): + # Prevent craftily-named volume groups from colliding with the meta + # files. + if group_id.endswith(META_FILE_EXT): + raise ValueError("group ID cannot end with '{0}'.".format( + META_FILE_EXT)) + path = self._get_group_path(group_id) + self._mkdir_p(path, mode) + + def destroy_group(self, group_id): + path = self._get_group_path(group_id) + try: + self.fs.stat(self.volume_prefix) + except cephfs.ObjectNotFound: + pass + else: + self.fs.rmdir(path) + + def _mkdir_p(self, path, mode=0o755): + try: + self.fs.stat(path) + except cephfs.ObjectNotFound: + pass + else: + return + + parts = path.split(os.path.sep) + + for i in range(1, len(parts) + 1): + subpath = os.path.join(*parts[0:i]) + try: + self.fs.stat(subpath) + except cephfs.ObjectNotFound: + self.fs.mkdir(subpath, mode) + + def create_volume(self, volume_path, size=None, data_isolated=False, namespace_isolated=True, + mode=0o755): + """ + Set up metadata, pools and auth for a volume. + + This function is idempotent. It is safe to call this again + for an already-created volume, even if it is in use. 
+ + :param volume_path: VolumePath instance + :param size: In bytes, or None for no size limit + :param data_isolated: If true, create a separate OSD pool for this volume + :param namespace_isolated: If true, use separate RADOS namespace for this volume + :return: + """ + path = self._get_path(volume_path) + log.info("create_volume: {0}".format(path)) + + self._mkdir_p(path, mode) + + if size is not None: + self.fs.setxattr(path, 'ceph.quota.max_bytes', to_bytes(size), 0) + + # data_isolated means create a separate pool for this volume + if data_isolated: + pool_name = "{0}{1}".format(self.POOL_PREFIX, volume_path.volume_id) + log.info("create_volume: {0}, create pool {1} as data_isolated =True.".format(volume_path, pool_name)) + pool_id = self._create_volume_pool(pool_name) + mds_map = self.get_mds_map() + if pool_id not in mds_map['data_pools']: + self._rados_command("fs add_data_pool", { + 'fs_name': mds_map['fs_name'], + 'pool': pool_name + }) + time.sleep(5) # time for MDSMap to be distributed + self.fs.setxattr(path, 'ceph.dir.layout.pool', to_bytes(pool_name), 0) + + # enforce security isolation, use separate namespace for this volume + if namespace_isolated: + namespace = "{0}{1}".format(self.pool_ns_prefix, volume_path.volume_id) + log.info("create_volume: {0}, using rados namespace {1} to isolate data.".format(volume_path, namespace)) + self.fs.setxattr(path, 'ceph.dir.layout.pool_namespace', + to_bytes(namespace), 0) + else: + # If volume's namespace layout is not set, then the volume's pool + # layout remains unset and will undesirably change with ancestor's + # pool layout changes. + pool_name = self._get_ancestor_xattr(path, "ceph.dir.layout.pool") + self.fs.setxattr(path, 'ceph.dir.layout.pool', + to_bytes(pool_name), 0) + + # Create a volume meta file, if it does not already exist, to store + # data about auth ids having access to the volume + fd = self.fs.open(self._volume_metadata_path(volume_path), + os.O_CREAT, 0o755) + self.fs.close(fd) + + return { + 'mount_path': path + } + + def delete_volume(self, volume_path, data_isolated=False): + """ + Make a volume inaccessible to guests. This function is + idempotent. This is the fast part of tearing down a volume: you must + also later call purge_volume, which is the slow part. + + :param volume_path: Same identifier used in create_volume + :return: + """ + + path = self._get_path(volume_path) + log.info("delete_volume: {0}".format(path)) + + # Create the trash folder if it doesn't already exist + trash = os.path.join(self.volume_prefix, "_deleting") + self._mkdir_p(trash) + + # We'll move it to here + trashed_volume = os.path.join(trash, volume_path.volume_id) + + # Move the volume's data to the trash folder + try: + self.fs.stat(path) + except cephfs.ObjectNotFound: + log.warning("Trying to delete volume '{0}' but it's already gone".format( + path)) + else: + self.fs.rename(path, trashed_volume) + + # Delete the volume meta file, if it's not already deleted + vol_meta_path = self._volume_metadata_path(volume_path) + try: + self.fs.unlink(vol_meta_path) + except cephfs.ObjectNotFound: + pass + + def purge_volume(self, volume_path, data_isolated=False): + """ + Finish clearing up a volume that was previously passed to delete_volume. This + function is idempotent. 
+ """ + + trash = os.path.join(self.volume_prefix, "_deleting") + trashed_volume = os.path.join(trash, volume_path.volume_id) + + try: + self.fs.stat(trashed_volume) + except cephfs.ObjectNotFound: + log.warning("Trying to purge volume '{0}' but it's already been purged".format( + trashed_volume)) + return + + def rmtree(root_path): + log.debug("rmtree {0}".format(root_path)) + dir_handle = self.fs.opendir(root_path) + d = self.fs.readdir(dir_handle) + while d: + d_name = d.d_name.decode(encoding='utf-8') + if d_name not in [".", ".."]: + # Do not use os.path.join because it is sensitive + # to string encoding, we just pass through dnames + # as byte arrays + d_full = u"{0}/{1}".format(root_path, d_name) + if d.is_dir(): + rmtree(d_full) + else: + self.fs.unlink(d_full) + + d = self.fs.readdir(dir_handle) + self.fs.closedir(dir_handle) + + self.fs.rmdir(root_path) + + rmtree(trashed_volume) + + if data_isolated: + pool_name = "{0}{1}".format(self.POOL_PREFIX, volume_path.volume_id) + osd_map = self._rados_command("osd dump", {}) + pool_id = self._get_pool_id(osd_map, pool_name) + mds_map = self.get_mds_map() + if pool_id in mds_map['data_pools']: + self._rados_command("fs rm_data_pool", { + 'fs_name': mds_map['fs_name'], + 'pool': pool_name + }) + self._rados_command("osd pool delete", + { + "pool": pool_name, + "pool2": pool_name, + "yes_i_really_really_mean_it": True + }) + + def _get_ancestor_xattr(self, path, attr): + """ + Helper for reading layout information: if this xattr is missing + on the requested path, keep checking parents until we find it. + """ + try: + result = self.fs.getxattr(path, attr).decode() + if result == "": + # Annoying! cephfs gives us empty instead of an error when attr not found + raise cephfs.NoData() + else: + return result + except cephfs.NoData: + if path == "/": + raise + else: + return self._get_ancestor_xattr(os.path.split(path)[0], attr) + + def _check_compat_version(self, compat_version): + if self.version < compat_version: + msg = ("The current version of CephFSVolumeClient, version {0} " + "does not support the required feature. Need version {1} " + "or greater".format(self.version, compat_version) + ) + log.error(msg) + raise CephFSVolumeClientError(msg) + + def _metadata_get(self, path): + """ + Return a deserialized JSON object, or None + """ + fd = self.fs.open(path, "r") + # TODO iterate instead of assuming file < 4MB + read_bytes = self.fs.read(fd, 0, 4096 * 1024) + self.fs.close(fd) + if read_bytes: + return json.loads(read_bytes.decode()) + else: + return None + + def _metadata_set(self, path, data): + serialized = json.dumps(data) + fd = self.fs.open(path, "w") + try: + self.fs.write(fd, to_bytes(serialized), 0) + self.fs.fsync(fd, 0) + finally: + self.fs.close(fd) + + def _lock(self, path): + @contextmanager + def fn(): + while(1): + fd = self.fs.open(path, os.O_CREAT, 0o755) + self.fs.flock(fd, fcntl.LOCK_EX, self._id) + + # The locked file will be cleaned up sometime. It could be + # unlinked e.g., by an another manila share instance, before + # lock was applied on it. Perform checks to ensure that this + # does not happen. 
+ try: + statbuf = self.fs.stat(path) + except cephfs.ObjectNotFound: + self.fs.close(fd) + continue + + fstatbuf = self.fs.fstat(fd) + if statbuf.st_ino == fstatbuf.st_ino: + break + + try: + yield + finally: + self.fs.flock(fd, fcntl.LOCK_UN, self._id) + self.fs.close(fd) + + return fn() + + def _auth_metadata_path(self, auth_id): + return os.path.join(self.volume_prefix, "${0}{1}".format( + auth_id, META_FILE_EXT)) + + def _auth_lock(self, auth_id): + return self._lock(self._auth_metadata_path(auth_id)) + + def _auth_metadata_get(self, auth_id): + """ + Call me with the metadata locked! + + Check whether a auth metadata structure can be decoded by the current + version of CephFSVolumeClient. + + Return auth metadata that the current version of CephFSVolumeClient + can decode. + """ + auth_metadata = self._metadata_get(self._auth_metadata_path(auth_id)) + + if auth_metadata: + self._check_compat_version(auth_metadata['compat_version']) + + return auth_metadata + + def _auth_metadata_set(self, auth_id, data): + """ + Call me with the metadata locked! + + Fsync the auth metadata. + + Add two version attributes to the auth metadata, + 'compat_version', the minimum CephFSVolumeClient version that can + decode the metadata, and 'version', the CephFSVolumeClient version + that encoded the metadata. + """ + data['compat_version'] = 1 + data['version'] = self.version + return self._metadata_set(self._auth_metadata_path(auth_id), data) + + def _volume_metadata_path(self, volume_path): + return os.path.join(self.volume_prefix, "_{0}:{1}{2}".format( + volume_path.group_id if volume_path.group_id else "", + volume_path.volume_id, + META_FILE_EXT + )) + + def _volume_lock(self, volume_path): + """ + Return a ContextManager which locks the authorization metadata for + a particular volume, and persists a flag to the metadata indicating + that it is currently locked, so that we can detect dirty situations + during recovery. + + This lock isn't just to make access to the metadata safe: it's also + designed to be used over the two-step process of checking the + metadata and then responding to an authorization request, to + ensure that at the point we respond the metadata hasn't changed + in the background. It's key to how we avoid security holes + resulting from races during that problem , + """ + return self._lock(self._volume_metadata_path(volume_path)) + + def _volume_metadata_get(self, volume_path): + """ + Call me with the metadata locked! + + Check whether a volume metadata structure can be decoded by the current + version of CephFSVolumeClient. + + Return a volume_metadata structure that the current version of + CephFSVolumeClient can decode. + """ + volume_metadata = self._metadata_get(self._volume_metadata_path(volume_path)) + + if volume_metadata: + self._check_compat_version(volume_metadata['compat_version']) + + return volume_metadata + + def _volume_metadata_set(self, volume_path, data): + """ + Call me with the metadata locked! + + Add two version attributes to the volume metadata, + 'compat_version', the minimum CephFSVolumeClient version that can + decode the metadata and 'version', the CephFSVolumeClient version + that encoded the metadata. 
+ """ + data['compat_version'] = 1 + data['version'] = self.version + return self._metadata_set(self._volume_metadata_path(volume_path), data) + + def _prepare_updated_caps_list(self, existing_caps, mds_cap_str, osd_cap_str, authorize=True): + caps_list = [] + for k, v in existing_caps['caps'].items(): + if k == 'mds' or k == 'osd': + continue + elif k == 'mon': + if not authorize and v == 'allow r': + continue + caps_list.extend((k,v)) + + if mds_cap_str: + caps_list.extend(('mds', mds_cap_str)) + if osd_cap_str: + caps_list.extend(('osd', osd_cap_str)) + + if authorize and 'mon' not in caps_list: + caps_list.extend(('mon', 'allow r')) + + return caps_list + + def authorize(self, volume_path, auth_id, readonly=False, tenant_id=None, allow_existing_id=False): + """ + Get-or-create a Ceph auth identity for `auth_id` and grant them access + to + :param volume_path: + :param auth_id: + :param readonly: + :param tenant_id: Optionally provide a stringizable object to + restrict any created cephx IDs to other callers + passing the same tenant ID. + :allow_existing_id: Optionally authorize existing auth-ids not + created by ceph_volume_client + :return: + """ + + with self._auth_lock(auth_id): + client_entity = "client.{0}".format(auth_id) + try: + existing_caps = self._rados_command( + 'auth get', + { + 'entity': client_entity + } + ) + # FIXME: rados raising Error instead of ObjectNotFound in auth get failure + except rados.Error: + existing_caps = None + + # Existing meta, or None, to be updated + auth_meta = self._auth_metadata_get(auth_id) + + # volume data to be inserted + volume_path_str = str(volume_path) + volume = { + volume_path_str : { + # The access level at which the auth_id is authorized to + # access the volume. + 'access_level': 'r' if readonly else 'rw', + 'dirty': True, + } + } + + if auth_meta is None: + if not allow_existing_id and existing_caps is not None: + msg = "auth ID: {0} exists and not created by ceph_volume_client. 
Not allowed to modify".format(auth_id) + log.error(msg) + raise CephFSVolumeClientError(msg) + + # non-existent auth IDs + sys.stderr.write("Creating meta for ID {0} with tenant {1}\n".format( + auth_id, tenant_id + )) + log.debug("Authorize: no existing meta") + auth_meta = { + 'dirty': True, + 'tenant_id': tenant_id.__str__() if tenant_id else None, + 'volumes': volume + } + else: + # Disallow tenants to share auth IDs + if auth_meta['tenant_id'].__str__() != tenant_id.__str__(): + msg = "auth ID: {0} is already in use".format(auth_id) + log.error(msg) + raise CephFSVolumeClientError(msg) + + if auth_meta['dirty']: + self._recover_auth_meta(auth_id, auth_meta) + + log.debug("Authorize: existing tenant {tenant}".format( + tenant=auth_meta['tenant_id'] + )) + auth_meta['dirty'] = True + auth_meta['volumes'].update(volume) + + self._auth_metadata_set(auth_id, auth_meta) + + with self._volume_lock(volume_path): + key = self._authorize_volume(volume_path, auth_id, readonly, existing_caps) + + auth_meta['dirty'] = False + auth_meta['volumes'][volume_path_str]['dirty'] = False + self._auth_metadata_set(auth_id, auth_meta) + + if tenant_id: + return { + 'auth_key': key + } + else: + # Caller wasn't multi-tenant aware: be safe and don't give + # them a key + return { + 'auth_key': None + } + + def _authorize_volume(self, volume_path, auth_id, readonly, existing_caps): + vol_meta = self._volume_metadata_get(volume_path) + + access_level = 'r' if readonly else 'rw' + auth = { + auth_id: { + 'access_level': access_level, + 'dirty': True, + } + } + + if vol_meta is None: + vol_meta = { + 'auths': auth + } + else: + vol_meta['auths'].update(auth) + self._volume_metadata_set(volume_path, vol_meta) + + key = self._authorize_ceph(volume_path, auth_id, readonly, existing_caps) + + vol_meta['auths'][auth_id]['dirty'] = False + self._volume_metadata_set(volume_path, vol_meta) + + return key + + def _authorize_ceph(self, volume_path, auth_id, readonly, existing_caps): + path = self._get_path(volume_path) + log.debug("Authorizing Ceph id '{0}' for path '{1}'".format( + auth_id, path + )) + + # First I need to work out what the data pool is for this share: + # read the layout + pool_name = self._get_ancestor_xattr(path, "ceph.dir.layout.pool") + + try: + namespace = self.fs.getxattr(path, "ceph.dir.layout.pool_" + "namespace").decode() + except cephfs.NoData: + namespace = None + + # Now construct auth capabilities that give the guest just enough + # permissions to access the share + client_entity = "client.{0}".format(auth_id) + want_access_level = 'r' if readonly else 'rw' + want_mds_cap = 'allow {0} path={1}'.format(want_access_level, path) + if namespace: + want_osd_cap = 'allow {0} pool={1} namespace={2}'.format( + want_access_level, pool_name, namespace) + else: + want_osd_cap = 'allow {0} pool={1}'.format(want_access_level, + pool_name) + + if existing_caps is None: + caps = self._rados_command( + 'auth get-or-create', + { + 'entity': client_entity, + 'caps': [ + 'mds', want_mds_cap, + 'osd', want_osd_cap, + 'mon', 'allow r'] + }) + else: + # entity exists, update it + cap = existing_caps[0] + + # Construct auth caps that if present might conflict with the desired + # auth caps. 
+ unwanted_access_level = 'r' if want_access_level == 'rw' else 'rw' + unwanted_mds_cap = 'allow {0} path={1}'.format(unwanted_access_level, path) + if namespace: + unwanted_osd_cap = 'allow {0} pool={1} namespace={2}'.format( + unwanted_access_level, pool_name, namespace) + else: + unwanted_osd_cap = 'allow {0} pool={1}'.format( + unwanted_access_level, pool_name) + + def cap_update( + orig_mds_caps, orig_osd_caps, want_mds_cap, + want_osd_cap, unwanted_mds_cap, unwanted_osd_cap): + + if not orig_mds_caps: + return want_mds_cap, want_osd_cap + + mds_cap_tokens = [x.strip() for x in orig_mds_caps.split(",")] + osd_cap_tokens = [x.strip() for x in orig_osd_caps.split(",")] + + if want_mds_cap in mds_cap_tokens: + return orig_mds_caps, orig_osd_caps + + if unwanted_mds_cap in mds_cap_tokens: + mds_cap_tokens.remove(unwanted_mds_cap) + osd_cap_tokens.remove(unwanted_osd_cap) + + mds_cap_tokens.append(want_mds_cap) + osd_cap_tokens.append(want_osd_cap) + + return ",".join(mds_cap_tokens), ",".join(osd_cap_tokens) + + orig_mds_caps = cap['caps'].get('mds', "") + orig_osd_caps = cap['caps'].get('osd', "") + + mds_cap_str, osd_cap_str = cap_update( + orig_mds_caps, orig_osd_caps, want_mds_cap, want_osd_cap, + unwanted_mds_cap, unwanted_osd_cap) + + caps_list = self._prepare_updated_caps_list(cap, mds_cap_str, osd_cap_str) + caps = self._rados_command( + 'auth caps', + { + 'entity': client_entity, + 'caps': caps_list + }) + + caps = self._rados_command( + 'auth get', + { + 'entity': client_entity + } + ) + + # Result expected like this: + # [ + # { + # "entity": "client.foobar", + # "key": "AQBY0\/pViX\/wBBAAUpPs9swy7rey1qPhzmDVGQ==", + # "caps": { + # "mds": "allow *", + # "mon": "allow *" + # } + # } + # ] + assert len(caps) == 1 + assert caps[0]['entity'] == client_entity + return caps[0]['key'] + + def deauthorize(self, volume_path, auth_id): + with self._auth_lock(auth_id): + # Existing meta, or None, to be updated + auth_meta = self._auth_metadata_get(auth_id) + + volume_path_str = str(volume_path) + if (auth_meta is None) or (not auth_meta['volumes']): + log.warning("deauthorized called for already-removed auth" + "ID '{auth_id}' for volume ID '{volume}'".format( + auth_id=auth_id, volume=volume_path.volume_id + )) + # Clean up the auth meta file of an auth ID + self.fs.unlink(self._auth_metadata_path(auth_id)) + return + + if volume_path_str not in auth_meta['volumes']: + log.warning("deauthorized called for already-removed auth" + "ID '{auth_id}' for volume ID '{volume}'".format( + auth_id=auth_id, volume=volume_path.volume_id + )) + return + + if auth_meta['dirty']: + self._recover_auth_meta(auth_id, auth_meta) + + auth_meta['dirty'] = True + auth_meta['volumes'][volume_path_str]['dirty'] = True + self._auth_metadata_set(auth_id, auth_meta) + + self._deauthorize_volume(volume_path, auth_id) + + # Filter out the volume we're deauthorizing + del auth_meta['volumes'][volume_path_str] + + # Clean up auth meta file + if not auth_meta['volumes']: + self.fs.unlink(self._auth_metadata_path(auth_id)) + return + + auth_meta['dirty'] = False + self._auth_metadata_set(auth_id, auth_meta) + + def _deauthorize_volume(self, volume_path, auth_id): + with self._volume_lock(volume_path): + vol_meta = self._volume_metadata_get(volume_path) + + if (vol_meta is None) or (auth_id not in vol_meta['auths']): + log.warning("deauthorized called for already-removed auth" + "ID '{auth_id}' for volume ID '{volume}'".format( + auth_id=auth_id, volume=volume_path.volume_id + )) + return + + 
vol_meta['auths'][auth_id]['dirty'] = True + self._volume_metadata_set(volume_path, vol_meta) + + self._deauthorize(volume_path, auth_id) + + # Remove the auth_id from the metadata *after* removing it + # from ceph, so that if we crashed here, we would actually + # recreate the auth ID during recovery (i.e. end up with + # a consistent state). + + # Filter out the auth we're removing + del vol_meta['auths'][auth_id] + self._volume_metadata_set(volume_path, vol_meta) + + def _deauthorize(self, volume_path, auth_id): + """ + The volume must still exist. + """ + client_entity = "client.{0}".format(auth_id) + path = self._get_path(volume_path) + pool_name = self._get_ancestor_xattr(path, "ceph.dir.layout.pool") + try: + namespace = self.fs.getxattr(path, "ceph.dir.layout.pool_" + "namespace").decode() + except cephfs.NoData: + namespace = None + + # The auth_id might have read-only or read-write mount access for the + # volume path. + access_levels = ('r', 'rw') + want_mds_caps = ['allow {0} path={1}'.format(access_level, path) + for access_level in access_levels] + if namespace: + want_osd_caps = ['allow {0} pool={1} namespace={2}'.format(access_level, pool_name, namespace) + for access_level in access_levels] + else: + want_osd_caps = ['allow {0} pool={1}'.format(access_level, pool_name) + for access_level in access_levels] + + + try: + existing = self._rados_command( + 'auth get', + { + 'entity': client_entity + } + ) + + def cap_remove(orig_mds_caps, orig_osd_caps, want_mds_caps, want_osd_caps): + mds_cap_tokens = [x.strip() for x in orig_mds_caps.split(",")] + osd_cap_tokens = [x.strip() for x in orig_osd_caps.split(",")] + + for want_mds_cap, want_osd_cap in zip(want_mds_caps, want_osd_caps): + if want_mds_cap in mds_cap_tokens: + mds_cap_tokens.remove(want_mds_cap) + osd_cap_tokens.remove(want_osd_cap) + break + + return ",".join(mds_cap_tokens), ",".join(osd_cap_tokens) + + cap = existing[0] + orig_mds_caps = cap['caps'].get('mds', "") + orig_osd_caps = cap['caps'].get('osd', "") + mds_cap_str, osd_cap_str = cap_remove(orig_mds_caps, orig_osd_caps, + want_mds_caps, want_osd_caps) + + caps_list = self._prepare_updated_caps_list(cap, mds_cap_str, osd_cap_str, authorize=False) + if not caps_list: + self._rados_command('auth del', {'entity': client_entity}, decode=False) + else: + self._rados_command( + 'auth caps', + { + 'entity': client_entity, + 'caps': caps_list + }) + + # FIXME: rados raising Error instead of ObjectNotFound in auth get failure + except rados.Error: + # Already gone, great. + return + + def get_authorized_ids(self, volume_path): + """ + Expose a list of auth IDs that have access to a volume. + + return: a list of (auth_id, access_level) tuples, where + the access_level can be 'r' , or 'rw'. + None if no auth ID is given access to the volume. + """ + with self._volume_lock(volume_path): + meta = self._volume_metadata_get(volume_path) + auths = [] + if not meta or not meta['auths']: + return None + + for auth, auth_data in meta['auths'].items(): + # Skip partial auth updates. + if not auth_data['dirty']: + auths.append((auth, auth_data['access_level'])) + + return auths + + def _rados_command(self, prefix, args=None, decode=True): + """ + Safer wrapper for ceph_argparse.json_command, which raises + Error exception instead of relying on caller to check return + codes. + + Error exception can result from: + * Timeout + * Actual legitimate errors + * Malformed JSON output + + return: Decoded object from ceph, or None if empty string returned. 
+ If decode is False, return a string (the data returned by + ceph command) + """ + if args is None: + args = {} + + argdict = args.copy() + argdict['format'] = 'json' + + ret, outbuf, outs = json_command(self.rados, + prefix=prefix, + argdict=argdict, + timeout=RADOS_TIMEOUT) + if ret != 0: + raise rados.Error(outs) + else: + if decode: + if outbuf: + try: + return json.loads(outbuf.decode()) + except (ValueError, TypeError): + raise RadosError("Invalid JSON output for command {0}".format(argdict)) + else: + return None + else: + return outbuf + + def get_used_bytes(self, volume_path): + return int(self.fs.getxattr(self._get_path(volume_path), "ceph.dir." + "rbytes").decode()) + + def set_max_bytes(self, volume_path, max_bytes): + self.fs.setxattr(self._get_path(volume_path), 'ceph.quota.max_bytes', + to_bytes(max_bytes if max_bytes else 0), 0) + + def _snapshot_path(self, dir_path, snapshot_name): + return os.path.join( + dir_path, self.rados.conf_get('client_snapdir'), snapshot_name + ) + + def _snapshot_create(self, dir_path, snapshot_name, mode=0o755): + # TODO: raise intelligible exception for clusters where snaps are disabled + self.fs.mkdir(self._snapshot_path(dir_path, snapshot_name), mode) + + def _snapshot_destroy(self, dir_path, snapshot_name): + """ + Remove a snapshot, or do nothing if it already doesn't exist. + """ + try: + self.fs.rmdir(self._snapshot_path(dir_path, snapshot_name)) + except cephfs.ObjectNotFound: + log.warning("Snapshot was already gone: {0}".format(snapshot_name)) + + def create_snapshot_volume(self, volume_path, snapshot_name, mode=0o755): + self._snapshot_create(self._get_path(volume_path), snapshot_name, mode) + + def destroy_snapshot_volume(self, volume_path, snapshot_name): + self._snapshot_destroy(self._get_path(volume_path), snapshot_name) + + def create_snapshot_group(self, group_id, snapshot_name, mode=0o755): + if group_id is None: + raise RuntimeError("Group ID may not be None") + + return self._snapshot_create(self._get_group_path(group_id), snapshot_name, + mode) + + def destroy_snapshot_group(self, group_id, snapshot_name): + if group_id is None: + raise RuntimeError("Group ID may not be None") + if snapshot_name is None: + raise RuntimeError("Snapshot name may not be None") + + return self._snapshot_destroy(self._get_group_path(group_id), snapshot_name) + + def _cp_r(self, src, dst): + # TODO + raise NotImplementedError() + + def clone_volume_to_existing(self, dest_volume_path, src_volume_path, src_snapshot_name): + dest_fs_path = self._get_path(dest_volume_path) + src_snapshot_path = self._snapshot_path(self._get_path(src_volume_path), src_snapshot_name) + + self._cp_r(src_snapshot_path, dest_fs_path) + + def put_object(self, pool_name, object_name, data): + """ + Synchronously write data to an object. + + :param pool_name: name of the pool + :type pool_name: str + :param object_name: name of the object + :type object_name: str + :param data: data to write + :type data: bytes + """ + return self.put_object_versioned(pool_name, object_name, data) + + def put_object_versioned(self, pool_name, object_name, data, version=None): + """ + Synchronously write data to an object only if version of the object + version matches the expected version. 
+ + :param pool_name: name of the pool + :type pool_name: str + :param object_name: name of the object + :type object_name: str + :param data: data to write + :type data: bytes + :param version: expected version of the object to write + :type version: int + """ + ioctx = self.rados.open_ioctx(pool_name) + + max_size = int(self.rados.conf_get('osd_max_write_size')) * 1024 * 1024 + if len(data) > max_size: + msg = ("Data to be written to object '{0}' exceeds " + "{1} bytes".format(object_name, max_size)) + log.error(msg) + raise CephFSVolumeClientError(msg) + + try: + with rados.WriteOpCtx() as wop: + if version is not None: + wop.assert_version(version) + wop.write_full(data) + ioctx.operate_write_op(wop, object_name) + except rados.OSError as e: + log.error(e) + raise e + finally: + ioctx.close() + + def get_object(self, pool_name, object_name): + """ + Synchronously read data from object. + + :param pool_name: name of the pool + :type pool_name: str + :param object_name: name of the object + :type object_name: str + + :returns: bytes - data read from object + """ + return self.get_object_and_version(pool_name, object_name)[0] + + def get_object_and_version(self, pool_name, object_name): + """ + Synchronously read data from object and get its version. + + :param pool_name: name of the pool + :type pool_name: str + :param object_name: name of the object + :type object_name: str + + :returns: tuple of object data and version + """ + ioctx = self.rados.open_ioctx(pool_name) + max_size = int(self.rados.conf_get('osd_max_write_size')) * 1024 * 1024 + try: + bytes_read = ioctx.read(object_name, max_size) + if ((len(bytes_read) == max_size) and + (ioctx.read(object_name, 1, offset=max_size))): + log.warning("Size of object {0} exceeds '{1}' bytes " + "read".format(object_name, max_size)) + obj_version = ioctx.get_last_version() + finally: + ioctx.close() + return (bytes_read, obj_version) + + def delete_object(self, pool_name, object_name): + ioctx = self.rados.open_ioctx(pool_name) + try: + ioctx.remove_object(object_name) + except rados.ObjectNotFound: + log.warning("Object '{0}' was already removed".format(object_name)) + finally: + ioctx.close()
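The QA payloads above only exercise the restored API piecemeal, so for reference here is a minimal sketch of the full volume lifecycle it exposes (create, authorize, deauthorize/evict, delete, purge). It is illustrative only: the cluster name, conf path, auth IDs and group/volume IDs are assumptions, not values used by the suites in this patch.

    from ceph_volume_client import CephFSVolumeClient, VolumePath

    # Client builds and owns its own rados connection on connect().
    vc = CephFSVolumeClient(auth_id="manila",
                            conf_path="/etc/ceph/ceph.conf",
                            cluster_name="ceph")
    vc.connect()
    try:
        vp = VolumePath("grpid", "volid")

        # Idempotent create; returns the CephFS mount path of the volume.
        result = vc.create_volume(vp, size=10 * 1024 * 1024,
                                  namespace_isolated=False)
        print(result['mount_path'])

        # Grant a guest rw access; the key is only returned when a
        # tenant_id is supplied (i.e. the caller is multi-tenant aware).
        auth = vc.authorize(vp, "guest", readonly=False, tenant_id="tenant1")
        print(auth['auth_key'])

        # Tear-down in the order the docstrings describe: revoke caps,
        # evict mounted clients, then delete (fast) and purge (slow).
        vc.deauthorize(vp, "guest")
        vc.evict("guest", volume_path=vp)
        vc.delete_volume(vp)
        vc.purge_volume(vp)
    finally:
        vc.disconnect()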
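CephFSVolumeClient is also a context manager (connect on enter, disconnect on exit), which keeps quota and snapshot handling short. A sketch, assuming the volume "vol_default" and the group "grpid" already exist; all names are again illustrative.

    from ceph_volume_client import CephFSVolumeClient, VolumePath

    with CephFSVolumeClient(auth_id="manila",
                            conf_path="/etc/ceph/ceph.conf",
                            cluster_name="ceph") as vc:
        vp = VolumePath(None, "vol_default")   # group may be None (_nogroup)

        used = vc.get_used_bytes(vp)           # reads ceph.dir.rbytes
        vc.set_max_bytes(vp, 2 * (1 << 30))    # ceph.quota.max_bytes = 2 GiB
        print("volume uses {0} bytes".format(used))

        # Snapshots are directories under the client_snapdir of the volume
        # (or group) path; the snapshot names here are made up.
        vc.create_snapshot_volume(vp, "before-upgrade")
        vc.destroy_snapshot_volume(vp, "before-upgrade")

        vc.create_snapshot_group("grpid", "nightly")
        vc.destroy_snapshot_group("grpid", "nightly")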
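The get_object_and_version / put_object_versioned pair supports an optimistic read-modify-write on objects a caller such as manila keeps in RADOS: write only succeeds if the object version is unchanged since the read. A sketch of that retry loop, assuming the pool and object names below exist (they are not part of this patch).

    import rados
    from ceph_volume_client import CephFSVolumeClient

    with CephFSVolumeClient(auth_id="manila",
                            conf_path="/etc/ceph/ceph.conf",
                            cluster_name="ceph") as vc:
        pool, obj = "manila_metadata", "shares.json"
        while True:
            data, version = vc.get_object_and_version(pool, obj)
            updated = data + b"\n"      # whatever edit the caller needs
            try:
                # Write only if nobody else bumped the object version.
                vc.put_object_versioned(pool, obj, updated, version=version)
                break
            except rados.OSError:
                continue                # lost the race; re-read and retry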