git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
osd/: include version_t in extra_reqids with promote 3738/head
author Samuel Just <sjust@redhat.com>
Thu, 12 Feb 2015 01:07:05 +0000 (17:07 -0800)
committer Samuel Just <sjust@redhat.com>
Thu, 12 Feb 2015 01:44:49 +0000 (17:44 -0800)
Otherwise, we can't return the correct user version on a dup request.

Note: This patch does not handle compatibility with the variant which
does not include the version_t field.  Since it's been less than 2 weeks
and we haven't had a release, I think that's ok since handling
compatibility would require some overhead in the encode/decode
methods.

Fixes: 10830
Introduced (merge): b79c349067ab82b1ccb5d6cdac702f651b27a435
Signed-off-by: Samuel Just <sjust@redhat.com>
src/osd/PG.h
src/osd/PGLog.h
src/osd/ReplicatedPG.cc
src/osd/ReplicatedPG.h
src/osd/osd_types.cc
src/osd/osd_types.h
src/osdc/Objecter.h

index e982545b7ed01bb2cd52ff4eaae28e3f98e17785..a72ca5f485bb6f37a02d2b66d9626a058bdc0692 100644 (file)
@@ -780,7 +780,10 @@ protected:
                             waiting_for_blocked_object;
   // Callbacks should assume pg (and nothing else) is locked
   map<hobject_t, list<Context*> > callbacks_for_degraded_object;
-  map<eversion_t,list<OpRequestRef> > waiting_for_ack, waiting_for_ondisk;
+
+  map<eversion_t,
+      list<pair<OpRequestRef, version_t> > > waiting_for_ack, waiting_for_ondisk;
+
   map<eversion_t,OpRequestRef>   replay_queue;
   void split_ops(PG *child, unsigned split_bits);
 
