From f021e18a76af7329baece499acdfd2341686d9c1 Mon Sep 17 00:00:00 2001
From: Sage Weil
Date: Thu, 13 Aug 2009 15:30:14 -0700
Subject: [PATCH] uclient: handle short sync reads vs eof

---
Review notes (not part of the commit message):

What the patch does
 * Client::_read() obtains its caps through the new _get_caps() helper,
   then calls _read_async() when CEPH_CAP_FILE_CACHE is in the granted
   set and _read_sync() otherwise.  The previous body of _read() is kept
   under #if 0.
 * _get_caps() loops on wait_on_list(in->waitfor_caps) until every cap in
   'need' is issued and no cap in 'want' outside 'need' is implemented
   but no longer issued.  It then stores need | (issued & want) in *got,
   takes a cap ref on 'need' and returns 0.  'endoff' is not used yet.
 * _read_async() trims the request to in->size, optionally starts a
   readahead through the object cacher, then reads through the object
   cacher and waits for completion if the data was not already cached.
 * _read_sync() reads through the filer.  On a short reply it zero-fills
   and returns when the requested range lies inside the known in->size;
   otherwise it refreshes the size with _getattr(CEPH_STAT_CAP_SIZE) and
   returns the short count once pos has reached in->size.
 * Inode::caps_issued() gains an optional out-parameter that receives the
   OR of the 'implemented' masks of all valid caps.

Changed in review
 * _read_sync(): the loop was 'while (1)' and only returned on error, hole
   or EOF.  After a full-length reply 'left' was 0 and nothing returned,
   so the next pass issued a zero-length read whose reply could never
   compare as short.  The loop now runs while 'left > 0' and the function
   returns 'read' afterwards.  Hunk offsets and the diffstat account for
   the three extra lines; the index line was not regenerated.

Open questions (please confirm before merging)
 * _get_caps() takes a ref on 'need' only, but _read() calls
   put_cap_ref(in, got), and 'got' may also contain CEPH_CAP_FILE_CACHE.
   Verify the refs balance.
 * _read_sync() keeps 'left' and 'read' in int while 'len' is __u64.
 * In _read_sync(), a short reply for a range that straddles in->size
   re-reads after _getattr().  If the bytes before EOF are a hole this
   looks like it could repeat without progress.
 * Both branches of the snapid test in _read_async() are identical.

 src/client/Client.cc | 205 ++++++++++++++++++++++++++++++++++++++++++-
 src/client/Client.h  |  13 ++-
 2 files changed, 213 insertions(+), 5 deletions(-)

diff --git a/src/client/Client.cc b/src/client/Client.cc
index 97e816e62f22e..1ff528f31b10b 100644
--- a/src/client/Client.cc
+++ b/src/client/Client.cc
@@ -3799,8 +3799,36 @@ void Client::unlock_fh_pos(Fh *f)
 }
 
 
+//
 
-//char *hackbuf = 0;
+int Client::_get_caps(Inode *in, int need, int want, int *got, loff_t endoff)
+{
+  while (1) {
+    if (endoff > 0) {
+      // ...
+    }
+
+    int implemented;
+    int have = in->caps_issued(&implemented);
+    if ((have & need) == need) {
+      int butnot = want & ~(have & need);
+      int revoking = implemented & ~have;
+      dout(10) << "get_caps " << *in << " have " << ccap_string(have)
+               << " need " << ccap_string(need) << " want " << ccap_string(want)
+               << " but not " << ccap_string(butnot) << " revoking " << ccap_string(revoking)
+               << dendl;
+      if ((revoking & butnot) == 0) {
+        *got = need | (have & want);
+        in->get_cap_ref(need);
+        return 0;
+      }
+    }
+
+    // wait
+    dout(10) << "get_caps " << *in << " waiting" << dendl;
+    wait_on_list(in->waitfor_caps);
+  }
+}
 
 
 // blocking osd interface
