]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
ReplicatedPG: add OpContext::user_at_version
authorGreg Farnum <greg@inktank.com>
Wed, 28 Aug 2013 00:24:24 +0000 (17:24 -0700)
committerGreg Farnum <greg@inktank.com>
Wed, 28 Aug 2013 00:24:50 +0000 (17:24 -0700)
Set this up with the existing at_version member, but only increase
it for user_modify ops. Use this when logging the PG's user_version. In
order to maintain compatibility with old clients on classic pools, we
force user_version to follow at_version whenever it's updated.

Now that we have and are maintaining this PG user version, use it
for the user version on ops that get ENOENT back, when short-circuiting
replies as part of reply_op_error()[1], or when replying to repops
in eval_repop; further use it for the cls_current_version() function. This
is a small semantic change for that function, as previously it would
generally return the same value as the user would get sent back via
MOSDOpReply -- but I don't think it was something you could count on.
We now define it as being the user version of the PG at the start of the
op, and as a bonus it is defined even for read ops (the at_version is
only filled in on write operations).

[1]: We tweak PGLog to make it easier to retrieve both user and PG versions.

Signed-off-by: Greg Farnum <greg@inktank.com>
src/objclass/class_api.cc
src/osd/OSD.cc
src/osd/OSD.h
src/osd/PGLog.h
src/osd/ReplicatedPG.cc
src/osd/ReplicatedPG.h

index 6e8de53467f8692e0347a88d9612d853acd6a050..b95260b7e16667ae4629169265e678fa14f52b56 100644 (file)
@@ -582,7 +582,7 @@ uint64_t cls_current_version(cls_method_context_t hctx)
 {
   ReplicatedPG::OpContext *ctx = *(ReplicatedPG::OpContext **)hctx;
 
-  return ctx->at_version.version;
+  return ctx->user_at_version;
 }
 
 
index 1f514e5964834a82fadb9e404bdc8d277a64686e..52c35bd247c9fa0ba53cac81e34fcfe6bfa573ac 100644 (file)
@@ -6798,10 +6798,11 @@ void OSD::finish_recovery_op(PG *pg, const hobject_t& soid, bool dequeue)
 
 void OSDService::reply_op_error(OpRequestRef op, int err)
 {
-  reply_op_error(op, err, eversion_t());
+  reply_op_error(op, err, eversion_t(), 0);
 }
 
