From 1c6944f79a3f1400525d13c298e569c1471d9b78 Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Fri, 21 Nov 2014 15:34:30 -0800 Subject: [PATCH] osd/ReplicatedPG: do watch effects only when change commits Do not tell the client their watch succeeded until it is durable and visible to clients doing a notify. This is less important in the initial watch registration stage, but critical in the disconnect (and disconnect + reconnect) pipelining cases. Simplify the watch_disconnects struct to have exactly the information we need, and no more. Note that the conn is not needed for disconnects (and we don't have a req to provide one in the watch timeout op case). Signed-off-by: Sage Weil --- src/osd/ReplicatedPG.cc | 92 +++++++++++++++++++++++------------------ src/osd/ReplicatedPG.h | 11 ++++- src/osd/Watch.cc | 18 ++++---- src/osd/Watch.h | 5 +-- 4 files changed, 68 insertions(+), 58 deletions(-) diff --git a/src/osd/ReplicatedPG.cc b/src/osd/ReplicatedPG.cc index 0c69981d97e15..8564a612602c6 100644 --- a/src/osd/ReplicatedPG.cc +++ b/src/osd/ReplicatedPG.cc @@ -2084,12 +2084,17 @@ void ReplicatedPG::execute_ctx(OpContext *ctx) // read or error? if (ctx->op_t->empty() || result < 0) { + // finish side-effects + if (result == 0) + do_osd_op_effects(ctx, m->get_connection()); + if (ctx->pending_async_reads.empty()) { complete_read_ctx(result, ctx); } else { in_progress_async_reads.push_back(make_pair(op, ctx)); ctx->start_async_reads(this); } + return; } @@ -4342,7 +4347,8 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector& ops) << entity << dendl; oi.watchers.erase(oi_iter); t->nop(); // update oi on disk - ctx->watch_disconnects.push_back(w); + ctx->watch_disconnects.push_back( + OpContext::watch_disconnect_t(cookie, entity, false)); } else { dout(10) << " can't remove: no watch by " << entity << dendl; } @@ -5365,16 +5371,34 @@ void ReplicatedPG::add_interval_usage(interval_set& s, object_stat_sum } } -void ReplicatedPG::do_osd_op_effects(OpContext *ctx) +void ReplicatedPG::do_osd_op_effects(OpContext *ctx, const ConnectionRef& conn) { - ConnectionRef conn(ctx->op->get_req()->get_connection()); + entity_name_t entity = ctx->reqid.name; + dout(15) << "do_osd_op_effects " << entity << " con " << conn.get() << dendl; + + // disconnects first + for (list::iterator i = + ctx->watch_disconnects.begin(); + i != ctx->watch_disconnects.end(); + ++i) { + pair watcher(i->cookie, i->name); + if (ctx->obc->watchers.count(watcher)) { + WatchRef watch = ctx->obc->watchers[watcher]; + dout(10) << "do_osd_op_effects disconnect watcher " << watcher << dendl; + ctx->obc->watchers.erase(watcher); + watch->remove(i->send_disconnect); + } else { + dout(10) << "do_osd_op_effects disconnect failed to find watcher " + << watcher << dendl; + } + } + + if (!conn) + return; boost::intrusive_ptr session((OSD::Session *)conn->get_priv()); if (!session.get()) return; session->put(); // get_priv() takes a ref, and so does the intrusive_ptr - entity_name_t entity = ctx->reqid.name; - - dout(15) << "do_osd_op_effects on session " << session.get() << dendl; for (list >::iterator i = ctx->watch_connects.begin(); i != ctx->watch_connects.end(); @@ -5401,28 +5425,11 @@ void ReplicatedPG::do_osd_op_effects(OpContext *ctx) watch->connect(conn, i->second); } - for (list::iterator i = ctx->watch_disconnects.begin(); - i != ctx->watch_disconnects.end(); - ++i) { - pair watcher(i->cookie, entity); - dout(15) << "do_osd_op_effects applying watch disconnect on session " - << session.get() << " and watcher " << watcher << dendl; - if (ctx->obc->watchers.count(watcher)) { - WatchRef watch = ctx->obc->watchers[watcher]; - dout(10) << "do_osd_op_effects applying disconnect found watcher " - << watcher << dendl; - ctx->obc->watchers.erase(watcher); - watch->remove(); - } else { - dout(10) << "do_osd_op_effects failed to find watcher " - << watcher << dendl; - } - } - for (list::iterator p = ctx->notifies.begin(); p != ctx->notifies.end(); ++p) { dout(10) << "do_osd_op_effects, notify " << *p << dendl; + ConnectionRef conn(ctx->op->get_req()->get_connection()); NotifyRef notif( Notify::makeNotifyRef( conn, @@ -5494,10 +5501,6 @@ int ReplicatedPG::prepare_transaction(OpContext *ctx) if (result < 0) return result; - // finish side-effects - if (result == 0) - do_osd_op_effects(ctx); - // read-op? done? if (ctx->op_t->empty() && !ctx->modify) { unstable_stats.add(ctx->delta_stats); @@ -7084,6 +7087,11 @@ void ReplicatedPG::eval_repop(RepGather *repop) if (repop->all_applied && repop->all_committed) { repop->rep_done = true; + do_osd_op_effects( + repop->ctx, + repop->ctx->op ? repop->ctx->op->get_req()->get_connection() : + ConnectionRef()); + calc_min_last_complete_ondisk(); // kick snap_trimmer if necessary @@ -7430,12 +7438,6 @@ void ReplicatedPG::handle_watch_timeout(WatchRef watch) return; } - watch->send_disconnect(); - - obc->watchers.erase(make_pair(watch->get_cookie(), watch->get_entity())); - obc->obs.oi.watchers.erase(make_pair(watch->get_cookie(), watch->get_entity())); - watch->remove(); - vector ops; ceph_tid_t rep_tid = osd->get_tid(); osd_reqid_t reqid(osd->get_cluster_msgr_name(), 0, rep_tid); @@ -7444,22 +7446,30 @@ void ReplicatedPG::handle_watch_timeout(WatchRef watch) ctx->mtime = ceph_clock_now(cct); ctx->at_version = get_next_version(); - entity_inst_t nobody; + object_info_t& oi = ctx->new_obs.oi; + oi.watchers.erase(make_pair(watch->get_cookie(), + watch->get_entity())); - RepGather *repop = new_repop(ctx, obc, rep_tid); + watch_info_t w(watch->get_cookie(), + cct->_conf->osd_client_watch_timeout, // fixme someday + watch->get_peer_addr()); + ctx->watch_disconnects.push_back( + OpContext::watch_disconnect_t(watch->get_cookie(), watch->get_entity(), true)); - PGBackend::PGTransaction *t = ctx->op_t; + entity_inst_t nobody; + RepGather *repop = new_repop(ctx, obc, rep_tid); + PGBackend::PGTransaction *t = ctx->op_t; ctx->log.push_back(pg_log_entry_t(pg_log_entry_t::MODIFY, obc->obs.oi.soid, ctx->at_version, - obc->obs.oi.version, + oi.version, 0, osd_reqid_t(), ctx->mtime)); - obc->obs.oi.prior_version = repop->obc->obs.oi.version; - obc->obs.oi.version = ctx->at_version; + oi.prior_version = repop->obc->obs.oi.version; + oi.version = ctx->at_version; bufferlist bl; - ::encode(obc->obs.oi, bl); + ::encode(oi, bl); setattr_maybe_cache(obc, repop->ctx, t, OI_ATTR, bl); if (pool.info.require_rollback()) { diff --git a/src/osd/ReplicatedPG.h b/src/osd/ReplicatedPG.h index 30da075e6b5ad..484112c2568e5 100644 --- a/src/osd/ReplicatedPG.h +++ b/src/osd/ReplicatedPG.h @@ -442,7 +442,14 @@ public: // side effects list > watch_connects; ///< new watch + will_ping flag - list watch_disconnects; + struct watch_disconnect_t { + uint64_t cookie; + entity_name_t name; + bool send_disconnect; + watch_disconnect_t(uint64_t c, entity_name_t n, bool sd) + : cookie(c), name(n), send_disconnect(sd) {} + }; + list watch_disconnects; ///< old watch + send_discon list notifies; struct NotifyAck { boost::optional watch_cookie; @@ -1304,7 +1311,7 @@ public: int do_tmapup(OpContext *ctx, bufferlist::iterator& bp, OSDOp& osd_op); int do_tmapup_slow(OpContext *ctx, bufferlist::iterator& bp, OSDOp& osd_op, bufferlist& bl); - void do_osd_op_effects(OpContext *ctx); + void do_osd_op_effects(OpContext *ctx, const ConnectionRef& conn); private: hobject_t earliest_backfill() const; bool check_src_targ(const hobject_t& soid, const hobject_t& toid) const; diff --git a/src/osd/Watch.cc b/src/osd/Watch.cc index aab917edd576d..3e440b7823554 100644 --- a/src/osd/Watch.cc +++ b/src/osd/Watch.cc @@ -427,9 +427,15 @@ bool Watch::is_discarded() return discarded; } -void Watch::remove() +void Watch::remove(bool send_disconnect) { dout(10) << "remove" << dendl; + if (send_disconnect && conn) { + bufferlist empty; + MWatchNotify *reply(new MWatchNotify(cookie, 0, 0, + CEPH_WATCH_EVENT_DISCONNECT, empty)); + conn->send_message(reply); + } for (map::iterator i = in_progress_notifies.begin(); i != in_progress_notifies.end(); ++i) { @@ -476,16 +482,6 @@ void Watch::send_notify(NotifyRef notif) conn->send_message(notify_msg); } -void Watch::send_disconnect() -{ - if (!conn) - return; - bufferlist empty; - MWatchNotify *reply(new MWatchNotify(cookie, 0, 0, - CEPH_WATCH_EVENT_DISCONNECT, empty)); - conn->send_message(reply); -} - void Watch::notify_ack(uint64_t notify_id, bufferlist& reply_bl) { dout(10) << "notify_ack" << dendl; diff --git a/src/osd/Watch.h b/src/osd/Watch.h index 1a76691125d44..6e4ec37a6b51f 100644 --- a/src/osd/Watch.h +++ b/src/osd/Watch.h @@ -203,9 +203,6 @@ public: return conn.get() != NULL; } - /// send a disconnect notice to the client - void send_disconnect(); - /// NOTE: must be called with pg lock held ~Watch(); @@ -253,7 +250,7 @@ public: bool is_discarded(); /// Called on unwatch - void remove(); + void remove(bool send_disconnect); /// Adds notif as in-progress notify void start_notify( -- 2.39.5