else
layout = mdcache->default_file_layout;
+ // What kind of client caps are required to complete this operation
+ uint64_t access = MAY_WRITE;
+
// fill in any special params from client
if (req->head.args.open.stripe_unit)
layout.fl_stripe_unit = req->head.args.open.stripe_unit;
(__s32)req->head.args.open.pool >= 0) {
layout.fl_pg_pool = req->head.args.open.pool;
+ // If client doesn't have capability to modify layout pools, then
+ // only permit this request if the requested pool matches what the
+ // file would have inherited anyway from its parent.
+ CDir *parent = dn->get_dir();
+ CInode *parent_in = parent->get_inode();
+ int64_t parent_pool = parent_in->inode.layout.fl_pg_pool;
+
+ if (layout.fl_pg_pool != parent_pool) {
+ access |= MAY_SET_POOL;
+ }
+
// make sure we have as new a map as the client
if (req->get_mdsmap_epoch() > mds->mdsmap->get_epoch()) {
mds->wait_for_mdsmap(req->get_mdsmap_epoch(), new C_MDS_RetryRequest(mdcache, mdr));
if (!mds->locker->acquire_locks(mdr, rdlocks, wrlocks, xlocks))
return;
- if (!check_access(mdr, diri, MAY_WRITE))
+ if (!check_access(mdr, diri, access))
return;
CDentry::linkage_t *dnl = dn->get_projected_linkage();
// save existing layout for later
int64_t old_pool = layout.fl_pg_pool;
+ int access = MAY_WRITE;
+
if (req->head.args.setlayout.layout.fl_object_size > 0)
layout.fl_object_size = req->head.args.setlayout.layout.fl_object_size;
if (req->head.args.setlayout.layout.fl_stripe_unit > 0)
if (req->head.args.setlayout.layout.fl_pg_pool > 0) {
layout.fl_pg_pool = req->head.args.setlayout.layout.fl_pg_pool;
+ if (layout.fl_pg_pool != old_pool) {
+ access |= MAY_SET_POOL;
+ }
+
// make sure we have as new a map as the client
if (req->get_mdsmap_epoch() > mds->mdsmap->get_epoch()) {
mds->wait_for_mdsmap(req->get_mdsmap_epoch(), new C_MDS_RetryRequest(mdcache, mdr));
if (!mds->locker->acquire_locks(mdr, rdlocks, wrlocks, xlocks))
return;
- if (!check_access(mdr, cur, MAY_WRITE))
+ if (!check_access(mdr, cur, access))
return;
// project update
if (!mds->locker->acquire_locks(mdr, rdlocks, wrlocks, xlocks))
return;
- if (!check_access(mdr, cur, MAY_WRITE))
- return;
-
// validate layout
const inode_t *old_pi = cur->get_projected_inode();
ceph_file_layout layout;
else
layout = mdcache->default_file_layout;
+ // Level of access required to complete
+ int access = MAY_WRITE;
+
if (req->head.args.setlayout.layout.fl_object_size > 0)
layout.fl_object_size = req->head.args.setlayout.layout.fl_object_size;
if (req->head.args.setlayout.layout.fl_stripe_unit > 0)
if (req->head.args.setlayout.layout.fl_object_stripe_unit > 0)
layout.fl_object_stripe_unit = req->head.args.setlayout.layout.fl_object_stripe_unit;
if (req->head.args.setlayout.layout.fl_pg_pool > 0) {
+ if (req->head.args.setlayout.layout.fl_pg_pool != layout.fl_pg_pool) {
+ access |= MAY_SET_POOL;
+ }
layout.fl_pg_pool = req->head.args.setlayout.layout.fl_pg_pool;
// make sure we have as new a map as the client
if (req->get_mdsmap_epoch() > mds->mdsmap->get_epoch()) {
return;
}
+ if (!check_access(mdr, cur, access))
+ return;
+
inode_t *pi = cur->project_inode();
pi->layout = layout;
pi->version = cur->pre_dirty();
if (!mds->locker->acquire_locks(mdr, rdlocks, wrlocks, xlocks))
return;
+ if (cur->inode.layout.fl_pg_pool != layout.fl_pg_pool) {
+ if (!check_access(mdr, cur, MAY_SET_POOL)) {
+ return;
+ }
+ }
+
pi = cur->project_inode();
pi->layout = layout;
} else if (name.find("ceph.file.layout") == 0) {
return;
}
- xlocks.insert(&cur->filelock);
if (!mds->locker->acquire_locks(mdr, rdlocks, wrlocks, xlocks))
return;
+ if (cur->inode.layout.fl_pg_pool != layout.fl_pg_pool) {
+ if (!check_access(mdr, cur, MAY_SET_POOL)) {
+ return;
+ }
+ }
+
+ xlocks.insert(&cur->filelock);
pi = cur->project_inode();
int64_t old_pool = pi->layout.fl_pg_pool;
pi->add_old_pool(old_pool);