return -ENOSPC;
while (!writeq_empty()) {
- int r = prepare_single_write(bl, queue_pos, orig_ops, orig_bytes);
- if (r == -ENOSPC) {
- if (orig_ops)
- break; // commit what we have
-
- if (logger)
- logger->inc(l_os_j_full);
-
- if (wait_on_full) {
- dout(20) << "prepare_multi_write full on first entry, need to wait" << dendl;
- } else {
- dout(20) << "prepare_multi_write full on first entry, restarting journal" << dendl;
-
- // throw out what we have so far
- full_state = FULL_FULL;
- while (!writeq_empty()) {
- put_throttle(1, peek_write().orig_len);
- pop_write();
- }
- print_header(header);
+ list<write_item> items;
+ batch_pop_write(items);
+ list<write_item>::iterator it = items.begin();
+ while (it != items.end()) {
+ int r = prepare_single_write(*it, bl, queue_pos, orig_ops, orig_bytes);
+ if (r == 0) { // prepare ok, delete it
+ items.erase(it++);
}
-
- return -ENOSPC; // hrm, full on first op
- }
-
- if (eleft) {
- if (--eleft == 0) {
- dout(20) << "prepare_multi_write hit max events per write " << g_conf->journal_max_write_entries << dendl;
- break;
+ if (r == -ENOSPC) {
+ // the journal maybe full, insert the left item to writeq
+ batch_unpop_write(items);
+ if (orig_ops)
+ goto out; // commit what we have
+
+ if (logger)
+ logger->inc(l_os_j_full);
+
+ if (wait_on_full) {
+ dout(20) << "prepare_multi_write full on first entry, need to wait" << dendl;
+ } else {
+ dout(20) << "prepare_multi_write full on first entry, restarting journal" << dendl;
+
+ // throw out what we have so far
+ full_state = FULL_FULL;
+ while (!writeq_empty()) {
+ put_throttle(1, peek_write().orig_len);
+ pop_write();
+ }
+ print_header(header);
+ }
+
+ return -ENOSPC; // hrm, full on first op
}
- }
- if (bmax) {
- if (bl.length() >= bmax) {
- dout(20) << "prepare_multi_write hit max write size " << g_conf->journal_max_write_bytes << dendl;
- break;
+ if (eleft) {
+ if (--eleft == 0) {
+ dout(20) << "prepare_multi_write hit max events per write " << g_conf->journal_max_write_entries << dendl;
+ batch_unpop_write(items);
+ goto out;
+ }
+ }
+ if (bmax) {
+ if (bl.length() >= bmax) {
+ dout(20) << "prepare_multi_write hit max write size " << g_conf->journal_max_write_bytes << dendl;
+ batch_unpop_write(items);
+ goto out;
+ }
}
}
}
+out:
dout(20) << "prepare_multi_write queue_pos now " << queue_pos << dendl;
assert((write_pos + bl.length() == queue_pos) ||
(write_pos + bl.length() - header.max_size + get_top() == queue_pos));
{
assert(finisher_lock.is_locked());
utime_t now = ceph_clock_now(g_ceph_context);
- while (!completions_empty()) {
- completion_item next = completion_peek_front();
+ list<completion_item> items;
+ batch_pop_completions(items);
+ list<completion_item>::iterator it = items.begin();
+ while (it != items.end()) {
+ completion_item& next = *it;
if (next.seq > seq)
break;
- completion_pop_front();
utime_t lat = now;
lat -= next.start;
dout(10) << "queue_completions_thru seq " << seq
finisher->queue(next.finish);
if (next.tracked_op)
next.tracked_op->mark_event("journaled_completion_queued");
+ items.erase(it++);
}
+ batch_unpop_completions(items);
finisher_cond.Signal();
}
-int FileJournal::prepare_single_write(bufferlist& bl, off64_t& queue_pos, uint64_t& orig_ops, uint64_t& orig_bytes)
+
+int FileJournal::prepare_single_write(write_item &next_write, bufferlist& bl, off64_t& queue_pos, uint64_t& orig_ops, uint64_t& orig_bytes)
{
- // grab next item
- write_item &next_write = peek_write();
uint64_t seq = next_write.seq;
bufferlist &ebl = next_write.bl;
off64_t size = ebl.length();
if (next_write.tracked_op)
next_write.tracked_op->mark_event("write_thread_in_journal_buffer");
- // pop from writeq
- pop_write();
journalq.push_back(pair<uint64_t,off64_t>(seq, queue_pos));
writing_seq = seq;
writeq.pop_front();
}
+void FileJournal::batch_pop_write(list<write_item> &items)
+{
+ assert(write_lock.is_locked());
+ Mutex::Locker locker(writeq_lock);
+ writeq.swap(items);
+}
+
+void FileJournal::batch_unpop_write(list<write_item> &items)
+{
+ assert(write_lock.is_locked());
+ Mutex::Locker locker(writeq_lock);
+ writeq.splice(writeq.begin(), items);
+}
+
void FileJournal::commit_start(uint64_t seq)
{
dout(10) << "commit_start" << dendl;
Mutex writeq_lock;
Cond writeq_cond;
- deque<write_item> writeq;
+ list<write_item> writeq;
bool writeq_empty();
write_item &peek_write();
void pop_write();
+ void batch_pop_write(list<write_item> &items);
+ void batch_unpop_write(list<write_item> &items);
Mutex completions_lock;
- deque<completion_item> completions;
+ list<completion_item> completions;
bool completions_empty() {
Mutex::Locker l(completions_lock);
return completions.empty();
}
+ void batch_pop_completions(list<completion_item> &items) {
+ Mutex::Locker l(completions_lock);
+ completions.swap(items);
+ }
+ void batch_unpop_completions(list<completion_item> &items) {
+ Mutex::Locker l(completions_lock);
+ completions.splice(completions.begin(), items);
+ }
completion_item completion_peek_front() {
Mutex::Locker l(completions_lock);
assert(!completions.empty());
int check_for_full(uint64_t seq, off64_t pos, off64_t size);
int prepare_multi_write(bufferlist& bl, uint64_t& orig_ops, uint64_t& orig_bytee);
- int prepare_single_write(bufferlist& bl, off64_t& queue_pos, uint64_t& orig_ops, uint64_t& orig_bytes);
+ int prepare_single_write(write_item &next_write, bufferlist& bl, off64_t& queue_pos,
+ uint64_t& orig_ops, uint64_t& orig_bytes);
void do_write(bufferlist& bl);
void write_finish_thread_entry();