@@ -3829,6 +3857,13 @@ int Client::_read(Fh *f, __s64 offset, __u64 size, bufferlist *bl)
 {
   Inode *in = f->inode;
 
+  //bool lazy = f->mode == CEPH_FILE_MODE_LAZY;
+
+  int got;
+  int r = _get_caps(in, CEPH_CAP_FILE_RD, CEPH_CAP_FILE_CACHE, &got, -1);
+  if (r < 0)
+    return r;
+
   bool movepos = false;
   if (offset < 0) {
     lock_fh_pos(f);
@@ -3836,7 +3871,171 @@ int Client::_read(Fh *f, __s64 offset, __u64 size, bufferlist *bl)
     movepos = true;
   }
 
-  bool lazy = f->mode == CEPH_FILE_MODE_LAZY;
+  if (got & CEPH_CAP_FILE_CACHE)
+    r = _read_async(f, offset, size, bl);
+  else
+    r = _read_sync(f, offset, size, bl);
+
+  if (movepos) {
+    // adjust fd pos
+    f->pos = offset+bl->length();
+    unlock_fh_pos(f);
+  }
+
+  // adjust readahead state
+  if (f->last_pos != offset) {
+    f->nr_consec_read = f->consec_read_bytes = 0;
+  } else {
+    f->nr_consec_read++;
+  }
+  f->consec_read_bytes += bl->length();
+  dout(10) << "readahead nr_consec_read " << f->nr_consec_read
+           << " for " << f->consec_read_bytes << " bytes"
+           << " .. last_pos " << f->last_pos << " .. offset " << offset
+           << dendl;
+  f->last_pos = offset+bl->length();
+
+  // done!
+  put_cap_ref(in, got);
+  return r;
+}
+
+int Client::_read_async(Fh *f, __u64 off, __u64 len, bufferlist *bl)
+{
+  Inode *in = f->inode;
+  bool readahead = true;
+
+  dout(10) << "_read_async " << *in << " " << off << "~" << len << dendl;
+
+  // trim read based on file size?
+  if (off >= in->size)
+    return 0;
+  if (off + len > in->size) {
+    len = in->size - off;
+    readahead = false;
+  }
+
+  // we will populate the cache here
+  if (in->cap_refs[CEPH_CAP_FILE_CACHE] == 0)
+    in->get_cap_ref(CEPH_CAP_FILE_CACHE);
+
+  // readahead?
+  if (readahead &&
+      f->nr_consec_read &&
+      (g_conf.client_readahead_max_bytes ||
+       g_conf.client_readahead_max_periods)) {
+    loff_t l = f->consec_read_bytes * 2;
+    if (g_conf.client_readahead_min)
+      l = MAX(l, g_conf.client_readahead_min);
+    if (g_conf.client_readahead_max_bytes)
+      l = MIN(l, g_conf.client_readahead_max_bytes);
+    loff_t p = ceph_file_layout_period(in->layout);
+    if (g_conf.client_readahead_max_periods)
+      l = MIN(l, g_conf.client_readahead_max_periods * p);
+    if (l >= 2*p)
+      // align with period
+      l -= (off+l) % p;
+    // don't read past end of file
+    if (off+l > in->size)
+      l = in->size - off;
+
+    dout(10) << "readahead " << f->nr_consec_read << " reads "
+             << f->consec_read_bytes << " bytes ... readahead " << off << "~" << l
+             << " (caller wants " << off << "~" << len << ")" << dendl;
+    if (l > 0) {
+      objectcacher->file_read(in->ino, &in->layout, in->snapid,
+                              off, l, NULL, 0, 0);
+      dout(10) << "readahead initiated" << dendl;
+    }
+  }
+
+  // read (and possibly block)
+  int r, rvalue = 0;
+  Mutex flock("Client::_read_async flock");
+  Cond cond;
+  bool done = false;
+  Context *onfinish = new C_SafeCond(&flock, &cond, &done, &rvalue);
+  if (in->snapid == CEPH_NOSNAP)
+    r = objectcacher->file_read(in->ino, &in->layout, in->snapid,
+                                off, len, bl, 0, onfinish);
+  else
+    r = objectcacher->file_read(in->ino, &in->layout, in->snapid,
+                                off, len, bl, 0, onfinish);
+  if (r == 0) {
+    while (!done)
+      cond.Wait(client_lock);
+    r = rvalue;
+  } else {
+    // it was cached.
+    delete onfinish;
+  }
+  return r;
+}
+
+int Client::_read_sync(Fh *f, __u64 off, __u64 len, bufferlist *bl)
+{
+  Inode *in = f->inode;
+  __u64 pos = off;
+  int left = len;
+  int read = 0;
+
+  dout(10) << "_read_sync " << *in << " " << off << "~" << len << dendl;
+
+  int flags = 0;
+  if (in->hack_balance_reads || g_conf.client_hack_balance_reads)
+    flags |= CEPH_OSD_FLAG_BALANCE_READS;
+
+  Mutex flock("Client::_read_sync flock");
+  Cond cond;
+  // loop only while bytes are outstanding: a full-length reply leaves
+  // left == 0, and a zero-length read can never look short
+  while (left > 0) {
+    int r = 0;
+    bool done = false;
+    Context *onfinish = new C_SafeCond(&flock, &cond, &done, &r);
+    bufferlist tbl;
+
+    int wanted = left;
+    filer->read(in->ino, &in->layout, in->snapid,
+                pos, left, &tbl, flags, onfinish);
+    while (!done)
+      cond.Wait(client_lock);
+
+    if (r < 0)
+      return r;
+    if (tbl.length()) {
+      r = tbl.length();
+
+      read += r;
+      pos += r;
+      left -= r;
+      bl->claim_append(tbl);
+    }
+    // short read?
+    if (r >= 0 && r < wanted) {
+      if (pos + left <= in->size) {
+        // hole, zero and return.
+        bufferptr z(left);
+        z.zero();
+        bl->push_back(z);
+        read += left;
+        return read;
+      }
+
+      // reverify size
+      r = _getattr(in, CEPH_STAT_CAP_SIZE);
+      if (r < 0)
+        return r;
+
+      // eof? short read.
+      if (pos >= in->size)
+        return read;
+    }
+  }
+  return read;
+}
+
+#if 0
 
   // wait for RD cap and/or a valid file size
   int issued;
