]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
ReplicatedPG: allow multiple watches in one transaction
authorSamuel Just <sam.just@inktank.com>
Wed, 20 Feb 2013 00:19:20 +0000 (16:19 -0800)
committerSamuel Just <sam.just@inktank.com>
Wed, 20 Feb 2013 21:29:20 +0000 (13:29 -0800)
Signed-off-by: Samuel Just <sam.just@inktank.com>
src/osd/ReplicatedPG.cc
src/osd/ReplicatedPG.h

index 5d1d3ab95421ae471e72b0cd4d706990adb8a321..83502de7d1248580b41759fa94cb3e02ff84a038 100644 (file)
@@ -2476,8 +2476,7 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
            oi.watchers[make_pair(cookie, entity)] = w;
            t.nop();  // make sure update the object_info on disk!
          }
-         ctx->watch_connect = true;
-         ctx->watch_info = w;
+         ctx->watch_connects.push_back(w);
          assert(obc->registered);
         } else {
          map<pair<uint64_t, entity_name_t>, watch_info_t>::iterator oi_iter =
@@ -2487,9 +2486,7 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
                     << entity << dendl;
             oi.watchers.erase(oi_iter);
            t.nop();  // update oi on disk
-
-           ctx->watch_disconnect = true;
-           ctx->watch_info = w;
+           ctx->watch_disconnects.push_back(w);
          } else {
            dout(10) << " can't remove: no watch by " << entity << dendl;
          }
@@ -3259,8 +3256,10 @@ void ReplicatedPG::do_osd_op_effects(OpContext *ctx)
 
   dout(15) << "do_osd_op_effects on session " << session.get() << dendl;
 
-  if (ctx->watch_connect) {
-    pair<uint64_t, entity_name_t> watcher(ctx->watch_info.cookie, entity);
+  for (list<watch_info_t>::iterator i = ctx->watch_connects.begin();
+       i != ctx->watch_connects.end();
+       ++i) {
+    pair<uint64_t, entity_name_t> watcher(i->cookie, entity);
     dout(15) << "do_osd_op_effects applying watch connect on session "
             << session.get() << " watcher " << watcher << dendl;
     WatchRef watch;
@@ -3272,8 +3271,8 @@ void ReplicatedPG::do_osd_op_effects(OpContext *ctx)
       dout(15) << "do_osd_op_effects new watcher " << watcher
               << dendl;
       watch = Watch::makeWatchRef(
-       this, osd, ctx->obc, ctx->watch_info.timeout_seconds,
-       ctx->watch_info.cookie, entity);
+       this, osd, ctx->obc, i->timeout_seconds,
+       i->cookie, entity);
       ctx->obc->watchers.insert(
        make_pair(
          watcher,
@@ -3282,8 +3281,10 @@ void ReplicatedPG::do_osd_op_effects(OpContext *ctx)
     watch->connect(conn);
   }
 
-  if (ctx->watch_disconnect) {
-    pair<uint64_t, entity_name_t> watcher(ctx->watch_info.cookie, entity);
+  for (list<watch_info_t>::iterator i = ctx->watch_disconnects.begin();
+       i != ctx->watch_disconnects.end();
+       ++i) {
+    pair<uint64_t, entity_name_t> watcher(i->cookie, entity);
     dout(15) << "do_osd_op_effects applying watch disconnect on session "
             << session.get() << " and watcher " << watcher << dendl;
     if (ctx->obc->watchers.count(watcher)) {
index a33b86c85c7cd93f24bd3d3663e113884af30521..50af643163a2358571ef297b06a6e470ea9f24ad 100644 (file)
@@ -260,8 +260,8 @@ public:
     bool user_modify;     // user-visible modification
 
     // side effects
-    bool watch_connect, watch_disconnect;
-    watch_info_t watch_info;
+    list<watch_info_t> watch_connects;
+    list<watch_info_t> watch_disconnects;
     list<notify_info_t> notifies;
     struct NotifyAck {
       boost::optional<uint64_t> watch_cookie;
@@ -304,7 +304,6 @@ public:
       op(_op), reqid(_reqid), ops(_ops), obs(_obs), snapset(0),
       new_obs(_obs->oi, _obs->exists),
       modify(false), user_modify(false),
-      watch_connect(false), watch_disconnect(false),
       bytes_written(0), bytes_read(0),
       obc(0), clone_obc(0), snapset_obc(0), data_off(0), reply(NULL), pg(_pg) { 
       if (_ssc) {