From 74d5ccf1b5a0422572c9a3463af8cc6fc808a88b Mon Sep 17 00:00:00 2001 From: Samuel Just Date: Wed, 11 Feb 2015 17:07:05 -0800 Subject: [PATCH] osd/: include version_t in extra_reqids with promote Otherwise, we can't return the correct user version on a dup request. Note: This patch does not handle compatibility with the variant which does not include the version_t field. Since it's been less than 2 weeks and we haven't had a release, I think that's ok since handling compatibility would require some overhead in the encode/decode methods. Fixes: 10830 Introduced (merge): b79c349067ab82b1ccb5d6cdac702f651b27a435 Signed-off-by: Samuel Just --- src/osd/PG.h | 5 +++- src/osd/PGLog.h | 58 +++++++++++++++++++++++++++++------------ src/osd/ReplicatedPG.cc | 58 +++++++++++++++++++++++++---------------- src/osd/ReplicatedPG.h | 4 +-- src/osd/osd_types.cc | 23 +++++++++++----- src/osd/osd_types.h | 4 +-- src/osdc/Objecter.h | 6 ++--- 7 files changed, 103 insertions(+), 55 deletions(-) diff --git a/src/osd/PG.h b/src/osd/PG.h index e982545b7ed01..a72ca5f485bb6 100644 --- a/src/osd/PG.h +++ b/src/osd/PG.h @@ -780,7 +780,10 @@ protected: waiting_for_blocked_object; // Callbacks should assume pg (and nothing else) is locked map > callbacks_for_degraded_object; - map > waiting_for_ack, waiting_for_ondisk; + + map > > waiting_for_ack, waiting_for_ondisk; + map replay_queue; void split_ops(PG *child, unsigned split_bits); diff --git a/src/osd/PGLog.h b/src/osd/PGLog.h index 03eca0540b9f0..7d18f77c68c0f 100644 --- a/src/osd/PGLog.h +++ b/src/osd/PGLog.h @@ -126,22 +126,42 @@ struct PGLog { bool logged_req(const osd_reqid_t &r) const { return caller_ops.count(r) || extra_caller_ops.count(r); } - const pg_log_entry_t *get_request(const osd_reqid_t &r) const { + bool get_request( + const osd_reqid_t &r, + eversion_t *replay_version, + version_t *user_version) const { + assert(replay_version); + assert(user_version); ceph::unordered_map::const_iterator p; p = caller_ops.find(r); - if (p 
!= caller_ops.end()) - return p->second; + if (p != caller_ops.end()) { + *replay_version = p->second->version; + *user_version = p->second->user_version; + return true; + } + // warning: we will return *a* request for this reqid, but not // necessarily the most recent. p = extra_caller_ops.find(r); - if (p != extra_caller_ops.end()) - return p->second; - return NULL; + if (p != extra_caller_ops.end()) { + for (vector >::const_iterator i = + p->second->extra_reqids.begin(); + i != p->second->extra_reqids.end(); + ++i) { + if (i->first == r) { + *replay_version = p->second->version; + *user_version = i->second; + return true; + } + } + assert(0 == "in extra_caller_ops but not extra_reqids"); + } + return false; } /// get a (bounded) list of recent reqids for the given object void get_object_reqids(const hobject_t& oid, unsigned max, - vector *pls) const { + vector > *pls) const { // make sure object is present at least once before we do an // O(n) search. if (objects.count(oid) == 0) @@ -151,7 +171,7 @@ struct PGLog { ++i) { if (i->soid == oid) { if (i->reqid_is_indexed()) - pls->push_back(i->reqid); + pls->push_back(make_pair(i->reqid, i->user_version)); pls->insert(pls->end(), i->extra_reqids.begin(), i->extra_reqids.end()); if (pls->size() >= max) { if (pls->size() > max) { @@ -175,10 +195,11 @@ struct PGLog { //assert(caller_ops.count(i->reqid) == 0); // divergent merge_log indexes new before unindexing old caller_ops[i->reqid] = &(*i); } - for (vector::const_iterator j = i->extra_reqids.begin(); + for (vector >::const_iterator j = + i->extra_reqids.begin(); j != i->extra_reqids.end(); ++j) { - extra_caller_ops.insert(make_pair(*j, &(*i))); + extra_caller_ops.insert(make_pair(j->first, &(*i))); } } @@ -196,10 +217,11 @@ struct PGLog { //assert(caller_ops.count(i->reqid) == 0); // divergent merge_log indexes new before unindexing old caller_ops[e.reqid] = &e; } - for (vector::const_iterator j = e.extra_reqids.begin(); + for (vector >::const_iterator j = + 
e.extra_reqids.begin(); j != e.extra_reqids.end(); ++j) { - extra_caller_ops.insert(make_pair(*j, &e)); + extra_caller_ops.insert(make_pair(j->first, &e)); } } void unindex() { @@ -216,12 +238,13 @@ struct PGLog { caller_ops[e.reqid] == &e) caller_ops.erase(e.reqid); } - for (vector::const_iterator j = e.extra_reqids.begin(); + for (vector >::const_iterator j = + e.extra_reqids.begin(); j != e.extra_reqids.end(); ++j) { for (ceph::unordered_multimap::iterator k = - extra_caller_ops.find(*j); - k != extra_caller_ops.end() && k->first == *j; + extra_caller_ops.find(j->first); + k != extra_caller_ops.end() && k->first == j->first; ++k) { if (k->second == &e) { extra_caller_ops.erase(k); @@ -255,10 +278,11 @@ struct PGLog { if (e.reqid_is_indexed()) { caller_ops[e.reqid] = &(log.back()); } - for (vector::const_iterator j = e.extra_reqids.begin(); + for (vector >::const_iterator j = + e.extra_reqids.begin(); j != e.extra_reqids.end(); ++j) { - extra_caller_ops.insert(make_pair(*j, &(log.back()))); + extra_caller_ops.insert(make_pair(j->first, &(log.back()))); } } diff --git a/src/osd/ReplicatedPG.cc b/src/osd/ReplicatedPG.cc index 5f6a4177ed92b..2ad5442280a8f 100644 --- a/src/osd/ReplicatedPG.cc +++ b/src/osd/ReplicatedPG.cc @@ -1489,27 +1489,30 @@ void ReplicatedPG::do_op(OpRequestRef& op) // promote ops, but we can't possible have both in our log where // the original request is still not stable on disk, so for our // purposes here it doesn't matter which one we get. 
- const pg_log_entry_t *entry = pg_log.get_log().get_request(m->get_reqid()); - if (entry) { - const eversion_t& oldv = entry->version; + eversion_t replay_version; + version_t user_version; + bool got = pg_log.get_log().get_request( + m->get_reqid(), &replay_version, &user_version); + if (got) { dout(3) << __func__ << " dup " << m->get_reqid() - << " was " << oldv << dendl; - if (already_complete(oldv)) { - osd->reply_op_error(op, 0, oldv, entry->user_version); + << " was " << replay_version << dendl; + if (already_complete(replay_version)) { + osd->reply_op_error(op, 0, replay_version, user_version); } else { if (m->wants_ack()) { - if (already_ack(oldv)) { + if (already_ack(replay_version)) { MOSDOpReply *reply = new MOSDOpReply(m, 0, get_osdmap()->get_epoch(), 0, false); reply->add_flags(CEPH_OSD_FLAG_ACK); - reply->set_reply_versions(oldv, entry->user_version); + reply->set_reply_versions(replay_version, user_version); osd->send_message_osd_client(reply, m->get_connection()); } else { - dout(10) << " waiting for " << oldv << " to ack" << dendl; - waiting_for_ack[oldv].push_back(op); + dout(10) << " waiting for " << replay_version << " to ack" << dendl; + waiting_for_ack[replay_version].push_back(make_pair(op, user_version)); } } - dout(10) << " waiting for " << oldv << " to commit" << dendl; - waiting_for_ondisk[oldv].push_back(op); // always queue ondisk waiters, so that we can requeue if needed + dout(10) << " waiting for " << replay_version << " to commit" << dendl; + // always queue ondisk waiters, so that we can requeue if needed + waiting_for_ondisk[replay_version].push_back(make_pair(op, user_version)); op->mark_delayed("waiting for ondisk"); } return; @@ -7425,11 +7428,12 @@ void ReplicatedPG::eval_repop(RepGather *repop) // send dup commits, in order if (waiting_for_ondisk.count(repop->v)) { assert(waiting_for_ondisk.begin()->first == repop->v); - for (list::iterator i = waiting_for_ondisk[repop->v].begin(); + for (list >::iterator i = + 
waiting_for_ondisk[repop->v].begin(); i != waiting_for_ondisk[repop->v].end(); ++i) { - osd->reply_op_error(*i, 0, repop->ctx->at_version, - repop->ctx->user_at_version); + osd->reply_op_error(i->first, 0, repop->ctx->at_version, + i->second); } waiting_for_ondisk.erase(repop->v); } @@ -7464,13 +7468,14 @@ void ReplicatedPG::eval_repop(RepGather *repop) // send dup acks, in order if (waiting_for_ack.count(repop->v)) { assert(waiting_for_ack.begin()->first == repop->v); - for (list::iterator i = waiting_for_ack[repop->v].begin(); + for (list >::iterator i = + waiting_for_ack[repop->v].begin(); i != waiting_for_ack[repop->v].end(); ++i) { - MOSDOp *m = (MOSDOp*)(*i)->get_req(); + MOSDOp *m = (MOSDOp*)i->first->get_req(); MOSDOpReply *reply = new MOSDOpReply(m, 0, get_osdmap()->get_epoch(), 0, true); reply->set_reply_versions(repop->ctx->at_version, - repop->ctx->user_at_version); + i->second); reply->add_flags(CEPH_OSD_FLAG_ACK); osd->send_message_osd_client(reply, m->get_connection()); } @@ -10237,10 +10242,16 @@ void ReplicatedPG::apply_and_flush_repops(bool requeue) } // also requeue any dups, interleaved into position - map >::iterator p = waiting_for_ondisk.find(repop->v); + map > >::iterator p = + waiting_for_ondisk.find(repop->v); if (p != waiting_for_ondisk.end()) { dout(10) << " also requeuing ondisk waiters " << p->second << dendl; - rq.splice(rq.end(), p->second); + for (list >::iterator i = + p->second.begin(); + i != p->second.end(); + ++i) { + rq.push_back(i->first); + } waiting_for_ondisk.erase(p); } } @@ -10251,14 +10262,15 @@ void ReplicatedPG::apply_and_flush_repops(bool requeue) if (requeue) { requeue_ops(rq); if (!waiting_for_ondisk.empty()) { - for (map >::iterator i = + for (map > >::iterator i = waiting_for_ondisk.begin(); i != waiting_for_ondisk.end(); ++i) { - for (list::iterator j = i->second.begin(); + for (list >::iterator j = + i->second.begin(); j != i->second.end(); ++j) { - derr << __func__ << ": op " << *((*j)->get_req()) << " waiting 
on " + derr << __func__ << ": op " << *(j->first->get_req()) << " waiting on " << i->first << dendl; } } diff --git a/src/osd/ReplicatedPG.h b/src/osd/ReplicatedPG.h index 75b66e502379c..7b538f648f2de 100644 --- a/src/osd/ReplicatedPG.h +++ b/src/osd/ReplicatedPG.h @@ -126,7 +126,7 @@ public: uint32_t flags; // object_copy_data_t::FLAG_* uint32_t source_data_digest, source_omap_digest; uint32_t data_digest, omap_digest; - vector reqids; + vector > reqids; // [(reqid, user_version)] bool is_data_digest() { return flags & object_copy_data_t::FLAG_DATA_DIGEST; } @@ -555,7 +555,7 @@ public: int num_read; ///< count read ops int num_write; ///< count update ops - vector extra_reqids; + vector > extra_reqids; CopyFromCallback *copy_cb; diff --git a/src/osd/osd_types.cc b/src/osd/osd_types.cc index 6388eee96fcbf..4efab9f70bbbd 100644 --- a/src/osd/osd_types.cc +++ b/src/osd/osd_types.cc @@ -3029,10 +3029,15 @@ void pg_log_entry_t::dump(Formatter *f) const f->dump_stream("prior_version") << prior_version; f->dump_stream("reqid") << reqid; f->open_array_section("extra_reqids"); - for (vector::const_iterator p = extra_reqids.begin(); + for (vector >::const_iterator p = + extra_reqids.begin(); p != extra_reqids.end(); - ++p) - f->dump_stream("reqid") << *p; + ++p) { + f->open_object_section("extra_reqid"); + f->dump_stream("reqid") << p->first; + f->dump_stream("user_version") << p->second; + f->close_section(); + } f->close_section(); f->dump_stream("mtime") << mtime; if (snaps.length() > 0) { @@ -3651,7 +3656,7 @@ void object_copy_data_t::generate_test_instances(list& o) o.back()->data.push_back(databp); o.back()->omap_header.append("this is an omap header"); o.back()->snaps.push_back(123); - o.back()->reqids.push_back(osd_reqid_t()); + o.back()->reqids.push_back(make_pair(osd_reqid_t(), version_t())); } void object_copy_data_t::dump(Formatter *f) const @@ -3676,10 +3681,14 @@ void object_copy_data_t::dump(Formatter *f) const f->dump_unsigned("snap", *p); 
f->close_section(); f->open_array_section("reqids"); - for (vector::const_iterator p = reqids.begin(); + for (vector >::const_iterator p = reqids.begin(); p != reqids.end(); - ++p) - f->dump_stream("reqid") << *p; + ++p) { + f->open_object_section("extra_reqid"); + f->dump_stream("reqid") << p->first; + f->dump_stream("user_version") << p->second; + f->close_section(); + } f->close_section(); } diff --git a/src/osd/osd_types.h b/src/osd/osd_types.h index 11b92a6077952..d2e2345ed962e 100644 --- a/src/osd/osd_types.h +++ b/src/osd/osd_types.h @@ -2155,7 +2155,7 @@ struct pg_log_entry_t { /// describes state for a locally-rollbackable entry ObjectModDesc mod_desc; - vector extra_reqids; + vector > extra_reqids; pg_log_entry_t() : op(0), user_version(0), @@ -2576,7 +2576,7 @@ struct object_copy_data_t { snapid_t snap_seq; ///< recent reqids on this object - vector reqids; + vector > reqids; public: object_copy_data_t() : size((uint64_t)-1), data_digest(-1), diff --git a/src/osdc/Objecter.h b/src/osdc/Objecter.h index ff7cb891f72a6..e29d16bbe99c3 100644 --- a/src/osdc/Objecter.h +++ b/src/osdc/Objecter.h @@ -625,7 +625,7 @@ struct ObjectOperation { uint32_t *out_flags; uint32_t *out_data_digest; uint32_t *out_omap_digest; - vector *out_reqids; + vector > *out_reqids; int *prval; C_ObjectOperation_copyget(object_copy_cursor_t *c, uint64_t *s, @@ -638,7 +638,7 @@ struct ObjectOperation { uint32_t *flags, uint32_t *dd, uint32_t *od, - vector *oreqids, + vector > *oreqids, int *r) : cursor(c), out_size(s), out_mtime(m), @@ -698,7 +698,7 @@ struct ObjectOperation { uint32_t *out_flags, uint32_t *out_data_digest, uint32_t *out_omap_digest, - vector *out_reqids, + vector > *out_reqids, int *prval) { OSDOp& osd_op = add_op(CEPH_OSD_OP_COPY_GET); osd_op.op.copy_get.max = max; -- 2.39.5