@@ -3995,7 +4194,7 @@ int Client::_read(Fh *f, __s64 offset, __u64 size, bufferlist *bl)
 
   return rvalue;
 }
-
+#endif
 
 
 /*
diff --git a/src/client/Client.h b/src/client/Client.h
index deb9be398d8fb..f7152f37fe71f 100644
--- a/src/client/Client.h
+++ b/src/client/Client.h
@@ -453,13 +453,18 @@ class Inode {
     return true;
   }
 
-  int caps_issued() {
+  int caps_issued(int *implemented = 0) {
     int c = exporting_issued | snap_caps;
+    int i = 0;
     for (map::iterator it = caps.begin();
          it != caps.end();
          it++)
-      if (cap_is_valid(it->second))
+      if (cap_is_valid(it->second)) {
         c |= it->second->issued;
+        i |= it->second->implemented;
+      }
+    if (implemented)
+      *implemented = i;
     return c;
   }
   void touch_cap(InodeCap *cap) {
@@ -1024,6 +1029,10 @@ private:
   int _ll_put(Inode *in, int num);
   void _ll_drop_pins();
 
+  int _get_caps(Inode *in, int need, int want, int *got, loff_t endoff);
+  int _read_sync(Fh *f, __u64 off, __u64 len, bufferlist *bl);
+  int _read_async(Fh *f, __u64 off, __u64 len, bufferlist *bl);
+
   // internal interface
   //   call these with client_lock held!
   int _do_lookup(Inode *dir, const char *name, Inode **target);
-- 
2.39.5