keyring_init(&g_conf);
FOR_EACH_ARG(args) {
- if (CONF_ARG_EQ("mkfs", '\0')) {
+ if (CEPH_ARGPARSE_EQ("mkfs", '\0')) {
mkfs = true;
- } else if (CONF_ARG_EQ("osdmap", '\0')) {
- CONF_SAFE_SET_ARG_VAL(&osdmapfn, OPT_STR);
- } else if (CONF_ARG_EQ("inject_monmap", '\0')) {
- CONF_SAFE_SET_ARG_VAL(&inject_monmap, OPT_STR);
+ } else if (CEPH_ARGPARSE_EQ("osdmap", '\0')) {
+ CEPH_ARGPARSE_SET_ARG_VAL(&osdmapfn, OPT_STR);
++ } else if (CEPH_ARGPARSE_EQ("inject_monmap", '\0')) {
++ CEPH_ARGPARSE_SET_ARG_VAL(&inject_monmap, OPT_STR);
} else
usage();
}
}
assert(r>=0);
- dout(10) << "_finish_read got " << received_pos << "~" << reading_buf.length() << dendl;
- received_pos += reading_buf.length();
- read_buf.claim_append(reading_buf);
- assert(received_pos <= requested_pos);
- dout(10) << "_finish_read read_buf now " << read_pos << "~" << read_buf.length()
- << ", read pointers " << read_pos << "/" << received_pos << "/" << requested_pos
- << dendl;
-
- if (is_readable() || read_pos == write_pos) { // NOTE: this check may read more
+ dout(10) << "_finish_read got " << offset << "~" << bl.length() << dendl;
+ prefetch_buf[offset].swap(bl);
+
+ _assimilate_prefetch();
+ _prefetch();
+}
+
+void Journaler::_assimilate_prefetch()
+{
+ bool was_readable = _is_readable();
+
+ bool got_any = false;
+ while (!prefetch_buf.empty()) {
+ map<uint64_t,bufferlist>::iterator p = prefetch_buf.begin();
+ if (p->first != received_pos) {
+ uint64_t gap = p->first - received_pos;
+ dout(10) << "_assimilate_prefetch gap of " << gap << " from received_pos " << received_pos
+ << " to first prefetched buffer " << p->first << dendl;
+ break;
+ }
+
+ dout(10) << "_assimilate_prefetch " << p->first << "~" << p->second.length() << dendl;
+ received_pos += p->second.length();
+ read_buf.claim_append(p->second);
+ assert(received_pos <= requested_pos);
+ prefetch_buf.erase(p);
+ got_any = true;
+ }
+
+ if (got_any)
+ dout(10) << "_assimilate_prefetch read_buf now " << read_pos << "~" << read_buf.length()
+ << ", read pointers " << read_pos << "/" << received_pos << "/" << requested_pos
+ << dendl;
+
- if (got_any && !was_readable && _is_readable()) {
++ if ((got_any && !was_readable && _is_readable()) ||
++ read_pos == write_pos) {
// readable!
-- dout(10) << "_finish_read now readable" << dendl;
++ dout(10) << "_finish_read now readable (or at journal end)" << dendl;
if (on_readable) {
Context *f = on_readable;
on_readable = 0;