]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
mds: handle ceph.*.layout.* setxattr
authorSage Weil <sage@inktank.com>
Sat, 19 Jan 2013 18:09:39 +0000 (10:09 -0800)
committerSage Weil <sage@inktank.com>
Wed, 30 Jan 2013 00:25:05 +0000 (16:25 -0800)
Allow individual fields of file or dir layouts to be set via setxattr.

Signed-off-by: Sage Weil <sage@inktank.com>
src/mds/Server.cc
src/mds/Server.h
src/osd/OSDMap.h

index f8d1af1d11a069187cfa3b902f9fda6f8dec20dd..bbda20f1522cd13efd8e1d1fc60e1384bfdc8c19 100644 (file)
@@ -52,6 +52,8 @@
 #include "include/compat.h"
 #include "osd/OSDMap.h"
 
+#include <boost/lexical_cast.hpp>
+
 #include <errno.h>
 #include <fcntl.h>
 
@@ -3498,6 +3500,134 @@ void Server::handle_client_setdirlayout(MDRequest *mdr)
 
 // XATTRS
 
+int Server::parse_layout_vxattr(string name, string value, ceph_file_layout *layout)
+{
+  dout(20) << "parse_layout_vxattr name " << name << " value '" << value << "'" << dendl;
+  try {
+    if (name == "layout") {
+      // XXX implement me
+    } else if (name == "layout.object_size") {
+      layout->fl_object_size = boost::lexical_cast<unsigned>(value);
+    } else if (name == "layout.stripe_unit") {
+      layout->fl_stripe_unit = boost::lexical_cast<unsigned>(value);
+    } else if (name == "layout.stripe_count") {
+      layout->fl_stripe_count = boost::lexical_cast<unsigned>(value);
+    } else if (name == "layout.pool") {
+      try {
+       layout->fl_pg_pool = boost::lexical_cast<unsigned>(value);
+      } catch (boost::bad_lexical_cast const&) {
+       int64_t pool = mds->osdmap->lookup_pg_pool_name(value);
+       if (pool < 0) {
+         dout(10) << " unknown pool " << value << dendl;
+         return -EINVAL;
+       }
+       layout->fl_pg_pool = pool;
+      }
+    } else {
+      dout(10) << " unknown layout vxattr " << name << dendl;
+      return -EINVAL;
+    }
+  } catch (boost::bad_lexical_cast const&) {
+    dout(10) << "bad vxattr value, unable to parse int for " << name << dendl;
+    return -EINVAL;
+  }
+
+  if (!ceph_file_layout_is_valid(layout)) {
+    dout(10) << "bad layout" << dendl;
+    return -EINVAL;
+  }
+  if (!mds->mdsmap->is_data_pool(layout->fl_pg_pool)) {
+    dout(10) << " invalid data pool " << layout->fl_pg_pool << dendl;
+    return -EINVAL;
+  }
+  return 0;
+}
+
+void Server::handle_set_vxattr(MDRequest *mdr, CInode *cur,
+                              ceph_file_layout *dir_layout,
+                              set<SimpleLock*> rdlocks,
+                              set<SimpleLock*> wrlocks,
+                              set<SimpleLock*> xlocks)
+{
+  MClientRequest *req = mdr->client_request;
+  string name(req->get_path2());
+  bufferlist bl = req->get_data();
+  string value (bl.c_str(), bl.length());
+  dout(10) << "handle_set_vxattr " << name << " val " << value.length() << " bytes on " << *cur << dendl;
+
+  // layout?
+  if (name.find("ceph.file.layout") == 0 ||
+      name.find("ceph.dir.layout") == 0) {
+    inode_t *pi;
+    string rest;
+    if (name.find("ceph.dir.layout") == 0) {
+      if (!cur->is_dir()) {
+       reply_request(mdr, -EINVAL);
+       return;
+      }
+
+      default_file_layout *dlayout = new default_file_layout;
+      if (cur->get_projected_dir_layout())
+       dlayout->layout = *cur->get_projected_dir_layout();
+      else if (dir_layout)
+       dlayout->layout = *dir_layout;
+      else
+       dlayout->layout = mds->mdcache->default_file_layout;
+
+      rest = name.substr(name.find("layout"));
+      int r = parse_layout_vxattr(rest, value, &dlayout->layout);
+      if (r < 0) {
+       reply_request(mdr, r);
+       return;
+      }
+
+      xlocks.insert(&cur->policylock);
+      if (!mds->locker->acquire_locks(mdr, rdlocks, wrlocks, xlocks))
+       return;
+
+      pi = cur->project_inode();
+      cur->get_projected_node()->dir_layout = dlayout;
+    } else {
+      if (!cur->is_file()) {
+       reply_request(mdr, -EINVAL);
+       return;
+      }
+      ceph_file_layout layout = cur->get_projected_inode()->layout;
+      rest = name.substr(name.find("layout"));
+      int r = parse_layout_vxattr(rest, value, &layout);
+      if (r < 0) {
+       reply_request(mdr, r);
+       return;
+      }
+
+      xlocks.insert(&cur->filelock);
+      if (!mds->locker->acquire_locks(mdr, rdlocks, wrlocks, xlocks))
+       return;
+
+      pi = cur->project_inode();
+      pi->layout = layout;
+      pi->ctime = ceph_clock_now(g_ceph_context);
+    }
+
+    pi->version = cur->pre_dirty();
+
+    // log + wait
+    mdr->ls = mdlog->get_current_segment();
+    EUpdate *le = new EUpdate(mdlog, "set vxattr layout");
+    mdlog->start_entry(le);
+    le->metablob.add_client_req(req->get_reqid(), req->get_oldest_client_tid());
+    mdcache->predirty_journal_parents(mdr, &le->metablob, cur, 0, PREDIRTY_PRIMARY, false);
+    mdcache->journal_dirty_inode(mdr, &le->metablob, cur);
+
+    journal_and_reply(mdr, cur, 0, le, new C_MDS_inode_update_finish(mds, mdr, cur));
+    return;
+  }
+
+  dout(10) << " unknown vxattr " << name << dendl;
+  reply_request(mdr, -EINVAL);
+}
+
+
 class C_MDS_inode_xattr_update_finish : public Context {
   MDS *mds;
   MDRequest *mdr;
@@ -3523,26 +3653,39 @@ public:
 void Server::handle_client_setxattr(MDRequest *mdr)
 {
   MClientRequest *req = mdr->client_request;
+  string name(req->get_path2());
   set<SimpleLock*> rdlocks, wrlocks, xlocks;
-  CInode *cur = rdlock_path_pin_ref(mdr, 0, rdlocks, true);
-  if (!cur) return;
+  CInode *cur;
+
+  ceph_file_layout *dir_layout = NULL;
+  if (name.find("ceph.dir.layout") == 0)
+    cur = rdlock_path_pin_ref(mdr, 0, rdlocks, true, false, &dir_layout);
+  else
+    cur = rdlock_path_pin_ref(mdr, 0, rdlocks, true);
+  if (!cur)
+    return;
 
   if (mdr->snapid != CEPH_NOSNAP) {
     reply_request(mdr, -EROFS);
     return;
   }
-    if (cur->is_base()) {
+  if (cur->is_base()) {
     reply_request(mdr, -EINVAL);   // for now
     return;
   }
 
+  int flags = req->head.args.setxattr.flags;
+
+  // magic ceph.* namespace?
+  if (name.find("ceph.") == 0) {
+    handle_set_vxattr(mdr, cur, dir_layout, rdlocks, wrlocks, xlocks);
+    return;
+  }
+
   xlocks.insert(&cur->xattrlock);
   if (!mds->locker->acquire_locks(mdr, rdlocks, wrlocks, xlocks))
     return;
 
-  string name(req->get_path2());
-  int flags = req->head.args.setxattr.flags;
-
   if ((flags & CEPH_XATTR_CREATE) && cur->xattrs.count(name)) {
     dout(10) << "setxattr '" << name << "' XATTR_CREATE and EEXIST on " << *cur << dendl;
     reply_request(mdr, -EEXIST);
index 85ab34075f33db1e574e085e62969c24b9f95f4e..83dd318347d4dcb5a23c301a971073e549ef0ad2 100644 (file)
@@ -152,6 +152,13 @@ public:
   void handle_client_setattr(MDRequest *mdr);
   void handle_client_setlayout(MDRequest *mdr);
   void handle_client_setdirlayout(MDRequest *mdr);
+
+  int parse_layout_vxattr(string name, string value, ceph_file_layout *layout);
+  void handle_set_vxattr(MDRequest *mdr, CInode *cur,
+                        ceph_file_layout *dir_layout,
+                        set<SimpleLock*> rdlocks,
+                        set<SimpleLock*> wrlocks,
+                        set<SimpleLock*> xlocks);
   void handle_client_setxattr(MDRequest *mdr);
   void handle_client_removexattr(MDRequest *mdr);
 
index f3f84f0b4708aef4457a5206556d2ebd42696314..d161fa7436b53e77a1778556070150a7032b8fe3 100644 (file)
@@ -450,7 +450,7 @@ public:
   void pg_to_raw_up(pg_t pg, vector<int>& up) const;
   void pg_to_up_acting_osds(pg_t pg, vector<int>& up, vector<int>& acting) const;
 
-  int64_t lookup_pg_pool_name(const char *name) {
+  int64_t lookup_pg_pool_name(const string& name) {
     if (name_pool.count(name))
       return name_pool[name];
     return -ENOENT;