git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
Revert "*: remove legacy ceph_volume_client.py library" 39014/head
author     Kotresh HR <khiremat@redhat.com>
           Fri, 22 Jan 2021 07:48:30 +0000 (13:18 +0530)
committer  Kotresh HR <khiremat@redhat.com>
           Fri, 22 Jan 2021 07:55:26 +0000 (13:25 +0530)
This reverts commit a3db265ad5b3ff899d8b71164a7f9ea5a426b19e.
The ceph_volume_client library is still being used by manila, which is
not yet fully equipped to use mgr/volumes and expects to keep using
ceph_volume_client for one more release. Hence this commit is being
reverted for the pacific release.

Note that this is a pacific-only patch; hence there is no
corresponding master patch.
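
For reference, a minimal sketch of how a manila-style consumer drives the
restored library (it simply mirrors the calls exercised by the QA tests added
below); the conf path, IDs and sizes are illustrative placeholders, not part
of this patch:

    # Illustrative sketch only: IDs, paths and sizes are placeholders.
    from ceph_volume_client import CephFSVolumeClient, VolumePath

    vc = CephFSVolumeClient("manila", "/etc/ceph/ceph.conf", "ceph", None, None)
    vc.connect()
    try:
        vp = VolumePath("grpid", "volid")
        create_result = vc.create_volume(vp, 10 * 1024 * 1024)  # 10 MB quota
        auth_result = vc.authorize(vp, "guest")                 # returns the guest's auth key
        # Hand create_result['mount_path'] and auth_result['auth_key'] to the
        # guest client, then tear everything down once the share is retired:
        vc.deauthorize(vp, "guest")
        vc.evict("guest")
        vc.delete_volume(vp)
        vc.purge_volume(vp)
    finally:
        vc.disconnect()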

Fixes: https://tracker.ceph.com/issues/48923
Signed-off-by: Kotresh HR <khiremat@redhat.com>
29 files changed:
.github/labeler.yml
PendingReleaseNotes
ceph.spec.in
debian/python3-cephfs.install
qa/suites/fs/upgrade/volumes/.qa [new symlink]
qa/suites/fs/upgrade/volumes/import-legacy/% [new file with mode: 0644]
qa/suites/fs/upgrade/volumes/import-legacy/.qa [new symlink]
qa/suites/fs/upgrade/volumes/import-legacy/bluestore-bitmap.yaml [new symlink]
qa/suites/fs/upgrade/volumes/import-legacy/clusters/.qa [new symlink]
qa/suites/fs/upgrade/volumes/import-legacy/clusters/1-mds-2-client-micro.yaml [new file with mode: 0644]
qa/suites/fs/upgrade/volumes/import-legacy/conf [new symlink]
qa/suites/fs/upgrade/volumes/import-legacy/overrides/+ [new file with mode: 0644]
qa/suites/fs/upgrade/volumes/import-legacy/overrides/.qa [new symlink]
qa/suites/fs/upgrade/volumes/import-legacy/overrides/frag_enable.yaml [new symlink]
qa/suites/fs/upgrade/volumes/import-legacy/overrides/pg-warn.yaml [new file with mode: 0644]
qa/suites/fs/upgrade/volumes/import-legacy/overrides/whitelist_health.yaml [new symlink]
qa/suites/fs/upgrade/volumes/import-legacy/overrides/whitelist_wrongly_marked_down.yaml [new symlink]
qa/suites/fs/upgrade/volumes/import-legacy/tasks/% [new file with mode: 0644]
qa/suites/fs/upgrade/volumes/import-legacy/tasks/.qa [new symlink]
qa/suites/fs/upgrade/volumes/import-legacy/tasks/0-nautilus.yaml [new file with mode: 0644]
qa/suites/fs/upgrade/volumes/import-legacy/tasks/1-client.yaml [new file with mode: 0644]
qa/suites/fs/upgrade/volumes/import-legacy/tasks/2-upgrade.yaml [new file with mode: 0644]
qa/suites/fs/upgrade/volumes/import-legacy/tasks/3-verify.yaml [new file with mode: 0644]
qa/suites/fs/upgrade/volumes/import-legacy/ubuntu_18.04.yaml [new symlink]
qa/suites/fs/volumes/tasks/volume-client.yaml [new file with mode: 0644]
qa/tasks/cephfs/test_volume_client.py [new file with mode: 0644]
qa/workunits/fs/upgrade/volume_client [new file with mode: 0755]
src/pybind/CMakeLists.txt
src/pybind/ceph_volume_client.py [new file with mode: 0644]

index 7fef72d0baacb708a0c1c983c73cf2deff8799cd..0821fe9645ea6e84403956275fc3b27e85cc1324 100644 (file)
@@ -164,6 +164,7 @@ cephfs:
   - src/mds/**
   - src/mon/MDSMonitor.*
   - src/mon/FSCommands.*
+  - src/pybind/ceph_volume_client.py
   - src/pybind/cephfs/**
   - src/pybind/mgr/mds_autoscaler/**
   - src/pybind/mgr/status/**
index c981c82942a690974006b7a758e71ae154d0718e..73399197552150d39b61d0923ad2094cb9fd23e0 100644 (file)
 
 * MGR: progress module can now be turned on/off, using the commands:
   ``ceph progress on`` and ``ceph progress off``.
-
-* The ceph_volume_client.py library used for manipulating legacy "volumes" in
-  CephFS is removed. All remaining users should use the "fs volume" interface
-  exposed by the ceph-mgr:
-  https://docs.ceph.com/en/latest/cephfs/fs-volumes/
-
 * An AWS-compliant API: "GetTopicAttributes" was added to replace the existing "GetTopic" API. The new API
   should be used to fetch information about topics used for bucket notifications.
 
index de228df1ef398f001ae8e0c42f791414acf55400..c11b1b76c27d13f727c8a1bc18a73312f8ac6477 100644 (file)
@@ -2202,6 +2202,8 @@ fi
 %files -n python%{python3_pkgversion}-cephfs
 %{python3_sitearch}/cephfs.cpython*.so
 %{python3_sitearch}/cephfs-*.egg-info
+%{python3_sitelib}/ceph_volume_client.py
+%{python3_sitelib}/__pycache__/ceph_volume_client.cpython*.py*
 
 %files -n python%{python3_pkgversion}-ceph-argparse
 %{python3_sitelib}/ceph_argparse.py
index 9ac75f5366b49a23ed64684d857b6c45e30df5f2..6eb8836707f8ba82952ceaeed78bd6d89c6e6b0f 100644 (file)
@@ -1,2 +1,3 @@
+usr/lib/python3*/dist-packages/ceph_volume_client.py
 usr/lib/python3*/dist-packages/cephfs-*.egg-info
 usr/lib/python3*/dist-packages/cephfs.cpython*.so
diff --git a/qa/suites/fs/upgrade/volumes/.qa b/qa/suites/fs/upgrade/volumes/.qa
new file mode 120000 (symlink)
index 0000000..a602a03
--- /dev/null
@@ -0,0 +1 @@
+../.qa/
\ No newline at end of file
diff --git a/qa/suites/fs/upgrade/volumes/import-legacy/% b/qa/suites/fs/upgrade/volumes/import-legacy/%
new file mode 100644 (file)
index 0000000..e69de29
diff --git a/qa/suites/fs/upgrade/volumes/import-legacy/.qa b/qa/suites/fs/upgrade/volumes/import-legacy/.qa
new file mode 120000 (symlink)
index 0000000..a602a03
--- /dev/null
@@ -0,0 +1 @@
+../.qa/
\ No newline at end of file
diff --git a/qa/suites/fs/upgrade/volumes/import-legacy/bluestore-bitmap.yaml b/qa/suites/fs/upgrade/volumes/import-legacy/bluestore-bitmap.yaml
new file mode 120000 (symlink)
index 0000000..17ad98e
--- /dev/null
@@ -0,0 +1 @@
+../../../../../cephfs/objectstore-ec/bluestore-bitmap.yaml
\ No newline at end of file
diff --git a/qa/suites/fs/upgrade/volumes/import-legacy/clusters/.qa b/qa/suites/fs/upgrade/volumes/import-legacy/clusters/.qa
new file mode 120000 (symlink)
index 0000000..a602a03
--- /dev/null
@@ -0,0 +1 @@
+../.qa/
\ No newline at end of file
diff --git a/qa/suites/fs/upgrade/volumes/import-legacy/clusters/1-mds-2-client-micro.yaml b/qa/suites/fs/upgrade/volumes/import-legacy/clusters/1-mds-2-client-micro.yaml
new file mode 100644 (file)
index 0000000..9b443f7
--- /dev/null
@@ -0,0 +1,7 @@
+roles:
+- [mon.a, mon.b, mon.c, mgr.x, mgr.y, mds.a, mds.b, mds.c, osd.0, osd.1, osd.2, osd.3]
+- [client.0, client.1]
+openstack:
+- volumes: # attached to each instance
+    count: 4
+    size: 10 # GB
diff --git a/qa/suites/fs/upgrade/volumes/import-legacy/conf b/qa/suites/fs/upgrade/volumes/import-legacy/conf
new file mode 120000 (symlink)
index 0000000..6d47129
--- /dev/null
@@ -0,0 +1 @@
+.qa/cephfs/conf/
\ No newline at end of file
diff --git a/qa/suites/fs/upgrade/volumes/import-legacy/overrides/+ b/qa/suites/fs/upgrade/volumes/import-legacy/overrides/+
new file mode 100644 (file)
index 0000000..e69de29
diff --git a/qa/suites/fs/upgrade/volumes/import-legacy/overrides/.qa b/qa/suites/fs/upgrade/volumes/import-legacy/overrides/.qa
new file mode 120000 (symlink)
index 0000000..a602a03
--- /dev/null
@@ -0,0 +1 @@
+../.qa/
\ No newline at end of file
diff --git a/qa/suites/fs/upgrade/volumes/import-legacy/overrides/frag_enable.yaml b/qa/suites/fs/upgrade/volumes/import-legacy/overrides/frag_enable.yaml
new file mode 120000 (symlink)
index 0000000..34a39a3
--- /dev/null
@@ -0,0 +1 @@
+.qa/cephfs/overrides/frag_enable.yaml
\ No newline at end of file
diff --git a/qa/suites/fs/upgrade/volumes/import-legacy/overrides/pg-warn.yaml b/qa/suites/fs/upgrade/volumes/import-legacy/overrides/pg-warn.yaml
new file mode 100644 (file)
index 0000000..4ae54a4
--- /dev/null
@@ -0,0 +1,5 @@
+overrides:
+  ceph:
+    conf:
+      global:
+        mon pg warn min per osd: 0
diff --git a/qa/suites/fs/upgrade/volumes/import-legacy/overrides/whitelist_health.yaml b/qa/suites/fs/upgrade/volumes/import-legacy/overrides/whitelist_health.yaml
new file mode 120000 (symlink)
index 0000000..74f39a4
--- /dev/null
@@ -0,0 +1 @@
+.qa/cephfs/overrides/whitelist_health.yaml
\ No newline at end of file
diff --git a/qa/suites/fs/upgrade/volumes/import-legacy/overrides/whitelist_wrongly_marked_down.yaml b/qa/suites/fs/upgrade/volumes/import-legacy/overrides/whitelist_wrongly_marked_down.yaml
new file mode 120000 (symlink)
index 0000000..b4528c0
--- /dev/null
@@ -0,0 +1 @@
+.qa/cephfs/overrides/whitelist_wrongly_marked_down.yaml
\ No newline at end of file
diff --git a/qa/suites/fs/upgrade/volumes/import-legacy/tasks/% b/qa/suites/fs/upgrade/volumes/import-legacy/tasks/%
new file mode 100644 (file)
index 0000000..e69de29
diff --git a/qa/suites/fs/upgrade/volumes/import-legacy/tasks/.qa b/qa/suites/fs/upgrade/volumes/import-legacy/tasks/.qa
new file mode 120000 (symlink)
index 0000000..a602a03
--- /dev/null
@@ -0,0 +1 @@
+../.qa/
\ No newline at end of file
diff --git a/qa/suites/fs/upgrade/volumes/import-legacy/tasks/0-nautilus.yaml b/qa/suites/fs/upgrade/volumes/import-legacy/tasks/0-nautilus.yaml
new file mode 100644 (file)
index 0000000..462163e
--- /dev/null
@@ -0,0 +1,39 @@
+meta:
+- desc: |
+   install ceph/nautilus latest
+tasks:
+- install:
+    branch: nautilus #tag: v13.2.8
+    exclude_packages:
+      - librados3
+      - ceph-mgr-dashboard
+      - ceph-mgr-diskprediction-local
+      - ceph-mgr-rook
+      - ceph-mgr-cephadm
+      - cephadm
+    extra_packages: ['librados2']
+- print: "**** done installing nautilus"
+- ceph:
+    log-ignorelist:
+      - overall HEALTH_
+      - \(FS_
+      - \(MDS_
+      - \(OSD_
+      - \(MON_DOWN\)
+      - \(CACHE_POOL_
+      - \(POOL_
+      - \(MGR_DOWN\)
+      - \(PG_
+      - \(SMALLER_PGP_NUM\)
+      - Monitor daemon marked osd
+      - Behind on trimming
+      - Manager daemon
+    conf:
+      global:
+        mon warn on pool no app: false
+        ms bind msgr2: false
+- exec:
+    osd.0:
+      - ceph osd require-osd-release nautilus
+      - ceph osd set-require-min-compat-client nautilus
+- print: "**** done ceph"
diff --git a/qa/suites/fs/upgrade/volumes/import-legacy/tasks/1-client.yaml b/qa/suites/fs/upgrade/volumes/import-legacy/tasks/1-client.yaml
new file mode 100644 (file)
index 0000000..8273107
--- /dev/null
@@ -0,0 +1,33 @@
+tasks:
+- workunit:
+    clients:
+      client.0:
+      - fs/upgrade/volume_client
+    env:
+      ACTION: create
+- print: "**** fs/volume_client create"
+- ceph-fuse:
+    client.0:
+      mount_path: /volumes/_nogroup/vol_isolated
+      mountpoint: mnt.0
+      auth_id: vol_data_isolated
+    client.1:
+      mount_path: /volumes/_nogroup/vol_default
+      mountpoint: mnt.1
+      auth_id: vol_default
+- print: "**** ceph-fuse vol_isolated"
+- workunit:
+    clients:
+      client.0:
+      - fs/upgrade/volume_client
+    env:
+      ACTION: populate
+    cleanup: false
+- workunit:
+    clients:
+      client.1:
+      - fs/upgrade/volume_client
+    env:
+      ACTION: populate
+    cleanup: false
+- print: "**** fs/volume_client populate"
diff --git a/qa/suites/fs/upgrade/volumes/import-legacy/tasks/2-upgrade.yaml b/qa/suites/fs/upgrade/volumes/import-legacy/tasks/2-upgrade.yaml
new file mode 100644 (file)
index 0000000..488dbf7
--- /dev/null
@@ -0,0 +1,47 @@
+overrides:
+  ceph:
+    log-ignorelist:
+    - scrub mismatch
+    - ScrubResult
+    - wrongly marked
+    - \(POOL_APP_NOT_ENABLED\)
+    - \(SLOW_OPS\)
+    - overall HEALTH_
+    - \(MON_MSGR2_NOT_ENABLED\)
+    - slow request
+    conf:
+      global:
+        bluestore warn on legacy statfs: false
+        bluestore warn on no per pool omap: false
+      mon:
+        mon warn on osd down out interval zero: false
+
+tasks:
+- mds_pre_upgrade:
+- print: "**** done mds pre-upgrade sequence"
+- install.upgrade:
+    mon.a:
+- print: "**** done install.upgrade both hosts"
+- ceph.restart:
+    daemons: [mon.*, mgr.*]
+    mon-health-to-clog: false
+    wait-for-healthy: false
+- ceph.healthy:
+- ceph.restart:
+    daemons: [osd.*]
+    wait-for-healthy: false
+    wait-for-osds-up: true
+- ceph.stop: [mds.*]
+- ceph.restart:
+    daemons: [mds.*]
+    wait-for-healthy: false
+    wait-for-osds-up: true
+- exec:
+    mon.a:
+    - ceph versions
+    - ceph osd dump -f json-pretty
+    - ceph osd require-osd-release octopus
+    - for f in `ceph osd pool ls` ; do ceph osd pool set $f pg_autoscale_mode off ; done
+    #- ceph osd set-require-min-compat-client nautilus
+- ceph.healthy:
+- print: "**** done ceph.restart"
diff --git a/qa/suites/fs/upgrade/volumes/import-legacy/tasks/3-verify.yaml b/qa/suites/fs/upgrade/volumes/import-legacy/tasks/3-verify.yaml
new file mode 100644 (file)
index 0000000..e14b483
--- /dev/null
@@ -0,0 +1,25 @@
+overrides:
+  ceph:
+    log-ignorelist:
+      - missing required features
+tasks:
+- exec:
+    mon.a:
+      - ceph fs dump --format=json-pretty
+      - ceph fs volume ls
+      - ceph fs subvolume ls cephfs
+- workunit:
+   clients:
+     client.0:
+     - fs/upgrade/volume_client
+   env:
+     ACTION: verify
+   cleanup: false
+- workunit:
+   clients:
+     client.1:
+     - fs/upgrade/volume_client
+   env:
+     ACTION: verify
+   cleanup: false
+- print: "**** fs/volume_client verify"
diff --git a/qa/suites/fs/upgrade/volumes/import-legacy/ubuntu_18.04.yaml b/qa/suites/fs/upgrade/volumes/import-legacy/ubuntu_18.04.yaml
new file mode 120000 (symlink)
index 0000000..cfb85f1
--- /dev/null
@@ -0,0 +1 @@
+.qa/distros/all/ubuntu_18.04.yaml
\ No newline at end of file
diff --git a/qa/suites/fs/volumes/tasks/volume-client.yaml b/qa/suites/fs/volumes/tasks/volume-client.yaml
new file mode 100644 (file)
index 0000000..04ee276
--- /dev/null
@@ -0,0 +1,9 @@
+overrides:
+  ceph:
+    log-ignorelist:
+      - MON_DOWN
+tasks:
+  - cephfs_test_runner:
+      fail_on_skip: false
+      modules:
+        - tasks.cephfs.test_volume_client
diff --git a/qa/tasks/cephfs/test_volume_client.py b/qa/tasks/cephfs/test_volume_client.py
new file mode 100644 (file)
index 0000000..f889421
--- /dev/null
@@ -0,0 +1,1299 @@
+import json
+import logging
+import os
+from textwrap import dedent
+from tasks.cephfs.cephfs_test_case import CephFSTestCase
+from tasks.cephfs.fuse_mount import FuseMount
+from teuthology.exceptions import CommandFailedError
+
+log = logging.getLogger(__name__)
+
+
+class TestVolumeClient(CephFSTestCase):
+    # One for looking at the global filesystem, one for being
+    # the VolumeClient, two for mounting the created shares
+    CLIENTS_REQUIRED = 4
+
+    def setUp(self):
+        CephFSTestCase.setUp(self)
+
+    def _volume_client_python(self, client, script, vol_prefix=None, ns_prefix=None):
+        # Can't dedent this *and* the script we pass in, because they might have different
+        # levels of indentation to begin with, so leave this string zero-indented
+        if vol_prefix:
+            vol_prefix = "\"" + vol_prefix + "\""
+        if ns_prefix:
+            ns_prefix = "\"" + ns_prefix + "\""
+        return client.run_python("""
+from __future__ import print_function
+from ceph_volume_client import CephFSVolumeClient, VolumePath
+from sys import version_info as sys_version_info
+from rados import OSError as rados_OSError
+import logging
+log = logging.getLogger("ceph_volume_client")
+log.addHandler(logging.StreamHandler())
+log.setLevel(logging.DEBUG)
+vc = CephFSVolumeClient("manila", "{conf_path}", "ceph", {vol_prefix}, {ns_prefix})
+vc.connect()
+{payload}
+vc.disconnect()
+        """.format(payload=script, conf_path=client.config_path,
+                   vol_prefix=vol_prefix, ns_prefix=ns_prefix))
+
+    def _configure_vc_auth(self, mount, id_name):
+        """
+        Set up auth credentials for the VolumeClient user
+        """
+        out = self.fs.mon_manager.raw_cluster_cmd(
+            "auth", "get-or-create", "client.{name}".format(name=id_name),
+            "mds", "allow *",
+            "osd", "allow rw",
+            "mon", "allow *"
+        )
+        mount.client_id = id_name
+        mount.client_remote.write_file(mount.get_keyring_path(),
+                                       out, sudo=True)
+        self.set_conf("client.{name}".format(name=id_name), "keyring", mount.get_keyring_path())
+
+    def _configure_guest_auth(self, volumeclient_mount, guest_mount,
+                              guest_entity, cephfs_mntpt,
+                              namespace_prefix=None, readonly=False,
+                              tenant_id=None, allow_existing_id=False):
+        """
+        Set up auth credentials for the guest client to mount a volume.
+
+        :param volumeclient_mount: mount used as the handle for driving
+                                   volumeclient.
+        :param guest_mount: mount used by the guest client.
+        :param guest_entity: auth ID used by the guest client.
+        :param cephfs_mntpt: path of the volume.
+        :param namespace_prefix: name prefix of the RADOS namespace, which
+                                 is used for the volume's layout.
+        :param readonly: defaults to False. If set to 'True' only read-only
+                         mount access is granted to the guest.
+        :param tenant_id: (OpenStack) tenant ID of the guest client.
+        """
+
+        head, volume_id = os.path.split(cephfs_mntpt)
+        head, group_id = os.path.split(head)
+        head, volume_prefix = os.path.split(head)
+        volume_prefix = "/" + volume_prefix
+
+        # Authorize the guest client's auth ID to mount the volume.
+        key = self._volume_client_python(volumeclient_mount, dedent("""
+            vp = VolumePath("{group_id}", "{volume_id}")
+            auth_result = vc.authorize(vp, "{guest_entity}", readonly={readonly},
+                                       tenant_id="{tenant_id}",
+                                       allow_existing_id="{allow_existing_id}")
+            print(auth_result['auth_key'])
+        """.format(
+            group_id=group_id,
+            volume_id=volume_id,
+            guest_entity=guest_entity,
+            readonly=readonly,
+            tenant_id=tenant_id,
+            allow_existing_id=allow_existing_id)), volume_prefix, namespace_prefix
+        )
+
+        # CephFSVolumeClient's authorize() does not return the secret
+        # key to a caller who isn't multi-tenant aware. Explicitly
+        # query the key for such a client.
+        if not tenant_id:
+            key = self.fs.mon_manager.raw_cluster_cmd(
+            "auth", "get-key", "client.{name}".format(name=guest_entity),
+            )
+
+        # The guest auth ID should exist.
+        existing_ids = [a['entity'] for a in self.auth_list()]
+        self.assertIn("client.{0}".format(guest_entity), existing_ids)
+
+        # Create keyring file for the guest client.
+        keyring_txt = dedent("""
+        [client.{guest_entity}]
+            key = {key}
+
+        """.format(
+            guest_entity=guest_entity,
+            key=key
+        ))
+        guest_mount.client_id = guest_entity
+        guest_mount.client_remote.write_file(guest_mount.get_keyring_path(),
+                                             keyring_txt, sudo=True)
+
+        # Add a guest client section to the ceph config file.
+        self.set_conf("client.{0}".format(guest_entity), "client quota", "True")
+        self.set_conf("client.{0}".format(guest_entity), "debug client", "20")
+        self.set_conf("client.{0}".format(guest_entity), "debug objecter", "20")
+        self.set_conf("client.{0}".format(guest_entity),
+                      "keyring", guest_mount.get_keyring_path())
+
+    def test_default_prefix(self):
+        group_id = "grpid"
+        volume_id = "volid"
+        DEFAULT_VOL_PREFIX = "volumes"
+        DEFAULT_NS_PREFIX = "fsvolumens_"
+
+        self.mount_b.umount_wait()
+        self._configure_vc_auth(self.mount_b, "manila")
+
+        #create a volume with default prefix
+        self._volume_client_python(self.mount_b, dedent("""
+            vp = VolumePath("{group_id}", "{volume_id}")
+            vc.create_volume(vp, 10, data_isolated=True)
+        """.format(
+            group_id=group_id,
+            volume_id=volume_id,
+        )))
+
+        # The dir should be created
+        self.mount_a.stat(os.path.join(DEFAULT_VOL_PREFIX, group_id, volume_id))
+
+        #namespace should be set
+        ns_in_attr = self.mount_a.getfattr(os.path.join(DEFAULT_VOL_PREFIX, group_id, volume_id), "ceph.dir.layout.pool_namespace")
+        namespace = "{0}{1}".format(DEFAULT_NS_PREFIX, volume_id)
+        self.assertEqual(namespace, ns_in_attr)
+
+
+    def test_lifecycle(self):
+        """
+        General smoke test for create, extend, destroy
+        """
+
+        # I'm going to use mount_c later as a guest for mounting the created
+        # shares
+        self.mounts[2].umount_wait()
+
+        # I'm going to leave mount_b unmounted and just use it as a handle for
+        # driving volumeclient.  It's a little hacky but we don't have a more
+        # general concept for librados/libcephfs clients as opposed to full
+        # blown mounting clients.
+        self.mount_b.umount_wait()
+        self._configure_vc_auth(self.mount_b, "manila")
+
+        guest_entity = "guest"
+        group_id = "grpid"
+        volume_id = "volid"
+
+        volume_prefix = "/myprefix"
+        namespace_prefix = "mynsprefix_"
+
+        # Create a 100MB volume
+        volume_size = 100
+        cephfs_mntpt = self._volume_client_python(self.mount_b, dedent("""
+            vp = VolumePath("{group_id}", "{volume_id}")
+            create_result = vc.create_volume(vp, 1024*1024*{volume_size})
+            print(create_result['mount_path'])
+        """.format(
+            group_id=group_id,
+            volume_id=volume_id,
+            volume_size=volume_size
+        )), volume_prefix, namespace_prefix)
+
+        # The dir should be created
+        self.mount_a.stat(os.path.join("myprefix", group_id, volume_id))
+
+        # Authorize and configure credentials for the guest to mount
+        # the volume.
+        self._configure_guest_auth(self.mount_b, self.mounts[2], guest_entity,
+                                   cephfs_mntpt, namespace_prefix)
+        self.mounts[2].mount_wait(cephfs_mntpt=cephfs_mntpt)
+
+        # The kernel client doesn't have the quota-based df behaviour,
+        # or quotas at all, so only exercise the client behaviour when
+        # running fuse.
+        if isinstance(self.mounts[2], FuseMount):
+            # df should see volume size, same as the quota set on volume's dir
+            self.assertEqual(self.mounts[2].df()['total'],
+                             volume_size * 1024 * 1024)
+            self.assertEqual(
+                    self.mount_a.getfattr(
+                        os.path.join(volume_prefix.strip("/"), group_id, volume_id),
+                        "ceph.quota.max_bytes"),
+                    "%s" % (volume_size * 1024 * 1024))
+
+            # df granularity is 4MB block so have to write at least that much
+            data_bin_mb = 4
+            self.mounts[2].write_n_mb("data.bin", data_bin_mb)
+
+            # Write something outside the volume to check that this space usage is
+            # not reported in the volume's DF.
+            other_bin_mb = 8
+            self.mount_a.write_n_mb("other.bin", other_bin_mb)
+
+            # global: df should see all the writes (data + other).  This is a >
+            # rather than a == because the global space used includes all pools
+            def check_df():
+                used = self.mount_a.df()['used']
+                return used >= (other_bin_mb * 1024 * 1024)
+
+            self.wait_until_true(check_df, timeout=30)
+
+            # Hack: do a metadata IO to kick rstats
+            self.mounts[2].run_shell(["touch", "foo"])
+
+            # volume: df should see the data_bin_mb consumed from quota, same
+            # as the rbytes for the volume's dir
+            self.wait_until_equal(
+                    lambda: self.mounts[2].df()['used'],
+                    data_bin_mb * 1024 * 1024, timeout=60)
+            self.wait_until_equal(
+                    lambda: self.mount_a.getfattr(
+                        os.path.join(volume_prefix.strip("/"), group_id, volume_id),
+                        "ceph.dir.rbytes"),
+                    "%s" % (data_bin_mb * 1024 * 1024), timeout=60)
+
+            # sync so that file data is persisted to rados
+            self.mounts[2].run_shell(["sync"])
+
+            # Our data should stay in a particular rados namespace
+            pool_name = self.mount_a.getfattr(os.path.join("myprefix", group_id, volume_id), "ceph.dir.layout.pool")
+            namespace = "{0}{1}".format(namespace_prefix, volume_id)
+            ns_in_attr = self.mount_a.getfattr(os.path.join("myprefix", group_id, volume_id), "ceph.dir.layout.pool_namespace")
+            self.assertEqual(namespace, ns_in_attr)
+
+            objects_in_ns = set(self.fs.rados(["ls"], pool=pool_name, namespace=namespace).split("\n"))
+            self.assertNotEqual(objects_in_ns, set())
+
+            # De-authorize the guest
+            self._volume_client_python(self.mount_b, dedent("""
+                vp = VolumePath("{group_id}", "{volume_id}")
+                vc.deauthorize(vp, "{guest_entity}")
+                vc.evict("{guest_entity}")
+            """.format(
+                group_id=group_id,
+                volume_id=volume_id,
+                guest_entity=guest_entity
+            )), volume_prefix, namespace_prefix)
+
+            # Once deauthorized, the client should be unable to do any more metadata ops.
+            # The way that the client currently behaves here is to block (it acts as if
+            # it has lost the network, because there is nothing to tell it that its messages
+            # are being dropped because its identity is gone)
+            background = self.mounts[2].write_n_mb("rogue.bin", 1, wait=False)
+            try:
+                background.wait()
+            except CommandFailedError:
+                # command failed with EBLOCKLISTED?
+                if "transport endpoint shutdown" in background.stderr.getvalue():
+                    pass
+                else:
+                    raise
+
+            # After deauthorisation, the client ID should be gone (this was the only
+            # volume it was authorised for)
+            self.assertNotIn("client.{0}".format(guest_entity), [e['entity'] for e in self.auth_list()])
+
+            # Clean up the dead mount (ceph-fuse's behaviour here is a bit undefined)
+            self.mounts[2].umount_wait()
+
+        self._volume_client_python(self.mount_b, dedent("""
+            vp = VolumePath("{group_id}", "{volume_id}")
+            vc.delete_volume(vp)
+            vc.purge_volume(vp)
+        """.format(
+            group_id=group_id,
+            volume_id=volume_id,
+        )), volume_prefix, namespace_prefix)
+
+    def test_idempotency(self):
+        """
+        That the volumeclient interface works when calling everything twice
+        """
+        self.mount_b.umount_wait()
+        self._configure_vc_auth(self.mount_b, "manila")
+
+        guest_entity = "guest"
+        group_id = "grpid"
+        volume_id = "volid"
+        self._volume_client_python(self.mount_b, dedent("""
+            vp = VolumePath("{group_id}", "{volume_id}")
+            vc.create_volume(vp, 10)
+            vc.create_volume(vp, 10)
+            vc.authorize(vp, "{guest_entity}")
+            vc.authorize(vp, "{guest_entity}")
+            vc.deauthorize(vp, "{guest_entity}")
+            vc.deauthorize(vp, "{guest_entity}")
+            vc.delete_volume(vp)
+            vc.delete_volume(vp)
+            vc.purge_volume(vp)
+            vc.purge_volume(vp)
+
+            vc.create_volume(vp, 10, data_isolated=True)
+            vc.create_volume(vp, 10, data_isolated=True)
+            vc.authorize(vp, "{guest_entity}")
+            vc.authorize(vp, "{guest_entity}")
+            vc.deauthorize(vp, "{guest_entity}")
+            vc.deauthorize(vp, "{guest_entity}")
+            vc.evict("{guest_entity}")
+            vc.evict("{guest_entity}")
+            vc.delete_volume(vp, data_isolated=True)
+            vc.delete_volume(vp, data_isolated=True)
+            vc.purge_volume(vp, data_isolated=True)
+            vc.purge_volume(vp, data_isolated=True)
+
+            vc.create_volume(vp, 10, namespace_isolated=False)
+            vc.create_volume(vp, 10, namespace_isolated=False)
+            vc.authorize(vp, "{guest_entity}")
+            vc.authorize(vp, "{guest_entity}")
+            vc.deauthorize(vp, "{guest_entity}")
+            vc.deauthorize(vp, "{guest_entity}")
+            vc.evict("{guest_entity}")
+            vc.evict("{guest_entity}")
+            vc.delete_volume(vp)
+            vc.delete_volume(vp)
+            vc.purge_volume(vp)
+            vc.purge_volume(vp)
+        """.format(
+            group_id=group_id,
+            volume_id=volume_id,
+            guest_entity=guest_entity
+        )))
+
+    def test_data_isolated(self):
+        """
+        That data isolated shares get their own pool
+        :return:
+        """
+
+        self.mount_b.umount_wait()
+        self._configure_vc_auth(self.mount_b, "manila")
+
+        pools_a = json.loads(self.fs.mon_manager.raw_cluster_cmd("osd", "dump", "--format=json-pretty"))['pools']
+
+        group_id = "grpid"
+        volume_id = "volid"
+        self._volume_client_python(self.mount_b, dedent("""
+            vp = VolumePath("{group_id}", "{volume_id}")
+            vc.create_volume(vp, data_isolated=True)
+        """.format(
+            group_id=group_id,
+            volume_id=volume_id,
+        )))
+
+        pools_b = json.loads(self.fs.mon_manager.raw_cluster_cmd("osd", "dump", "--format=json-pretty"))['pools']
+
+        # Should have created one new pool
+        new_pools = set(p['pool_name'] for p in pools_b) - set([p['pool_name'] for p in pools_a])
+        self.assertEqual(len(new_pools), 1)
+
+    def test_15303(self):
+        """
+        Reproducer for #15303 "Client holds incorrect complete flag on dir
+        after losing caps" (http://tracker.ceph.com/issues/15303)
+        """
+        for m in self.mounts:
+            m.umount_wait()
+
+        # Create a dir on mount A
+        self.mount_a.mount_wait()
+        self.mount_a.run_shell(["mkdir", "parent1"])
+        self.mount_a.run_shell(["mkdir", "parent2"])
+        self.mount_a.run_shell(["mkdir", "parent1/mydir"])
+
+        # Put some files in it from mount B
+        self.mount_b.mount_wait()
+        self.mount_b.run_shell(["touch", "parent1/mydir/afile"])
+        self.mount_b.umount_wait()
+
+        # List the dir's contents on mount A
+        self.assertListEqual(self.mount_a.ls("parent1/mydir"),
+                             ["afile"])
+
+    def test_evict_client(self):
+        """
+        That a volume client can be evicted based on its auth ID and the volume
+        path it has mounted.
+        """
+
+        if not isinstance(self.mount_a, FuseMount):
+            self.skipTest("Requires FUSE client to inject client metadata")
+
+        # mounts[1] would be used as handle for driving VolumeClient. mounts[2]
+        # and mounts[3] would be used as guests to mount the volumes/shares.
+
+        for i in range(1, 4):
+            self.mounts[i].umount_wait()
+
+        volumeclient_mount = self.mounts[1]
+        self._configure_vc_auth(volumeclient_mount, "manila")
+        guest_mounts = (self.mounts[2], self.mounts[3])
+
+        guest_entity = "guest"
+        group_id = "grpid"
+        cephfs_mntpts = []
+        volume_ids = []
+
+        # Create two volumes. Authorize 'guest' auth ID to mount the two
+        # volumes. Mount the two volumes. Write data to the volumes.
+        for i in range(2):
+            # Create volume.
+            volume_ids.append("volid_{0}".format(str(i)))
+            cephfs_mntpts.append(
+                self._volume_client_python(volumeclient_mount, dedent("""
+                    vp = VolumePath("{group_id}", "{volume_id}")
+                    create_result = vc.create_volume(vp, 10 * 1024 * 1024)
+                    print(create_result['mount_path'])
+                """.format(
+                    group_id=group_id,
+                    volume_id=volume_ids[i]
+            ))))
+
+            # Authorize 'guest' auth ID to mount the volume.
+            self._configure_guest_auth(volumeclient_mount, guest_mounts[i],
+                                       guest_entity, cephfs_mntpts[i])
+
+            # Mount the volume.
+            guest_mounts[i].mountpoint_dir_name = 'mnt.{id}.{suffix}'.format(
+                id=guest_entity, suffix=str(i))
+            guest_mounts[i].mount_wait(cephfs_mntpt=cephfs_mntpts[i])
+            guest_mounts[i].write_n_mb("data.bin", 1)
+
+
+        # Evict the client guest_mounts[0], which is using auth ID 'guest' and has
+        # mounted one volume.
+        self._volume_client_python(self.mount_b, dedent("""
+            vp = VolumePath("{group_id}", "{volume_id}")
+            vc.deauthorize(vp, "{guest_entity}")
+            vc.evict("{guest_entity}", volume_path=vp)
+        """.format(
+            group_id=group_id,
+            volume_id=volume_ids[0],
+            guest_entity=guest_entity
+        )))
+
+        # Evicted guest client, guest_mounts[0], should not be able to do
+        # any more metadata ops.  It should start failing all operations
+        # when it sees that its own address is in the blocklist.
+        try:
+            guest_mounts[0].write_n_mb("rogue.bin", 1)
+        except CommandFailedError:
+            pass
+        else:
+            raise RuntimeError("post-eviction write should have failed!")
+
+        # The blocklisted guest client should now be unmountable
+        guest_mounts[0].umount_wait()
+
+        # Guest client, guest_mounts[1], using the same auth ID 'guest', but
+        # has mounted the other volume, should be able to use its volume
+        # unaffected.
+        guest_mounts[1].write_n_mb("data.bin.1", 1)
+
+        # Cleanup.
+        for i in range(2):
+            self._volume_client_python(volumeclient_mount, dedent("""
+                vp = VolumePath("{group_id}", "{volume_id}")
+                vc.deauthorize(vp, "{guest_entity}")
+                vc.delete_volume(vp)
+                vc.purge_volume(vp)
+            """.format(
+                group_id=group_id,
+                volume_id=volume_ids[i],
+                guest_entity=guest_entity
+            )))
+
+
+    def test_purge(self):
+        """
+        Reproducer for #15266, exception trying to purge volumes that
+        contain non-ascii filenames.
+
+        Additionally test any other purge corner cases here.
+        """
+        # I'm going to leave mount_b unmounted and just use it as a handle for
+        # driving volumeclient.  It's a little hacky but we don't have a more
+        # general concept for librados/libcephfs clients as opposed to full
+        # blown mounting clients.
+        self.mount_b.umount_wait()
+        self._configure_vc_auth(self.mount_b, "manila")
+
+        group_id = "grpid"
+        # Use a unicode volume ID (like Manila), to reproduce #15266
+        volume_id = u"volid"
+
+        # Create
+        cephfs_mntpt = self._volume_client_python(self.mount_b, dedent("""
+            vp = VolumePath("{group_id}", u"{volume_id}")
+            create_result = vc.create_volume(vp, 10)
+            print(create_result['mount_path'])
+        """.format(
+            group_id=group_id,
+            volume_id=volume_id
+        )))
+
+        # Strip leading "/"
+        cephfs_mntpt = cephfs_mntpt[1:]
+
+        # A file with non-ascii characters
+        self.mount_a.run_shell(["touch", os.path.join(cephfs_mntpt, u"b\u00F6b")])
+
+        # A file with no permissions to do anything
+        self.mount_a.run_shell(["touch", os.path.join(cephfs_mntpt, "noperms")])
+        self.mount_a.run_shell(["chmod", "0000", os.path.join(cephfs_mntpt, "noperms")])
+
+        self._volume_client_python(self.mount_b, dedent("""
+            vp = VolumePath("{group_id}", u"{volume_id}")
+            vc.delete_volume(vp)
+            vc.purge_volume(vp)
+        """.format(
+            group_id=group_id,
+            volume_id=volume_id
+        )))
+
+        # Check it's really gone
+        self.assertEqual(self.mount_a.ls("volumes/_deleting"), [])
+        self.assertEqual(self.mount_a.ls("volumes/"), ["_deleting", group_id])
+
+    def test_readonly_authorization(self):
+        """
+        That guest clients can be restricted to read-only mounts of volumes.
+        """
+
+        volumeclient_mount = self.mounts[1]
+        guest_mount = self.mounts[2]
+        volumeclient_mount.umount_wait()
+        guest_mount.umount_wait()
+
+        # Configure volumeclient_mount as the handle for driving volumeclient.
+        self._configure_vc_auth(volumeclient_mount, "manila")
+
+        guest_entity = "guest"
+        group_id = "grpid"
+        volume_id = "volid"
+
+        # Create a volume.
+        cephfs_mntpt = self._volume_client_python(volumeclient_mount, dedent("""
+            vp = VolumePath("{group_id}", "{volume_id}")
+            create_result = vc.create_volume(vp, 1024*1024*10)
+            print(create_result['mount_path'])
+        """.format(
+            group_id=group_id,
+            volume_id=volume_id,
+        )))
+
+        # Authorize and configure credentials for the guest to mount
+        # the volume with read-write access.
+        self._configure_guest_auth(volumeclient_mount, guest_mount,
+                                   guest_entity, cephfs_mntpt, readonly=False)
+
+        # Mount the volume, and write to it.
+        guest_mount.mount_wait(cephfs_mntpt=cephfs_mntpt)
+        guest_mount.write_n_mb("data.bin", 1)
+
+        # Change the guest auth ID's authorization to read-only mount access.
+        self._volume_client_python(volumeclient_mount, dedent("""
+            vp = VolumePath("{group_id}", "{volume_id}")
+            vc.deauthorize(vp, "{guest_entity}")
+        """.format(
+            group_id=group_id,
+            volume_id=volume_id,
+            guest_entity=guest_entity
+        )))
+        self._configure_guest_auth(volumeclient_mount, guest_mount, guest_entity,
+                                   cephfs_mntpt, readonly=True)
+
+        # The effect of the change in access level to read-only is not
+        # immediate. The guest sees the change only after a remount of
+        # the volume.
+        guest_mount.umount_wait()
+        guest_mount.mount_wait(cephfs_mntpt=cephfs_mntpt)
+
+        # Read existing content of the volume.
+        self.assertListEqual(guest_mount.ls(guest_mount.mountpoint), ["data.bin"])
+        # Cannot write into read-only volume.
+        try:
+            guest_mount.write_n_mb("rogue.bin", 1)
+        except CommandFailedError:
+            pass
+
+    def test_get_authorized_ids(self):
+        """
+        That for a volume, the authorized IDs and their access levels
+        can be obtained using CephFSVolumeClient's get_authorized_ids().
+        """
+        volumeclient_mount = self.mounts[1]
+        volumeclient_mount.umount_wait()
+
+        # Configure volumeclient_mount as the handle for driving volumeclient.
+        self._configure_vc_auth(volumeclient_mount, "manila")
+
+        group_id = "grpid"
+        volume_id = "volid"
+        guest_entity_1 = "guest1"
+        guest_entity_2 = "guest2"
+
+        log.info("group ID: {0}".format(group_id))
+
+        # Create a volume.
+        auths = self._volume_client_python(volumeclient_mount, dedent("""
+            vp = VolumePath("{group_id}", "{volume_id}")
+            vc.create_volume(vp, 1024*1024*10)
+            auths = vc.get_authorized_ids(vp)
+            print(auths)
+        """.format(
+            group_id=group_id,
+            volume_id=volume_id,
+        )))
+        # Check the list of authorized IDs for the volume.
+        self.assertEqual('None', auths)
+
+        # Allow two auth IDs access to the volume.
+        auths = self._volume_client_python(volumeclient_mount, dedent("""
+            vp = VolumePath("{group_id}", "{volume_id}")
+            vc.authorize(vp, "{guest_entity_1}", readonly=False)
+            vc.authorize(vp, "{guest_entity_2}", readonly=True)
+            auths = vc.get_authorized_ids(vp)
+            print(auths)
+        """.format(
+            group_id=group_id,
+            volume_id=volume_id,
+            guest_entity_1=guest_entity_1,
+            guest_entity_2=guest_entity_2,
+        )))
+        # Check the list of authorized IDs and their access levels.
+        expected_result = [('guest1', 'rw'), ('guest2', 'r')]
+        self.assertCountEqual(str(expected_result), auths)
+
+        # Disallow both the auth IDs' access to the volume.
+        auths = self._volume_client_python(volumeclient_mount, dedent("""
+            vp = VolumePath("{group_id}", "{volume_id}")
+            vc.deauthorize(vp, "{guest_entity_1}")
+            vc.deauthorize(vp, "{guest_entity_2}")
+            auths = vc.get_authorized_ids(vp)
+            print(auths)
+        """.format(
+            group_id=group_id,
+            volume_id=volume_id,
+            guest_entity_1=guest_entity_1,
+            guest_entity_2=guest_entity_2,
+        )))
+        # Check the list of authorized IDs for the volume.
+        self.assertEqual('None', auths)
+
+    def test_multitenant_volumes(self):
+        """
+        That volume access can be restricted to a tenant.
+
+        That metadata used to enforce tenant isolation of
+        volumes is stored as a two-way mapping between auth
+        IDs and volumes that they're authorized to access.
+        """
+        volumeclient_mount = self.mounts[1]
+        volumeclient_mount.umount_wait()
+
+        # Configure volumeclient_mount as the handle for driving volumeclient.
+        self._configure_vc_auth(volumeclient_mount, "manila")
+
+        group_id = "groupid"
+        volume_id = "volumeid"
+
+        # Guest clients belonging to different tenants, but using the same
+        # auth ID.
+        auth_id = "guest"
+        guestclient_1 = {
+            "auth_id": auth_id,
+            "tenant_id": "tenant1",
+        }
+        guestclient_2 = {
+            "auth_id": auth_id,
+            "tenant_id": "tenant2",
+        }
+
+        # Create a volume.
+        self._volume_client_python(volumeclient_mount, dedent("""
+            vp = VolumePath("{group_id}", "{volume_id}")
+            vc.create_volume(vp, 1024*1024*10)
+        """.format(
+            group_id=group_id,
+            volume_id=volume_id,
+        )))
+
+        # Check that volume metadata file is created on volume creation.
+        vol_metadata_filename = "_{0}:{1}.meta".format(group_id, volume_id)
+        self.assertIn(vol_metadata_filename, self.mounts[0].ls("volumes"))
+
+        # Authorize 'guestclient_1', using auth ID 'guest' and belonging to
+        # 'tenant1', with 'rw' access to the volume.
+        self._volume_client_python(volumeclient_mount, dedent("""
+            vp = VolumePath("{group_id}", "{volume_id}")
+            vc.authorize(vp, "{auth_id}", tenant_id="{tenant_id}")
+        """.format(
+            group_id=group_id,
+            volume_id=volume_id,
+            auth_id=guestclient_1["auth_id"],
+            tenant_id=guestclient_1["tenant_id"]
+        )))
+
+        # Check that the auth metadata file for auth ID 'guest' is
+        # created on authorizing 'guest' access to the volume.
+        auth_metadata_filename = "${0}.meta".format(guestclient_1["auth_id"])
+        self.assertIn(auth_metadata_filename, self.mounts[0].ls("volumes"))
+
+        # Verify that the auth metadata file stores the tenant ID that the
+        # auth ID belongs to, the auth ID's authorized access levels
+        # for different volumes, versioning details, etc.
+        expected_auth_metadata = {
+            "version": 2,
+            "compat_version": 1,
+            "dirty": False,
+            "tenant_id": "tenant1",
+            "volumes": {
+                "groupid/volumeid": {
+                    "dirty": False,
+                    "access_level": "rw"
+                }
+            }
+        }
+
+        auth_metadata = self._volume_client_python(volumeclient_mount, dedent("""
+            import json
+            vp = VolumePath("{group_id}", "{volume_id}")
+            auth_metadata = vc._auth_metadata_get("{auth_id}")
+            print(json.dumps(auth_metadata))
+        """.format(
+            group_id=group_id,
+            volume_id=volume_id,
+            auth_id=guestclient_1["auth_id"],
+        )))
+        auth_metadata = json.loads(auth_metadata)
+
+        self.assertGreaterEqual(auth_metadata["version"], expected_auth_metadata["version"])
+        del expected_auth_metadata["version"]
+        del auth_metadata["version"]
+        self.assertEqual(expected_auth_metadata, auth_metadata)
+
+        # Verify that the volume metadata file stores info about auth IDs
+        # and their access levels to the volume, versioning details, etc.
+        expected_vol_metadata = {
+            "version": 2,
+            "compat_version": 1,
+            "auths": {
+                "guest": {
+                    "dirty": False,
+                    "access_level": "rw"
+                }
+            }
+        }
+
+        vol_metadata = self._volume_client_python(volumeclient_mount, dedent("""
+            import json
+            vp = VolumePath("{group_id}", "{volume_id}")
+            volume_metadata = vc._volume_metadata_get(vp)
+            print(json.dumps(volume_metadata))
+        """.format(
+            group_id=group_id,
+            volume_id=volume_id,
+        )))
+        vol_metadata = json.loads(vol_metadata)
+
+        self.assertGreaterEqual(vol_metadata["version"], expected_vol_metadata["version"])
+        del expected_vol_metadata["version"]
+        del vol_metadata["version"]
+        self.assertEqual(expected_vol_metadata, vol_metadata)
+
+        # Cannot authorize 'guestclient_2' to access the volume.
+        # It uses auth ID 'guest', which has already been used by
+        # 'guestclient_1', belonging to another tenant, to access
+        # the volume.
+        with self.assertRaises(CommandFailedError):
+            self._volume_client_python(volumeclient_mount, dedent("""
+                vp = VolumePath("{group_id}", "{volume_id}")
+                vc.authorize(vp, "{auth_id}", tenant_id="{tenant_id}")
+            """.format(
+                group_id=group_id,
+                volume_id=volume_id,
+                auth_id=guestclient_2["auth_id"],
+                tenant_id=guestclient_2["tenant_id"]
+            )))
+
+        # Check that auth metadata file is cleaned up on removing
+        # auth ID's only access to a volume.
+        self._volume_client_python(volumeclient_mount, dedent("""
+            vp = VolumePath("{group_id}", "{volume_id}")
+            vc.deauthorize(vp, "{guest_entity}")
+        """.format(
+            group_id=group_id,
+            volume_id=volume_id,
+            guest_entity=guestclient_1["auth_id"]
+        )))
+
+        self.assertNotIn(auth_metadata_filename, self.mounts[0].ls("volumes"))
+
+        # Check that volume metadata file is cleaned up on volume deletion.
+        self._volume_client_python(volumeclient_mount, dedent("""
+            vp = VolumePath("{group_id}", "{volume_id}")
+            vc.delete_volume(vp)
+        """.format(
+            group_id=group_id,
+            volume_id=volume_id,
+        )))
+        self.assertNotIn(vol_metadata_filename, self.mounts[0].ls("volumes"))
+
+    def test_authorize_auth_id_not_created_by_ceph_volume_client(self):
+        """
+        If the auth_id already exists and was not created by
+        ceph_volume_client, authorizing that auth_id is not
+        allowed by default.
+        """
+        volumeclient_mount = self.mounts[1]
+        volumeclient_mount.umount_wait()
+
+        # Configure volumeclient_mount as the handle for driving volumeclient.
+        self._configure_vc_auth(volumeclient_mount, "manila")
+
+        group_id = "groupid"
+        volume_id = "volumeid"
+
+        # Create auth_id
+        self.fs.mon_manager.raw_cluster_cmd(
+            "auth", "get-or-create", "client.guest1",
+            "mds", "allow *",
+            "osd", "allow rw",
+            "mon", "allow *"
+        )
+
+        auth_id = "guest1"
+        guestclient_1 = {
+            "auth_id": auth_id,
+            "tenant_id": "tenant1",
+        }
+
+        # Create a volume.
+        self._volume_client_python(volumeclient_mount, dedent("""
+            vp = VolumePath("{group_id}", "{volume_id}")
+            vc.create_volume(vp, 1024*1024*10)
+        """.format(
+            group_id=group_id,
+            volume_id=volume_id,
+        )))
+
+        # Cannot authorize 'guestclient_1' to access the volume.
+        # It uses auth ID 'guest1', which already exists and was not
+        # created by ceph_volume_client.
+        with self.assertRaises(CommandFailedError):
+            self._volume_client_python(volumeclient_mount, dedent("""
+                vp = VolumePath("{group_id}", "{volume_id}")
+                vc.authorize(vp, "{auth_id}", tenant_id="{tenant_id}")
+            """.format(
+                group_id=group_id,
+                volume_id=volume_id,
+                auth_id=guestclient_1["auth_id"],
+                tenant_id=guestclient_1["tenant_id"]
+            )))
+
+        # Delete volume
+        self._volume_client_python(volumeclient_mount, dedent("""
+            vp = VolumePath("{group_id}", "{volume_id}")
+            vc.delete_volume(vp)
+        """.format(
+            group_id=group_id,
+            volume_id=volume_id,
+        )))
+
+    def test_authorize_allow_existing_id_option(self):
+        """
+        If the auth_id already exists and was not created by
+        ceph_volume_client, authorizing that auth_id is not
+        allowed by default but is allowed with the option
+        allow_existing_id.
+        """
+        volumeclient_mount = self.mounts[1]
+        volumeclient_mount.umount_wait()
+
+        # Configure volumeclient_mount as the handle for driving volumeclient.
+        self._configure_vc_auth(volumeclient_mount, "manila")
+
+        group_id = "groupid"
+        volume_id = "volumeid"
+
+        # Create auth_id
+        self.fs.mon_manager.raw_cluster_cmd(
+            "auth", "get-or-create", "client.guest1",
+            "mds", "allow *",
+            "osd", "allow rw",
+            "mon", "allow *"
+        )
+
+        auth_id = "guest1"
+        guestclient_1 = {
+            "auth_id": auth_id,
+            "tenant_id": "tenant1",
+        }
+
+        # Create a volume.
+        self._volume_client_python(volumeclient_mount, dedent("""
+            vp = VolumePath("{group_id}", "{volume_id}")
+            vc.create_volume(vp, 1024*1024*10)
+        """.format(
+            group_id=group_id,
+            volume_id=volume_id,
+        )))
+
+        # 'guestclient_1' uses auth ID 'guest1', which already exists and was
+        # not created by ceph_volume_client, so it cannot be authorized by
+        # default; it is allowed here with the option 'allow_existing_id'.
+        self._volume_client_python(volumeclient_mount, dedent("""
+            vp = VolumePath("{group_id}", "{volume_id}")
+            vc.authorize(vp, "{auth_id}", tenant_id="{tenant_id}",
+                         allow_existing_id="{allow_existing_id}")
+        """.format(
+            group_id=group_id,
+            volume_id=volume_id,
+            auth_id=guestclient_1["auth_id"],
+            tenant_id=guestclient_1["tenant_id"],
+            allow_existing_id=True
+        )))
+
+        # Delete volume
+        self._volume_client_python(volumeclient_mount, dedent("""
+            vp = VolumePath("{group_id}", "{volume_id}")
+            vc.delete_volume(vp)
+        """.format(
+            group_id=group_id,
+            volume_id=volume_id,
+        )))
+
+    def test_deauthorize_auth_id_after_out_of_band_update(self):
+        """
+        If an auth_id authorized by ceph_volume_client is updated
+        out of band, the auth_id should not be deleted after a
+        deauthorize. Only the caps associated with it should be removed.
+        """
+        volumeclient_mount = self.mounts[1]
+        volumeclient_mount.umount_wait()
+
+        # Configure volumeclient_mount as the handle for driving volumeclient.
+        self._configure_vc_auth(volumeclient_mount, "manila")
+
+        group_id = "groupid"
+        volume_id = "volumeid"
+
+
+        auth_id = "guest1"
+        guestclient_1 = {
+            "auth_id": auth_id,
+            "tenant_id": "tenant1",
+        }
+
+        # Create a volume.
+        self._volume_client_python(volumeclient_mount, dedent("""
+            vp = VolumePath("{group_id}", "{volume_id}")
+            vc.create_volume(vp, 1024*1024*10)
+        """.format(
+            group_id=group_id,
+            volume_id=volume_id,
+        )))
+
+        # Authorize 'guestclient_1' to access the volume.
+        self._volume_client_python(volumeclient_mount, dedent("""
+            vp = VolumePath("{group_id}", "{volume_id}")
+            vc.authorize(vp, "{auth_id}", tenant_id="{tenant_id}")
+        """.format(
+            group_id=group_id,
+            volume_id=volume_id,
+            auth_id=guestclient_1["auth_id"],
+            tenant_id=guestclient_1["tenant_id"]
+        )))
+
+        # Update caps for guestclient_1 out of band
+        out = self.fs.mon_manager.raw_cluster_cmd(
+            "auth", "caps", "client.guest1",
+            "mds", "allow rw path=/volumes/groupid, allow rw path=/volumes/groupid/volumeid",
+            "osd", "allow rw pool=cephfs_data namespace=fsvolumens_volumeid",
+            "mon", "allow r",
+            "mgr", "allow *"
+        )
+
+        # Deauthorize guestclient_1
+        self._volume_client_python(volumeclient_mount, dedent("""
+            vp = VolumePath("{group_id}", "{volume_id}")
+            vc.deauthorize(vp, "{guest_entity}")
+        """.format(
+            group_id=group_id,
+            volume_id=volume_id,
+            guest_entity=guestclient_1["auth_id"]
+        )))
+
+        # Validate the caps of guestclient_1 after deauthorize. It should not have deleted
+        # guestclient_1. The mgr and mds caps, which were updated out of band, should still be present.
+        out = json.loads(self.fs.mon_manager.raw_cluster_cmd("auth", "get", "client.guest1", "--format=json-pretty"))
+
+        self.assertEqual("client.guest1", out[0]["entity"])
+        self.assertEqual("allow rw path=/volumes/groupid", out[0]["caps"]["mds"])
+        self.assertEqual("allow *", out[0]["caps"]["mgr"])
+        self.assertNotIn("osd", out[0]["caps"])
+
+        # Delete volume
+        self._volume_client_python(volumeclient_mount, dedent("""
+            vp = VolumePath("{group_id}", "{volume_id}")
+            vc.delete_volume(vp)
+        """.format(
+            group_id=group_id,
+            volume_id=volume_id,
+        )))
+
+    def test_recover_metadata(self):
+        """
+        That volume client can recover from partial auth updates using
+        metadata files, which store auth info and its update status info.
+        """
+        volumeclient_mount = self.mounts[1]
+        volumeclient_mount.umount_wait()
+
+        # Configure volumeclient_mount as the handle for driving volumeclient.
+        self._configure_vc_auth(volumeclient_mount, "manila")
+
+        group_id = "groupid"
+        volume_id = "volumeid"
+
+        guestclient = {
+            "auth_id": "guest",
+            "tenant_id": "tenant",
+        }
+
+        # Create a volume.
+        self._volume_client_python(volumeclient_mount, dedent("""
+            vp = VolumePath("{group_id}", "{volume_id}")
+            vc.create_volume(vp, 1024*1024*10)
+        """.format(
+            group_id=group_id,
+            volume_id=volume_id,
+        )))
+
+        # Authorize 'guestclient' access to the volume.
+        self._volume_client_python(volumeclient_mount, dedent("""
+            vp = VolumePath("{group_id}", "{volume_id}")
+            vc.authorize(vp, "{auth_id}", tenant_id="{tenant_id}")
+        """.format(
+            group_id=group_id,
+            volume_id=volume_id,
+            auth_id=guestclient["auth_id"],
+            tenant_id=guestclient["tenant_id"]
+        )))
+
+        # Check that auth metadata file for auth ID 'guest' is created.
+        auth_metadata_filename = "${0}.meta".format(guestclient["auth_id"])
+        self.assertIn(auth_metadata_filename, self.mounts[0].ls("volumes"))
+
+        # Induce partial auth update state by modifying the auth metadata file,
+        # and then run recovery procedure.
+        self._volume_client_python(volumeclient_mount, dedent("""
+            vp = VolumePath("{group_id}", "{volume_id}")
+            auth_metadata = vc._auth_metadata_get("{auth_id}")
+            auth_metadata['dirty'] = True
+            vc._auth_metadata_set("{auth_id}", auth_metadata)
+            vc.recover()
+        """.format(
+            group_id=group_id,
+            volume_id=volume_id,
+            auth_id=guestclient["auth_id"],
+        )))
+
+    def test_put_object(self):
+        vc_mount = self.mounts[1]
+        vc_mount.umount_wait()
+        self._configure_vc_auth(vc_mount, "manila")
+
+        obj_data = 'test data'
+        obj_name = 'test_vc_obj_1'
+        pool_name = self.fs.get_data_pool_names()[0]
+
+        self._volume_client_python(vc_mount, dedent("""
+            vc.put_object("{pool_name}", "{obj_name}", b"{obj_data}")
+        """.format(
+            pool_name = pool_name,
+            obj_name = obj_name,
+            obj_data = obj_data
+        )))
+
+        read_data = self.fs.rados(['get', obj_name, '-'], pool=pool_name)
+        self.assertEqual(obj_data, read_data)
+
+    def test_get_object(self):
+        vc_mount = self.mounts[1]
+        vc_mount.umount_wait()
+        self._configure_vc_auth(vc_mount, "manila")
+
+        obj_data = 'test_data'
+        obj_name = 'test_vc_ob_2'
+        pool_name = self.fs.get_data_pool_names()[0]
+
+        self.fs.rados(['put', obj_name, '-'], pool=pool_name, stdin_data=obj_data)
+
+        self._volume_client_python(vc_mount, dedent("""
+            data_read = vc.get_object("{pool_name}", "{obj_name}")
+            assert data_read == b"{obj_data}"
+        """.format(
+            pool_name = pool_name,
+            obj_name = obj_name,
+            obj_data = obj_data
+        )))
+
+    def test_put_object_versioned(self):
+        vc_mount = self.mounts[1]
+        vc_mount.umount_wait()
+        self._configure_vc_auth(vc_mount, "manila")
+
+        obj_data = 'test_data'
+        obj_name = 'test_vc_obj'
+        pool_name = self.fs.get_data_pool_names()[0]
+        self.fs.rados(['put', obj_name, '-'], pool=pool_name, stdin_data=obj_data)
+
+        self._volume_client_python(vc_mount, dedent("""
+            data, version_before = vc.get_object_and_version("{pool_name}", "{obj_name}")
+
+            if sys_version_info.major < 3:
+                data = data + 'modification1'
+            else:
+                data = str.encode(data.decode() + 'modification1')
+
+            vc.put_object_versioned("{pool_name}", "{obj_name}", data, version_before)
+            data, version_after = vc.get_object_and_version("{pool_name}", "{obj_name}")
+            assert version_after == version_before + 1
+        """).format(pool_name=pool_name, obj_name=obj_name))
+
+    def test_version_check_for_put_object_versioned(self):
+        vc_mount = self.mounts[1]
+        vc_mount.umount_wait()
+        self._configure_vc_auth(vc_mount, "manila")
+
+        obj_data = 'test_data'
+        obj_name = 'test_vc_ob_2'
+        pool_name = self.fs.get_data_pool_names()[0]
+        self.fs.rados(['put', obj_name, '-'], pool=pool_name, stdin_data=obj_data)
+
+        # Test if put_object_versioned() crosschecks the version of the
+        # given object. Being a negative test, an exception is expected.
+        expected_exception = 'rados_OSError'
+        output = self._volume_client_python(vc_mount, dedent("""
+            data, version = vc.get_object_and_version("{pool_name}", "{obj_name}")
+
+            if sys_version_info.major < 3:
+                data = data + 'm1'
+            else:
+                data = str.encode(data.decode('utf-8') + 'm1')
+
+            vc.put_object("{pool_name}", "{obj_name}", data)
+
+            if sys_version_info.major < 3:
+                data = data + 'm2'
+            else:
+                data = str.encode(data.decode('utf-8') + 'm2')
+
+            try:
+                vc.put_object_versioned("{pool_name}", "{obj_name}", data, version)
+            except {expected_exception}:
+                print('{expected_exception} raised')
+        """).format(pool_name=pool_name, obj_name=obj_name,
+                    expected_exception=expected_exception))
+        self.assertEqual(expected_exception + ' raised', output)
+
+
+    def test_delete_object(self):
+        vc_mount = self.mounts[1]
+        vc_mount.umount_wait()
+        self._configure_vc_auth(vc_mount, "manila")
+
+        obj_data = 'test data'
+        obj_name = 'test_vc_obj_3'
+        pool_name = self.fs.get_data_pool_names()[0]
+
+        self.fs.rados(['put', obj_name, '-'], pool=pool_name, stdin_data=obj_data)
+
+        self._volume_client_python(vc_mount, dedent("""
+            data_read = vc.delete_object("{pool_name}", "{obj_name}")
+        """.format(
+            pool_name = pool_name,
+            obj_name = obj_name,
+        )))
+
+        with self.assertRaises(CommandFailedError):
+            self.fs.rados(['stat', obj_name], pool=pool_name)
+
+        # Check idempotency -- no error raised trying to delete non-existent
+        # object
+        self._volume_client_python(vc_mount, dedent("""
+            data_read = vc.delete_object("{pool_name}", "{obj_name}")
+        """.format(
+            pool_name = pool_name,
+            obj_name = obj_name,
+        )))
+
+    def test_21501(self):
+        """
+        Reproducer for #21501 "ceph_volume_client: sets invalid caps for
+        existing IDs with no caps" (http://tracker.ceph.com/issues/21501)
+        """
+
+        vc_mount = self.mounts[1]
+        vc_mount.umount_wait()
+
+        # Configure vc_mount as the handle for driving volumeclient
+        self._configure_vc_auth(vc_mount, "manila")
+
+        # Create a volume
+        group_id = "grpid"
+        volume_id = "volid"
+        cephfs_mntpt = self._volume_client_python(vc_mount, dedent("""
+            vp = VolumePath("{group_id}", "{volume_id}")
+            create_result = vc.create_volume(vp, 1024*1024*10)
+            print(create_result['mount_path'])
+        """.format(
+            group_id=group_id,
+            volume_id=volume_id
+        )))
+
+        # Create an auth ID with no caps
+        guest_id = '21501'
+        self.fs.mon_manager.raw_cluster_cmd_result(
+            'auth', 'get-or-create', 'client.{0}'.format(guest_id))
+
+        guest_mount = self.mounts[2]
+        guest_mount.umount_wait()
+
+        # Set auth caps for the auth ID using the volumeclient
+        self._configure_guest_auth(vc_mount, guest_mount, guest_id, cephfs_mntpt,
+                                   allow_existing_id=True)
+
+        # Mount the volume in the guest using the auth ID to assert that the
+        # auth caps are valid
+        guest_mount.mount_wait(cephfs_mntpt=cephfs_mntpt)
+
+    def test_volume_without_namespace_isolation(self):
+        """
+        That volume client can create volumes that do not have separate RADOS
+        namespace layouts.
+        """
+        vc_mount = self.mounts[1]
+        vc_mount.umount_wait()
+
+        # Configure vc_mount as the handle for driving volumeclient
+        self._configure_vc_auth(vc_mount, "manila")
+
+        # Create a volume
+        volume_prefix = "/myprefix"
+        group_id = "grpid"
+        volume_id = "volid"
+        self._volume_client_python(vc_mount, dedent("""
+            vp = VolumePath("{group_id}", "{volume_id}")
+            create_result = vc.create_volume(vp, 1024*1024*10, namespace_isolated=False)
+            print(create_result['mount_path'])
+        """.format(
+            group_id=group_id,
+            volume_id=volume_id
+        )), volume_prefix)
+
+        # The CephFS volume should be created
+        self.mounts[0].stat(os.path.join("myprefix", group_id, volume_id))
+        vol_namespace = self.mounts[0].getfattr(
+            os.path.join("myprefix", group_id, volume_id),
+            "ceph.dir.layout.pool_namespace")
+        assert not vol_namespace
+
+        self._volume_client_python(vc_mount, dedent("""
+            vp = VolumePath("{group_id}", "{volume_id}")
+            vc.delete_volume(vp)
+            vc.purge_volume(vp)
+        """.format(
+            group_id=group_id,
+            volume_id=volume_id,
+        )), volume_prefix)
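
The payloads that _volume_client_python() feeds to these tests map onto a small standalone script. The following sketch is illustrative only: the "manila" auth ID, config path, and group/volume names are placeholders, and it assumes a reachable cluster with a CephFS file system and a client.manila keyring already in place.

    from ceph_volume_client import CephFSVolumeClient, VolumePath

    vc = CephFSVolumeClient("manila", "/etc/ceph/ceph.conf", "ceph")
    vc.connect()
    try:
        vp = VolumePath("grpid", "volid")
        # 10 MiB volume, isolated in its own RADOS namespace by default
        mount_path = vc.create_volume(vp, 10 * 1024 * 1024)['mount_path']
        # Scoped cephx key for a guest; only returned when tenant_id is given
        key = vc.authorize(vp, "guest", tenant_id="tenant")['auth_key']
        # ... the guest mounts mount_path as client.guest using key ...
        vc.deauthorize(vp, "guest")
        vc.evict("guest", volume_path=vp)
        vc.delete_volume(vp)
        vc.purge_volume(vp)
    finally:
        vc.disconnect()
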
diff --git a/qa/workunits/fs/upgrade/volume_client b/qa/workunits/fs/upgrade/volume_client
new file mode 100755 (executable)
index 0000000..b3b6dd3
--- /dev/null
@@ -0,0 +1,110 @@
+#!/bin/bash
+
+set -ex
+
+PYTHON="python3"
+
+function run_payload {
+    local payload="$1"
+    sudo "$PYTHON" <<EOF
+from __future__ import print_function
+from ceph_volume_client import CephFSVolumeClient, VolumePath
+from sys import version_info as sys_version_info
+from rados import OSError as rados_OSError
+import logging
+log = logging.getLogger("ceph_volume_client")
+log.addHandler(logging.StreamHandler())
+log.setLevel(logging.DEBUG)
+vc = CephFSVolumeClient("manila", "/etc/ceph/ceph.conf", "ceph")
+vc.connect()
+${payload}
+vc.disconnect()
+EOF
+}
+
+function import_key {
+  local client="$1"
+  if [ -n "$2" ]; then
+    local keyring="$2"
+  else
+    local keyring="/etc/ceph/ceph.client.${client}.keyring"
+  fi
+  local T=$(mktemp)
+  tee "$T" >&2
+  sudo touch -- "$keyring"
+  sudo ceph-authtool "$keyring" --import-keyring "$T"
+  rm -f -- "$T"
+}
+
+function conf_keys {
+  local client="$1"
+  ls /etc/ceph >&2
+  ceph auth get-or-create "client.manila" mds 'allow *' osd 'allow rw' mon 'allow *' | import_key "$client" /etc/ceph/ceph.keyring
+}
+
+function create_data_isolated {
+  local PAYLOAD='
+vp = VolumePath(None, "vol_isolated")
+vc.create_volume(vp, (1<<33), data_isolated=True)
+auth_result = vc.authorize(vp, "vol_data_isolated", tenant_id="test")
+print("[client.vol_data_isolated]\n\tkey = ", auth_result["auth_key"])
+'
+
+  run_payload "$PAYLOAD" | import_key "vol_data_isolated"
+}
+
+function create_default {
+  local PAYLOAD='
+vp = VolumePath(None, "vol_default")
+vc.create_volume(vp, (1<<33))
+auth_result = vc.authorize(vp, "vol_default", tenant_id="test")
+print("[client.vol_default]\n\tkey = ", auth_result["auth_key"])
+'
+  run_payload "$PAYLOAD" | import_key "vol_default"
+}
+
+function create {
+  create_data_isolated
+  create_default
+}
+
+function populate {
+  pwd
+  df -h .
+  ls -l
+  cp -a /usr/bin .
+}
+
+function verify_data_isolated {
+  ceph fs subvolume getpath cephfs vol_isolated
+  stat bin
+  ls bin | tail
+}
+
+function verify_default {
+  ceph fs subvolume getpath cephfs vol_default
+  stat bin
+  ls bin | tail
+}
+
+function verify {
+  diff <(ceph fs subvolume ls cephfs | jq -cS 'sort_by(.name)' | tee /dev/stderr) <(printf '[{"name":"vol_isolated"},{"name":"vol_default"}]' | jq -cS 'sort_by(.name)')
+  verify_data_isolated
+  verify_default
+}
+
+function main {
+  if [ "$1" = create ]; then
+    conf_keys
+    create
+  elif [ "$1" = populate ]; then
+    populate
+  elif [ "$1" = verify ]; then
+    # verify (sub)volumes still exist and are configured correctly
+    verify
+  else
+    exit 1
+  fi
+}
+
+main "$ACTION"
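+
+# Usage note (assumed wiring): the calling task exports ACTION as "create",
+# "populate" or "verify"; any other value makes the script exit non-zero.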
index 4ff35011058ee739b740c5b90079737660ec88c6..86fcc6becade7d3f2766bd0cf9b330b1882f7d5d 100644 (file)
@@ -39,6 +39,7 @@ execute_process(
 install(FILES
   ${CMAKE_CURRENT_SOURCE_DIR}/ceph_argparse.py
   ${CMAKE_CURRENT_SOURCE_DIR}/ceph_daemon.py
+  ${CMAKE_CURRENT_SOURCE_DIR}/ceph_volume_client.py
   DESTINATION ${PYTHON3_INSTDIR})
 
 if(WITH_MGR)
diff --git a/src/pybind/ceph_volume_client.py b/src/pybind/ceph_volume_client.py
new file mode 100644 (file)
index 0000000..b748f5d
--- /dev/null
@@ -0,0 +1,1551 @@
+"""
+Copyright (C) 2015 Red Hat, Inc.
+
+LGPL-2.1 or LGPL-3.0.  See file COPYING.
+"""
+
+from contextlib import contextmanager
+import errno
+import fcntl
+import json
+import logging
+import os
+import re
+import struct
+import sys
+import threading
+import time
+import uuid
+
+from ceph_argparse import json_command
+
+import cephfs
+import rados
+
+def to_bytes(param):
+    '''
+    Helper method that returns byte representation of the given parameter.
+    '''
+    if isinstance(param, str):
+        return param.encode('utf-8')
+    elif param is None:
+        return param
+    else:
+        return str(param).encode('utf-8')
+
+class RadosError(Exception):
+    """
+    Something went wrong talking to Ceph with librados
+    """
+    pass
+
+
+RADOS_TIMEOUT = 10
+
+log = logging.getLogger(__name__)
+
+# Reserved volume group name which we use in paths for volumes
+# that are not assigned to a group (i.e. created with group=None)
+NO_GROUP_NAME = "_nogroup"
+
+# Filename extensions for meta files.
+META_FILE_EXT = ".meta"
+
+class VolumePath(object):
+    """
+    Identify a volume's path as group->volume
+    The Volume ID is a unique identifier, but this is a much more
+    helpful thing to pass around.
+    """
+    def __init__(self, group_id, volume_id):
+        self.group_id = group_id
+        self.volume_id = volume_id
+        assert self.group_id != NO_GROUP_NAME
+        assert self.volume_id != "" and self.volume_id is not None
+
+    def __str__(self):
+        return "{0}/{1}".format(self.group_id, self.volume_id)
+
+
+class ClusterTimeout(Exception):
+    """
+    Exception indicating that we timed out trying to talk to the Ceph cluster,
+    either to the mons, or to any individual daemon that the mons indicate ought
+    to be up but isn't responding to us.
+    """
+    pass
+
+
+class ClusterError(Exception):
+    """
+    Exception indicating that the cluster returned an error to a command that
+    we thought should be successful based on our last knowledge of the cluster
+    state.
+    """
+    def __init__(self, action, result_code, result_str):
+        self._action = action
+        self._result_code = result_code
+        self._result_str = result_str
+
+    def __str__(self):
+        return "Error {0} (\"{1}\") while {2}".format(
+            self._result_code, self._result_str, self._action)
+
+
+class RankEvicter(threading.Thread):
+    """
+    Thread for evicting client(s) from a particular MDS daemon instance.
+
+    This is more complex than simply sending a command, because we have to
+    handle cases where MDS daemons might not be fully up yet, and/or might
+    be transiently unresponsive to commands.
+    """
+    class GidGone(Exception):
+        pass
+
+    POLL_PERIOD = 5
+
+    def __init__(self, volume_client, client_spec, rank, gid, mds_map, ready_timeout):
+        """
+        :param client_spec: list of strings, used as filter arguments to "session evict"
+                            pass ["id=123"] to evict a single client with session id 123.
+        """
+        self.rank = rank
+        self.gid = gid
+        self._mds_map = mds_map
+        self._client_spec = client_spec
+        self._volume_client = volume_client
+        self._ready_timeout = ready_timeout
+        self._ready_waited = 0
+
+        self.success = False
+        self.exception = None
+
+        super(RankEvicter, self).__init__()
+
+    def _ready_to_evict(self):
+        if self._mds_map['up'].get("mds_{0}".format(self.rank), None) != self.gid:
+            log.info("Evicting {0} from {1}/{2}: rank no longer associated with gid, done.".format(
+                self._client_spec, self.rank, self.gid
+            ))
+            raise RankEvicter.GidGone()
+
+        info = self._mds_map['info']["gid_{0}".format(self.gid)]
+        log.debug("_ready_to_evict: state={0}".format(info['state']))
+        return info['state'] in ["up:active", "up:clientreplay"]
+
+    def _wait_for_ready(self):
+        """
+        Wait for that MDS rank to reach an active or clientreplay state, and
+        not be laggy.
+        """
+        while not self._ready_to_evict():
+            if self._ready_waited > self._ready_timeout:
+                raise ClusterTimeout()
+
+            time.sleep(self.POLL_PERIOD)
+            self._ready_waited += self.POLL_PERIOD
+
+            self._mds_map = self._volume_client.get_mds_map()
+
+    def _evict(self):
+        """
+        Run the eviction procedure.  Return true on success, false on errors.
+        """
+
+        # Wait til the MDS is believed by the mon to be available for commands
+        try:
+            self._wait_for_ready()
+        except self.GidGone:
+            return True
+
+        # Then send it an evict
+        ret = errno.ETIMEDOUT
+        while ret == errno.ETIMEDOUT:
+            log.debug("mds_command: {0}, {1}".format(
+                "%s" % self.gid, ["session", "evict"] + self._client_spec
+            ))
+            ret, outb, outs = self._volume_client.fs.mds_command(
+                "%s" % self.gid,
+                [json.dumps({
+                                "prefix": "session evict",
+                                "filters": self._client_spec
+                })], "")
+            log.debug("mds_command: complete {0} {1}".format(ret, outs))
+
+            # If we get a clean response, great, it's gone from that rank.
+            if ret == 0:
+                return True
+            elif ret == errno.ETIMEDOUT:
+                # Oh no, the MDS went laggy (that's how libcephfs knows to emit this error)
+                self._mds_map = self._volume_client.get_mds_map()
+                try:
+                    self._wait_for_ready()
+                except self.GidGone:
+                    return True
+            else:
+                raise ClusterError("Sending evict to mds.{0}".format(self.gid), ret, outs)
+
+    def run(self):
+        try:
+            self._evict()
+        except Exception as e:
+            self.success = False
+            self.exception = e
+        else:
+            self.success = True
+
+
+class EvictionError(Exception):
+    pass
+
+
+class CephFSVolumeClientError(Exception):
+    """
+    Something went wrong talking to Ceph using CephFSVolumeClient.
+    """
+    pass
+
+
+CEPHFSVOLUMECLIENT_VERSION_HISTORY = """
+
+    CephFSVolumeClient Version History:
+
+    * 1 - Initial version
+    * 2 - Added get_object, put_object, delete_object methods to CephFSVolumeClient
+    * 3 - Allow volumes to be created without RADOS namespace isolation
+    * 4 - Added get_object_and_version, put_object_versioned method to CephFSVolumeClient
+    * 5 - Disallow authorize API for users not created by CephFSVolumeClient
+"""
+
+
+class CephFSVolumeClient(object):
+    """
+    Combine libcephfs and librados interfaces to implement a
+    'Volume' concept implemented as a cephfs directory and
+    client capabilities which restrict mount access to this
+    directory.
+
+    Additionally, volumes may be in a 'Group'.  Conveniently,
+    volumes are a lot like manila shares, and groups are a lot
+    like manila consistency groups.
+
+    Refer to volumes with VolumePath, which specifies the
+    volume and group IDs (both strings).  The group ID may
+    be None.
+
+    In general, functions in this class are allowed to raise rados.Error
+    or cephfs.Error exceptions in unexpected situations.
+    """
+
+    # Current version
+    version = 5
+
+    # Where shall we create our volumes?
+    POOL_PREFIX = "fsvolume_"
+    DEFAULT_VOL_PREFIX = "/volumes"
+    DEFAULT_NS_PREFIX = "fsvolumens_"
+
+    def __init__(self, auth_id=None, conf_path=None, cluster_name=None,
+                 volume_prefix=None, pool_ns_prefix=None, rados=None,
+                 fs_name=None):
+        """
+        Either set all three of ``auth_id``, ``conf_path`` and
+        ``cluster_name`` (rados constructed on connect), or
+        set ``rados`` (existing rados instance).
+        """
+        self.fs = None
+        self.fs_name = fs_name
+        self.connected = False
+
+        self.conf_path = conf_path
+        self.cluster_name = cluster_name
+        self.auth_id = auth_id
+
+        self.rados = rados
+        if self.rados:
+            # Using an externally owned rados, so we won't tear it down
+            # on disconnect
+            self.own_rados = False
+        else:
+            # self.rados will be constructed in connect
+            self.own_rados = True
+
+        self.volume_prefix = volume_prefix if volume_prefix else self.DEFAULT_VOL_PREFIX
+        self.pool_ns_prefix = pool_ns_prefix if pool_ns_prefix else self.DEFAULT_NS_PREFIX
+        # For flock'ing in cephfs, I want a unique ID to distinguish me
+        # from any other manila-share services that are loading this module.
+        # We could use pid, but that's unnecessarily weak: generate a
+        # UUID
+        self._id = struct.unpack(">Q", uuid.uuid1().bytes[0:8])[0]
+
+        # TODO: version the on-disk structures
+
+    def recover(self):
+        # Scan all auth keys to see if they're dirty: if they are, they have
+        # state that might not have propagated to Ceph or to the related
+        # volumes yet.
+
+        # Important: we *always* acquire locks in the order auth->volume
+        # That means a volume can never be dirty without the auth key
+        # we're updating it with being dirty at the same time.
+
+        # First list the auth IDs that have potentially dirty on-disk metadata
+        log.debug("Recovering from partial auth updates (if any)...")
+
+        try:
+            dir_handle = self.fs.opendir(self.volume_prefix)
+        except cephfs.ObjectNotFound:
+            log.debug("Nothing to recover. No auth meta files.")
+            return
+
+        d = self.fs.readdir(dir_handle)
+        auth_ids = []
+
+        if not d:
+            log.debug("Nothing to recover. No auth meta files.")
+
+        while d:
+            # Identify auth IDs from auth meta filenames. The auth meta files
+            # are named as, "$<auth_id><meta filename extension>"
+            regex = "^\$(.*){0}$".format(re.escape(META_FILE_EXT))
+            match = re.search(regex, d.d_name.decode(encoding='utf-8'))
+            if match:
+                auth_ids.append(match.group(1))
+
+            d = self.fs.readdir(dir_handle)
+
+        self.fs.closedir(dir_handle)
+
+        # Key points based on ordering:
+        # * Anything added in VMeta is already added in AMeta
+        # * Anything added in Ceph is already added in VMeta
+        # * Anything removed in VMeta is already removed in Ceph
+        # * Anything removed in AMeta is already removed in VMeta
+
+        # Deauthorization: because I only update metadata AFTER the
+        # update of the next level down, I have the same ordering of
+        # -> things which exist in the AMeta should also exist
+        #    in the VMeta, should also exist in Ceph, and the same
+        #    recovery procedure that gets me consistent after crashes
+        #    during authorization will also work during deauthorization
+
+        # Now for each auth ID, check for dirty flag and apply updates
+        # if dirty flag is found
+        for auth_id in auth_ids:
+            with self._auth_lock(auth_id):
+                auth_meta = self._auth_metadata_get(auth_id)
+                if not auth_meta or not auth_meta['volumes']:
+                    # Clean up auth meta file
+                    self.fs.unlink(self._auth_metadata_path(auth_id))
+                    continue
+                if not auth_meta['dirty']:
+                    continue
+                self._recover_auth_meta(auth_id, auth_meta)
+
+        log.debug("Recovered from partial auth updates (if any).")
+
+    def _recover_auth_meta(self, auth_id, auth_meta):
+        """
+        Call me after locking the auth meta file.
+        """
+        remove_volumes = []
+
+        for volume, volume_data in auth_meta['volumes'].items():
+            if not volume_data['dirty']:
+                continue
+
+            (group_id, volume_id) = volume.split('/')
+            group_id = group_id if group_id != 'None' else None
+            volume_path = VolumePath(group_id, volume_id)
+            access_level = volume_data['access_level']
+
+            with self._volume_lock(volume_path):
+                vol_meta = self._volume_metadata_get(volume_path)
+
+                # No VMeta update indicates that there was no auth update
+                # in Ceph either. So it's safe to remove corresponding
+                # partial update in AMeta.
+                if not vol_meta or auth_id not in vol_meta['auths']:
+                    remove_volumes.append(volume)
+                    continue
+
+                want_auth = {
+                    'access_level': access_level,
+                    'dirty': False,
+                }
+                # VMeta update looks clean. Ceph auth update must have been
+                # clean.
+                if vol_meta['auths'][auth_id] == want_auth:
+                    continue
+
+                readonly = access_level == 'r'
+                client_entity = "client.{0}".format(auth_id)
+                try:
+                    existing_caps = self._rados_command(
+                        'auth get',
+                        {
+                            'entity': client_entity
+                        }
+                    )
+                    # FIXME: rados raising Error instead of ObjectNotFound in auth get failure
+                except rados.Error:
+                    existing_caps = None
+                self._authorize_volume(volume_path, auth_id, readonly, existing_caps)
+
+            # Recovered from partial auth updates for the auth ID's access
+            # to a volume.
+            auth_meta['volumes'][volume]['dirty'] = False
+            self._auth_metadata_set(auth_id, auth_meta)
+
+        for volume in remove_volumes:
+            del auth_meta['volumes'][volume]
+
+        if not auth_meta['volumes']:
+            # Clean up auth meta file
+            self.fs.unlink(self._auth_metadata_path(auth_id))
+            return
+
+        # Recovered from all partial auth updates for the auth ID.
+        auth_meta['dirty'] = False
+        self._auth_metadata_set(auth_id, auth_meta)
+
+    def get_mds_map(self):
+        fs_map = self._rados_command("fs dump", {})
+        return fs_map['filesystems'][0]['mdsmap']
+
+    def evict(self, auth_id, timeout=30, volume_path=None):
+        """
+        Evict all clients based on the authorization ID and optionally based on
+        the volume path mounted.  Assumes that the authorization key has been
+        revoked prior to calling this function.
+
+        This operation can throw an exception if the mon cluster is unresponsive, or
+        any individual MDS daemon is unresponsive for longer than the timeout passed in.
+        """
+
+        client_spec = ["auth_name={0}".format(auth_id), ]
+        if volume_path:
+            client_spec.append("client_metadata.root={0}".
+                               format(self._get_path(volume_path)))
+
+        log.info("evict clients with {0}".format(', '.join(client_spec)))
+
+        mds_map = self.get_mds_map()
+        up = {}
+        for name, gid in mds_map['up'].items():
+            # Quirk of the MDSMap JSON dump: keys in the up dict are like "mds_0"
+            assert name.startswith("mds_")
+            up[int(name[4:])] = gid
+
+        # For all MDS ranks held by a daemon
+        # Do the parallelism in python instead of using "tell mds.*", because
+        # the latter doesn't give us per-mds output
+        threads = []
+        for rank, gid in up.items():
+            thread = RankEvicter(self, client_spec, rank, gid, mds_map,
+                                 timeout)
+            thread.start()
+            threads.append(thread)
+
+        for t in threads:
+            t.join()
+
+        log.info("evict: joined all")
+
+        for t in threads:
+            if not t.success:
+                msg = ("Failed to evict client with {0} from mds {1}/{2}: {3}".
+                       format(', '.join(client_spec), t.rank, t.gid, t.exception)
+                      )
+                log.error(msg)
+                raise EvictionError(msg)
+
+    def _get_path(self, volume_path):
+        """
+        Determine the path within CephFS where this volume will live
+        :return: absolute path (string)
+        """
+        return os.path.join(
+            self.volume_prefix,
+            volume_path.group_id if volume_path.group_id is not None else NO_GROUP_NAME,
+            volume_path.volume_id)
+
+    def _get_group_path(self, group_id):
+        if group_id is None:
+            raise ValueError("group_id may not be None")
+
+        return os.path.join(
+            self.volume_prefix,
+            group_id
+        )
+
+    def _connect(self, premount_evict):
+        log.debug("Connecting to cephfs...")
+        self.fs = cephfs.LibCephFS(rados_inst=self.rados)
+        log.debug("CephFS initializing...")
+        self.fs.init()
+        if premount_evict is not None:
+            log.debug("Premount eviction of {0} starting".format(premount_evict))
+            self.evict(premount_evict)
+            log.debug("Premount eviction of {0} completes".format(premount_evict))
+        log.debug("CephFS mounting...")
+        self.fs.mount(filesystem_name=to_bytes(self.fs_name))
+        log.debug("Connection to cephfs complete")
+
+        # Recover from partial auth updates due to a previous
+        # crash.
+        self.recover()
+
+    def connect(self, premount_evict = None):
+        """
+
+        :param premount_evict: Optional auth_id to evict before mounting the filesystem: callers
+                               may want to use this to specify their own auth ID if they expect
+                               to be a unique instance and don't want to wait for caps to time
+                               out after failure of another instance of themselves.
+        """
+        if self.own_rados:
+            log.debug("Configuring to RADOS with config {0}...".format(self.conf_path))
+            self.rados = rados.Rados(
+                name="client.{0}".format(self.auth_id),
+                clustername=self.cluster_name,
+                conffile=self.conf_path,
+                conf={}
+            )
+            if self.rados.state != "connected":
+                log.debug("Connecting to RADOS...")
+                self.rados.connect()
+                log.debug("Connection to RADOS complete")
+        self._connect(premount_evict)
+
+    def get_mon_addrs(self):
+        log.info("get_mon_addrs")
+        result = []
+        mon_map = self._rados_command("mon dump")
+        for mon in mon_map['mons']:
+            ip_port = mon['addr'].split("/")[0]
+            result.append(ip_port)
+
+        return result
+
+    def disconnect(self):
+        log.info("disconnect")
+        if self.fs:
+            log.debug("Disconnecting cephfs...")
+            self.fs.shutdown()
+            self.fs = None
+            log.debug("Disconnecting cephfs complete")
+
+        if self.rados and self.own_rados:
+            log.debug("Disconnecting rados...")
+            self.rados.shutdown()
+            self.rados = None
+            log.debug("Disconnecting rados complete")
+
+    def __enter__(self):
+        self.connect()
+        return self
+
+    def __exit__(self, exc_type, exc_val, exc_tb):
+        self.disconnect()
+
+    def __del__(self):
+        self.disconnect()
+
+    def _get_pool_id(self, osd_map, pool_name):
+        # Maybe borrow the OSDMap wrapper class from calamari if more helpers
+        # like this are needed.
+        for pool in osd_map['pools']:
+            if pool['pool_name'] == pool_name:
+                return pool['pool']
+
+        return None
+
+    def _create_volume_pool(self, pool_name):
+        """
+        Idempotently create a pool for use as a CephFS data pool, with the given name
+
+        :return The ID of the created pool
+        """
+        osd_map = self._rados_command('osd dump', {})
+
+        existing_id = self._get_pool_id(osd_map, pool_name)
+        if existing_id is not None:
+            log.info("Pool {0} already exists".format(pool_name))
+            return existing_id
+
+        self._rados_command(
+            'osd pool create',
+            {
+                'pool': pool_name,
+            }
+        )
+
+        osd_map = self._rados_command('osd dump', {})
+        pool_id = self._get_pool_id(osd_map, pool_name)
+
+        if pool_id is None:
+            # If the pool isn't there, that's either a ceph bug, or it's some outside influence
+            # removing it right after we created it.
+            log.error("OSD map doesn't contain expected pool '{0}':\n{1}".format(
+                pool_name, json.dumps(osd_map, indent=2)
+            ))
+            raise RuntimeError("Pool '{0}' not present in map after creation".format(pool_name))
+        else:
+            return pool_id
+
+    def create_group(self, group_id, mode=0o755):
+        # Prevent craftily-named volume groups from colliding with the meta
+        # files.
+        if group_id.endswith(META_FILE_EXT):
+            raise ValueError("group ID cannot end with '{0}'.".format(
+                META_FILE_EXT))
+        path = self._get_group_path(group_id)
+        self._mkdir_p(path, mode)
+
+    def destroy_group(self, group_id):
+        path = self._get_group_path(group_id)
+        try:
+            self.fs.stat(self.volume_prefix)
+        except cephfs.ObjectNotFound:
+            pass
+        else:
+            self.fs.rmdir(path)
+
+    def _mkdir_p(self, path, mode=0o755):
+        try:
+            self.fs.stat(path)
+        except cephfs.ObjectNotFound:
+            pass
+        else:
+            return
+
+        parts = path.split(os.path.sep)
+
+        for i in range(1, len(parts) + 1):
+            subpath = os.path.join(*parts[0:i])
+            try:
+                self.fs.stat(subpath)
+            except cephfs.ObjectNotFound:
+                self.fs.mkdir(subpath, mode)
+
+    def create_volume(self, volume_path, size=None, data_isolated=False, namespace_isolated=True,
+                      mode=0o755):
+        """
+        Set up metadata, pools and auth for a volume.
+
+        This function is idempotent.  It is safe to call this again
+        for an already-created volume, even if it is in use.
+
+        :param volume_path: VolumePath instance
+        :param size: In bytes, or None for no size limit
+        :param data_isolated: If true, create a separate OSD pool for this volume
+        :param namespace_isolated: If true, use separate RADOS namespace for this volume
+        :return:
+        """
+        path = self._get_path(volume_path)
+        log.info("create_volume: {0}".format(path))
+
+        self._mkdir_p(path, mode)
+
+        if size is not None:
+            self.fs.setxattr(path, 'ceph.quota.max_bytes', to_bytes(size), 0)
+
+        # data_isolated means create a separate pool for this volume
+        if data_isolated:
+            pool_name = "{0}{1}".format(self.POOL_PREFIX, volume_path.volume_id)
+            log.info("create_volume: {0}, create pool {1} as data_isolated =True.".format(volume_path, pool_name))
+            pool_id = self._create_volume_pool(pool_name)
+            mds_map = self.get_mds_map()
+            if pool_id not in mds_map['data_pools']:
+                self._rados_command("fs add_data_pool", {
+                    'fs_name': mds_map['fs_name'],
+                    'pool': pool_name
+                })
+            time.sleep(5) # time for MDSMap to be distributed
+            self.fs.setxattr(path, 'ceph.dir.layout.pool', to_bytes(pool_name), 0)
+
+        # enforce security isolation, use separate namespace for this volume
+        if namespace_isolated:
+            namespace = "{0}{1}".format(self.pool_ns_prefix, volume_path.volume_id)
+            log.info("create_volume: {0}, using rados namespace {1} to isolate data.".format(volume_path, namespace))
+            self.fs.setxattr(path, 'ceph.dir.layout.pool_namespace',
+                             to_bytes(namespace), 0)
+        else:
+            # If volume's namespace layout is not set, then the volume's pool
+            # layout remains unset and will undesirably change with ancestor's
+            # pool layout changes.
+            pool_name = self._get_ancestor_xattr(path, "ceph.dir.layout.pool")
+            self.fs.setxattr(path, 'ceph.dir.layout.pool',
+                             to_bytes(pool_name), 0)
+
+        # Create a volume meta file, if it does not already exist, to store
+        # data about auth ids having access to the volume
+        fd = self.fs.open(self._volume_metadata_path(volume_path),
+                          os.O_CREAT, 0o755)
+        self.fs.close(fd)
+
+        return {
+            'mount_path': path
+        }
+
+    def delete_volume(self, volume_path, data_isolated=False):
+        """
+        Make a volume inaccessible to guests.  This function is
+        idempotent.  This is the fast part of tearing down a volume: you must
+        also later call purge_volume, which is the slow part.
+
+        :param volume_path: Same identifier used in create_volume
+        :return:
+        """
+
+        path = self._get_path(volume_path)
+        log.info("delete_volume: {0}".format(path))
+
+        # Create the trash folder if it doesn't already exist
+        trash = os.path.join(self.volume_prefix, "_deleting")
+        self._mkdir_p(trash)
+
+        # We'll move it to here
+        trashed_volume = os.path.join(trash, volume_path.volume_id)
+
+        # Move the volume's data to the trash folder
+        try:
+            self.fs.stat(path)
+        except cephfs.ObjectNotFound:
+            log.warning("Trying to delete volume '{0}' but it's already gone".format(
+                path))
+        else:
+            self.fs.rename(path, trashed_volume)
+
+        # Delete the volume meta file, if it's not already deleted
+        vol_meta_path = self._volume_metadata_path(volume_path)
+        try:
+            self.fs.unlink(vol_meta_path)
+        except cephfs.ObjectNotFound:
+            pass
+
+    def purge_volume(self, volume_path, data_isolated=False):
+        """
+        Finish clearing up a volume that was previously passed to delete_volume.  This
+        function is idempotent.
+        """
+
+        trash = os.path.join(self.volume_prefix, "_deleting")
+        trashed_volume = os.path.join(trash, volume_path.volume_id)
+
+        try:
+            self.fs.stat(trashed_volume)
+        except cephfs.ObjectNotFound:
+            log.warning("Trying to purge volume '{0}' but it's already been purged".format(
+                trashed_volume))
+            return
+
+        def rmtree(root_path):
+            log.debug("rmtree {0}".format(root_path))
+            dir_handle = self.fs.opendir(root_path)
+            d = self.fs.readdir(dir_handle)
+            while d:
+                d_name = d.d_name.decode(encoding='utf-8')
+                if d_name not in [".", ".."]:
+                    # Do not use os.path.join because it is sensitive
+                    # to string encoding; we just pass dnames through
+                    # as byte arrays
+                    d_full = u"{0}/{1}".format(root_path, d_name)
+                    if d.is_dir():
+                        rmtree(d_full)
+                    else:
+                        self.fs.unlink(d_full)
+
+                d = self.fs.readdir(dir_handle)
+            self.fs.closedir(dir_handle)
+
+            self.fs.rmdir(root_path)
+
+        rmtree(trashed_volume)
+
+        if data_isolated:
+            pool_name = "{0}{1}".format(self.POOL_PREFIX, volume_path.volume_id)
+            osd_map = self._rados_command("osd dump", {})
+            pool_id = self._get_pool_id(osd_map, pool_name)
+            mds_map = self.get_mds_map()
+            if pool_id in mds_map['data_pools']:
+                self._rados_command("fs rm_data_pool", {
+                    'fs_name': mds_map['fs_name'],
+                    'pool': pool_name
+                })
+            self._rados_command("osd pool delete",
+                                {
+                                    "pool": pool_name,
+                                    "pool2": pool_name,
+                                    "yes_i_really_really_mean_it": True
+                                })
+
+    def _get_ancestor_xattr(self, path, attr):
+        """
+        Helper for reading layout information: if this xattr is missing
+        on the requested path, keep checking parents until we find it.
+        """
+        try:
+            result = self.fs.getxattr(path, attr).decode()
+            if result == "":
+                # Annoying!  cephfs gives us empty instead of an error when attr not found
+                raise cephfs.NoData()
+            else:
+                return result
+        except cephfs.NoData:
+            if path == "/":
+                raise
+            else:
+                return self._get_ancestor_xattr(os.path.split(path)[0], attr)
+
+    def _check_compat_version(self, compat_version):
+        if self.version < compat_version:
+            msg = ("The current version of CephFSVolumeClient, version {0} "
+                   "does not support the required feature. Need version {1} "
+                   "or greater".format(self.version, compat_version)
+                  )
+            log.error(msg)
+            raise CephFSVolumeClientError(msg)
+
+    def _metadata_get(self, path):
+        """
+        Return a deserialized JSON object, or None
+        """
+        fd = self.fs.open(path, "r")
+        # TODO iterate instead of assuming file < 4MB
+        read_bytes = self.fs.read(fd, 0, 4096 * 1024)
+        self.fs.close(fd)
+        if read_bytes:
+            return json.loads(read_bytes.decode())
+        else:
+            return None
+
+    def _metadata_set(self, path, data):
+        serialized = json.dumps(data)
+        fd = self.fs.open(path, "w")
+        try:
+            self.fs.write(fd, to_bytes(serialized), 0)
+            self.fs.fsync(fd, 0)
+        finally:
+            self.fs.close(fd)
+
+    def _lock(self, path):
+        @contextmanager
+        def fn():
+            while True:
+                fd = self.fs.open(path, os.O_CREAT, 0o755)
+                self.fs.flock(fd, fcntl.LOCK_EX, self._id)
+
+                # The locked file may be cleaned up at any time. It could be
+                # unlinked, e.g. by another manila share instance, before the
+                # lock was applied to it. Perform checks to ensure that this
+                # does not happen.
+                try:
+                    statbuf = self.fs.stat(path)
+                except cephfs.ObjectNotFound:
+                    self.fs.close(fd)
+                    continue
+
+                fstatbuf = self.fs.fstat(fd)
+                if statbuf.st_ino == fstatbuf.st_ino:
+                    break
+
+            try:
+                yield
+            finally:
+                self.fs.flock(fd, fcntl.LOCK_UN, self._id)
+                self.fs.close(fd)
+
+        return fn()
+
+    def _auth_metadata_path(self, auth_id):
+        return os.path.join(self.volume_prefix, "${0}{1}".format(
+            auth_id, META_FILE_EXT))
+
+    def _auth_lock(self, auth_id):
+        return self._lock(self._auth_metadata_path(auth_id))
+
+    def _auth_metadata_get(self, auth_id):
+        """
+        Call me with the metadata locked!
+
+        Check whether an auth metadata structure can be decoded by the current
+        version of CephFSVolumeClient.
+
+        Return auth metadata that the current version of CephFSVolumeClient
+        can decode.
+        """
+        auth_metadata = self._metadata_get(self._auth_metadata_path(auth_id))
+
+        if auth_metadata:
+            self._check_compat_version(auth_metadata['compat_version'])
+
+        return auth_metadata
+
+    def _auth_metadata_set(self, auth_id, data):
+        """
+        Call me with the metadata locked!
+
+        Fsync the auth metadata.
+
+        Add two version attributes to the auth metadata,
+        'compat_version', the minimum CephFSVolumeClient version that can
+        decode the metadata, and 'version', the CephFSVolumeClient version
+        that encoded the metadata.
+        """
+        data['compat_version'] = 1
+        data['version'] = self.version
+        return self._metadata_set(self._auth_metadata_path(auth_id), data)
+
+    def _volume_metadata_path(self, volume_path):
+        return os.path.join(self.volume_prefix, "_{0}:{1}{2}".format(
+            volume_path.group_id if volume_path.group_id else "",
+            volume_path.volume_id,
+            META_FILE_EXT
+        ))
+
+    def _volume_lock(self, volume_path):
+        """
+        Return a ContextManager which locks the authorization metadata for
+        a particular volume, and persists a flag to the metadata indicating
+        that it is currently locked, so that we can detect dirty situations
+        during recovery.
+
+        This lock isn't just to make access to the metadata safe: it's also
+        designed to be used over the two-step process of checking the
+        metadata and then responding to an authorization request, to
+        ensure that at the point we respond the metadata hasn't changed
+        in the background.  It's key to how we avoid security holes
+        resulting from races during that process.
+        """
+        return self._lock(self._volume_metadata_path(volume_path))
+
+    def _volume_metadata_get(self, volume_path):
+        """
+        Call me with the metadata locked!
+
+        Check whether a volume metadata structure can be decoded by the current
+        version of CephFSVolumeClient.
+
+        Return a volume_metadata structure that the current version of
+        CephFSVolumeClient can decode.
+        """
+        volume_metadata = self._metadata_get(self._volume_metadata_path(volume_path))
+
+        if volume_metadata:
+            self._check_compat_version(volume_metadata['compat_version'])
+
+        return volume_metadata
+
+    def _volume_metadata_set(self, volume_path, data):
+        """
+        Call me with the metadata locked!
+
+        Add two version attributes to the volume metadata,
+        'compat_version', the minimum CephFSVolumeClient version that can
+        decode the metadata and 'version', the CephFSVolumeClient version
+        that encoded the metadata.
+        """
+        data['compat_version'] = 1
+        data['version'] = self.version
+        return self._metadata_set(self._volume_metadata_path(volume_path), data)
+
+    def _prepare_updated_caps_list(self, existing_caps, mds_cap_str, osd_cap_str, authorize=True):
+        caps_list = []
+        for k, v in existing_caps['caps'].items():
+            if k == 'mds' or k == 'osd':
+                continue
+            elif k == 'mon':
+                if not authorize and v == 'allow r':
+                    continue
+            caps_list.extend((k,v))
+
+        if mds_cap_str:
+            caps_list.extend(('mds', mds_cap_str))
+        if osd_cap_str:
+            caps_list.extend(('osd', osd_cap_str))
+
+        if authorize and 'mon' not in caps_list:
+            caps_list.extend(('mon', 'allow r'))
+
+        return caps_list
+
+    def authorize(self, volume_path, auth_id, readonly=False, tenant_id=None, allow_existing_id=False):
+        """
+        Get-or-create a Ceph auth identity for `auth_id` and grant them access
+        to
+        :param volume_path:
+        :param auth_id:
+        :param readonly:
+        :param tenant_id: Optionally provide a stringizable object to
+                          restrict any created cephx IDs to other callers
+                          passing the same tenant ID.
+        :allow_existing_id: Optionally authorize existing auth-ids not
+                            created by ceph_volume_client
+        :return:
+        """
+
+        with self._auth_lock(auth_id):
+            client_entity = "client.{0}".format(auth_id)
+            try:
+                existing_caps = self._rados_command(
+                    'auth get',
+                    {
+                        'entity': client_entity
+                    }
+                )
+                # FIXME: rados raising Error instead of ObjectNotFound in auth get failure
+            except rados.Error:
+                existing_caps = None
+
+            # Existing meta, or None, to be updated
+            auth_meta = self._auth_metadata_get(auth_id)
+
+            # volume data to be inserted
+            volume_path_str = str(volume_path)
+            volume = {
+                volume_path_str : {
+                    # The access level at which the auth_id is authorized to
+                    # access the volume.
+                    'access_level': 'r' if readonly else 'rw',
+                    'dirty': True,
+                }
+            }
+
+            if auth_meta is None:
+                if not allow_existing_id and existing_caps is not None:
+                    msg = "auth ID: {0} exists and not created by ceph_volume_client. Not allowed to modify".format(auth_id)
+                    log.error(msg)
+                    raise CephFSVolumeClientError(msg)
+
+                # non-existent auth IDs
+                sys.stderr.write("Creating meta for ID {0} with tenant {1}\n".format(
+                    auth_id, tenant_id
+                ))
+                log.debug("Authorize: no existing meta")
+                auth_meta = {
+                    'dirty': True,
+                    'tenant_id': tenant_id.__str__() if tenant_id else None,
+                    'volumes': volume
+                }
+            else:
+                # Disallow tenants to share auth IDs
+                if auth_meta['tenant_id'].__str__() != tenant_id.__str__():
+                    msg = "auth ID: {0} is already in use".format(auth_id)
+                    log.error(msg)
+                    raise CephFSVolumeClientError(msg)
+
+                if auth_meta['dirty']:
+                    self._recover_auth_meta(auth_id, auth_meta)
+
+                log.debug("Authorize: existing tenant {tenant}".format(
+                    tenant=auth_meta['tenant_id']
+                ))
+                auth_meta['dirty'] = True
+                auth_meta['volumes'].update(volume)
+
+            self._auth_metadata_set(auth_id, auth_meta)
+
+            with self._volume_lock(volume_path):
+                key = self._authorize_volume(volume_path, auth_id, readonly, existing_caps)
+
+            auth_meta['dirty'] = False
+            auth_meta['volumes'][volume_path_str]['dirty'] = False
+            self._auth_metadata_set(auth_id, auth_meta)
+
+            if tenant_id:
+                return {
+                    'auth_key': key
+                }
+            else:
+                # Caller wasn't multi-tenant aware: be safe and don't give
+                # them a key
+                return {
+                    'auth_key': None
+                }
+
+    def _authorize_volume(self, volume_path, auth_id, readonly, existing_caps):
+        vol_meta = self._volume_metadata_get(volume_path)
+
+        access_level = 'r' if readonly else 'rw'
+        auth = {
+            auth_id: {
+                'access_level': access_level,
+                'dirty': True,
+            }
+        }
+
+        if vol_meta is None:
+            vol_meta = {
+                'auths': auth
+            }
+        else:
+            vol_meta['auths'].update(auth)
+            self._volume_metadata_set(volume_path, vol_meta)
+
+        key = self._authorize_ceph(volume_path, auth_id, readonly, existing_caps)
+
+        vol_meta['auths'][auth_id]['dirty'] = False
+        self._volume_metadata_set(volume_path, vol_meta)
+
+        return key
+
+    def _authorize_ceph(self, volume_path, auth_id, readonly, existing_caps):
+        path = self._get_path(volume_path)
+        log.debug("Authorizing Ceph id '{0}' for path '{1}'".format(
+            auth_id, path
+        ))
+
+        # First I need to work out what the data pool is for this share:
+        # read the layout
+        pool_name = self._get_ancestor_xattr(path, "ceph.dir.layout.pool")
+
+        try:
+            namespace = self.fs.getxattr(path, "ceph.dir.layout.pool_"
+                                         "namespace").decode()
+        except cephfs.NoData:
+            namespace = None
+
+        # Now construct auth capabilities that give the guest just enough
+        # permissions to access the share
+        client_entity = "client.{0}".format(auth_id)
+        want_access_level = 'r' if readonly else 'rw'
+        want_mds_cap = 'allow {0} path={1}'.format(want_access_level, path)
+        if namespace:
+            want_osd_cap = 'allow {0} pool={1} namespace={2}'.format(
+                want_access_level, pool_name, namespace)
+        else:
+            want_osd_cap = 'allow {0} pool={1}'.format(want_access_level,
+                                                       pool_name)
+
+        if existing_caps is None:
+            caps = self._rados_command(
+                'auth get-or-create',
+                {
+                    'entity': client_entity,
+                    'caps': [
+                        'mds', want_mds_cap,
+                        'osd', want_osd_cap,
+                        'mon', 'allow r']
+                })
+        else:
+            # entity exists, update it
+            cap = existing_caps[0]
+
+            # Construct auth caps that if present might conflict with the desired
+            # auth caps.
+            unwanted_access_level = 'r' if want_access_level == 'rw' else 'rw'
+            unwanted_mds_cap = 'allow {0} path={1}'.format(unwanted_access_level, path)
+            if namespace:
+                unwanted_osd_cap = 'allow {0} pool={1} namespace={2}'.format(
+                    unwanted_access_level, pool_name, namespace)
+            else:
+                unwanted_osd_cap = 'allow {0} pool={1}'.format(
+                    unwanted_access_level, pool_name)
+
+            def cap_update(
+                    orig_mds_caps, orig_osd_caps, want_mds_cap,
+                    want_osd_cap, unwanted_mds_cap, unwanted_osd_cap):
+
+                if not orig_mds_caps:
+                    return want_mds_cap, want_osd_cap
+
+                mds_cap_tokens = [x.strip() for x in orig_mds_caps.split(",")]
+                osd_cap_tokens = [x.strip() for x in orig_osd_caps.split(",")]
+
+                if want_mds_cap in mds_cap_tokens:
+                    return orig_mds_caps, orig_osd_caps
+
+                if unwanted_mds_cap in mds_cap_tokens:
+                    mds_cap_tokens.remove(unwanted_mds_cap)
+                    osd_cap_tokens.remove(unwanted_osd_cap)
+
+                mds_cap_tokens.append(want_mds_cap)
+                osd_cap_tokens.append(want_osd_cap)
+
+                return ",".join(mds_cap_tokens), ",".join(osd_cap_tokens)
+
+            orig_mds_caps = cap['caps'].get('mds', "")
+            orig_osd_caps = cap['caps'].get('osd', "")
+
+            mds_cap_str, osd_cap_str = cap_update(
+                orig_mds_caps, orig_osd_caps, want_mds_cap, want_osd_cap,
+                unwanted_mds_cap, unwanted_osd_cap)
+
+            caps_list = self._prepare_updated_caps_list(cap, mds_cap_str, osd_cap_str)
+            caps = self._rados_command(
+                'auth caps',
+                {
+                    'entity': client_entity,
+                    'caps': caps_list
+                })
+
+            caps = self._rados_command(
+                'auth get',
+                {
+                    'entity': client_entity
+                }
+            )
+
+        # Result expected like this:
+        # [
+        #     {
+        #         "entity": "client.foobar",
+        #         "key": "AQBY0\/pViX\/wBBAAUpPs9swy7rey1qPhzmDVGQ==",
+        #         "caps": {
+        #             "mds": "allow *",
+        #             "mon": "allow *"
+        #         }
+        #     }
+        # ]
+        assert len(caps) == 1
+        assert caps[0]['entity'] == client_entity
+        return caps[0]['key']
+
+    def deauthorize(self, volume_path, auth_id):
+        with self._auth_lock(auth_id):
+            # Existing meta, or None, to be updated
+            auth_meta = self._auth_metadata_get(auth_id)
+
+            volume_path_str = str(volume_path)
+            if (auth_meta is None) or (not auth_meta['volumes']):
+                log.warning("deauthorized called for already-removed auth"
+                         "ID '{auth_id}' for volume ID '{volume}'".format(
+                    auth_id=auth_id, volume=volume_path.volume_id
+                ))
+                # Clean up the auth meta file of an auth ID
+                self.fs.unlink(self._auth_metadata_path(auth_id))
+                return
+
+            if volume_path_str not in auth_meta['volumes']:
+                log.warning("deauthorized called for already-removed auth"
+                         "ID '{auth_id}' for volume ID '{volume}'".format(
+                    auth_id=auth_id, volume=volume_path.volume_id
+                ))
+                return
+
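+            # The update is two-phase: mark the auth metadata dirty, apply the
+            # change on the cluster, then clear the dirty flag.  A crash in
+            # between leaves the dirty flag set, so a later call can repair
+            # things via _recover_auth_meta().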
+            if auth_meta['dirty']:
+                self._recover_auth_meta(auth_id, auth_meta)
+
+            auth_meta['dirty'] = True
+            auth_meta['volumes'][volume_path_str]['dirty'] = True
+            self._auth_metadata_set(auth_id, auth_meta)
+
+            self._deauthorize_volume(volume_path, auth_id)
+
+            # Filter out the volume we're deauthorizing
+            del auth_meta['volumes'][volume_path_str]
+
+            # Clean up auth meta file
+            if not auth_meta['volumes']:
+                self.fs.unlink(self._auth_metadata_path(auth_id))
+                return
+
+            auth_meta['dirty'] = False
+            self._auth_metadata_set(auth_id, auth_meta)
+
+    def _deauthorize_volume(self, volume_path, auth_id):
+        with self._volume_lock(volume_path):
+            vol_meta = self._volume_metadata_get(volume_path)
+
+            if (vol_meta is None) or (auth_id not in vol_meta['auths']):
+                log.warning("deauthorized called for already-removed auth"
+                         "ID '{auth_id}' for volume ID '{volume}'".format(
+                    auth_id=auth_id, volume=volume_path.volume_id
+                ))
+                return
+
+            vol_meta['auths'][auth_id]['dirty'] = True
+            self._volume_metadata_set(volume_path, vol_meta)
+
+            self._deauthorize(volume_path, auth_id)
+
+            # Remove the auth_id from the metadata *after* removing it
+            # from ceph, so that if we crashed here, we would actually
+            # recreate the auth ID during recovery (i.e. end up with
+            # a consistent state).
+
+            # Filter out the auth we're removing
+            del vol_meta['auths'][auth_id]
+            self._volume_metadata_set(volume_path, vol_meta)
+
+    def _deauthorize(self, volume_path, auth_id):
+        """
+        The volume must still exist.
+        """
+        client_entity = "client.{0}".format(auth_id)
+        path = self._get_path(volume_path)
+        pool_name = self._get_ancestor_xattr(path, "ceph.dir.layout.pool")
+        try:
+            namespace = self.fs.getxattr(path, "ceph.dir.layout.pool_"
+                                         "namespace").decode()
+        except cephfs.NoData:
+            namespace = None
+
+        # The auth_id might have read-only or read-write mount access for the
+        # volume path.
+        access_levels = ('r', 'rw')
+        want_mds_caps = ['allow {0} path={1}'.format(access_level, path)
+                         for access_level in access_levels]
+        if namespace:
+            want_osd_caps = ['allow {0} pool={1} namespace={2}'.format(access_level, pool_name, namespace)
+                             for access_level in access_levels]
+        else:
+            want_osd_caps = ['allow {0} pool={1}'.format(access_level, pool_name)
+                             for access_level in access_levels]
+
+        try:
+            existing = self._rados_command(
+                'auth get',
+                {
+                    'entity': client_entity
+                }
+            )
+
+            def cap_remove(orig_mds_caps, orig_osd_caps, want_mds_caps, want_osd_caps):
+                mds_cap_tokens = [x.strip() for x in orig_mds_caps.split(",")]
+                osd_cap_tokens = [x.strip() for x in orig_osd_caps.split(",")]
+
+                for want_mds_cap, want_osd_cap in zip(want_mds_caps, want_osd_caps):
+                    if want_mds_cap in mds_cap_tokens:
+                        mds_cap_tokens.remove(want_mds_cap)
+                        osd_cap_tokens.remove(want_osd_cap)
+                        break
+
+                return ",".join(mds_cap_tokens), ",".join(osd_cap_tokens)
+
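+            # Illustrative example (hypothetical paths): deauthorizing vol1
+            # turns mds caps
+            #   "allow rw path=/volumes/grp/vol1, allow rw path=/volumes/grp/vol2"
+            # into "allow rw path=/volumes/grp/vol2", removing whichever of
+            # the 'r'/'rw' variants for vol1 is actually present.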
+            cap = existing[0]
+            orig_mds_caps = cap['caps'].get('mds', "")
+            orig_osd_caps = cap['caps'].get('osd', "")
+            mds_cap_str, osd_cap_str = cap_remove(orig_mds_caps, orig_osd_caps,
+                                                  want_mds_caps, want_osd_caps)
+
+            caps_list = self._prepare_updated_caps_list(cap, mds_cap_str, osd_cap_str, authorize=False)
+            if not caps_list:
+                self._rados_command('auth del', {'entity': client_entity}, decode=False)
+            else:
+                self._rados_command(
+                    'auth caps',
+                    {
+                        'entity': client_entity,
+                        'caps': caps_list
+                    })
+
+        # FIXME: rados raising Error instead of ObjectNotFound in auth get failure
+        except rados.Error:
+            # Already gone, great.
+            return
+
+    def get_authorized_ids(self, volume_path):
+        """
+        Expose a list of auth IDs that have access to a volume.
+
+        return: a list of (auth_id, access_level) tuples, where
+                the access_level can be 'r' or 'rw'.
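+                Illustrative example: [("alice", "rw"), ("guest", "r")].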
+                None if no auth ID is given access to the volume.
+        """
+        with self._volume_lock(volume_path):
+            meta = self._volume_metadata_get(volume_path)
+            auths = []
+            if not meta or not meta['auths']:
+                return None
+
+            for auth, auth_data in meta['auths'].items():
+                # Skip partial auth updates.
+                if not auth_data['dirty']:
+                    auths.append((auth, auth_data['access_level']))
+
+            return auths
+
+    def _rados_command(self, prefix, args=None, decode=True):
+        """
+        Safer wrapper for ceph_argparse.json_command(), which raises an
+        Error exception instead of relying on the caller to check return
+        codes.
+
+        Error exception can result from:
+        * Timeout
+        * Actual legitimate errors
+        * Malformed JSON output
+
+        return: Decoded object from ceph, or None if an empty string was
+                returned. If decode is False, return the raw string returned
+                by the ceph command.
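+
+        Illustrative example (mirrors the 'auth get' calls in this class):
+            self._rados_command('auth get', {'entity': 'client.foo'})
+            # -> [{'entity': 'client.foo', 'key': '...', 'caps': {...}}]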
+        """
+        if args is None:
+            args = {}
+
+        argdict = args.copy()
+        argdict['format'] = 'json'
+
+        ret, outbuf, outs = json_command(self.rados,
+                                         prefix=prefix,
+                                         argdict=argdict,
+                                         timeout=RADOS_TIMEOUT)
+        if ret != 0:
+            raise rados.Error(outs)
+        else:
+            if decode:
+                if outbuf:
+                    try:
+                        return json.loads(outbuf.decode())
+                    except (ValueError, TypeError):
+                        raise RadosError("Invalid JSON output for command {0}".format(argdict))
+                else:
+                    return None
+            else:
+                return outbuf
+
+    def get_used_bytes(self, volume_path):
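+        # ceph.dir.rbytes is the recursive byte count of everything under the
+        # directory, i.e. the total space consumed by the volume.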
+        return int(self.fs.getxattr(self._get_path(volume_path), "ceph.dir."
+                                    "rbytes").decode())
+
+    def set_max_bytes(self, volume_path, max_bytes):
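+        # Setting ceph.quota.max_bytes to 0 removes the quota, which is why a
+        # falsy max_bytes is translated to 0 below.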
+        self.fs.setxattr(self._get_path(volume_path), 'ceph.quota.max_bytes',
+                         to_bytes(max_bytes if max_bytes else 0), 0)
+
+    def _snapshot_path(self, dir_path, snapshot_name):
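+        # Snapshots live under the directory named by the client_snapdir
+        # option (".snap" by default), e.g. snapshot "s1" of a volume at
+        # /volumes/grp/vol is typically /volumes/grp/vol/.snap/s1.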
+        return os.path.join(
+            dir_path, self.rados.conf_get('client_snapdir'), snapshot_name
+        )
+
+    def _snapshot_create(self, dir_path, snapshot_name, mode=0o755):
+        # TODO: raise intelligible exception for clusters where snaps are disabled
+        self.fs.mkdir(self._snapshot_path(dir_path, snapshot_name), mode)
+
+    def _snapshot_destroy(self, dir_path, snapshot_name):
+        """
+        Remove a snapshot, or do nothing if it no longer exists.
+        """
+        try:
+            self.fs.rmdir(self._snapshot_path(dir_path, snapshot_name))
+        except cephfs.ObjectNotFound:
+            log.warning("Snapshot was already gone: {0}".format(snapshot_name))
+
+    def create_snapshot_volume(self, volume_path, snapshot_name, mode=0o755):
+        self._snapshot_create(self._get_path(volume_path), snapshot_name, mode)
+
+    def destroy_snapshot_volume(self, volume_path, snapshot_name):
+        self._snapshot_destroy(self._get_path(volume_path), snapshot_name)
+
+    def create_snapshot_group(self, group_id, snapshot_name, mode=0o755):
+        if group_id is None:
+            raise RuntimeError("Group ID may not be None")
+
+        return self._snapshot_create(self._get_group_path(group_id), snapshot_name,
+                                     mode)
+
+    def destroy_snapshot_group(self, group_id, snapshot_name):
+        if group_id is None:
+            raise RuntimeError("Group ID may not be None")
+        if snapshot_name is None:
+            raise RuntimeError("Snapshot name may not be None")
+
+        return self._snapshot_destroy(self._get_group_path(group_id), snapshot_name)
+
+    def _cp_r(self, src, dst):
+        # TODO
+        raise NotImplementedError()
+
+    def clone_volume_to_existing(self, dest_volume_path, src_volume_path, src_snapshot_name):
+        dest_fs_path = self._get_path(dest_volume_path)
+        src_snapshot_path = self._snapshot_path(self._get_path(src_volume_path), src_snapshot_name)
+
+        self._cp_r(src_snapshot_path, dest_fs_path)
+
+    def put_object(self, pool_name, object_name, data):
+        """
+        Synchronously write data to an object.
+
+        :param pool_name: name of the pool
+        :type pool_name: str
+        :param object_name: name of the object
+        :type object_name: str
+        :param data: data to write
+        :type data: bytes
+        """
+        return self.put_object_versioned(pool_name, object_name, data)
+
+    def put_object_versioned(self, pool_name, object_name, data, version=None):
+        """
+        Synchronously write data to an object, but only if the object's
+        current version matches the expected version.
+
+        :param pool_name: name of the pool
+        :type pool_name: str
+        :param object_name: name of the object
+        :type object_name: str
+        :param data: data to write
+        :type data: bytes
+        :param version: expected version of the object to write
+        :type version: int
+        """
+        ioctx = self.rados.open_ioctx(pool_name)
+
+        max_size = int(self.rados.conf_get('osd_max_write_size')) * 1024 * 1024
+        if len(data) > max_size:
+            msg = ("Data to be written to object '{0}' exceeds "
+                   "{1} bytes".format(object_name, max_size))
+            log.error(msg)
+            raise CephFSVolumeClientError(msg)
+
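+        # When a version is supplied, assert_version() makes the write
+        # conditional: a mismatch causes the write op to fail, which surfaces
+        # as the rados.OSError handled below.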
+        try:
+            with rados.WriteOpCtx() as wop:
+                if version is not None:
+                    wop.assert_version(version)
+                wop.write_full(data)
+                ioctx.operate_write_op(wop, object_name)
+        except rados.OSError as e:
+            log.error(e)
+            raise e
+        finally:
+            ioctx.close()
+
+    def get_object(self, pool_name, object_name):
+        """
+        Synchronously read data from an object.
+
+        :param pool_name: name of the pool
+        :type pool_name: str
+        :param object_name: name of the object
+        :type object_name: str
+
+        :returns: bytes - data read from object
+        """
+        return self.get_object_and_version(pool_name, object_name)[0]
+
+    def get_object_and_version(self, pool_name, object_name):
+        """
+        Synchronously read data from an object and get its version.
+
+        :param pool_name: name of the pool
+        :type pool_name: str
+        :param object_name: name of the object
+        :type object_name: str
+
+        :returns: tuple of object data and version
+        """
+        ioctx = self.rados.open_ioctx(pool_name)
+        max_size = int(self.rados.conf_get('osd_max_write_size')) * 1024 * 1024
+        try:
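+            # Read up to max_size bytes; if exactly max_size bytes came back
+            # and a 1-byte read just past that offset still returns data, the
+            # object is larger than what was read, so log a warning.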
+            bytes_read = ioctx.read(object_name, max_size)
+            if ((len(bytes_read) == max_size) and
+                    (ioctx.read(object_name, 1, offset=max_size))):
+                log.warning("Size of object {0} exceeds '{1}' bytes "
+                            "read".format(object_name, max_size))
+            obj_version = ioctx.get_last_version()
+        finally:
+            ioctx.close()
+        return (bytes_read, obj_version)
+
+    def delete_object(self, pool_name, object_name):
+        ioctx = self.rados.open_ioctx(pool_name)
+        try:
+            ioctx.remove_object(object_name)
+        except rados.ObjectNotFound:
+            log.warning("Object '{0}' was already removed".format(object_name))
+        finally:
+            ioctx.close()