struct IndexedLog : public pg_log_t {
ceph::unordered_map<hobject_t,pg_log_entry_t*> objects; // ptrs into log. be careful!
ceph::unordered_map<osd_reqid_t,pg_log_entry_t*> caller_ops;
+ ceph::unordered_multimap<osd_reqid_t,pg_log_entry_t*> extra_caller_ops;
// recovery pointers
list<pg_log_entry_t>::iterator complete_to; // not inclusive of referenced item
return objects.count(oid);
}
bool logged_req(const osd_reqid_t &r) const {
- return caller_ops.count(r);
+ return caller_ops.count(r) || extra_caller_ops.count(r);
}
const pg_log_entry_t *get_request(const osd_reqid_t &r) const {
- ceph::unordered_map<osd_reqid_t,pg_log_entry_t*>::const_iterator p = caller_ops.find(r);
- if (p == caller_ops.end())
- return NULL;
- return p->second;
+ ceph::unordered_map<osd_reqid_t,pg_log_entry_t*>::const_iterator p;
+ p = caller_ops.find(r);
+ if (p != caller_ops.end())
+ return p->second;
+ // warning: we will return *a* request for this reqid, but not
+ // necessarily the most recent.
+ p = extra_caller_ops.find(r);
+ if (p != extra_caller_ops.end())
+ return p->second;
+ return NULL;
}
void index() {
objects.clear();
caller_ops.clear();
+ extra_caller_ops.clear();
for (list<pg_log_entry_t>::iterator i = log.begin();
i != log.end();
++i) {
if (i->reqid_is_indexed()) {
//assert(caller_ops.count(i->reqid) == 0); // divergent merge_log indexes new before unindexing old
caller_ops[i->reqid] = &(*i);
+ for (vector<osd_reqid_t>::const_iterator j = i->extra_reqids.begin();
+ j != i->extra_reqids.end();
+ ++j) {
+ extra_caller_ops.insert(make_pair(*j, &(*i)));
+ }
}
}
if (e.reqid_is_indexed()) {
//assert(caller_ops.count(i->reqid) == 0); // divergent merge_log indexes new before unindexing old
caller_ops[e.reqid] = &e;
+ for (vector<osd_reqid_t>::const_iterator j = e.extra_reqids.begin();
+ j != e.extra_reqids.end();
+ ++j) {
+ extra_caller_ops.insert(make_pair(*j, &e));
+ }
}
}
void unindex() {
objects.clear();
caller_ops.clear();
+ extra_caller_ops.clear();
}
void unindex(pg_log_entry_t& e) {
// NOTE: this only works if we remove from the _tail_ of the log!
if (objects.count(e.soid) && objects[e.soid]->version == e.version)
objects.erase(e.soid);
- if (e.reqid_is_indexed() &&
- caller_ops.count(e.reqid) && // divergent merge_log indexes new before unindexing old
- caller_ops[e.reqid] == &e)
- caller_ops.erase(e.reqid);
+ if (e.reqid_is_indexed()) {
+ if (caller_ops.count(e.reqid) && // divergent merge_log indexes new before unindexing old
+ caller_ops[e.reqid] == &e)
+ caller_ops.erase(e.reqid);
+ for (vector<osd_reqid_t>::const_iterator j = e.extra_reqids.begin();
+ j != e.extra_reqids.end();
+ ++j) {
+ for (ceph::unordered_multimap<osd_reqid_t,pg_log_entry_t*>::iterator k =
+ extra_caller_ops.find(*j);
+ k != extra_caller_ops.end() && k->first == *j;
+ ++j) {
+ if (k->second == &e) {
+ extra_caller_ops.erase(k);
+ break;
+ }
+ }
+ }
+ }
}
// actors
// to our index
objects[e.soid] = &(log.back());
- if (e.reqid_is_indexed())
+ if (e.reqid_is_indexed()) {
caller_ops[e.reqid] = &(log.back());
+ for (vector<osd_reqid_t>::const_iterator j = e.extra_reqids.begin();
+ j != e.extra_reqids.end();
+ ++j) {
+ extra_caller_ops.insert(make_pair(*j, &(log.back())));
+ }
+ }
}
void trim(
void pg_log_entry_t::encode(bufferlist &bl) const
{
- ENCODE_START(9, 4, bl);
+ ENCODE_START(10, 4, bl);
::encode(op, bl);
::encode(soid, bl);
::encode(version, bl);
::encode(snaps, bl);
::encode(user_version, bl);
::encode(mod_desc, bl);
+ ::encode(extra_reqids, bl);
ENCODE_FINISH(bl);
}
void pg_log_entry_t::decode(bufferlist::iterator &bl)
{
- DECODE_START_LEGACY_COMPAT_LEN(8, 4, 4, bl);
+ DECODE_START_LEGACY_COMPAT_LEN(10, 4, 4, bl);
::decode(op, bl);
if (struct_v < 2) {
sobject_t old_soid;
::decode(prior_version, bl);
::decode(reqid, bl);
+
::decode(mtime, bl);
if (struct_v < 5)
invalid_pool = true;
::decode(mod_desc, bl);
else
mod_desc.mark_unrollbackable();
+ if (struct_v >= 10)
+ ::decode(extra_reqids, bl);
DECODE_FINISH(bl);
}
f->dump_stream("version") << version;
f->dump_stream("prior_version") << prior_version;
f->dump_stream("reqid") << reqid;
+ f->open_array_section("extra_reqids");
+ for (vector<osd_reqid_t>::const_iterator p = extra_reqids.begin();
+ p != extra_reqids.end();
+ ++p)
+ f->dump_stream("reqid") << *p;
+ f->close_section();
f->dump_stream("mtime") << mtime;
if (snaps.length() > 0) {
vector<snapid_t> v;