]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
client: check OSD caps before read/write
authorYan, Zheng <zyan@redhat.com>
Fri, 24 Apr 2015 07:23:21 +0000 (15:23 +0800)
committerGreg Farnum <gfarnum@redhat.com>
Fri, 8 May 2015 18:17:06 +0000 (11:17 -0700)
Signed-off-by: Yan, Zheng <zyan@redhat.com>
(cherry picked from commit 3c4028ec21e3ef9e8801c4570420c88722651cc7)
Reviewed-by: Greg Farnum <gfarnum@redhat.com>
src/client/Client.cc
src/client/Client.h
src/common/config_opts.h

index 5c4654da2f8c68fbc80b9e2ba6ef9eaa21d48988..c6a44778c13b289a9b4bdd7d31d2114487450923 100644 (file)
@@ -2677,6 +2677,10 @@ void Client::put_cap_ref(Inode *in, int cap)
 
 int Client::get_caps(Inode *in, int need, int want, int *phave, loff_t endoff)
 {
+  int r = check_pool_perm(in, need);
+  if (r < 0)
+    return r;
+
   while (1) {
     if (!in->is_any_caps())
       return -ESTALE;
@@ -10608,6 +10612,102 @@ bool Client::is_quota_bytes_approaching(Inode *in)
   return false;
 }
 
+enum {
+  POOL_CHECKED = 1,
+  POOL_CHECKING = 2,
+  POOL_READ = 4,
+  POOL_WRITE = 8,
+};
+
+int Client::check_pool_perm(Inode *in, int need)
+{
+  if (!cct->_conf->client_check_pool_perm)
+    return 0;
+
+  int64_t pool = in->layout.fl_pg_pool;
+  int have = 0;
+  while (true) {
+    std::map<int64_t, int>::iterator it = pool_perms.find(pool);
+    if (it == pool_perms.end())
+      break;
+    if (it->second == POOL_CHECKING) {
+      // avoid concurrent checkings
+      wait_on_list(waiting_for_pool_perm);
+    } else {
+      have = it->second;
+      assert(have & POOL_CHECKED);
+      break;
+    }
+  }
+
+  if (!have) {
+    pool_perms[pool] = POOL_CHECKING;
+
+    char oid_buf[32];
+    snprintf(oid_buf, sizeof(oid_buf), "%llx.00000000", (unsigned long long)in->ino);
+    object_t oid = oid_buf;
+
+    Mutex lock("Client::check_pool_perm::lock");
+    Cond cond;
+    bool done[2] = {false, false};
+    int rd_ret;
+    int wr_ret;
+
+    ObjectOperation rd_op;
+    rd_op.stat(NULL, (utime_t*)NULL, NULL);
+
+    objecter->mutate(oid, OSDMap::file_to_object_locator(in->layout), rd_op,
+                    in->snaprealm->get_snap_context(), ceph_clock_now(cct), 0,
+                    new C_SafeCond(&lock, &cond, &done[0], &rd_ret), NULL);
+
+    ObjectOperation wr_op;
+    wr_op.create(true);
+
+    objecter->mutate(oid, OSDMap::file_to_object_locator(in->layout), wr_op,
+                    in->snaprealm->get_snap_context(), ceph_clock_now(cct), 0,
+                    new C_SafeCond(&lock, &cond, &done[1], &wr_ret), NULL);
+
+    client_lock.Unlock();
+    lock.Lock();
+    while (!done[0] || !done[1])
+      cond.Wait(lock);
+    lock.Unlock();
+    client_lock.Lock();
+
+    if (rd_ret == 0 || rd_ret == -ENOENT)
+      have |= POOL_READ;
+    else if (rd_ret != -EPERM) {
+      ldout(cct, 10) << "check_pool_perm on pool " << pool
+                    << " rd_err = " << rd_ret << " wr_err = " << wr_ret << dendl;
+      return rd_ret;
+    }
+
+    if (wr_ret == 0 || wr_ret == -EEXIST)
+      have |= POOL_WRITE;
+    else if (wr_ret != -EPERM) {
+      ldout(cct, 10) << "check_pool_perm on pool " << pool
+                    << " rd_err = " << rd_ret << " wr_err = " << wr_ret << dendl;
+      return wr_ret;
+    }
+
+    pool_perms[pool] = have | POOL_CHECKED;
+    signal_cond_list(waiting_for_pool_perm);
+  }
+
+  if ((need & CEPH_CAP_FILE_RD) && !(have & POOL_READ)) {
+    ldout(cct, 10) << "check_pool_perm on pool " << pool
+                  << " need " << ccap_string(need) << ", but no read perm" << dendl;
+    return -EPERM;
+  }
+  if ((need & CEPH_CAP_FILE_WR) && !(have & POOL_WRITE)) {
+    ldout(cct, 10) << "check_pool_perm on pool " << pool
+                  << " need " << ccap_string(need) << ", but no write perm" << dendl;
+    return -EPERM;
+  }
+
+  return 0;
+}
+
 void Client::set_filer_flags(int flags)
 {
   Mutex::Locker l(client_lock);
index d1dbebb3a603ea99d2eba4b42c5337d63a0b6196..074887d966041ace7eacb2a6b42cea2411871bd0 100644 (file)
@@ -480,6 +480,10 @@ protected:
   bool is_quota_bytes_exceeded(Inode *in, int64_t new_bytes);
   bool is_quota_bytes_approaching(Inode *in);
 
+  std::map<int64_t, int> pool_perms;
+  list<Cond*> waiting_for_pool_perm;
+  int check_pool_perm(Inode *in, int need);
+
  public:
   void set_filer_flags(int flags);
   void clear_filer_flags(int flags);
index 5a1638aeb566f529e55c4ad4b18abeacc959acf7..9c004322e7b56de5ed0bc252b0f7b63b1b07b553 100644 (file)
@@ -329,6 +329,7 @@ OPTION(fuse_debug, OPT_BOOL, false)
 OPTION(fuse_multithreaded, OPT_BOOL, true)
 OPTION(client_try_dentry_invalidate, OPT_BOOL, true) // the client should try to use dentry invaldation instead of remounting, on kernels it believes that will work for
 OPTION(client_die_on_failed_remount, OPT_BOOL, true)
+OPTION(client_check_pool_perm, OPT_BOOL, true)
 
 OPTION(crush_location, OPT_STR, "")       // whitespace-separated list of key=value pairs describing crush location