-bool FileJournal::check_for_full(__u64 seq, off64_t pos, off64_t size)
+int FileJournal::check_for_full(__u64 seq, off64_t pos, off64_t size)
{
// already full?
if (full_commit_seq || full_restart_seq)
- return false;
+ return -ENOSPC;
- retry:
- off64_t room = (header.max_size - pos) + (header.start - get_top());
+ // take 1 byte off so that we only get pos == header.start on EMPTY, never on FULL.
+ off64_t room;
+ if (pos >= header.start)
+ room = (header.max_size - pos) + (header.start - get_top()) - 1;
+ else
+ room = header.start - pos - 1;
+ dout(10) << "room " << room << " max_size " << max_size << " pos " << pos << " header.start " << header.start
+ << " top " << get_top() << dendl;
if (do_sync_cond) {
if (room < (header.max_size >> 1) &&
dout(10) << "check_for_full at " << pos << " : " << size << " < " << room << dendl;
if (pos + size > header.max_size)
must_write_header = true;
- return true;
+ return 0;
}
// full
<< " (max_size " << header.max_size << " start " << header.start << ")"
<< dendl;
- // wait?
- if (wait_on_full) {
- dout(1) << "check_for_full waiting for a commit" << dendl;
- commit_cond.Wait(write_lock);
- goto retry;
- }
+ if (wait_on_full)
+ return -ENOSPC;
+ // throw out what we have so far
full_commit_seq = seq;
full_restart_seq = seq+1;
while (!writeq.empty()) {
writeq.pop_front();
}
print_header();
- return false;
-
+ return -ENOSPC;
}
-void FileJournal::prepare_multi_write(bufferlist& bl, __u64& orig_ops, __u64& orig_bytes)
+int FileJournal::prepare_multi_write(bufferlist& bl, __u64& orig_ops, __u64& orig_bytes)
{
// gather queued writes
off64_t queue_pos = write_pos;
unsigned bmax = g_conf.journal_max_write_bytes;
if (full_commit_seq || full_restart_seq)
- return;
+ return -ENOSPC;
while (!writeq.empty()) {
- bool r = prepare_single_write(bl, queue_pos, orig_bytes);
- if (!r)
- break;
-
- orig_ops++;
+ int r = prepare_single_write(bl, queue_pos, orig_ops, orig_bytes);
+ if (r == -ENOSPC) {
+ if (orig_ops)
+ break; // commit what we have
+ dout(20) << "prepare_multi_write full on first entry, need to wait" << dendl;
+ return -ENOSPC; // hrm, full on first op
+ }
if (eleft) {
if (--eleft == 0) {
}
//assert(write_pos + bl.length() == queue_pos);
+ return 0;
}
-bool FileJournal::prepare_single_write(bufferlist& bl, off64_t& queue_pos, __u64& orig_size)
+int FileJournal::prepare_single_write(bufferlist& bl, off64_t& queue_pos, __u64& orig_ops, __u64& orig_bytes)
{
// grab next item
__u64 seq = writeq.front().seq;
off64_t base_size = 2*sizeof(entry_header_t) + ebl.length();
off64_t size = ROUND_UP_TO(base_size, header.alignment);
- if (!check_for_full(seq, queue_pos, size))
- return false;
+ int r = check_for_full(seq, queue_pos, size);
+ if (r < 0)
+ return r; // ENOSPC or EAGAIN
- orig_size += ebl.length();
+ orig_bytes += ebl.length();
+ orig_ops++;
// add to write buffer
- dout(15) << "prepare_single_write will write " << queue_pos << " : seq " << seq
+ dout(15) << "prepare_single_write " << orig_ops << " will write " << queue_pos << " : seq " << seq
<< " len " << ebl.length() << " -> " << size
<< dendl;
if (queue_pos > header.max_size)
queue_pos = queue_pos + get_top() - header.max_size;
- return true;
+ return 0;
}
void FileJournal::write_bl(off64_t& pos, bufferlist& bl)
__u64 orig_bytes = 0;
bufferlist bl;
- prepare_multi_write(bl, orig_ops, orig_bytes);
+ int r = prepare_multi_write(bl, orig_ops, orig_bytes);
+ if (r == -ENOSPC) {
+ dout(20) << "write_thread_entry full, going to sleep (waiting for commit)" << dendl;
+ commit_cond.Wait(write_lock);
+ dout(20) << "write_thread_entry woke up" << dendl;
+ continue;
+ }
+ assert(r == 0);
do_write(bl);
__u64 new_ops = throttle_ops.put(orig_ops);
void stop_writer();
void write_thread_entry();
- bool check_for_full(__u64 seq, off64_t pos, off64_t size);
- void prepare_multi_write(bufferlist& bl, __u64& orig_ops, __u64& orig_bytse);
- bool prepare_single_write(bufferlist& bl, off64_t& queue_pos, __u64& orig_bytes);
+ int check_for_full(__u64 seq, off64_t pos, off64_t size);
+ int prepare_multi_write(bufferlist& bl, __u64& orig_ops, __u64& orig_bytee);
+ int prepare_single_write(bufferlist& bl, off64_t& queue_pos, __u64& orig_ops, __u64& orig_bytes);
void do_write(bufferlist& bl);
void write_bl(off64_t& pos, bufferlist& bl);