git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
osd/ReplicatedPG: do watch effects only when change commits
author Sage Weil <sage@redhat.com>
Fri, 21 Nov 2014 23:34:30 +0000 (15:34 -0800)
committer Sage Weil <sage@redhat.com>
Thu, 4 Dec 2014 18:39:20 +0000 (10:39 -0800)
Do not tell the client their watch succeeded until it is durable and
visible to clients doing a notify.  This is less important in the initial
watch registration stage, but critical in the disconnect (and disconnect
+ reconnect) pipelining cases.

Simplify the watch_disconnects struct to have exactly the information
we need, and no more.  Note that the conn is not needed for disconnects
(and we don't have a req to provide one in the watch timeout op case).

Signed-off-by: Sage Weil <sage@redhat.com>
src/osd/ReplicatedPG.cc
src/osd/ReplicatedPG.h
src/osd/Watch.cc
src/osd/Watch.h

index 0c69981d97e15d0401cdca5b80dcfedd49b1aac2..8564a612602c6e922766fd2bb0592f0c578223ab 100644 (file)
@@ -2084,12 +2084,17 @@ void ReplicatedPG::execute_ctx(OpContext *ctx)
 
   // read or error?
   if (ctx->op_t->empty() || result < 0) {
+    // finish side-effects
+    if (result == 0)
+      do_osd_op_effects(ctx, m->get_connection());
+
     if (ctx->pending_async_reads.empty()) {
       complete_read_ctx(result, ctx);
     } else {
       in_progress_async_reads.push_back(make_pair(op, ctx));
       ctx->start_async_reads(this);
     }
+
     return;
   }
 
@@ -4342,7 +4347,8 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
                     << entity << dendl;
             oi.watchers.erase(oi_iter);
            t->nop();  // update oi on disk
-           ctx->watch_disconnects.push_back(w);
+           ctx->watch_disconnects.push_back(
+             OpContext::watch_disconnect_t(cookie, entity, false));
          } else {
            dout(10) << " can't remove: no watch by " << entity << dendl;
          }
@@ -5365,16 +5371,34 @@ void ReplicatedPG::add_interval_usage(interval_set<uint64_t>& s, object_stat_sum
   }
 }
 
