]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
osdc/Objecter: spit out linger register step
authorSage Weil <sage@redhat.com>
Fri, 14 Nov 2014 21:32:12 +0000 (13:32 -0800)
committerSage Weil <sage@redhat.com>
Thu, 4 Dec 2014 18:34:04 +0000 (10:34 -0800)
Signed-off-by: Sage Weil <sage@redhat.com>
src/librados/IoCtxImpl.cc
src/osdc/Objecter.cc
src/osdc/Objecter.h

index 8caae8cbd4f0aceec85504d742c01002a8c9297f..c68ab7e594bd0b867ef426510ad34fe73720daec 100644 (file)
@@ -1050,15 +1050,16 @@ int librados::IoCtxImpl::watch(const object_t& oid,
   WatchNotifyInfo *wc = new WatchNotifyInfo(this, oid);
   wc->watch_ctx = ctx;
   wc->watch_ctx2 = ctx2;
+  wc->linger_op = objecter->linger_register(oid, oloc, 0);
   client->register_watch_notify_callback(wc, cookie);
   prepare_assert_ops(&wr);
   wr.watch(*cookie, CEPH_OSD_WATCH_OP_WATCH);
   bufferlist bl;
-  wc->linger_op = objecter->linger_watch(oid, oloc, wr,
-                                        snapc, ceph_clock_now(NULL), bl,
-                                        *cookie, 0,
-                                        NULL, onfinish, &wc->on_error,
-                                        &objver);
+  objecter->linger_watch(wc->linger_op, wr,
+                        snapc, ceph_clock_now(NULL), bl,
+                        *cookie,
+                        NULL, onfinish, &wc->on_error,
+                        &objver);
   lock->Unlock();
 
   mylock.Lock();
@@ -1158,6 +1159,7 @@ int librados::IoCtxImpl::notify(const object_t& oid, bufferlist& bl,
 
   // Acquire cookie
   uint64_t cookie;
+  wc->linger_op = objecter->linger_register(oid, oloc, 0);
   client->register_watch_notify_callback(wc, &cookie);
   uint32_t prot_ver = 1;
   uint32_t timeout = notify_timeout;
@@ -1175,8 +1177,9 @@ int librados::IoCtxImpl::notify(const object_t& oid, bufferlist& bl,
   // Issue RADOS op
   C_SaferCond onack;
   version_t objver;
-  wc->linger_op = objecter->linger_notify(oid, oloc, rd, snap_seq, inbl, NULL, 0,
-                                         &onack, &objver);
+  objecter->linger_notify(wc->linger_op,
+                         rd, snap_seq, inbl, NULL,
+                         &onack, &objver);
   lock->Unlock();
 
   ldout(client->cct, 10) << __func__ << " issued linger op " << wc->linger_op << dendl;
index db2aecc4948d2815156aee18e9c8d9c2c5f4bf29..a4a33b327ea7c47dc0791d10d23e09863bb424f1 100644 (file)
@@ -591,23 +591,43 @@ void Objecter::_linger_cancel(LingerOp *info)
   }
 }
 
-Objecter::LingerOp *Objecter::linger_watch(const object_t& oid,
-                                const object_locator_t& oloc,
-                                ObjectOperation& op,
-                                const SnapContext& snapc, utime_t mtime,
-                                bufferlist& inbl, uint64_t cookie, int flags,
-                                Context *onack, Context *oncommit,
-                                Context *onerror,
-                                version_t *objver)
+
+
+Objecter::LingerOp *Objecter::linger_register(const object_t& oid,
+                                             const object_locator_t& oloc,
+                                             int flags)
 {
   LingerOp *info = new LingerOp;
   info->target.base_oid = oid;
   info->target.base_oloc = oloc;
   if (info->target.base_oloc.key == oid)
     info->target.base_oloc.key.clear();
+  info->target.flags = flags;
+  info->watch_valid_thru = ceph_clock_now(NULL);
+
+  RWLock::Context lc(rwlock, RWLock::Context::TakenForWrite);
+
+  // Acquire linger ID
+  info->linger_id = ++max_linger_id;
+  ldout(cct, 10) << __func__ << " info " << info
+                << " linger_id " << info->linger_id << dendl;
+  linger_ops[info->linger_id] = info;
+
+  info->get(); // for the caller
+  return info;
+}
+
+ceph_tid_t Objecter::linger_watch(LingerOp *info,
+                                 ObjectOperation& op,
+                                 const SnapContext& snapc, utime_t mtime,
+                                 bufferlist& inbl, uint64_t cookie,
+                                 Context *onack, Context *oncommit,
+                                 Context *onerror,
+                                 version_t *objver)
+{
   info->snapc = snapc;
   info->mtime = mtime;
-  info->target.flags = flags | CEPH_OSD_FLAG_WRITE;
+  info->target.flags |= CEPH_OSD_FLAG_WRITE;
   info->ops = op.ops;
   info->cookie = cookie;
   info->inbl = inbl;
@@ -616,31 +636,23 @@ Objecter::LingerOp *Objecter::linger_watch(const object_t& oid,
   info->on_reg_ack = onack;
   info->on_reg_commit = oncommit;
   info->on_error = onerror;
-  info->watch_valid_thru = ceph_clock_now(NULL);
 
   RWLock::WLocker wl(rwlock);
   _linger_submit(info);
   logger->inc(l_osdc_linger_active);
 
-  info->get(); // for the caller
-  return info;
+  return info->linger_id;
 }
 
-Objecter::LingerOp *Objecter::linger_notify(const object_t& oid,
-                                           const object_locator_t& oloc,
-                                           ObjectOperation& op,
-                                           snapid_t snap, bufferlist& inbl,
-                                           bufferlist *poutbl, int flags,
-                                           Context *onfinish,
-                                           version_t *objver)
+ceph_tid_t Objecter::linger_notify(LingerOp *info,
+                                  ObjectOperation& op,
+                                  snapid_t snap, bufferlist& inbl,
+                                  bufferlist *poutbl,
+                                  Context *onfinish,
+                                  version_t *objver)
 {
-  LingerOp *info = new LingerOp;
-  info->target.base_oid = oid;
-  info->target.base_oloc = oloc;
-  if (info->target.base_oloc.key == oid)
-    info->target.base_oloc.key.clear();
   info->snap = snap;
-  info->target.flags = flags | CEPH_OSD_FLAG_READ;
+  info->target.flags |= CEPH_OSD_FLAG_READ;
   info->ops = op.ops;
   info->inbl = inbl;
   info->poutbl = poutbl;
@@ -650,8 +662,8 @@ Objecter::LingerOp *Objecter::linger_notify(const object_t& oid,
   RWLock::WLocker wl(rwlock);
   _linger_submit(info);
   logger->inc(l_osdc_linger_active);
-  info->get(); // for the caller
-  return info;
+
+  return info->linger_id;
 }
 
 void Objecter::_linger_submit(LingerOp *info)
@@ -659,11 +671,7 @@ void Objecter::_linger_submit(LingerOp *info)
   assert(rwlock.is_wlocked());
   RWLock::Context lc(rwlock, RWLock::Context::TakenForWrite);
 
-  // Acquire linger ID
-  info->linger_id = ++max_linger_id;
-  ldout(cct, 10) << __func__ << " info " << info
-                << " linger_id " << info->linger_id << dendl;
-  linger_ops[info->linger_id] = info;
+  assert(info->linger_id);
 
   // Populate Op::target
   OSDSession *s = NULL;
index 7bc50c84a8e68924b03f0669de39a3dcf4531e0a..5ae14ffb73617431dfdbebaea368acc0a86beed4 100644 (file)
@@ -1964,18 +1964,20 @@ public:
   }
 
   // caller owns a ref
-  LingerOp *linger_watch(const object_t& oid, const object_locator_t& oloc,
-                        ObjectOperation& op,
-                        const SnapContext& snapc, utime_t mtime,
-                        bufferlist& inbl, uint64_t cookie, int flags,
-                        Context *onack, Context *onfinish, Context *onerror,
-                        version_t *objver);
-  LingerOp *linger_notify(const object_t& oid, const object_locator_t& oloc,
+  LingerOp *linger_register(const object_t& oid, const object_locator_t& oloc,
+                           int flags);
+  ceph_tid_t linger_watch(LingerOp *info,
                          ObjectOperation& op,
-                         snapid_t snap, bufferlist& inbl,
-                         bufferlist *poutbl, int flags,
-                         Context *onack,
+                         const SnapContext& snapc, utime_t mtime,
+                         bufferlist& inbl, uint64_t cookie,
+                         Context *onack, Context *onfinish, Context *onerror,
                          version_t *objver);
+  ceph_tid_t linger_notify(LingerOp *info,
+                          ObjectOperation& op,
+                          snapid_t snap, bufferlist& inbl,
+                          bufferlist *poutbl,
+                          Context *onack,
+                          version_t *objver);
   int linger_check(LingerOp *info);
   void linger_cancel(LingerOp *info);  // releases a reference
   void _linger_cancel(LingerOp *info);