From: Greg Farnum Date: Wed, 28 Aug 2013 00:24:24 +0000 (-0700) Subject: ReplicatedPG: add OpContext::user_at_version X-Git-Tag: v0.69~40^2~2 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=c119afa075085c4c1eef347406fd3f61018335aa;p=ceph.git ReplicatedPG: add OpContext::user_at_version Set this up with the existing at_version member, but only increase it for user_modify ops. Use this when logging the PG's user_version. In order to maintain compatibility with old clients on classic pools, we force user_version to follow at_version whenever it's updated. Now that we have and are maintaining this PG user version, use it for the user version on ops that get ENOENT back, when short-circuiting replies as part of reply_op_error()[1], or when replying to repops in eval_repop; further use it for the cls_current_version() function. This is a small semantic change for that function, as previously it would generally return the same value as the user would get sent back via MOSDOpReply -- but I don't think it was something you could count on. We now define it as being the user version of the PG at the start of the op, and as a bonus it is defined even for read ops (the at_version is only filled in on write operations). [1]: We tweak PGLog to make it easier to retrieve both user and PG versions. Signed-off-by: Greg Farnum --- diff --git a/src/objclass/class_api.cc b/src/objclass/class_api.cc index 6e8de53467f..b95260b7e16 100644 --- a/src/objclass/class_api.cc +++ b/src/objclass/class_api.cc @@ -582,7 +582,7 @@ uint64_t cls_current_version(cls_method_context_t hctx) { ReplicatedPG::OpContext *ctx = *(ReplicatedPG::OpContext **)hctx; - return ctx->at_version.version; + return ctx->user_at_version; } diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc index 1f514e59648..52c35bd247c 100644 --- a/src/osd/OSD.cc +++ b/src/osd/OSD.cc @@ -6798,10 +6798,11 @@ void OSD::finish_recovery_op(PG *pg, const hobject_t& soid, bool dequeue) void OSDService::reply_op_error(OpRequestRef op, int err) { - reply_op_error(op, err, eversion_t()); + reply_op_error(op, err, eversion_t(), 0); } -void OSDService::reply_op_error(OpRequestRef op, int err, eversion_t v) +void OSDService::reply_op_error(OpRequestRef op, int err, eversion_t v, + version_t uv) { MOSDOp *m = static_cast(op->request); assert(m->get_header().type == CEPH_MSG_OSD_OP); @@ -6810,7 +6811,7 @@ void OSDService::reply_op_error(OpRequestRef op, int err, eversion_t v) MOSDOpReply *reply = new MOSDOpReply(m, err, osdmap->get_epoch(), flags); Messenger *msgr = client_messenger; - reply->set_reply_versions(v, v.version); + reply->set_reply_versions(v, uv); if (m->get_source().is_osd()) msgr = cluster_messenger; msgr->send_message(reply, m->get_connection()); diff --git a/src/osd/OSD.h b/src/osd/OSD.h index 4d8c31e3046..d259ceae545 100644 --- a/src/osd/OSD.h +++ b/src/osd/OSD.h @@ -414,7 +414,7 @@ public: void dec_scrubs_active(); void reply_op_error(OpRequestRef op, int err); - void reply_op_error(OpRequestRef op, int err, eversion_t v); + void reply_op_error(OpRequestRef op, int err, eversion_t v, version_t uv); void handle_misdirected_op(PG *pg, OpRequestRef op); // -- Watch -- diff --git a/src/osd/PGLog.h b/src/osd/PGLog.h index 552f9b0cee9..8f192a8eac0 100644 --- a/src/osd/PGLog.h +++ b/src/osd/PGLog.h @@ -84,11 +84,11 @@ struct PGLog { bool logged_req(const osd_reqid_t &r) const { return caller_ops.count(r); } - eversion_t get_request_version(const osd_reqid_t &r) const { + const pg_log_entry_t *get_request(const osd_reqid_t &r) const { hash_map::const_iterator p = caller_ops.find(r); if (p == caller_ops.end()) - return eversion_t(); - return p->second->version; + return NULL; + return p->second; } void index() { diff --git a/src/osd/ReplicatedPG.cc b/src/osd/ReplicatedPG.cc index 495e041fd8e..cfa1dce1942 100644 --- a/src/osd/ReplicatedPG.cc +++ b/src/osd/ReplicatedPG.cc @@ -908,14 +908,15 @@ void ReplicatedPG::do_op(OpRequestRef op) return; } - eversion_t oldv = pg_log.get_log().get_request_version(ctx->reqid); - if (oldv != eversion_t()) { + const pg_log_entry_t *entry = pg_log.get_log().get_request(ctx->reqid); + if (entry) { + const eversion_t& oldv = entry->version; dout(3) << "do_op dup " << ctx->reqid << " was " << oldv << dendl; delete ctx; put_object_context(obc); put_object_contexts(src_obc); if (already_complete(oldv)) { - osd->reply_op_error(op, 0, oldv); + osd->reply_op_error(op, 0, oldv, entry->user_version); } else { if (m->wants_ack()) { if (already_ack(oldv)) { @@ -957,6 +958,8 @@ void ReplicatedPG::do_op(OpRequestRef op) << dendl; } + ctx->user_at_version = info.last_user_version; + // note my stats utime_t now = ceph_clock_now(g_ceph_context); @@ -1028,9 +1031,9 @@ void ReplicatedPG::do_op(OpRequestRef op) ctx->reply->set_result(result); if (result >= 0) { - ctx->reply->set_reply_versions(ctx->at_version, ctx->at_version.version); + ctx->reply->set_reply_versions(ctx->at_version, ctx->user_at_version); } else if (result == -ENOENT) { - ctx->reply->set_enoent_reply_versions(info.last_update, info.last_update.version); + ctx->reply->set_enoent_reply_versions(info.last_update, ctx->user_at_version); } // read or error? @@ -3854,7 +3857,14 @@ int ReplicatedPG::prepare_transaction(OpContext *ctx) // finish and log the op. if (ctx->user_modify) { /* update the user_version for any modify ops, except for the watch op */ - ctx->new_obs.oi.user_version = ctx->at_version.version; + ++ctx->user_at_version; + assert(ctx->user_at_version > ctx->new_obs.oi.user_version); + /* In order for new clients and old clients to interoperate properly + * when exchanging versions, we need to lower bound the user_version + * (which our new clients pay proper attention to) + * by the at_version (which is all the old clients can ever see). */ + ctx->user_at_version = MAX(ctx->at_version.version, ctx->user_at_version); + ctx->new_obs.oi.user_version = ctx->user_at_version; } ctx->bytes_written = ctx->op_t.get_encoded_bytes(); @@ -3884,7 +3894,7 @@ int ReplicatedPG::prepare_transaction(OpContext *ctx) if (!ctx->new_obs.exists) logopcode = pg_log_entry_t::DELETE; ctx->log.push_back(pg_log_entry_t(logopcode, soid, ctx->at_version, old_version, - ctx->at_version.version, ctx->reqid, ctx->mtime)); + ctx->user_at_version, ctx->reqid, ctx->mtime)); // apply new object state. ctx->obc->obs = ctx->new_obs; @@ -4099,7 +4109,7 @@ void ReplicatedPG::eval_repop(RepGather *repop) for (list::iterator i = waiting_for_ondisk[repop->v].begin(); i != waiting_for_ondisk[repop->v].end(); ++i) { - osd->reply_op_error(*i, 0, repop->v); + osd->reply_op_error(*i, 0, repop->v, 0); } waiting_for_ondisk.erase(repop->v); } @@ -4115,8 +4125,11 @@ void ReplicatedPG::eval_repop(RepGather *repop) MOSDOpReply *reply = repop->ctx->reply; if (reply) repop->ctx->reply = NULL; - else + else { reply = new MOSDOpReply(m, 0, get_osdmap()->get_epoch(), 0); + reply->set_reply_versions(repop->ctx->at_version, + repop->ctx->user_at_version); + } reply->add_flags(CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK); dout(10) << " sending commit on " << *repop << " " << reply << dendl; assert(entity_name_t::TYPE_OSD != m->get_connection()->peer_type); @@ -4137,6 +4150,8 @@ void ReplicatedPG::eval_repop(RepGather *repop) ++i) { MOSDOp *m = (MOSDOp*)(*i)->request; MOSDOpReply *reply = new MOSDOpReply(m, 0, get_osdmap()->get_epoch(), 0); + reply->set_reply_versions(repop->ctx->at_version, + repop->ctx->user_at_version); reply->add_flags(CEPH_OSD_FLAG_ACK); osd->send_message_osd_client(reply, m->get_connection()); } @@ -4148,8 +4163,11 @@ void ReplicatedPG::eval_repop(RepGather *repop) MOSDOpReply *reply = repop->ctx->reply; if (reply) repop->ctx->reply = NULL; - else + else { reply = new MOSDOpReply(m, 0, get_osdmap()->get_epoch(), 0); + reply->set_reply_versions(repop->ctx->at_version, + repop->ctx->user_at_version); + } reply->add_flags(CEPH_OSD_FLAG_ACK); dout(10) << " sending ack on " << *repop << " " << reply << dendl; assert(entity_name_t::TYPE_OSD != m->get_connection()->peer_type); diff --git a/src/osd/ReplicatedPG.h b/src/osd/ReplicatedPG.h index dcc659e65a9..36296c96ce1 100644 --- a/src/osd/ReplicatedPG.h +++ b/src/osd/ReplicatedPG.h @@ -116,6 +116,7 @@ public: utime_t mtime; SnapContext snapc; // writer snap context eversion_t at_version; // pg's current version pointer + version_t user_at_version; // pg's current user version pointer int current_osd_subop_num; @@ -147,7 +148,7 @@ public: op(_op), reqid(_reqid), ops(_ops), obs(_obs), snapset(0), new_obs(_obs->oi, _obs->exists), modify(false), user_modify(false), - bytes_written(0), bytes_read(0), + bytes_written(0), bytes_read(0), user_at_version(0), current_osd_subop_num(0), obc(0), clone_obc(0), snapset_obc(0), data_off(0), reply(NULL), pg(_pg), num_read(0),