while (true) {
assert((log_reader->buf.pos & ~super.block_mask()) == 0);
uint64_t pos = log_reader->buf.pos;
+ uint64_t read_pos = pos;
bufferlist bl;
{
- int r = _read(log_reader, &log_reader->buf, pos, super.block_size,
+ int r = _read(log_reader, &log_reader->buf, read_pos, super.block_size,
&bl, NULL);
assert(r == (int)super.block_size);
+ read_pos += r;
}
uint64_t more = 0;
uint64_t seq;
dout(20) << __func__ << " need 0x" << std::hex << more << std::dec
<< " more bytes" << dendl;
bufferlist t;
- int r = _read(log_reader, &log_reader->buf, pos + super.block_size, more,
- &t, NULL);
+ int r = _read(log_reader, &log_reader->buf, read_pos, more, &t, NULL);
if (r < (int)more) {
dout(10) << __func__ << " 0x" << std::hex << pos
<< ": stop: len is 0x" << bl.length() + more << std::dec
}
assert(r == (int)more);
bl.claim_append(t);
+ read_pos += r;
}
bluefs_transaction_t t;
try {
assert(t.seq == 1);
break;
+ case bluefs_transaction_t::OP_JUMP:
+ {
+ uint64_t next_seq;
+ uint64_t offset;
+ ::decode(next_seq, p);
+ ::decode(offset, p);
+ dout(20) << __func__ << " 0x" << std::hex << pos << std::dec
+ << ": op_jump seq " << next_seq
+ << " offset 0x" << std::hex << offset << std::dec << dendl;
+ assert(next_seq >= log_seq);
+ log_seq = next_seq - 1; // we will increment it below
+ uint64_t skip = offset - read_pos;
+ if (skip) {
+ bufferlist junk;
+ int r = _read(log_reader, &log_reader->buf, read_pos, skip, &junk,
+ NULL);
+ if (r != (int)skip) {
+ dout(10) << __func__ << " 0x" << std::hex << read_pos
+ << ": stop: failed to skip to " << offset
+ << std::dec << dendl;
+ assert(0 == "problem with op_jump");
+ }
+ }
+ }
+ break;
+
case bluefs_transaction_t::OP_JUMP_SEQ:
{
uint64_t next_seq;
OP_DIR_REMOVE, ///< remove a dir (dirname)
OP_FILE_UPDATE, ///< set/update file metadata (file)
OP_FILE_REMOVE, ///< remove file (ino)
+ OP_JUMP, ///< jump the seq # and offset
OP_JUMP_SEQ, ///< jump the seq #
} op_t;
::encode((__u8)OP_FILE_REMOVE, op_bl);
::encode(ino, op_bl);
}
+ void op_jump(uint64_t next_seq, uint64_t offset) {
+ ::encode((__u8)OP_JUMP, op_bl);
+ ::encode(next_seq, op_bl);
+ ::encode(offset, op_bl);
+ }
void op_jump_seq(uint64_t next_seq) {
::encode((__u8)OP_JUMP_SEQ, op_bl);
::encode(next_seq, op_bl);