waiting_for_blocked_object;
// Callbacks should assume pg (and nothing else) is locked
map<hobject_t, list<Context*> > callbacks_for_degraded_object;
- map<eversion_t,list<OpRequestRef> > waiting_for_ack, waiting_for_ondisk;
+
+ map<eversion_t,
+ list<pair<OpRequestRef, version_t> > > waiting_for_ack, waiting_for_ondisk;
+
map<eversion_t,OpRequestRef> replay_queue;
void split_ops(PG *child, unsigned split_bits);
/// true if reqid r is present in either the caller_ops or the extra_caller_ops index
bool logged_req(const osd_reqid_t &r) const {
return caller_ops.count(r) || extra_caller_ops.count(r);
}
- const pg_log_entry_t *get_request(const osd_reqid_t &r) const {
+ bool get_request( // look up reqid r in the request indexes; returns true iff it was found
+ const osd_reqid_t &r, // request id to search for
+ eversion_t *replay_version, // out: version of the log entry that recorded r
+ version_t *user_version) const { // out: user_version recorded for r
+ assert(replay_version); // both out-pointers are mandatory; neither may be null
+ assert(user_version);
ceph::unordered_map<osd_reqid_t,pg_log_entry_t*>::const_iterator p;
p = caller_ops.find(r);
- if (p != caller_ops.end())
- return p->second;
+ if (p != caller_ops.end()) { // hit in caller_ops: report the entry's own version and user_version
+ *replay_version = p->second->version;
+ *user_version = p->second->user_version;
+ return true;
+ }
+
// warning: we will return *a* request for this reqid, but not
// necessarily the most recent.
p = extra_caller_ops.find(r);
- if (p != extra_caller_ops.end())
- return p->second;
- return NULL;
+ if (p != extra_caller_ops.end()) { // hit in extra_caller_ops: user_version comes from the matching extra_reqids pair
+ for (vector<pair<osd_reqid_t, version_t> >::const_iterator i =
+ p->second->extra_reqids.begin();
+ i != p->second->extra_reqids.end();
+ ++i) {
+ if (i->first == r) { // pair is (reqid, user_version)
+ *replay_version = p->second->version;
+ *user_version = i->second;
+ return true;
+ }
+ }
+ assert(0 == "in extra_caller_ops but not extra_reqids"); // the index pointed at an entry that does not list r
+ }
+ return false; // r is in neither index; the out-params are left untouched
}
/// get a (bounded) list of recent reqids for the given object
void get_object_reqids(const hobject_t& oid, unsigned max,
- vector<osd_reqid_t> *pls) const {
+ vector<pair<osd_reqid_t, version_t> > *pls) const {
// make sure object is present at least once before we do an
// O(n) search.
if (objects.count(oid) == 0)
++i) {
if (i->soid == oid) {
if (i->reqid_is_indexed())
- pls->push_back(i->reqid);
+ pls->push_back(make_pair(i->reqid, i->user_version));
pls->insert(pls->end(), i->extra_reqids.begin(), i->extra_reqids.end());
if (pls->size() >= max) {
if (pls->size() > max) {
//assert(caller_ops.count(i->reqid) == 0); // divergent merge_log indexes new before unindexing old
caller_ops[i->reqid] = &(*i);
}
- for (vector<osd_reqid_t>::const_iterator j = i->extra_reqids.begin();
+ for (vector<pair<osd_reqid_t, version_t> >::const_iterator j =
+ i->extra_reqids.begin();
j != i->extra_reqids.end();
++j) {
- extra_caller_ops.insert(make_pair(*j, &(*i)));
+ extra_caller_ops.insert(make_pair(j->first, &(*i)));
}
}
//assert(caller_ops.count(i->reqid) == 0); // divergent merge_log indexes new before unindexing old
caller_ops[e.reqid] = &e;
}
- for (vector<osd_reqid_t>::const_iterator j = e.extra_reqids.begin();
+ for (vector<pair<osd_reqid_t, version_t> >::const_iterator j =
+ e.extra_reqids.begin();
j != e.extra_reqids.end();
++j) {
- extra_caller_ops.insert(make_pair(*j, &e));
+ extra_caller_ops.insert(make_pair(j->first, &e));
}
}
void unindex() {
caller_ops[e.reqid] == &e)
caller_ops.erase(e.reqid);
}
- for (vector<osd_reqid_t>::const_iterator j = e.extra_reqids.begin();
+ for (vector<pair<osd_reqid_t, version_t> >::const_iterator j =
+ e.extra_reqids.begin();
j != e.extra_reqids.end();
++j) {
for (ceph::unordered_multimap<osd_reqid_t,pg_log_entry_t*>::iterator k =
- extra_caller_ops.find(*j);
- k != extra_caller_ops.end() && k->first == *j;
+ extra_caller_ops.find(j->first);
+ k != extra_caller_ops.end() && k->first == j->first;
++k) {
if (k->second == &e) {
extra_caller_ops.erase(k);
if (e.reqid_is_indexed()) {
caller_ops[e.reqid] = &(log.back());
}
- for (vector<osd_reqid_t>::const_iterator j = e.extra_reqids.begin();
+ for (vector<pair<osd_reqid_t, version_t> >::const_iterator j =
+ e.extra_reqids.begin();
j != e.extra_reqids.end();
++j) {
- extra_caller_ops.insert(make_pair(*j, &(log.back())));
+ extra_caller_ops.insert(make_pair(j->first, &(log.back())));
}
}
// promote ops, but we can't possibly have both in our log where
// the original request is still not stable on disk, so for our
// purposes here it doesn't matter which one we get.
- const pg_log_entry_t *entry = pg_log.get_log().get_request(m->get_reqid());
- if (entry) {
- const eversion_t& oldv = entry->version;
+ eversion_t replay_version;
+ version_t user_version;
+ bool got = pg_log.get_log().get_request(
+ m->get_reqid(), &replay_version, &user_version);
+ if (got) {
dout(3) << __func__ << " dup " << m->get_reqid()
- << " was " << oldv << dendl;
- if (already_complete(oldv)) {
- osd->reply_op_error(op, 0, oldv, entry->user_version);
+ << " was " << replay_version << dendl;
+ if (already_complete(replay_version)) {
+ osd->reply_op_error(op, 0, replay_version, user_version);
} else {
if (m->wants_ack()) {
- if (already_ack(oldv)) {
+ if (already_ack(replay_version)) {
MOSDOpReply *reply = new MOSDOpReply(m, 0, get_osdmap()->get_epoch(), 0, false);
reply->add_flags(CEPH_OSD_FLAG_ACK);
- reply->set_reply_versions(oldv, entry->user_version);
+ reply->set_reply_versions(replay_version, user_version);
osd->send_message_osd_client(reply, m->get_connection());
} else {
- dout(10) << " waiting for " << oldv << " to ack" << dendl;
- waiting_for_ack[oldv].push_back(op);
+ dout(10) << " waiting for " << replay_version << " to ack" << dendl;
+ waiting_for_ack[replay_version].push_back(make_pair(op, user_version));
}
}
- dout(10) << " waiting for " << oldv << " to commit" << dendl;
- waiting_for_ondisk[oldv].push_back(op); // always queue ondisk waiters, so that we can requeue if needed
+ dout(10) << " waiting for " << replay_version << " to commit" << dendl;
+ // always queue ondisk waiters, so that we can requeue if needed
+ waiting_for_ondisk[replay_version].push_back(make_pair(op, user_version));
op->mark_delayed("waiting for ondisk");
}
return;
// send dup commits, in order
if (waiting_for_ondisk.count(repop->v)) {
assert(waiting_for_ondisk.begin()->first == repop->v);
- for (list<OpRequestRef>::iterator i = waiting_for_ondisk[repop->v].begin();
+ for (list<pair<OpRequestRef, version_t> >::iterator i =
+ waiting_for_ondisk[repop->v].begin();
i != waiting_for_ondisk[repop->v].end();
++i) {
- osd->reply_op_error(*i, 0, repop->ctx->at_version,
- repop->ctx->user_at_version);
+ osd->reply_op_error(i->first, 0, repop->ctx->at_version,
+ i->second);
}
waiting_for_ondisk.erase(repop->v);
}
// send dup acks, in order
if (waiting_for_ack.count(repop->v)) {
assert(waiting_for_ack.begin()->first == repop->v);
- for (list<OpRequestRef>::iterator i = waiting_for_ack[repop->v].begin();
+ for (list<pair<OpRequestRef, version_t> >::iterator i =
+ waiting_for_ack[repop->v].begin();
i != waiting_for_ack[repop->v].end();
++i) {
- MOSDOp *m = (MOSDOp*)(*i)->get_req();
+ MOSDOp *m = (MOSDOp*)i->first->get_req();
MOSDOpReply *reply = new MOSDOpReply(m, 0, get_osdmap()->get_epoch(), 0, true);
reply->set_reply_versions(repop->ctx->at_version,
- repop->ctx->user_at_version);
+ i->second);
reply->add_flags(CEPH_OSD_FLAG_ACK);
osd->send_message_osd_client(reply, m->get_connection());
}
}
// also requeue any dups, interleaved into position
- map<eversion_t, list<OpRequestRef> >::iterator p = waiting_for_ondisk.find(repop->v);
+ map<eversion_t, list<pair<OpRequestRef, version_t> > >::iterator p =
+ waiting_for_ondisk.find(repop->v);
if (p != waiting_for_ondisk.end()) {
dout(10) << " also requeuing ondisk waiters " << p->second << dendl;
- rq.splice(rq.end(), p->second);
+ for (list<pair<OpRequestRef, version_t> >::iterator i =
+ p->second.begin();
+ i != p->second.end();
+ ++i) {
+ rq.push_back(i->first);
+ }
waiting_for_ondisk.erase(p);
}
}
if (requeue) {
requeue_ops(rq);
if (!waiting_for_ondisk.empty()) {
- for (map<eversion_t, list<OpRequestRef> >::iterator i =
+ for (map<eversion_t, list<pair<OpRequestRef, version_t> > >::iterator i =
waiting_for_ondisk.begin();
i != waiting_for_ondisk.end();
++i) {
- for (list<OpRequestRef>::iterator j = i->second.begin();
+ for (list<pair<OpRequestRef, version_t> >::iterator j =
+ i->second.begin();
j != i->second.end();
++j) {
- derr << __func__ << ": op " << *((*j)->get_req()) << " waiting on "
+ derr << __func__ << ": op " << *(j->first->get_req()) << " waiting on "
<< i->first << dendl;
}
}
uint32_t flags; // object_copy_data_t::FLAG_*
uint32_t source_data_digest, source_omap_digest;
uint32_t data_digest, omap_digest;
- vector<osd_reqid_t> reqids;
+ vector<pair<osd_reqid_t, version_t> > reqids; // [(reqid, user_version)]
/// true if the FLAG_DATA_DIGEST bit is set in flags
bool is_data_digest() {
return flags & object_copy_data_t::FLAG_DATA_DIGEST;
}
int num_read; ///< count read ops
int num_write; ///< count update ops
- vector<osd_reqid_t> extra_reqids;
+ vector<pair<osd_reqid_t, version_t> > extra_reqids;
CopyFromCallback *copy_cb;
f->dump_stream("prior_version") << prior_version;
f->dump_stream("reqid") << reqid;
f->open_array_section("extra_reqids");
- for (vector<osd_reqid_t>::const_iterator p = extra_reqids.begin();
+ for (vector<pair<osd_reqid_t, version_t> >::const_iterator p =
+ extra_reqids.begin();
p != extra_reqids.end();
- ++p)
- f->dump_stream("reqid") << *p;
+ ++p) {
+ f->open_object_section("extra_reqid");
+ f->dump_stream("reqid") << p->first;
+ f->dump_stream("user_version") << p->second;
+ f->close_section();
+ }
f->close_section();
f->dump_stream("mtime") << mtime;
if (snaps.length() > 0) {
o.back()->data.push_back(databp);
o.back()->omap_header.append("this is an omap header");
o.back()->snaps.push_back(123);
- o.back()->reqids.push_back(osd_reqid_t());
+ o.back()->reqids.push_back(make_pair(osd_reqid_t(), version_t()));
}
void object_copy_data_t::dump(Formatter *f) const
f->dump_unsigned("snap", *p);
f->close_section();
f->open_array_section("reqids");
- for (vector<osd_reqid_t>::const_iterator p = reqids.begin();
+ for (vector<pair<osd_reqid_t, version_t> >::const_iterator p = reqids.begin();
p != reqids.end();
- ++p)
- f->dump_stream("reqid") << *p;
+ ++p) {
+ f->open_object_section("extra_reqid");
+ f->dump_stream("reqid") << p->first;
+ f->dump_stream("user_version") << p->second;
+ f->close_section();
+ }
f->close_section();
}
/// describes state for a locally-rollbackable entry
ObjectModDesc mod_desc;
- vector<osd_reqid_t> extra_reqids;
+ vector<pair<osd_reqid_t, version_t> > extra_reqids;
pg_log_entry_t()
: op(0), user_version(0),
snapid_t snap_seq;
///< recent reqids on this object
- vector<osd_reqid_t> reqids;
+ vector<pair<osd_reqid_t, version_t> > reqids;
public:
object_copy_data_t() : size((uint64_t)-1), data_digest(-1),
uint32_t *out_flags;
uint32_t *out_data_digest;
uint32_t *out_omap_digest;
- vector<osd_reqid_t> *out_reqids;
+ vector<pair<osd_reqid_t, version_t> > *out_reqids;
int *prval;
C_ObjectOperation_copyget(object_copy_cursor_t *c,
uint64_t *s,
uint32_t *flags,
uint32_t *dd,
uint32_t *od,
- vector<osd_reqid_t> *oreqids,
+ vector<pair<osd_reqid_t, version_t> > *oreqids,
int *r)
: cursor(c),
out_size(s), out_mtime(m),
uint32_t *out_flags,
uint32_t *out_data_digest,
uint32_t *out_omap_digest,
- vector<osd_reqid_t> *out_reqids,
+ vector<pair<osd_reqid_t, version_t> > *out_reqids,
int *prval) {
OSDOp& osd_op = add_op(CEPH_OSD_OP_COPY_GET);
osd_op.op.copy_get.max = max;