-void ReplicatedPG::do_osd_op_effects(OpContext *ctx)
+void ReplicatedPG::do_osd_op_effects(OpContext *ctx, const ConnectionRef& conn)
 {
-  ConnectionRef conn(ctx->op->get_req()->get_connection());
+  entity_name_t entity = ctx->reqid.name;
+  dout(15) << "do_osd_op_effects " << entity << " con " << conn.get() << dendl;
+
+  // disconnects first
+  for (list<OpContext::watch_disconnect_t>::iterator i =
+        ctx->watch_disconnects.begin();
+       i != ctx->watch_disconnects.end();
+       ++i) {
+    pair<uint64_t, entity_name_t> watcher(i->cookie, i->name);
+    if (ctx->obc->watchers.count(watcher)) {
+      WatchRef watch = ctx->obc->watchers[watcher];
+      dout(10) << "do_osd_op_effects disconnect watcher " << watcher << dendl;
+      ctx->obc->watchers.erase(watcher);
+      watch->remove(i->send_disconnect);
+    } else {
+      dout(10) << "do_osd_op_effects disconnect failed to find watcher "
+              << watcher << dendl;
+    }
+  }
+
+  if (!conn)
+    return;
   boost::intrusive_ptr<OSD::Session> session((OSD::Session *)conn->get_priv());
   if (!session.get())
     return;
   session->put();  // get_priv() takes a ref, and so does the intrusive_ptr
-  entity_name_t entity = ctx->reqid.name;
-
-  dout(15) << "do_osd_op_effects on session " << session.get() << dendl;
 
   for (list<pair<watch_info_t,bool> >::iterator i = ctx->watch_connects.begin();
        i != ctx->watch_connects.end();
@@ -5401,28 +5425,11 @@ void ReplicatedPG::do_osd_op_effects(OpContext *ctx)
     watch->connect(conn, i->second);
   }
 
-  for (list<watch_info_t>::iterator i = ctx->watch_disconnects.begin();
-       i != ctx->watch_disconnects.end();
-       ++i) {
-    pair<uint64_t, entity_name_t> watcher(i->cookie, entity);
-    dout(15) << "do_osd_op_effects applying watch disconnect on session "
-            << session.get() << " and watcher " << watcher << dendl;
-    if (ctx->obc->watchers.count(watcher)) {
-      WatchRef watch = ctx->obc->watchers[watcher];
-      dout(10) << "do_osd_op_effects applying disconnect found watcher "
-              << watcher << dendl;
-      ctx->obc->watchers.erase(watcher);
-      watch->remove();
-    } else {
-      dout(10) << "do_osd_op_effects failed to find watcher "
-              << watcher << dendl;
-    }
-  }
-
   for (list<notify_info_t>::iterator p = ctx->notifies.begin();
        p != ctx->notifies.end();
        ++p) {
     dout(10) << "do_osd_op_effects, notify " << *p << dendl;
+    ConnectionRef conn(ctx->op->get_req()->get_connection());
     NotifyRef notif(
       Notify::makeNotifyRef(
        conn,
@@ -5494,10 +5501,6 @@ int ReplicatedPG::prepare_transaction(OpContext *ctx)
   if (result < 0)
     return result;
 
-  // finish side-effects
-  if (result == 0)
-    do_osd_op_effects(ctx);
-
   // read-op?  done?
   if (ctx->op_t->empty() && !ctx->modify) {
     unstable_stats.add(ctx->delta_stats);
@@ -7084,6 +7087,11 @@ void ReplicatedPG::eval_repop(RepGather *repop)
   if (repop->all_applied && repop->all_committed) {
     repop->rep_done = true;
 
+    do_osd_op_effects(
+      repop->ctx,
+      repop->ctx->op ? repop->ctx->op->get_req()->get_connection() :
+      ConnectionRef());
+
     calc_min_last_complete_ondisk();
 
     // kick snap_trimmer if necessary
@@ -7430,12 +7438,6 @@ void ReplicatedPG::handle_watch_timeout(WatchRef watch)
     return;
   }
 
-  watch->send_disconnect();
-
-  obc->watchers.erase(make_pair(watch->get_cookie(), watch->get_entity()));
-  obc->obs.oi.watchers.erase(make_pair(watch->get_cookie(), watch->get_entity()));
-  watch->remove();
-
   vector<OSDOp> ops;
   ceph_tid_t rep_tid = osd->get_tid();
   osd_reqid_t reqid(osd->get_cluster_msgr_name(), 0, rep_tid);
@@ -7444,22 +7446,30 @@ void ReplicatedPG::handle_watch_timeout(WatchRef watch)
   ctx->mtime = ceph_clock_now(cct);
   ctx->at_version = get_next_version();
 
-  entity_inst_t nobody;
+  object_info_t& oi = ctx->new_obs.oi;
+  oi.watchers.erase(make_pair(watch->get_cookie(),
+                             watch->get_entity()));
 
-  RepGather *repop = new_repop(ctx, obc, rep_tid);
+  watch_info_t w(watch->get_cookie(),
+                cct->_conf->osd_client_watch_timeout,  // fixme someday
+                watch->get_peer_addr());
+  ctx->watch_disconnects.push_back(
+    OpContext::watch_disconnect_t(watch->get_cookie(), watch->get_entity(), true));
 
-  PGBackend::PGTransaction *t = ctx->op_t;
 
+  entity_inst_t nobody;
+  RepGather *repop = new_repop(ctx, obc, rep_tid);
+  PGBackend::PGTransaction *t = ctx->op_t;
   ctx->log.push_back(pg_log_entry_t(pg_log_entry_t::MODIFY, obc->obs.oi.soid,
                                    ctx->at_version,
-                                   obc->obs.oi.version,
+                                   oi.version,
                                    0,
                                    osd_reqid_t(), ctx->mtime));
 
-  obc->obs.oi.prior_version = repop->obc->obs.oi.version;
-  obc->obs.oi.version = ctx->at_version;
+  oi.prior_version = repop->obc->obs.oi.version;
+  oi.version = ctx->at_version;
   bufferlist bl;
-  ::encode(obc->obs.oi, bl);
+  ::encode(oi, bl);
   setattr_maybe_cache(obc, repop->ctx, t, OI_ATTR, bl);
 
   if (pool.info.require_rollback()) {
index 30da075e6b5ad6783f4af117367466393b59b24e..484112c2568e5081f765ee66948f9bb2856ead5b 100644 (file)
@@ -442,7 +442,14 @@ public:
 
     // side effects
     list<pair<watch_info_t,bool> > watch_connects; ///< new watch + will_ping flag
-    list<watch_info_t> watch_disconnects;
+    struct watch_disconnect_t {
+      uint64_t cookie;
+      entity_name_t name;
+      bool send_disconnect;
+      watch_disconnect_t(uint64_t c, entity_name_t n, bool sd)
+       : cookie(c), name(n), send_disconnect(sd) {}
+    };
+    list<watch_disconnect_t> watch_disconnects; ///< old watch + send_discon
     list<notify_info_t> notifies;
     struct NotifyAck {
       boost::optional<uint64_t> watch_cookie;
@@ -1304,7 +1311,7 @@ public:
   int do_tmapup(OpContext *ctx, bufferlist::iterator& bp, OSDOp& osd_op);
   int do_tmapup_slow(OpContext *ctx, bufferlist::iterator& bp, OSDOp& osd_op, bufferlist& bl);
 
-  void do_osd_op_effects(OpContext *ctx);
+  void do_osd_op_effects(OpContext *ctx, const ConnectionRef& conn);
 private:
   hobject_t earliest_backfill() const;
   bool check_src_targ(const hobject_t& soid, const hobject_t& toid) const;
index aab917edd576dd5817da0aa728a10669effb337b..3e440b782355490da210ee13a4c8f92dc06b6960 100644 (file)
@@ -427,9 +427,15 @@ bool Watch::is_discarded()
   return discarded;
 }
 
-void Watch::remove()
+void Watch::remove(bool send_disconnect)
 {
   dout(10) << "remove" << dendl;
+  if (send_disconnect && conn) {
+    bufferlist empty;
+    MWatchNotify *reply(new MWatchNotify(cookie, 0, 0,
+                                        CEPH_WATCH_EVENT_DISCONNECT, empty));
+    conn->send_message(reply);
+  }
   for (map<uint64_t, NotifyRef>::iterator i = in_progress_notifies.begin();
        i != in_progress_notifies.end();
        ++i) {
@@ -476,16 +482,6 @@ void Watch::send_notify(NotifyRef notif)
   conn->send_message(notify_msg);
 }
 
-void Watch::send_disconnect()
-{
-  if (!conn)
-    return;
-  bufferlist empty;
-  MWatchNotify *reply(new MWatchNotify(cookie, 0, 0,
-                                      CEPH_WATCH_EVENT_DISCONNECT, empty));
-  conn->send_message(reply);
-}
-
 void Watch::notify_ack(uint64_t notify_id, bufferlist& reply_bl)
 {
   dout(10) << "notify_ack" << dendl;
index 1a76691125d44de2a36c258a651d498afd9c50a0..6e4ec37a6b51f13c09a780e9b172d9290078377f 100644 (file)
@@ -203,9 +203,6 @@ public:
     return conn.get() != NULL;
   }
 
-  /// send a disconnect notice to the client
-  void send_disconnect();
-
   /// NOTE: must be called with pg lock held
   ~Watch();
 
@@ -253,7 +250,7 @@ public:
   bool is_discarded();
 
   /// Called on unwatch
-  void remove();
+  void remove(bool send_disconnect);
 
   /// Adds notif as in-progress notify
   void start_notify(