#include "osdc/Journaler.h"
#include "common/errno.h"
#include "include/assert.h"
+#include "common/Finisher.h"
#define dout_subsys ceph_subsys_journaler
#undef dout_prefix
object_t oid = file_object_t(ino, 0);
object_locator_t oloc(pg_pool);
- objecter->read_full(oid, oloc, CEPH_NOSNAP, bl, 0, on_finish);
+ objecter->read_full(oid, oloc, CEPH_NOSNAP, bl, 0, new C_OnFinisher(on_finish, finisher));
}
void Journaler::reread_head(Context *onfinish)
ldout(cct, 1) << "_finish_read_head " << h << ". probing for end of log (from " << write_pos << ")..." << dendl;
C_ProbeEnd *fin = new C_ProbeEnd(this);
state = STATE_PROBING;
- probe(fin, &fin->end);
+ _probe(fin, &fin->end);
}
-void Journaler::probe(Context *finish, uint64_t *end)
+void Journaler::_probe(Context *finish, uint64_t *end)
{
+ assert(lock.is_locked_by_me());
ldout(cct, 1) << "probing for end of the log" << dendl;
assert(state == STATE_PROBING || state == STATE_REPROBING);
// probe the log
filer.probe(ino, &layout, CEPH_NOSNAP,
- write_pos, end, 0, true, 0, finish);
+ write_pos, end, 0, true, 0, new C_OnFinisher(finish, finisher));
}
-void Journaler::reprobe(Context *finish)
+void Journaler::_reprobe(Context *finish)
{
- Mutex::Locker l(lock);
-
ldout(cct, 10) << "reprobe" << dendl;
assert(state == STATE_ACTIVE);
state = STATE_REPROBING;
C_ReProbe *fin = new C_ReProbe(this, finish);
- probe(fin, &fin->end);
+ _probe(fin, &fin->end);
}
assert(lock.is_locked_by_me());
assert(!r); //if we get an error, we're boned
- reprobe(onfinish);
+ _reprobe(onfinish);
}
object_locator_t oloc(pg_pool);
objecter->write_full(oid, oloc, snapc, bl, ceph_clock_now(cct), 0,
NULL,
- new C_WriteHead(this, last_written, oncommit));
+ new C_OnFinisher(new C_WriteHead(this, last_written, oncommit), finisher));
}
void Journaler::_finish_write_head(int r, Header &wrote, Context *oncommit)
filer.write(ino, &layout, snapc,
flush_pos, len, write_bl, ceph_clock_now(cct),
0,
- NULL, onsafe);
+ NULL, new C_OnFinisher(onsafe, finisher));
flush_pos += len;
assert(write_buf.length() == write_pos - flush_pos);
ldout(cct, 10) << "flush nothing to flush, (prezeroing/prezero)/write/flush/safe pointers at "
<< "(" << prezeroing_pos << "/" << prezero_pos << ")/" << write_pos << "/" << flush_pos << "/" << safe_pos << dendl;
if (onsafe) {
- onsafe->complete(0);
- onsafe = 0;
+ finisher->queue(onsafe, 0);
}
return;
}
// queue waiter
- if (onsafe)
- waitfor_safe[write_pos].push_back(onsafe);
+ if (onsafe) {
+ waitfor_safe[write_pos].push_back(new C_OnFinisher(onsafe, finisher));
+ }
}
void Journaler::flush(Context *onsafe)
ldout(cct, 10) << "_issue_prezero zeroing " << prezeroing_pos << "~" << len << " (partial period)" << dendl;
}
SnapContext snapc;
- Context *c = new C_Journaler_Prezero(this, prezeroing_pos, len);
+ Context *c = new C_OnFinisher(new C_Journaler_Prezero(this, prezeroing_pos, len), finisher);
filer.zero(ino, &layout, snapc, prezeroing_pos, len, ceph_clock_now(cct), 0, NULL, c);
prezeroing_pos += len;
}
}
+// Lock cycle because we get called out of objecter callback (holding
+// objecter read lock), but there are also cases where we take the journaler
+// lock before calling into objecter to do I/O.
void Journaler::_finish_prezero(int r, uint64_t start, uint64_t len)
{
Mutex::Locker l(lock);
if (l > len)
l = len;
C_Read *c = new C_Read(this, requested_pos);
- filer.read(ino, &layout, CEPH_NOSNAP, requested_pos, l, &c->bl, 0, c);
+ filer.read(ino, &layout, CEPH_NOSNAP, requested_pos, l, &c->bl, 0, new C_OnFinisher(c, finisher));
requested_pos += l;
len -= l;
}
uint64_t first = trimmed_pos / get_layout_period();
uint64_t num = (write_pos - trimmed_pos) / get_layout_period() + 2;
filer.purge_range(ino, &layout, SnapContext(), first, num, ceph_clock_now(cct), 0,
- new C_EraseFinish(this, completion));
+ new C_OnFinisher(new C_EraseFinish(this, completion), finisher));
// We will not start the operation to delete the header until _finish_erase has
// seen the data deletion succeed: otherwise if there was an error deleting data
if (data_result == 0) {
// Async delete the journal header
filer.purge_range(ino, &layout, SnapContext(), 0, 1, ceph_clock_now(cct), 0,
- completion);
+ new C_OnFinisher(completion, finisher));
} else {
lderr(cct) << "Failed to delete journal " << ino << " data: " << cpp_strerror(data_result) << dendl;
completion->complete(data_result);
uint64_t num = (trim_to - trimming_pos) / period;
SnapContext snapc;
filer.purge_range(ino, &layout, snapc, first, num, ceph_clock_now(cct), 0,
- new C_Trim(this, trim_to));
+ new C_OnFinisher(new C_Trim(this, trim_to), finisher));
trimming_pos = trim_to;
}
class CephContext;
class Context;
class PerfCounters;
+class Finisher;
typedef __u8 stream_format_t;
return stream_format;
}
+ Header last_committed;
+
private:
// me
CephContext *cct;
Mutex lock;
+ Finisher *finisher;
Header last_written;
- Header last_committed;
inodeno_t ino;
int64_t pg_pool;
bool readonly;
void read_head(Context *on_finish, bufferlist *bl);
void _finish_read_head(int r, bufferlist& bl);
void _finish_reread_head(int r, bufferlist& bl, Context *finish);
- void probe(Context *finish, uint64_t *end);
+ void _probe(Context *finish, uint64_t *end);
void _finish_probe_end(int r, uint64_t end);
+ void _reprobe(Context *onfinish);
void _finish_reprobe(int r, uint64_t end, Context *onfinish);
void _finish_reread_head_and_probe(int r, Context *onfinish);
class C_ReadHead;
friend class C_EraseFinish;
public:
- Journaler(inodeno_t ino_, int64_t pool, const char *mag, Objecter *obj, PerfCounters *l, int lkey, SafeTimer *tim) :
- cct(obj->cct), lock("Journaler"),
- last_written(mag), last_committed(mag),
+ Journaler(inodeno_t ino_, int64_t pool, const char *mag, Objecter *obj, PerfCounters *l, int lkey, SafeTimer *tim, Finisher *f=NULL) :
+ last_committed(mag),
+ cct(obj->cct), lock("Journaler"), finisher(f),
+ last_written(mag),
ino(ino_), pg_pool(pool), readonly(true),
stream_format(-1), journal_stream(-1),
magic(mag),
void create(ceph_file_layout *layout, stream_format_t const sf);
void recover(Context *onfinish);
void reread_head(Context *onfinish);
- void reprobe(Context *onfinish);
void reread_head_and_probe(Context *onfinish);
void write_head(Context *onsave=0);
void wait_for_flush(Context *onsafe = 0);
// Synchronous getters
// ===================
+ // TODO: need some locks on reads for true safety
uint64_t get_layout_period() const { return (uint64_t)layout.fl_stripe_count * (uint64_t)layout.fl_object_size; }
ceph_file_layout& get_layout() { return layout; }
bool is_active() { return state == STATE_ACTIVE; }