// throw out what we have so far
full_state = FULL_FULL;
while (!writeq_empty()) {
- put_throttle(1, peek_write().bl.length());
+ put_throttle(1, peek_write().orig_len);
pop_write();
}
print_header(header);
write_item &next_write = peek_write();
uint64_t seq = next_write.seq;
bufferlist &ebl = next_write.bl;
- unsigned head_size = sizeof(entry_header_t);
- off64_t base_size = 2*head_size + ebl.length();
-
- int alignment = next_write.alignment; // we want to start ebl with this alignment
- unsigned pre_pad = 0;
- if (alignment >= 0)
- pre_pad = ((unsigned int)alignment - (unsigned int)head_size) & ~CEPH_PAGE_MASK;
- off64_t size = ROUND_UP_TO(base_size + pre_pad, header.alignment);
- unsigned post_pad = size - base_size - pre_pad;
+ off64_t size = ebl.length();
int r = check_for_full(seq, queue_pos, size);
if (r < 0)
return r; // ENOSPC or EAGAIN
- orig_bytes += ebl.length();
+ uint32_t orig_len = next_write.orig_len;
+ orig_bytes += orig_len;
orig_ops++;
// add to write buffer
dout(15) << "prepare_single_write " << orig_ops << " will write " << queue_pos << " : seq " << seq
- << " len " << ebl.length() << " -> " << size
- << " (head " << head_size << " pre_pad " << pre_pad
- << " ebl " << ebl.length() << " post_pad " << post_pad << " tail " << head_size << ")"
- << " (ebl alignment " << alignment << ")"
- << dendl;
+ << " len " << orig_len << " -> " << size << dendl;
- // add it this entry
- entry_header_t h;
- memset(&h, 0, sizeof(h));
- h.seq = seq;
- h.pre_pad = pre_pad;
- h.len = ebl.length();
- h.post_pad = post_pad;
- h.make_magic(queue_pos, header.get_fsid64());
- h.crc32c = ebl.crc32c(0);
-
- bl.append((const char*)&h, sizeof(h));
- if (pre_pad) {
- bufferptr bp = buffer::create_static(pre_pad, zero_buf);
- bl.push_back(bp);
- }
- bl.claim_append(ebl, buffer::list::CLAIM_ALLOW_NONSHAREABLE); // potential zero-copy
-
- if (h.post_pad) {
- bufferptr bp = buffer::create_static(post_pad, zero_buf);
- bl.push_back(bp);
- }
- bl.append((const char*)&h, sizeof(h));
-
+ unsigned seq_offset = offsetof(entry_header_t, seq);
+ unsigned magic1_offset = offsetof(entry_header_t, magic1);
+ unsigned magic2_offset = offsetof(entry_header_t, magic2);
+
+ bufferptr headerptr = ebl.buffers().front();
+ uint64_t _seq = seq;
+ uint64_t _queue_pos = queue_pos;
+ uint64_t magic2 = entry_header_t::make_magic(seq, orig_len, header.get_fsid64());
+ headerptr.copy_in(seq_offset, sizeof(uint64_t), (char *)&_seq);
+ headerptr.copy_in(magic1_offset, sizeof(uint64_t), (char *)&_queue_pos);
+ headerptr.copy_in(magic2_offset, sizeof(uint64_t), (char *)&magic2);
+
+ bufferptr footerptr = ebl.buffers().back();
+ unsigned post_offset = footerptr.length() - sizeof(entry_header_t);
+ footerptr.copy_in(post_offset + seq_offset, sizeof(uint64_t), (char *)&_seq);
+ footerptr.copy_in(post_offset + magic1_offset, sizeof(uint64_t), (char *)&_queue_pos);
+ footerptr.copy_in(post_offset + magic2_offset, sizeof(uint64_t), (char *)&magic2);
+
+ bl.claim_append(ebl);
if (next_write.tracked_op)
next_write.tracked_op->mark_event("write_thread_in_journal_buffer");
// make sure list segments are page aligned
if (directio && (!bl.is_aligned(block_size) ||
!bl.is_n_align_sized(CEPH_MINIMUM_BLOCK_SIZE))) {
- bl.rebuild_aligned(CEPH_MINIMUM_BLOCK_SIZE);
- dout(10) << __func__ << " total memcopy: " << bl.get_memcopy_count() << dendl;
+ assert(0 == "bl should be align");
if ((bl.length() & (CEPH_MINIMUM_BLOCK_SIZE - 1)) != 0 ||
(pos & (CEPH_MINIMUM_BLOCK_SIZE - 1)) != 0)
dout(0) << "rebuild_page_aligned failed, " << bl << dendl;
if (write_stop) {
dout(20) << "write_thread_entry full and stopping, throw out queue and finish up" << dendl;
while (!writeq_empty()) {
- put_throttle(1, peek_write().bl.length());
+ put_throttle(1, peek_write().orig_len);
pop_write();
}
print_header(header);
}
#endif
-void FileJournal::submit_entry(uint64_t seq, bufferlist& e, int alignment,
+int FileJournal::prepare_entry(list<ObjectStore::Transaction*>& tls, bufferlist* tbl) { // encodes tls into one padded journal entry; *tbl is replaced by the finished entry and the payload length (h.len) is returned
+ dout(10) << "prepare_entry " << tls << dendl;
+ unsigned data_len = 0; // largest qualifying transaction data length seen so far
+ int data_align = -1; // -1 indicates that we don't care about the alignment
+ bufferlist bl; // payload: the encoded transactions, in list order
+ for (list<ObjectStore::Transaction*>::iterator p = tls.begin();
+ p != tls.end(); ++p) {
+ ObjectStore::Transaction *t = *p;
+ if (t->get_data_length() > data_len && // only the transaction with the largest data so far drives the alignment...
+ (int)t->get_data_length() >= g_conf->journal_align_min_size) { // ...and only if it reaches the configured minimum size
+ data_len = t->get_data_length();
+ data_align = (t->get_data_alignment() - bl.length()) & ~CEPH_PAGE_MASK; // bl.length() is the number of bytes already encoded ahead of *t
+ }
+ ::encode(*t, bl);
+ }
+ if (tbl->length()) { // any bytes the caller passed in via *tbl go after the encoded transactions
+ bl.claim_append(*tbl);
+ }
+ // build the header for this entry
+ entry_header_t h;
+ unsigned head_size = sizeof(entry_header_t);
+ off64_t base_size = 2*head_size + bl.length(); // header + payload + footer, before any padding
+ memset(&h, 0, sizeof(h)); // every field not assigned below (e.g. seq, magic1, magic2) is left as zero in this function
+ if (data_align >= 0)
+ h.pre_pad = ((unsigned int)data_align - (unsigned int)head_size) & ~CEPH_PAGE_MASK; // padding between header and payload, derived from the requested data alignment
+ off64_t size = ROUND_UP_TO(base_size + h.pre_pad, header.alignment); // total entry size, rounded up to the journal header's alignment
+ unsigned post_pad = size - base_size - h.pre_pad; // whatever the round-up added goes after the payload
+ h.len = bl.length();
+ h.post_pad = post_pad;
+ h.crc32c = bl.crc32c(0); // checksum is taken over the payload only, before it is moved into ebl
+ dout(10) << " len " << bl.length() << " -> " << size
+ << " (head " << head_size << " pre_pad " << h.pre_pad
+ << " bl " << bl.length() << " post_pad " << post_pad << " tail " << head_size << ")"
+ << " (bl alignment " << data_align << ")"
+ << dendl;
+ bufferlist ebl; // assembled entry: header, pre_pad, payload, post_pad, footer
+ // header
+ ebl.append((const char*)&h, sizeof(h));
+ if (h.pre_pad) {
+ ebl.push_back(buffer::create_static(h.pre_pad, zero_buf)); // padding bytes are taken from zero_buf rather than copied
+ }
+ // payload
+ ebl.claim_append(bl, buffer::list::CLAIM_ALLOW_NONSHAREABLE); // potential zero-copy
+ if (h.post_pad) {
+ ebl.push_back(buffer::create_static(h.post_pad, zero_buf));
+ }
+ // footer (a second copy of the same header struct)
+ ebl.append((const char*)&h, sizeof(h));
+ ebl.rebuild_aligned(CEPH_MINIMUM_BLOCK_SIZE); // NOTE(review): presumably done here so the write path does not have to rebuild the buffer itself - confirm against the direct-I/O alignment check
+ tbl->claim(ebl); // hand the finished entry back to the caller through tbl
+ return h.len; // payload length only; headers and padding are not included
+}
+
+void FileJournal::submit_entry(uint64_t seq, bufferlist& e, uint32_t orig_len,
Context *oncommit, TrackedOpRef osd_op)
{
// dump on queue
assert(e.length() > 0);
throttle_ops.take(1);
- throttle_bytes.take(e.length());
+ throttle_bytes.take(orig_len);
if (osd_op)
osd_op->mark_event("commit_queued_for_journal_write");
if (logger) {
seq, oncommit, ceph_clock_now(g_ceph_context), osd_op));
if (writeq.empty())
writeq_cond.Signal();
- writeq.push_back(write_item(seq, e, alignment, osd_op));
+ writeq.push_back(write_item(seq, e, orig_len, osd_op));
}
}
dout(15) << " dropping committed but unwritten seq " << peek_write().seq
<< " len " << peek_write().bl.length()
<< dendl;
- put_throttle(1, peek_write().bl.length());
+ put_throttle(1, peek_write().orig_len);
pop_write();
}
struct write_item {
uint64_t seq;
bufferlist bl;
- int alignment;
+ // Length recorded for this entry at submit time. submit_entry() takes this
+ // many bytes from the byte throttle and put_throttle() is later called with
+ // the same value, so it must be kept separate from bl.length().
+ uint32_t orig_len;
TrackedOpRef tracked_op;
- write_item(uint64_t s, bufferlist& b, int al, TrackedOpRef opref) :
- seq(s), alignment(al), tracked_op(opref) {
+ // s: journal sequence number; b: entry buffer, whose contents are claimed
+ // (b is left empty); ol: length to record in orig_len; opref: op to track.
+ // ol is uint32_t to match both the orig_len member and the argument that
+ // submit_entry() passes, avoiding an unsigned->signed->unsigned round trip.
+ write_item(uint64_t s, bufferlist& b, uint32_t ol, TrackedOpRef opref) :
+ seq(s), orig_len(ol), tracked_op(opref) {
bl.claim(b, buffer::list::CLAIM_ALLOW_NONSHAREABLE); // potential zero-copy
}
- write_item() : seq(0), alignment(0) {}
+ write_item() : seq(0), orig_len(0) {}
};
Mutex finisher_lock;
completions.pop_front();
}
- void submit_entry(uint64_t seq, bufferlist& bl, int alignment,
+ int prepare_entry(list<ObjectStore::Transaction*>& tls, bufferlist* tbl);
+
+ void submit_entry(uint64_t seq, bufferlist& bl, uint32_t orig_len,
Context *oncommit,
TrackedOpRef osd_op = TrackedOpRef());
/// End protected by finisher_lock
uint64_t magic1;
uint64_t magic2;
- void make_magic(off64_t pos, uint64_t fsid) {
- magic1 = pos;
- magic2 = fsid ^ seq ^ len;
+ static uint64_t make_magic(uint64_t seq, uint32_t len, uint64_t fsid) { // static: works from explicit arguments and returns the value instead of writing magic1/magic2 on an instance
+ return (fsid ^ seq ^ len); // XOR of the three inputs; check_magic() compares magic2 against this same expression
}
bool check_magic(off64_t pos, uint64_t fsid) {
return
- magic1 == (uint64_t)pos &&
- magic2 == (fsid ^ seq ^ len);
+ magic1 == (uint64_t)pos && // magic1 must equal the position passed in
+ magic2 == (fsid ^ seq ^ len); // magic2 must equal the XOR of fsid with this header's seq and len
}
} __attribute__((__packed__, aligned(4)));
string fn;
char *zero_buf;
-
off64_t max_size;
size_t block_size;
bool directio, aio, force_aio;
journal->throttle();
//prepare and encode transactions data out of lock
bufferlist tbl;
- int data_align = _op_journal_transactions_prepare(o->tls, tbl);
+ int orig_len = -1;
+ if (journal && journal->is_writeable()) {
+ orig_len = journal->prepare_entry(o->tls, &tbl);
+ }
uint64_t op_num = submit_manager.op_submit_start();
o->op = op_num;
if (m_filestore_journal_parallel) {
dout(5) << "queue_transactions (parallel) " << o->op << " " << o->tls << dendl;
- _op_journal_transactions(tbl, data_align, o->op, ondisk, osd_op);
+ _op_journal_transactions(tbl, orig_len, o->op, ondisk, osd_op);
// queue inside submit_manager op submission lock
queue_op(osr, o);
osr->queue_journal(o->op);
- _op_journal_transactions(tbl, data_align, o->op,
+ _op_journal_transactions(tbl, orig_len, o->op,
new C_JournaledAhead(this, osr, o, ondisk),
osd_op);
} else {
//prepare and encode transactions data out of lock
bufferlist tbl;
- int data_align = _op_journal_transactions_prepare(tls, tbl);
+ int orig_len = -1;
+ if (journal && journal->is_writeable()) {
+ orig_len = journal->prepare_entry(tls, &tbl);
+ }
uint64_t op = submit_manager.op_submit_start();
dout(5) << "queue_transactions (trailing journal) " << op << " " << tls << dendl;
int r = do_transactions(tls, op);
if (r >= 0) {
- _op_journal_transactions(tbl, data_align, op, ondisk, osd_op);
+ _op_journal_transactions(tbl, orig_len, op, ondisk, osd_op);
} else {
delete ondisk;
}
}
void JournalingObjectStore::_op_journal_transactions(
- bufferlist& tbl, int data_align, uint64_t op,
+ bufferlist& tbl, uint32_t orig_len, uint64_t op, // orig_len replaces data_align; it is only forwarded to submit_entry() below
Context *onjournal, TrackedOpRef osd_op)
{
if (osd_op.get())
dout(10) << "op_journal_transactions " << op << dendl;
if (journal && journal->is_writeable()) {
- journal->submit_entry(op, tbl, data_align, onjournal, osd_op);
+ journal->submit_entry(op, tbl, orig_len, onjournal, osd_op); // tbl and orig_len are passed through unchanged; NOTE(review): callers initialise orig_len to -1 when no entry was prepared - confirm that value can never reach this branch
} else if (onjournal) {
apply_manager.add_waiter(op, onjournal);
}
}
-int JournalingObjectStore::_op_journal_transactions_prepare(
- list<ObjectStore::Transaction*>& tls, bufferlist& tbl)
-{
- dout(10) << "_op_journal_transactions_prepare " << tls << dendl;
- unsigned data_len = 0;
- int data_align = -1; // -1 indicates that we don't care about the alignment
- for (list<ObjectStore::Transaction*>::iterator p = tls.begin();
- p != tls.end(); ++p) {
- ObjectStore::Transaction *t = *p;
- if (t->get_data_length() > data_len &&
- (int)t->get_data_length() >= g_conf->journal_align_min_size) {
- data_len = t->get_data_length();
- data_align = (t->get_data_alignment() - tbl.length()) & ~CEPH_PAGE_MASK;
- }
- ::encode(*t, tbl);
- }
- return data_align;
-}