file_layout_t *layout)
{
const cref_t<MClientRequest> &req = mdr->client_request;
+ bool is_rmxattr = (req->get_op() == CEPH_MDS_OP_RMXATTR);
epoch_t epoch;
int r;
});
if (r == -CEPHFS_ENOENT) {
+ if (is_rmxattr) {
+ r = -CEPHFS_EINVAL;
+ respond_to_request(mdr, r);
+ return r;
+ }
// we don't have the specified pool, make sure our map
// is newer than or as new as the client.
epoch_t req_epoch = req->get_osdmap_epoch();
return 0;
}
-void Server::handle_set_vxattr(const MDRequestRef& mdr, CInode *cur)
+void Server::handle_client_setvxattr(const MDRequestRef& mdr, CInode *cur)
{
const cref_t<MClientRequest> &req = mdr->client_request;
+ bool is_rmxattr = (req->get_op() == CEPH_MDS_OP_RMXATTR);
MutationImpl::LockOpVec lov;
string name(req->get_path2());
bufferlist bl = req->get_data();
string value (bl.c_str(), bl.length());
- dout(10) << "handle_set_vxattr " << name
+ dout(10) << "handle_client_setvxattr " << name
<< " val " << value.length()
<< " bytes on " << *cur
<< dendl;
else
layout = mdcache->default_file_layout;
- rest = name.substr(name.find("layout"));
- if (check_layout_vxattr(mdr, rest, value, &layout) < 0)
- return;
+ if (is_rmxattr && name == "ceph.dir.layout") {
+ lov.add_xlock(&cur->policylock);
+ if (!mds->locker->acquire_locks(mdr, lov)) {
+ return;
+ }
+ if (!cur->get_projected_inode()->has_layout()) {
+ respond_to_request(mdr, 0);
+ return;
+ }
+ auto pi = cur->project_inode(mdr);
+
+ if (cur->is_root()) {
+ pi.inode->layout = mdcache->default_file_layout;
+ } else {
+ pi.inode->clear_layout();
+ pi.inode->version = cur->pre_dirty();
+ }
+ pip = pi.inode.get();
+ } else {
+ rest = name.substr(name.find("layout"));
+ if (check_layout_vxattr(mdr, rest, value, &layout) < 0)
+ return;
+
+ auto pi = cur->project_inode(mdr);
+ pi.inode->layout = layout;
+ pip = pi.inode.get();
+ }
- auto pi = cur->project_inode(mdr);
- pi.inode->layout = layout;
mdr->no_early_reply = true;
- pip = pi.inode.get();
} else if (name.compare(0, 16, "ceph.file.layout") == 0) {
if (!cur->is_file()) {
respond_to_request(mdr, -CEPHFS_EINVAL);
return;
}
+ if (!cur->get_projected_inode()->has_layout()) {
+ respond_to_request(mdr, 0);
+ return;
+ }
if (cur->get_projected_inode()->size ||
cur->get_projected_inode()->truncate_seq > 1) {
respond_to_request(mdr, -CEPHFS_ENOTEMPTY);
}
quota_info_t quota = cur->get_projected_inode()->quota;
+ if (is_rmxattr) {
+ if (!quota.is_enabled()) {
+ respond_to_request(mdr, 0);
+ return;
+ }
+ value = "0";
+ }
rest = name.substr(name.find("quota"));
int r = parse_quota_vxattr(rest, value, "a);
bool val;
try {
+ if (is_rmxattr) {
+ const auto srnode = cur->get_projected_srnode();
+ if (!srnode->is_subvolume()) {
+ respond_to_request(mdr, 0);
+ return;
+ }
+ value = "0";
+ }
val = boost::lexical_cast<bool>(value);
} catch (boost::bad_lexical_cast const&) {
dout(10) << "bad vxattr value, unable to parse bool for " << name << dendl;
mds_rank_t rank;
try {
+ if (is_rmxattr) {
+ if (cur->get_projected_inode()->export_pin == -1) {
+ respond_to_request(mdr, 0);
+ return;
+ }
+ value = "-1";
+ }
rank = boost::lexical_cast<mds_rank_t>(value);
if (rank < 0) rank = MDS_RANK_NONE;
else if (rank >= MAX_MDS) {
double val;
try {
+ if (is_rmxattr) {
+ if (cur->get_projected_inode()->export_ephemeral_random_pin == 0.0) {
+ respond_to_request(mdr, 0);
+ return;
+ }
+ value = "0";
+ }
val = boost::lexical_cast<double>(value);
} catch (boost::bad_lexical_cast const&) {
dout(10) << "bad vxattr value, unable to parse float for " << name << dendl;
bool val;
try {
+ if (is_rmxattr) {
+ if (cur->get_projected_inode()->get_ephemeral_distributed_pin() == 0) {
+ respond_to_request(mdr, 0);
+ return;
+ }
+ value = "0";
+ }
val = boost::lexical_cast<bool>(value);
} catch (boost::bad_lexical_cast const&) {
dout(10) << "bad vxattr value, unable to parse bool for " << name << dendl;
return;
}
-void Server::handle_remove_vxattr(const MDRequestRef& mdr, CInode *cur)
-{
- const cref_t<MClientRequest> &req = mdr->client_request;
- string name(req->get_path2());
-
- dout(10) << __func__ << " " << name << " on " << *cur << dendl;
-
- if (name == "ceph.dir.layout") {
- if (!cur->is_dir()) {
- respond_to_request(mdr, -CEPHFS_ENODATA);
- return;
- }
- if (cur->is_root()) {
- dout(10) << "can't remove layout policy on the root directory" << dendl;
- respond_to_request(mdr, -CEPHFS_EINVAL);
- return;
- }
-
- if (!cur->get_projected_inode()->has_layout()) {
- respond_to_request(mdr, -CEPHFS_ENODATA);
- return;
- }
-
- MutationImpl::LockOpVec lov;
- lov.add_xlock(&cur->policylock);
- if (!mds->locker->acquire_locks(mdr, lov))
- return;
-
- auto pi = cur->project_inode(mdr);
- pi.inode->clear_layout();
- pi.inode->version = cur->pre_dirty();
-
- // log + wait
- mdr->ls = mdlog->get_current_segment();
- EUpdate *le = new EUpdate(mdlog, "remove dir layout vxattr");
- le->metablob.add_client_req(req->get_reqid(), req->get_oldest_client_tid());
- mdcache->predirty_journal_parents(mdr, &le->metablob, cur, 0, PREDIRTY_PRIMARY);
- mdcache->journal_dirty_inode(mdr.get(), &le->metablob, cur);
-
- mdr->no_early_reply = true;
- journal_and_reply(mdr, cur, 0, le, new C_MDS_inode_update_finish(this, mdr, cur));
- return;
- } else if (name == "ceph.dir.layout.pool_namespace"
- || name == "ceph.file.layout.pool_namespace") {
- // Namespace is the only layout field that has a meaningful
- // null/none value (empty string, means default layout). Is equivalent
- // to a setxattr with empty string: pass through the empty payload of
- // the rmxattr request to do this.
- handle_set_vxattr(mdr, cur);
- return;
- }
-
- respond_to_request(mdr, -CEPHFS_ENODATA);
-}
-
const Server::XattrHandler Server::xattr_handlers[] = {
{
xattr_name: Server::DEFAULT_HANDLER,
if (!cur)
return;
- handle_set_vxattr(mdr, cur);
+ handle_client_setvxattr(mdr, cur);
return;
}
if (!cur)
return;
- handle_remove_vxattr(mdr, cur);
+ handle_client_setvxattr(mdr, cur);
return;
}