]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
client: check OSD caps before read/write 4457/head
authorYan, Zheng <zyan@redhat.com>
Fri, 24 Apr 2015 07:23:21 +0000 (15:23 +0800)
committerYan, Zheng <zyan@redhat.com>
Mon, 27 Apr 2015 09:16:10 +0000 (17:16 +0800)
Signed-off-by: Yan, Zheng <zyan@redhat.com>
src/client/Client.cc
src/client/Client.h
src/common/config_opts.h

index 4e6eb8452caf2622f156ee50f75e21f849c4d412..1a2a4e6202b6de3d4e53b97fc26010e362f0a71d 100644 (file)
@@ -2719,6 +2719,10 @@ void Client::put_cap_ref(Inode *in, int cap)
 
 int Client::get_caps(Inode *in, int need, int want, int *phave, loff_t endoff)
 {
+  int r = check_pool_perm(in, need);
+  if (r < 0)
+    return r;
+
   while (1) {
     if (!in->is_any_caps())
       return -ESTALE;
@@ -10740,6 +10744,102 @@ bool Client::is_quota_bytes_approaching(Inode *in)
   return false;
 }
 
+enum {
+  POOL_CHECKED = 1,
+  POOL_CHECKING = 2,
+  POOL_READ = 4,
+  POOL_WRITE = 8,
+};
+
+int Client::check_pool_perm(Inode *in, int need)
+{
+  if (!cct->_conf->client_check_pool_perm)
+    return 0;
+
+  int64_t pool = in->layout.fl_pg_pool;
+  int have = 0;
+  while (true) {
+    std::map<int64_t, int>::iterator it = pool_perms.find(pool);
+    if (it == pool_perms.end())
+      break;
+    if (it->second == POOL_CHECKING) {
+      // avoid concurrent checkings
+      wait_on_list(waiting_for_pool_perm);
+    } else {
+      have = it->second;
+      assert(have & POOL_CHECKED);
+      break;
+    }
+  }
+
+  if (!have) {
+    pool_perms[pool] = POOL_CHECKING;
+
+    char oid_buf[32];
+    snprintf(oid_buf, sizeof(oid_buf), "%llx.00000000", (unsigned long long)in->ino);
+    object_t oid = oid_buf;
+
+    Mutex lock("Client::check_pool_perm::lock");
+    Cond cond;
+    bool done[2] = {false, false};
+    int rd_ret;
+    int wr_ret;
+
+    ObjectOperation rd_op;
+    rd_op.stat(NULL, (utime_t*)NULL, NULL);
+
+    objecter->mutate(oid, OSDMap::file_to_object_locator(in->layout), rd_op,
+                    in->snaprealm->get_snap_context(), ceph_clock_now(cct), 0,
+                    new C_SafeCond(&lock, &cond, &done[0], &rd_ret), NULL);
+
+    ObjectOperation wr_op;
+    wr_op.create(true);
+
+    objecter->mutate(oid, OSDMap::file_to_object_locator(in->layout), wr_op,
+                    in->snaprealm->get_snap_context(), ceph_clock_now(cct), 0,
+                    new C_SafeCond(&lock, &cond, &done[1], &wr_ret), NULL);
+
+    client_lock.Unlock();
+    lock.Lock();
+    while (!done[0] || !done[1])
+      cond.Wait(lock);
+    lock.Unlock();
+    client_lock.Lock();
+
+    if (rd_ret == 0 || rd_ret == -ENOENT)
+      have |= POOL_READ;
+    else if (rd_ret != -EPERM) {
+      ldout(cct, 10) << "check_pool_perm on pool " << pool
+                    << " rd_err = " << rd_ret << " wr_err = " << wr_ret << dendl;
+      return rd_ret;
+    }
+
+    if (wr_ret == 0 || wr_ret == -EEXIST)
+      have |= POOL_WRITE;
+    else if (wr_ret != -EPERM) {
+      ldout(cct, 10) << "check_pool_perm on pool " << pool
+                    << " rd_err = " << rd_ret << " wr_err = " << wr_ret << dendl;
+      return wr_ret;
+    }
+
+    pool_perms[pool] = have | POOL_CHECKED;
+    signal_cond_list(waiting_for_pool_perm);
+  }
+
+  if ((need & CEPH_CAP_FILE_RD) && !(have & POOL_READ)) {
+    ldout(cct, 10) << "check_pool_perm on pool " << pool
+                  << " need " << ccap_string(need) << ", but no read perm" << dendl;
+    return -EPERM;
+  }
+  if ((need & CEPH_CAP_FILE_WR) && !(have & POOL_WRITE)) {
+    ldout(cct, 10) << "check_pool_perm on pool " << pool
+                  << " need " << ccap_string(need) << ", but no write perm" << dendl;
+    return -EPERM;
+  }
+
+  return 0;
+}
+
 void Client::set_filer_flags(int flags)
 {
   Mutex::Locker l(client_lock);
index ab24153bc4b2b952ca562407524b1583f4c0ab8f..4430e565faca747558e74bcf76e6350f81bed79f 100644 (file)
@@ -478,6 +478,10 @@ protected:
   bool is_quota_bytes_exceeded(Inode *in, int64_t new_bytes);
   bool is_quota_bytes_approaching(Inode *in);
 
+  std::map<int64_t, int> pool_perms;
+  list<Cond*> waiting_for_pool_perm;
+  int check_pool_perm(Inode *in, int need);
+
  public:
   void set_filer_flags(int flags);
   void clear_filer_flags(int flags);
index 97cd83ed2228a2aab9b3de423ca1b198768d6af5..573f9e88383ec3667e9386168a1c5a210e7cf3a8 100644 (file)
@@ -331,6 +331,7 @@ OPTION(fuse_debug, OPT_BOOL, false)
 OPTION(fuse_multithreaded, OPT_BOOL, true)
 OPTION(client_try_dentry_invalidate, OPT_BOOL, true) // the client should try to use dentry invaldation instead of remounting, on kernels it believes that will work for
 OPTION(client_die_on_failed_remount, OPT_BOOL, true)
+OPTION(client_check_pool_perm, OPT_BOOL, true)
 
 OPTION(crush_location, OPT_STR, "")       // whitespace-separated list of key=value pairs describing crush location