]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
osd, librados: configurable notify timeout
authorYehuda Sadeh <yehuda@hq.newdream.net>
Thu, 30 Dec 2010 01:00:45 +0000 (17:00 -0800)
committerYehuda Sadeh <yehuda@hq.newdream.net>
Thu, 30 Dec 2010 01:02:06 +0000 (17:02 -0800)
src/config.cc
src/config.h
src/include/librados.hpp
src/librados.cc
src/osd/ReplicatedPG.cc
src/osdc/Objecter.h
src/testradospp.cc

index b92eb4857b0bf0a7c92bc2e751dd986d3bda0ed1..83781f548ddb7a9a04449808db928662e7b2a599 100644 (file)
@@ -423,6 +423,7 @@ static struct config_option config_optionsp[] = {
        OPTION(client_readahead_max_periods, 0, OPT_LONGLONG, 4),  // as multiple of file layout period (object size * num stripes)
        OPTION(client_snapdir, 0, OPT_STR, ".snap"),
        OPTION(client_mountpoint, 'r', OPT_STR, "/"),
+       OPTION(client_notify_timeout, 0, OPT_INT, 10), // in seconds
        OPTION(fuse_direct_io, 0, OPT_INT, 0),
        OPTION(fuse_ll, 0, OPT_BOOL, true),
        OPTION(client_oc, 0, OPT_BOOL, true),
@@ -574,6 +575,7 @@ static struct config_option config_optionsp[] = {
        OPTION(osd_class_tmp, 0, OPT_STR, "/var/lib/ceph/tmp"),
        OPTION(osd_check_for_log_corruption, 0, OPT_BOOL, false),
        OPTION(osd_use_stale_snap, 0, OPT_BOOL, false),
+       OPTION(osd_max_notify_timeout, 0, OPT_INT, 30), // max notify timeout in seconds
        OPTION(filestore, 0, OPT_BOOL, false),
        OPTION(filestore_max_sync_interval, 0, OPT_DOUBLE, 5),    // seconds
        OPTION(filestore_min_sync_interval, 0, OPT_DOUBLE, .01),  // seconds
index 0acfbd360987d28861b0cb45c69ff2805d91e805..832e63312a2fe00b3c1d0bc738a20b39debf78f8 100644 (file)
@@ -224,6 +224,8 @@ struct md_config_t {
   int      client_oc_target_dirty;
   long long unsigned   client_oc_max_sync_write;
 
+  int      client_notify_timeout;
+
   // objecter
   bool  objecter_buffer_uncommitted;
   double objecter_mon_retry_interval;
@@ -408,6 +410,8 @@ struct md_config_t {
 
   bool osd_use_stale_snap;
 
+  int osd_max_notify_timeout;
+
   // filestore
   bool filestore;
   double   filestore_max_sync_interval;
index edbb1a4765e38574cef8bbcd39fc56d2f0f95f44..2959078f7258e435e640421e440e96184e363b05 100644 (file)
@@ -153,6 +153,7 @@ public:
   int watch(pool_t pool, const std::string& o, uint64_t ver, uint64_t *handle, librados::Rados::WatchCtx *ctx);
   int unwatch(pool_t pool, const std::string& o, uint64_t handle);
   int notify(pool_t pool, const std::string& o, uint64_t ver);
+  void set_notify_timeout(pool_t pool, uint32_t timeout);
 
   /* assert version for next sync operations */
   void set_assert_version(pool_t pool, uint64_t ver);
index 9c7a67bd6602a93353943846dadb58fba40745d4..a8d5e576e696aa3dd5b5a86fbbdfd2e81d965b6e 100644 (file)
@@ -94,8 +94,10 @@ public:
     SnapContext snapc;
     uint64_t assert_ver;
     eversion_t last_objver;
+    uint32_t notify_timeout;
 
-    PoolCtx(int pid, const char *n, snapid_t s = CEPH_NOSNAP) : poolid(pid), name(n), snap_seq(s), assert_ver(0) {}
+    PoolCtx(int pid, const char *n, snapid_t s = CEPH_NOSNAP) : poolid(pid), name(n), snap_seq(s), assert_ver(0),
+                                                               notify_timeout(g_conf.client_notify_timeout) {}
 
     void set_snap(snapid_t s) {
       if (!s)
@@ -426,6 +428,10 @@ public:
   void set_assert_version(PoolCtx& pool, uint64_t ver) {
     pool.assert_ver = ver;
   }
+
+  void set_notify_timeout(PoolCtx& pool, uint32_t timeout) {
+    pool.notify_timeout = timeout;
+  }
 };
 
 int RadosClient::init()
@@ -1692,7 +1698,11 @@ int RadosClient::notify(PoolCtx& pool, const object_t& oid, uint64_t ver)
   }
   lock.Lock();
   register_watcher(pool, oid, ctx, &rd, &cookie);
-  rd.notify(cookie, ver);
+  uint32_t prot_ver = 1;
+  uint32_t timeout = pool.notify_timeout;
+  ::encode(prot_ver, inbl);
+  ::encode(timeout, inbl);
+  rd.notify(cookie, ver, inbl);
   objecter->read(oid, oloc, rd, pool.snap_seq, &outbl, 0, onack, &objver);
   lock.Unlock();
 
@@ -2211,6 +2221,13 @@ void Rados::set_assert_version(pool_t pool, uint64_t ver)
     return;
   client->set_assert_version(*(RadosClient::PoolCtx *)pool, ver);
 }
+
+void Rados::set_notify_timeout(pool_t pool, uint32_t timeout)
+{
+  if (!client)
+    return;
+  client->set_notify_timeout(*(RadosClient::PoolCtx *)pool, timeout);
+}
 }
 
 // ---------------------------------------------
index ff65d74bff44630e1b09b594ed4ea1257d893796..c63c517d590a3cd64e7e82058d439b195eb272b5 100644 (file)
@@ -1154,6 +1154,17 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops,
 
    case CEPH_OSD_OP_NOTIFY:
       {
+       uint32_t ver;
+       uint32_t timeout;
+
+       try {
+          ::decode(ver, bp);
+         ::decode(timeout, bp);
+       } catch (const buffer::error &e) {
+         timeout = 0;
+       }
+       if (!timeout || timeout > g_conf.osd_max_notify_timeout)
+               timeout = g_conf.osd_max_notify_timeout;
        dout(0) << "CEPH_OSD_OP_NOTIFY" << dendl;
         ObjectContext *obc = ctx->obc;
        dout(0) << "ctx->obc=" << (void *)obc << dendl;
@@ -1200,7 +1211,7 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops,
           obc->ref++;
           notif->obc = obc;
          notif->timeout = new Watch::C_NotifyTimeout(osd, notif);
-         osd->watch_timer.add_event_after(5.0, notif->timeout); /* FIXME: use a configurable timeout here */
+         osd->watch_timer.add_event_after(timeout, notif->timeout);
        }
        osd->watch_lock.Unlock();
       }
