class Journaler::C_ReProbe : public Context {
Journaler *ls;
- Context *onfinish;
+ C_OnFinisher *onfinish;
public:
uint64_t end;
- C_ReProbe(Journaler *l, Context *onfinish_) :
+ C_ReProbe(Journaler *l, C_OnFinisher *onfinish_) :
ls(l), onfinish(onfinish_), end(0) {}
void finish(int r) {
ls->_finish_reprobe(r, end, onfinish);
ldout(cct, 1) << "read_head" << dendl;
state = STATE_READHEAD;
C_ReadHead *fin = new C_ReadHead(this);
- read_head(fin, &fin->bl);
+ _read_head(fin, &fin->bl);
}
-void Journaler::read_head(Context *on_finish, bufferlist *bl)
+void Journaler::_read_head(Context *on_finish, bufferlist *bl)
{
+ assert(lock.is_locked_by_me());
assert(state == STATE_READHEAD || state == STATE_REREADHEAD);
object_t oid = file_object_t(ino, 0);
object_locator_t oloc(pg_pool);
- objecter->read_full(oid, oloc, CEPH_NOSNAP, bl, 0, new C_OnFinisher(on_finish, finisher));
+ objecter->read_full(oid, oloc, CEPH_NOSNAP, bl, 0, wrap_finisher(on_finish));
}
void Journaler::reread_head(Context *onfinish)
{
Mutex::Locker l(lock);
- _reread_head(onfinish);
+ _reread_head(wrap_finisher(onfinish));
}
/**
state = STATE_REREADHEAD;
C_RereadHead *fin = new C_RereadHead(this, onfinish);
- read_head(fin, &fin->bl);
+ _read_head(fin, &fin->bl);
}
void Journaler::_finish_reread_head(int r, bufferlist& bl, Context *finish)
assert(state == STATE_PROBING || state == STATE_REPROBING);
// probe the log
filer.probe(ino, &layout, CEPH_NOSNAP,
- write_pos, end, 0, true, 0, new C_OnFinisher(finish, finisher));
+ write_pos, end, 0, true, 0, wrap_finisher(finish));
}
-void Journaler::_reprobe(Context *finish)
+void Journaler::_reprobe(C_OnFinisher *finish)
{
ldout(cct, 10) << "reprobe" << dendl;
assert(state == STATE_ACTIVE);
}
-void Journaler::_finish_reprobe(int r, uint64_t new_end, Context *onfinish)
+void Journaler::_finish_reprobe(int r, uint64_t new_end, C_OnFinisher *onfinish)
{
Mutex::Locker l(lock);
class Journaler::C_RereadHeadProbe : public Context
{
Journaler *ls;
- Context *final_finish;
+ C_OnFinisher *final_finish;
public:
- C_RereadHeadProbe(Journaler *l, Context *finish) :
+ C_RereadHeadProbe(Journaler *l, C_OnFinisher *finish) :
ls(l), final_finish(finish) {}
void finish(int r) {
ls->_finish_reread_head_and_probe(r, final_finish);
Mutex::Locker l(lock);
assert(state == STATE_ACTIVE);
- _reread_head(new C_RereadHeadProbe(this, onfinish));
+ _reread_head(new C_RereadHeadProbe(this, wrap_finisher(onfinish)));
}
-void Journaler::_finish_reread_head_and_probe(int r, Context *onfinish)
+void Journaler::_finish_reread_head_and_probe(int r, C_OnFinisher *onfinish)
{
// Expect to be called back from finish_reread_head, which already takes lock
assert(lock.is_locked_by_me());
public:
Journaler *ls;
Header h;
- Context *oncommit;
- C_WriteHead(Journaler *l, Header& h_, Context *c) : ls(l), h(h_), oncommit(c) {}
+ C_OnFinisher *oncommit;
+ C_WriteHead(Journaler *l, Header& h_, C_OnFinisher *c) : ls(l), h(h_), oncommit(c) {}
void finish(int r) {
ls->_finish_write_head(r, h, oncommit);
}
object_locator_t oloc(pg_pool);
objecter->write_full(oid, oloc, snapc, bl, ceph_clock_now(cct), 0,
NULL,
- new C_OnFinisher(new C_WriteHead(this, last_written, oncommit), finisher));
+ wrap_finisher(new C_WriteHead(this, last_written, wrap_finisher(oncommit))));
}
-void Journaler::_finish_write_head(int r, Header &wrote, Context *oncommit)
+void Journaler::_finish_write_head(int r, Header &wrote, C_OnFinisher *oncommit)
{
Mutex::Locker l(lock);
filer.write(ino, &layout, snapc,
flush_pos, len, write_bl, ceph_clock_now(cct),
0,
- NULL, new C_OnFinisher(onsafe, finisher));
+ NULL, wrap_finisher(onsafe));
flush_pos += len;
assert(write_buf.length() == write_pos - flush_pos);
// queue waiter
if (onsafe) {
- waitfor_safe[write_pos].push_back(new C_OnFinisher(onsafe, finisher));
+ waitfor_safe[write_pos].push_back(wrap_finisher(onsafe));
}
}
void Journaler::flush(Context *onsafe)
{
Mutex::Locker l(lock);
- _flush(onsafe);
+ _flush(wrap_finisher(onsafe));
}
-void Journaler::_flush(Context *onsafe)
+void Journaler::_flush(C_OnFinisher *onsafe)
{
assert(!readonly);
ldout(cct, 10) << "_issue_prezero zeroing " << prezeroing_pos << "~" << len << " (partial period)" << dendl;
}
SnapContext snapc;
- Context *c = new C_OnFinisher(new C_Journaler_Prezero(this, prezeroing_pos, len), finisher);
+ Context *c = wrap_finisher(new C_Journaler_Prezero(this, prezeroing_pos, len));
filer.zero(ino, &layout, snapc, prezeroing_pos, len, ceph_clock_now(cct), 0, NULL, c);
prezeroing_pos += len;
}
ldout(cct, 0) << "_finish_read got error " << r << dendl;
error = r;
if (on_readable) {
- Context *f = on_readable;
+ C_OnFinisher *f = on_readable;
on_readable = 0;
f->complete(r);
}
// readable!
ldout(cct, 10) << "_finish_read now readable (or at journal end)" << dendl;
if (on_readable) {
- Context *f = on_readable;
+ C_OnFinisher *f = on_readable;
on_readable = 0;
f->complete(0);
}
if (l > len)
l = len;
C_Read *c = new C_Read(this, requested_pos);
- filer.read(ino, &layout, CEPH_NOSNAP, requested_pos, l, &c->bl, 0, new C_OnFinisher(c, finisher));
+ filer.read(ino, &layout, CEPH_NOSNAP, requested_pos, l, &c->bl, 0, wrap_finisher(c));
requested_pos += l;
len -= l;
}
class Journaler::C_EraseFinish : public Context {
Journaler *journaler;
- Context *completion;
+ C_OnFinisher *completion;
public:
- C_EraseFinish(Journaler *j, Context *c) : journaler(j), completion(c) {}
+ C_EraseFinish(Journaler *j, C_OnFinisher *c) : journaler(j), completion(c) {}
void finish(int r) {
journaler->_finish_erase(r, completion);
}
uint64_t first = trimmed_pos / get_layout_period();
uint64_t num = (write_pos - trimmed_pos) / get_layout_period() + 2;
filer.purge_range(ino, &layout, SnapContext(), first, num, ceph_clock_now(cct), 0,
- new C_OnFinisher(new C_EraseFinish(this, completion), finisher));
+ wrap_finisher(new C_EraseFinish(this, wrap_finisher(completion))));
// We will not start the operation to delete the header until _finish_erase has
// seen the data deletion succeed: otherwise if there was an error deleting data
// we might prematurely delete the header thereby lose our reference to the data.
}
-void Journaler::_finish_erase(int data_result, Context *completion)
+void Journaler::_finish_erase(int data_result, C_OnFinisher *completion)
{
Mutex::Locker l(lock);
if (data_result == 0) {
// Async delete the journal header
filer.purge_range(ino, &layout, SnapContext(), 0, 1, ceph_clock_now(cct), 0,
- new C_OnFinisher(completion, finisher));
+ wrap_finisher(completion));
} else {
lderr(cct) << "Failed to delete journal " << ino << " data: " << cpp_strerror(data_result) << dendl;
completion->complete(data_result);
ldout(cct, 10) << "wait_for_readable at " << read_pos << " onreadable " << onreadable << dendl;
assert(!_is_readable());
assert(on_readable == 0);
- on_readable = onreadable;
+ on_readable = wrap_finisher(onreadable);
}
uint64_t num = (trim_to - trimming_pos) / period;
SnapContext snapc;
filer.purge_range(ino, &layout, snapc, first, num, ceph_clock_now(cct), 0,
- new C_OnFinisher(new C_Trim(this, trim_to), finisher));
+ wrap_finisher(new C_Trim(this, trim_to)));
trimming_pos = trim_to;
}
}
}
+/**
+ * set write error callback
+ *
+ * Set a callback/context to trigger if we get a write error from
+ * the objecter. This may be from an explicit request (e.g., flush)
+ * or something async the journaler did on its own (e.g., journal
+ * header update).
+ *
+ * It is only used once; if the caller continues to use the
+ * Journaler and wants to hear about errors, it needs to reset the
+ * error_handler.
+ *
+ * @param c callback/context to trigger on error
+ */
+void Journaler::set_write_error_handler(Context *c) {
+ Mutex::Locker l(lock);
+ assert(!on_write_error);
+ on_write_error = wrap_finisher(c);
+}
+
+
+/**
+ * Wrap a context in a C_OnFinisher, if it is non-NULL
+ *
+ * Utility function to avoid lots of error-prone and verbose
+ * NULL checking on contexts passed in.
+ */
+C_OnFinisher *Journaler::wrap_finisher(Context *c)
+{
+ if (c != NULL) {
+ return new C_OnFinisher(c, finisher);
+ } else {
+ return NULL;
+ }
+}
class Context;
class PerfCounters;
class Finisher;
+class C_OnFinisher;
typedef __u8 stream_format_t;
// header
utime_t last_wrote_head;
- void _finish_write_head(int r, Header &wrote, Context *oncommit);
+ void _finish_write_head(int r, Header &wrote, C_OnFinisher *oncommit);
class C_WriteHead;
friend class C_WriteHead;
void _reread_head(Context *onfinish);
void _set_layout(ceph_file_layout const *l);
list<Context*> waitfor_recover;
- void read_head(Context *on_finish, bufferlist *bl);
+ void _read_head(Context *on_finish, bufferlist *bl);
void _finish_read_head(int r, bufferlist& bl);
void _finish_reread_head(int r, bufferlist& bl, Context *finish);
void _probe(Context *finish, uint64_t *end);
void _finish_probe_end(int r, uint64_t end);
- void _reprobe(Context *onfinish);
- void _finish_reprobe(int r, uint64_t end, Context *onfinish);
- void _finish_reread_head_and_probe(int r, Context *onfinish);
+ void _reprobe(C_OnFinisher *onfinish);
+ void _finish_reprobe(int r, uint64_t end, C_OnFinisher *onfinish);
+ void _finish_reread_head_and_probe(int r, C_OnFinisher *onfinish);
class C_ReadHead;
friend class C_ReadHead;
class C_ProbeEnd;
bool waiting_for_zero;
interval_set<uint64_t> pending_zero; // non-contig bits we've zeroed
std::set<uint64_t> pending_safe;
+ // XXX this would be C_OnFinisher reather than Context if it weren't for use of C_RetryRead here
+ // FIXME but oh dear, these are called back inside lock and RetryRead takes the lock!!!
std::map<uint64_t, std::list<Context*> > waitfor_safe; // when safe through given offset
- void _flush(Context *onsafe);
+ void _flush(C_OnFinisher *onsafe);
void _do_flush(unsigned amount=0);
void _finish_flush(int r, uint64_t start, utime_t stamp);
class C_Flush;
uint64_t temp_fetch_len;
// for wait_for_readable()
- Context *on_readable;
-
- Context *on_write_error;
+ C_OnFinisher *on_readable;
+ C_OnFinisher *on_write_error;
void _finish_read(int r, uint64_t offset, bufferlist &bl); // read completion callback
void _finish_retry_read(int r);
bool _is_readable();
- void _finish_erase(int data_result, Context *completion);
+ void _finish_erase(int data_result, C_OnFinisher *completion);
class C_EraseFinish;
friend class C_EraseFinish;
+ C_OnFinisher *wrap_finisher(Context *c);
+
public:
Journaler(inodeno_t ino_, int64_t pool, const char *mag, Objecter *obj, PerfCounters *l, int lkey, SafeTimer *tim, Finisher *f=NULL) :
last_committed(mag),
assert(!readonly);
_issue_prezero();
}
- /**
- * set write error callback
- *
- * Set a callback/context to trigger if we get a write error from
- * the objecter. This may be from an explicit request (e.g., flush)
- * or something async the journaler did on its own (e.g., journal
- * header update).
- *
- * It is only used once; if the caller continues to use the
- * Journaler and wants to hear about errors, it needs to reset the
- * error_handler.
- *
- * @param c callback/context to trigger on error
- */
- void set_write_error_handler(Context *c) {
- Mutex::Locker l(lock);
- assert(!on_write_error);
- on_write_error = c;
- }
+
+ void set_write_error_handler(Context *c);
// Synchronous getters
// ===================