From: Samuel Just Date: Thu, 17 Apr 2014 19:27:07 +0000 (-0700) Subject: osd/: propagate hit_set history with repop X-Git-Tag: v0.80-rc1~13^2~3 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=5821cc7e55940b1722d35b3f447a9a954c60eb5a;p=ceph.git osd/: propagate hit_set history with repop We don't actually send the whole info on each repop, just the log entries, updated stats, and a few other bits. For hit_set ops, we need to also communicate the new hit_set history status atomically with the log entries and the transaction. Thus, we add a channel for an optional pg_hit_set_history_t field in the PGBackend::submit_transaction interface and associated messages and implementations to update the hit_set info field along with the log entries. This also means that hit_set_(persist|trim) update an updated_hit_set_history field on the OpContext instead of directly modifying the info field. Fixes: #8124 Signed-off-by: Samuel Just --- diff --git a/src/messages/MOSDSubOp.h b/src/messages/MOSDSubOp.h index 7c9c5048b2aa..6a381861d613 100644 --- a/src/messages/MOSDSubOp.h +++ b/src/messages/MOSDSubOp.h @@ -25,7 +25,7 @@ class MOSDSubOp : public Message { - static const int HEAD_VERSION = 9; + static const int HEAD_VERSION = 10; static const int COMPAT_VERSION = 1; public: @@ -90,6 +90,9 @@ public: hobject_t new_temp_oid; ///< new temp object that we must now start tracking hobject_t discard_temp_oid; ///< previously used temp object that we can now stop tracking + /// non-empty if this transaction involves a hit_set history update + boost::optional updated_hit_set_history; + int get_cost() const { if (ops.size() == 1 && ops[0].op.op == CEPH_OSD_OP_PULL) return ops[0].op.extent.length; @@ -169,6 +172,9 @@ public: ghobject_t::NO_SHARD); pgid.shard = ghobject_t::NO_SHARD; } + if (header.version >= 10) { + ::decode(updated_hit_set_history, p); + } } virtual void encode_payload(uint64_t features) { @@ -217,6 +223,7 @@ public: ::encode(discard_temp_oid, 
payload); ::encode(from, payload); ::encode(pgid.shard, payload); + ::encode(updated_hit_set_history, payload); } MOSDSubOp() @@ -258,6 +265,8 @@ public: out << " v " << version << " snapset=" << snapset << " snapc=" << snapc; if (!data_subset.empty()) out << " subset " << data_subset; + if (updated_hit_set_history) + out << ", has_updated_hit_set_history"; out << ")"; } }; diff --git a/src/osd/ECBackend.cc b/src/osd/ECBackend.cc index dc2a26fb6192..96985385941e 100644 --- a/src/osd/ECBackend.cc +++ b/src/osd/ECBackend.cc @@ -818,6 +818,7 @@ void ECBackend::handle_sub_write( clear_temp_objs(op.temp_removed); get_parent()->log_operation( op.log_entries, + op.updated_hit_set_history, op.trim_to, !(op.t.empty()), localt); @@ -1201,6 +1202,7 @@ void ECBackend::submit_transaction( PGTransaction *_t, const eversion_t &trim_to, vector &log_entries, + boost::optional &hset_history, Context *on_local_applied_sync, Context *on_all_applied, Context *on_all_commit, @@ -1215,6 +1217,7 @@ void ECBackend::submit_transaction( op->version = at_version; op->trim_to = trim_to; op->log_entries.swap(log_entries); + op->updated_hit_set_history.swap(hset_history); op->on_local_applied_sync = on_local_applied_sync; op->on_all_applied = on_all_applied; op->on_all_commit = on_all_commit; @@ -1520,6 +1523,7 @@ void ECBackend::start_write(Op *op) { op->version, op->trim_to, op->log_entries, + op->updated_hit_set_history, op->temp_added, op->temp_cleared); if (*i == get_parent()->whoami_shard()) { diff --git a/src/osd/ECBackend.h b/src/osd/ECBackend.h index 0aa37c154fa3..2061ea87912b 100644 --- a/src/osd/ECBackend.h +++ b/src/osd/ECBackend.h @@ -98,6 +98,7 @@ public: PGTransaction *t, const eversion_t &trim_to, vector &log_entries, + boost::optional &hset_history, Context *on_local_applied_sync, Context *on_all_applied, Context *on_all_commit, @@ -326,6 +327,7 @@ public: eversion_t version; eversion_t trim_to; vector log_entries; + boost::optional updated_hit_set_history; Context 
*on_local_applied_sync; Context *on_all_applied; Context *on_all_commit; diff --git a/src/osd/ECMsgTypes.cc b/src/osd/ECMsgTypes.cc index 87e622b0bf1a..4e4c8e3d9423 100644 --- a/src/osd/ECMsgTypes.cc +++ b/src/osd/ECMsgTypes.cc @@ -16,7 +16,7 @@ void ECSubWrite::encode(bufferlist &bl) const { - ENCODE_START(1, 1, bl); + ENCODE_START(2, 1, bl); ::encode(from, bl); ::encode(tid, bl); ::encode(reqid, bl); @@ -28,12 +28,13 @@ void ECSubWrite::encode(bufferlist &bl) const ::encode(log_entries, bl); ::encode(temp_added, bl); ::encode(temp_removed, bl); + ::encode(updated_hit_set_history, bl); ENCODE_FINISH(bl); } void ECSubWrite::decode(bufferlist::iterator &bl) { - DECODE_START(1, bl); + DECODE_START(2, bl); ::decode(from, bl); ::decode(tid, bl); ::decode(reqid, bl); @@ -45,17 +46,22 @@ void ECSubWrite::decode(bufferlist::iterator &bl) ::decode(log_entries, bl); ::decode(temp_added, bl); ::decode(temp_removed, bl); + if (struct_v >= 2) { + ::decode(updated_hit_set_history, bl); + } DECODE_FINISH(bl); } std::ostream &operator<<( std::ostream &lhs, const ECSubWrite &rhs) { - return lhs - << "ECSubWrite(tid=" << rhs.tid - << ", reqid=" << rhs.reqid - << ", at_version=" << rhs.at_version - << ", trim_to=" << rhs.trim_to << ")"; + lhs << "ECSubWrite(tid=" << rhs.tid + << ", reqid=" << rhs.reqid + << ", at_version=" << rhs.at_version + << ", trim_to=" << rhs.trim_to; + if (rhs.updated_hit_set_history) + lhs << ", has_updated_hit_set_history"; + return lhs << ")"; } void ECSubWrite::dump(Formatter *f) const @@ -64,6 +70,8 @@ void ECSubWrite::dump(Formatter *f) const f->dump_stream("reqid") << reqid; f->dump_stream("at_version") << at_version; f->dump_stream("trim_to") << trim_to; + f->dump_stream("has_updated_hit_set_history") + << static_cast(updated_hit_set_history); } void ECSubWrite::generate_test_instances(list &o) diff --git a/src/osd/ECMsgTypes.h b/src/osd/ECMsgTypes.h index f4cde6dcd4e3..11c519d57bb4 100644 --- a/src/osd/ECMsgTypes.h +++ b/src/osd/ECMsgTypes.h @@ -31,6 
+31,7 @@ struct ECSubWrite { vector log_entries; set temp_added; set temp_removed; + boost::optional updated_hit_set_history; ECSubWrite() {} ECSubWrite( pg_shard_t from, @@ -42,6 +43,7 @@ struct ECSubWrite { eversion_t at_version, eversion_t trim_to, vector log_entries, + boost::optional updated_hit_set_history, const set &temp_added, const set &temp_removed) : from(from), tid(tid), reqid(reqid), @@ -49,7 +51,8 @@ struct ECSubWrite { at_version(at_version), trim_to(trim_to), log_entries(log_entries), temp_added(temp_added), - temp_removed(temp_removed) {} + temp_removed(temp_removed), + updated_hit_set_history(updated_hit_set_history) {} void encode(bufferlist &bl) const; void decode(bufferlist::iterator &bl); void dump(Formatter *f) const; diff --git a/src/osd/PGBackend.h b/src/osd/PGBackend.h index bd18a4706256..1dbf20d2a75d 100644 --- a/src/osd/PGBackend.h +++ b/src/osd/PGBackend.h @@ -175,6 +175,7 @@ virtual void log_operation( vector &logv, + boost::optional &hset_history, const eversion_t &trim_to, bool transaction_applied, ObjectStore::Transaction *t) = 0; @@ -492,6 +493,8 @@ PGTransaction *t, ///< [in] trans to execute const eversion_t &trim_to, ///< [in] trim log to here vector &log_entries, ///< [in] log entries for t + /// [in] hitset history (if updated with this transaction) + boost::optional &hset_history, Context *on_local_applied_sync, ///< [in] called when applied locally Context *on_all_applied, ///< [in] called when all acked Context *on_all_commit, ///< [in] called when all commit diff --git a/src/osd/ReplicatedBackend.cc b/src/osd/ReplicatedBackend.cc index 8856461a8bc9..5a9668fc2a16 100644 --- a/src/osd/ReplicatedBackend.cc +++ b/src/osd/ReplicatedBackend.cc @@ -494,6 +494,7 @@ void ReplicatedBackend::submit_transaction( PGTransaction *_t, const eversion_t &trim_to, vector &log_entries, + boost::optional &hset_history, Context *on_local_applied_sync, Context *on_all_acked, Context *on_all_commit, @@ -536,6 +537,7 @@ void 
ReplicatedBackend::submit_transaction( t->get_temp_cleared().size() ? *(t->get_temp_cleared().begin()) :hobject_t(), log_entries, + hset_history, &op, op_t); @@ -546,7 +548,7 @@ void ReplicatedBackend::submit_transaction( } clear_temp_objs(t->get_temp_cleared()); - parent->log_operation(log_entries, trim_to, true, &local_t); + parent->log_operation(log_entries, hset_history, trim_to, true, &local_t); local_t.append(*op_t); local_t.swap(*op_t); diff --git a/src/osd/ReplicatedBackend.h b/src/osd/ReplicatedBackend.h index cfad2edae6a4..371574b35766 100644 --- a/src/osd/ReplicatedBackend.h +++ b/src/osd/ReplicatedBackend.h @@ -343,6 +343,7 @@ public: PGTransaction *t, const eversion_t &trim_to, vector &log_entries, + boost::optional &hset_history, Context *on_local_applied_sync, Context *on_all_applied, Context *on_all_commit, @@ -361,6 +362,7 @@ private: hobject_t new_temp_oid, hobject_t discard_temp_oid, vector &log_entries, + boost::optional &hset_history, InProgressOp *op, ObjectStore::Transaction *op_t); void op_applied(InProgressOp *op); diff --git a/src/osd/ReplicatedPG.cc b/src/osd/ReplicatedPG.cc index cdb049a3d1e4..e62649e92372 100644 --- a/src/osd/ReplicatedPG.cc +++ b/src/osd/ReplicatedPG.cc @@ -6553,6 +6553,7 @@ void ReplicatedPG::issue_repop(RepGather *repop, utime_t now) repop->ctx->op_t, pg_trim_to, repop->ctx->log, + repop->ctx->updated_hset_history, onapplied_sync, on_all_applied, on_all_commit, @@ -6571,6 +6572,7 @@ void ReplicatedBackend::issue_op( hobject_t new_temp_oid, hobject_t discard_temp_oid, vector &log_entries, + boost::optional &hset_hist, InProgressOp *op, ObjectStore::Transaction *op_t) { @@ -6625,6 +6627,7 @@ void ReplicatedBackend::issue_op( wr->new_temp_oid = new_temp_oid; wr->discard_temp_oid = discard_temp_oid; + wr->updated_hit_set_history = hset_hist; get_parent()->send_message_osd_cluster( peer.osd, wr, get_osdmap()->get_epoch()); @@ -7353,6 +7356,7 @@ void ReplicatedBackend::sub_op_modify(OpRequestRef op) 
parent->update_stats(m->pg_stats); parent->log_operation( log, + m->updated_hit_set_history, m->pg_trim_to, update_snaps, &(rm->localt)); @@ -10564,18 +10568,20 @@ void ReplicatedPG::hit_set_persist() repop->on_applied = new C_HitSetFlushing(this, flush_time); OpContext *ctx = repop->ctx; ctx->at_version = get_next_version(); + ctx->updated_hset_history = info.hit_set; + pg_hit_set_history_t &updated_hit_set_hist = *(ctx->updated_hset_history); - if (info.hit_set.current_last_stamp != utime_t()) { + if (updated_hit_set_hist.current_last_stamp != utime_t()) { // FIXME: we cheat slightly here by bundling in a remove on a object // other the RepGather object. we aren't carrying an ObjectContext for // the deleted object over this period. hobject_t old_obj = - get_hit_set_current_object(info.hit_set.current_last_stamp); + get_hit_set_current_object(updated_hit_set_hist.current_last_stamp); ctx->log.push_back( pg_log_entry_t(pg_log_entry_t::DELETE, old_obj, ctx->at_version, - info.hit_set.current_last_update, + updated_hit_set_hist.current_last_update, 0, osd_reqid_t(), ctx->mtime)); @@ -10601,13 +10607,13 @@ void ReplicatedPG::hit_set_persist() ctx->delta_stats.num_bytes -= st.st_size; } - info.hit_set.current_last_update = info.last_update; // *after* above remove! - info.hit_set.current_info.version = ctx->at_version; + updated_hit_set_hist.current_last_update = info.last_update; // *after* above remove! 
+ updated_hit_set_hist.current_info.version = ctx->at_version; - info.hit_set.history.push_back(info.hit_set.current_info); + updated_hit_set_hist.history.push_back(updated_hit_set_hist.current_info); hit_set_create(); - info.hit_set.current_info = pg_hit_set_info_t(); - info.hit_set.current_last_stamp = utime_t(); + updated_hit_set_hist.current_info = pg_hit_set_info_t(); + updated_hit_set_hist.current_last_stamp = utime_t(); // fabricate an object_info_t and SnapSet obc->obs.oi.version = ctx->at_version; @@ -10655,9 +10661,12 @@ void ReplicatedPG::hit_set_persist() void ReplicatedPG::hit_set_trim(RepGather *repop, unsigned max) { - for (unsigned num = info.hit_set.history.size(); num > max; --num) { - list::iterator p = info.hit_set.history.begin(); - assert(p != info.hit_set.history.end()); + assert(repop->ctx->updated_hset_history); + pg_hit_set_history_t &updated_hit_set_hist = + *(repop->ctx->updated_hset_history); + for (unsigned num = updated_hit_set_hist.history.size(); num > max; --num) { + list::iterator p = updated_hit_set_hist.history.begin(); + assert(p != updated_hit_set_hist.history.end()); hobject_t oid = get_hit_set_archive_object(p->begin, p->end); assert(!is_degraded_object(oid)); @@ -10685,7 +10694,7 @@ void ReplicatedPG::hit_set_trim(RepGather *repop, unsigned max) } if (agent_state) agent_state->remove_oldest_hit_set(); - info.hit_set.history.pop_front(); + updated_hit_set_hist.history.pop_front(); struct stat st; int r = osd->store->stat( diff --git a/src/osd/ReplicatedPG.h b/src/osd/ReplicatedPG.h index 239d118c9f0c..89c13ada8eee 100644 --- a/src/osd/ReplicatedPG.h +++ b/src/osd/ReplicatedPG.h @@ -341,9 +341,14 @@ public: } void log_operation( vector &logv, + boost::optional &hset_history, const eversion_t &trim_to, bool transaction_applied, ObjectStore::Transaction *t) { + if (hset_history) { + info.hit_set = *hset_history; + dirty_info = true; + } append_log(logv, trim_to, *t, transaction_applied); } @@ -453,6 +458,7 @@ public: 
PGBackend::PGTransaction *op_t; vector log; + boost::optional updated_hset_history; interval_set modified_ranges; ObjectContextRef obc;