class Journaler::C_ProbeEnd : public Context {
Journaler *ls;
public:
- off_t end;
+ __s64 end;
C_ProbeEnd(Journaler *l) : ls(l), end(-1) {}
void finish(int r) {
ls->_finish_probe_end(r, end);
filer.probe_fwd(inode, h.write_pos, &fin->end, CEPH_OSD_OP_INCLOCK_FAIL, fin);
}
-void Journaler::_finish_probe_end(int r, off_t end)
+void Journaler::_finish_probe_end(int r, __s64 end)
{
assert(state == STATE_PROBING);
// Completion context for a journal flush: remembers the logical offset the
// flush started at and hands it back to Journaler::_finish_flush() when the
// OSD write completes.  Resolved leftover diff residue to the __s64 side,
// matching the file-wide off_t -> __s64 conversion.
class Journaler::C_Flush : public Context {
  Journaler *ls;    // owning Journaler (non-owning back-pointer)
  __s64 start;      // logical offset this flush began at
public:
  C_Flush(Journaler *l, __s64 s) : ls(l), start(s) {}
  // r is the OSD op result; forward it with the start offset.
  void finish(int r) { ls->_finish_flush(r, start); }
};
-void Journaler::_finish_flush(int r, off_t start)
+void Journaler::_finish_flush(int r, __s64 start)
{
assert(r>=0);
}
-off_t Journaler::append_entry(bufferlist& bl, Context *onsync)
+__s64 Journaler::append_entry(bufferlist& bl, Context *onsync)
{
uint32_t s = bl.length();
if (!g_conf.journaler_allow_split_entries) {
// will we span a stripe boundary?
int p = ceph_file_layout_su(inode.layout);
- if (write_pos / p != (write_pos + (off_t)(bl.length() + sizeof(s))) / p) {
+ if (write_pos / p != (write_pos + (__s64)(bl.length() + sizeof(s))) / p) {
// yes.
// move write_pos forward.
- off_t owp = write_pos;
+ __s64 owp = write_pos;
write_pos += p;
write_pos -= (write_pos % p);
* then discover we need even more for an especially large entry.
* i don't think that circumstance will arise particularly often.
*/
-void Journaler::_issue_read(off_t len)
+void Journaler::_issue_read(__s64 len)
{
// make sure we're fully flushed
_do_flush();
void Journaler::_prefetch()
{
// prefetch?
- off_t left = requested_pos - read_pos;
+ __s64 left = requested_pos - read_pos;
if (left <= prefetch_from && // should read more,
!_is_reading() && // and not reading anything right now
write_pos > requested_pos) { // there's something more to read...
// start reading some more?
if (!_is_reading()) {
if (s)
- fetch_len = MAX(fetch_len, (off_t)(sizeof(s)+s-read_buf.length()));
+ fetch_len = MAX(fetch_len, (__s64)(sizeof(s)+s-read_buf.length()));
_issue_read(fetch_len);
}
class Journaler::C_Trim : public Context {
Journaler *ls;
- off_t to;
+ __s64 to;
public:
- C_Trim(Journaler *l, off_t t) : ls(l), to(t) {}
+ C_Trim(Journaler *l, __s64 t) : ls(l), to(t) {}
void finish(int r) {
ls->_trim_finish(r, to);
}
void Journaler::trim()
{
- off_t trim_to = last_committed.expire_pos;
+ __s64 trim_to = last_committed.expire_pos;
trim_to -= trim_to % ceph_file_layout_period(inode.layout);
dout(10) << "trim last_commited head was " << last_committed
<< ", can trim to " << trim_to
trimming_pos = trim_to;
}
-void Journaler::_trim_finish(int r, off_t to)
+void Journaler::_trim_finish(int r, __s64 to)
{
dout(10) << "_trim_finish trimmed_pos was " << trimmed_pos
<< ", trimmed/trimming/expire now "
public:
// this goes at the head of the log "file".
struct Header {
- off_t trimmed_pos;
- off_t expire_pos;
- off_t read_pos;
- off_t write_pos;
+ __s64 trimmed_pos;
+ __s64 expire_pos;
+ __s64 read_pos;
+ __s64 write_pos;
Header() : trimmed_pos(0), expire_pos(0), read_pos(0), write_pos(0) {}
list<Context*> waitfor_recover;
void _finish_read_head(int r, bufferlist& bl);
- void _finish_probe_end(int r, off_t end);
+ void _finish_probe_end(int r, __s64 end);
class C_ReadHead;
friend class C_ReadHead;
class C_ProbeEnd;
// writer
- off_t write_pos; // logical write position, where next entry will go
- off_t flush_pos; // where we will flush. if write_pos>flush_pos, we're buffering writes.
- off_t ack_pos; // what has been acked.
+ __s64 write_pos; // logical write position, where next entry will go
+ __s64 flush_pos; // where we will flush. if write_pos>flush_pos, we're buffering writes.
+ __s64 ack_pos; // what has been acked.
bufferlist write_buf; // write buffer. flush_pos + write_buf.length() == write_pos.
- std::map<off_t, utime_t> pending_flush; // start offsets and times for pending flushes
- std::map<off_t, std::list<Context*> > waitfor_flush; // when flushed through given offset
+ std::map<__s64, utime_t> pending_flush; // start offsets and times for pending flushes
+ std::map<__s64, std::list<Context*> > waitfor_flush; // when flushed through given offset
void _do_flush();
- void _finish_flush(int r, off_t start);
+ void _finish_flush(int r, __s64 start);
class C_Flush;
friend class C_Flush;
// reader
- off_t read_pos; // logical read position, where next entry starts.
- off_t requested_pos; // what we've requested from OSD.
- off_t received_pos; // what we've received from OSD.
+ __s64 read_pos; // logical read position, where next entry starts.
+ __s64 requested_pos; // what we've requested from OSD.
+ __s64 received_pos; // what we've received from OSD.
bufferlist read_buf; // read buffer. read_pos + read_buf.length() == prefetch_pos.
bufferlist reading_buf; // what i'm reading into
- off_t fetch_len; // how much to read at a time
- off_t prefetch_from; // how far from end do we read next chunk
+ __s64 fetch_len; // how much to read at a time
+ __s64 prefetch_from; // how far from end do we read next chunk
// for read_entry() in-progress read
bufferlist *read_bl;
return requested_pos > received_pos;
}
void _finish_read(int r); // we just read some (read completion callback)
- void _issue_read(off_t len); // read some more
+ void _issue_read(__s64 len); // read some more
void _prefetch(); // maybe read ahead
class C_Read;
friend class C_Read;
friend class C_RetryRead;
// trimmer
- off_t expire_pos; // what we're allowed to trim to
- off_t trimming_pos; // what we've requested to trim through
- off_t trimmed_pos; // what has been trimmed
- map<off_t, list<Context*> > waitfor_trim;
+ __s64 expire_pos; // what we're allowed to trim to
+ __s64 trimming_pos; // what we've requested to trim through
+ __s64 trimmed_pos; // what has been trimmed
+ map<__s64, list<Context*> > waitfor_trim;
- void _trim_finish(int r, off_t to);
+ void _trim_finish(int r, __s64 to);
class C_Trim;
friend class C_Trim;
public:
- Journaler(inode_t& inode_, Objecter *obj, Logger *l, Mutex *lk, off_t fl=0, off_t pff=0) :
+ Journaler(inode_t& inode_, Objecter *obj, Logger *l, Mutex *lk, __s64 fl=0, __s64 pff=0) :
inode(inode_), objecter(obj), filer(objecter), logger(l),
lock(lk), timer(*lk), delay_flush_event(0),
state(STATE_UNDEF), error(0),
bool is_active() { return state == STATE_ACTIVE; }
int get_error() { return error; }
- off_t get_write_pos() const { return write_pos; }
- off_t get_write_ack_pos() const { return ack_pos; }
- off_t get_read_pos() const { return read_pos; }
- off_t get_expire_pos() const { return expire_pos; }
- off_t get_trimmed_pos() const { return trimmed_pos; }
+ __s64 get_write_pos() const { return write_pos; }
+ __s64 get_write_ack_pos() const { return ack_pos; }
+ __s64 get_read_pos() const { return read_pos; }
+ __s64 get_expire_pos() const { return expire_pos; }
+ __s64 get_trimmed_pos() const { return trimmed_pos; }
// write
- off_t append_entry(bufferlist& bl, Context *onsync = 0);
+ __s64 append_entry(bufferlist& bl, Context *onsync = 0);
void flush(Context *onsync = 0);
// read
- void set_read_pos(off_t p) {
+ void set_read_pos(__s64 p) {
assert(requested_pos == received_pos); // we can't cope w/ in-progress read right now.
assert(read_bl == 0); // ...
read_pos = requested_pos = received_pos = p;
void read_entry(bufferlist* bl, Context *onfinish);
// trim
- void set_expire_pos(off_t ep) { expire_pos = ep; }
+ void set_expire_pos(__s64 ep) { expire_pos = ep; }
void trim();
//bool is_trimmable() { return trimming_pos < expire_pos; }
- //void trim(off_t trim_to=0, Context *c=0);
+ //void trim(__s64 trim_to=0, Context *c=0);
};
WRITE_CLASS_ENCODER(Journaler::Header)