void OSDService::reply_op_error(OpRequestRef op, int err)
{
- reply_op_error(op, err, eversion_t());
+ reply_op_error(op, err, eversion_t(), 0);
}
-void OSDService::reply_op_error(OpRequestRef op, int err, eversion_t v)
+void OSDService::reply_op_error(OpRequestRef op, int err, eversion_t v,
+ version_t uv)
{
MOSDOp *m = static_cast<MOSDOp*>(op->request);
assert(m->get_header().type == CEPH_MSG_OSD_OP);
MOSDOpReply *reply = new MOSDOpReply(m, err, osdmap->get_epoch(), flags);
Messenger *msgr = client_messenger;
- reply->set_reply_versions(v, v.version);
+ reply->set_reply_versions(v, uv);
if (m->get_source().is_osd())
msgr = cluster_messenger;
msgr->send_message(reply, m->get_connection());
return;
}
- eversion_t oldv = pg_log.get_log().get_request_version(ctx->reqid);
- if (oldv != eversion_t()) {
+ const pg_log_entry_t *entry = pg_log.get_log().get_request(ctx->reqid);
+ if (entry) {
+ const eversion_t& oldv = entry->version;
dout(3) << "do_op dup " << ctx->reqid << " was " << oldv << dendl;
delete ctx;
put_object_context(obc);
put_object_contexts(src_obc);
if (already_complete(oldv)) {
- osd->reply_op_error(op, 0, oldv);
+ osd->reply_op_error(op, 0, oldv, entry->user_version);
} else {
if (m->wants_ack()) {
if (already_ack(oldv)) {
<< dendl;
}
+ ctx->user_at_version = info.last_user_version;
+
// note my stats
utime_t now = ceph_clock_now(g_ceph_context);
ctx->reply->set_result(result);
if (result >= 0) {
- ctx->reply->set_reply_versions(ctx->at_version, ctx->at_version.version);
+ ctx->reply->set_reply_versions(ctx->at_version, ctx->user_at_version);
} else if (result == -ENOENT) {
- ctx->reply->set_enoent_reply_versions(info.last_update, info.last_update.version);
+ ctx->reply->set_enoent_reply_versions(info.last_update, ctx->user_at_version);
}
// read or error?
// finish and log the op.
if (ctx->user_modify) {
/* update the user_version for any modify ops, except for the watch op */
- ctx->new_obs.oi.user_version = ctx->at_version.version;
+ ++ctx->user_at_version;
+ assert(ctx->user_at_version > ctx->new_obs.oi.user_version);
+ /* In order for new clients and old clients to interoperate properly
+ * when exchanging versions, we need to lower bound the user_version
+ * (which our new clients pay proper attention to)
+ * by the at_version (which is all the old clients can ever see). */
+ ctx->user_at_version = MAX(ctx->at_version.version, ctx->user_at_version);
+ ctx->new_obs.oi.user_version = ctx->user_at_version;
}
ctx->bytes_written = ctx->op_t.get_encoded_bytes();
if (!ctx->new_obs.exists)
logopcode = pg_log_entry_t::DELETE;
ctx->log.push_back(pg_log_entry_t(logopcode, soid, ctx->at_version, old_version,
- ctx->at_version.version, ctx->reqid, ctx->mtime));
+ ctx->user_at_version, ctx->reqid, ctx->mtime));
// apply new object state.
ctx->obc->obs = ctx->new_obs;
for (list<OpRequestRef>::iterator i = waiting_for_ondisk[repop->v].begin();
i != waiting_for_ondisk[repop->v].end();
++i) {
- osd->reply_op_error(*i, 0, repop->v);
+ osd->reply_op_error(*i, 0, repop->v, 0);
}
waiting_for_ondisk.erase(repop->v);
}
MOSDOpReply *reply = repop->ctx->reply;
if (reply)
repop->ctx->reply = NULL;
- else
+ else {
reply = new MOSDOpReply(m, 0, get_osdmap()->get_epoch(), 0);
+ reply->set_reply_versions(repop->ctx->at_version,
+ repop->ctx->user_at_version);
+ }
reply->add_flags(CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK);
dout(10) << " sending commit on " << *repop << " " << reply << dendl;
assert(entity_name_t::TYPE_OSD != m->get_connection()->peer_type);
++i) {
MOSDOp *m = (MOSDOp*)(*i)->request;
MOSDOpReply *reply = new MOSDOpReply(m, 0, get_osdmap()->get_epoch(), 0);
+ reply->set_reply_versions(repop->ctx->at_version,
+ repop->ctx->user_at_version);
reply->add_flags(CEPH_OSD_FLAG_ACK);
osd->send_message_osd_client(reply, m->get_connection());
}
MOSDOpReply *reply = repop->ctx->reply;
if (reply)
repop->ctx->reply = NULL;
- else
+ else {
reply = new MOSDOpReply(m, 0, get_osdmap()->get_epoch(), 0);
+ reply->set_reply_versions(repop->ctx->at_version,
+ repop->ctx->user_at_version);
+ }
reply->add_flags(CEPH_OSD_FLAG_ACK);
dout(10) << " sending ack on " << *repop << " " << reply << dendl;
assert(entity_name_t::TYPE_OSD != m->get_connection()->peer_type);
utime_t mtime;
SnapContext snapc; // writer snap context
eversion_t at_version; // pg's current version pointer
+ version_t user_at_version; // pg's current user version pointer
int current_osd_subop_num;
op(_op), reqid(_reqid), ops(_ops), obs(_obs), snapset(0),
new_obs(_obs->oi, _obs->exists),
modify(false), user_modify(false),
- bytes_written(0), bytes_read(0),
+ bytes_written(0), bytes_read(0), user_at_version(0),
current_osd_subop_num(0),
obc(0), clone_obc(0), snapset_obc(0), data_off(0), reply(NULL), pg(_pg),
num_read(0),