-void OSDService::reply_op_error(OpRequestRef op, int err, eversion_t v)
+void OSDService::reply_op_error(OpRequestRef op, int err, eversion_t v,
+                                version_t uv)
 {
   MOSDOp *m = static_cast<MOSDOp*>(op->request);
   assert(m->get_header().type == CEPH_MSG_OSD_OP);
@@ -6810,7 +6811,7 @@ void OSDService::reply_op_error(OpRequestRef op, int err, eversion_t v)
 
   MOSDOpReply *reply = new MOSDOpReply(m, err, osdmap->get_epoch(), flags);
   Messenger *msgr = client_messenger;
-  reply->set_reply_versions(v, v.version);
+  reply->set_reply_versions(v, uv);
   if (m->get_source().is_osd())
     msgr = cluster_messenger;
   msgr->send_message(reply, m->get_connection());
index 4d8c31e30466182898307fbb0f8fdcf4d26959e9..d259ceae545c73ff7d856d797497077e0d19a152 100644 (file)
@@ -414,7 +414,7 @@ public:
   void dec_scrubs_active();
 
   void reply_op_error(OpRequestRef op, int err);
-  void reply_op_error(OpRequestRef op, int err, eversion_t v);
+  void reply_op_error(OpRequestRef op, int err, eversion_t v, version_t uv);
   void handle_misdirected_op(PG *pg, OpRequestRef op);
 
   // -- Watch --
index 552f9b0cee96b4246c0571239fe403cb0f41db22..8f192a8eac0ac30dd6d6c0fbd4b883977ca2754f 100644 (file)
@@ -84,11 +84,11 @@ struct PGLog {
     bool logged_req(const osd_reqid_t &r) const {
       return caller_ops.count(r);
     }
-    eversion_t get_request_version(const osd_reqid_t &r) const {
+    const pg_log_entry_t *get_request(const osd_reqid_t &r) const {
       hash_map<osd_reqid_t,pg_log_entry_t*>::const_iterator p = caller_ops.find(r);
       if (p == caller_ops.end())
-       return eversion_t();
-      return p->second->version;    
+       return NULL;
+      return p->second;
     }
 
     void index() {
index 495e041fd8e62dbd415b8b591cc8f8868d878393..cfa1dce1942611198e52c6a601ca4105b9bb9df5 100644 (file)
@@ -908,14 +908,15 @@ void ReplicatedPG::do_op(OpRequestRef op)
       return;
     }
 
-    eversion_t oldv = pg_log.get_log().get_request_version(ctx->reqid);
-    if (oldv != eversion_t()) {
+    const pg_log_entry_t *entry = pg_log.get_log().get_request(ctx->reqid);
+    if (entry) {
+      const eversion_t& oldv = entry->version;
       dout(3) << "do_op dup " << ctx->reqid << " was " << oldv << dendl;
       delete ctx;
       put_object_context(obc);
       put_object_contexts(src_obc);
       if (already_complete(oldv)) {
-       osd->reply_op_error(op, 0, oldv);
+       osd->reply_op_error(op, 0, oldv, entry->user_version);
       } else {
        if (m->wants_ack()) {
          if (already_ack(oldv)) {
@@ -957,6 +958,8 @@ void ReplicatedPG::do_op(OpRequestRef op)
             << dendl;  
   }
 
+  ctx->user_at_version = info.last_user_version;
+
   // note my stats
   utime_t now = ceph_clock_now(g_ceph_context);
 
@@ -1028,9 +1031,9 @@ void ReplicatedPG::do_op(OpRequestRef op)
   ctx->reply->set_result(result);
 
   if (result >= 0) {
-    ctx->reply->set_reply_versions(ctx->at_version, ctx->at_version.version);
+    ctx->reply->set_reply_versions(ctx->at_version, ctx->user_at_version);
   } else if (result == -ENOENT) {
-    ctx->reply->set_enoent_reply_versions(info.last_update, info.last_update.version);
+    ctx->reply->set_enoent_reply_versions(info.last_update, ctx->user_at_version);
   }
 
   // read or error?
@@ -3854,7 +3857,14 @@ int ReplicatedPG::prepare_transaction(OpContext *ctx)
   // finish and log the op.
   if (ctx->user_modify) {
     /* update the user_version for any modify ops, except for the watch op */
-    ctx->new_obs.oi.user_version = ctx->at_version.version;
+    ++ctx->user_at_version;
+    assert(ctx->user_at_version > ctx->new_obs.oi.user_version);
+    /* In order for new clients and old clients to interoperate properly
+     * when exchanging versions, we need to lower bound the user_version
+     * (which our new clients pay proper attention to)
+     * by the at_version (which is all the old clients can ever see). */
+    ctx->user_at_version = MAX(ctx->at_version.version, ctx->user_at_version);
+    ctx->new_obs.oi.user_version = ctx->user_at_version;
   }
   ctx->bytes_written = ctx->op_t.get_encoded_bytes();
  
@@ -3884,7 +3894,7 @@ int ReplicatedPG::prepare_transaction(OpContext *ctx)
   if (!ctx->new_obs.exists)
     logopcode = pg_log_entry_t::DELETE;
   ctx->log.push_back(pg_log_entry_t(logopcode, soid, ctx->at_version, old_version,
-                               ctx->at_version.version, ctx->reqid, ctx->mtime));
+                               ctx->user_at_version, ctx->reqid, ctx->mtime));
 
   // apply new object state.
   ctx->obc->obs = ctx->new_obs;
@@ -4099,7 +4109,7 @@ void ReplicatedPG::eval_repop(RepGather *repop)
        for (list<OpRequestRef>::iterator i = waiting_for_ondisk[repop->v].begin();
             i != waiting_for_ondisk[repop->v].end();
             ++i) {
-         osd->reply_op_error(*i, 0, repop->v);
+         osd->reply_op_error(*i, 0, repop->v, 0);
        }
        waiting_for_ondisk.erase(repop->v);
       }
@@ -4115,8 +4125,11 @@ void ReplicatedPG::eval_repop(RepGather *repop)
        MOSDOpReply *reply = repop->ctx->reply;
        if (reply)
          repop->ctx->reply = NULL;
-       else
+       else {
          reply = new MOSDOpReply(m, 0, get_osdmap()->get_epoch(), 0);
+         reply->set_reply_versions(repop->ctx->at_version,
+                                   repop->ctx->user_at_version);
+       }
        reply->add_flags(CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK);
        dout(10) << " sending commit on " << *repop << " " << reply << dendl;
        assert(entity_name_t::TYPE_OSD != m->get_connection()->peer_type);
@@ -4137,6 +4150,8 @@ void ReplicatedPG::eval_repop(RepGather *repop)
             ++i) {
          MOSDOp *m = (MOSDOp*)(*i)->request;
          MOSDOpReply *reply = new MOSDOpReply(m, 0, get_osdmap()->get_epoch(), 0);
+         reply->set_reply_versions(repop->ctx->at_version,
+                                   repop->ctx->user_at_version);
          reply->add_flags(CEPH_OSD_FLAG_ACK);
          osd->send_message_osd_client(reply, m->get_connection());
        }
@@ -4148,8 +4163,11 @@ void ReplicatedPG::eval_repop(RepGather *repop)
        MOSDOpReply *reply = repop->ctx->reply;
        if (reply)
          repop->ctx->reply = NULL;
-       else
+       else {
          reply = new MOSDOpReply(m, 0, get_osdmap()->get_epoch(), 0);
+         reply->set_reply_versions(repop->ctx->at_version,
+                                   repop->ctx->user_at_version);
+       }
        reply->add_flags(CEPH_OSD_FLAG_ACK);
        dout(10) << " sending ack on " << *repop << " " << reply << dendl;
         assert(entity_name_t::TYPE_OSD != m->get_connection()->peer_type);
index dcc659e65a9ba1e5b4cf9d5946c72842668b33ec..36296c96ce13268a0105b2f74bb4cfeb1b910500 100644 (file)
@@ -116,6 +116,7 @@ public:
     utime_t mtime;
     SnapContext snapc;           // writer snap context
     eversion_t at_version;       // pg's current version pointer
+    version_t user_at_version;   // pg's current user version pointer
 
     int current_osd_subop_num;
 
@@ -147,7 +148,7 @@ public:
       op(_op), reqid(_reqid), ops(_ops), obs(_obs), snapset(0),
       new_obs(_obs->oi, _obs->exists),
       modify(false), user_modify(false),
-      bytes_written(0), bytes_read(0),
+      bytes_written(0), bytes_read(0), user_at_version(0),
       current_osd_subop_num(0),
       obc(0), clone_obc(0), snapset_obc(0), data_off(0), reply(NULL), pg(_pg),
       num_read(0),