]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
osd, librados: misc fixes, linger related issues
authorYehuda Sadeh <yehuda@hq.newdream.net>
Tue, 30 Nov 2010 21:21:50 +0000 (13:21 -0800)
committerYehuda Sadeh <yehuda@hq.newdream.net>
Tue, 30 Nov 2010 21:21:50 +0000 (13:21 -0800)
src/librados.cc
src/osd/ReplicatedPG.cc
src/osd/osd_types.h
src/osdc/Objecter.cc
src/osdc/Objecter.h
src/testradospp.cc

index 8a8797c0fae7cf1c4b3b80c2b9650f5433830769..506d7aedb6e9ecb807d86f5d460d3e198b785aee 100644 (file)
@@ -77,43 +77,6 @@ class RadosClient : public Dispatcher
   Cond cond;
   SafeTimer timer;
 
-  // watch/notify
-  struct WatchContext : public Context {
-    librados::Rados::WatchCtx *ctx;
-    uint64_t cookie;
-    uint64_t ver;
-
-    WatchContext(librados::Rados::WatchCtx *_ctx) : ctx(_ctx) {}
-    ~WatchContext() {
-      delete ctx;
-    }
-    void finish(int r) {
-      ctx->finish(r);
-    }
-  };
-
-  Mutex watch_lock;
-  uint64_t max_watch_cookie;
-  map<uint64_t, WatchContext *> watchers;
-  pthread_key_t tls_key;
-
-  RadosTlsInfo *tls_info() {
-    struct RadosTlsInfo *info = (RadosTlsInfo *)pthread_getspecific(tls_key);
-    cout << "pthread_getspecific returned " << (void *)info << std::endl;
-    if (!info) {
-      info = new RadosTlsInfo(tls_key);
-      pthread_setspecific(tls_key, info);
-    }
-    return info;
-  }
-
-  void set_sync_op_version(eversion_t& ver) {
-    RadosTlsInfo *info = tls_info();
-    if (info)
-      info->objver = ver;
-  }
-
 public:
   RadosClient() : messenger(NULL), lock("radosclient"), timer(lock),
                   watch_lock("watch_lock"), max_watch_cookie(0) {
@@ -390,8 +353,10 @@ public:
     uint64_t cookie;
     uint64_t ver;
     librados::Rados::WatchCtx *ctx;
+    ObjectOperation *op;
 
-    WatchContext(PoolCtx& _pc, const object_t& _oc, librados::Rados::WatchCtx *_ctx) : pool_ctx(_pc), oid(_oc), ctx(_ctx) {}
+    WatchContext(PoolCtx& _pc, const object_t& _oc, librados::Rados::WatchCtx *_ctx,
+                 ObjectOperation *_op) : pool_ctx(_pc), oid(_oc), ctx(_ctx), op(_op) {}
     ~WatchContext() {
       delete ctx;
     }
@@ -426,8 +391,9 @@ public:
       pool.last_objver = ver;
   }
 
