// yes!
dout(10) << " wrapping from " << *pos << " to " << get_top() << dendl;
header.wrap = *pos;
- *pos = get_top();
+ write_pos = *pos = get_top();
must_write_header = true;
return true;
}
off64_t queue_pos = write_pos;
int eleft = g_conf.journal_max_write_entries;
- int bleft = g_conf.journal_max_write_bytes;
+ unsigned bmax = g_conf.journal_max_write_bytes;
if (full_commit_seq || full_restart_seq)
return;
-
+
while (!writeq.empty()) {
- // grab next item
- __u64 seq = writeq.front().seq;
- bufferlist &ebl = writeq.front().bl;
- off64_t size = 2*sizeof(entry_header_t) + ebl.length();
-
bool can_wrap = !bl.length(); // only wrap if this is a new thinger
- if (!check_for_wrap(seq, &queue_pos, size, can_wrap))
- break;
-
- // set write_pos? (check_for_wrap may have moved it)
- if (!bl.length())
- write_pos = queue_pos;
-
- // add to write buffer
- dout(15) << "prepare_multi_write will write " << queue_pos << " : seq " << seq
- << " len " << ebl.length() << " -> " << size
- << " (left " << eleft << "/" << bleft << ")"
- << dendl;
-
- // add it this entry
- entry_header_t h;
- h.seq = seq;
- h.flags = 0;
- h.len = ebl.length();
- h.pre_pad = 0;
- h.post_pad = 0;
- h.make_magic(queue_pos, header.fsid);
-
-
- // pad?
- if ((queue_pos + size) % header.alignment) {
- h.post_pad = header.alignment - ((queue_pos + size) % header.alignment);
- size += h.post_pad;
- //dout(20) << " padding with " << h.post_pad << " bytes, queue_pos now " << queue_pos << dendl;
- }
- dout(15) << " pad " << h.pre_pad << " " << h.post_pad << " len " << h.len << dendl;
-
- bl.append((const char*)&h, sizeof(h));
- bl.claim_append(ebl);
- if (h.post_pad) {
- bufferptr bp = buffer::create_static(h.post_pad, zero_buf);
- bl.push_back(bp);
- }
- bl.append((const char*)&h, sizeof(h));
- if (writeq.front().fin) {
- writing_seq.push_back(seq);
- writing_fin.push_back(writeq.front().fin);
- }
-
- // pop from writeq
- writeq.pop_front();
- journalq.push_back(pair<__u64,off64_t>(seq, queue_pos));
-
- queue_pos += size;
+ bool r = prepare_single_write(bl, queue_pos, can_wrap);
+ if (!r)
+ break;
if (eleft) {
if (--eleft == 0) {
- dout(20) << " hit max events per write " << g_conf.journal_max_write_entries << dendl;
+ dout(20) << "prepare_multi_write hit max events per write " << g_conf.journal_max_write_entries << dendl;
break;
}
}
- if (bleft) {
- bleft -= size;
- if (bleft == 0) {
- dout(20) << " hit max write size " << g_conf.journal_max_write_bytes << dendl;
+ if (bmax) {
+ if (bl.length() >= bmax) {
+ dout(20) << "prepare_multi_write hit max write size " << g_conf.journal_max_write_bytes << dendl;
break;
}
}
}
+
+ assert(write_pos + bl.length() == queue_pos);
}
-bool FileJournal::prepare_single_dio_write(bufferlist& bl)
+bool FileJournal::prepare_single_write(bufferlist& bl, off64_t& queue_pos, bool can_wrap)
{
// grab next item
__u64 seq = writeq.front().seq;
bufferlist &ebl = writeq.front().bl;
-
off64_t base_size = 2*sizeof(entry_header_t) + ebl.length();
off64_t size = ROUND_UP_TO(base_size, header.alignment);
-
- if (!check_for_wrap(seq, &write_pos, size, true))
+
+ if (!check_for_wrap(seq, &queue_pos, size, can_wrap))
return false;
- if (full_commit_seq || full_restart_seq) return false;
-
- // build it
- dout(15) << "prepare_single_dio_write will write " << write_pos << " : seq " << seq
- << " len " << ebl.length() << " -> " << size << " (base " << base_size << ")" << dendl;
-
- bufferptr bp = buffer::create_page_aligned(size);
- entry_header_t *h = (entry_header_t*)bp.c_str();
- h->seq = seq;
- h->flags = 0;
- h->len = ebl.length();
- h->pre_pad = 0;
- h->post_pad = size - base_size;
- dout(15) << " pad " << h->pre_pad << " " << h->post_pad << dendl;
- h->make_magic(write_pos, header.fsid);
- ebl.copy(0, ebl.length(), bp.c_str()+sizeof(*h)+h->pre_pad);
- memcpy(bp.c_str() + sizeof(*h) + h->pre_pad + ebl.length() + h->post_pad, h, sizeof(*h));
- bl.push_back(bp);
-
+
+ // add to write buffer
+ dout(15) << "prepare_single_write will write " << queue_pos << " : seq " << seq
+ << " len " << ebl.length() << " -> " << size
+ << dendl;
+
+ // add it this entry
+ entry_header_t h;
+ h.seq = seq;
+ h.pre_pad = 0;
+ h.len = ebl.length();
+ h.post_pad = size - base_size;
+ h.make_magic(queue_pos, header.fsid);
+
+ bl.append((const char*)&h, sizeof(h));
+ bl.claim_append(ebl);
+ if (h.post_pad) {
+ bufferptr bp = buffer::create_static(h.post_pad, zero_buf);
+ bl.push_back(bp);
+ }
+ bl.append((const char*)&h, sizeof(h));
+
if (writeq.front().fin) {
writing_seq.push_back(seq);
writing_fin.push_back(writeq.front().fin);
}
-
+
// pop from writeq
writeq.pop_front();
- journalq.push_back(pair<__u64,off64_t>(seq, write_pos));
+ journalq.push_back(pair<__u64,off64_t>(seq, queue_pos));
+
+ queue_pos += size;
return true;
}
off64_t pos = write_pos;
::lseek64(fd, write_pos, SEEK_SET);
#endif
+
+ // make sure this is a single, contiguous buffer
+ if (directio && !bl.is_contiguous()) {
+ bl.rebuild();
+ //dout(0) << "len " << bl.length() << " page_mask " << PAGE_MASK << " ~" << ~PAGE_MASK << dendl;
+ assert((bl.length() & ~PAGE_MASK) == 0);
+ }
+
for (list<bufferptr>::const_iterator it = bl.buffers().begin();
it != bl.buffers().end();
it++) {
}
bufferlist bl;
- if (directio)
- prepare_single_dio_write(bl);
- else
- prepare_multi_write(bl);
+ prepare_multi_write(bl);
do_write(bl);
}