index 1594bfe9affa7a665e9c5da1b5d03d886e8f7e1e..0034cddb4f589537d5d96cf700c29b8ee6cbe8d9 100644 (file)
@@ -85,13 +85,14 @@ struct ObjectOperation {
     ops[s].data.append(method, ops[s].op.cls.method_len);
     ops[s].data.append(indata);
   }
-  void add_watch(int op, uint64_t cookie, uint64_t ver, uint8_t flag) {
+  void add_watch(int op, uint64_t cookie, uint64_t ver, uint8_t flag, bufferlist& inbl) {
     int s = ops.size();
     ops.resize(s+1);
     ops[s].op.op = op;
     ops[s].op.watch.cookie = cookie;
     ops[s].op.watch.ver = ver;
     ops[s].op.watch.flag = flag;
+    ops[s].data.append(inbl);
   }
   void add_pgls(int op, uint64_t count, uint64_t cookie) {
     int s = ops.size();
@@ -187,19 +188,22 @@ struct ObjectOperation {
 
   // watch/notify
   void watch(uint64_t cookie, uint64_t ver, bool set) {
-    add_watch(CEPH_OSD_OP_WATCH, cookie, ver, (set ? 1 : 0));
+    bufferlist inbl;
+    add_watch(CEPH_OSD_OP_WATCH, cookie, ver, (set ? 1 : 0), inbl);
   }
 
-  void notify(uint64_t cookie, uint64_t ver) {
-    add_watch(CEPH_OSD_OP_NOTIFY, cookie, ver, 1); 
+  void notify(uint64_t cookie, uint64_t ver, bufferlist& inbl) {
+    add_watch(CEPH_OSD_OP_NOTIFY, cookie, ver, 1, inbl); 
   }
 
   void notify_ack(uint64_t notify_id, uint64_t ver) {
-    add_watch(CEPH_OSD_OP_NOTIFY_ACK, notify_id, ver, 0);
+    bufferlist bl;
+    add_watch(CEPH_OSD_OP_NOTIFY_ACK, notify_id, ver, 0, bl);
   }
 
   void assert_version(uint64_t ver) {
-    add_watch(CEPH_OSD_OP_ASSERT_VER, 0, ver, 0);
+    bufferlist bl;
+    add_watch(CEPH_OSD_OP_ASSERT_VER, 0, ver, 0, bl);
   }
 };
 
index c2ae109eb7f742f3cc9fcb94993c32dee630caca..aaef841cf0c1d0896f1a9d473b7fecc3edc4dc70 100644 (file)
@@ -77,6 +77,7 @@ int main(int argc, const char **argv)
 
   cout << "*** press enter to continue ***" << std::endl;
   getchar();
+  rados.set_notify_timeout(pool, 7);
   r = rados.notify(pool, oid, objver);
   cout << "rados.notify returned " << r << std::endl;
   cout << "*** press enter to continue ***" << std::endl;