-  int register_watcher(PoolCtx& pool, const object_t& oid, librados::Rados::WatchCtx *ctx, uint64_t *cookie) {
-    WatchContext *wc = new WatchContext(pool, oid, ctx);
+  int register_watcher(PoolCtx& pool, const object_t& oid, librados::Rados::WatchCtx *ctx,
+                       ObjectOperation *op, uint64_t *cookie) {
+    WatchContext *wc = new WatchContext(pool, oid, ctx, op);
     if (!wc)
       return -ENOMEM;
     watch_lock.Lock();
@@ -1604,9 +1570,14 @@ int RadosClient::watch(PoolCtx& pool, const object_t& oid, uint64_t ver, uint64_
   utime_t ut = g_clock.now();
   bufferlist inbl, outbl;
 
-  int r = register_watcher(pool, oid, ctx, cookie);
-  if (r < 0)
+  ObjectOperation *rd = new ObjectOperation();
+  if (!rd)
+    return -ENOMEM;
+  int r = register_watcher(pool, oid, ctx, rd, cookie);
+  if (r < 0) {
+    delete rd;
     return r;
+  }
 
   Mutex mylock("RadosClient::watch::mylock");
   Cond cond;
@@ -1616,13 +1587,14 @@ int RadosClient::watch(PoolCtx& pool, const object_t& oid, uint64_t ver, uint64_
 
   lock.Lock();
   object_locator_t oloc(pool.poolid);
-  ObjectOperation rd;
+
   if (pool.assert_ver) {
-    rd.assert_version(pool.assert_ver);
+    rd->assert_version(pool.assert_ver);
     pool.assert_ver = 0;
   }
-  rd.watch(*cookie, ver, 1);
-  objecter->read(oid, oloc, rd, pool.snap_seq, &outbl, 0, onack, &objver);
+  rd->watch(*cookie, ver, 1);
+  rd->set_linger(true);
+  objecter->read(oid, oloc, *rd, pool.snap_seq, &outbl, 0, onack, &objver);
   lock.Unlock();
 
   mylock.Lock();
@@ -1632,6 +1604,10 @@ int RadosClient::watch(PoolCtx& pool, const object_t& oid, uint64_t ver, uint64_
 
   set_sync_op_version(pool, objver);
 
+  if (r < 0) {
+    unregister_watcher(*cookie);
+  }
+
   return r;
 }
 
@@ -1713,13 +1689,13 @@ int RadosClient::notify(PoolCtx& pool, const object_t& oid, uint64_t ver)
   eversion_t objver;
   uint64_t cookie;
   C_NotifyComplete *ctx = new C_NotifyComplete(&mylock_all, &cond_all, &done_all);
+  ObjectOperation rd;
 
-  r = register_watcher(pool, oid, ctx, &cookie);
+  r = register_watcher(pool, oid, ctx, &rd, &cookie);
   if (r < 0)
     return r;
 
   object_locator_t oloc(pool.poolid);
-  ObjectOperation rd;
   if (pool.assert_ver) {
     rd.assert_version(pool.assert_ver);
     pool.assert_ver = 0;
index 3562d50ddae96e63b1e54556e2960db6030b9d97..563c48f43fba5f15b942c4feb66a6c66fce2bd33 100644 (file)
@@ -1415,8 +1415,10 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops,
            session->get();
            session->watches[obc] = osd->osdmap->object_locator_to_pg(soid.oid, obc->obs.oi.oloc);
            obc->ref++;
+#if 0
            assert(obc->unconnected_watchers.count(entity));
            obc->unconnected_watchers.erase(entity);
+#endif
          } else if (iter->second == session) {
            // already there
            dout(10) << " already connected to watch " << w << " by " << entity
@@ -2554,7 +2556,7 @@ int ReplicatedPG::find_object_context(const object_t& oid, const object_locator_
   // want the head?
   sobject_t head(oid, CEPH_NOSNAP);
   if (snapid == CEPH_NOSNAP) {
-    ObjectContext *obc = get_object_context(head, OLOC_BLANK, can_create);
+    ObjectContext *obc = get_object_context(head, oloc, can_create);
     if (!obc)
       return -ENOENT;
     dout(10) << "find_object_context " << oid << " @" << snapid << dendl;
index 4dd17fc82ddffcf92496b96f011b1e81268d1302..09d080caf5758c88ebc903a4df43dedc2a62565d 100644 (file)
@@ -1389,7 +1389,7 @@ struct object_info_t {
   }
 
   object_info_t(const sobject_t& s, const object_locator_t& o) :
-    soid(s), size(0),
+    soid(s), oloc(o), size(0),
     truncate_seq(0), truncate_size(0) {}
   object_info_t(bufferlist& bl) {
     decode(bl);
index 994951cdffb166e8fac63df51bdaab3d8a6bb521..97245d5044e4ba332e85adb9f31f8a40d492d16e 100644 (file)
@@ -445,9 +445,11 @@ tid_t Objecter::op_submit(Op *op)
     dout(20) << " note: not requesting commit" << dendl;
   }
   op_osd[op->tid] = op;
+
   if (op->linger) {
     op_osd_linger[op->tid] = op;
   }
+
   pg.active_tids.insert(op->tid);
   pg.last = g_clock.now();
 
index da7666b3bb761af1ac0b1969aad8b5523d3511ab..512c4f7486d32aef93530e229d567b3656e9afb4 100644 (file)
@@ -48,8 +48,9 @@ struct ObjectOperation {
   vector<OSDOp> ops;
   int flags;
   int priority;
+  bool linger;
 
-  ObjectOperation() : flags(0), priority(0) {}
+  ObjectOperation() : flags(0), priority(0), linger(false) {}
 
   void add_op(int op) {
     int s = ops.size();
@@ -100,6 +101,9 @@ struct ObjectOperation {
     ops[s].op.pgls.count = count;
     ops[s].op.pgls.cookie = cookie;
   }
+  void set_linger(bool l) {
+    linger = l;
+  }
 
   // ------
 
@@ -271,6 +275,8 @@ public:
 
     eversion_t *objver;
 
+    bool linger;
+
     Op(const object_t& o, const object_locator_t& ol, vector<OSDOp>& op,
        int f, Context *ac, Context *co, eversion_t *ov) :
       session_item(this),
@@ -278,7 +284,7 @@ public:
       con(NULL),
       snapid(CEPH_NOSNAP), outbl(0), flags(f), priority(0), onack(ac), oncommit(co), 
       tid(0), attempts(0),
-      paused(false), objver(ov) {
+      paused(false), objver(ov), linger(false) {
       ops.swap(op);
     }
   };
index a2c4449f00427411c9855ca764b857278e92db51..48b0fc14a3d15fb4ac87acd9ededb7e656b311e8 100644 (file)
@@ -67,7 +67,7 @@ int main(int argc, const char **argv)
   cout << "open pool result = " << r << " pool = " << pool << std::endl;
 
   r = rados.write(pool, oid, 0, bl, bl.length());
-  uint64_t objver = rados.get_last_ver(pool);
+  uint64_t objver = rados.get_last_version(pool);
   cout << "rados.write returned " << r << " last_ver=" << objver << std::endl;
 
   uint64_t handle;
@@ -82,7 +82,7 @@ int main(int argc, const char **argv)
   cout << "*** press enter to continue ***" << std::endl;
   getchar();
 
-  rados.set_assert_ver(pool, objver);
+  rados.set_assert_version(pool, objver);
 
   r = rados.write(pool, oid, 0, bl, bl.length() - 1);
   cout << "rados.write returned " << r << std::endl;