index 03eca0540b9f0ea9bb621fe620484ffd9af505f1..7d18f77c68c0fc02136b7e83187a8867ccdc7279 100644 (file)
@@ -126,22 +126,42 @@ struct PGLog {
     bool logged_req(const osd_reqid_t &r) const {
       return caller_ops.count(r) || extra_caller_ops.count(r);
     }
-    const pg_log_entry_t *get_request(const osd_reqid_t &r) const {
+    bool get_request(
+      const osd_reqid_t &r,
+      eversion_t *replay_version,
+      version_t *user_version) const {
+      assert(replay_version);
+      assert(user_version);
       ceph::unordered_map<osd_reqid_t,pg_log_entry_t*>::const_iterator p;
       p = caller_ops.find(r);
-      if (p != caller_ops.end())
-       return p->second;
+      if (p != caller_ops.end()) {
+       *replay_version = p->second->version;
+       *user_version = p->second->user_version;
+       return true;
+      }
+
       // warning: we will return *a* request for this reqid, but not
       // necessarily the most recent.
       p = extra_caller_ops.find(r);
-      if (p != extra_caller_ops.end())
-       return p->second;
-      return NULL;
+      if (p != extra_caller_ops.end()) {
+       for (vector<pair<osd_reqid_t, version_t> >::const_iterator i =
+              p->second->extra_reqids.begin();
+            i != p->second->extra_reqids.end();
+            ++i) {
+         if (i->first == r) {
+           *replay_version = p->second->version;
+           *user_version = i->second;
+           return true;
+         }
+       }
+       assert(0 == "in extra_caller_ops but not extra_reqids");
+      }
+      return false;
     }
 
     /// get a (bounded) list of recent reqids for the given object
     void get_object_reqids(const hobject_t& oid, unsigned max,
-                          vector<osd_reqid_t> *pls) const {
+                          vector<pair<osd_reqid_t, version_t> > *pls) const {
       // make sure object is present at least once before we do an
       // O(n) search.
       if (objects.count(oid) == 0)
@@ -151,7 +171,7 @@ struct PGLog {
            ++i) {
        if (i->soid == oid) {
          if (i->reqid_is_indexed())
-           pls->push_back(i->reqid);
+           pls->push_back(make_pair(i->reqid, i->user_version));
          pls->insert(pls->end(), i->extra_reqids.begin(), i->extra_reqids.end());
          if (pls->size() >= max) {
            if (pls->size() > max) {
@@ -175,10 +195,11 @@ struct PGLog {
          //assert(caller_ops.count(i->reqid) == 0);  // divergent merge_log indexes new before unindexing old
          caller_ops[i->reqid] = &(*i);
        }
-       for (vector<osd_reqid_t>::const_iterator j = i->extra_reqids.begin();
+       for (vector<pair<osd_reqid_t, version_t> >::const_iterator j =
+              i->extra_reqids.begin();
             j != i->extra_reqids.end();
             ++j) {
-         extra_caller_ops.insert(make_pair(*j, &(*i)));
+         extra_caller_ops.insert(make_pair(j->first, &(*i)));
        }
       }
 
@@ -196,10 +217,11 @@ struct PGLog {
        //assert(caller_ops.count(i->reqid) == 0);  // divergent merge_log indexes new before unindexing old
        caller_ops[e.reqid] = &e;
       }
-      for (vector<osd_reqid_t>::const_iterator j = e.extra_reqids.begin();
+      for (vector<pair<osd_reqid_t, version_t> >::const_iterator j =
+            e.extra_reqids.begin();
           j != e.extra_reqids.end();
           ++j) {
-       extra_caller_ops.insert(make_pair(*j, &e));
+       extra_caller_ops.insert(make_pair(j->first, &e));
       }
     }
     void unindex() {
@@ -216,12 +238,13 @@ struct PGLog {
            caller_ops[e.reqid] == &e)
          caller_ops.erase(e.reqid);
       }
-      for (vector<osd_reqid_t>::const_iterator j = e.extra_reqids.begin();
+      for (vector<pair<osd_reqid_t, version_t> >::const_iterator j =
+            e.extra_reqids.begin();
           j != e.extra_reqids.end();
           ++j) {
        for (ceph::unordered_multimap<osd_reqid_t,pg_log_entry_t*>::iterator k =
-              extra_caller_ops.find(*j);
-            k != extra_caller_ops.end() && k->first == *j;
+              extra_caller_ops.find(j->first);
+            k != extra_caller_ops.end() && k->first == j->first;
             ++k) {
          if (k->second == &e) {
            extra_caller_ops.erase(k);
@@ -255,10 +278,11 @@ struct PGLog {
       if (e.reqid_is_indexed()) {
        caller_ops[e.reqid] = &(log.back());
       }
-      for (vector<osd_reqid_t>::const_iterator j = e.extra_reqids.begin();
+      for (vector<pair<osd_reqid_t, version_t> >::const_iterator j =
+            e.extra_reqids.begin();
           j != e.extra_reqids.end();
           ++j) {
-       extra_caller_ops.insert(make_pair(*j, &(log.back())));
+       extra_caller_ops.insert(make_pair(j->first, &(log.back())));
       }
     }
 
index 5f6a4177ed92b26a1264a2865d42e2ca623fe29b..2ad5442280a8f2d37910afe2130c7dd98dcc1472 100644 (file)
@@ -1489,27 +1489,30 @@ void ReplicatedPG::do_op(OpRequestRef& op)
     // promote ops, but we can't possible have both in our log where
     // the original request is still not stable on disk, so for our
     // purposes here it doesn't matter which one we get.
-    const pg_log_entry_t *entry = pg_log.get_log().get_request(m->get_reqid());
-    if (entry) {
-      const eversion_t& oldv = entry->version;
+    eversion_t replay_version;
+    version_t user_version;
+    bool got = pg_log.get_log().get_request(
+      m->get_reqid(), &replay_version, &user_version);
+    if (got) {
       dout(3) << __func__ << " dup " << m->get_reqid()
-             << " was " << oldv << dendl;
-      if (already_complete(oldv)) {
-       osd->reply_op_error(op, 0, oldv, entry->user_version);
+             << " was " << replay_version << dendl;
+      if (already_complete(replay_version)) {
+       osd->reply_op_error(op, 0, replay_version, user_version);
       } else {
        if (m->wants_ack()) {
-         if (already_ack(oldv)) {
+         if (already_ack(replay_version)) {
            MOSDOpReply *reply = new MOSDOpReply(m, 0, get_osdmap()->get_epoch(), 0, false);
            reply->add_flags(CEPH_OSD_FLAG_ACK);
-           reply->set_reply_versions(oldv, entry->user_version);
+           reply->set_reply_versions(replay_version, user_version);
            osd->send_message_osd_client(reply, m->get_connection());
          } else {
-           dout(10) << " waiting for " << oldv << " to ack" << dendl;
-           waiting_for_ack[oldv].push_back(op);
+           dout(10) << " waiting for " << replay_version << " to ack" << dendl;
+           waiting_for_ack[replay_version].push_back(make_pair(op, user_version));
          }
        }
-       dout(10) << " waiting for " << oldv << " to commit" << dendl;
-       waiting_for_ondisk[oldv].push_back(op);  // always queue ondisk waiters, so that we can requeue if needed
+       dout(10) << " waiting for " << replay_version << " to commit" << dendl;
+        // always queue ondisk waiters, so that we can requeue if needed
+       waiting_for_ondisk[replay_version].push_back(make_pair(op, user_version));
        op->mark_delayed("waiting for ondisk");
       }
       return;
@@ -7425,11 +7428,12 @@ void ReplicatedPG::eval_repop(RepGather *repop)
     // send dup commits, in order
     if (waiting_for_ondisk.count(repop->v)) {
       assert(waiting_for_ondisk.begin()->first == repop->v);
-      for (list<OpRequestRef>::iterator i = waiting_for_ondisk[repop->v].begin();
+      for (list<pair<OpRequestRef, version_t> >::iterator i =
+            waiting_for_ondisk[repop->v].begin();
           i != waiting_for_ondisk[repop->v].end();
           ++i) {
-       osd->reply_op_error(*i, 0, repop->ctx->at_version,
-                           repop->ctx->user_at_version);
+       osd->reply_op_error(i->first, 0, repop->ctx->at_version,
+                           i->second);
       }
       waiting_for_ondisk.erase(repop->v);
     }
@@ -7464,13 +7468,14 @@ void ReplicatedPG::eval_repop(RepGather *repop)
     // send dup acks, in order
     if (waiting_for_ack.count(repop->v)) {
       assert(waiting_for_ack.begin()->first == repop->v);
-      for (list<OpRequestRef>::iterator i = waiting_for_ack[repop->v].begin();
+      for (list<pair<OpRequestRef, version_t> >::iterator i =
+            waiting_for_ack[repop->v].begin();
           i != waiting_for_ack[repop->v].end();
           ++i) {
-       MOSDOp *m = (MOSDOp*)(*i)->get_req();
+       MOSDOp *m = (MOSDOp*)i->first->get_req();
        MOSDOpReply *reply = new MOSDOpReply(m, 0, get_osdmap()->get_epoch(), 0, true);
        reply->set_reply_versions(repop->ctx->at_version,
-                                 repop->ctx->user_at_version);
+                                 i->second);
        reply->add_flags(CEPH_OSD_FLAG_ACK);
        osd->send_message_osd_client(reply, m->get_connection());
       }
@@ -10237,10 +10242,16 @@ void ReplicatedPG::apply_and_flush_repops(bool requeue)
       }
 
       // also requeue any dups, interleaved into position
-      map<eversion_t, list<OpRequestRef> >::iterator p = waiting_for_ondisk.find(repop->v);
+      map<eversion_t, list<pair<OpRequestRef, version_t> > >::iterator p =
+       waiting_for_ondisk.find(repop->v);
       if (p != waiting_for_ondisk.end()) {
        dout(10) << " also requeuing ondisk waiters " << p->second << dendl;
-       rq.splice(rq.end(), p->second);
+       for (list<pair<OpRequestRef, version_t> >::iterator i =
+              p->second.begin();
+            i != p->second.end();
+            ++i) {
+         rq.push_back(i->first);
+       }
        waiting_for_ondisk.erase(p);
       }
     }
@@ -10251,14 +10262,15 @@ void ReplicatedPG::apply_and_flush_repops(bool requeue)
   if (requeue) {
     requeue_ops(rq);
     if (!waiting_for_ondisk.empty()) {
-      for (map<eversion_t, list<OpRequestRef> >::iterator i =
+      for (map<eversion_t, list<pair<OpRequestRef, version_t> > >::iterator i =
             waiting_for_ondisk.begin();
           i != waiting_for_ondisk.end();
           ++i) {
-       for (list<OpRequestRef>::iterator j = i->second.begin();
+       for (list<pair<OpRequestRef, version_t> >::iterator j =
+              i->second.begin();
             j != i->second.end();
             ++j) {
-         derr << __func__ << ": op " << *((*j)->get_req()) << " waiting on "
+         derr << __func__ << ": op " << *(j->first->get_req()) << " waiting on "
               << i->first << dendl;
        }
       }
index 75b66e502379c6084bf57e4775f984e4188b3bb8..7b538f648f2de4cd429fbd283c890c0413b0996d 100644 (file)
@@ -126,7 +126,7 @@ public:
     uint32_t flags;    // object_copy_data_t::FLAG_*
     uint32_t source_data_digest, source_omap_digest;
     uint32_t data_digest, omap_digest;
-    vector<osd_reqid_t> reqids;
+    vector<pair<osd_reqid_t, version_t> > reqids; // [(reqid, user_version)]
     bool is_data_digest() {
       return flags & object_copy_data_t::FLAG_DATA_DIGEST;
     }
@@ -555,7 +555,7 @@ public:
     int num_read;    ///< count read ops
     int num_write;   ///< count update ops
 
-    vector<osd_reqid_t> extra_reqids;
+    vector<pair<osd_reqid_t, version_t> > extra_reqids;
 
     CopyFromCallback *copy_cb;
 
index 6388eee96fcbfb34b4d62bcb01fa148d7ff06543..4efab9f70bbbd716ee170e1b01fecf8280fe480e 100644 (file)
@@ -3029,10 +3029,15 @@ void pg_log_entry_t::dump(Formatter *f) const
   f->dump_stream("prior_version") << prior_version;
   f->dump_stream("reqid") << reqid;
   f->open_array_section("extra_reqids");
-  for (vector<osd_reqid_t>::const_iterator p = extra_reqids.begin();
+  for (vector<pair<osd_reqid_t, version_t> >::const_iterator p =
+        extra_reqids.begin();
        p != extra_reqids.end();
-       ++p)
-    f->dump_stream("reqid") << *p;
+       ++p) {
+    f->open_object_section("extra_reqid");
+    f->dump_stream("reqid") << p->first;
+    f->dump_stream("user_version") << p->second;
+    f->close_section();
+  }
   f->close_section();
   f->dump_stream("mtime") << mtime;
   if (snaps.length() > 0) {
@@ -3651,7 +3656,7 @@ void object_copy_data_t::generate_test_instances(list<object_copy_data_t*>& o)
   o.back()->data.push_back(databp);
   o.back()->omap_header.append("this is an omap header");
   o.back()->snaps.push_back(123);
-  o.back()->reqids.push_back(osd_reqid_t());
+  o.back()->reqids.push_back(make_pair(osd_reqid_t(), version_t()));
 }
 
 void object_copy_data_t::dump(Formatter *f) const
@@ -3676,10 +3681,14 @@ void object_copy_data_t::dump(Formatter *f) const
     f->dump_unsigned("snap", *p);
   f->close_section();
   f->open_array_section("reqids");
-  for (vector<osd_reqid_t>::const_iterator p = reqids.begin();
+  for (vector<pair<osd_reqid_t, version_t> >::const_iterator p = reqids.begin();
        p != reqids.end();
-       ++p)
-    f->dump_stream("reqid") << *p;
+       ++p) {
+    f->open_object_section("extra_reqid");
+    f->dump_stream("reqid") << p->first;
+    f->dump_stream("user_version") << p->second;
+    f->close_section();
+  }
   f->close_section();
 }
 
index 11b92a60779523ad7f909be95d8bd6c1a074dc4c..d2e2345ed962e040d18f1841fea80b7b0f42d00e 100644 (file)
@@ -2155,7 +2155,7 @@ struct pg_log_entry_t {
   /// describes state for a locally-rollbackable entry
   ObjectModDesc mod_desc;
 
-  vector<osd_reqid_t> extra_reqids;
+  vector<pair<osd_reqid_t, version_t> > extra_reqids;
 
   pg_log_entry_t()
     : op(0), user_version(0),
@@ -2576,7 +2576,7 @@ struct object_copy_data_t {
   snapid_t snap_seq;
 
   ///< recent reqids on this object
-  vector<osd_reqid_t> reqids;
+  vector<pair<osd_reqid_t, version_t> > reqids;
 
 public:
   object_copy_data_t() : size((uint64_t)-1), data_digest(-1),
index ff7cb891f72a625fefd1626e55b994a89d619f7b..e29d16bbe99c368294bf470396a57104f4af88c0 100644 (file)
@@ -625,7 +625,7 @@ struct ObjectOperation {
     uint32_t *out_flags;
     uint32_t *out_data_digest;
     uint32_t *out_omap_digest;
-    vector<osd_reqid_t> *out_reqids;
+    vector<pair<osd_reqid_t, version_t> > *out_reqids;
     int *prval;
     C_ObjectOperation_copyget(object_copy_cursor_t *c,
                              uint64_t *s,
@@ -638,7 +638,7 @@ struct ObjectOperation {
                              uint32_t *flags,
                              uint32_t *dd,
                              uint32_t *od,
-                             vector<osd_reqid_t> *oreqids,
+                             vector<pair<osd_reqid_t, version_t> > *oreqids,
                              int *r)
       : cursor(c),
        out_size(s), out_mtime(m),
@@ -698,7 +698,7 @@ struct ObjectOperation {
                uint32_t *out_flags,
                uint32_t *out_data_digest,
                uint32_t *out_omap_digest,
-               vector<osd_reqid_t> *out_reqids,
+               vector<pair<osd_reqid_t, version_t> > *out_reqids,
                int *prval) {
     OSDOp& osd_op = add_op(CEPH_OSD_OP_COPY_GET);
     osd_op.op.copy_get.max = max;