From ac964259b4a1d90e1f87ce3f8108257a04726412 Mon Sep 17 00:00:00 2001 From: "Adam C. Emerson" Date: Mon, 14 Sep 2015 12:19:50 -0400 Subject: [PATCH] osdc: Whitespace Since I'm making a pervasive change anyway, knock off end-of-line spaces and wrap overly long lines. Signed-off-by: Adam C. Emerson --- src/osdc/Filer.cc | 87 ++-- src/osdc/Filer.h | 141 ++++--- src/osdc/Journaler.cc | 321 ++++++++------ src/osdc/Journaler.h | 128 +++--- src/osdc/ObjectCacher.cc | 803 +++++++++++++++++++----------------- src/osdc/ObjectCacher.h | 162 +++++--- src/osdc/Objecter.cc | 734 +++++++++++++++++++------------- src/osdc/Objecter.h | 668 ++++++++++++++++-------------- src/osdc/Striper.cc | 155 ++++--- src/osdc/Striper.h | 48 ++- src/osdc/WritebackHandler.h | 10 +- 11 files changed, 1857 insertions(+), 1400 deletions(-) diff --git a/src/osdc/Filer.cc b/src/osdc/Filer.cc index 5c8a13d869722..ce5d2877960bf 100644 --- a/src/osdc/Filer.cc +++ b/src/osdc/Filer.cc @@ -1,4 +1,4 @@ -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- // vim: ts=8 sw=2 smarttab /* * Ceph - scalable distributed file system @@ -7,9 +7,9 @@ * * This is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public - * License version 2.1, as published by the Free Software + * License version 2.1, as published by the Free Software * Foundation. See file COPYING. - * + * */ @@ -39,7 +39,8 @@ public: object_t oid; uint64_t size; utime_t mtime; - C_Probe(Filer *f, Probe *p, object_t o) : filer(f), probe(p), oid(o), size(0) {} + C_Probe(Filer *f, Probe *p, object_t o) : filer(f), probe(p), oid(o), + size(0) {} void finish(int r) { if (r == -ENOENT) { r = 0; @@ -50,7 +51,7 @@ public: { probe->lock.Lock(); if (r != 0) { - probe->err = r; + probe->err = r; } probe_complete = filer->_probed(probe, oid, size, mtime); @@ -60,18 +61,18 @@ public: probe->onfinish->complete(probe->err); delete probe; } - } + } }; int Filer::probe(inodeno_t ino, ceph_file_layout *layout, snapid_t snapid, uint64_t start_from, - uint64_t *end, // LB, when !fwd + uint64_t *end, // LB, when !fwd utime_t *pmtime, bool fwd, int flags, - Context *onfinish) + Context *onfinish) { ldout(cct, 10) << "probe " << (fwd ? "fwd ":"bwd ") << hex << ino << dec @@ -80,11 +81,13 @@ int Filer::probe(inodeno_t ino, assert(snapid); // (until there is a non-NOSNAP write) - Probe *probe = new Probe(ino, *layout, snapid, start_from, end, pmtime, flags, fwd, onfinish); - + Probe *probe = new Probe(ino, *layout, snapid, start_from, end, pmtime, + flags, fwd, onfinish); + // period (bytes before we jump unto a new set of object(s)) - uint64_t period = (uint64_t)layout->fl_stripe_count * (uint64_t)layout->fl_object_size; - + uint64_t period = (uint64_t)layout->fl_stripe_count * + (uint64_t)layout->fl_object_size; + // start with 1+ periods. probe->probing_len = period; if (probe->fwd) { @@ -96,7 +99,7 @@ int Filer::probe(inodeno_t ino, probe->probing_len -= period - (start_from % period); probe->probing_off -= probe->probing_len; } - + probe->lock.Lock(); _probe(probe); assert(!probe->lock.is_locked_by_me()); @@ -112,16 +115,16 @@ void Filer::_probe(Probe *probe) { assert(probe->lock.is_locked_by_me()); - ldout(cct, 10) << "_probe " << hex << probe->ino << dec - << " " << probe->probing_off << "~" << probe->probing_len - << dendl; - + ldout(cct, 10) << "_probe " << hex << probe->ino << dec + << " " << probe->probing_off << "~" << probe->probing_len + << dendl; + // map range onto objects probe->known_size.clear(); probe->probing.clear(); - Striper::file_to_extents(cct, probe->ino, &probe->layout, - probe->probing_off, probe->probing_len, 0, probe->probing); - + Striper::file_to_extents(cct, probe->ino, &probe->layout, probe->probing_off, + probe->probing_len, 0, probe->probing); + std::vector stat_extents; for (vector::iterator p = probe->probing.begin(); p != probe->probing.end(); @@ -133,7 +136,7 @@ void Filer::_probe(Probe *probe) probe->lock.Unlock(); for (std::vector::iterator i = stat_extents.begin(); - i != stat_extents.end(); ++i) { + i != stat_extents.end(); ++i) { C_Probe *c = new C_Probe(this, probe, i->oid); objecter->stat(i->oid, i->oloc, probe->snapid, &c->size, &c->mtime, probe->flags | CEPH_OSD_FLAG_RWORDERED, @@ -146,7 +149,8 @@ void Filer::_probe(Probe *probe) * * @return true if probe is complete and Probe object may be freed. */ -bool Filer::_probed(Probe *probe, const object_t& oid, uint64_t size, utime_t mtime) +bool Filer::_probed(Probe *probe, const object_t& oid, uint64_t size, + utime_t mtime) { assert(probe->lock.is_locked_by_me()); @@ -187,34 +191,37 @@ bool Filer::_probed(Probe *probe, const object_t& oid, uint64_t size, utime_t mt p != probe->probing.end(); ++p) { uint64_t shouldbe = p->length + p->offset; - ldout(cct, 10) << "_probed " << probe->ino << " object " << hex << p->oid << dec - << " should be " << shouldbe - << ", actual is " << probe->known_size[p->oid] - << dendl; + ldout(cct, 10) << "_probed " << probe->ino << " object " << hex + << p->oid << dec << " should be " << shouldbe + << ", actual is " << probe->known_size[p->oid] + << dendl; if (!probe->found_size) { assert(probe->known_size[p->oid] <= shouldbe); if ((probe->fwd && probe->known_size[p->oid] == shouldbe) || - (!probe->fwd && probe->known_size[p->oid] == 0 && probe->probing_off > 0)) + (!probe->fwd && probe->known_size[p->oid] == 0 && + probe->probing_off > 0)) continue; // keep going - + // aha, we found the end! // calc offset into buffer_extent to get distance from probe->from. uint64_t oleft = probe->known_size[p->oid] - p->offset; - for (vector >::iterator i = p->buffer_extents.begin(); + for (vector >::iterator i + = p->buffer_extents.begin(); i != p->buffer_extents.end(); ++i) { if (oleft <= (uint64_t)i->second) { end = probe->probing_off + i->first + oleft; - ldout(cct, 10) << "_probed end is in buffer_extent " << i->first << "~" << i->second << " off " << oleft - << ", from was " << probe->probing_off << ", end is " << end - << dendl; - + ldout(cct, 10) << "_probed end is in buffer_extent " << i->first + << "~" << i->second << " off " << oleft + << ", from was " << probe->probing_off << ", end is " + << end << dendl; + probe->found_size = true; ldout(cct, 10) << "_probed found size at " << end << dendl; *probe->psize = end; - + if (!probe->pmtime) // stop if we don't need mtime too break; } @@ -228,7 +235,8 @@ bool Filer::_probed(Probe *probe, const object_t& oid, uint64_t size, utime_t mt // keep probing! ldout(cct, 10) << "_probed probing further" << dendl; - uint64_t period = (uint64_t)probe->layout.fl_stripe_count * (uint64_t)probe->layout.fl_object_size; + uint64_t period = (uint64_t)probe->layout.fl_stripe_count * + (uint64_t)probe->layout.fl_object_size; if (probe->fwd) { probe->probing_off += probe->probing_len; assert(probe->probing_off % period == 0); @@ -278,7 +286,7 @@ int Filer::purge_range(inodeno_t ino, uint64_t first_obj, uint64_t num_obj, utime_t mtime, int flags, - Context *oncommit) + Context *oncommit) { assert(num_obj > 0); @@ -312,8 +320,9 @@ void Filer::_do_purge_range(PurgeRange *pr, int fin) { pr->lock.Lock(); pr->uncommitted -= fin; - ldout(cct, 10) << "_do_purge_range " << pr->ino << " objects " << pr->first << "~" << pr->num - << " uncommitted " << pr->uncommitted << dendl; + ldout(cct, 10) << "_do_purge_range " << pr->ino << " objects " << pr->first + << "~" << pr->num << " uncommitted " << pr->uncommitted + << dendl; if (pr->num == 0 && pr->uncommitted == 0) { pr->oncommit->complete(0); @@ -345,5 +354,3 @@ void Filer::_do_purge_range(PurgeRange *pr, int fin) new C_OnFinisher(new C_PurgeRange(this, pr), finisher)); } } - - diff --git a/src/osdc/Filer.h b/src/osdc/Filer.h index 26a9204e9ffa7..d8105bbc94cf6 100644 --- a/src/osdc/Filer.h +++ b/src/osdc/Filer.h @@ -1,4 +1,4 @@ -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- // vim: ts=8 sw=2 smarttab /* * Ceph - scalable distributed file system @@ -7,9 +7,9 @@ * * This is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public - * License version 2.1, as published by the Free Software + * License version 2.1, as published by the Free Software * Foundation. See file COPYING. - * + * */ @@ -23,7 +23,7 @@ * * also, provide convenience methods that call objecter for you. * - * "files" are identified by ino. + * "files" are identified by ino. */ #include "include/types.h" @@ -44,7 +44,7 @@ class Filer { CephContext *cct; Objecter *objecter; Finisher *finisher; - + // probes struct Probe { Mutex lock; @@ -60,10 +60,10 @@ class Filer { bool fwd; Context *onfinish; - + vector probing; uint64_t probing_off, probing_len; - + map known_size; utime_t max_mtime; @@ -73,13 +73,13 @@ class Filer { bool found_size; Probe(inodeno_t i, ceph_file_layout &l, snapid_t sn, - uint64_t f, uint64_t *e, utime_t *m, int fl, bool fw, Context *c) : + uint64_t f, uint64_t *e, utime_t *m, int fl, bool fw, Context *c) : lock("Filer::Probe"), ino(i), layout(l), snapid(sn), psize(e), pmtime(m), flags(fl), fwd(fw), onfinish(c), probing_off(f), probing_len(0), err(0), found_size(false) {} }; - + class C_Probe; void _probe(Probe *p); @@ -102,11 +102,11 @@ class Filer { int read(inodeno_t ino, ceph_file_layout *layout, snapid_t snap, - uint64_t offset, - uint64_t len, - bufferlist *bl, // ptr to data + uint64_t offset, + uint64_t len, + bufferlist *bl, // ptr to data int flags, - Context *onfinish, + Context *onfinish, int op_flags = 0) { assert(snap); // (until there is a non-NOSNAP write) vector extents; @@ -116,19 +116,20 @@ class Filer { } int read_trunc(inodeno_t ino, - ceph_file_layout *layout, - snapid_t snap, - uint64_t offset, - uint64_t len, - bufferlist *bl, // ptr to data - int flags, - uint64_t truncate_size, - __u32 truncate_seq, - Context *onfinish, - int op_flags = 0) { + ceph_file_layout *layout, + snapid_t snap, + uint64_t offset, + uint64_t len, + bufferlist *bl, // ptr to data + int flags, + uint64_t truncate_size, + __u32 truncate_seq, + Context *onfinish, + int op_flags = 0) { assert(snap); // (until there is a non-NOSNAP write) vector extents; - Striper::file_to_extents(cct, ino, layout, offset, len, truncate_size, extents); + Striper::file_to_extents(cct, ino, layout, offset, len, truncate_size, + extents); objecter->sg_read_trunc(extents, snap, bl, flags, truncate_size, truncate_seq, onfinish, op_flags); return 0; @@ -137,35 +138,37 @@ class Filer { int write(inodeno_t ino, ceph_file_layout *layout, const SnapContext& snapc, - uint64_t offset, - uint64_t len, - bufferlist& bl, + uint64_t offset, + uint64_t len, + bufferlist& bl, utime_t mtime, - int flags, - Context *onack, - Context *oncommit, + int flags, + Context *onack, + Context *oncommit, int op_flags = 0) { vector extents; Striper::file_to_extents(cct, ino, layout, offset, len, 0, extents); - objecter->sg_write(extents, snapc, bl, mtime, flags, onack, oncommit, op_flags); + objecter->sg_write(extents, snapc, bl, mtime, flags, onack, oncommit, + op_flags); return 0; } int write_trunc(inodeno_t ino, - ceph_file_layout *layout, - const SnapContext& snapc, - uint64_t offset, - uint64_t len, - bufferlist& bl, - utime_t mtime, - int flags, - uint64_t truncate_size, - __u32 truncate_seq, - Context *onack, - Context *oncommit, - int op_flags = 0) { + ceph_file_layout *layout, + const SnapContext& snapc, + uint64_t offset, + uint64_t len, + bufferlist& bl, + utime_t mtime, + int flags, + uint64_t truncate_size, + __u32 truncate_seq, + Context *onack, + Context *oncommit, + int op_flags = 0) { vector extents; - Striper::file_to_extents(cct, ino, layout, offset, len, truncate_size, extents); + Striper::file_to_extents(cct, ino, layout, offset, len, truncate_size, + extents); objecter->sg_write_trunc(extents, snapc, bl, mtime, flags, truncate_size, truncate_seq, onack, oncommit, op_flags); return 0; @@ -188,11 +191,14 @@ class Filer { ops[0].op.op = CEPH_OSD_OP_TRIMTRUNC; ops[0].op.extent.truncate_seq = truncate_seq; ops[0].op.extent.truncate_size = extents[0].offset; - objecter->_modify(extents[0].oid, extents[0].oloc, ops, mtime, snapc, flags, onack, oncommit); + objecter->_modify(extents[0].oid, extents[0].oloc, ops, mtime, snapc, + flags, onack, oncommit); } else { C_GatherBuilder gack(cct, onack); C_GatherBuilder gcom(cct, oncommit); - for (vector::iterator p = extents.begin(); p != extents.end(); ++p) { + for (vector::iterator p = extents.begin(); + p != extents.end(); + ++p) { vector ops(1); ops[0].op.op = CEPH_OSD_OP_TRIMTRUNC; ops[0].op.extent.truncate_size = p->offset; @@ -211,26 +217,29 @@ class Filer { ceph_file_layout *layout, const SnapContext& snapc, uint64_t offset, - uint64_t len, + uint64_t len, utime_t mtime, int flags, bool keep_first, - Context *onack, - Context *oncommit) { + Context *onack, + Context *oncommit) { vector extents; Striper::file_to_extents(cct, ino, layout, offset, len, 0, extents); if (extents.size() == 1) { - if (extents[0].offset == 0 && extents[0].length == layout->fl_object_size && - (!keep_first || extents[0].objectno != 0)) - objecter->remove(extents[0].oid, extents[0].oloc, + if (extents[0].offset == 0 && extents[0].length == layout->fl_object_size + && (!keep_first || extents[0].objectno != 0)) + objecter->remove(extents[0].oid, extents[0].oloc, snapc, mtime, flags, onack, oncommit); else - objecter->zero(extents[0].oid, extents[0].oloc, extents[0].offset, extents[0].length, - snapc, mtime, flags, onack, oncommit); + objecter->zero(extents[0].oid, extents[0].oloc, extents[0].offset, + extents[0].length, snapc, mtime, flags, onack, + oncommit); } else { C_GatherBuilder gack(cct, onack); C_GatherBuilder gcom(cct, oncommit); - for (vector::iterator p = extents.begin(); p != extents.end(); ++p) { + for (vector::iterator p = extents.begin(); + p != extents.end(); + ++p) { if (p->offset == 0 && p->length == layout->fl_object_size && (!keep_first || p->objectno != 0)) objecter->remove(p->oid, p->oloc, @@ -238,7 +247,7 @@ class Filer { onack ? gack.new_sub():0, oncommit ? gcom.new_sub():0); else - objecter->zero(p->oid, p->oloc, p->offset, p->length, + objecter->zero(p->oid, p->oloc, p->offset, p->length, snapc, mtime, flags, onack ? gack.new_sub():0, oncommit ? gcom.new_sub():0); @@ -253,17 +262,17 @@ class Filer { ceph_file_layout *layout, const SnapContext& snapc, uint64_t offset, - uint64_t len, + uint64_t len, utime_t mtime, int flags, - Context *onack, - Context *oncommit) { + Context *onack, + Context *oncommit) { return zero(ino, layout, - snapc, offset, - len, mtime, - flags, false, - onack, oncommit); + snapc, offset, + len, mtime, + flags, false, + onack, oncommit); } // purge range of ino.### objects int purge_range(inodeno_t ino, @@ -276,7 +285,7 @@ class Filer { void _do_purge_range(struct PurgeRange *pr, int fin); /* - * probe + * probe * specify direction, * and whether we stop when we find data, or hole. */ @@ -291,6 +300,4 @@ class Filer { Context *onfinish); }; - - -#endif +#endif // !CEPH_FILER_H diff --git a/src/osdc/Journaler.cc b/src/osdc/Journaler.cc index 5b99517ad0692..a0e494823161c 100644 --- a/src/osdc/Journaler.cc +++ b/src/osdc/Journaler.cc @@ -1,4 +1,4 @@ -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- // vim: ts=8 sw=2 smarttab /* * Ceph - scalable distributed file system @@ -7,9 +7,9 @@ * * This is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public - * License version 2.1, as published by the Free Software + * License version 2.1, as published by the Free Software * Foundation. See file COPYING. - * + * */ #include "common/perf_counters.h" @@ -23,7 +23,8 @@ #define dout_subsys ceph_subsys_journaler #undef dout_prefix -#define dout_prefix *_dout << objecter->messenger->get_myname() << ".journaler" << (readonly ? "(ro) ":"(rw) ") +#define dout_prefix *_dout << objecter->messenger->get_myname() \ + << ".journaler" << (readonly ? "(ro) ":"(rw) ") void Journaler::set_readonly() @@ -55,10 +56,11 @@ void Journaler::create(ceph_file_layout *l, stream_format_t const sf) prezeroing_pos = prezero_pos = write_pos = flush_pos = safe_pos = read_pos = requested_pos = received_pos = - expire_pos = trimming_pos = trimmed_pos = (uint64_t)layout.fl_stripe_count * layout.fl_object_size; + expire_pos = trimming_pos = trimmed_pos = + (uint64_t)layout.fl_stripe_count * layout.fl_object_size; - ldout(cct, 1) << "created blank journal at inode 0x" << std::hex << ino << std::dec - << ", format=" << stream_format << dendl; + ldout(cct, 1) << "created blank journal at inode 0x" << std::hex << ino + << std::dec << ", format=" << stream_format << dendl; } void Journaler::set_layout(ceph_file_layout const *l) @@ -86,7 +88,7 @@ void Journaler::_set_layout(ceph_file_layout const *l) /***************** HEADER *******************/ -ostream& operator<<(ostream& out, Journaler::Header &h) +ostream& operator<<(ostream& out, Journaler::Header &h) { return out << "loghead(trim " << h.trimmed_pos << ", expire " << h.expire_pos @@ -110,7 +112,8 @@ class Journaler::C_RereadHead : public Context { Context *onfinish; public: bufferlist bl; - C_RereadHead(Journaler *l, Context *onfinish_) : ls (l), onfinish(onfinish_){} + C_RereadHead(Journaler *l, Context *onfinish_) : ls (l), + onfinish(onfinish_) {} void finish(int r) { ls->_finish_reread_head(r, bl, onfinish); } @@ -152,7 +155,7 @@ void Journaler::recover(Context *onread) if (onread) waitfor_recover.push_back(onread); - + if (state != STATE_UNDEF) { ldout(cct, 1) << "recover - already recovering" << dendl; return; @@ -215,7 +218,8 @@ void Journaler::_finish_reread_head(int r, bufferlist& bl, Context *finish) finish->complete(-EINVAL); return; } - prezeroing_pos = prezero_pos = write_pos = flush_pos = safe_pos = h.write_pos; + prezeroing_pos = prezero_pos = write_pos = flush_pos = safe_pos + = h.write_pos; expire_pos = h.expire_pos; trimmed_pos = trimming_pos = h.trimmed_pos; init_headers(h); @@ -232,8 +236,7 @@ void Journaler::_finish_read_head(int r, bufferlist& bl) assert(state == STATE_READHEAD); if (r!=0) { - ldout(cct, 0) << "error getting journal off disk" - << dendl; + ldout(cct, 0) << "error getting journal off disk" << dendl; list ls; ls.swap(waitfor_recover); finish_contexts(cct, ls, r); @@ -241,13 +244,14 @@ void Journaler::_finish_read_head(int r, bufferlist& bl) } if (bl.length() == 0) { - ldout(cct, 1) << "_finish_read_head r=" << r << " read 0 bytes, assuming empty log" << dendl; + ldout(cct, 1) << "_finish_read_head r=" << r + << " read 0 bytes, assuming empty log" << dendl; state = STATE_ACTIVE; list ls; ls.swap(waitfor_recover); finish_contexts(cct, ls, 0); return; - } + } // unpack header bool corrupt = false; @@ -258,7 +262,7 @@ void Journaler::_finish_read_head(int r, bufferlist& bl) if (h.magic != magic) { ldout(cct, 0) << "on disk magic '" << h.magic << "' != my magic '" - << magic << "'" << dendl; + << magic << "'" << dendl; corrupt = true; } else if (h.write_pos < h.expire_pos || h.expire_pos < h.trimmed_pos) { ldout(cct, 0) << "Corrupt header (bad offsets): " << h << dendl; @@ -275,7 +279,8 @@ void Journaler::_finish_read_head(int r, bufferlist& bl) return; } - prezeroing_pos = prezero_pos = write_pos = flush_pos = safe_pos = h.write_pos; + prezeroing_pos = prezero_pos = write_pos = flush_pos = safe_pos + = h.write_pos; read_pos = requested_pos = received_pos = expire_pos = h.expire_pos; trimmed_pos = trimming_pos = h.trimmed_pos; @@ -284,7 +289,9 @@ void Journaler::_finish_read_head(int r, bufferlist& bl) stream_format = h.stream_format; journal_stream.set_format(h.stream_format); - ldout(cct, 1) << "_finish_read_head " << h << ". probing for end of log (from " << write_pos << ")..." << dendl; + ldout(cct, 1) << "_finish_read_head " << h + << ". probing for end of log (from " << write_pos << ")..." + << dendl; C_ProbeEnd *fin = new C_ProbeEnd(this); state = STATE_PROBING; _probe(fin, &fin->end); @@ -311,12 +318,13 @@ void Journaler::_reprobe(C_OnFinisher *finish) } -void Journaler::_finish_reprobe(int r, uint64_t new_end, C_OnFinisher *onfinish) +void Journaler::_finish_reprobe(int r, uint64_t new_end, + C_OnFinisher *onfinish) { Mutex::Locker l(lock); assert(new_end >= write_pos || r < 0); - ldout(cct, 1) << "_finish_reprobe new_end = " << new_end + ldout(cct, 1) << "_finish_reprobe new_end = " << new_end << " (header had " << write_pos << ")." << dendl; prezeroing_pos = prezero_pos = write_pos = flush_pos = safe_pos = new_end; @@ -334,21 +342,20 @@ void Journaler::_finish_probe_end(int r, uint64_t end) } if (((int64_t)end) == -1) { end = write_pos; - ldout(cct, 1) << "_finish_probe_end write_pos = " << end - << " (header had " << write_pos << "). log was empty. recovered." - << dendl; + ldout(cct, 1) << "_finish_probe_end write_pos = " << end << " (header had " + << write_pos << "). log was empty. recovered." << dendl; assert(0); // hrm. } else { assert(end >= write_pos); - ldout(cct, 1) << "_finish_probe_end write_pos = " << end - << " (header had " << write_pos << "). recovered." - << dendl; + ldout(cct, 1) << "_finish_probe_end write_pos = " << end + << " (header had " << write_pos << "). recovered." + << dendl; } state = STATE_ACTIVE; prezeroing_pos = prezero_pos = write_pos = flush_pos = safe_pos = end; - + out: // done. list ls; @@ -393,7 +400,8 @@ public: Journaler *ls; Header h; C_OnFinisher *oncommit; - C_WriteHead(Journaler *l, Header& h_, C_OnFinisher *c) : ls(l), h(h_), oncommit(c) {} + C_WriteHead(Journaler *l, Header& h_, C_OnFinisher *c) : ls(l), h(h_), + oncommit(c) {} void finish(int r) { ls->_finish_write_head(r, h, oncommit); } @@ -416,7 +424,7 @@ void Journaler::_write_head(Context *oncommit) last_written.write_pos = safe_pos; last_written.stream_format = stream_format; ldout(cct, 10) << "write_head " << last_written << dendl; - + // Avoid persisting bad pointers in case of bugs assert(last_written.write_pos >= last_written.expire_pos); assert(last_written.expire_pos >= last_written.trimmed_pos); @@ -426,16 +434,17 @@ void Journaler::_write_head(Context *oncommit) bufferlist bl; ::encode(last_written, bl); SnapContext snapc; - + object_t oid = file_object_t(ino, 0); object_locator_t oloc(pg_pool); - objecter->write_full(oid, oloc, snapc, bl, ceph_clock_now(cct), 0, - NULL, - wrap_finisher(new C_WriteHead(this, last_written, wrap_finisher(oncommit))), + objecter->write_full(oid, oloc, snapc, bl, ceph_clock_now(cct), 0, NULL, + wrap_finisher(new C_WriteHead(this, last_written, + wrap_finisher(oncommit))), 0, 0, write_iohint); } -void Journaler::_finish_write_head(int r, Header &wrote, C_OnFinisher *oncommit) +void Journaler::_finish_write_head(int r, Header &wrote, + C_OnFinisher *oncommit) { Mutex::Locker l(lock); @@ -500,8 +509,8 @@ void Journaler::_finish_flush(int r, uint64_t start, utime_t stamp) ldout(cct, 10) << "_finish_flush safe from " << start << ", pending_safe " << pending_safe << ", (prezeroing/prezero)/write/flush/safe positions now " - << "(" << prezeroing_pos << "/" << prezero_pos << ")/" << write_pos - << "/" << flush_pos << "/" << safe_pos + << "(" << prezeroing_pos << "/" << prezero_pos << ")/" + << write_pos << "/" << flush_pos << "/" << safe_pos << dendl; // kick waiters <= safe_pos @@ -525,30 +534,34 @@ uint64_t Journaler::append_entry(bufferlist& bl) if (!cct->_conf->journaler_allow_split_entries) { // will we span a stripe boundary? int p = layout.fl_stripe_unit; - if (write_pos / p != (write_pos + (int64_t)(bl.length() + sizeof(s))) / p) { + if (write_pos / p != (write_pos + (int64_t)(bl.length() + + sizeof(s))) / p) { // yes. // move write_pos forward. int64_t owp = write_pos; write_pos += p; write_pos -= (write_pos % p); - + // pad with zeros. bufferptr bp(write_pos - owp); bp.zero(); assert(bp.length() >= 4); write_buf.push_back(bp); - + // now flush. flush(); - - ldout(cct, 12) << "append_entry skipped " << (write_pos-owp) << " bytes to " << write_pos << " to avoid spanning stripe boundary" << dendl; + + ldout(cct, 12) << "append_entry skipped " << (write_pos-owp) + << " bytes to " << write_pos + << " to avoid spanning stripe boundary" << dendl; } } - - + + // append size_t wrote = journal_stream.write(bl, &write_buf, write_pos); - ldout(cct, 10) << "append_entry len " << s << " to " << write_pos << "~" << wrote << dendl; + ldout(cct, 10) << "append_entry len " << s << " to " << write_pos << "~" + << wrote << dendl; write_pos += wrote; // flush previous object? @@ -558,7 +571,8 @@ uint64_t Journaler::append_entry(bufferlist& bl) uint64_t write_obj = write_pos / su; uint64_t flush_obj = flush_pos / su; if (write_obj != flush_obj) { - ldout(cct, 10) << " flushing completed object(s) (su " << su << " wro " << write_obj << " flo " << flush_obj << ")" << dendl; + ldout(cct, 10) << " flushing completed object(s) (su " << su << " wro " + << write_obj << " flo " << flush_obj << ")" << dendl; _do_flush(write_buf.length() - write_off); } @@ -588,13 +602,15 @@ void Journaler::_do_flush(unsigned amount) int64_t newlen = prezero_pos - flush_pos - period; if (newlen <= 0) { ldout(cct, 10) << "_do_flush wanted to do " << flush_pos << "~" << len - << " already too close to prezero_pos " << prezero_pos << ", zeroing first" << dendl; + << " already too close to prezero_pos " << prezero_pos + << ", zeroing first" << dendl; waiting_for_zero = true; return; } if (newlen < len) { - ldout(cct, 10) << "_do_flush wanted to do " << flush_pos << "~" << len << " but hit prezero_pos " << prezero_pos - << ", will do " << flush_pos << "~" << newlen << dendl; + ldout(cct, 10) << "_do_flush wanted to do " << flush_pos << "~" << len + << " but hit prezero_pos " << prezero_pos + << ", will do " << flush_pos << "~" << newlen << dendl; len = newlen; } else { waiting_for_zero = false; @@ -603,7 +619,7 @@ void Journaler::_do_flush(unsigned amount) waiting_for_zero = false; } ldout(cct, 10) << "_do_flush flushing " << flush_pos << "~" << len << dendl; - + // submit write for anything pending // flush _start_ pos to _finish_flush utime_t now = ceph_clock_now(cct); @@ -628,9 +644,11 @@ void Journaler::_do_flush(unsigned amount) flush_pos += len; assert(write_buf.length() == write_pos - flush_pos); - - ldout(cct, 10) << "_do_flush (prezeroing/prezero)/write/flush/safe pointers now at " - << "(" << prezeroing_pos << "/" << prezero_pos << ")/" << write_pos << "/" << flush_pos << "/" << safe_pos << dendl; + + ldout(cct, 10) + << "_do_flush (prezeroing/prezero)/write/flush/safe pointers now at " + << "(" << prezeroing_pos << "/" << prezero_pos << ")/" << write_pos + << "/" << flush_pos << "/" << safe_pos << dendl; _issue_prezero(); } @@ -649,12 +667,14 @@ void Journaler::wait_for_flush(Context *onsafe) void Journaler::_wait_for_flush(Context *onsafe) { assert(!readonly); - + // all flushed and safe? if (write_pos == safe_pos) { assert(write_buf.length() == 0); - ldout(cct, 10) << "flush nothing to flush, (prezeroing/prezero)/write/flush/safe pointers at " - << "(" << prezeroing_pos << "/" << prezero_pos << ")/" << write_pos << "/" << flush_pos << "/" << safe_pos << dendl; + ldout(cct, 10) + << "flush nothing to flush, (prezeroing/prezero)/write/flush/safe " + "pointers at " << "(" << prezeroing_pos << "/" << prezero_pos << ")/" + << write_pos << "/" << flush_pos << "/" << safe_pos << dendl; if (onsafe) { finisher->queue(onsafe, 0); } @@ -665,7 +685,7 @@ void Journaler::_wait_for_flush(Context *onsafe) if (onsafe) { waitfor_safe[write_pos].push_back(wrap_finisher(onsafe)); } -} +} void Journaler::flush(Context *onsafe) { @@ -679,8 +699,10 @@ void Journaler::_flush(C_OnFinisher *onsafe) if (write_pos == flush_pos) { assert(write_buf.length() == 0); - ldout(cct, 10) << "flush nothing to flush, (prezeroing/prezero)/write/flush/safe pointers at " - << "(" << prezeroing_pos << "/" << prezero_pos << ")/" << write_pos << "/" << flush_pos << "/" << safe_pos << dendl; + ldout(cct, 10) << "flush nothing to flush, (prezeroing/prezero)/write/" + "flush/safe pointers at " << "(" << prezeroing_pos << "/" << prezero_pos + << ")/" << write_pos << "/" << flush_pos << "/" << safe_pos + << dendl; if (onsafe) { onsafe->complete(0); } @@ -690,10 +712,11 @@ void Journaler::_flush(C_OnFinisher *onsafe) // delay! schedule an event. ldout(cct, 20) << "flush delaying flush" << dendl; if (delay_flush_event) { - timer->cancel_event(delay_flush_event); + timer->cancel_event(delay_flush_event); } delay_flush_event = new C_DelayFlush(this); - timer->add_event_after(cct->_conf->journaler_batch_interval, delay_flush_event); + timer->add_event_after(cct->_conf->journaler_batch_interval, + delay_flush_event); } else { ldout(cct, 20) << "flush not delaying flush" << dendl; _do_flush(); @@ -702,7 +725,8 @@ void Journaler::_flush(C_OnFinisher *onsafe) } // write head? - if (last_wrote_head.sec() + cct->_conf->journaler_write_head_interval < ceph_clock_now(cct).sec()) { + if (last_wrote_head.sec() + cct->_conf->journaler_write_head_interval + < ceph_clock_now(cct).sec()) { _write_head(); } } @@ -713,7 +737,8 @@ void Journaler::_flush(C_OnFinisher *onsafe) struct C_Journaler_Prezero : public Context { Journaler *journaler; uint64_t from, len; - C_Journaler_Prezero(Journaler *j, uint64_t f, uint64_t l) : journaler(j), from(f), len(l) {} + C_Journaler_Prezero(Journaler *j, uint64_t f, uint64_t l) + : journaler(j), from(f), len(l) {} void finish(int r) { journaler->_finish_prezero(r, from, len); } @@ -723,8 +748,8 @@ void Journaler::_issue_prezero() { assert(prezeroing_pos >= flush_pos); - // we need to zero at least two periods, minimum, to ensure that we have a full - // empty object/period in front of us. + // we need to zero at least two periods, minimum, to ensure that we + // have a full empty object/period in front of us. uint64_t num_periods = MAX(2, cct->_conf->journaler_prezero_periods); /* @@ -736,7 +761,8 @@ void Journaler::_issue_prezero() to -= to % period; if (prezeroing_pos >= to) { - ldout(cct, 20) << "_issue_prezero target " << to << " <= prezeroing_pos " << prezeroing_pos << dendl; + ldout(cct, 20) << "_issue_prezero target " << to << " <= prezeroing_pos " + << prezeroing_pos << dendl; return; } @@ -744,14 +770,18 @@ void Journaler::_issue_prezero() uint64_t len; if (prezeroing_pos % period == 0) { len = period; - ldout(cct, 10) << "_issue_prezero removing " << prezeroing_pos << "~" << period << " (full period)" << dendl; + ldout(cct, 10) << "_issue_prezero removing " << prezeroing_pos << "~" + << period << " (full period)" << dendl; } else { len = period - (prezeroing_pos % period); - ldout(cct, 10) << "_issue_prezero zeroing " << prezeroing_pos << "~" << len << " (partial period)" << dendl; + ldout(cct, 10) << "_issue_prezero zeroing " << prezeroing_pos << "~" + << len << " (partial period)" << dendl; } SnapContext snapc; - Context *c = wrap_finisher(new C_Journaler_Prezero(this, prezeroing_pos, len)); - filer.zero(ino, &layout, snapc, prezeroing_pos, len, ceph_clock_now(cct), 0, NULL, c); + Context *c = wrap_finisher(new C_Journaler_Prezero(this, prezeroing_pos, + len)); + filer.zero(ino, &layout, snapc, prezeroing_pos, len, ceph_clock_now(cct), + 0, NULL, c); prezeroing_pos += len; } } @@ -764,9 +794,9 @@ void Journaler::_finish_prezero(int r, uint64_t start, uint64_t len) Mutex::Locker l(lock); ldout(cct, 10) << "_prezeroed to " << start << "~" << len - << ", prezeroing/prezero was " << prezeroing_pos << "/" << prezero_pos - << ", pending " << pending_zero - << dendl; + << ", prezeroing/prezero was " << prezeroing_pos << "/" + << prezero_pos << ", pending " << pending_zero + << dendl; if (r < 0 && r != -ENOENT) { lderr(cct) << "_prezeroed got " << cpp_strerror(r) << dendl; handle_write_error(r); @@ -790,9 +820,10 @@ void Journaler::_finish_prezero(int r, uint64_t start, uint64_t len) } else { pending_zero.insert(start, len); } - ldout(cct, 10) << "_prezeroed prezeroing/prezero now " << prezeroing_pos << "/" << prezero_pos - << ", pending " << pending_zero - << dendl; + ldout(cct, 10) << "_prezeroed prezeroing/prezero now " << prezeroing_pos + << "/" << prezero_pos + << ", pending " << pending_zero + << dendl; } @@ -820,10 +851,11 @@ public: // Should only be called from waitfor_safe i.e. already inside lock assert(ls->lock.is_locked_by_me()); ls->_prefetch(); - } + } }; -void Journaler::_finish_read(int r, uint64_t offset, uint64_t length, bufferlist& bl) +void Journaler::_finish_read(int r, uint64_t offset, uint64_t length, + bufferlist& bl) { Mutex::Locker l(lock); @@ -831,9 +863,11 @@ void Journaler::_finish_read(int r, uint64_t offset, uint64_t length, bufferlist ldout(cct, 0) << "_finish_read got error " << r << dendl; error = r; } else { - ldout(cct, 10) << "_finish_read got " << offset << "~" << bl.length() << dendl; + ldout(cct, 10) << "_finish_read got " << offset << "~" << bl.length() + << dendl; if (bl.length() < length) { - ldout(cct, 0) << "_finish_read got less than expected (" << length << ")" << dendl; + ldout(cct, 0) << "_finish_read got less than expected (" << length << ")" + << dendl; error = -EINVAL; } } @@ -873,12 +907,14 @@ void Journaler::_assimilate_prefetch() map::iterator p = prefetch_buf.begin(); if (p->first != received_pos) { uint64_t gap = p->first - received_pos; - ldout(cct, 10) << "_assimilate_prefetch gap of " << gap << " from received_pos " << received_pos - << " to first prefetched buffer " << p->first << dendl; + ldout(cct, 10) << "_assimilate_prefetch gap of " << gap + << " from received_pos " << received_pos + << " to first prefetched buffer " << p->first << dendl; break; } - ldout(cct, 10) << "_assimilate_prefetch " << p->first << "~" << p->second.length() << dendl; + ldout(cct, 10) << "_assimilate_prefetch " << p->first << "~" + << p->second.length() << dendl; received_pos += p->second.length(); read_buf.claim_append(p->second); assert(received_pos <= requested_pos); @@ -887,9 +923,10 @@ void Journaler::_assimilate_prefetch() } if (got_any) { - ldout(cct, 10) << "_assimilate_prefetch read_buf now " << read_pos << "~" << read_buf.length() - << ", read pointers " << read_pos << "/" << received_pos << "/" << requested_pos - << dendl; + ldout(cct, 10) << "_assimilate_prefetch read_buf now " << read_pos << "~" + << read_buf.length() << ", read pointers " << read_pos + << "/" << received_pos << "/" << requested_pos + << dendl; // Update readability (this will also hit any decode errors resulting // from bad data) @@ -912,11 +949,12 @@ void Journaler::_issue_read(uint64_t len) // make sure we're fully flushed _do_flush(); - // stuck at safe_pos? - // (this is needed if we are reading the tail of a journal we are also writing to) + // stuck at safe_pos? (this is needed if we are reading the tail of + // a journal we are also writing to) assert(requested_pos <= safe_pos); if (requested_pos == safe_pos) { - ldout(cct, 10) << "_issue_read requested_pos = safe_pos = " << safe_pos << ", waiting" << dendl; + ldout(cct, 10) << "_issue_read requested_pos = safe_pos = " << safe_pos + << ", waiting" << dendl; assert(write_pos > requested_pos); if (flush_pos == safe_pos) { _flush(NULL); @@ -929,14 +967,15 @@ void Journaler::_issue_read(uint64_t len) // don't read too much if (requested_pos + len > safe_pos) { len = safe_pos - requested_pos; - ldout(cct, 10) << "_issue_read reading only up to safe_pos " << safe_pos << dendl; + ldout(cct, 10) << "_issue_read reading only up to safe_pos " << safe_pos + << dendl; } // go. - ldout(cct, 10) << "_issue_read reading " << requested_pos << "~" << len - << ", read pointers " << read_pos << "/" << received_pos << "/" << (requested_pos+len) - << dendl; - + ldout(cct, 10) << "_issue_read reading " << requested_pos << "~" << len + << ", read pointers " << read_pos << "/" << received_pos + << "/" << (requested_pos+len) << dendl; + // step by period (object). _don't_ do a single big filer.read() // here because it will wait for all object reads to complete before // giving us back any data. this way we can process whatever bits @@ -949,7 +988,8 @@ void Journaler::_issue_read(uint64_t len) if (l > len) l = len; C_Read *c = new C_Read(this, requested_pos, l); - filer.read(ino, &layout, CEPH_NOSNAP, requested_pos, l, &c->bl, 0, wrap_finisher(c), CEPH_OSD_OP_FLAG_FADVISE_DONTNEED); + filer.read(ino, &layout, CEPH_NOSNAP, requested_pos, l, &c->bl, 0, + wrap_finisher(c), CEPH_OSD_OP_FLAG_FADVISE_DONTNEED); requested_pos += l; len -= l; } @@ -982,8 +1022,9 @@ void Journaler::_prefetch() if (requested_pos < target) { uint64_t len = target - requested_pos; - ldout(cct, 10) << "_prefetch " << pf << " requested_pos " << requested_pos << " < target " << target - << " (" << raw_target << "), prefetching " << len << dendl; + ldout(cct, 10) << "_prefetch " << pf << " requested_pos " << requested_pos + << " < target " << target << " (" << raw_target + << "), prefetching " << len << dendl; _issue_read(len); } } @@ -1005,11 +1046,13 @@ bool Journaler::_is_readable() } ldout (cct, 10) << "_is_readable read_buf.length() == " << read_buf.length() - << ", but need " << need << " for next entry; fetch_len is " << fetch_len << dendl; + << ", but need " << need << " for next entry; fetch_len is " + << fetch_len << dendl; // partial fragment at the end? if (received_pos == write_pos) { - ldout(cct, 10) << "is_readable() detected partial entry at tail, adjusting write_pos to " << read_pos << dendl; + ldout(cct, 10) << "is_readable() detected partial entry at tail, " + "adjusting write_pos to " << read_pos << dendl; // adjust write_pos prezeroing_pos = prezero_pos = write_pos = flush_pos = safe_pos = read_pos; @@ -1017,8 +1060,8 @@ bool Journaler::_is_readable() // reset read state requested_pos = received_pos = read_pos; - read_buf.clear(); - + read_buf.clear(); + // FIXME: truncate on disk? return false; @@ -1026,7 +1069,8 @@ bool Journaler::_is_readable() if (need > fetch_len) { temp_fetch_len = need; - ldout(cct, 10) << "_is_readable noting temp_fetch_len " << temp_fetch_len << dendl; + ldout(cct, 10) << "_is_readable noting temp_fetch_len " << temp_fetch_len + << dendl; } ldout(cct, 10) << "_is_readable: not readable, returning false" << dendl; @@ -1036,7 +1080,7 @@ bool Journaler::_is_readable() /* * is_readable() - kickstart prefetch, too */ -bool Journaler::is_readable() +bool Journaler::is_readable() { Mutex::Locker l(lock); @@ -1070,12 +1114,15 @@ void Journaler::erase(Context *completion) // Async delete the journal data uint64_t first = trimmed_pos / get_layout_period(); uint64_t num = (write_pos - trimmed_pos) / get_layout_period() + 2; - filer.purge_range(ino, &layout, SnapContext(), first, num, ceph_clock_now(cct), 0, - wrap_finisher(new C_EraseFinish(this, wrap_finisher(completion)))); - - // We will not start the operation to delete the header until _finish_erase has - // seen the data deletion succeed: otherwise if there was an error deleting data - // we might prematurely delete the header thereby lose our reference to the data. + filer.purge_range(ino, &layout, SnapContext(), first, num, + ceph_clock_now(cct), 0, + wrap_finisher(new C_EraseFinish( + this, wrap_finisher(completion)))); + + // We will not start the operation to delete the header until + // _finish_erase has seen the data deletion succeed: otherwise if + // there was an error deleting data we might prematurely delete the + // header thereby lose our reference to the data. } void Journaler::_finish_erase(int data_result, C_OnFinisher *completion) @@ -1084,10 +1131,11 @@ void Journaler::_finish_erase(int data_result, C_OnFinisher *completion) if (data_result == 0) { // Async delete the journal header - filer.purge_range(ino, &layout, SnapContext(), 0, 1, ceph_clock_now(cct), 0, - wrap_finisher(completion)); + filer.purge_range(ino, &layout, SnapContext(), 0, 1, ceph_clock_now(cct), + 0, wrap_finisher(completion)); } else { - lderr(cct) << "Failed to delete journal " << ino << " data: " << cpp_strerror(data_result) << dendl; + lderr(cct) << "Failed to delete journal " << ino << " data: " + << cpp_strerror(data_result) << dendl; completion->complete(data_result); } } @@ -1101,7 +1149,8 @@ bool Journaler::try_read_entry(bufferlist& bl) Mutex::Locker l(lock); if (!readable) { - ldout(cct, 10) << "try_read_entry at " << read_pos << " not readable" << dendl; + ldout(cct, 10) << "try_read_entry at " << read_pos << " not readable" + << dendl; return false; } @@ -1118,8 +1167,9 @@ bool Journaler::try_read_entry(bufferlist& bl) return false; } - ldout(cct, 10) << "try_read_entry at " << read_pos << " read " - << read_pos << "~" << consumed << " (have " << read_buf.length() << ")" << dendl; + ldout(cct, 10) << "try_read_entry at " << read_pos << " read " + << read_pos << "~" << consumed << " (have " + << read_buf.length() << ")" << dendl; read_pos += consumed; try { @@ -1146,7 +1196,8 @@ void Journaler::wait_for_readable(Context *onreadable) assert(on_readable == 0); if (!readable) { - ldout(cct, 10) << "wait_for_readable at " << read_pos << " onreadable " << onreadable << dendl; + ldout(cct, 10) << "wait_for_readable at " << read_pos << " onreadable " + << onreadable << dendl; on_readable = wrap_finisher(onreadable); } else { // race with OSD reply @@ -1186,33 +1237,33 @@ void Journaler::_trim() << ", can trim to " << trim_to << dendl; if (trim_to == 0 || trim_to == trimming_pos) { - ldout(cct, 10) << "trim already trimmed/trimming to " - << trimmed_pos << "/" << trimming_pos << dendl; + ldout(cct, 10) << "trim already trimmed/trimming to " + << trimmed_pos << "/" << trimming_pos << dendl; return; } if (trimming_pos > trimmed_pos) { - ldout(cct, 10) << "trim already trimming atm, try again later. trimmed/trimming is " - << trimmed_pos << "/" << trimming_pos << dendl; + ldout(cct, 10) << "trim already trimming atm, try again later. " + "trimmed/trimming is " << trimmed_pos << "/" << trimming_pos << dendl; return; } - + // trim assert(trim_to <= write_pos); assert(trim_to <= expire_pos); assert(trim_to > trimming_pos); - ldout(cct, 10) << "trim trimming to " << trim_to - << ", trimmed/trimming/expire are " - << trimmed_pos << "/" << trimming_pos << "/" << expire_pos - << dendl; + ldout(cct, 10) << "trim trimming to " << trim_to + << ", trimmed/trimming/expire are " + << trimmed_pos << "/" << trimming_pos << "/" << expire_pos + << dendl; // delete range of objects uint64_t first = trimming_pos / period; uint64_t num = (trim_to - trimming_pos) / period; SnapContext snapc; - filer.purge_range(ino, &layout, snapc, first, num, ceph_clock_now(cct), 0, + filer.purge_range(ino, &layout, snapc, first, num, ceph_clock_now(cct), 0, wrap_finisher(new C_Trim(this, trim_to))); - trimming_pos = trim_to; + trimming_pos = trim_to; } void Journaler::_finish_trim(int r, uint64_t to) @@ -1231,7 +1282,7 @@ void Journaler::_finish_trim(int r, uint64_t to) } assert(r >= 0 || r == -ENOENT); - + assert(to <= trimming_pos); assert(to > trimmed_pos); trimmed_pos = to; @@ -1247,9 +1298,11 @@ void Journaler::handle_write_error(int r) on_write_error = NULL; called_write_error = true; } else if (called_write_error) { - /* We don't call error handler more than once, subsequent errors are dropped -- - * this is okay as long as the error handler does something dramatic like respawn */ - lderr(cct) << __func__ << ": multiple write errors, handler already called" << dendl; + /* We don't call error handler more than once, subsequent errors + * are dropped -- this is okay as long as the error handler does + * something dramatic like respawn */ + lderr(cct) << __func__ << ": multiple write errors, handler already called" + << dendl; } else { assert(0 == "unhandled write error"); } @@ -1282,7 +1335,7 @@ bool JournalStream::readable(bufferlist &read_buf, uint64_t *need) const if (format >= JOURNAL_FORMAT_RESILIENT) { ::decode(entry_sentinel, p); if (entry_sentinel != sentinel) { - throw buffer::malformed_input("Invalid sentinel"); + throw buffer::malformed_input("Invalid sentinel"); } } @@ -1320,7 +1373,8 @@ bool JournalStream::readable(bufferlist &read_buf, uint64_t *need) const * that this is not equal to the length of `entry`, which contains * the inner serialized LogEvent and not the envelope. */ -size_t JournalStream::read(bufferlist &from, bufferlist *entry, uint64_t *start_ptr) +size_t JournalStream::read(bufferlist &from, bufferlist *entry, + uint64_t *start_ptr) { assert(start_ptr != NULL); assert(entry != NULL); @@ -1359,7 +1413,8 @@ size_t JournalStream::read(bufferlist &from, bufferlist *entry, uint64_t *start_ /** * Append one entry */ -size_t JournalStream::write(bufferlist &entry, bufferlist *to, uint64_t const &start_ptr) +size_t JournalStream::write(bufferlist &entry, bufferlist *to, + uint64_t const &start_ptr) { assert(to != NULL); diff --git a/src/osdc/Journaler.h b/src/osdc/Journaler.h index 9db19c1b23962..af6738225fd5b 100644 --- a/src/osdc/Journaler.h +++ b/src/osdc/Journaler.h @@ -1,4 +1,4 @@ -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- // vim: ts=8 sw=2 smarttab /* * Ceph - scalable distributed file system @@ -7,43 +7,50 @@ * * This is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public - * License version 2.1, as published by the Free Software + * License version 2.1, as published by the Free Software * Foundation. See file COPYING. - * + * */ /* Journaler * - * This class stripes a serial log over objects on the store. Four logical pointers: + * This class stripes a serial log over objects on the store. Four + * logical pointers: * * write_pos - where we're writing new entries - * unused_field - where we're reading old entires - * expire_pos - what is deemed "old" by user - * trimmed_pos - where we're expiring old items + * unused_field - where we're reading old entires + * expire_pos - what is deemed "old" by user + * trimmed_pos - where we're expiring old items * * trimmed_pos <= expire_pos <= unused_field <= write_pos. * - * Often, unused_field <= write_pos (as with MDS log). During recovery, write_pos is undefined - * until the end of the log is discovered. + * Often, unused_field <= write_pos (as with MDS log). During + * recovery, write_pos is undefined until the end of the log is + * discovered. * - * A "head" struct at the beginning of the log is used to store metadata at - * regular intervals. The basic invariants include: + * A "head" struct at the beginning of the log is used to store + * metadata at regular intervals. The basic invariants include: * - * head.unused_field <= unused_field -- the head may "lag", since it's updated lazily. + * head.unused_field <= unused_field -- the head may "lag", since + * it's updated lazily. * head.write_pos <= write_pos * head.expire_pos <= expire_pos * head.trimmed_pos <= trimmed_pos * * More significantly, * - * head.expire_pos >= trimmed_pos -- this ensures we can find the "beginning" of the log - * as last recorded, before it is trimmed. trimming will - * block until a sufficiently current expire_pos is committed. + * head.expire_pos >= trimmed_pos -- this ensures we can find the + * "beginning" of the log as last + * recorded, before it is trimmed. + * trimming will block until a + * sufficiently current expire_pos + * is committed. * - * To recover log state, we simply start at the last write_pos in the head, and probe the - * object sequence sizes until we read the end. + * To recover log state, we simply start at the last write_pos in the + * head, and probe the object sequence sizes until we read the end. * - * Head struct is stored in the first object. Actual journal starts after layout.period() bytes. + * Head struct is stored in the first object. Actual journal starts + * after layout.period() bytes. * */ @@ -78,16 +85,18 @@ enum StreamFormat { // Legacy envelope is leading uint32_t size #define JOURNAL_ENVELOPE_LEGACY (sizeof(uint32_t)) -// Resilient envelope is leading uint64_t sentinel, uint32_t size, trailing uint64_t start_ptr -#define JOURNAL_ENVELOPE_RESILIENT (sizeof(uint32_t) + sizeof(uint64_t) + sizeof(uint64_t)) +// Resilient envelope is leading uint64_t sentinel, uint32_t size, +// trailing uint64_t start_ptr +#define JOURNAL_ENVELOPE_RESILIENT (sizeof(uint32_t) + sizeof(uint64_t) + \ + sizeof(uint64_t)) /** * Represents a collection of entries serialized in a byte stream. * * Each entry consists of: * - a blob (used by the next level up as a serialized LogEvent) - * - a uint64_t (used by the next level up as a pointer to the start of the entry - * in the collection bytestream) + * - a uint64_t (used by the next level up as a pointer to the start + * of the entry in the collection bytestream) */ class JournalStream { @@ -118,12 +127,14 @@ public: uint64_t unused_field; uint64_t write_pos; string magic; - ceph_file_layout layout; //< The mapping from byte stream offsets to RADOS objects - stream_format_t stream_format; //< The encoding of LogEvents within the journal byte stream + ceph_file_layout layout; //< The mapping from byte stream offsets + // to RADOS objects + stream_format_t stream_format; //< The encoding of LogEvents + // within the journal byte stream Header(const char *m="") : - trimmed_pos(0), expire_pos(0), unused_field(0), write_pos(0), magic(m), stream_format(-1) { - memset(&layout, 0, sizeof(layout)); + trimmed_pos(0), expire_pos(0), unused_field(0), write_pos(0), magic(m), + stream_format(-1) {memset(&layout, 0, sizeof(layout)); } void encode(bufferlist &bl) const { @@ -146,9 +157,9 @@ public: ::decode(write_pos, bl); ::decode(layout, bl); if (struct_v > 1) { - ::decode(stream_format, bl); + ::decode(stream_format, bl); } else { - stream_format = JOURNAL_FORMAT_LEGACY; + stream_format = JOURNAL_FORMAT_LEGACY; } DECODE_FINISH(bl); } @@ -288,16 +299,21 @@ private: // writer uint64_t prezeroing_pos; - uint64_t prezero_pos; // we zero journal space ahead of write_pos to avoid problems with tail probing - uint64_t write_pos; // logical write position, where next entry will go - uint64_t flush_pos; // where we will flush. if write_pos>flush_pos, we're buffering writes. - uint64_t safe_pos; // what has been committed safely to disk. - bufferlist write_buf; // write buffer. flush_pos + write_buf.length() == write_pos. + uint64_t prezero_pos; ///< we zero journal space ahead of write_pos to + // avoid problems with tail probing + uint64_t write_pos; ///< logical write position, where next entry + // will go + uint64_t flush_pos; ///< where we will flush. if + /// write_pos>flush_pos, we're buffering writes. + uint64_t safe_pos; ///< what has been committed safely to disk. + bufferlist write_buf; ///< write buffer. flush_pos + + /// write_buf.length() == write_pos. bool waiting_for_zero; interval_set pending_zero; // non-contig bits we've zeroed std::set pending_safe; - std::map > waitfor_safe; // when safe through given offset + // when safe through given offset + std::map > waitfor_safe; void _flush(C_OnFinisher *onsafe); void _do_flush(unsigned amount=0); @@ -309,7 +325,8 @@ private: uint64_t read_pos; // logical read position, where next entry starts. uint64_t requested_pos; // what we've requested from OSD. uint64_t received_pos; // what we've received from OSD. - bufferlist read_buf; // read buffer. unused_field + read_buf.length() == prefetch_pos. + // read buffer. unused_field + read_buf.length() == prefetch_pos. + bufferlist read_buf; map prefetch_buf; @@ -317,15 +334,16 @@ private: uint64_t temp_fetch_len; // for wait_for_readable() - C_OnFinisher *on_readable; - C_OnFinisher *on_write_error; - bool called_write_error; + C_OnFinisher *on_readable; + C_OnFinisher *on_write_error; + bool called_write_error; - void _finish_read(int r, uint64_t offset, uint64_t length, bufferlist &bl); // read completion callback + // read completion callback + void _finish_read(int r, uint64_t offset, uint64_t length, bufferlist &bl); void _finish_retry_read(int r); void _assimilate_prefetch(); - void _issue_read(uint64_t len); // read some more - void _prefetch(); // maybe read ahead + void _issue_read(uint64_t len); // read some more + void _prefetch(); // maybe read ahead class C_Read; friend class C_Read; class C_RetryRead; @@ -349,8 +367,8 @@ private: // only init_headers when following or first reading off-disk void init_headers(Header& h) { assert(readonly || - state == STATE_READHEAD || - state == STATE_REREADHEAD); + state == STATE_READHEAD || + state == STATE_REREADHEAD); last_written = last_committed = h; } @@ -371,10 +389,12 @@ private: C_OnFinisher *wrap_finisher(Context *c); - uint32_t write_iohint; //the fadvise flags for write op, see CEPH_OSD_OP_FADIVSE_* + uint32_t write_iohint; // the fadvise flags for write op, see + // CEPH_OSD_OP_FADIVSE_* public: - Journaler(inodeno_t ino_, int64_t pool, const char *mag, Objecter *obj, PerfCounters *l, int lkey, SafeTimer *tim, Finisher *f) : + Journaler(inodeno_t ino_, int64_t pool, const char *mag, Objecter *obj, + PerfCounters *l, int lkey, SafeTimer *tim, Finisher *f) : last_committed(mag), cct(obj->cct), lock("Journaler"), finisher(f), last_written(mag), @@ -396,9 +416,10 @@ public: } /* reset - * NOTE: we assume the caller knows/has ensured that any objects - * in our sequence do not exist.. e.g. after a MKFS. this is _not_ - * an "erase" method. + * + * NOTE: we assume the caller knows/has ensured that any objects in + * our sequence do not exist.. e.g. after a MKFS. this is _not_ an + * "erase" method. */ void reset() { Mutex::Locker l(lock); @@ -441,13 +462,14 @@ public: void set_layout(ceph_file_layout const *l); void set_readonly(); void set_writeable(); - void set_write_pos(int64_t p) { + void set_write_pos(int64_t p) { Mutex::Locker l(lock); prezeroing_pos = prezero_pos = write_pos = flush_pos = safe_pos = p; } - void set_read_pos(int64_t p) { + void set_read_pos(int64_t p) { Mutex::Locker l(lock); - assert(requested_pos == received_pos); // we can't cope w/ in-progress read right now. + // we can't cope w/ in-progress read right now. + assert(requested_pos == received_pos); read_pos = requested_pos = received_pos = p; read_buf.clear(); } @@ -486,7 +508,9 @@ public: // Synchronous getters // =================== // TODO: need some locks on reads for true safety - uint64_t get_layout_period() const { return (uint64_t)layout.fl_stripe_count * (uint64_t)layout.fl_object_size; } + uint64_t get_layout_period() const { + return (uint64_t)layout.fl_stripe_count * (uint64_t)layout.fl_object_size; + } ceph_file_layout& get_layout() { return layout; } bool is_active() { return state == STATE_ACTIVE; } int get_error() { return error; } diff --git a/src/osdc/ObjectCacher.cc b/src/osdc/ObjectCacher.cc index 060b7326d24bc..40b7f3980e146 100644 --- a/src/osdc/ObjectCacher.cc +++ b/src/osdc/ObjectCacher.cc @@ -1,4 +1,4 @@ -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- // vim: ts=8 sw=2 smarttab #include @@ -11,7 +11,8 @@ #include "include/assert.h" -#define MAX_FLUSH_UNDER_LOCK 20 ///< max bh's we start writeback on while holding the lock +#define MAX_FLUSH_UNDER_LOCK 20 ///< max bh's we start writeback on + /// while holding the lock /*** ObjectCacher::BufferHead ***/ @@ -24,11 +25,12 @@ -ObjectCacher::BufferHead *ObjectCacher::Object::split(BufferHead *left, loff_t off) +ObjectCacher::BufferHead *ObjectCacher::Object::split(BufferHead *left, + loff_t off) { assert(oc->lock.is_locked()); ldout(oc->cct, 20) << "split " << *left << " at " << off << dendl; - + // split off right ObjectCacher::BufferHead *right = new BufferHead(this); @@ -45,15 +47,15 @@ ObjectCacher::BufferHead *ObjectCacher::Object::split(BufferHead *left, loff_t o loff_t newleftlen = off - left->start(); right->set_start(off); right->set_length(left->length() - newleftlen); - + // shorten left oc->bh_stat_sub(left); left->set_length(newleftlen); oc->bh_stat_add(left); - + // add right oc->bh_add(this, right); - + // split buffers too bufferlist bl; bl.claim(left->bl); @@ -65,13 +67,15 @@ ObjectCacher::BufferHead *ObjectCacher::Object::split(BufferHead *left, loff_t o // move read waiters if (!left->waitfor_read.empty()) { - map >::iterator start_remove = left->waitfor_read.begin(); + map >::iterator start_remove + = left->waitfor_read.begin(); while (start_remove != left->waitfor_read.end() && start_remove->first < right->start()) ++start_remove; for (map >::iterator p = start_remove; p != left->waitfor_read.end(); ++p) { - ldout(oc->cct, 20) << "split moving waiters at byte " << p->first << " to right bh" << dendl; + ldout(oc->cct, 20) << "split moving waiters at byte " << p->first + << " to right bh" << dendl; right->waitfor_read[p->first].swap( p->second ); assert(p->second.empty()); } @@ -116,10 +120,10 @@ void ObjectCacher::Object::merge_left(BufferHead *left, BufferHead *right) // waiters for (map >::iterator p = right->waitfor_read.begin(); p != right->waitfor_read.end(); - ++p) - left->waitfor_read[p->first].splice( left->waitfor_read[p->first].begin(), - p->second ); - + ++p) + left->waitfor_read[p->first].splice(left->waitfor_read[p->first].begin(), + p->second ); + // hose right delete right; @@ -141,8 +145,8 @@ void ObjectCacher::Object::try_merge_bh(BufferHead *bh) if (p != data.begin()) { --p; if (p->second->end() == bh->start() && - p->second->get_state() == bh->get_state() && - p->second->can_merge_journal(bh)) { + p->second->get_state() == bh->get_state() && + p->second->can_merge_journal(bh)) { merge_left(p->second, bh); bh = p->second; } else { @@ -208,23 +212,23 @@ bool ObjectCacher::Object::include_all_cached_data(loff_t off, loff_t len) * - create missing buffer_heads as necessary. */ int ObjectCacher::Object::map_read(OSDRead *rd, - map& hits, - map& missing, - map& rx, + map& hits, + map& missing, + map& rx, map& errors) { assert(oc->lock.is_locked()); for (vector::iterator ex_it = rd->extents.begin(); ex_it != rd->extents.end(); ++ex_it) { - + if (ex_it->oid != oid.oid) continue; - ldout(oc->cct, 10) << "map_read " << ex_it->oid + ldout(oc->cct, 10) << "map_read " << ex_it->oid << " " << ex_it->offset << "~" << ex_it->length << dendl; - + loff_t cur = ex_it->offset; loff_t left = ex_it->length; @@ -232,58 +236,60 @@ int ObjectCacher::Object::map_read(OSDRead *rd, while (left > 0) { // at end? if (p == data.end()) { - // rest is a miss. - BufferHead *n = new BufferHead(this); - n->set_start(cur); - n->set_length(left); - oc->bh_add(this, n); + // rest is a miss. + BufferHead *n = new BufferHead(this); + n->set_start(cur); + n->set_length(left); + oc->bh_add(this, n); if (complete) { oc->mark_zero(n); hits[cur] = n; - ldout(oc->cct, 20) << "map_read miss+complete+zero " << left << " left, " << *n << dendl; + ldout(oc->cct, 20) << "map_read miss+complete+zero " << left + << " left, " << *n << dendl; } else { missing[cur] = n; - ldout(oc->cct, 20) << "map_read miss " << left << " left, " << *n << dendl; + ldout(oc->cct, 20) << "map_read miss " << left << " left, " << *n + << dendl; } - cur += left; - assert(cur == (loff_t)ex_it->offset + (loff_t)ex_it->length); - break; // no more. + cur += left; + assert(cur == (loff_t)ex_it->offset + (loff_t)ex_it->length); + break; // no more. } - + if (p->first <= cur) { - // have it (or part of it) - BufferHead *e = p->second; + // have it (or part of it) + BufferHead *e = p->second; - if (e->is_clean() || - e->is_dirty() || - e->is_tx() || + if (e->is_clean() || + e->is_dirty() || + e->is_tx() || e->is_zero()) { - hits[cur] = e; // readable! - ldout(oc->cct, 20) << "map_read hit " << *e << dendl; - } else if (e->is_rx()) { - rx[cur] = e; // missing, not readable. - ldout(oc->cct, 20) << "map_read rx " << *e << dendl; - } else if (e->is_error()) { + hits[cur] = e; // readable! + ldout(oc->cct, 20) << "map_read hit " << *e << dendl; + } else if (e->is_rx()) { + rx[cur] = e; // missing, not readable. + ldout(oc->cct, 20) << "map_read rx " << *e << dendl; + } else if (e->is_error()) { errors[cur] = e; ldout(oc->cct, 20) << "map_read error " << *e << dendl; } else { assert(0); } - - loff_t lenfromcur = MIN(e->end() - cur, left); - cur += lenfromcur; - left -= lenfromcur; - ++p; - continue; // more? - + + loff_t lenfromcur = MIN(e->end() - cur, left); + cur += lenfromcur; + left -= lenfromcur; + ++p; + continue; // more? + } else if (p->first > cur) { - // gap.. miss - loff_t next = p->first; - BufferHead *n = new BufferHead(this); + // gap.. miss + loff_t next = p->first; + BufferHead *n = new BufferHead(this); loff_t len = MIN(next - cur, left); - n->set_start(cur); + n->set_start(cur); n->set_length(len); - oc->bh_add(this,n); + oc->bh_add(this,n); if (complete) { oc->mark_zero(n); hits[cur] = n; @@ -292,11 +298,11 @@ int ObjectCacher::Object::map_read(OSDRead *rd, missing[cur] = n; ldout(oc->cct, 20) << "map_read gap " << *n << dendl; } - cur += MIN(left, n->length()); - left -= MIN(left, n->length()); - continue; // more? + cur += MIN(left, n->length()); + left -= MIN(left, n->length()); + continue; // more? } else { - assert(0); + assert(0); } } } @@ -340,7 +346,8 @@ void ObjectCacher::Object::audit_buffers() * map a range of extents on an object's buffer cache. * - combine any bh's we're writing into one * - break up bufferheads that don't fall completely within the range - * //no! - return a bh that includes the write. may also include other dirty data to left and/or right. + * //no! - return a bh that includes the write. may also include + * other dirty data to left and/or right. */ ObjectCacher::BufferHead *ObjectCacher::Object::map_write(OSDWrite *wr) { @@ -353,8 +360,8 @@ ObjectCacher::BufferHead *ObjectCacher::Object::map_write(OSDWrite *wr) if (ex_it->oid != oid.oid) continue; - ldout(oc->cct, 10) << "map_write oex " << ex_it->oid - << " " << ex_it->offset << "~" << ex_it->length << dendl; + ldout(oc->cct, 10) << "map_write oex " << ex_it->oid << " " + << ex_it->offset << "~" << ex_it->length << dendl; loff_t cur = ex_it->offset; loff_t left = ex_it->length; @@ -365,90 +372,93 @@ ObjectCacher::BufferHead *ObjectCacher::Object::map_write(OSDWrite *wr) // at end ? if (p == data.end()) { - if (final == NULL) { - final = new BufferHead(this); - final->set_start( cur ); - final->set_length( max ); - oc->bh_add(this, final); - ldout(oc->cct, 10) << "map_write adding trailing bh " << *final << dendl; - } else { - replace_journal_tid(final, wr->journal_tid); + if (final == NULL) { + final = new BufferHead(this); + final->set_start( cur ); + final->set_length( max ); + oc->bh_add(this, final); + ldout(oc->cct, 10) << "map_write adding trailing bh " << *final + << dendl; + } else { + replace_journal_tid(final, wr->journal_tid); oc->bh_stat_sub(final); - final->set_length(final->length() + max); + final->set_length(final->length() + max); oc->bh_stat_add(final); - } - left -= max; - cur += max; - continue; + } + left -= max; + cur += max; + continue; } - ldout(oc->cct, 10) << "cur is " << cur << ", p is " << *p->second << dendl; + ldout(oc->cct, 10) << "cur is " << cur << ", p is " << *p->second + << dendl; //oc->verify_stats(); if (p->first <= cur) { - BufferHead *bh = p->second; - ldout(oc->cct, 10) << "map_write bh " << *bh << " intersected" << dendl; - - if (p->first < cur) { - assert(final == 0); - if (cur + max >= bh->end()) { - // we want right bit (one splice) - final = split(bh, cur); // just split it, take right half. - ++p; - assert(p->second == final); - } else { - // we want middle bit (two splices) - final = split(bh, cur); - ++p; - assert(p->second == final); - split(final, cur+max); - } - } else { + BufferHead *bh = p->second; + ldout(oc->cct, 10) << "map_write bh " << *bh << " intersected" + << dendl; + + if (p->first < cur) { + assert(final == 0); + if (cur + max >= bh->end()) { + // we want right bit (one splice) + final = split(bh, cur); // just split it, take right half. + ++p; + assert(p->second == final); + } else { + // we want middle bit (two splices) + final = split(bh, cur); + ++p; + assert(p->second == final); + split(final, cur+max); + } + } else { assert(p->first == cur); - if (bh->length() <= max) { - // whole bufferhead, piece of cake. - } else { - // we want left bit (one splice) - split(bh, cur + max); // just split - } - if (final) { + if (bh->length() <= max) { + // whole bufferhead, piece of cake. + } else { + // we want left bit (one splice) + split(bh, cur + max); // just split + } + if (final) { oc->mark_dirty(bh); oc->mark_dirty(final); --p; // move iterator back to final assert(p->second == final); - replace_journal_tid(bh, 0); - merge_left(final, bh); + replace_journal_tid(bh, 0); + merge_left(final, bh); } else { - final = bh; + final = bh; } - } - - // keep going. - loff_t lenfromcur = final->end() - cur; - cur += lenfromcur; - left -= lenfromcur; - ++p; - continue; + } + + // keep going. + loff_t lenfromcur = final->end() - cur; + cur += lenfromcur; + left -= lenfromcur; + ++p; + continue; } else { - // gap! - loff_t next = p->first; - loff_t glen = MIN(next - cur, max); - ldout(oc->cct, 10) << "map_write gap " << cur << "~" << glen << dendl; - if (final) { - replace_journal_tid(final, wr->journal_tid); + // gap! + loff_t next = p->first; + loff_t glen = MIN(next - cur, max); + ldout(oc->cct, 10) << "map_write gap " << cur << "~" << glen << dendl; + if (final) { + replace_journal_tid(final, wr->journal_tid); oc->bh_stat_sub(final); - final->set_length(final->length() + glen); + final->set_length(final->length() + glen); oc->bh_stat_add(final); - } else { - final = new BufferHead(this); - final->set_start( cur ); - final->set_length( glen ); - oc->bh_add(this, final); - } - - cur += glen; - left -= glen; - continue; // more? + } else { + final = new BufferHead(this); + final->set_start( cur ); + final->set_length( glen ); + oc->bh_add(this, final); + } + + cur += glen; + left -= glen; + continue; // more? } } } @@ -461,14 +471,15 @@ ObjectCacher::BufferHead *ObjectCacher::Object::map_write(OSDWrite *wr) return final; } -void ObjectCacher::Object::replace_journal_tid(BufferHead *bh, ceph_tid_t tid) { +void ObjectCacher::Object::replace_journal_tid(BufferHead *bh, + ceph_tid_t tid) { ceph_tid_t bh_tid = bh->get_journal_tid(); assert(tid == 0 || bh_tid <= tid); if (bh_tid != 0 && bh_tid != tid) { // inform journal that it should not expect a writeback from this extent - oc->writeback_handler.overwrite_extent(get_oid(), bh->start(), bh->length(), - bh_tid); + oc->writeback_handler.overwrite_extent(get_oid(), bh->start(), + bh->length(), bh_tid); } bh->set_journal_tid(tid); } @@ -480,7 +491,7 @@ void ObjectCacher::Object::truncate(loff_t s) while (!data.empty()) { BufferHead *bh = data.rbegin()->second; - if (bh->end() <= s) + if (bh->end() <= s) break; // split bh at truncation point? @@ -501,7 +512,8 @@ void ObjectCacher::Object::truncate(loff_t s) void ObjectCacher::Object::discard(loff_t off, loff_t len) { assert(oc->lock.is_locked()); - ldout(oc->cct, 10) << "discard " << *this << " " << off << "~" << len << dendl; + ldout(oc->cct, 10) << "discard " << *this << " " << off << "~" << len + << dendl; if (!exists) { ldout(oc->cct, 10) << " setting exists on " << *this << dendl; @@ -547,22 +559,23 @@ void ObjectCacher::Object::discard(loff_t off, loff_t len) #define dout_prefix *_dout << "objectcacher " -ObjectCacher::ObjectCacher(CephContext *cct_, string name, WritebackHandler& wb, Mutex& l, +ObjectCacher::ObjectCacher(CephContext *cct_, string name, + WritebackHandler& wb, Mutex& l, flush_set_callback_t flush_callback, - void *flush_callback_arg, - uint64_t max_bytes, uint64_t max_objects, - uint64_t max_dirty, uint64_t target_dirty, - double max_dirty_age, bool block_writes_upfront) + void *flush_callback_arg, uint64_t max_bytes, + uint64_t max_objects, uint64_t max_dirty, + uint64_t target_dirty, double max_dirty_age, + bool block_writes_upfront) : perfcounter(NULL), cct(cct_), writeback_handler(wb), name(name), lock(l), max_dirty(max_dirty), target_dirty(target_dirty), max_size(max_bytes), max_objects(max_objects), block_writes_upfront(block_writes_upfront), - flush_set_callback(flush_callback), flush_set_callback_arg(flush_callback_arg), - last_read_tid(0), - flusher_stop(false), flusher_thread(this), finisher(cct), - stat_clean(0), stat_zero(0), stat_dirty(0), stat_rx(0), stat_tx(0), stat_missing(0), - stat_error(0), stat_dirty_waiting(0), reads_outstanding(0) + flush_set_callback(flush_callback), + flush_set_callback_arg(flush_callback_arg), + last_read_tid(0), flusher_stop(false), flusher_thread(this),finisher(cct), + stat_clean(0), stat_zero(0), stat_dirty(0), stat_rx(0), stat_tx(0), + stat_missing(0), stat_error(0), stat_dirty_waiting(0), reads_outstanding(0) { this->max_dirty_age.set_from_double(max_dirty_age); perf_start(); @@ -574,9 +587,10 @@ ObjectCacher::~ObjectCacher() finisher.stop(); perf_stop(); // we should be empty. - for (vector >::iterator i = objects.begin(); - i != objects.end(); - ++i) + for (vector >::iterator i + = objects.begin(); + i != objects.end(); + ++i) assert(i->empty()); assert(bh_lru_rest.lru_get_size() == 0); assert(bh_lru_dirty.lru_get_size() == 0); @@ -589,18 +603,30 @@ void ObjectCacher::perf_start() string n = "objectcacher-" + name; PerfCountersBuilder plb(cct, n, l_objectcacher_first, l_objectcacher_last); - plb.add_u64_counter(l_objectcacher_cache_ops_hit, "cache_ops_hit", "Hit operations"); - plb.add_u64_counter(l_objectcacher_cache_ops_miss, "cache_ops_miss", "Miss operations"); - plb.add_u64_counter(l_objectcacher_cache_bytes_hit, "cache_bytes_hit", "Hit data"); - plb.add_u64_counter(l_objectcacher_cache_bytes_miss, "cache_bytes_miss", "Miss data"); - plb.add_u64_counter(l_objectcacher_data_read, "data_read", "Read data"); - plb.add_u64_counter(l_objectcacher_data_written, "data_written", "Data written to cache"); - plb.add_u64_counter(l_objectcacher_data_flushed, "data_flushed", "Data flushed"); + plb.add_u64_counter(l_objectcacher_cache_ops_hit, + "cache_ops_hit", "Hit operations"); + plb.add_u64_counter(l_objectcacher_cache_ops_miss, + "cache_ops_miss", "Miss operations"); + plb.add_u64_counter(l_objectcacher_cache_bytes_hit, + "cache_bytes_hit", "Hit data"); + plb.add_u64_counter(l_objectcacher_cache_bytes_miss, + "cache_bytes_miss", "Miss data"); + plb.add_u64_counter(l_objectcacher_data_read, + "data_read", "Read data"); + plb.add_u64_counter(l_objectcacher_data_written, + "data_written", "Data written to cache"); + plb.add_u64_counter(l_objectcacher_data_flushed, + "data_flushed", "Data flushed"); plb.add_u64_counter(l_objectcacher_overwritten_in_flush, - "data_overwritten_while_flushing", "Data overwritten while flushing"); - plb.add_u64_counter(l_objectcacher_write_ops_blocked, "write_ops_blocked", "Write operations, delayed due to dirty limits"); - plb.add_u64_counter(l_objectcacher_write_bytes_blocked, "write_bytes_blocked", "Write data blocked on dirty limit"); - plb.add_time(l_objectcacher_write_time_blocked, "write_time_blocked", "Time spent blocking a write due to dirty limits"); + "data_overwritten_while_flushing", + "Data overwritten while flushing"); + plb.add_u64_counter(l_objectcacher_write_ops_blocked, "write_ops_blocked", + "Write operations, delayed due to dirty limits"); + plb.add_u64_counter(l_objectcacher_write_bytes_blocked, + "write_bytes_blocked", + "Write data blocked on dirty limit"); + plb.add_time(l_objectcacher_write_time_blocked, "write_time_blocked", + "Time spent blocking a write due to dirty limits"); perfcounter = plb.create_perf_counters(); cct->get_perfcounters_collection()->add(perfcounter); @@ -638,18 +664,18 @@ ObjectCacher::Object *ObjectCacher::get_object(sobject_t oid, // create it. Object *o = new Object(this, oid, object_no, oset, l, truncate_size, - truncate_seq); + truncate_seq); objects[l.pool][oid] = o; ob_lru.lru_insert_top(o); return o; } -void ObjectCacher::close_object(Object *ob) +void ObjectCacher::close_object(Object *ob) { assert(lock.is_locked()); ldout(cct, 10) << "close_object " << *ob << dendl; assert(ob->can_close()); - + // ok! ob_lru.lru_remove(ob); objects[ob->oloc.pool].erase(ob->get_soid()); @@ -657,9 +683,6 @@ void ObjectCacher::close_object(Object *ob) delete ob; } - - - void ObjectCacher::bh_read(BufferHead *bh, int op_flags) { assert(lock.is_locked()); @@ -682,13 +705,13 @@ void ObjectCacher::bh_read(BufferHead *bh, int op_flags) ++reads_outstanding; } -void ObjectCacher::bh_read_finish(int64_t poolid, sobject_t oid, ceph_tid_t tid, - loff_t start, uint64_t length, - bufferlist &bl, int r, +void ObjectCacher::bh_read_finish(int64_t poolid, sobject_t oid, + ceph_tid_t tid, loff_t start, + uint64_t length, bufferlist &bl, int r, bool trust_enoent) { assert(lock.is_locked()); - ldout(cct, 7) << "bh_read_finish " + ldout(cct, 7) << "bh_read_finish " << oid << " tid " << tid << " " << start << "~" << length @@ -700,8 +723,9 @@ void ObjectCacher::bh_read_finish(int64_t poolid, sobject_t oid, ceph_tid_t tid, if (r >= 0 && bl.length() < length) { bufferptr bp(length - bl.length()); bp.zero(); - ldout(cct, 7) << "bh_read_finish " << oid << " padding " << start << "~" << length - << " with " << bp.length() << " bytes of zeroes" << dendl; + ldout(cct, 7) << "bh_read_finish " << oid << " padding " << start << "~" + << length << " with " << bp.length() << " bytes of zeroes" + << dendl; bl.push_back(bp); } @@ -712,17 +736,20 @@ void ObjectCacher::bh_read_finish(int64_t poolid, sobject_t oid, ceph_tid_t tid, ldout(cct, 7) << "bh_read_finish no object cache" << dendl; } else { Object *ob = objects[poolid][oid]; - + if (r == -ENOENT && !ob->complete) { - // wake up *all* rx waiters, or else we risk reordering identical reads. e.g. + // wake up *all* rx waiters, or else we risk reordering + // identical reads. e.g. // read 1~1 // reply to unrelated 3~1 -> !exists // read 1~1 -> immediate ENOENT // reply to first 1~1 -> ooo ENOENT bool allzero = true; - for (map::iterator p = ob->data.begin(); p != ob->data.end(); ++p) { + for (map::iterator p = ob->data.begin(); + p != ob->data.end(); ++p) { BufferHead *bh = p->second; - for (map >::iterator p = bh->waitfor_read.begin(); + for (map >::iterator p + = bh->waitfor_read.begin(); p != bh->waitfor_read.end(); ++p) ls.splice(ls.end(), p->second); @@ -734,7 +761,9 @@ void ObjectCacher::bh_read_finish(int64_t poolid, sobject_t oid, ceph_tid_t tid, // just pass through and retry all waiters if we don't trust // -ENOENT for this read if (trust_enoent) { - ldout(cct, 7) << "bh_read_finish ENOENT, marking complete and !exists on " << *ob << dendl; + ldout(cct, 7) + << "bh_read_finish ENOENT, marking complete and !exists on " << *ob + << dendl; ob->complete = true; ob->exists = false; @@ -751,8 +780,9 @@ void ObjectCacher::bh_read_finish(int64_t poolid, sobject_t oid, ceph_tid_t tid, * returning -ENOENT immediately. */ if (allzero) { - ldout(cct, 10) << "bh_read_finish ENOENT and allzero, getting rid of " - << "bhs for " << *ob << dendl; + ldout(cct, 10) + << "bh_read_finish ENOENT and allzero, getting rid of " + << "bhs for " << *ob << dendl; map::iterator p = ob->data.begin(); while (p != ob->data.end()) { BufferHead *bh = p->second; @@ -782,29 +812,31 @@ void ObjectCacher::bh_read_finish(int64_t poolid, sobject_t oid, ceph_tid_t tid, ldout(cct, 20) << "checking bh " << *bh << dendl; // finishers? - for (map >::iterator it = bh->waitfor_read.begin(); - it != bh->waitfor_read.end(); - ++it) + for (map >::iterator it + = bh->waitfor_read.begin(); + it != bh->waitfor_read.end(); + ++it) ls.splice(ls.end(), it->second); bh->waitfor_read.clear(); if (bh->start() > opos) { - ldout(cct, 1) << "bh_read_finish skipping gap " + ldout(cct, 1) << "bh_read_finish skipping gap " << opos << "~" << bh->start() - opos << dendl; - opos = bh->start(); - continue; + opos = bh->start(); + continue; } if (!bh->is_rx()) { - ldout(cct, 10) << "bh_read_finish skipping non-rx " << *bh << dendl; - opos = bh->end(); - continue; + ldout(cct, 10) << "bh_read_finish skipping non-rx " << *bh << dendl; + opos = bh->end(); + continue; } if (bh->last_read_tid != tid) { - ldout(cct, 10) << "bh_read_finish bh->last_read_tid " << bh->last_read_tid - << " != tid " << tid << ", skipping" << dendl; + ldout(cct, 10) << "bh_read_finish bh->last_read_tid " + << bh->last_read_tid << " != tid " << tid + << ", skipping" << dendl; opos = bh->end(); continue; } @@ -867,14 +899,16 @@ void ObjectCacher::bh_write(BufferHead *bh) // finishers C_WriteCommit *oncommit = new C_WriteCommit(this, bh->ob->oloc.pool, - bh->ob->get_soid(), bh->start(), bh->length()); + bh->ob->get_soid(), bh->start(), + bh->length()); // go - ceph_tid_t tid = writeback_handler.write(bh->ob->get_oid(), bh->ob->get_oloc(), - bh->start(), bh->length(), - bh->snapc, bh->bl, bh->last_write, - bh->ob->truncate_size, - bh->ob->truncate_seq, - bh->journal_tid, oncommit); + ceph_tid_t tid = writeback_handler.write(bh->ob->get_oid(), + bh->ob->get_oloc(), + bh->start(), bh->length(), + bh->snapc, bh->bl, bh->last_write, + bh->ob->truncate_size, + bh->ob->truncate_seq, + bh->journal_tid, oncommit); ldout(cct, 20) << " tid " << tid << " on " << bh->ob->get_oid() << dendl; // set bh last_write_tid @@ -893,11 +927,8 @@ void ObjectCacher::bh_write_commit(int64_t poolid, sobject_t oid, loff_t start, uint64_t length, ceph_tid_t tid, int r) { assert(lock.is_locked()); - ldout(cct, 7) << "bh_write_commit " - << oid - << " tid " << tid - << " " << start << "~" << length - << " returned " << r + ldout(cct, 7) << "bh_write_commit " << oid << " tid " << tid + << " " << start << "~" << length << " returned " << r << dendl; if (objects[poolid].count(oid) == 0) { @@ -905,13 +936,15 @@ void ObjectCacher::bh_write_commit(int64_t poolid, sobject_t oid, loff_t start, } else { Object *ob = objects[poolid][oid]; int was_dirty_or_tx = ob->oset->dirty_or_tx; - + if (!ob->exists) { ldout(cct, 10) << "bh_write_commit marking exists on " << *ob << dendl; ob->exists = true; - if (writeback_handler.may_copy_on_write(ob->get_oid(), start, length, ob->get_snap())) { - ldout(cct, 10) << "bh_write_commit may copy on write, clearing complete on " << *ob << dendl; + if (writeback_handler.may_copy_on_write(ob->get_oid(), start, length, + ob->get_snap())) { + ldout(cct, 10) << "bh_write_commit may copy on write, clearing " + "complete on " << *ob << dendl; ob->complete = false; } } @@ -919,36 +952,36 @@ void ObjectCacher::bh_write_commit(int64_t poolid, sobject_t oid, loff_t start, list hit; // apply to bh's! for (map::iterator p = ob->data_lower_bound(start); - p != ob->data.end(); - ++p) { + p != ob->data.end(); + ++p) { BufferHead *bh = p->second; - + if (bh->start() > start+(loff_t)length) break; if (bh->start() < start && - bh->end() > start+(loff_t)length) { - ldout(cct, 20) << "bh_write_commit skipping " << *bh << dendl; - continue; + bh->end() > start+(loff_t)length) { + ldout(cct, 20) << "bh_write_commit skipping " << *bh << dendl; + continue; } - + // make sure bh is tx if (!bh->is_tx()) { - ldout(cct, 10) << "bh_write_commit skipping non-tx " << *bh << dendl; - continue; + ldout(cct, 10) << "bh_write_commit skipping non-tx " << *bh << dendl; + continue; } - + // make sure bh tid matches if (bh->last_write_tid != tid) { - assert(bh->last_write_tid > tid); - ldout(cct, 10) << "bh_write_commit newer tid on " << *bh << dendl; - continue; + assert(bh->last_write_tid > tid); + ldout(cct, 10) << "bh_write_commit newer tid on " << *bh << dendl; + continue; } if (r >= 0) { // ok! mark bh clean and error-free mark_clean(bh); - bh->set_journal_tid(0); + bh->set_journal_tid(0); if (bh->get_nocache()) bh_lru_rest.lru_bottouch(bh); hit.push_back(bh); @@ -985,8 +1018,9 @@ void ObjectCacher::bh_write_commit(int64_t poolid, sobject_t oid, loff_t start, if (flush_set_callback && was_dirty_or_tx > 0 && - oset->dirty_or_tx == 0) { // nothing dirty/tx - flush_set_callback(flush_set_callback_arg, oset); + oset->dirty_or_tx == 0) { + // nothing dirty/tx + flush_set_callback(flush_set_callback_arg, oset); } if (!ls.empty()) @@ -1000,30 +1034,32 @@ void ObjectCacher::flush(loff_t amount) utime_t cutoff = ceph_clock_now(cct); ldout(cct, 10) << "flush " << amount << dendl; - + /* - * NOTE: we aren't actually pulling things off the LRU here, just looking at the - * tail item. Then we call bh_write, which moves it to the other LRU, so that we - * can call lru_dirty.lru_get_next_expire() again. + * NOTE: we aren't actually pulling things off the LRU here, just + * looking at the tail item. Then we call bh_write, which moves it + * to the other LRU, so that we can call + * lru_dirty.lru_get_next_expire() again. */ loff_t did = 0; while (amount == 0 || did < amount) { - BufferHead *bh = static_cast(bh_lru_dirty.lru_get_next_expire()); + BufferHead *bh = static_cast( + bh_lru_dirty.lru_get_next_expire()); if (!bh) break; if (bh->last_write > cutoff) break; did += bh->length(); bh_write(bh); - } + } } void ObjectCacher::trim() { assert(lock.is_locked()); - ldout(cct, 10) << "trim start: bytes: max " << max_size << " clean " << get_stat_clean() - << ", objects: max " << max_objects << " current " << ob_lru.lru_get_size() - << dendl; + ldout(cct, 10) << "trim start: bytes: max " << max_size << " clean " + << get_stat_clean() << ", objects: max " << max_objects + << " current " << ob_lru.lru_get_size() << dendl; while (get_stat_clean() > 0 && (uint64_t) get_stat_clean() > max_size) { BufferHead *bh = static_cast(bh_lru_rest.lru_expire()); @@ -1051,17 +1087,18 @@ void ObjectCacher::trim() ldout(cct, 10) << "trim trimming " << *ob << dendl; close_object(ob); } - - ldout(cct, 10) << "trim finish: max " << max_size << " clean " << get_stat_clean() - << ", objects: max " << max_objects << " current " << ob_lru.lru_get_size() - << dendl; + + ldout(cct, 10) << "trim finish: max " << max_size << " clean " + << get_stat_clean() << ", objects: max " << max_objects + << " current " << ob_lru.lru_get_size() << dendl; } /* public */ -bool ObjectCacher::is_cached(ObjectSet *oset, vector& extents, snapid_t snapid) +bool ObjectCacher::is_cached(ObjectSet *oset, vector& extents, + snapid_t snapid) { assert(lock.is_locked()); for (vector::iterator ex_it = extents.begin(); @@ -1082,7 +1119,8 @@ bool ObjectCacher::is_cached(ObjectSet *oset, vector& extents, sna /* - * returns # bytes read (if in cache). onfinish is untouched (caller must delete it) + * returns # bytes read (if in cache). onfinish is untouched (caller + * must delete it) * returns 0 if doing async read */ int ObjectCacher::readx(OSDRead *rd, ObjectSet *oset, Context *onfinish) @@ -1119,7 +1157,8 @@ int ObjectCacher::_readx(OSDRead *rd, ObjectSet *oset, Context *onfinish, // get Object cache sobject_t soid(ex_it->oid, rd->snap); - Object *o = get_object(soid, ex_it->objectno, oset, ex_it->oloc, ex_it->truncate_size, oset->truncate_seq); + Object *o = get_object(soid, ex_it->objectno, oset, ex_it->oloc, + ex_it->truncate_size, oset->truncate_seq); if (external_call) touch_ob(o); @@ -1128,7 +1167,8 @@ int ObjectCacher::_readx(OSDRead *rd, ObjectSet *oset, Context *onfinish, ldout(cct, 10) << "readx object !exists, 1 extent..." << dendl; // should we worry about COW underneaeth us? - if (writeback_handler.may_copy_on_write(soid.oid, ex_it->offset, ex_it->length, soid.snap)) { + if (writeback_handler.may_copy_on_write(soid.oid, ex_it->offset, + ex_it->length, soid.snap)) { ldout(cct, 20) << "readx may copy on write" << dendl; bool wait = false; for (map::iterator bh_it = o->data.begin(); @@ -1143,8 +1183,10 @@ int ObjectCacher::_readx(OSDRead *rd, ObjectSet *oset, Context *onfinish, } } if (wait) { - ldout(cct, 10) << "readx waiting on tid " << o->last_write_tid << " on " << *o << dendl; - o->waitfor_commit[o->last_write_tid].push_back(new C_RetryRead(this, rd, oset, onfinish)); + ldout(cct, 10) << "readx waiting on tid " << o->last_write_tid + << " on " << *o << dendl; + o->waitfor_commit[o->last_write_tid].push_back( + new C_RetryRead(this,rd, oset, onfinish)); // FIXME: perfcounter! return 0; } @@ -1162,7 +1204,8 @@ int ObjectCacher::_readx(OSDRead *rd, ObjectSet *oset, Context *onfinish, } } if (allzero) { - ldout(cct, 10) << "readx ob has all zero|rx, returning ENOENT" << dendl; + ldout(cct, 10) << "readx ob has all zero|rx, returning ENOENT" + << dendl; delete rd; if (dontneed) bottouch_ob(o); @@ -1187,11 +1230,11 @@ int ObjectCacher::_readx(OSDRead *rd, ObjectSet *oset, Context *onfinish, // read missing map::iterator last = missing.end(); for (map::iterator bh_it = missing.begin(); - bh_it != missing.end(); - ++bh_it) { + bh_it != missing.end(); + ++bh_it) { uint64_t rx_bytes = static_cast( stat_rx + bh_it->second->length()); - bytes_not_in_cache += bh_it->second->length(); + bytes_not_in_cache += bh_it->second->length(); if (!waitfor_read.empty() || (stat_rx > 0 && rx_bytes > max_size)) { // cache is full with concurrent reads -- wait for rx's to complete // to constrain memory growth (especially during copy-ups) @@ -1218,40 +1261,43 @@ int ObjectCacher::_readx(OSDRead *rd, ObjectSet *oset, Context *onfinish, if (last != missing.end()) { ldout(cct, 10) << "readx missed, waiting on " << *last->second << " off " << last->first << dendl; - last->second->waitfor_read[last->first].push_back( new C_RetryRead(this, rd, oset, onfinish) ); + last->second->waitfor_read[last->first].push_back( + new C_RetryRead(this, rd, oset, onfinish) ); } // bump rx for (map::iterator bh_it = rx.begin(); - bh_it != rx.end(); - ++bh_it) { - touch_bh(bh_it->second); // bump in lru, so we don't lose it. - if (success && onfinish) { - ldout(cct, 10) << "readx missed, waiting on " << *bh_it->second - << " off " << bh_it->first << dendl; - bh_it->second->waitfor_read[bh_it->first].push_back( new C_RetryRead(this, rd, oset, onfinish) ); - } - bytes_not_in_cache += bh_it->second->length(); + bh_it != rx.end(); + ++bh_it) { + touch_bh(bh_it->second); // bump in lru, so we don't lose it. + if (success && onfinish) { + ldout(cct, 10) << "readx missed, waiting on " << *bh_it->second + << " off " << bh_it->first << dendl; + bh_it->second->waitfor_read[bh_it->first].push_back( + new C_RetryRead(this, rd, oset, onfinish) ); + } + bytes_not_in_cache += bh_it->second->length(); success = false; } for (map::iterator bh_it = hits.begin(); - bh_it != hits.end(); ++bh_it) - touch_bh(bh_it->second); //bump in lru, so we don't lose it when later read + bh_it != hits.end(); ++bh_it) + //bump in lru, so we don't lose it when later read + touch_bh(bh_it->second); } else { assert(!hits.empty()); // make a plain list for (map::iterator bh_it = hits.begin(); - bh_it != hits.end(); - ++bh_it) { + bh_it != hits.end(); + ++bh_it) { BufferHead *bh = bh_it->second; ldout(cct, 10) << "readx hit bh " << *bh << dendl; if (bh->is_error() && bh->error) error = bh->error; - bytes_in_cache += bh->length(); + bytes_in_cache += bh->length(); if (bh->get_nocache() && bh->is_clean()) bh_lru_rest.lru_bottouch(bh); @@ -1259,7 +1305,8 @@ int ObjectCacher::_readx(OSDRead *rd, ObjectSet *oset, Context *onfinish, touch_bh(bh); //must be after touch_bh because touch_bh set dontneed false if (dontneed && - ((loff_t)ex_it->offset <= bh->start() && (bh->end() <= (loff_t)(ex_it->offset + ex_it->length)))) { + ((loff_t)ex_it->offset <= bh->start() && + (bh->end() <=(loff_t)(ex_it->offset + ex_it->length)))) { bh->set_dontneed(true); //if dirty if (bh->is_clean()) bh_lru_rest.lru_bottouch(bh); @@ -1267,28 +1314,31 @@ int ObjectCacher::_readx(OSDRead *rd, ObjectSet *oset, Context *onfinish, } if (!error) { - // create reverse map of buffer offset -> object for the eventual result. - // this is over a single ObjectExtent, so we know that + // create reverse map of buffer offset -> object for the + // eventual result. this is over a single ObjectExtent, so we + // know that // - the bh's are contiguous // - the buffer frags need not be (and almost certainly aren't) loff_t opos = ex_it->offset; map::iterator bh_it = hits.begin(); assert(bh_it->second->start() <= opos); uint64_t bhoff = opos - bh_it->second->start(); - vector >::iterator f_it = ex_it->buffer_extents.begin(); + vector >::iterator f_it + = ex_it->buffer_extents.begin(); uint64_t foff = 0; while (1) { BufferHead *bh = bh_it->second; assert(opos == (loff_t)(bh->start() + bhoff)); uint64_t len = MIN(f_it->second - foff, bh->length() - bhoff); - ldout(cct, 10) << "readx rmap opos " << opos - << ": " << *bh << " +" << bhoff - << " frag " << f_it->first << "~" << f_it->second << " +" << foff << "~" << len - << dendl; - - bufferlist bit; // put substr here first, since substr_of clobbers, and - // we may get multiple bh's at this stripe_map position + ldout(cct, 10) << "readx rmap opos " << opos << ": " << *bh << " +" + << bhoff << " frag " << f_it->first << "~" + << f_it->second << " +" << foff << "~" << len + << dendl; + + bufferlist bit; + // put substr here first, since substr_of clobbers, and we + // may get multiple bh's at this stripe_map position if (bh->is_zero()) { bufferptr bp(len); bp.zero(); @@ -1323,7 +1373,7 @@ int ObjectCacher::_readx(OSDRead *rd, ObjectSet *oset, Context *onfinish, bottouch_ob(o); } } - + if (!success) { if (perfcounter && external_call) { perfcounter->inc(l_objectcacher_data_read, total_bytes_read); @@ -1333,7 +1383,8 @@ int ObjectCacher::_readx(OSDRead *rd, ObjectSet *oset, Context *onfinish, if (onfinish) { ldout(cct, 20) << "readx defer " << rd << dendl; } else { - ldout(cct, 20) << "readx drop " << rd << " (no complete, but no waiter)" << dendl; + ldout(cct, 20) << "readx drop " << rd << " (no complete, but no waiter)" + << dendl; delete rd; } return 0; // wait! @@ -1355,7 +1406,8 @@ int ObjectCacher::_readx(OSDRead *rd, ObjectSet *oset, Context *onfinish, i != stripe_map.end(); ++i) { assert(pos == i->first); - ldout(cct, 10) << "readx adding buffer len " << i->second.length() << " at " << pos << dendl; + ldout(cct, 10) << "readx adding buffer len " << i->second.length() + << " at " << pos << dendl; pos += i->second.length(); rd->bl->claim_append(i->second); assert(rd->bl->length() == pos); @@ -1400,7 +1452,7 @@ int ObjectCacher::writex(OSDWrite *wr, ObjectSet *oset, Context *onfreespace) uint64_t bytes_written_in_flush = 0; bool dontneed = wr->fadvise_flags & LIBRADOS_OP_FLAG_FADVISE_DONTNEED; bool nocache = wr->fadvise_flags & LIBRADOS_OP_FLAG_FADVISE_NOCACHE; - + for (vector::iterator ex_it = wr->extents.begin(); ex_it != wr->extents.end(); ++ex_it) { @@ -1413,7 +1465,7 @@ int ObjectCacher::writex(OSDWrite *wr, ObjectSet *oset, Context *onfreespace) BufferHead *bh = o->map_write(wr); bool missing = bh->is_missing(); bh->snapc = wr->snapc; - + bytes_written += bh->length(); if (bh->is_tx()) { bytes_written_in_flush += bh->length(); @@ -1425,17 +1477,20 @@ int ObjectCacher::writex(OSDWrite *wr, ObjectSet *oset, Context *onfreespace) // - the buffer frags need not be (and almost certainly aren't) // note: i assume striping is monotonic... no jumps backwards, ever! loff_t opos = ex_it->offset; - for (vector >::iterator f_it = ex_it->buffer_extents.begin(); - f_it != ex_it->buffer_extents.end(); - ++f_it) { - ldout(cct, 10) << "writex writing " << f_it->first << "~" << f_it->second << " into " << *bh << " at " << opos << dendl; + for (vector >::iterator f_it + = ex_it->buffer_extents.begin(); + f_it != ex_it->buffer_extents.end(); + ++f_it) { + ldout(cct, 10) << "writex writing " << f_it->first << "~" + << f_it->second << " into " << *bh << " at " << opos + << dendl; uint64_t bhoff = bh->start() - opos; assert(f_it->second <= bh->length() - bhoff); // get the frag we're mapping in - bufferlist frag; - frag.substr_of(wr->bl, - f_it->first, f_it->second); + bufferlist frag; + frag.substr_of(wr->bl, + f_it->first, f_it->second); // keep anything left of bhoff bufferlist newbl; @@ -1465,7 +1520,7 @@ int ObjectCacher::writex(OSDWrite *wr, ObjectSet *oset, Context *onfreespace) perfcounter->inc(l_objectcacher_data_written, bytes_written); if (bytes_written_in_flush) { perfcounter->inc(l_objectcacher_overwritten_in_flush, - bytes_written_in_flush); + bytes_written_in_flush); } } @@ -1517,7 +1572,8 @@ void ObjectCacher::maybe_wait_for_writeback(uint64_t len) } // blocking wait for write. -int ObjectCacher::_wait_for_write(OSDWrite *wr, uint64_t len, ObjectSet *oset, Context *onfreespace) +int ObjectCacher::_wait_for_write(OSDWrite *wr, uint64_t len, ObjectSet *oset, + Context *onfreespace) { assert(lock.is_locked()); int ret = 0; @@ -1540,7 +1596,8 @@ int ObjectCacher::_wait_for_write(OSDWrite *wr, uint64_t len, ObjectSet *oset, C assert(fin); bool flushed = flush_set(oset, wr->extents, fin); assert(!flushed); // we just dirtied it, and didn't drop our lock! - ldout(cct, 10) << "wait_for_write waiting on write-thru of " << len << " bytes" << dendl; + ldout(cct, 10) << "wait_for_write waiting on write-thru of " << len + << " bytes" << dendl; if (block_writes_upfront) { while (!done) cond.Wait(lock); @@ -1565,7 +1622,8 @@ void ObjectCacher::flusher_entry() writeback_handler.get_client_lock(); lock.Lock(); while (!flusher_stop) { - loff_t all = get_stat_tx() + get_stat_rx() + get_stat_clean() + get_stat_dirty(); + loff_t all = get_stat_tx() + get_stat_rx() + get_stat_clean() + + get_stat_dirty(); ldout(cct, 11) << "flusher " << all << " / " << max_size << ": " << get_stat_tx() << " tx, " @@ -1578,11 +1636,9 @@ void ObjectCacher::flusher_entry() loff_t actual = get_stat_dirty() + get_stat_dirty_waiting(); if (actual > 0 && (uint64_t) actual > target_dirty) { // flush some dirty pages - ldout(cct, 10) << "flusher " - << get_stat_dirty() << " dirty + " << get_stat_dirty_waiting() - << " dirty_waiting > target " - << target_dirty - << ", flushing some dirty bhs" << dendl; + ldout(cct, 10) << "flusher " << get_stat_dirty() << " dirty + " + << get_stat_dirty_waiting() << " dirty_waiting > target " + << target_dirty << ", flushing some dirty bhs" << dendl; flush(actual - target_dirty); } else { // check tail of lru for old dirty items @@ -1590,7 +1646,8 @@ void ObjectCacher::flusher_entry() cutoff -= max_dirty_age; BufferHead *bh = 0; int max = MAX_FLUSH_UNDER_LOCK; - while ((bh = static_cast(bh_lru_dirty.lru_get_next_expire())) != 0 && + while ((bh = static_cast(bh_lru_dirty. + lru_get_next_expire())) != 0 && bh->last_write < cutoff && --max > 0) { ldout(cct, 10) << "flusher flushing aged dirty bh " << *bh << dendl; @@ -1599,8 +1656,8 @@ void ObjectCacher::flusher_entry() if (!max) { // back off the lock to avoid starving other threads lock.Unlock(); - writeback_handler.put_client_lock(); - writeback_handler.get_client_lock(); + writeback_handler.put_client_lock(); + writeback_handler.get_client_lock(); lock.Lock(); continue; } @@ -1654,16 +1711,16 @@ bool ObjectCacher::set_is_cached(ObjectSet *oset) assert(lock.is_locked()); if (oset->objects.empty()) return false; - + for (xlist::iterator p = oset->objects.begin(); !p.end(); ++p) { Object *ob = *p; for (map::iterator q = ob->data.begin(); - q != ob->data.end(); - ++q) { + q != ob->data.end(); + ++q) { BufferHead *bh = q->second; - if (!bh->is_dirty() && !bh->is_tx()) - return true; + if (!bh->is_dirty() && !bh->is_tx()) + return true; } } @@ -1675,20 +1732,20 @@ bool ObjectCacher::set_is_dirty_or_committing(ObjectSet *oset) assert(lock.is_locked()); if (oset->objects.empty()) return false; - + for (xlist::iterator i = oset->objects.begin(); !i.end(); ++i) { Object *ob = *i; - + for (map::iterator p = ob->data.begin(); - p != ob->data.end(); - ++p) { + p != ob->data.end(); + ++p) { BufferHead *bh = p->second; - if (bh->is_dirty() || bh->is_tx()) - return true; + if (bh->is_dirty() || bh->is_tx()) + return true; } - } - + } + return false; } @@ -1704,7 +1761,7 @@ void ObjectCacher::purge(Object *ob) // flush. non-blocking. no callback. -// true if clean, already flushed. +// true if clean, already flushed. // false if we wrote something. // be sloppy about the ranges and flush any buffer it touches bool ObjectCacher::flush(Object *ob, loff_t offset, loff_t length) @@ -1712,7 +1769,9 @@ bool ObjectCacher::flush(Object *ob, loff_t offset, loff_t length) assert(lock.is_locked()); bool clean = true; ldout(cct, 10) << "flush " << *ob << " " << offset << "~" << length << dendl; - for (map::iterator p = ob->data_lower_bound(offset); p != ob->data.end(); ++p) { + for (map::iterator p = ob->data_lower_bound(offset); + p != ob->data.end(); + ++p) { BufferHead *bh = p->second; ldout(cct, 20) << "flush " << *bh << dendl; if (length && bh->start() > offset+length) { @@ -1731,7 +1790,8 @@ bool ObjectCacher::flush(Object *ob, loff_t offset, loff_t length) return clean; } -bool ObjectCacher::_flush_set_finish(C_GatherBuilder *gather, Context *onfinish) +bool ObjectCacher::_flush_set_finish(C_GatherBuilder *gather, + Context *onfinish) { assert(lock.is_locked()); if (gather->has_subs()) { @@ -1782,9 +1842,7 @@ bool ObjectCacher::flush_set(ObjectSet *oset, Context *onfinish) // we'll need to gather... ldout(cct, 10) << "flush_set " << oset << " will wait for ack tid " - << ob->last_write_tid - << " on " << *ob - << dendl; + << ob->last_write_tid << " on " << *ob << dendl; ob->waitfor_commit[ob->last_write_tid].push_back(gather.new_sub()); } @@ -1793,7 +1851,8 @@ bool ObjectCacher::flush_set(ObjectSet *oset, Context *onfinish) // flush. non-blocking, takes callback. // returns true if already flushed -bool ObjectCacher::flush_set(ObjectSet *oset, vector& exv, Context *onfinish) +bool ObjectCacher::flush_set(ObjectSet *oset, vector& exv, + Context *onfinish) { assert(lock.is_locked()); assert(onfinish != NULL); @@ -1818,11 +1877,12 @@ bool ObjectCacher::flush_set(ObjectSet *oset, vector& exv, Context continue; Object *ob = objects[oset->poolid][soid]; - ldout(cct, 20) << "flush_set " << oset << " ex " << ex << " ob " << soid << " " << ob << dendl; + ldout(cct, 20) << "flush_set " << oset << " ex " << ex << " ob " << soid + << " " << ob << dendl; if (!flush(ob, ex.offset, ex.length)) { // we'll need to gather... - ldout(cct, 10) << "flush_set " << oset << " will wait for ack tid " + ldout(cct, 10) << "flush_set " << oset << " will wait for ack tid " << ob->last_write_tid << " on " << *ob << dendl; ob->waitfor_commit[ob->last_write_tid].push_back(gather.new_sub()); } @@ -1869,7 +1929,7 @@ loff_t ObjectCacher::release(Object *ob) BufferHead *bh = p->second; if (bh->is_clean() || bh->is_zero() || bh->is_error()) clean.push_back(bh); - else + else o_unclean += bh->length(); } @@ -1922,16 +1982,16 @@ loff_t ObjectCacher::release_set(ObjectSet *oset) loff_t o_unclean = release(ob); unclean += o_unclean; - if (o_unclean) - ldout(cct, 10) << "release_set " << oset << " " << *ob - << " has " << o_unclean << " bytes left" - << dendl; + if (o_unclean) + ldout(cct, 10) << "release_set " << oset << " " << *ob + << " has " << o_unclean << " bytes left" + << dendl; p = q; } if (unclean) { ldout(cct, 10) << "release_set " << oset - << ", " << unclean << " bytes left" << dendl; + << ", " << unclean << " bytes left" << dendl; } return unclean; @@ -1943,8 +2003,9 @@ uint64_t ObjectCacher::release_all() assert(lock.is_locked()); ldout(cct, 10) << "release_all" << dendl; uint64_t unclean = 0; - - vector >::iterator i = objects.begin(); + + vector >::iterator i + = objects.begin(); while (i != objects.end()) { ceph::unordered_map::iterator p = i->begin(); while (p != i->end()) { @@ -1957,16 +2018,17 @@ uint64_t ObjectCacher::release_all() unclean += o_unclean; if (o_unclean) - ldout(cct, 10) << "release_all " << *ob - << " has " << o_unclean << " bytes left" - << dendl; + ldout(cct, 10) << "release_all " << *ob + << " has " << o_unclean << " bytes left" + << dendl; p = n; } ++i; } if (unclean) { - ldout(cct, 10) << "release_all unclean " << unclean << " bytes left" << dendl; + ldout(cct, 10) << "release_all unclean " << unclean << " bytes left" + << dendl; } return unclean; @@ -1994,7 +2056,8 @@ void ObjectCacher::clear_nonexistence(ObjectSet *oset) } /** - * discard object extents from an ObjectSet by removing the objects in exls from the in-memory oset. + * discard object extents from an ObjectSet by removing the objects in + * exls from the in-memory oset. */ void ObjectCacher::discard_set(ObjectSet *oset, const vector& exls) { @@ -2003,7 +2066,7 @@ void ObjectCacher::discard_set(ObjectSet *oset, const vector& exls ldout(cct, 10) << "discard_set on " << oset << " dne" << dendl; return; } - + ldout(cct, 10) << "discard_set " << oset << dendl; bool were_dirty = oset->dirty_or_tx > 0; @@ -2017,7 +2080,7 @@ void ObjectCacher::discard_set(ObjectSet *oset, const vector& exls if (objects[oset->poolid].count(soid) == 0) continue; Object *ob = objects[oset->poolid][soid]; - + ob->discard(ex.offset, ex.length); } @@ -2032,54 +2095,53 @@ void ObjectCacher::verify_stats() const assert(lock.is_locked()); ldout(cct, 10) << "verify_stats" << dendl; - loff_t clean = 0, zero = 0, dirty = 0, rx = 0, tx = 0, missing = 0, error = 0; - for (vector >::const_iterator i = objects.begin(); - i != objects.end(); - ++i) { - for (ceph::unordered_map::const_iterator p = i->begin(); - p != i->end(); - ++p) { + loff_t clean = 0, zero = 0, dirty = 0, rx = 0, tx = 0, missing = 0, + error = 0; + for (vector >::const_iterator i + = objects.begin(); + i != objects.end(); + ++i) { + for (ceph::unordered_map::const_iterator p + = i->begin(); + p != i->end(); + ++p) { Object *ob = p->second; for (map::const_iterator q = ob->data.begin(); - q != ob->data.end(); - ++q) { - BufferHead *bh = q->second; - switch (bh->get_state()) { - case BufferHead::STATE_MISSING: - missing += bh->length(); - break; - case BufferHead::STATE_CLEAN: - clean += bh->length(); - break; - case BufferHead::STATE_ZERO: - zero += bh->length(); - break; - case BufferHead::STATE_DIRTY: - dirty += bh->length(); - break; - case BufferHead::STATE_TX: - tx += bh->length(); - break; - case BufferHead::STATE_RX: - rx += bh->length(); - break; + q != ob->data.end(); + ++q) { + BufferHead *bh = q->second; + switch (bh->get_state()) { + case BufferHead::STATE_MISSING: + missing += bh->length(); + break; + case BufferHead::STATE_CLEAN: + clean += bh->length(); + break; + case BufferHead::STATE_ZERO: + zero += bh->length(); + break; + case BufferHead::STATE_DIRTY: + dirty += bh->length(); + break; + case BufferHead::STATE_TX: + tx += bh->length(); + break; + case BufferHead::STATE_RX: + rx += bh->length(); + break; case BufferHead::STATE_ERROR: error += bh->length(); break; - default: - assert(0); - } + default: + assert(0); + } } } } - ldout(cct, 10) << " clean " << clean - << " rx " << rx - << " tx " << tx - << " dirty " << dirty - << " missing " << missing - << " error " << error - << dendl; + ldout(cct, 10) << " clean " << clean << " rx " << rx << " tx " << tx + << " dirty " << dirty << " missing " << missing + << " error " << error << dendl; assert(clean == stat_clean); assert(rx == stat_rx); assert(tx == stat_tx); @@ -2167,7 +2229,7 @@ void ObjectCacher::bh_set_state(BufferHead *bh, int s) if (s == BufferHead::STATE_DIRTY && state != BufferHead::STATE_DIRTY) { bh_lru_rest.lru_remove(bh); bh_lru_dirty.lru_insert_top(bh); - } else if (s != BufferHead::STATE_DIRTY && state == BufferHead::STATE_DIRTY) { + } else if (s != BufferHead::STATE_DIRTY &&state == BufferHead::STATE_DIRTY) { bh_lru_dirty.lru_remove(bh); if (bh->get_dontneed()) bh_lru_rest.lru_insert_bot(bh); @@ -2187,7 +2249,8 @@ void ObjectCacher::bh_set_state(BufferHead *bh, int s) dirty_or_tx_bh.erase(bh); } - if (s != BufferHead::STATE_ERROR && bh->get_state() == BufferHead::STATE_ERROR) { + if (s != BufferHead::STATE_ERROR && + bh->get_state() == BufferHead::STATE_ERROR) { bh->error = 0; } diff --git a/src/osdc/ObjectCacher.h b/src/osdc/ObjectCacher.h index 87fe351b4698a..590331997e517 100644 --- a/src/osdc/ObjectCacher.h +++ b/src/osdc/ObjectCacher.h @@ -1,4 +1,4 @@ -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- // vim: ts=8 sw=2 smarttab #ifndef CEPH_OBJECTCACHER_H #define CEPH_OBJECTCACHER_H @@ -26,16 +26,25 @@ enum { l_objectcacher_cache_ops_miss, // ops we don't satisfy completely from cache l_objectcacher_cache_bytes_hit, // bytes read directly from cache - l_objectcacher_cache_bytes_miss, // bytes we couldn't read directly from cache + + l_objectcacher_cache_bytes_miss, // bytes we couldn't read directly + + // from cache l_objectcacher_data_read, // total bytes read out l_objectcacher_data_written, // bytes written to cache l_objectcacher_data_flushed, // bytes flushed to WritebackHandler - l_objectcacher_overwritten_in_flush, // bytes overwritten while flushing is in progress - - l_objectcacher_write_ops_blocked, // total write ops we delayed due to dirty limits - l_objectcacher_write_bytes_blocked, // total number of write bytes we delayed due to dirty limits - l_objectcacher_write_time_blocked, // total time in seconds spent blocking a write due to dirty limits + l_objectcacher_overwritten_in_flush, // bytes overwritten while + // flushing is in progress + + l_objectcacher_write_ops_blocked, // total write ops we delayed due + // to dirty limits + l_objectcacher_write_bytes_blocked, // total number of write bytes + // we delayed due to dirty + // limits + l_objectcacher_write_time_blocked, // total time in seconds spent + // blocking a write due to dirty + // limits l_objectcacher_last, }; @@ -50,21 +59,22 @@ class ObjectCacher { typedef void (*flush_set_callback_t) (void *p, ObjectSet *oset); - // read scatter/gather + // read scatter/gather struct OSDRead { vector extents; snapid_t snap; map read_data; // bits of data as they come back bufferlist *bl; int fadvise_flags; - OSDRead(snapid_t s, bufferlist *b, int f) : snap(s), bl(b), fadvise_flags(f) {} + OSDRead(snapid_t s, bufferlist *b, int f) + : snap(s), bl(b), fadvise_flags(f) {} }; OSDRead *prepare_read(snapid_t snap, bufferlist *b, int f) { return new OSDRead(snap, b, f); } - - // write scatter/gather + + // write scatter/gather struct OSDWrite { vector extents; SnapContext snapc; @@ -73,9 +83,9 @@ class ObjectCacher { int fadvise_flags; ceph_tid_t journal_tid; OSDWrite(const SnapContext& sc, const bufferlist& b, utime_t mt, int f, - ceph_tid_t _journal_tid) + ceph_tid_t _journal_tid) : snapc(sc), bl(b), mtime(mt), fadvise_flags(f), - journal_tid(_journal_tid) {} + journal_tid(_journal_tid) {} }; OSDWrite *prepare_write(const SnapContext& sc, const bufferlist &b, @@ -116,11 +126,11 @@ class ObjectCacher { SnapContext snapc; ceph_tid_t journal_tid; int error; // holds return value for failed reads - - map< loff_t, list > waitfor_read; - + + map > waitfor_read; + // cons - BufferHead(Object *o) : + BufferHead(Object *o) : state(STATE_MISSING), ref(0), dontneed(false), @@ -132,7 +142,7 @@ class ObjectCacher { error(0) { ex.start = ex.length = 0; } - + // extent loff_t start() const { return ex.start; } void set_start(loff_t s) { ex.start = s; } @@ -153,7 +163,6 @@ class ObjectCacher { return journal_tid; } inline void set_journal_tid(ceph_tid_t _journal_tid) { - journal_tid = _journal_tid; } @@ -164,7 +173,7 @@ class ObjectCacher { bool is_tx() { return state == STATE_TX; } bool is_rx() { return state == STATE_RX; } bool is_error() { return state == STATE_ERROR; } - + // reference counting int get() { assert(ref >= 0); @@ -194,7 +203,7 @@ class ObjectCacher { inline bool can_merge_journal(BufferHead *bh) const { return (get_journal_tid() == 0 || bh->get_journal_tid() == 0 || - get_journal_tid() == bh->get_journal_tid()); + get_journal_tid() == bh->get_journal_tid()); } }; @@ -213,7 +222,7 @@ class ObjectCacher { xlist::item set_item; object_locator_t oloc; uint64_t truncate_size, truncate_seq; - + bool complete; bool exists; @@ -258,7 +267,7 @@ class ObjectCacher { ObjectSet *get_object_set() { return oset; } string get_namespace() { return oloc.nspace; } uint64_t get_object_number() const { return object_no; } - + object_locator_t& get_oloc() { return oloc; } void set_object_locator(object_locator_t& l) { oloc = l; } @@ -321,9 +330,9 @@ class ObjectCacher { bool is_cached(loff_t off, loff_t len); bool include_all_cached_data(loff_t off, loff_t len); int map_read(OSDRead *rd, - map& hits, - map& missing, - map& rx, + map& hits, + map& missing, + map& rx, map& errors); BufferHead *map_write(OSDWrite *wr); @@ -344,7 +353,7 @@ class ObjectCacher { return ref; } }; - + struct ObjectSet { void *parent; @@ -373,7 +382,7 @@ class ObjectCacher { string name; Mutex& lock; - + uint64_t max_dirty, target_dirty, max_size, max_objects; utime_t max_dirty_age; bool block_writes_upfront; @@ -381,7 +390,8 @@ class ObjectCacher { flush_set_callback_t flush_set_callback; void *flush_set_callback_arg; - vector > objects; // indexed by pool_id + // indexed by pool_id + vector > objects; list waitfor_read; @@ -410,7 +420,7 @@ class ObjectCacher { Object *get_object_maybe(sobject_t oid, object_locator_t &l) { // have it? if (((uint32_t)l.pool < objects.size()) && - (objects[l.pool].count(oid))) + (objects[l.pool].count(oid))) return objects[l.pool][oid]; return NULL; } @@ -462,18 +472,29 @@ class ObjectCacher { // bh states void bh_set_state(BufferHead *bh, int s); - void copy_bh_state(BufferHead *bh1, BufferHead *bh2) { + void copy_bh_state(BufferHead *bh1, BufferHead *bh2) { bh_set_state(bh2, bh1->get_state()); } - - void mark_missing(BufferHead *bh) { bh_set_state(bh, BufferHead::STATE_MISSING); } - void mark_clean(BufferHead *bh) { bh_set_state(bh, BufferHead::STATE_CLEAN); } - void mark_zero(BufferHead *bh) { bh_set_state(bh, BufferHead::STATE_ZERO); } - void mark_rx(BufferHead *bh) { bh_set_state(bh, BufferHead::STATE_RX); } - void mark_tx(BufferHead *bh) { bh_set_state(bh, BufferHead::STATE_TX); } - void mark_error(BufferHead *bh) { bh_set_state(bh, BufferHead::STATE_ERROR); } - void mark_dirty(BufferHead *bh) { - bh_set_state(bh, BufferHead::STATE_DIRTY); + + void mark_missing(BufferHead *bh) { + bh_set_state(bh,BufferHead::STATE_MISSING); + } + void mark_clean(BufferHead *bh) { + bh_set_state(bh, BufferHead::STATE_CLEAN); + } + void mark_zero(BufferHead *bh) { + bh_set_state(bh, BufferHead::STATE_ZERO); + } + void mark_rx(BufferHead *bh) { + bh_set_state(bh, BufferHead::STATE_RX); + } + void mark_tx(BufferHead *bh) { + bh_set_state(bh, BufferHead::STATE_TX); } + void mark_error(BufferHead *bh) { + bh_set_state(bh, BufferHead::STATE_ERROR); + } + void mark_dirty(BufferHead *bh) { + bh_set_state(bh, BufferHead::STATE_DIRTY); bh_lru_dirty.lru_touch(bh); //bh->set_dirty_stamp(ceph_clock_now(g_ceph_context)); } @@ -530,7 +551,8 @@ class ObjectCacher { public: bufferlist bl; - C_ReadFinish(ObjectCacher *c, Object *ob, ceph_tid_t t, loff_t s, uint64_t l) : + C_ReadFinish(ObjectCacher *c, Object *ob, ceph_tid_t t, loff_t s, + uint64_t l) : oc(c), poolid(ob->oloc.pool), oid(ob->get_soid()), start(s), length(l), set_item(this), trust_enoent(true), tid(t) { @@ -558,7 +580,8 @@ class ObjectCacher { uint64_t length; public: ceph_tid_t tid; - C_WriteCommit(ObjectCacher *c, int64_t _poolid, sobject_t o, loff_t s, uint64_t l) : + C_WriteCommit(ObjectCacher *c, int64_t _poolid, sobject_t o, loff_t s, + uint64_t l) : oc(c), poolid(_poolid), oid(o), start(s), length(l), tid(0) {} void finish(int r) { oc->bh_write_commit(poolid, oid, start, length, tid, r); @@ -608,7 +631,8 @@ class ObjectCacher { ObjectSet *oset; Context *onfinish; public: - C_RetryRead(ObjectCacher *_oc, OSDRead *r, ObjectSet *os, Context *c) : oc(_oc), rd(r), oset(os), onfinish(c) {} + C_RetryRead(ObjectCacher *_oc, OSDRead *r, ObjectSet *os, Context *c) + : oc(_oc), rd(r), oset(os), onfinish(c) {} void finish(int r) { if (r < 0) { if (onfinish) @@ -617,7 +641,7 @@ class ObjectCacher { } int ret = oc->_readx(rd, oset, onfinish, false); if (ret != 0 && onfinish) { - onfinish->complete(ret); + onfinish->complete(ret); } } }; @@ -632,11 +656,13 @@ class ObjectCacher { */ int readx(OSDRead *rd, ObjectSet *oset, Context *onfinish); int writex(OSDWrite *wr, ObjectSet *oset, Context *onfreespace); - bool is_cached(ObjectSet *oset, vector& extents, snapid_t snapid); + bool is_cached(ObjectSet *oset, vector& extents, + snapid_t snapid); private: // write blocking - int _wait_for_write(OSDWrite *wr, uint64_t len, ObjectSet *oset, Context *onfreespace); + int _wait_for_write(OSDWrite *wr, uint64_t len, ObjectSet *oset, + Context *onfreespace); void maybe_wait_for_writeback(uint64_t len); bool _flush_set_finish(C_GatherBuilder *gather, Context *onfinish); @@ -646,12 +672,14 @@ public: bool set_is_dirty_or_committing(ObjectSet *oset); bool flush_set(ObjectSet *oset, Context *onfinish=0); - bool flush_set(ObjectSet *oset, vector& ex, Context *onfinish=0); - void flush_all(Context *onfinish=0); + bool flush_set(ObjectSet *oset, vector& ex, + Context *onfinish = 0); + void flush_all(Context *onfinish = 0); void purge_set(ObjectSet *oset); - loff_t release_set(ObjectSet *oset); // returns # of bytes not released (ie non-clean) + // returns # of bytes not released (ie non-clean) + loff_t release_set(ObjectSet *oset); uint64_t release_all(); void discard_set(ObjectSet *oset, const vector& ex); @@ -686,35 +714,38 @@ public: // file functions /*** async+caching (non-blocking) file interface ***/ - int file_is_cached(ObjectSet *oset, ceph_file_layout *layout, snapid_t snapid, - loff_t offset, uint64_t len) { + int file_is_cached(ObjectSet *oset, ceph_file_layout *layout, + snapid_t snapid, loff_t offset, uint64_t len) { vector extents; - Striper::file_to_extents(cct, oset->ino, layout, offset, len, oset->truncate_size, extents); + Striper::file_to_extents(cct, oset->ino, layout, offset, len, + oset->truncate_size, extents); return is_cached(oset, extents, snapid); } int file_read(ObjectSet *oset, ceph_file_layout *layout, snapid_t snapid, - loff_t offset, uint64_t len, - bufferlist *bl, - int flags, - Context *onfinish) { + loff_t offset, uint64_t len, bufferlist *bl, int flags, + Context *onfinish) { OSDRead *rd = prepare_read(snapid, bl, flags); - Striper::file_to_extents(cct, oset->ino, layout, offset, len, oset->truncate_size, rd->extents); + Striper::file_to_extents(cct, oset->ino, layout, offset, len, + oset->truncate_size, rd->extents); return readx(rd, oset, onfinish); } - int file_write(ObjectSet *oset, ceph_file_layout *layout, const SnapContext& snapc, - loff_t offset, uint64_t len, - bufferlist& bl, utime_t mtime, int flags) { + int file_write(ObjectSet *oset, ceph_file_layout *layout, + const SnapContext& snapc, loff_t offset, uint64_t len, + bufferlist& bl, utime_t mtime, int flags) { OSDWrite *wr = prepare_write(snapc, bl, mtime, flags, 0); - Striper::file_to_extents(cct, oset->ino, layout, offset, len, oset->truncate_size, wr->extents); + Striper::file_to_extents(cct, oset->ino, layout, offset, len, + oset->truncate_size, wr->extents); return writex(wr, oset, NULL); } - bool file_flush(ObjectSet *oset, ceph_file_layout *layout, const SnapContext& snapc, - loff_t offset, uint64_t len, Context *onfinish) { + bool file_flush(ObjectSet *oset, ceph_file_layout *layout, + const SnapContext& snapc, loff_t offset, uint64_t len, + Context *onfinish) { vector extents; - Striper::file_to_extents(cct, oset->ino, layout, offset, len, oset->truncate_size, extents); + Striper::file_to_extents(cct, oset->ino, layout, offset, len, + oset->truncate_size, extents); return flush_set(oset, extents, onfinish); } }; @@ -740,7 +771,8 @@ inline ostream& operator<<(ostream& out, ObjectCacher::BufferHead &bh) if (bh.error) out << " error=" << bh.error; out << "]"; out << " waiters = {"; - for (map >::const_iterator it = bh.waitfor_read.begin(); + for (map >::const_iterator it + = bh.waitfor_read.begin(); it != bh.waitfor_read.end(); ++it) { out << " " << it->first << "->["; for (list::const_iterator lit = it->second.begin(); diff --git a/src/osdc/Objecter.cc b/src/osdc/Objecter.cc index 2f67809b17f99..d18cead9b1528 100644 --- a/src/osdc/Objecter.cc +++ b/src/osdc/Objecter.cc @@ -1,4 +1,4 @@ -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- // vim: ts=8 sw=2 smarttab /* * Ceph - scalable distributed file system @@ -7,9 +7,9 @@ * * This is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public - * License version 2.1, as published by the Free Software + * License version 2.1, as published by the Free Software * Foundation. See file COPYING. - * + * */ #include "Objecter.h" @@ -147,7 +147,8 @@ static const char *config_keys[] = { Mutex *Objecter::OSDSession::get_lock(object_t& oid) { #define HASH_PRIME 1021 - uint32_t h = ceph_str_hash_linux(oid.name.c_str(), oid.name.size()) % HASH_PRIME; + uint32_t h = ceph_str_hash_linux(oid.name.c_str(), oid.name.size()) + % HASH_PRIME; return completion_locks[h % num_locks]; } @@ -174,7 +175,7 @@ void Objecter::update_crush_location() int r = CrushWrapper::parse_loc_multimap(lvec, &crush_location); if (r < 0) { lderr(cct) << "warning: crush_location '" << cct->_conf->crush_location - << "' does not parse" << dendl; + << "' does not parse" << dendl; } } @@ -191,7 +192,7 @@ void Objecter::init() PerfCountersBuilder pcb(cct, "objecter", l_osdc_first, l_osdc_last); pcb.add_u64(l_osdc_op_active, "op_active", - "Operations active", "actv"); + "Operations active", "actv"); pcb.add_u64(l_osdc_op_laggy, "op_laggy", "Laggy operations"); pcb.add_u64_counter(l_osdc_op_send, "op_send", "Sent operations"); pcb.add_u64_counter(l_osdc_op_send_bytes, "op_send_bytes", "Sent data"); @@ -201,73 +202,116 @@ void Objecter::init() pcb.add_u64_counter(l_osdc_op, "op", "Operations"); pcb.add_u64_counter(l_osdc_op_r, "op_r", - "Read operations", "read"); + "Read operations", "read"); pcb.add_u64_counter(l_osdc_op_w, "op_w", - "Write operations", "writ"); - pcb.add_u64_counter(l_osdc_op_rmw, "op_rmw", "Read-modify-write operations"); + "Write operations", "writ"); + pcb.add_u64_counter(l_osdc_op_rmw, "op_rmw", + "Read-modify-write operations"); pcb.add_u64_counter(l_osdc_op_pg, "op_pg", "PG operation"); pcb.add_u64_counter(l_osdc_osdop_stat, "osdop_stat", "Stat operations"); - pcb.add_u64_counter(l_osdc_osdop_create, "osdop_create", "Create object operations"); + pcb.add_u64_counter(l_osdc_osdop_create, "osdop_create", + "Create object operations"); pcb.add_u64_counter(l_osdc_osdop_read, "osdop_read", "Read operations"); pcb.add_u64_counter(l_osdc_osdop_write, "osdop_write", "Write operations"); - pcb.add_u64_counter(l_osdc_osdop_writefull, "osdop_writefull", "Write full object operations"); - pcb.add_u64_counter(l_osdc_osdop_append, "osdop_append", "Append operation"); - pcb.add_u64_counter(l_osdc_osdop_zero, "osdop_zero", "Set object to zero operations"); - pcb.add_u64_counter(l_osdc_osdop_truncate, "osdop_truncate", "Truncate object operations"); - pcb.add_u64_counter(l_osdc_osdop_delete, "osdop_delete", "Delete object operations"); - pcb.add_u64_counter(l_osdc_osdop_mapext, "osdop_mapext", "Map extent operations"); - pcb.add_u64_counter(l_osdc_osdop_sparse_read, "osdop_sparse_read", "Sparse read operations"); - pcb.add_u64_counter(l_osdc_osdop_clonerange, "osdop_clonerange", "Clone range operations"); - pcb.add_u64_counter(l_osdc_osdop_getxattr, "osdop_getxattr", "Get xattr operations"); - pcb.add_u64_counter(l_osdc_osdop_setxattr, "osdop_setxattr", "Set xattr operations"); - pcb.add_u64_counter(l_osdc_osdop_cmpxattr, "osdop_cmpxattr", "Xattr comparison operations"); - pcb.add_u64_counter(l_osdc_osdop_rmxattr, "osdop_rmxattr", "Remove xattr operations"); - pcb.add_u64_counter(l_osdc_osdop_resetxattrs, "osdop_resetxattrs", "Reset xattr operations"); - pcb.add_u64_counter(l_osdc_osdop_tmap_up, "osdop_tmap_up", "TMAP update operations"); - pcb.add_u64_counter(l_osdc_osdop_tmap_put, "osdop_tmap_put", "TMAP put operations"); - pcb.add_u64_counter(l_osdc_osdop_tmap_get, "osdop_tmap_get", "TMAP get operations"); - pcb.add_u64_counter(l_osdc_osdop_call, "osdop_call", "Call (execute) operations"); - pcb.add_u64_counter(l_osdc_osdop_watch, "osdop_watch", "Watch by object operations"); - pcb.add_u64_counter(l_osdc_osdop_notify, "osdop_notify", "Notify about object operations"); - pcb.add_u64_counter(l_osdc_osdop_src_cmpxattr, "osdop_src_cmpxattr", "Extended attribute comparison in multi operations"); + pcb.add_u64_counter(l_osdc_osdop_writefull, "osdop_writefull", + "Write full object operations"); + pcb.add_u64_counter(l_osdc_osdop_append, "osdop_append", + "Append operation"); + pcb.add_u64_counter(l_osdc_osdop_zero, "osdop_zero", + "Set object to zero operations"); + pcb.add_u64_counter(l_osdc_osdop_truncate, "osdop_truncate", + "Truncate object operations"); + pcb.add_u64_counter(l_osdc_osdop_delete, "osdop_delete", + "Delete object operations"); + pcb.add_u64_counter(l_osdc_osdop_mapext, "osdop_mapext", + "Map extent operations"); + pcb.add_u64_counter(l_osdc_osdop_sparse_read, "osdop_sparse_read", + "Sparse read operations"); + pcb.add_u64_counter(l_osdc_osdop_clonerange, "osdop_clonerange", + "Clone range operations"); + pcb.add_u64_counter(l_osdc_osdop_getxattr, "osdop_getxattr", + "Get xattr operations"); + pcb.add_u64_counter(l_osdc_osdop_setxattr, "osdop_setxattr", + "Set xattr operations"); + pcb.add_u64_counter(l_osdc_osdop_cmpxattr, "osdop_cmpxattr", + "Xattr comparison operations"); + pcb.add_u64_counter(l_osdc_osdop_rmxattr, "osdop_rmxattr", + "Remove xattr operations"); + pcb.add_u64_counter(l_osdc_osdop_resetxattrs, "osdop_resetxattrs", + "Reset xattr operations"); + pcb.add_u64_counter(l_osdc_osdop_tmap_up, "osdop_tmap_up", + "TMAP update operations"); + pcb.add_u64_counter(l_osdc_osdop_tmap_put, "osdop_tmap_put", + "TMAP put operations"); + pcb.add_u64_counter(l_osdc_osdop_tmap_get, "osdop_tmap_get", + "TMAP get operations"); + pcb.add_u64_counter(l_osdc_osdop_call, "osdop_call", + "Call (execute) operations"); + pcb.add_u64_counter(l_osdc_osdop_watch, "osdop_watch", + "Watch by object operations"); + pcb.add_u64_counter(l_osdc_osdop_notify, "osdop_notify", + "Notify about object operations"); + pcb.add_u64_counter(l_osdc_osdop_src_cmpxattr, "osdop_src_cmpxattr", + "Extended attribute comparison in multi operations"); pcb.add_u64_counter(l_osdc_osdop_pgls, "osdop_pgls"); pcb.add_u64_counter(l_osdc_osdop_pgls_filter, "osdop_pgls_filter"); pcb.add_u64_counter(l_osdc_osdop_other, "osdop_other", "Other operations"); - pcb.add_u64(l_osdc_linger_active, "linger_active", "Active lingering operations"); - pcb.add_u64_counter(l_osdc_linger_send, "linger_send", "Sent lingering operations"); - pcb.add_u64_counter(l_osdc_linger_resend, "linger_resend", "Resent lingering operations"); - pcb.add_u64_counter(l_osdc_linger_ping, "linger_ping", "Sent pings to lingering operations"); - - pcb.add_u64(l_osdc_poolop_active, "poolop_active", "Active pool operations"); - pcb.add_u64_counter(l_osdc_poolop_send, "poolop_send", "Sent pool operations"); - pcb.add_u64_counter(l_osdc_poolop_resend, "poolop_resend", "Resent pool operations"); - - pcb.add_u64(l_osdc_poolstat_active, "poolstat_active", "Active get pool stat operations"); - pcb.add_u64_counter(l_osdc_poolstat_send, "poolstat_send", "Pool stat operations sent"); - pcb.add_u64_counter(l_osdc_poolstat_resend, "poolstat_resend", "Resent pool stats"); + pcb.add_u64(l_osdc_linger_active, "linger_active", + "Active lingering operations"); + pcb.add_u64_counter(l_osdc_linger_send, "linger_send", + "Sent lingering operations"); + pcb.add_u64_counter(l_osdc_linger_resend, "linger_resend", + "Resent lingering operations"); + pcb.add_u64_counter(l_osdc_linger_ping, "linger_ping", + "Sent pings to lingering operations"); + + pcb.add_u64(l_osdc_poolop_active, "poolop_active", + "Active pool operations"); + pcb.add_u64_counter(l_osdc_poolop_send, "poolop_send", + "Sent pool operations"); + pcb.add_u64_counter(l_osdc_poolop_resend, "poolop_resend", + "Resent pool operations"); + + pcb.add_u64(l_osdc_poolstat_active, "poolstat_active", + "Active get pool stat operations"); + pcb.add_u64_counter(l_osdc_poolstat_send, "poolstat_send", + "Pool stat operations sent"); + pcb.add_u64_counter(l_osdc_poolstat_resend, "poolstat_resend", + "Resent pool stats"); pcb.add_u64(l_osdc_statfs_active, "statfs_active", "Statfs operations"); pcb.add_u64_counter(l_osdc_statfs_send, "statfs_send", "Sent FS stats"); - pcb.add_u64_counter(l_osdc_statfs_resend, "statfs_resend", "Resent FS stats"); + pcb.add_u64_counter(l_osdc_statfs_resend, "statfs_resend", + "Resent FS stats"); pcb.add_u64(l_osdc_command_active, "command_active", "Active commands"); - pcb.add_u64_counter(l_osdc_command_send, "command_send", "Sent commands"); - pcb.add_u64_counter(l_osdc_command_resend, "command_resend", "Resent commands"); + pcb.add_u64_counter(l_osdc_command_send, "command_send", + "Sent commands"); + pcb.add_u64_counter(l_osdc_command_resend, "command_resend", + "Resent commands"); pcb.add_u64(l_osdc_map_epoch, "map_epoch", "OSD map epoch"); - pcb.add_u64_counter(l_osdc_map_full, "map_full", "Full OSD maps received"); - pcb.add_u64_counter(l_osdc_map_inc, "map_inc", "Incremental OSD maps received"); - - pcb.add_u64(l_osdc_osd_sessions, "osd_sessions", "Open sessions"); // open sessions - pcb.add_u64_counter(l_osdc_osd_session_open, "osd_session_open", "Sessions opened"); - pcb.add_u64_counter(l_osdc_osd_session_close, "osd_session_close", "Sessions closed"); + pcb.add_u64_counter(l_osdc_map_full, "map_full", + "Full OSD maps received"); + pcb.add_u64_counter(l_osdc_map_inc, "map_inc", + "Incremental OSD maps received"); + + pcb.add_u64(l_osdc_osd_sessions, "osd_sessions", + "Open sessions"); // open sessions + pcb.add_u64_counter(l_osdc_osd_session_open, "osd_session_open", + "Sessions opened"); + pcb.add_u64_counter(l_osdc_osd_session_close, "osd_session_close", + "Sessions closed"); pcb.add_u64(l_osdc_osd_laggy, "osd_laggy", "Laggy OSD sessions"); - pcb.add_u64_counter(l_osdc_osdop_omap_wr, "omap_wr", "OSD OMAP write operations"); - pcb.add_u64_counter(l_osdc_osdop_omap_rd, "omap_rd", "OSD OMAP read operations"); - pcb.add_u64_counter(l_osdc_osdop_omap_del, "omap_del", "OSD OMAP delete operations"); + pcb.add_u64_counter(l_osdc_osdop_omap_wr, "omap_wr", + "OSD OMAP write operations"); + pcb.add_u64_counter(l_osdc_osdop_omap_rd, "omap_rd", + "OSD OMAP read operations"); + pcb.add_u64_counter(l_osdc_osdop_omap_del, "omap_del", + "OSD OMAP delete operations"); logger = pcb.create_perf_counters(); cct->get_perfcounters_collection()->add(logger); @@ -339,7 +383,8 @@ void Objecter::shutdown() } while(!check_latest_map_commands.empty()) { - map::iterator i = check_latest_map_commands.begin(); + map::iterator i + = check_latest_map_commands.begin(); i->second->put(); check_latest_map_commands.erase(i->first); } @@ -364,7 +409,8 @@ void Objecter::shutdown() ldout(cct, 20) << __func__ << " clearing up homeless session..." << dendl; while(!homeless_session->linger_ops.empty()) { - std::map::iterator i = homeless_session->linger_ops.begin(); + std::map::iterator i + = homeless_session->linger_ops.begin(); ldout(cct, 10) << " linger_op " << i->first << dendl; LingerOp *lop = i->second; { @@ -388,7 +434,8 @@ void Objecter::shutdown() } while(!homeless_session->command_ops.empty()) { - std::map::iterator i = homeless_session->command_ops.begin(); + std::map::iterator i + = homeless_session->command_ops.begin(); ldout(cct, 10) << " command_op " << i->first << dendl; CommandOp *cop = i->second; { @@ -441,7 +488,8 @@ void Objecter::_send_linger(LingerOp *info) info->watch_lock.get_read(); // just to read registered status bufferlist *poutbl = NULL; if (info->registered && info->is_watch) { - ldout(cct, 15) << "send_linger " << info->linger_id << " reconnect" << dendl; + ldout(cct, 15) << "send_linger " << info->linger_id << " reconnect" + << dendl; opv.push_back(OSDOp()); opv.back().op.op = CEPH_OSD_OP_WATCH; opv.back().op.watch.cookie = info->get_cookie(); @@ -449,7 +497,8 @@ void Objecter::_send_linger(LingerOp *info) opv.back().op.watch.gen = ++info->register_gen; oncommit = new C_Linger_Reconnect(this, info); } else { - ldout(cct, 15) << "send_linger " << info->linger_id << " register" << dendl; + ldout(cct, 15) << "send_linger " << info->linger_id << " register" + << dendl; opv = info->ops; C_Linger_Commit *c = new C_Linger_Commit(this, info); if (!info->is_watch) { @@ -512,7 +561,8 @@ void Objecter::_linger_commit(LingerOp *info, int r, bufferlist& outbl) bufferlist::iterator p = outbl.begin(); try { ::decode(info->notify_id, p); - ldout(cct, 10) << "_linger_commit notify_id=" << info->notify_id << dendl; + ldout(cct, 10) << "_linger_commit notify_id=" << info->notify_id + << dendl; } catch (buffer::error& e) { } @@ -577,7 +627,8 @@ void Objecter::_send_linger_ping(LingerOp *info) assert(info->session->lock.is_locked()); if (cct->_conf->objecter_inject_no_watch_ping) { - ldout(cct, 10) << __func__ << " " << info->linger_id << " SKIPPING" << dendl; + ldout(cct, 10) << __func__ << " " << info->linger_id << " SKIPPING" + << dendl; return; } if (osdmap->test_flag(CEPH_OSDMAP_PAUSERD)) { @@ -586,7 +637,8 @@ void Objecter::_send_linger_ping(LingerOp *info) } utime_t now = ceph_clock_now(NULL); - ldout(cct, 10) << __func__ << " " << info->linger_id << " now " << now << dendl; + ldout(cct, 10) << __func__ << " " << info->linger_id << " now " << now + << dendl; vector opv(1); opv[0].op.op = CEPH_OSD_OP_WATCH; @@ -815,8 +867,9 @@ void Objecter::handle_watch_notify(MWatchNotify *m) } } } else if (!info->is_watch) { - // we have CEPH_WATCH_EVENT_NOTIFY_COMPLETE; we can do this inline since - // we know the only user (librados) is safe to call in fast-dispatch context + // we have CEPH_WATCH_EVENT_NOTIFY_COMPLETE; we can do this inline + // since we know the only user (librados) is safe to call in + // fast-dispatch context if (info->notify_id && info->notify_id != m->notify_id) { ldout(cct, 10) << __func__ << " reply notify " << m->notify_id @@ -916,12 +969,12 @@ bool Objecter::ms_dispatch(Message *m) } void Objecter::_scan_requests(OSDSession *s, - bool force_resend, - bool cluster_full, - map *pool_full_map, - map& need_resend, - list& need_resend_linger, - map& need_resend_command) + bool force_resend, + bool cluster_full, + map *pool_full_map, + map& need_resend, + list& need_resend_linger, + map& need_resend_command) { assert(rwlock.is_wlocked()); @@ -936,12 +989,15 @@ void Objecter::_scan_requests(OSDSession *s, while (lp != s->linger_ops.end()) { LingerOp *op = lp->second; assert(op->session == s); - ++lp; // check_linger_pool_dne() may touch linger_ops; prevent iterator invalidation + // check_linger_pool_dne() may touch linger_ops; prevent iterator + // invalidation + ++lp; ldout(cct, 10) << " checking linger op " << op->linger_id << dendl; bool unregister, force_resend_writes = cluster_full; int r = _recalc_linger_op_target(op, lc); if (pool_full_map) - force_resend_writes = force_resend_writes || (*pool_full_map)[op->target.base_oloc.pool]; + force_resend_writes = force_resend_writes || + (*pool_full_map)[op->target.base_oloc.pool]; switch (r) { case RECALC_OP_TARGET_NO_ACTION: if (!force_resend && !force_resend_writes) @@ -954,10 +1010,10 @@ void Objecter::_scan_requests(OSDSession *s, case RECALC_OP_TARGET_POOL_DNE: _check_linger_pool_dne(op, &unregister); if (unregister) { - ldout(cct, 10) << " need to unregister linger op " + ldout(cct, 10) << " need to unregister linger op " << op->linger_id << dendl; op->get(); - unregister_lingers.push_back(op); + unregister_lingers.push_back(op); } break; } @@ -971,7 +1027,8 @@ void Objecter::_scan_requests(OSDSession *s, ldout(cct, 10) << " checking op " << op->tid << dendl; bool force_resend_writes = cluster_full; if (pool_full_map) - force_resend_writes = force_resend_writes || (*pool_full_map)[op->target.base_oloc.pool]; + force_resend_writes = force_resend_writes || + (*pool_full_map)[op->target.base_oloc.pool]; int r = _calc_target(&op->target, &op->last_force_resend); switch (r) { case RECALC_OP_TARGET_NO_ACTION: @@ -1000,7 +1057,8 @@ void Objecter::_scan_requests(OSDSession *s, ldout(cct, 10) << " checking command " << c->tid << dendl; bool force_resend_writes = cluster_full; if (pool_full_map) - force_resend_writes = force_resend_writes || (*pool_full_map)[c->target_pg.pool()]; + force_resend_writes = force_resend_writes || + (*pool_full_map)[c->target_pg.pool()]; int r = _calc_command_target(c); switch (r) { case RECALC_OP_TARGET_NO_ACTION: @@ -1011,7 +1069,7 @@ void Objecter::_scan_requests(OSDSession *s, case RECALC_OP_TARGET_NEED_RESEND: need_resend_command[c->tid] = c; if (c->session) { - _session_command_op_remove(c->session, c); + _session_command_op_remove(c->session, c); } _command_cancel_map_check(c); break; @@ -1020,7 +1078,7 @@ void Objecter::_scan_requests(OSDSession *s, case RECALC_OP_TARGET_OSD_DOWN: _check_command_map_dne(c); break; - } + } } s->lock.unlock(); @@ -1039,7 +1097,7 @@ void Objecter::handle_osd_map(MOSDMap *m) if (!initialized.read()) return; - assert(osdmap); + assert(osdmap); if (m->fsid != monc->get_fsid()) { ldout(cct, 0) << "handle_osd_map fsid " << m->fsid @@ -1049,26 +1107,27 @@ void Objecter::handle_osd_map(MOSDMap *m) bool was_pauserd = osdmap->test_flag(CEPH_OSDMAP_PAUSERD); bool cluster_full = _osdmap_full_flag(); - bool was_pausewr = osdmap->test_flag(CEPH_OSDMAP_PAUSEWR) || cluster_full || _osdmap_has_pool_full(); + bool was_pausewr = osdmap->test_flag(CEPH_OSDMAP_PAUSEWR) || cluster_full || + _osdmap_has_pool_full(); map pool_full_map; - for (map::const_iterator it = osdmap->get_pools().begin(); + for (map::const_iterator it + = osdmap->get_pools().begin(); it != osdmap->get_pools().end(); ++it) pool_full_map[it->first] = _osdmap_pool_full(it->second); - + list need_resend_linger; map need_resend; map need_resend_command; if (m->get_last() <= osdmap->get_epoch()) { - ldout(cct, 3) << "handle_osd_map ignoring epochs [" - << m->get_first() << "," << m->get_last() - << "] <= " << osdmap->get_epoch() << dendl; + ldout(cct, 3) << "handle_osd_map ignoring epochs [" + << m->get_first() << "," << m->get_last() + << "] <= " << osdmap->get_epoch() << dendl; } else { - ldout(cct, 3) << "handle_osd_map got epochs [" - << m->get_first() << "," << m->get_last() - << "] > " << osdmap->get_epoch() - << dendl; + ldout(cct, 3) << "handle_osd_map got epochs [" + << m->get_first() << "," << m->get_last() + << "] > " << osdmap->get_epoch() << dendl; if (osdmap->get_epoch()) { bool skipped_map = false; @@ -1076,7 +1135,7 @@ void Objecter::handle_osd_map(MOSDMap *m) for (epoch_t e = osdmap->get_epoch() + 1; e <= m->get_last(); e++) { - + if (osdmap->get_epoch() == e-1 && m->incremental_maps.count(e)) { ldout(cct, 3) << "handle_osd_map decoding incremental epoch " << e @@ -1107,10 +1166,10 @@ void Objecter::handle_osd_map(MOSDMap *m) logger->set(l_osdc_map_epoch, osdmap->get_epoch()); cluster_full = cluster_full || _osdmap_full_flag(); - update_pool_full_map(pool_full_map); + update_pool_full_map(pool_full_map); _scan_requests(homeless_session, skipped_map, cluster_full, - &pool_full_map, need_resend, - need_resend_linger, need_resend_command); + &pool_full_map, need_resend, + need_resend_linger, need_resend_command); // osd addr changes? for (map::iterator p = osd_sessions.begin(); @@ -1118,7 +1177,7 @@ void Objecter::handle_osd_map(MOSDMap *m) OSDSession *s = p->second; _scan_requests(s, skipped_map, cluster_full, &pool_full_map, need_resend, - need_resend_linger, need_resend_command); + need_resend_linger, need_resend_command); ++p; if (!osdmap->is_up(s->osd) || (s->con && @@ -1129,16 +1188,16 @@ void Objecter::handle_osd_map(MOSDMap *m) assert(e == osdmap->get_epoch()); } - + } else { // first map. we want the full thing. if (m->maps.count(m->get_last())) { - for (map::iterator p = osd_sessions.begin(); + for (map::iterator p = osd_sessions.begin(); p != osd_sessions.end(); ++p) { OSDSession *s = p->second; _scan_requests(s, false, false, NULL, need_resend, - need_resend_linger, need_resend_command); - } + need_resend_linger, need_resend_command); + } ldout(cct, 3) << "handle_osd_map decoding full epoch " << m->get_last() << dendl; osdmap->decode(m->maps[m->get_last()]); @@ -1156,10 +1215,12 @@ void Objecter::handle_osd_map(MOSDMap *m) } bool pauserd = osdmap->test_flag(CEPH_OSDMAP_PAUSERD); - bool pausewr = osdmap->test_flag(CEPH_OSDMAP_PAUSEWR) || _osdmap_full_flag() || _osdmap_has_pool_full(); + bool pausewr = osdmap->test_flag(CEPH_OSDMAP_PAUSEWR) || _osdmap_full_flag() + || _osdmap_has_pool_full(); // was/is paused? - if (was_pauserd || was_pausewr || pauserd || pausewr || osdmap->get_epoch() < epoch_barrier) { + if (was_pauserd || was_pausewr || pauserd || pausewr || + osdmap->get_epoch() < epoch_barrier) { _maybe_request_map(); } @@ -1221,7 +1282,7 @@ void Objecter::handle_osd_map(MOSDMap *m) } _dump_active(); - + // finish any Contexts that were waiting on a map update map > >::iterator p = waiting_for_map.begin(); @@ -1249,22 +1310,25 @@ void Objecter::C_Op_Map_Latest::finish(int r) if (r == -EAGAIN || r == -ECANCELED) return; - lgeneric_subdout(objecter->cct, objecter, 10) << "op_map_latest r=" << r << " tid=" << tid - << " latest " << latest << dendl; + lgeneric_subdout(objecter->cct, objecter, 10) + << "op_map_latest r=" << r << " tid=" << tid + << " latest " << latest << dendl; RWLock::WLocker wl(objecter->rwlock); map::iterator iter = objecter->check_latest_map_ops.find(tid); if (iter == objecter->check_latest_map_ops.end()) { - lgeneric_subdout(objecter->cct, objecter, 10) << "op_map_latest op " << tid << " not found" << dendl; + lgeneric_subdout(objecter->cct, objecter, 10) + << "op_map_latest op "<< tid << " not found" << dendl; return; } Op *op = iter->second; objecter->check_latest_map_ops.erase(iter); - lgeneric_subdout(objecter->cct, objecter, 20) << "op_map_latest op " << op << dendl; + lgeneric_subdout(objecter->cct, objecter, 20) + << "op_map_latest op "<< op << dendl; if (op->map_dne_bound == 0) op->map_dne_bound = latest; @@ -1274,7 +1338,8 @@ void Objecter::C_Op_Map_Latest::finish(int r) op->put(); } -int Objecter::pool_snap_by_name(int64_t poolid, const char *snap_name, snapid_t *snap) +int Objecter::pool_snap_by_name(int64_t poolid, const char *snap_name, + snapid_t *snap) { RWLock::RLocker rl(rwlock); @@ -1296,7 +1361,8 @@ int Objecter::pool_snap_by_name(int64_t poolid, const char *snap_name, snapid_t return -ENOENT; } -int Objecter::pool_snap_get_info(int64_t poolid, snapid_t snap, pool_snap_info_t *info) +int Objecter::pool_snap_get_info(int64_t poolid, snapid_t snap, + pool_snap_info_t *info) { RWLock::RLocker rl(rwlock); @@ -1366,11 +1432,11 @@ void Objecter::_check_op_pool_dne(Op *op, bool session_locked) assert(s != NULL); if (!session_locked) { - s->lock.get_write(); + s->lock.get_write(); } _finish_op(op, 0); if (!session_locked) { - s->lock.unlock(); + s->lock.unlock(); } } } else { @@ -1563,7 +1629,8 @@ void Objecter::_command_cancel_map_check(CommandOp *c) /** * Look up OSDSession by OSD id. * - * @returns 0 on success, or -EAGAIN if the lock context requires promotion to write. + * @returns 0 on success, or -EAGAIN if the lock context requires + * promotion to write. */ int Objecter::_get_session(int osd, OSDSession **session, RWLock::Context& lc) { @@ -1571,7 +1638,8 @@ int Objecter::_get_session(int osd, OSDSession **session, RWLock::Context& lc) if (osd < 0) { *session = homeless_session; - ldout(cct, 20) << __func__ << " osd=" << osd << " returning homeless" << dendl; + ldout(cct, 20) << __func__ << " osd=" << osd << " returning homeless" + << dendl; return 0; } @@ -1580,7 +1648,8 @@ int Objecter::_get_session(int osd, OSDSession **session, RWLock::Context& lc) OSDSession *s = p->second; s->get(); *session = s; - ldout(cct, 20) << __func__ << " s=" << s << " osd=" << osd << " " << s->get_nref() << dendl; + ldout(cct, 20) << __func__ << " s=" << s << " osd=" << osd << " " + << s->get_nref() << dendl; return 0; } if (!lc.is_wlocked()) { @@ -1593,14 +1662,16 @@ int Objecter::_get_session(int osd, OSDSession **session, RWLock::Context& lc) logger->inc(l_osdc_osd_sessions, osd_sessions.size()); s->get(); *session = s; - ldout(cct, 20) << __func__ << " s=" << s << " osd=" << osd << " " << s->get_nref() << dendl; + ldout(cct, 20) << __func__ << " s=" << s << " osd=" << osd << " " + << s->get_nref() << dendl; return 0; } void Objecter::put_session(Objecter::OSDSession *s) { if (s && !s->is_homeless()) { - ldout(cct, 20) << __func__ << " s=" << s << " osd=" << s->osd << " " << s->get_nref() << dendl; + ldout(cct, 20) << __func__ << " s=" << s << " osd=" << s->osd << " " + << s->get_nref() << dendl; s->put(); } } @@ -1610,7 +1681,8 @@ void Objecter::get_session(Objecter::OSDSession *s) assert(s != NULL); if (!s->is_homeless()) { - ldout(cct, 20) << __func__ << " s=" << s << " osd=" << s->osd << " " << s->get_nref() << dendl; + ldout(cct, 20) << __func__ << " s=" << s << " osd=" << s->osd << " " + << s->get_nref() << dendl; s->get(); } } @@ -1620,7 +1692,8 @@ void Objecter::_reopen_session(OSDSession *s) assert(s->lock.is_locked()); entity_inst_t inst = osdmap->get_inst(s->osd); - ldout(cct, 10) << "reopen_session osd." << s->osd << " session, addr now " << inst << dendl; + ldout(cct, 10) << "reopen_session osd." << s->osd << " session, addr now " + << inst << dendl; if (s->con) { s->con->mark_down(); logger->inc(l_osdc_osd_session_close); @@ -1674,15 +1747,15 @@ void Objecter::close_session(OSDSession *s) { RWLock::WLocker wl(homeless_session->lock); for (std::list::iterator i = homeless_lingers.begin(); - i != homeless_lingers.end(); ++i) { + i != homeless_lingers.end(); ++i) { _session_linger_op_assign(homeless_session, *i); } for (std::list::iterator i = homeless_ops.begin(); - i != homeless_ops.end(); ++i) { + i != homeless_ops.end(); ++i) { _session_op_assign(homeless_session, *i); } for (std::list::iterator i = homeless_commands.begin(); - i != homeless_commands.end(); ++i) { + i != homeless_commands.end(); ++i) { _session_command_op_assign(homeless_session, *i); } } @@ -1741,7 +1814,8 @@ void Objecter::get_latest_version(epoch_t oldest, epoch_t newest, Context *fin) _get_latest_version(oldest, newest, fin); } -void Objecter::_get_latest_version(epoch_t oldest, epoch_t newest, Context *fin) +void Objecter::_get_latest_version(epoch_t oldest, epoch_t newest, + Context *fin) { assert(rwlock.is_wlocked()); if (osdmap->get_epoch() >= newest) { @@ -1768,9 +1842,11 @@ void Objecter::_maybe_request_map() if (_osdmap_full_flag() || osdmap->test_flag(CEPH_OSDMAP_PAUSERD) || osdmap->test_flag(CEPH_OSDMAP_PAUSEWR)) { - ldout(cct, 10) << "_maybe_request_map subscribing (continuous) to next osd map (FULL flag is set)" << dendl; + ldout(cct, 10) << "_maybe_request_map subscribing (continuous) to next " + "osd map (FULL flag is set)" << dendl; } else { - ldout(cct, 10) << "_maybe_request_map subscribing (onetime) to next osd map" << dendl; + ldout(cct, 10) + << "_maybe_request_map subscribing (onetime) to next osd map" << dendl; flag = CEPH_SUBSCRIBE_ONETIME; } epoch_t epoch = osdmap->get_epoch() ? osdmap->get_epoch()+1 : 0; @@ -1789,13 +1865,13 @@ void Objecter::_wait_for_new_map(Context *c, epoch_t epoch, int err) /** * Use this together with wait_for_map: this is a pre-check to avoid - * allocating a Context for wait_for_map if we can see that we definitely - * already have the epoch. + * allocating a Context for wait_for_map if we can see that we + * definitely already have the epoch. * - * This does *not* replace the need to handle the return value of wait_for_map: - * just because we don't have it in this pre-check doesn't mean we won't - * have it when calling back into wait_for_map, since the objecter lock - * is dropped in between. + * This does *not* replace the need to handle the return value of + * wait_for_map: just because we don't have it in this pre-check + * doesn't mean we won't have it when calling back into wait_for_map, + * since the objecter lock is dropped in between. */ bool Objecter::have_map(const epoch_t epoch) { @@ -1831,13 +1907,15 @@ void Objecter::kick_requests(OSDSession *session) _linger_ops_resend(lresend); } -void Objecter::_kick_requests(OSDSession *session, map& lresend) +void Objecter::_kick_requests(OSDSession *session, + map& lresend) { assert(rwlock.is_wlocked()); // resend ops map resend; // resend in tid order - for (map::iterator p = session->ops.begin(); p != session->ops.end();) { + for (map::iterator p = session->ops.begin(); + p != session->ops.end();) { Op *op = p->second; ++p; logger->inc(l_osdc_op_resend); @@ -1856,7 +1934,8 @@ void Objecter::_kick_requests(OSDSession *session, map& lr } // resend lingers - for (map::iterator j = session->linger_ops.begin(); j != session->linger_ops.end(); ++j) { + for (map::iterator j = session->linger_ops.begin(); + j != session->linger_ops.end(); ++j) { LingerOp *op = j->second; op->get(); logger->inc(l_osdc_linger_resend); @@ -1866,7 +1945,8 @@ void Objecter::_kick_requests(OSDSession *session, map& lr // resend commands map cresend; // resend in order - for (map::iterator k = session->command_ops.begin(); k != session->command_ops.end(); ++k) { + for (map::iterator k = session->command_ops.begin(); + k != session->command_ops.end(); ++k) { logger->inc(l_osdc_command_resend); cresend[k->first] = k->second; } @@ -1923,7 +2003,8 @@ void Objecter::tick() unsigned laggy_ops = 0; - for (map::iterator siter = osd_sessions.begin(); siter != osd_sessions.end(); ++siter) { + for (map::iterator siter = osd_sessions.begin(); + siter != osd_sessions.end(); ++siter) { OSDSession *s = siter->second; RWLock::RLocker l(s->lock); bool found = false; @@ -1933,7 +2014,8 @@ void Objecter::tick() Op *op = p->second; assert(op->session); if (op->stamp < cutoff) { - ldout(cct, 2) << " tid " << p->first << " on osd." << op->session->osd << " is laggy" << dendl; + ldout(cct, 2) << " tid " << p->first << " on osd." << op->session->osd + << " is laggy" << dendl; found = true; ++laggy_ops; } @@ -1944,7 +2026,8 @@ void Objecter::tick() LingerOp *op = p->second; RWLock::WLocker wl(op->watch_lock); assert(op->session); - ldout(cct, 10) << " pinging osd that serves lingering tid " << p->first << " (osd." << op->session->osd << ")" << dendl; + ldout(cct, 10) << " pinging osd that serves lingering tid " << p->first + << " (osd." << op->session->osd << ")" << dendl; found = true; if (op->is_watch && op->registered && !op->last_error) _send_linger_ping(op); @@ -1954,7 +2037,8 @@ void Objecter::tick() ++p) { CommandOp *op = p->second; assert(op->session); - ldout(cct, 10) << " pinging osd that serves command tid " << p->first << " (osd." << op->session->osd << ")" << dendl; + ldout(cct, 10) << " pinging osd that serves command tid " << p->first + << " (osd." << op->session->osd << ")" << dendl; found = true; } if (found) @@ -1987,17 +2071,23 @@ void Objecter::resend_mon_ops() ldout(cct, 10) << "resend_mon_ops" << dendl; - for (map::iterator p = poolstat_ops.begin(); p!=poolstat_ops.end(); ++p) { + for (map::iterator p = poolstat_ops.begin(); + p != poolstat_ops.end(); + ++p) { _poolstat_submit(p->second); logger->inc(l_osdc_poolstat_resend); } - for (map::iterator p = statfs_ops.begin(); p!=statfs_ops.end(); ++p) { + for (map::iterator p = statfs_ops.begin(); + p != statfs_ops.end(); + ++p) { _fs_stats_submit(p->second); logger->inc(l_osdc_statfs_resend); } - for (map::iterator p = pool_ops.begin(); p!=pool_ops.end(); ++p) { + for (map::iterator p = pool_ops.begin(); + p != pool_ops.end(); + ++p) { _pool_op_submit(p->second); logger->inc(l_osdc_poolop_resend); } @@ -2012,11 +2102,13 @@ void Objecter::resend_mon_ops() for (map::iterator p = check_latest_map_lingers.begin(); p != check_latest_map_lingers.end(); ++p) { - C_Linger_Map_Latest *c = new C_Linger_Map_Latest(this, p->second->linger_id); + C_Linger_Map_Latest *c + = new C_Linger_Map_Latest(this, p->second->linger_id); monc->get_version("osdmap", &c->latest, NULL, c); } - for (map::iterator p = check_latest_map_commands.begin(); + for (map::iterator p + = check_latest_map_commands.begin(); p != check_latest_map_commands.end(); ++p) { C_Command_Map_Latest *c = new C_Command_Map_Latest(this, p->second->tid); @@ -2033,7 +2125,8 @@ class C_CancelOp : public Context ceph_tid_t tid; Objecter *objecter; public: - C_CancelOp(ceph_tid_t tid, Objecter *objecter) : tid(tid), objecter(objecter) {} + C_CancelOp(ceph_tid_t tid, Objecter *objecter) : tid(tid), + objecter(objecter) {} void finish(int r) { objecter->op_cancel(tid, -ETIMEDOUT); } @@ -2046,7 +2139,8 @@ ceph_tid_t Objecter::op_submit(Op *op, int *ctx_budget) return _op_submit_with_budget(op, lc, ctx_budget); } -ceph_tid_t Objecter::_op_submit_with_budget(Op *op, RWLock::Context& lc, int *ctx_budget) +ceph_tid_t Objecter::_op_submit_with_budget(Op *op, RWLock::Context& lc, + int *ctx_budget) { assert(initialized.read()); @@ -2095,7 +2189,8 @@ void Objecter::_send_op_account(Op *op) logger->inc(l_osdc_op_active); logger->inc(l_osdc_op); - if ((op->target.flags & (CEPH_OSD_FLAG_READ|CEPH_OSD_FLAG_WRITE)) == (CEPH_OSD_FLAG_READ|CEPH_OSD_FLAG_WRITE)) + if ((op->target.flags & (CEPH_OSD_FLAG_READ | CEPH_OSD_FLAG_WRITE)) == + (CEPH_OSD_FLAG_READ|CEPH_OSD_FLAG_WRITE)) logger->inc(l_osdc_op_rmw); else if (op->target.flags & CEPH_OSD_FLAG_WRITE) logger->inc(l_osdc_op_w); @@ -2164,7 +2259,9 @@ ceph_tid_t Objecter::_op_submit(Op *op, RWLock::Context& lc) assert(op->session == NULL); OSDSession *s = NULL; - bool const check_for_latest_map = _calc_target(&op->target, &op->last_force_resend) == RECALC_OP_TARGET_POOL_DNE; + bool const check_for_latest_map = _calc_target(&op->target, + &op->last_force_resend) + == RECALC_OP_TARGET_POOL_DNE; // Try to get a session, including a retry if we need to take write lock int r = _get_session(op->target.osd, &s, lc); @@ -2184,11 +2281,11 @@ ceph_tid_t Objecter::_op_submit(Op *op, RWLock::Context& lc) _send_op_account(op); // send? - ldout(cct, 10) << "_op_submit oid '" << op->target.base_oid - << "' '" << op->target.base_oloc << "' '" << op->target.target_oloc - << "' " << op->ops << " tid " << op->tid - << " osd." << (!s->is_homeless() ? s->osd : -1) - << dendl; + ldout(cct, 10) << "_op_submit oid " << op->target.base_oid + << " '" << op->target.base_oloc << "' '" + << op->target.target_oloc << "' " << op->ops << " tid " + << op->tid << " osd." << (!s->is_homeless() ? s->osd : -1) + << dendl; assert(op->target.flags & (CEPH_OSD_FLAG_READ|CEPH_OSD_FLAG_WRITE)); @@ -2209,10 +2306,10 @@ ceph_tid_t Objecter::_op_submit(Op *op, RWLock::Context& lc) } else if ((op->target.flags & CEPH_OSD_FLAG_WRITE) && !(op->target.flags & (CEPH_OSD_FLAG_FULL_TRY | CEPH_OSD_FLAG_FULL_FORCE)) && - (_osdmap_full_flag() || + (_osdmap_full_flag() || _osdmap_pool_full(op->target.base_oloc.pool))) { - ldout(cct, 0) << " FULL, paused modify " << op << " tid " << last_tid.read() - << dendl; + ldout(cct, 0) << " FULL, paused modify " << op << " tid " + << last_tid.read() << dendl; op->target.paused = true; _maybe_request_map(); } else if (!s->is_homeless()) { @@ -2235,8 +2332,8 @@ ceph_tid_t Objecter::_op_submit(Op *op, RWLock::Context& lc) _send_op(op, m); } - // Last chance to touch Op here, after giving up session lock it can be - // freed at any time by response handler. + // Last chance to touch Op here, after giving up session lock it can + // be freed at any time by response handler. ceph_tid_t tid = op->tid; if (check_for_latest_map) { _send_op_map_check(op); @@ -2246,7 +2343,8 @@ ceph_tid_t Objecter::_op_submit(Op *op, RWLock::Context& lc) s->lock.unlock(); put_session(s); - ldout(cct, 5) << num_unacked.read() << " unacked, " << num_uncommitted.read() << " uncommitted" << dendl; + ldout(cct, 5) << num_unacked.read() << " unacked, " << num_uncommitted.read() + << " uncommitted" << dendl; return tid; } @@ -2259,7 +2357,8 @@ int Objecter::op_cancel(OSDSession *s, ceph_tid_t tid, int r) map::iterator p = s->ops.find(tid); if (p == s->ops.end()) { - ldout(cct, 10) << __func__ << " tid " << tid << " dne in session " << s->osd << dendl; + ldout(cct, 10) << __func__ << " tid " << tid << " dne in session " + << s->osd << dendl; return -ENOENT; } @@ -2269,7 +2368,8 @@ int Objecter::op_cancel(OSDSession *s, ceph_tid_t tid, int r) s->con->revoke_rx_buffer(tid); } - ldout(cct, 10) << __func__ << " tid " << tid << " in session " << s->osd << dendl; + ldout(cct, 10) << __func__ << " tid " << tid << " in session " << s->osd + << dendl; Op *op = p->second; if (op->onack) { op->onack->complete(r); @@ -2308,26 +2408,29 @@ int Objecter::_op_cancel(ceph_tid_t tid, int r) { int ret = 0; - ldout(cct, 5) << __func__ << ": cancelling tid " << tid << " r=" << r << dendl; + ldout(cct, 5) << __func__ << ": cancelling tid " << tid << " r=" << r + << dendl; start: - for (map::iterator siter = osd_sessions.begin(); siter != osd_sessions.end(); ++siter) { + for (map::iterator siter = osd_sessions.begin(); + siter != osd_sessions.end(); ++siter) { OSDSession *s = siter->second; s->lock.get_read(); if (s->ops.find(tid) != s->ops.end()) { s->lock.unlock(); ret = op_cancel(s, tid, r); if (ret == -ENOENT) { - /* oh no! raced, maybe tid moved to another session, restarting */ - goto start; + /* oh no! raced, maybe tid moved to another session, restarting */ + goto start; } return ret; } s->lock.unlock(); } - ldout(cct, 5) << __func__ << ": tid " << tid << " not found in live sessions" << dendl; + ldout(cct, 5) << __func__ << ": tid " << tid + << " not found in live sessions" << dendl; // Handle case where the op is in homeless session homeless_session->lock.get_read(); @@ -2344,7 +2447,8 @@ start: homeless_session->lock.unlock(); } - ldout(cct, 5) << __func__ << ": tid " << tid << " not found in homeless session" << dendl; + ldout(cct, 5) << __func__ << ": tid " << tid + << " not found in homeless session" << dendl; return ret; } @@ -2357,20 +2461,25 @@ epoch_t Objecter::op_cancel_writes(int r, int64_t pool) std::vector to_cancel; bool found = false; - for (map::iterator siter = osd_sessions.begin(); siter != osd_sessions.end(); ++siter) { + for (map::iterator siter = osd_sessions.begin(); + siter != osd_sessions.end(); ++siter) { OSDSession *s = siter->second; s->lock.get_read(); - for (map::iterator op_i = s->ops.begin(); op_i != s->ops.end(); ++op_i) { + for (map::iterator op_i = s->ops.begin(); + op_i != s->ops.end(); ++op_i) { if (op_i->second->target.flags & CEPH_OSD_FLAG_WRITE - && (pool == -1 || op_i->second->target.target_oloc.pool == pool)) { - to_cancel.push_back(op_i->first); + && (pool == -1 || op_i->second->target.target_oloc.pool == pool)) { + to_cancel.push_back(op_i->first); } } s->lock.unlock(); - for (std::vector::iterator titer = to_cancel.begin(); titer != to_cancel.end(); ++titer) { + for (std::vector::iterator titer = to_cancel.begin(); + titer != to_cancel.end(); + ++titer) { int cancel_result = op_cancel(s, *titer, r); - // We hold rwlock across search and cancellation, so cancels should always succeed + // We hold rwlock across search and cancellation, so cancels + // should always succeed assert(cancel_result == 0); } if (!found && to_cancel.size()) @@ -2412,12 +2521,11 @@ bool Objecter::target_should_be_paused(op_target_t *t) const pg_pool_t *pi = osdmap->get_pg_pool(t->base_oloc.pool); bool pauserd = osdmap->test_flag(CEPH_OSDMAP_PAUSERD); bool pausewr = osdmap->test_flag(CEPH_OSDMAP_PAUSEWR) || - _osdmap_full_flag() || - _osdmap_pool_full(*pi); + _osdmap_full_flag() || _osdmap_pool_full(*pi); return (t->flags & CEPH_OSD_FLAG_READ && pauserd) || - (t->flags & CEPH_OSD_FLAG_WRITE && pausewr) || - (osdmap->get_epoch() < epoch_barrier); + (t->flags & CEPH_OSD_FLAG_WRITE && pausewr) || + (osdmap->get_epoch() < epoch_barrier); } /** @@ -2454,7 +2562,8 @@ bool Objecter::_osdmap_pool_full(const int64_t pool_id) const bool Objecter::_osdmap_has_pool_full() const { - for (map::const_iterator it = osdmap->get_pools().begin(); + for (map::const_iterator it + = osdmap->get_pools().begin(); it != osdmap->get_pools().end(); ++it) { if (_osdmap_pool_full(it->second)) return true; @@ -2478,12 +2587,14 @@ bool Objecter::_osdmap_full_flag() const void Objecter::update_pool_full_map(map& pool_full_map) { - for (map::const_iterator it = osdmap->get_pools().begin(); + for (map::const_iterator it + = osdmap->get_pools().begin(); it != osdmap->get_pools().end(); ++it) { if (pool_full_map.find(it->first) == pool_full_map.end()) { pool_full_map[it->first] = _osdmap_pool_full(it->second); } else { - pool_full_map[it->first] = _osdmap_pool_full(it->second) || pool_full_map[it->first]; + pool_full_map[it->first] = _osdmap_pool_full(it->second) || + pool_full_map[it->first]; } } } @@ -2506,7 +2617,8 @@ int64_t Objecter::get_object_pg_hash_position(int64_t pool, const string& key, return p->raw_hash_to_pg(p->hash_key(key, ns)); } -int Objecter::_calc_target(op_target_t *t, epoch_t *last_force_resend, bool any_change) +int Objecter::_calc_target(op_target_t *t, epoch_t *last_force_resend, + bool any_change) { assert(rwlock.is_locked()); @@ -2536,7 +2648,7 @@ int Objecter::_calc_target(op_target_t *t, epoch_t *last_force_resend, bool any t->target_oloc = t->base_oloc; need_check_tiering = true; } - + if (need_check_tiering && (t->flags & CEPH_OSD_FLAG_IGNORE_OVERLAY) == 0) { if (is_read && pi->has_read_tier()) @@ -2574,23 +2686,23 @@ int Objecter::_calc_target(op_target_t *t, epoch_t *last_force_resend, bool any bool sort_bitwise = osdmap->test_flag(CEPH_OSDMAP_SORTBITWISE); unsigned prev_seed = ceph_stable_mod(pgid.ps(), t->pg_num, t->pg_num_mask); if (any_change && pg_interval_t::is_new_interval( - t->acting_primary, - acting_primary, - t->acting, - acting, - t->up_primary, - up_primary, - t->up, - up, - t->size, - size, - t->min_size, - min_size, - t->pg_num, - pg_num, - t->sort_bitwise, - sort_bitwise, - pg_t(prev_seed, pgid.pool(), pgid.preferred()))) { + t->acting_primary, + acting_primary, + t->acting, + acting, + t->up_primary, + up_primary, + t->up, + up, + t->size, + size, + t->min_size, + min_size, + t->pg_num, + pg_num, + t->sort_bitwise, + sort_bitwise, + pg_t(prev_seed, pgid.pool(), pgid.preferred()))) { force_resend = true; } @@ -2630,7 +2742,8 @@ int Objecter::_calc_target(op_target_t *t, epoch_t *last_force_resend, bool any if (p) t->used_replica = true; osd = acting[p]; - ldout(cct, 10) << " chose random osd." << osd << " of " << acting << dendl; + ldout(cct, 10) << " chose random osd." << osd << " of " << acting + << dendl; } else if (read && (t->flags & CEPH_OSD_FLAG_LOCALIZE_READS) && acting.size() > 1) { // look for a local replica. prefer the primary if the @@ -2721,7 +2834,8 @@ void Objecter::_session_linger_op_assign(OSDSession *to, LingerOp *op) op->session = to; to->linger_ops[op->linger_id] = op; - ldout(cct, 15) << __func__ << " " << to->osd << " " << op->linger_id << dendl; + ldout(cct, 15) << __func__ << " " << to->osd << " " << op->linger_id + << dendl; } void Objecter::_session_linger_op_remove(OSDSession *from, LingerOp *op) @@ -2737,7 +2851,8 @@ void Objecter::_session_linger_op_remove(OSDSession *from, LingerOp *op) put_session(from); op->session = NULL; - ldout(cct, 15) << __func__ << " " << from->osd << " " << op->linger_id << dendl; + ldout(cct, 15) << __func__ << " " << from->osd << " " << op->linger_id + << dendl; } void Objecter::_session_command_op_remove(OSDSession *from, CommandOp *op) @@ -2773,24 +2888,27 @@ void Objecter::_session_command_op_assign(OSDSession *to, CommandOp *op) ldout(cct, 15) << __func__ << " " << to->osd << " " << op->tid << dendl; } -int Objecter::_recalc_linger_op_target(LingerOp *linger_op, RWLock::Context& lc) +int Objecter::_recalc_linger_op_target(LingerOp *linger_op, + RWLock::Context& lc) { assert(rwlock.is_wlocked()); - int r = _calc_target(&linger_op->target, &linger_op->last_force_resend, true); + int r = _calc_target(&linger_op->target, &linger_op->last_force_resend, + true); if (r == RECALC_OP_TARGET_NEED_RESEND) { ldout(cct, 10) << "recalc_linger_op_target tid " << linger_op->linger_id << " pgid " << linger_op->target.pgid << " acting " << linger_op->target.acting << dendl; - + OSDSession *s = NULL; r = _get_session(linger_op->target.osd, &s, lc); assert(r == 0); if (linger_op->session != s) { - // NB locking two sessions (s and linger_op->session) at the same time here - // is only safe because we are the only one that takes two, and we are - // holding rwlock for write. Disable lockdep because it doesn't know that. + // NB locking two sessions (s and linger_op->session) at the + // same time here is only safe because we are the only one that + // takes two, and we are holding rwlock for write. Disable + // lockdep because it doesn't know that. s->lock.get_write(false); _session_linger_op_remove(linger_op->session, linger_op); _session_linger_op_assign(s, linger_op); @@ -2850,7 +2968,7 @@ void Objecter::finish_op(OSDSession *session, ceph_tid_t tid) { ldout(cct, 15) << "finish_op " << tid << dendl; RWLock::RLocker rl(rwlock); - + RWLock::WLocker wl(session->lock); map::iterator iter = session->ops.find(tid); @@ -2879,7 +2997,7 @@ MOSDOp *Objecter::_prepare_osd_op(Op *op) op->target.paused = false; op->stamp = ceph_clock_now(cct); - MOSDOp *m = new MOSDOp(client_inc.read(), op->tid, + MOSDOp *m = new MOSDOp(client_inc.read(), op->tid, op->target.target_oid, op->target.target_oloc, op->target.pgid, osdmap->get_epoch(), @@ -2921,20 +3039,23 @@ void Objecter::_send_op(Op *op, MOSDOp *m) m = _prepare_osd_op(op); } - ldout(cct, 15) << "_send_op " << op->tid << " to osd." << op->session->osd << dendl; + ldout(cct, 15) << "_send_op " << op->tid << " to osd." << op->session->osd + << dendl; ConnectionRef con = op->session->con; assert(con); // preallocated rx buffer? if (op->con) { - ldout(cct, 20) << " revoking rx buffer for " << op->tid << " on " << op->con << dendl; + ldout(cct, 20) << " revoking rx buffer for " << op->tid << " on " + << op->con << dendl; op->con->revoke_rx_buffer(op->tid); } if (op->outbl && op->ontimeout == NULL && // only post rx_buffer if no timeout; see #9582 op->outbl->length()) { - ldout(cct, 20) << " posting rx buffer for " << op->tid << " on " << con << dendl; + ldout(cct, 20) << " posting rx buffer for " << op->tid << " on " << con + << dendl; op->con = con; op->con->post_rx_buffer(op->tid, *op->outbl); } @@ -2956,10 +3077,10 @@ int Objecter::calc_op_budget(Op *op) op_budget += i->indata.length(); } else if (ceph_osd_op_mode_read(i->op.op)) { if (ceph_osd_op_type_data(i->op.op)) { - if ((int64_t)i->op.extent.length > 0) + if ((int64_t)i->op.extent.length > 0) op_budget += (int64_t)i->op.extent.length; } else if (ceph_osd_op_type_attr(i->op.op)) { - op_budget += i->op.xattr.name_len + i->op.xattr.value_len; + op_budget += i->op.xattr.name_len + i->op.xattr.value_len; } } } @@ -3032,8 +3153,9 @@ void Objecter::handle_osd_op_reply(MOSDOpReply *m) map::iterator iter = s->ops.find(tid); if (iter == s->ops.end()) { ldout(cct, 7) << "handle_osd_op_reply " << tid - << (m->is_ondisk() ? " ondisk":(m->is_onnvram() ? " onnvram":" ack")) - << " ... stray" << dendl; + << (m->is_ondisk() ? " ondisk" : (m->is_onnvram() ? + " onnvram" : " ack")) + << " ... stray" << dendl; s->lock.unlock(); put_session(s); m->put(); @@ -3041,8 +3163,10 @@ void Objecter::handle_osd_op_reply(MOSDOpReply *m) } ldout(cct, 7) << "handle_osd_op_reply " << tid - << (m->is_ondisk() ? " ondisk":(m->is_onnvram() ? " onnvram":" ack")) - << " v " << m->get_replay_version() << " uv " << m->get_user_version() + << (m->is_ondisk() ? " ondisk" : + (m->is_onnvram() ? " onnvram" : " ack")) + << " v " << m->get_replay_version() << " uv " + << m->get_user_version() << " in " << m->get_pg() << " attempt " << m->get_retry_attempt() << dendl; @@ -3050,7 +3174,8 @@ void Objecter::handle_osd_op_reply(MOSDOpReply *m) if (m->get_retry_attempt() >= 0) { if (m->get_retry_attempt() != (op->attempts - 1)) { - ldout(cct, 7) << " ignoring reply from attempt " << m->get_retry_attempt() + ldout(cct, 7) << " ignoring reply from attempt " + << m->get_retry_attempt() << " from " << m->get_source_inst() << "; last attempt " << (op->attempts - 1) << " sent to " << op->session->con->get_peer_addr() << dendl; @@ -3126,7 +3251,7 @@ void Objecter::handle_osd_op_reply(MOSDOpReply *m) // per-op result demuxing vector out_ops; m->claim_ops(out_ops); - + if (out_ops.size() != op->ops.size()) ldout(cct, 0) << "WARNING: tid " << op->tid << " reply ops " << out_ops << " != request ops " << op->ops @@ -3183,7 +3308,8 @@ void Objecter::handle_osd_op_reply(MOSDOpReply *m) } /* get it before we call _finish_op() */ - Mutex *completion_lock = (op->target.base_oid.name.size() ? s->get_lock(op->target.base_oid) : NULL); + Mutex *completion_lock = (op->target.base_oid.name.size() ? + s->get_lock(op->target.base_oid) : NULL); // done with this tid? if (!op->onack && !op->oncommit && !op->oncommit_sync) { @@ -3191,7 +3317,8 @@ void Objecter::handle_osd_op_reply(MOSDOpReply *m) _finish_op(op, 0); } - ldout(cct, 5) << num_unacked.read() << " unacked, " << num_uncommitted.read() << " uncommitted" << dendl; + ldout(cct, 5) << num_unacked.read() << " unacked, " << num_uncommitted.read() + << " uncommitted" << dendl; // serialize completions if (completion_lock) { @@ -3250,7 +3377,8 @@ void Objecter::list_nobjects(NListContext *list_context, Context *onfinish) list_context->at_end_of_pool = true; ldout(cct, 20) << " no more pgs; reached end of pool" << dendl; } else { - ldout(cct, 20) << " move to next pg " << list_context->current_pg << dendl; + ldout(cct, 20) << " move to next pg " << list_context->current_pg + << dendl; } } if (list_context->at_end_of_pool) { @@ -3272,8 +3400,10 @@ void Objecter::list_nobjects(NListContext *list_context, Context *onfinish) list_context->sort_bitwise = osdmap->test_flag(CEPH_OSDMAP_SORTBITWISE); ldout(cct, 20) << pg_num << " placement groups" << dendl; } - if (list_context->sort_bitwise != osdmap->test_flag(CEPH_OSDMAP_SORTBITWISE)) { - ldout(cct, 10) << " hobject sort order changed, restarting this pg" << dendl; + if (list_context->sort_bitwise != + osdmap->test_flag(CEPH_OSDMAP_SORTBITWISE)) { + ldout(cct, 10) << " hobject sort order changed, restarting this pg" + << dendl; list_context->cookie = collection_list_handle_t(); list_context->sort_bitwise = osdmap->test_flag(CEPH_OSDMAP_SORTBITWISE); } @@ -3288,14 +3418,15 @@ void Objecter::list_nobjects(NListContext *list_context, Context *onfinish) assert(list_context->current_pg <= pg_num); ObjectOperation op; - op.pg_nls(list_context->max_entries, list_context->filter, list_context->cookie, - list_context->current_pg_epoch); + op.pg_nls(list_context->max_entries, list_context->filter, + list_context->cookie, list_context->current_pg_epoch); list_context->bl.clear(); C_NList *onack = new C_NList(list_context, onfinish, this); object_locator_t oloc(list_context->pool_id, list_context->nspace); pg_read(list_context->current_pg, oloc, op, - &list_context->bl, 0, onack, &onack->epoch, &list_context->ctx_budget); + &list_context->bl, 0, onack, &onack->epoch, + &list_context->ctx_budget); } void Objecter::_nlist_reply(NListContext *list_context, int r, @@ -3355,7 +3486,8 @@ void Objecter::_nlist_reply(NListContext *list_context, int r, void Objecter::put_nlist_context_budget(NListContext *list_context) { if (list_context->ctx_budget >= 0) { - ldout(cct, 10) << " release listing context's budget " << list_context->ctx_budget << dendl; + ldout(cct, 10) << " release listing context's budget " << + list_context->ctx_budget << dendl; put_op_budget_bytes(list_context->ctx_budget); list_context->ctx_budget = -1; } @@ -3396,7 +3528,8 @@ void Objecter::list_objects(ListContext *list_context, Context *onfinish) list_context->at_end_of_pool = true; ldout(cct, 20) << " no more pgs; reached end of pool" << dendl; } else { - ldout(cct, 20) << " move to next pg " << list_context->current_pg << dendl; + ldout(cct, 20) << " move to next pg " << list_context->current_pg + << dendl; } } if (list_context->at_end_of_pool) { @@ -3418,8 +3551,10 @@ void Objecter::list_objects(ListContext *list_context, Context *onfinish) list_context->sort_bitwise = osdmap->test_flag(CEPH_OSDMAP_SORTBITWISE); ldout(cct, 20) << pg_num << " placement groups" << dendl; } - if (list_context->sort_bitwise != osdmap->test_flag(CEPH_OSDMAP_SORTBITWISE)) { - ldout(cct, 10) << " hobject sort order changed, restarting this pg" << dendl; + if (list_context->sort_bitwise != + osdmap->test_flag(CEPH_OSDMAP_SORTBITWISE)) { + ldout(cct, 10) << " hobject sort order changed, restarting this pg" + << dendl; list_context->cookie = collection_list_handle_t(); list_context->sort_bitwise = osdmap->test_flag(CEPH_OSDMAP_SORTBITWISE); } @@ -3434,14 +3569,14 @@ void Objecter::list_objects(ListContext *list_context, Context *onfinish) assert(list_context->current_pg <= pg_num); ObjectOperation op; - op.pg_ls(list_context->max_entries, list_context->filter, list_context->cookie, - list_context->current_pg_epoch); + op.pg_ls(list_context->max_entries, list_context->filter, + list_context->cookie, list_context->current_pg_epoch); list_context->bl.clear(); C_List *onack = new C_List(list_context, onfinish, this); object_locator_t oloc(list_context->pool_id, list_context->nspace); - pg_read(list_context->current_pg, oloc, op, - &list_context->bl, 0, onack, &onack->epoch, &list_context->ctx_budget); + pg_read(list_context->current_pg, oloc, op, &list_context->bl, 0, onack, + &onack->epoch, &list_context->ctx_budget); } void Objecter::_list_reply(ListContext *list_context, int r, @@ -3501,19 +3636,22 @@ void Objecter::_list_reply(ListContext *list_context, int r, void Objecter::put_list_context_budget(ListContext *list_context) { if (list_context->ctx_budget >= 0) { - ldout(cct, 10) << " release listing context's budget " << list_context->ctx_budget << dendl; + ldout(cct, 10) << " release listing context's budget " + << list_context->ctx_budget << dendl; put_op_budget_bytes(list_context->ctx_budget); list_context->ctx_budget = -1; } } -//snapshots +// snapshots -int Objecter::create_pool_snap(int64_t pool, string& snap_name, Context *onfinish) +int Objecter::create_pool_snap(int64_t pool, string& snap_name, + Context *onfinish) { RWLock::WLocker wl(rwlock); - ldout(cct, 10) << "create_pool_snap; pool: " << pool << "; snap: " << snap_name << dendl; + ldout(cct, 10) << "create_pool_snap; pool: " << pool << "; snap: " + << snap_name << dendl; const pg_pool_t *p = osdmap->get_pg_pool(pool); if (!p) @@ -3569,10 +3707,12 @@ int Objecter::allocate_selfmanaged_snap(int64_t pool, snapid_t *psnapid, return 0; } -int Objecter::delete_pool_snap(int64_t pool, string& snap_name, Context *onfinish) +int Objecter::delete_pool_snap(int64_t pool, string& snap_name, + Context *onfinish) { RWLock::WLocker wl(rwlock); - ldout(cct, 10) << "delete_pool_snap; pool: " << pool << "; snap: " << snap_name << dendl; + ldout(cct, 10) << "delete_pool_snap; pool: " << pool << "; snap: " + << snap_name << dendl; const pg_pool_t *p = osdmap->get_pg_pool(pool); if (!p) @@ -3589,9 +3729,9 @@ int Objecter::delete_pool_snap(int64_t pool, string& snap_name, Context *onfinis op->onfinish = onfinish; op->pool_op = POOL_OP_DELETE_SNAP; pool_ops[op->tid] = op; - + pool_op_submit(op); - + return 0; } @@ -3599,8 +3739,8 @@ int Objecter::delete_selfmanaged_snap(int64_t pool, snapid_t snap, Context *onfinish) { RWLock::WLocker wl(rwlock); - ldout(cct, 10) << "delete_selfmanaged_snap; pool: " << pool << "; snap: " - << snap << dendl; + ldout(cct, 10) << "delete_selfmanaged_snap; pool: " << pool << "; snap: " + << snap << dendl; PoolOp *op = new PoolOp; if (!op) return -ENOMEM; op->tid = last_tid.inc(); @@ -3709,8 +3849,8 @@ class C_CancelPoolOp : public Context ceph_tid_t tid; Objecter *objecter; public: - C_CancelPoolOp(ceph_tid_t tid, Objecter *objecter) : tid(tid), - objecter(objecter) {} + C_CancelPoolOp(ceph_tid_t tid, Objecter *objecter) + : tid(tid), objecter(objecter) {} void finish(int r) { objecter->pool_op_cancel(tid, -ETIMEDOUT); } @@ -3764,7 +3904,8 @@ void Objecter::handle_pool_op_reply(MPoolOpReply *m) map::iterator iter = pool_ops.find(tid); if (iter != pool_ops.end()) { PoolOp *op = iter->second; - ldout(cct, 10) << "have request " << tid << " at " << op << " Op: " << ceph_pool_op_name(op->pool_op) << dendl; + ldout(cct, 10) << "have request " << tid << " at " << op << " Op: " + << ceph_pool_op_name(op->pool_op) << dendl; if (op->blp) op->blp->claim(m->response_data); if (m->version > last_seen_osdmap_version) @@ -3776,16 +3917,17 @@ void Objecter::handle_pool_op_reply(MPoolOpReply *m) // (for promotion) above. iter = pool_ops.find(tid); if (iter == pool_ops.end()) - goto done; // op is gone. + goto done; // op is gone. if (osdmap->get_epoch() < m->epoch) { - ldout(cct, 20) << "waiting for client to reach epoch " << m->epoch << " before calling back" << dendl; - _wait_for_new_map(op->onfinish, m->epoch, m->replyCode); + ldout(cct, 20) << "waiting for client to reach epoch " << m->epoch + << " before calling back" << dendl; + _wait_for_new_map(op->onfinish, m->epoch, m->replyCode); } else { // map epoch changed, probably because a MOSDMap message // sneaked in. Do caller-specified callback now or else // we lose it forever. assert(op->onfinish); - op->onfinish->complete(m->replyCode); + op->onfinish->complete(m->replyCode); } } else { @@ -3855,15 +3997,16 @@ class C_CancelPoolStatOp : public Context ceph_tid_t tid; Objecter *objecter; public: - C_CancelPoolStatOp(ceph_tid_t tid, Objecter *objecter) : tid(tid), - objecter(objecter) {} + C_CancelPoolStatOp(ceph_tid_t tid, Objecter *objecter) + : tid(tid), objecter(objecter) {} void finish(int r) { // note that objecter lock == timer lock, and is already held objecter->pool_stat_op_cancel(tid, -ETIMEDOUT); } }; -void Objecter::get_pool_stats(list& pools, map *result, +void Objecter::get_pool_stats(list& pools, + map *result, Context *onfinish) { ldout(cct, 10) << "get_pool_stats " << pools << dendl; @@ -3892,7 +4035,9 @@ void Objecter::get_pool_stats(list& pools, map *resu void Objecter::_poolstat_submit(PoolStatOp *op) { ldout(cct, 10) << "_poolstat_submit " << op->tid << dendl; - monc->send_mon_message(new MGetPoolStats(monc->get_fsid(), op->tid, op->pools, last_seen_pgmap_version)); + monc->send_mon_message(new MGetPoolStats(monc->get_fsid(), op->tid, + op->pools, + last_seen_pgmap_version)); op->last_submit = ceph_clock_now(cct); logger->inc(l_osdc_poolstat_send); @@ -3921,7 +4066,7 @@ void Objecter::handle_get_pool_stats_reply(MGetPoolStatsReply *m) _finish_pool_stat_op(op, 0); } else { ldout(cct, 10) << "unknown request " << tid << dendl; - } + } ldout(cct, 10) << "done" << dendl; m->put(); } @@ -3967,8 +4112,8 @@ class C_CancelStatfsOp : public Context ceph_tid_t tid; Objecter *objecter; public: - C_CancelStatfsOp(ceph_tid_t tid, Objecter *objecter) : tid(tid), - objecter(objecter) {} + C_CancelStatfsOp(ceph_tid_t tid, Objecter *objecter) + : tid(tid), objecter(objecter) {} void finish(int r) { objecter->statfs_op_cancel(tid, -ETIMEDOUT); } @@ -4001,7 +4146,8 @@ void Objecter::_fs_stats_submit(StatfsOp *op) assert(rwlock.is_wlocked()); ldout(cct, 10) << "fs_stats_submit" << op->tid << dendl; - monc->send_mon_message(new MStatfs(monc->get_fsid(), op->tid, last_seen_pgmap_version)); + monc->send_mon_message(new MStatfs(monc->get_fsid(), op->tid, + last_seen_pgmap_version)); op->last_submit = ceph_clock_now(cct); logger->inc(l_osdc_statfs_send); @@ -4071,7 +4217,8 @@ void Objecter::_finish_statfs_op(StatfsOp *op, int r) // scatter/gather -void Objecter::_sg_read_finish(vector& extents, vector& resultbl, +void Objecter::_sg_read_finish(vector& extents, + vector& resultbl, bufferlist *bl, Context *onfinish) { // all done @@ -4129,19 +4276,20 @@ bool Objecter::ms_handle_reset(Connection *con) map::iterator p = osd_sessions.find(osd); if (p != osd_sessions.end()) { OSDSession *session = p->second; - map lresend; - session->lock.get_write(); + map lresend; + session->lock.get_write(); _reopen_session(session); _kick_requests(session, lresend); - session->lock.unlock(); - _linger_ops_resend(lresend); - rwlock.unlock(); + session->lock.unlock(); + _linger_ops_resend(lresend); + rwlock.unlock(); maybe_request_map(); } else { - rwlock.unlock(); + rwlock.unlock(); } } else { - ldout(cct, 10) << "ms_handle_reset on unknown osd addr " << con->get_peer_addr() << dendl; + ldout(cct, 10) << "ms_handle_reset on unknown osd addr " + << con->get_peer_addr() << dendl; } return true; } @@ -4184,7 +4332,9 @@ void Objecter::op_target_t::dump(Formatter *f) const void Objecter::_dump_active(OSDSession *s) { - for (map::iterator p = s->ops.begin(); p != s->ops.end(); ++p) { + for (map::iterator p = s->ops.begin(); + p != s->ops.end(); + ++p) { Op *op = p->second; ldout(cct, 20) << op->tid << "\t" << op->target.pgid << "\tosd." << (op->session ? op->session->osd : -1) @@ -4195,8 +4345,10 @@ void Objecter::_dump_active(OSDSession *s) void Objecter::_dump_active() { - ldout(cct, 20) << "dump_active .. " << num_homeless_ops.read() << " homeless" << dendl; - for (map::iterator siter = osd_sessions.begin(); siter != osd_sessions.end(); ++siter) { + ldout(cct, 20) << "dump_active .. " << num_homeless_ops.read() << " homeless" + << dendl; + for (map::iterator siter = osd_sessions.begin(); + siter != osd_sessions.end(); ++siter) { OSDSession *s = siter->second; s->lock.get_read(); _dump_active(s); @@ -4255,7 +4407,8 @@ void Objecter::dump_ops(Formatter *fmt) { fmt->open_array_section("ops"); rwlock.get_read(); - for (map::const_iterator siter = osd_sessions.begin(); siter != osd_sessions.end(); ++siter) { + for (map::const_iterator siter = osd_sessions.begin(); + siter != osd_sessions.end(); ++siter) { OSDSession *s = siter->second; s->lock.get_read(); _dump_ops(s, fmt); @@ -4285,7 +4438,8 @@ void Objecter::dump_linger_ops(Formatter *fmt) { fmt->open_array_section("linger_ops"); rwlock.get_read(); - for (map::const_iterator siter = osd_sessions.begin(); siter != osd_sessions.end(); ++siter) { + for (map::const_iterator siter = osd_sessions.begin(); + siter != osd_sessions.end(); ++siter) { OSDSession *s = siter->second; s->lock.get_read(); _dump_linger_ops(s, fmt); @@ -4306,7 +4460,8 @@ void Objecter::_dump_command_ops(const OSDSession *s, Formatter *fmt) fmt->dump_unsigned("command_id", op->tid); fmt->dump_int("osd", op->session ? op->session->osd : -1); fmt->open_array_section("command"); - for (vector::const_iterator q = op->cmd.begin(); q != op->cmd.end(); ++q) + for (vector::const_iterator q = op->cmd.begin(); + q != op->cmd.end(); ++q) fmt->dump_string("word", *q); fmt->close_section(); if (op->target_osd >= 0) @@ -4321,7 +4476,8 @@ void Objecter::dump_command_ops(Formatter *fmt) { fmt->open_array_section("command_ops"); rwlock.get_read(); - for (map::const_iterator siter = osd_sessions.begin(); siter != osd_sessions.end(); ++siter) { + for (map::const_iterator siter = osd_sessions.begin(); + siter != osd_sessions.end(); ++siter) { OSDSession *s = siter->second; s->lock.get_read(); _dump_command_ops(s, fmt); @@ -4415,7 +4571,7 @@ void Objecter::blacklist_self(bool set) vector cmd; cmd.push_back("{\"prefix\":\"osd blacklist\", "); if (set) - cmd.push_back("\"blacklistop\":\"add\","); + cmd.push_back("\"blacklistop\":\"add\","); else cmd.push_back("\"blacklistop\":\"rm\","); stringstream ss; @@ -4442,7 +4598,8 @@ void Objecter::handle_command_reply(MCommandReply *m) map::iterator siter = osd_sessions.find(osd_num); if (siter == osd_sessions.end()) { - ldout(cct, 10) << "handle_command_reply tid " << m->get_tid() << " osd not found" << dendl; + ldout(cct, 10) << "handle_command_reply tid " << m->get_tid() + << " osd not found" << dendl; m->put(); return; } @@ -4452,7 +4609,8 @@ void Objecter::handle_command_reply(MCommandReply *m) s->lock.get_read(); map::iterator p = s->command_ops.find(m->get_tid()); if (p == s->command_ops.end()) { - ldout(cct, 10) << "handle_command_reply tid " << m->get_tid() << " not found" << dendl; + ldout(cct, 10) << "handle_command_reply tid " << m->get_tid() + << " not found" << dendl; m->put(); s->lock.unlock(); return; @@ -4461,8 +4619,10 @@ void Objecter::handle_command_reply(MCommandReply *m) CommandOp *c = p->second; if (!c->session || m->get_connection() != c->session->con) { - ldout(cct, 10) << "handle_command_reply tid " << m->get_tid() << " got reply from wrong connection " - << m->get_connection() << " " << m->get_source_inst() << dendl; + ldout(cct, 10) << "handle_command_reply tid " << m->get_tid() + << " got reply from wrong connection " + << m->get_connection() << " " << m->get_source_inst() + << dendl; m->put(); s->lock.unlock(); return; @@ -4483,8 +4643,9 @@ class C_CancelCommandOp : public Context ceph_tid_t tid; Objecter *objecter; public: - C_CancelCommandOp(Objecter::OSDSession *s, ceph_tid_t tid, Objecter *objecter) : s(s), tid(tid), - objecter(objecter) {} + C_CancelCommandOp(Objecter::OSDSession *s, ceph_tid_t tid, + Objecter *objecter) + : s(s), tid(tid), objecter(objecter) {} void finish(int r) { objecter->command_op_cancel(s, tid, -ETIMEDOUT); } @@ -4568,7 +4729,8 @@ int Objecter::_calc_command_target(CommandOp *c) put_session(s); - ldout(cct, 20) << "_recalc_command_target " << c->tid << " no change, " << c->session << dendl; + ldout(cct, 20) << "_recalc_command_target " << c->tid << " no change, " + << c->session << dendl; return RECALC_OP_TARGET_NO_ACTION; } @@ -4635,7 +4797,8 @@ void Objecter::_finish_command(CommandOp *c, int r, string rs) { assert(rwlock.is_wlocked()); - ldout(cct, 10) << "_finish_command " << c->tid << " = " << r << " " << rs << dendl; + ldout(cct, 10) << "_finish_command " << c->tid << " = " << r << " " + << rs << dendl; if (c->prs) *c->prs = rs; if (c->onfinish) @@ -4704,8 +4867,9 @@ void Objecter::set_epoch_barrier(epoch_t epoch) { RWLock::WLocker wl(rwlock); - ldout(cct, 7) << __func__ << ": barrier " << epoch << " (was " << epoch_barrier - << ") current epoch " << osdmap->get_epoch() << dendl; + ldout(cct, 7) << __func__ << ": barrier " << epoch << " (was " + << epoch_barrier << ") current epoch " << osdmap->get_epoch() + << dendl; if (epoch >= epoch_barrier) { epoch_barrier = epoch; _maybe_request_map(); diff --git a/src/osdc/Objecter.h b/src/osdc/Objecter.h index f9de0442ab090..ffd3ee22d84d6 100644 --- a/src/osdc/Objecter.h +++ b/src/osdc/Objecter.h @@ -1,4 +1,4 @@ -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- // vim: ts=8 sw=2 smarttab /* * Ceph - scalable distributed file system @@ -15,21 +15,23 @@ #ifndef CEPH_OBJECTER_H #define CEPH_OBJECTER_H +#include +#include +#include +#include + #include "include/types.h" #include "include/buffer.h" - -#include "osd/OSDMap.h" -#include "messages/MOSDOp.h" +#include "include/rados/rados_types.hpp" #include "common/admin_socket.h" -#include "common/Timer.h" #include "common/RWLock.h" -#include "include/rados/rados_types.hpp" +#include "common/Timer.h" + +#include "messages/MOSDOp.h" +#include "osd/OSDMap.h" + -#include -#include -#include -#include using namespace std; class Context; @@ -94,8 +96,8 @@ struct ObjectOperation { } virtual ~C_TwoContexts() { - delete first; - delete second; + delete first; + delete second; } }; @@ -132,7 +134,9 @@ struct ObjectOperation { osd_op.op.extent.length = len; osd_op.indata.claim_append(bl); } - void add_clone_range(int op, uint64_t off, uint64_t len, const object_t& srcoid, uint64_t srcoff, snapid_t srcsnapid) { + void add_clone_range(int op, uint64_t off, uint64_t len, + const object_t& srcoid, uint64_t srcoff, + snapid_t srcsnapid) { OSDOp& osd_op = add_op(op); osd_op.op.clonerange.offset = off; osd_op.op.clonerange.length = len; @@ -147,7 +151,8 @@ struct ObjectOperation { osd_op.indata.append(name); osd_op.indata.append(data); } - void add_xattr_cmp(int op, const char *name, uint8_t cmp_op, uint8_t cmp_mode, const bufferlist& data) { + void add_xattr_cmp(int op, const char *name, uint8_t cmp_op, + uint8_t cmp_mode, const bufferlist& data) { OSDOp& osd_op = add_op(op); osd_op.op.xattr.name_len = (name ? strlen(name) : 0); osd_op.op.xattr.value_len = data.length(); @@ -157,8 +162,9 @@ struct ObjectOperation { osd_op.indata.append(name); osd_op.indata.append(data); } - void add_call(int op, const char *cname, const char *method, bufferlist &indata, - bufferlist *outbl, Context *ctx, int *prval) { + void add_call(int op, const char *cname, const char *method, + bufferlist &indata, + bufferlist *outbl, Context *ctx, int *prval) { OSDOp& osd_op = add_op(op); unsigned p = ops.size() - 1; @@ -173,13 +179,15 @@ struct ObjectOperation { osd_op.indata.append(method, osd_op.op.cls.method_len); osd_op.indata.append(indata); } - void add_pgls(int op, uint64_t count, collection_list_handle_t cookie, epoch_t start_epoch) { + void add_pgls(int op, uint64_t count, collection_list_handle_t cookie, + epoch_t start_epoch) { OSDOp& osd_op = add_op(op); osd_op.op.pgls.count = count; osd_op.op.pgls.start_epoch = start_epoch; ::encode(cookie, osd_op.indata); } - void add_pgls_filter(int op, uint64_t count, bufferlist& filter, collection_list_handle_t cookie, epoch_t start_epoch) { + void add_pgls_filter(int op, uint64_t count, bufferlist& filter, + collection_list_handle_t cookie, epoch_t start_epoch) { OSDOp& osd_op = add_op(op); osd_op.op.pgls.count = count; osd_op.op.pgls.start_epoch = start_epoch; @@ -191,7 +199,7 @@ struct ObjectOperation { ::encode(cookie, osd_op.indata); } void add_alloc_hint(int op, uint64_t expected_object_size, - uint64_t expected_write_size) { + uint64_t expected_write_size) { OSDOp& osd_op = add_op(op); osd_op.op.alloc_hint.expected_object_size = expected_object_size; osd_op.op.alloc_hint.expected_write_size = expected_write_size; @@ -200,19 +208,23 @@ struct ObjectOperation { // ------ // pg - void pg_ls(uint64_t count, bufferlist& filter, collection_list_handle_t cookie, epoch_t start_epoch) { + void pg_ls(uint64_t count, bufferlist& filter, + collection_list_handle_t cookie, epoch_t start_epoch) { if (filter.length() == 0) add_pgls(CEPH_OSD_OP_PGLS, count, cookie, start_epoch); else - add_pgls_filter(CEPH_OSD_OP_PGLS_FILTER, count, filter, cookie, start_epoch); + add_pgls_filter(CEPH_OSD_OP_PGLS_FILTER, count, filter, cookie, + start_epoch); flags |= CEPH_OSD_FLAG_PGOP; } - void pg_nls(uint64_t count, bufferlist& filter, collection_list_handle_t cookie, epoch_t start_epoch) { + void pg_nls(uint64_t count, bufferlist& filter, + collection_list_handle_t cookie, epoch_t start_epoch) { if (filter.length() == 0) add_pgls(CEPH_OSD_OP_PGNLS, count, cookie, start_epoch); else - add_pgls_filter(CEPH_OSD_OP_PGNLS_FILTER, count, filter, cookie, start_epoch); + add_pgls_filter(CEPH_OSD_OP_PGNLS_FILTER, count, filter, cookie, + start_epoch); flags |= CEPH_OSD_FLAG_PGOP; } @@ -314,8 +326,8 @@ struct ObjectOperation { out_rval[p] = prval; } void write(uint64_t off, bufferlist& bl, - uint64_t truncate_size, - uint32_t truncate_seq) { + uint64_t truncate_size, + uint32_t truncate_seq) { add_data(CEPH_OSD_OP_WRITE, off, bl.length(), bl); OSDOp& o = *ops.rbegin(); o.op.extent.truncate_size = truncate_size; @@ -351,8 +363,10 @@ struct ObjectOperation { add_data(CEPH_OSD_OP_SPARSE_READ, off, len, bl); } - void clone_range(const object_t& src_oid, uint64_t src_offset, uint64_t len, uint64_t dst_offset) { - add_clone_range(CEPH_OSD_OP_CLONERANGE, dst_offset, len, src_oid, src_offset, CEPH_NOSNAP); + void clone_range(const object_t& src_oid, uint64_t src_offset, uint64_t len, + uint64_t dst_offset) { + add_clone_range(CEPH_OSD_OP_CLONERANGE, dst_offset, len, src_oid, + src_offset, CEPH_NOSNAP); } // object attrs @@ -400,7 +414,7 @@ struct ObjectOperation { if (prval) *prval = -EIO; } - } + } } }; struct C_ObjectOperation_decodewatchers : public Context { @@ -413,27 +427,27 @@ struct ObjectOperation { if (r >= 0) { bufferlist::iterator p = bl.begin(); try { - obj_list_watch_response_t resp; + obj_list_watch_response_t resp; ::decode(resp, p); if (pwatchers) { - for (list::iterator i = resp.entries.begin() ; - i != resp.entries.end() ; ++i) { - obj_watch_t ow; + for (list::iterator i = resp.entries.begin() ; + i != resp.entries.end() ; ++i) { + obj_watch_t ow; ostringstream sa; sa << i->addr; strncpy(ow.addr, sa.str().c_str(), 256); - ow.watcher_id = i->name.num(); - ow.cookie = i->cookie; - ow.timeout_seconds = i->timeout_seconds; - pwatchers->push_back(ow); - } - } + ow.watcher_id = i->name.num(); + ow.cookie = i->cookie; + ow.timeout_seconds = i->timeout_seconds; + pwatchers->push_back(ow); + } + } } catch (buffer::error& e) { if (prval) *prval = -EIO; } - } + } } }; struct C_ObjectOperation_decodesnaps : public Context { @@ -446,27 +460,27 @@ struct ObjectOperation { if (r >= 0) { bufferlist::iterator p = bl.begin(); try { - obj_list_snap_response_t resp; + obj_list_snap_response_t resp; ::decode(resp, p); if (psnaps) { - psnaps->clones.clear(); - for (vector::iterator ci = resp.clones.begin(); - ci != resp.clones.end(); + psnaps->clones.clear(); + for (vector::iterator ci = resp.clones.begin(); + ci != resp.clones.end(); ++ci) { - librados::clone_info_t clone; + librados::clone_info_t clone; - clone.cloneid = ci->cloneid; - clone.snaps.reserve(ci->snaps.size()); - clone.snaps.insert(clone.snaps.end(), ci->snaps.begin(), ci->snaps.end()); - clone.overlap = ci->overlap; - clone.size = ci->size; + clone.cloneid = ci->cloneid; + clone.snaps.reserve(ci->snaps.size()); + clone.snaps.insert(clone.snaps.end(), ci->snaps.begin(), + ci->snaps.end()); + clone.overlap = ci->overlap; + clone.size = ci->size; - psnaps->clones.push_back(clone); - } + psnaps->clones.push_back(clone); + } psnaps->seq = resp.seq; - } - } - catch (buffer::error& e) { + } + } catch (buffer::error& e) { if (prval) *prval = -EIO; } @@ -477,7 +491,8 @@ struct ObjectOperation { add_op(CEPH_OSD_OP_GETXATTRS); if (pattrs || prval) { unsigned p = ops.size() - 1; - C_ObjectOperation_decodevals *h = new C_ObjectOperation_decodevals(pattrs, prval); + C_ObjectOperation_decodevals *h + = new C_ObjectOperation_decodevals(pattrs, prval); out_handler[p] = h; out_bl[p] = &h->bl; out_rval[p] = prval; @@ -491,7 +506,8 @@ struct ObjectOperation { bl.append(s); add_xattr(CEPH_OSD_OP_SETXATTR, name, bl); } - void cmpxattr(const char *name, uint8_t cmp_op, uint8_t cmp_mode, const bufferlist& bl) { + void cmpxattr(const char *name, uint8_t cmp_op, uint8_t cmp_mode, + const bufferlist& bl) { add_xattr_cmp(CEPH_OSD_OP_CMPXATTR, name, cmp_op, cmp_mode, bl); } void rmxattr(const char *name) { @@ -508,7 +524,7 @@ struct ObjectOperation { ::encode(attrs, bl); add_xattr(CEPH_OSD_OP_RESETXATTRS, prefix, bl); } - + // trivialmap void tmap_update(bufferlist& bl) { add_data(CEPH_OSD_OP_TMAPUP, 0, 0, bl); @@ -595,7 +611,7 @@ struct ObjectOperation { } } - void omap_cmp(const std::map > &assertions, + void omap_cmp(const std::map > &assertions, int *prval) { OSDOp &op = add_op(CEPH_OSD_OP_OMAP_CMP); bufferlist bl; @@ -645,10 +661,10 @@ struct ObjectOperation { out_attrs(a), out_data(d), out_omap_header(oh), out_omap_data(o), out_snaps(osnaps), out_snap_seq(osnap_seq), out_flags(flags), out_data_digest(dd), out_omap_digest(od), - out_reqids(oreqids), - out_truncate_seq(otseq), - out_truncate_size(otsize), - prval(r) {} + out_reqids(oreqids), + out_truncate_seq(otseq), + out_truncate_size(otsize), + prval(r) {} void finish(int r) { // reqids are copied on ENOENT if (r < 0 && r != -ENOENT) @@ -725,11 +741,11 @@ struct ObjectOperation { out_rval[p] = prval; C_ObjectOperation_copyget *h = new C_ObjectOperation_copyget(cursor, out_size, out_mtime, - out_attrs, out_data, out_omap_header, + out_attrs, out_data, out_omap_header, out_omap_data, out_snaps, out_snap_seq, - out_flags, out_data_digest, out_omap_digest, - out_reqids, truncate_seq, truncate_size, - prval); + out_flags, out_data_digest, + out_omap_digest, out_reqids, truncate_seq, + truncate_size, prval); out_bl[p] = &h->bl; out_handler[p] = h; } @@ -788,9 +804,13 @@ struct ObjectOperation { ::decode(ls, p); if (ptls) { ptls->clear(); - for (list< pair >::iterator p = ls.begin(); p != ls.end(); ++p) - // round initial timestamp up to the next full second to keep this a valid interval. - ptls->push_back(make_pair(p->first.usec() ? p->first.sec() + 1 : p->first.sec(), p->second.sec())); + for (list< pair >::iterator p = ls.begin(); + p != ls.end(); ++p) + // round initial timestamp up to the next full second to + // keep this a valid interval. + ptls->push_back(make_pair(p->first.usec() ? + p->first.sec() + 1 : p->first.sec(), + p->second.sec())); } if (putls) putls->swap(ls); @@ -805,8 +825,9 @@ struct ObjectOperation { /** * list available HitSets. * - * We will get back a list of time intervals. Note that the most recent range may have - * an empty end timestamp if it is still accumulating. + * We will get back a list of time intervals. Note that the most + * recent range may have an empty end timestamp if it is still + * accumulating. * * @param pls [out] list of time intervals * @param prval [out] return value @@ -881,8 +902,8 @@ struct ObjectOperation { add_call(CEPH_OSD_OP_CALL, cname, method, indata, NULL, NULL, NULL); } - void call(const char *cname, const char *method, bufferlist &indata, bufferlist *outdata, - Context *ctx, int *prval) { + void call(const char *cname, const char *method, bufferlist &indata, + bufferlist *outdata, Context *ctx, int *prval) { add_call(CEPH_OSD_OP_CALL, cname, method, indata, outdata, ctx, prval); } @@ -942,7 +963,8 @@ struct ObjectOperation { OSDOp& osd_op = add_op(CEPH_OSD_OP_ASSERT_VER); osd_op.op.assert_ver.ver = ver; } - void assert_src_version(const object_t& srcoid, snapid_t srcsnapid, uint64_t ver) { + void assert_src_version(const object_t& srcoid, snapid_t srcsnapid, + uint64_t ver) { OSDOp& osd_op = add_op(CEPH_OSD_OP_ASSERT_SRC_VERSION); osd_op.op.assert_ver.ver = ver; ops.rbegin()->soid = sobject_t(srcoid, srcsnapid); @@ -1024,9 +1046,9 @@ struct ObjectOperation { } void set_alloc_hint(uint64_t expected_object_size, - uint64_t expected_write_size ) { + uint64_t expected_write_size ) { add_alloc_hint(CEPH_OSD_OP_SETALLOCHINT, expected_object_size, - expected_write_size); + expected_write_size); // CEPH_OSD_OP_SETALLOCHINT op is advisory and therefore deemed // not worth a feature bit. Set FAILOK per-op flag to make @@ -1106,7 +1128,7 @@ private: SafeTimer timer; PerfCounters *logger; - + class C_Tick : public Context { Objecter *ob; public: @@ -1142,19 +1164,20 @@ public: object_t target_oid; object_locator_t target_oloc; - bool precalc_pgid; ///< true if we are directed at base_pgid, not base_oid - pg_t base_pgid; ///< explciti pg target, if any + bool precalc_pgid; ///< true if we are directed at base_pgid, not base_oid + pg_t base_pgid; ///< explciti pg target, if any - pg_t pgid; ///< last pg we mapped to - unsigned pg_num; ///< last pg_num we mapped to + pg_t pgid; ///< last pg we mapped to + unsigned pg_num; ///< last pg_num we mapped to unsigned pg_num_mask; ///< last pg_num_mask we mapped to - vector up; ///< set of up osds for last pg we mapped to - vector acting; ///< set of acting osds for last pg we mapped to - int up_primary; ///< primary for last pg we mapped to based on the up set - int acting_primary; ///< primary for last pg we mapped to based on the acting set - int size; ///< the size of the pool when were were last mapped - int min_size; ///< the min size of the pool when were were last mapped - bool sort_bitwise; ///< whether the hobject_t sort order is bitwise + vector up; ///< set of up osds for last pg we mapped to + vector acting; ///< set of acting osds for last pg we mapped to + int up_primary; ///< primary for last pg we mapped to based on the up set + int acting_primary; ///< primary for last pg we mapped to based on the + /// acting set + int size; ///< the size of the pool when were were last mapped + int min_size; ///< the min size of the pool when were were last mapped + bool sort_bitwise; ///< whether the hobject_t sort order is bitwise bool used_replica; bool paused; @@ -1167,7 +1190,7 @@ public: base_oloc(oloc), precalc_pgid(false), pg_num(0), - pg_num_mask(0), + pg_num_mask(0), up_primary(-1), acting_primary(-1), size(-1), @@ -1203,10 +1226,10 @@ public: int priority; Context *onack, *oncommit, *ontimeout; - Context *oncommit_sync; // used internally by watch/notify + Context *oncommit_sync; // used internally by watch/notify ceph_tid_t tid; - eversion_t replay_version; // for op replay + eversion_t replay_version; // for op replay int attempts; version_t *objver; @@ -1221,9 +1244,10 @@ public: /// true if we should resend this message on failure bool should_resend; - /// true if the throttle budget is get/put on a series of OPs, instead of - /// per OP basis, when this flag is set, the budget is acquired before sending - /// the very first OP of the series and released upon receiving the last OP reply. + /// true if the throttle budget is get/put on a series of OPs, + /// instead of per OP basis, when this flag is set, the budget is + /// acquired before sending the very first OP of the series and + /// released upon receiving the last OP reply. bool ctx_budgeted; int *data_offset; @@ -1256,7 +1280,7 @@ public: data_offset(offset), last_force_resend(0) { ops.swap(op); - + /* initialize out_* to match op vector */ out_bl.resize(ops.size()); out_rval.resize(ops.size()); @@ -1288,7 +1312,8 @@ public: Objecter *objecter; ceph_tid_t tid; version_t latest; - C_Op_Map_Latest(Objecter *o, ceph_tid_t t) : objecter(o), tid(t), latest(0) {} + C_Op_Map_Latest(Objecter *o, ceph_tid_t t) : objecter(o), tid(t), + latest(0) {} void finish(int r); }; @@ -1296,7 +1321,8 @@ public: Objecter *objecter; uint64_t tid; version_t latest; - C_Command_Map_Latest(Objecter *o, ceph_tid_t t) : objecter(o), tid(t), latest(0) {} + C_Command_Map_Latest(Objecter *o, ceph_tid_t t) : objecter(o), tid(t), + latest(0) {} void finish(int r); }; @@ -1327,7 +1353,8 @@ public: bufferlist bl; map& attrset; Context *fin; - C_GetAttrs(map& set, Context *c) : attrset(set), fin(c) {} + C_GetAttrs(map& set, Context *c) : attrset(set), + fin(c) {} void finish(int r) { if (r >= 0) { bufferlist::iterator p = bl.begin(); @@ -1338,7 +1365,7 @@ public: }; - // Pools and statistics + // Pools and statistics struct NListContext { int current_pg; collection_list_handle_t cookie; @@ -1366,19 +1393,21 @@ public: // the last op reply. int ctx_budget; - NListContext() : current_pg(0), current_pg_epoch(0), starting_pg_num(0), - at_end_of_pool(false), - at_end_of_pg(false), - sort_bitwise(false), - pool_id(0), - pool_snap_seq(0), - max_entries(0), - nspace(), - bl(), - list(), - filter(), - extra_info(), - ctx_budget(-1) {} + NListContext() : current_pg(0), + current_pg_epoch(0), + starting_pg_num(0), + at_end_of_pool(false), + at_end_of_pg(false), + sort_bitwise(false), + pool_id(0), + pool_snap_seq(0), + max_entries(0), + nspace(), + bl(), + list(), + filter(), + extra_info(), + ctx_budget(-1) {} bool at_end() const { return at_end_of_pool; @@ -1398,9 +1427,9 @@ public: list_context(lc), final_finish(finish), objecter(ob), epoch(0) {} void finish(int r) { if (r >= 0) { - objecter->_nlist_reply(list_context, r, final_finish, epoch); + objecter->_nlist_reply(list_context, r, final_finish, epoch); } else { - final_finish->complete(r); + final_finish->complete(r); } } }; @@ -1439,13 +1468,13 @@ public: sort_bitwise(false), pool_id(0), pool_snap_seq(0), - max_entries(0), - nspace(), - bl(), - list(), - filter(), - extra_info(), - ctx_budget(-1) {} + max_entries(0), + nspace(), + bl(), + list(), + filter(), + extra_info(), + ctx_budget(-1) {} bool at_end() const { return at_end_of_pool; @@ -1465,13 +1494,13 @@ public: list_context(lc), final_finish(finish), objecter(ob), epoch(0) {} void finish(int r) { if (r >= 0) { - objecter->_list_reply(list_context, r, final_finish, epoch); + objecter->_list_reply(list_context, r, final_finish, epoch); } else { - final_finish->complete(r); + final_finish->complete(r); } } }; - + struct PoolStatOp { ceph_tid_t tid; list pools; @@ -1518,7 +1547,7 @@ public: pg_t target_pg; int osd; /* calculated osd for sending request */ epoch_t map_dne_bound; - int map_check_error; // error to return if map check fails + int map_check_error; // error to return if map check fails const char *map_check_error_str; Context *onfinish, *ontimeout; utime_t last_submit; @@ -1608,10 +1637,8 @@ public: LingerOp() : linger_id(0), target(object_t(), object_locator_t(), 0), - snap(CEPH_NOSNAP), - poutbl(NULL), pobjver(NULL), - is_watch(false), - last_error(0), + snap(CEPH_NOSNAP), poutbl(NULL), pobjver(NULL), + is_watch(false), last_error(0), watch_lock("Objecter::LingerOp::watch_lock"), register_gen(0), registered(false), @@ -1702,9 +1729,9 @@ public: Mutex **completion_locks; // pending ops - map ops; - map linger_ops; - map command_ops; + map ops; + map linger_ops; + map command_ops; int osd; int incarnation; @@ -1715,12 +1742,11 @@ public: lock("OSDSession"), osd(o), incarnation(0), - con(NULL) - { + con(NULL) { num_locks = cct->_conf->objecter_completion_locks_per_session; completion_locks = new Mutex *[num_locks]; for (int i = 0; i < num_locks; i++) { - completion_locks[i] = new Mutex("OSDSession::completion_lock"); + completion_locks[i] = new Mutex("OSDSession::completion_lock"); } } @@ -1747,25 +1773,25 @@ public: bool _osdmap_pool_full(const pg_pool_t &p) const; void update_pool_full_map(map& pool_full_map); - map linger_ops; + map linger_ops; // we use this just to confirm a cookie is valid before dereferencing the ptr - set linger_ops_set; + set linger_ops_set; int num_linger_callbacks; Mutex linger_callback_lock; Cond linger_callback_cond; - map poolstat_ops; - map statfs_ops; - map pool_ops; - atomic_t num_homeless_ops; + map poolstat_ops; + map statfs_ops; + map pool_ops; + atomic_t num_homeless_ops; OSDSession *homeless_session; // ops waiting for an osdmap with a new pool or confirmation that // the pool does not exist (may be expanded to other uses later) - map check_latest_map_lingers; - map check_latest_map_ops; - map check_latest_map_commands; + map check_latest_map_lingers; + map check_latest_map_ops; + map check_latest_map_commands; map > > waiting_for_map; @@ -1794,7 +1820,8 @@ public: bool _osdmap_has_pool_full() const; bool target_should_be_paused(op_target_t *op); - int _calc_target(op_target_t *t, epoch_t *last_force_resend=0, bool any_change=false); + int _calc_target(op_target_t *t, epoch_t *last_force_resend = 0, + bool any_change = false); int _map_session(op_target_t *op, OSDSession **s, RWLock::Context& lc); @@ -1805,7 +1832,9 @@ public: void _session_command_op_assign(OSDSession *to, CommandOp *op); void _session_command_op_remove(OSDSession *from, CommandOp *op); - int _assign_op_target_session(Op *op, RWLock::Context& lc, bool src_session_locked, bool dst_session_locked); + int _assign_op_target_session(Op *op, RWLock::Context& lc, + bool src_session_locked, + bool dst_session_locked); int _recalc_linger_op_target(LingerOp *op, RWLock::Context& lc); void _linger_submit(LingerOp *info); @@ -1813,7 +1842,8 @@ public: void _linger_commit(LingerOp *info, int r, bufferlist& outbl); void _linger_reconnect(LingerOp *info, int r); void _send_linger_ping(LingerOp *info); - void _linger_ping(LingerOp *info, int r, utime_t sent, uint32_t register_gen); + void _linger_ping(LingerOp *info, int r, utime_t sent, + uint32_t register_gen); int _normalize_watch_error(int r); void _linger_callback_queue() { @@ -1854,7 +1884,7 @@ private: void get_session(OSDSession *s); void _reopen_session(OSDSession *session); void close_session(OSDSession *session); - + void _nlist_reply(NListContext *list_context, int r, Context *final_finish, epoch_t reply_epoch); void _list_reply(ListContext *list_context, int r, Context *final_finish, @@ -1901,28 +1931,19 @@ private: Finisher *fin, double mon_timeout, double osd_timeout) : - Dispatcher(cct_), - messenger(m), monc(mc), finisher(fin), - osdmap(new OSDMap), - initialized(0), - last_tid(0), client_inc(-1), max_linger_id(0), - num_unacked(0), num_uncommitted(0), - global_op_flags(0), + Dispatcher(cct_), messenger(m), monc(mc), finisher(fin), + osdmap(new OSDMap), initialized(0), last_tid(0), client_inc(-1), + max_linger_id(0), num_unacked(0), num_uncommitted(0), global_op_flags(0), keep_balanced_budget(false), honor_osdmap_full(true), - last_seen_osdmap_version(0), - last_seen_pgmap_version(0), - rwlock("Objecter::rwlock"), - timer_lock("Objecter::timer_lock"), - timer(cct, timer_lock, false), - logger(NULL), tick_event(NULL), - m_request_state_hook(NULL), - num_linger_callbacks(0), + last_seen_osdmap_version(0), last_seen_pgmap_version(0), + rwlock("Objecter::rwlock"), timer_lock("Objecter::timer_lock"), + timer(cct, timer_lock, false), logger(NULL), tick_event(NULL), + m_request_state_hook(NULL), num_linger_callbacks(0), linger_callback_lock("Objecter::linger_callback_lock"), - num_homeless_ops(0), - homeless_session(new OSDSession(cct, -1)), - mon_timeout(mon_timeout), - osd_timeout(osd_timeout), - op_throttle_bytes(cct, "objecter_bytes", cct->_conf->objecter_inflight_op_bytes), + num_homeless_ops(0), homeless_session(new OSDSession(cct, -1)), + mon_timeout(mon_timeout), osd_timeout(osd_timeout), + op_throttle_bytes(cct, "objecter_bytes", + cct->_conf->objecter_inflight_op_bytes), op_throttle_ops(cct, "objecter_ops", cct->_conf->objecter_inflight_ops), epoch_barrier(0) { } @@ -1954,15 +1975,17 @@ private: void unset_honor_osdmap_full() { honor_osdmap_full = false; } void _scan_requests(OSDSession *s, - bool force_resend, - bool cluster_full, - map *pool_full_map, - map& need_resend, - list& need_resend_linger, - map& need_resend_command); - - int64_t get_object_hash_position(int64_t pool, const string& key, const string& ns); - int64_t get_object_pg_hash_position(int64_t pool, const string& key, const string& ns); + bool force_resend, + bool cluster_full, + map *pool_full_map, + map& need_resend, + list& need_resend_linger, + map& need_resend_command); + + int64_t get_object_hash_position(int64_t pool, const string& key, + const string& ns); + int64_t get_object_pg_hash_position(int64_t pool, const string& key, + const string& ns); // messages public: @@ -1989,13 +2012,15 @@ private: void wait_for_osd_map(); int pool_snap_by_name(int64_t poolid, const char *snap_name, snapid_t *snap); - int pool_snap_get_info(int64_t poolid, snapid_t snap, pool_snap_info_t *info); + int pool_snap_get_info(int64_t poolid, snapid_t snap, + pool_snap_info_t *info); int pool_snap_list(int64_t poolid, vector *snaps); private: // low-level ceph_tid_t _op_submit(Op *op, RWLock::Context& lc); - ceph_tid_t _op_submit_with_budget(Op *op, RWLock::Context& lc, int *ctx_budget = NULL); + ceph_tid_t _op_submit_with_budget(Op *op, RWLock::Context& lc, + int *ctx_budget = NULL); inline void unregister_op(Op *op); // public interface @@ -2003,7 +2028,8 @@ public: ceph_tid_t op_submit(Op *op, int *ctx_budget = NULL); bool is_active() { RWLock::RLocker l(rwlock); - return !((!inflight_ops.read()) && linger_ops.empty() && poolstat_ops.empty() && statfs_ops.empty()); + return !((!inflight_ops.read()) && linger_ops.empty() && + poolstat_ops.empty() && statfs_ops.empty()); } /** @@ -2037,9 +2063,14 @@ public: /** Get the current set of global op flags */ int get_global_op_flags() { return global_op_flags.read(); } /** Add a flag to the global op flags, not really atomic operation */ - void add_global_op_flags(int flag) { global_op_flags.set(global_op_flags.read() | flag); } - /** Clear the passed flags from the global op flag set, not really atomic operation */ - void clear_global_op_flag(int flags) { global_op_flags.set(global_op_flags.read() & ~flags); } + void add_global_op_flags(int flag) { + global_op_flags.set(global_op_flags.read() | flag); + } + /** Clear the passed flags from the global op flag set, not really + atomic operation */ + void clear_global_op_flag(int flags) { + global_op_flags.set(global_op_flags.read() & ~flags); + } /// cancel an in-progress request with the given return code private: @@ -2092,7 +2123,8 @@ public: const SnapContext& snapc, utime_t mtime, int flags, Context *onack, Context *oncommit, version_t *objver = NULL, osd_reqid_t reqid = osd_reqid_t()) { - Op *o = new Op(oid, oloc, op.ops, flags | global_op_flags.read() | CEPH_OSD_FLAG_WRITE, onack, oncommit, objver); + Op *o = new Op(oid, oloc, op.ops, flags | global_op_flags.read() | + CEPH_OSD_FLAG_WRITE, onack, oncommit, objver); o->priority = op.priority; o->mtime = mtime; o->snapc = snapc; @@ -2105,14 +2137,17 @@ public: const SnapContext& snapc, utime_t mtime, int flags, Context *onack, Context *oncommit, version_t *objver = NULL, osd_reqid_t reqid = osd_reqid_t()) { - Op *o = prepare_mutate_op(oid, oloc, op, snapc, mtime, flags, onack, oncommit, objver, reqid); + Op *o = prepare_mutate_op(oid, oloc, op, snapc, mtime, flags, onack, + oncommit, objver, reqid); return op_submit(o); } Op *prepare_read_op(const object_t& oid, const object_locator_t& oloc, ObjectOperation& op, snapid_t snapid, bufferlist *pbl, int flags, - Context *onack, version_t *objver = NULL, int *data_offset = NULL) { - Op *o = new Op(oid, oloc, op.ops, flags | global_op_flags.read() | CEPH_OSD_FLAG_READ, onack, NULL, objver, data_offset); + Context *onack, version_t *objver = NULL, + int *data_offset = NULL) { + Op *o = new Op(oid, oloc, op.ops, flags | global_op_flags.read() | + CEPH_OSD_FLAG_READ, onack, NULL, objver, data_offset); o->priority = op.priority; o->snapid = snapid; o->outbl = pbl; @@ -2126,19 +2161,19 @@ public: ceph_tid_t read(const object_t& oid, const object_locator_t& oloc, ObjectOperation& op, snapid_t snapid, bufferlist *pbl, int flags, - Context *onack, version_t *objver = NULL, int *data_offset = NULL, + Context *onack, version_t *objver = NULL, + int *data_offset = NULL, uint64_t features = 0) { - Op *o = prepare_read_op(oid, oloc, op, snapid, pbl, flags, onack, objver, data_offset); + Op *o = prepare_read_op(oid, oloc, op, snapid, pbl, flags, onack, objver, + data_offset); if (features) o->features = features; return op_submit(o); } ceph_tid_t pg_read(uint32_t hash, object_locator_t oloc, - ObjectOperation& op, - bufferlist *pbl, int flags, - Context *onack, - epoch_t *reply_epoch, - int *ctx_budget) { + ObjectOperation& op, bufferlist *pbl, int flags, + Context *onack, epoch_t *reply_epoch, + int *ctx_budget) { Op *o = new Op(object_t(), oloc, op.ops, flags | global_op_flags.read() | CEPH_OSD_FLAG_READ, onack, NULL, NULL); @@ -2207,25 +2242,25 @@ public: // high-level helpers - ceph_tid_t stat(const object_t& oid, const object_locator_t& oloc, snapid_t snap, - uint64_t *psize, utime_t *pmtime, int flags, - Context *onfinish, - version_t *objver = NULL, ObjectOperation *extra_ops = NULL) { + ceph_tid_t stat(const object_t& oid, const object_locator_t& oloc, + snapid_t snap, uint64_t *psize, utime_t *pmtime, int flags, + Context *onfinish, version_t *objver = NULL, + ObjectOperation *extra_ops = NULL) { vector ops; int i = init_ops(ops, 1, extra_ops); ops[i].op.op = CEPH_OSD_OP_STAT; C_Stat *fin = new C_Stat(psize, pmtime, onfinish); - Op *o = new Op(oid, oloc, ops, flags | global_op_flags.read() | CEPH_OSD_FLAG_READ, fin, 0, objver); + Op *o = new Op(oid, oloc, ops, flags | global_op_flags.read() | + CEPH_OSD_FLAG_READ, fin, 0, objver); o->snapid = snap; o->outbl = &fin->bl; return op_submit(o); } ceph_tid_t read(const object_t& oid, const object_locator_t& oloc, - uint64_t off, uint64_t len, snapid_t snap, bufferlist *pbl, int flags, - Context *onfinish, - version_t *objver = NULL, ObjectOperation *extra_ops = NULL, - int op_flags = 0) { + uint64_t off, uint64_t len, snapid_t snap, bufferlist *pbl, + int flags, Context *onfinish, version_t *objver = NULL, + ObjectOperation *extra_ops = NULL, int op_flags = 0) { vector ops; int i = init_ops(ops, 1, extra_ops); ops[i].op.op = CEPH_OSD_OP_READ; @@ -2234,18 +2269,19 @@ public: ops[i].op.extent.truncate_size = 0; ops[i].op.extent.truncate_seq = 0; ops[i].op.flags = op_flags; - Op *o = new Op(oid, oloc, ops, flags | global_op_flags.read() | CEPH_OSD_FLAG_READ, onfinish, 0, objver); + Op *o = new Op(oid, oloc, ops, flags | global_op_flags.read() | + CEPH_OSD_FLAG_READ, onfinish, 0, objver); o->snapid = snap; o->outbl = pbl; return op_submit(o); } ceph_tid_t read_trunc(const object_t& oid, const object_locator_t& oloc, - uint64_t off, uint64_t len, snapid_t snap, bufferlist *pbl, int flags, - uint64_t trunc_size, __u32 trunc_seq, - Context *onfinish, - version_t *objver = NULL, ObjectOperation *extra_ops = NULL, - int op_flags = 0) { + uint64_t off, uint64_t len, snapid_t snap, + bufferlist *pbl, int flags, uint64_t trunc_size, + __u32 trunc_seq, Context *onfinish, + version_t *objver = NULL, + ObjectOperation *extra_ops = NULL, int op_flags = 0) { vector ops; int i = init_ops(ops, 1, extra_ops); ops[i].op.op = CEPH_OSD_OP_READ; @@ -2254,15 +2290,16 @@ public: ops[i].op.extent.truncate_size = trunc_size; ops[i].op.extent.truncate_seq = trunc_seq; ops[i].op.flags = op_flags; - Op *o = new Op(oid, oloc, ops, flags | global_op_flags.read() | CEPH_OSD_FLAG_READ, onfinish, 0, objver); + Op *o = new Op(oid, oloc, ops, flags | global_op_flags.read() | + CEPH_OSD_FLAG_READ, onfinish, 0, objver); o->snapid = snap; o->outbl = pbl; return op_submit(o); } ceph_tid_t mapext(const object_t& oid, const object_locator_t& oloc, - uint64_t off, uint64_t len, snapid_t snap, bufferlist *pbl, int flags, - Context *onfinish, - version_t *objver = NULL, ObjectOperation *extra_ops = NULL) { + uint64_t off, uint64_t len, snapid_t snap, bufferlist *pbl, + int flags, Context *onfinish, version_t *objver = NULL, + ObjectOperation *extra_ops = NULL) { vector ops; int i = init_ops(ops, 1, extra_ops); ops[i].op.op = CEPH_OSD_OP_MAPEXT; @@ -2270,7 +2307,8 @@ public: ops[i].op.extent.length = len; ops[i].op.extent.truncate_size = 0; ops[i].op.extent.truncate_seq = 0; - Op *o = new Op(oid, oloc, ops, flags | global_op_flags.read() | CEPH_OSD_FLAG_READ, onfinish, 0, objver); + Op *o = new Op(oid, oloc, ops, flags | global_op_flags.read() | + CEPH_OSD_FLAG_READ, onfinish, 0, objver); o->snapid = snap; o->outbl = pbl; return op_submit(o); @@ -2286,52 +2324,54 @@ public: ops[i].op.xattr.value_len = 0; if (name) ops[i].indata.append(name); - Op *o = new Op(oid, oloc, ops, flags | global_op_flags.read() | CEPH_OSD_FLAG_READ, onfinish, 0, objver); + Op *o = new Op(oid, oloc, ops, flags | global_op_flags.read() | + CEPH_OSD_FLAG_READ, onfinish, 0, objver); o->snapid = snap; o->outbl = pbl; return op_submit(o); } - ceph_tid_t getxattrs(const object_t& oid, const object_locator_t& oloc, snapid_t snap, - map& attrset, - int flags, Context *onfinish, - version_t *objver = NULL, ObjectOperation *extra_ops = NULL) { + ceph_tid_t getxattrs(const object_t& oid, const object_locator_t& oloc, + snapid_t snap, map& attrset, + int flags, Context *onfinish, version_t *objver = NULL, + ObjectOperation *extra_ops = NULL) { vector ops; int i = init_ops(ops, 1, extra_ops); ops[i].op.op = CEPH_OSD_OP_GETXATTRS; C_GetAttrs *fin = new C_GetAttrs(attrset, onfinish); - Op *o = new Op(oid, oloc, ops, flags | global_op_flags.read() | CEPH_OSD_FLAG_READ, fin, 0, objver); + Op *o = new Op(oid, oloc, ops, flags | global_op_flags.read() | + CEPH_OSD_FLAG_READ, fin, 0, objver); o->snapid = snap; o->outbl = &fin->bl; return op_submit(o); } ceph_tid_t read_full(const object_t& oid, const object_locator_t& oloc, - snapid_t snap, bufferlist *pbl, int flags, - Context *onfinish, - version_t *objver = NULL, ObjectOperation *extra_ops = NULL) { - return read(oid, oloc, 0, 0, snap, pbl, flags | global_op_flags.read() | CEPH_OSD_FLAG_READ, - onfinish, objver, extra_ops); + snapid_t snap, bufferlist *pbl, int flags, + Context *onfinish, version_t *objver = NULL, + ObjectOperation *extra_ops = NULL) { + return read(oid, oloc, 0, 0, snap, pbl, flags | global_op_flags.read() | + CEPH_OSD_FLAG_READ, onfinish, objver, extra_ops); } // writes ceph_tid_t _modify(const object_t& oid, const object_locator_t& oloc, - vector& ops, utime_t mtime, - const SnapContext& snapc, int flags, - Context *onack, Context *oncommit, - version_t *objver = NULL) { - Op *o = new Op(oid, oloc, ops, flags | global_op_flags.read() | CEPH_OSD_FLAG_WRITE, onack, oncommit, objver); + vector& ops, utime_t mtime, + const SnapContext& snapc, int flags, + Context *onack, Context *oncommit, + version_t *objver = NULL) { + Op *o = new Op(oid, oloc, ops, flags | global_op_flags.read() | + CEPH_OSD_FLAG_WRITE, onack, oncommit, objver); o->mtime = mtime; o->snapc = snapc; return op_submit(o); } ceph_tid_t write(const object_t& oid, const object_locator_t& oloc, - uint64_t off, uint64_t len, const SnapContext& snapc, const bufferlist &bl, - utime_t mtime, int flags, - Context *onack, Context *oncommit, - version_t *objver = NULL, ObjectOperation *extra_ops = NULL, - int op_flags = 0) { + uint64_t off, uint64_t len, const SnapContext& snapc, + const bufferlist &bl, utime_t mtime, int flags, + Context *onack, Context *oncommit, version_t *objver = NULL, + ObjectOperation *extra_ops = NULL, int op_flags = 0) { vector ops; int i = init_ops(ops, 1, extra_ops); ops[i].op.op = CEPH_OSD_OP_WRITE; @@ -2341,16 +2381,18 @@ public: ops[i].op.extent.truncate_seq = 0; ops[i].indata = bl; ops[i].op.flags = op_flags; - Op *o = new Op(oid, oloc, ops, flags | global_op_flags.read() | CEPH_OSD_FLAG_WRITE, onack, oncommit, objver); + Op *o = new Op(oid, oloc, ops, flags | global_op_flags.read() | + CEPH_OSD_FLAG_WRITE, onack, oncommit, objver); o->mtime = mtime; o->snapc = snapc; return op_submit(o); } ceph_tid_t append(const object_t& oid, const object_locator_t& oloc, - uint64_t len, const SnapContext& snapc, const bufferlist &bl, - utime_t mtime, int flags, - Context *onack, Context *oncommit, - version_t *objver = NULL, ObjectOperation *extra_ops = NULL) { + uint64_t len, const SnapContext& snapc, + const bufferlist &bl, utime_t mtime, int flags, + Context *onack, Context *oncommit, + version_t *objver = NULL, + ObjectOperation *extra_ops = NULL) { vector ops; int i = init_ops(ops, 1, extra_ops); ops[i].op.op = CEPH_OSD_OP_APPEND; @@ -2359,18 +2401,19 @@ public: ops[i].op.extent.truncate_size = 0; ops[i].op.extent.truncate_seq = 0; ops[i].indata = bl; - Op *o = new Op(oid, oloc, ops, flags | global_op_flags.read() | CEPH_OSD_FLAG_WRITE, onack, oncommit, objver); + Op *o = new Op(oid, oloc, ops, flags | global_op_flags.read() | + CEPH_OSD_FLAG_WRITE, onack, oncommit, objver); o->mtime = mtime; o->snapc = snapc; return op_submit(o); } ceph_tid_t write_trunc(const object_t& oid, const object_locator_t& oloc, - uint64_t off, uint64_t len, const SnapContext& snapc, const bufferlist &bl, - utime_t mtime, int flags, - uint64_t trunc_size, __u32 trunc_seq, - Context *onack, Context *oncommit, - version_t *objver = NULL, ObjectOperation *extra_ops = NULL, - int op_flags = 0) { + uint64_t off, uint64_t len, const SnapContext& snapc, + const bufferlist &bl, utime_t mtime, int flags, + uint64_t trunc_size, __u32 trunc_seq, + Context *onack, Context *oncommit, + version_t *objver = NULL, + ObjectOperation *extra_ops = NULL, int op_flags = 0) { vector ops; int i = init_ops(ops, 1, extra_ops); ops[i].op.op = CEPH_OSD_OP_WRITE; @@ -2380,16 +2423,17 @@ public: ops[i].op.extent.truncate_seq = trunc_seq; ops[i].indata = bl; ops[i].op.flags = op_flags; - Op *o = new Op(oid, oloc, ops, flags | global_op_flags.read() | CEPH_OSD_FLAG_WRITE, onack, oncommit, objver); + Op *o = new Op(oid, oloc, ops, flags | global_op_flags.read() | + CEPH_OSD_FLAG_WRITE, onack, oncommit, objver); o->mtime = mtime; o->snapc = snapc; return op_submit(o); } ceph_tid_t write_full(const object_t& oid, const object_locator_t& oloc, - const SnapContext& snapc, const bufferlist &bl, utime_t mtime, int flags, - Context *onack, Context *oncommit, - version_t *objver = NULL, ObjectOperation *extra_ops = NULL, - int op_flags = 0) { + const SnapContext& snapc, const bufferlist &bl, + utime_t mtime, int flags, Context *onack, + Context *oncommit, version_t *objver = NULL, + ObjectOperation *extra_ops = NULL, int op_flags = 0) { vector ops; int i = init_ops(ops, 1, extra_ops); ops[i].op.op = CEPH_OSD_OP_WRITEFULL; @@ -2397,65 +2441,70 @@ public: ops[i].op.extent.length = bl.length(); ops[i].indata = bl; ops[i].op.flags = op_flags; - Op *o = new Op(oid, oloc, ops, flags | global_op_flags.read() | CEPH_OSD_FLAG_WRITE, onack, oncommit, objver); + Op *o = new Op(oid, oloc, ops, flags | global_op_flags.read() | + CEPH_OSD_FLAG_WRITE, onack, oncommit, objver); o->mtime = mtime; o->snapc = snapc; return op_submit(o); } ceph_tid_t trunc(const object_t& oid, const object_locator_t& oloc, - const SnapContext& snapc, - utime_t mtime, int flags, - uint64_t trunc_size, __u32 trunc_seq, - Context *onack, Context *oncommit, - version_t *objver = NULL, ObjectOperation *extra_ops = NULL) { + const SnapContext& snapc, utime_t mtime, int flags, + uint64_t trunc_size, __u32 trunc_seq, Context *onack, + Context *oncommit, version_t *objver = NULL, + ObjectOperation *extra_ops = NULL) { vector ops; int i = init_ops(ops, 1, extra_ops); ops[i].op.op = CEPH_OSD_OP_TRUNCATE; ops[i].op.extent.offset = trunc_size; ops[i].op.extent.truncate_size = trunc_size; ops[i].op.extent.truncate_seq = trunc_seq; - Op *o = new Op(oid, oloc, ops, flags | global_op_flags.read() | CEPH_OSD_FLAG_WRITE, onack, oncommit, objver); + Op *o = new Op(oid, oloc, ops, flags | global_op_flags.read() | + CEPH_OSD_FLAG_WRITE, onack, oncommit, objver); o->mtime = mtime; o->snapc = snapc; return op_submit(o); } ceph_tid_t zero(const object_t& oid, const object_locator_t& oloc, - uint64_t off, uint64_t len, const SnapContext& snapc, utime_t mtime, int flags, - Context *onack, Context *oncommit, + uint64_t off, uint64_t len, const SnapContext& snapc, + utime_t mtime, int flags, Context *onack, Context *oncommit, version_t *objver = NULL, ObjectOperation *extra_ops = NULL) { vector ops; int i = init_ops(ops, 1, extra_ops); ops[i].op.op = CEPH_OSD_OP_ZERO; ops[i].op.extent.offset = off; ops[i].op.extent.length = len; - Op *o = new Op(oid, oloc, ops, flags | global_op_flags.read() | CEPH_OSD_FLAG_WRITE, onack, oncommit, objver); + Op *o = new Op(oid, oloc, ops, flags | global_op_flags.read() | + CEPH_OSD_FLAG_WRITE, onack, oncommit, objver); o->mtime = mtime; o->snapc = snapc; return op_submit(o); } ceph_tid_t rollback_object(const object_t& oid, const object_locator_t& oloc, - const SnapContext& snapc, snapid_t snapid, - utime_t mtime, Context *onack, Context *oncommit, - version_t *objver = NULL, ObjectOperation *extra_ops = NULL) { + const SnapContext& snapc, snapid_t snapid, + utime_t mtime, Context *onack, Context *oncommit, + version_t *objver = NULL, + ObjectOperation *extra_ops = NULL) { vector ops; int i = init_ops(ops, 1, extra_ops); ops[i].op.op = CEPH_OSD_OP_ROLLBACK; ops[i].op.snap.snapid = snapid; - Op *o = new Op(oid, oloc, ops, CEPH_OSD_FLAG_WRITE, onack, oncommit, objver); + Op *o = new Op(oid, oloc, ops, CEPH_OSD_FLAG_WRITE, onack, oncommit, + objver); o->mtime = mtime; o->snapc = snapc; return op_submit(o); } ceph_tid_t create(const object_t& oid, const object_locator_t& oloc, - const SnapContext& snapc, utime_t mtime, - int global_flags, int create_flags, - Context *onack, Context *oncommit, - version_t *objver = NULL, ObjectOperation *extra_ops = NULL) { + const SnapContext& snapc, utime_t mtime, int global_flags, + int create_flags, Context *onack, Context *oncommit, + version_t *objver = NULL, + ObjectOperation *extra_ops = NULL) { vector ops; int i = init_ops(ops, 1, extra_ops); ops[i].op.op = CEPH_OSD_OP_CREATE; ops[i].op.flags = create_flags; - Op *o = new Op(oid, oloc, ops, global_flags | global_op_flags.read() | CEPH_OSD_FLAG_WRITE, onack, oncommit, objver); + Op *o = new Op(oid, oloc, ops, global_flags | global_op_flags.read() | + CEPH_OSD_FLAG_WRITE, onack, oncommit, objver); o->mtime = mtime; o->snapc = snapc; return op_submit(o); @@ -2467,7 +2516,8 @@ public: vector ops; int i = init_ops(ops, 1, extra_ops); ops[i].op.op = CEPH_OSD_OP_DELETE; - Op *o = new Op(oid, oloc, ops, flags | global_op_flags.read() | CEPH_OSD_FLAG_WRITE, onack, oncommit, objver); + Op *o = new Op(oid, oloc, ops, flags | global_op_flags.read() | + CEPH_OSD_FLAG_WRITE, onack, oncommit, objver); o->mtime = mtime; o->snapc = snapc; return op_submit(o); @@ -2486,7 +2536,8 @@ public: if (name) ops[i].indata.append(name); ops[i].indata.append(bl); - Op *o = new Op(oid, oloc, ops, flags | global_op_flags.read() | CEPH_OSD_FLAG_WRITE, onack, oncommit, objver); + Op *o = new Op(oid, oloc, ops, flags | global_op_flags.read() | + CEPH_OSD_FLAG_WRITE, onack, oncommit, objver); o->mtime = mtime; o->snapc = snapc; return op_submit(o); @@ -2503,7 +2554,8 @@ public: ops[i].op.xattr.value_len = 0; if (name) ops[i].indata.append(name); - Op *o = new Op(oid, oloc, ops, flags | global_op_flags.read() | CEPH_OSD_FLAG_WRITE, onack, oncommit, objver); + Op *o = new Op(oid, oloc, ops, flags | global_op_flags.read() | + CEPH_OSD_FLAG_WRITE, onack, oncommit, objver); o->mtime = mtime; o->snapc = snapc; return op_submit(o); @@ -2548,7 +2600,8 @@ private: void _do_delete_pool(int64_t pool, Context *onfinish); public: int create_pool_snap(int64_t pool, string& snapName, Context *onfinish); - int allocate_selfmanaged_snap(int64_t pool, snapid_t *psnapid, Context *onfinish); + int allocate_selfmanaged_snap(int64_t pool, snapid_t *psnapid, + Context *onfinish); int delete_pool_snap(int64_t pool, string& snapName, Context *onfinish); int delete_selfmanaged_snap(int64_t pool, snapid_t snap, Context *onfinish); @@ -2585,7 +2638,8 @@ public: // --------------------------- // some scatter/gather hackery - void _sg_read_finish(vector& extents, vector& resultbl, + void _sg_read_finish(vector& extents, + vector& resultbl, bufferlist *bl, Context *onfinish); struct C_SGRead : public Context { @@ -2594,59 +2648,69 @@ public: vector resultbl; bufferlist *bl; Context *onfinish; - C_SGRead(Objecter *ob, - vector& e, vector& r, bufferlist *b, Context *c) : + C_SGRead(Objecter *ob, + vector& e, vector& r, bufferlist *b, + Context *c) : objecter(ob), bl(b), onfinish(c) { extents.swap(e); resultbl.swap(r); } void finish(int r) { objecter->_sg_read_finish(extents, resultbl, bl, onfinish); - } + } }; - void sg_read_trunc(vector& extents, snapid_t snap, bufferlist *bl, int flags, - uint64_t trunc_size, __u32 trunc_seq, Context *onfinish, int op_flags = 0) { + void sg_read_trunc(vector& extents, snapid_t snap, + bufferlist *bl, int flags, uint64_t trunc_size, + __u32 trunc_seq, Context *onfinish, int op_flags = 0) { if (extents.size() == 1) { - read_trunc(extents[0].oid, extents[0].oloc, extents[0].offset, extents[0].length, - snap, bl, flags, extents[0].truncate_size, trunc_seq, onfinish, - 0, 0, op_flags); + read_trunc(extents[0].oid, extents[0].oloc, extents[0].offset, + extents[0].length, snap, bl, flags, extents[0].truncate_size, + trunc_seq, onfinish, 0, 0, op_flags); } else { C_GatherBuilder gather(cct); vector resultbl(extents.size()); int i=0; - for (vector::iterator p = extents.begin(); p != extents.end(); ++p) { - read_trunc(p->oid, p->oloc, p->offset, p->length, - snap, &resultbl[i++], flags, p->truncate_size, trunc_seq, gather.new_sub(), - 0, 0, op_flags); + for (vector::iterator p = extents.begin(); + p != extents.end(); + ++p) { + read_trunc(p->oid, p->oloc, p->offset, p->length, snap, &resultbl[i++], + flags, p->truncate_size, trunc_seq, gather.new_sub(), + 0, 0, op_flags); } gather.set_finisher(new C_SGRead(this, extents, resultbl, bl, onfinish)); gather.activate(); } } - void sg_read(vector& extents, snapid_t snap, bufferlist *bl, int flags, Context *onfinish, int op_flags = 0) { + void sg_read(vector& extents, snapid_t snap, bufferlist *bl, + int flags, Context *onfinish, int op_flags = 0) { sg_read_trunc(extents, snap, bl, flags, 0, 0, onfinish, op_flags); } - void sg_write_trunc(vector& extents, const SnapContext& snapc, const bufferlist& bl, utime_t mtime, - int flags, uint64_t trunc_size, __u32 trunc_seq, - Context *onack, Context *oncommit, int op_flags = 0) { + void sg_write_trunc(vector& extents, const SnapContext& snapc, + const bufferlist& bl, utime_t mtime, int flags, + uint64_t trunc_size, __u32 trunc_seq, Context *onack, + Context *oncommit, int op_flags = 0) { if (extents.size() == 1) { - write_trunc(extents[0].oid, extents[0].oloc, extents[0].offset, extents[0].length, - snapc, bl, mtime, flags, extents[0].truncate_size, trunc_seq, onack, oncommit, - 0, 0, op_flags); + write_trunc(extents[0].oid, extents[0].oloc, extents[0].offset, + extents[0].length, snapc, bl, mtime, flags, + extents[0].truncate_size, trunc_seq, onack, oncommit, + 0, 0, op_flags); } else { C_GatherBuilder gack(cct, onack); C_GatherBuilder gcom(cct, oncommit); - for (vector::iterator p = extents.begin(); p != extents.end(); ++p) { + for (vector::iterator p = extents.begin(); + p != extents.end(); + ++p) { bufferlist cur; - for (vector >::iterator bit = p->buffer_extents.begin(); + for (vector >::iterator bit + = p->buffer_extents.begin(); bit != p->buffer_extents.end(); ++bit) bl.copy(bit->first, bit->second, cur); assert(cur.length() == p->length); - write_trunc(p->oid, p->oloc, p->offset, p->length, + write_trunc(p->oid, p->oloc, p->offset, p->length, snapc, cur, mtime, flags, p->truncate_size, trunc_seq, onack ? gack.new_sub():0, oncommit ? gcom.new_sub():0, @@ -2657,9 +2721,11 @@ public: } } - void sg_write(vector& extents, const SnapContext& snapc, const bufferlist& bl, utime_t mtime, - int flags, Context *onack, Context *oncommit, int op_flags = 0) { - sg_write_trunc(extents, snapc, bl, mtime, flags, 0, 0, onack, oncommit, op_flags); + void sg_write(vector& extents, const SnapContext& snapc, + const bufferlist& bl, utime_t mtime, int flags, Context *onack, + Context *oncommit, int op_flags = 0) { + sg_write_trunc(extents, snapc, bl, mtime, flags, 0, 0, onack, oncommit, + op_flags); } void ms_handle_connect(Connection *con); diff --git a/src/osdc/Striper.cc b/src/osdc/Striper.cc index a8682ded99bfc..9a35ffe82653d 100644 --- a/src/osdc/Striper.cc +++ b/src/osdc/Striper.cc @@ -1,4 +1,4 @@ -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- // vim: ts=8 sw=2 smarttab /* * Ceph - scalable distributed file system @@ -7,9 +7,9 @@ * * This is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public - * License version 2.1, as published by the Free Software + * License version 2.1, as published by the Free Software * Foundation. See file COPYING. - * + * */ #include "Striper.h" @@ -27,10 +27,11 @@ void Striper::file_to_extents(CephContext *cct, const char *object_format, - const ceph_file_layout *layout, - uint64_t offset, uint64_t len, uint64_t trunc_size, - vector& extents, - uint64_t buffer_offset) + const ceph_file_layout *layout, + uint64_t offset, uint64_t len, + uint64_t trunc_size, + vector& extents, + uint64_t buffer_offset) { map > object_extents; file_to_extents(cct, object_format, layout, offset, len, trunc_size, @@ -38,23 +39,25 @@ void Striper::file_to_extents(CephContext *cct, const char *object_format, assimilate_extents(object_extents, extents); } -void Striper::file_to_extents(CephContext *cct, const char *object_format, - const ceph_file_layout *layout, - uint64_t offset, uint64_t len, uint64_t trunc_size, - map >& object_extents, - uint64_t buffer_offset) +void Striper::file_to_extents( + CephContext *cct, const char *object_format, + const ceph_file_layout *layout, + uint64_t offset, uint64_t len, + uint64_t trunc_size, + map >& object_extents, + uint64_t buffer_offset) { - ldout(cct, 10) << "file_to_extents " << offset << "~" << len + ldout(cct, 10) << "file_to_extents " << offset << "~" << len << " format " << object_format << dendl; assert(len > 0); /* - * we want only one extent per object! - * this means that each extent we read may map into different bits of the - * final read buffer.. hence ObjectExtent.buffer_extents + * we want only one extent per object! this means that each extent + * we read may map into different bits of the final read + * buffer.. hence ObjectExtent.buffer_extents */ - + __u32 object_size = layout->fl_object_size; __u32 su = layout->fl_stripe_unit; __u32 stripe_count = layout->fl_stripe_count; @@ -64,18 +67,23 @@ void Striper::file_to_extents(CephContext *cct, const char *object_format, su = object_size; } uint64_t stripes_per_object = object_size / su; - ldout(cct, 20) << " su " << su << " sc " << stripe_count << " os " << object_size - << " stripes_per_object " << stripes_per_object << dendl; + ldout(cct, 20) << " su " << su << " sc " << stripe_count << " os " + << object_size << " stripes_per_object " << stripes_per_object + << dendl; uint64_t cur = offset; uint64_t left = len; while (left > 0) { // layout into objects - uint64_t blockno = cur / su; // which block - uint64_t stripeno = blockno / stripe_count; // which horizontal stripe (Y) - uint64_t stripepos = blockno % stripe_count; // which object in the object set (X) - uint64_t objectsetno = stripeno / stripes_per_object; // which object set - uint64_t objectno = objectsetno * stripe_count + stripepos; // object id + uint64_t blockno = cur / su; // which block + // which horizontal stripe (Y) + uint64_t stripeno = blockno / stripe_count; + // which object in the object set (X) + uint64_t stripepos = blockno % stripe_count; + // which object set + uint64_t objectsetno = stripeno / stripes_per_object; + // object id + uint64_t objectno = objectsetno * stripe_count + stripepos; // find oid, extent char buf[strlen(object_format) + 32]; @@ -94,12 +102,11 @@ void Striper::file_to_extents(CephContext *cct, const char *object_format, else x_len = left; - ldout(cct, 20) << " off " << cur << " blockno " << blockno << " stripeno " << stripeno - << " stripepos " << stripepos << " objectsetno " << objectsetno - << " objectno " << objectno - << " block_start " << block_start - << " block_off " << block_off - << " " << x_offset << "~" << x_len + ldout(cct, 20) << " off " << cur << " blockno " << blockno << " stripeno " + << stripeno << " stripepos " << stripepos << " objectsetno " + << objectsetno << " objectno " << objectno + << " block_start " << block_start << " block_off " + << block_off << " " << x_offset << "~" << x_len << dendl; ObjectExtent *ex = 0; @@ -113,7 +120,8 @@ void Striper::file_to_extents(CephContext *cct, const char *object_format, ex->offset = x_offset; ex->length = x_len; - ex->truncate_size = object_truncate_size(cct, layout, objectno, trunc_size); + ex->truncate_size = object_truncate_size(cct, layout, objectno, + trunc_size); ldout(cct, 20) << " added new " << *ex << dendl; } else { @@ -122,24 +130,32 @@ void Striper::file_to_extents(CephContext *cct, const char *object_format, ldout(cct, 20) << " adding in to " << *ex << dendl; ex->length += x_len; } - ex->buffer_extents.push_back(make_pair(cur - offset + buffer_offset, x_len)); - - ldout(cct, 15) << "file_to_extents " << *ex << " in " << ex->oloc << dendl; - //ldout(cct, 0) << "map: ino " << ino << " oid " << ex.oid << " osd " << ex.osd << " offset " << ex.offset << " len " << ex.len << " ... left " << left << dendl; - + ex->buffer_extents.push_back(make_pair(cur - offset + buffer_offset, + x_len)); + + ldout(cct, 15) << "file_to_extents " << *ex << " in " << ex->oloc + << dendl; + // ldout(cct, 0) << "map: ino " << ino << " oid " << ex.oid << " osd " + // << ex.osd << " offset " << ex.offset << " len " << ex.len + // << " ... left " << left << dendl; + left -= x_len; cur += x_len; } } -void Striper::assimilate_extents(map >& object_extents, - vector& extents) +void Striper::assimilate_extents( + map >& object_extents, + vector& extents) { // make final list - for (map >::iterator it = object_extents.begin(); + for (map >::iterator it + = object_extents.begin(); it != object_extents.end(); ++it) { - for (vector::iterator p = it->second.begin(); p != it->second.end(); ++p) { + for (vector::iterator p = it->second.begin(); + p != it->second.end(); + ++p) { extents.push_back(*p); } } @@ -149,7 +165,8 @@ void Striper::extent_to_file(CephContext *cct, ceph_file_layout *layout, uint64_t objectno, uint64_t off, uint64_t len, vector >& extents) { - ldout(cct, 10) << "extent_to_file " << objectno << " " << off << "~" << len << dendl; + ldout(cct, 10) << "extent_to_file " << objectno << " " << off << "~" + << len << dendl; __u32 object_size = layout->fl_object_size; __u32 su = layout->fl_stripe_unit; @@ -181,7 +198,8 @@ void Striper::extent_to_file(CephContext *cct, ceph_file_layout *layout, } } -uint64_t Striper::object_truncate_size(CephContext *cct, const ceph_file_layout *layout, +uint64_t Striper::object_truncate_size(CephContext *cct, + const ceph_file_layout *layout, uint64_t objectno, uint64_t trunc_size) { uint64_t obj_trunc_size; @@ -204,20 +222,23 @@ uint64_t Striper::object_truncate_size(CephContext *cct, const ceph_file_layout uint64_t trunc_blockno = trunc_size / su; uint64_t trunc_stripeno = trunc_blockno / stripe_count; uint64_t trunc_stripepos = trunc_blockno % stripe_count; - uint64_t trunc_objectno = trunc_objectsetno * stripe_count + trunc_stripepos; + uint64_t trunc_objectno = trunc_objectsetno * stripe_count + + trunc_stripepos; if (objectno < trunc_objectno) obj_trunc_size = ((trunc_stripeno % stripes_per_object) + 1) * su; else if (objectno > trunc_objectno) obj_trunc_size = (trunc_stripeno % stripes_per_object) * su; else - obj_trunc_size = (trunc_stripeno % stripes_per_object) * su + (trunc_size % su); + obj_trunc_size = (trunc_stripeno % stripes_per_object) * su + + (trunc_size % su); } } ldout(cct, 20) << "object_truncate_size " << objectno << " " << trunc_size << "->" << obj_trunc_size << dendl; return obj_trunc_size; } -uint64_t Striper::get_num_objects(const ceph_file_layout& layout, uint64_t size) +uint64_t Striper::get_num_objects(const ceph_file_layout& layout, + uint64_t size) { __u32 object_size = layout.fl_object_size; __u32 stripe_unit = layout.fl_stripe_unit; @@ -226,19 +247,23 @@ uint64_t Striper::get_num_objects(const ceph_file_layout& layout, uint64_t size) uint64_t num_periods = (size + period - 1) / period; uint64_t remainder_bytes = size % period; uint64_t remainder_objs = 0; - if ((remainder_bytes > 0) && (remainder_bytes < (uint64_t)stripe_count * stripe_unit)) - remainder_objs = stripe_count - ((remainder_bytes + stripe_unit - 1) / stripe_unit); + if ((remainder_bytes > 0) && (remainder_bytes < (uint64_t)stripe_count + * stripe_unit)) + remainder_objs = stripe_count - ((remainder_bytes + stripe_unit - 1) + / stripe_unit); return num_periods * stripe_count - remainder_objs; } // StripedReadResult -void Striper::StripedReadResult::add_partial_result(CephContext *cct, - bufferlist& bl, - const vector >& buffer_extents) +void Striper::StripedReadResult::add_partial_result( + CephContext *cct, bufferlist& bl, + const vector >& buffer_extents) { - ldout(cct, 10) << "add_partial_result(" << this << ") " << bl.length() << " to " << buffer_extents << dendl; - for (vector >::const_iterator p = buffer_extents.begin(); + ldout(cct, 10) << "add_partial_result(" << this << ") " << bl.length() + << " to " << buffer_extents << dendl; + for (vector >::const_iterator p + = buffer_extents.begin(); p != buffer_extents.end(); ++p) { pair& r = partial[p->first]; @@ -248,16 +273,16 @@ void Striper::StripedReadResult::add_partial_result(CephContext *cct, } } -void Striper::StripedReadResult::add_partial_sparse_result(CephContext *cct, - bufferlist& bl, const map& bl_map, - uint64_t bl_off, - const vector >& buffer_extents) +void Striper::StripedReadResult::add_partial_sparse_result( + CephContext *cct, bufferlist& bl, const map& bl_map, + uint64_t bl_off, const vector >& buffer_extents) { ldout(cct, 10) << "add_partial_sparse_result(" << this << ") " << bl.length() << " covering " << bl_map << " (offset " << bl_off << ")" << " to " << buffer_extents << dendl; map::const_iterator s = bl_map.begin(); - for (vector >::const_iterator p = buffer_extents.begin(); + for (vector >::const_iterator p + = buffer_extents.begin(); p != buffer_extents.end(); ++p) { uint64_t tofs = p->first; @@ -319,21 +344,25 @@ void Striper::StripedReadResult::add_partial_sparse_result(CephContext *cct, } } -void Striper::StripedReadResult::assemble_result(CephContext *cct, bufferlist& bl, bool zero_tail) +void Striper::StripedReadResult::assemble_result(CephContext *cct, + bufferlist& bl, + bool zero_tail) { - ldout(cct, 10) << "assemble_result(" << this << ") zero_tail=" << zero_tail << dendl; + ldout(cct, 10) << "assemble_result(" << this << ") zero_tail=" << zero_tail + << dendl; // go backwards, so that we can efficiently discard zeros - map >::reverse_iterator p = partial.rbegin(); + map >::reverse_iterator p + = partial.rbegin(); if (p == partial.rend()) return; uint64_t end = p->first + p->second.second; while (p != partial.rend()) { // sanity check - ldout(cct, 20) << "assemble_result(" << this << ") " << p->first << "~" << p->second.second - << " " << p->second.first.length() << " bytes" - << dendl; + ldout(cct, 20) << "assemble_result(" << this << ") " << p->first << "~" + << p->second.second << " " << p->second.first.length() + << " bytes" << dendl; assert(p->first == end - p->second.second); end = p->first; diff --git a/src/osdc/Striper.h b/src/osdc/Striper.h index ec90f157b43e2..228045aac73ae 100644 --- a/src/osdc/Striper.h +++ b/src/osdc/Striper.h @@ -1,4 +1,4 @@ -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- // vim: ts=8 sw=2 smarttab /* * Ceph - scalable distributed file system @@ -7,9 +7,9 @@ * * This is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public - * License version 2.1, as published by the Free Software + * License version 2.1, as published by the Free Software * Foundation. See file COPYING. - * + * */ #ifndef CEPH_STRIPER_H @@ -30,19 +30,22 @@ class CephContext; */ static void file_to_extents(CephContext *cct, const char *object_format, const ceph_file_layout *layout, - uint64_t offset, uint64_t len, uint64_t trunc_size, + uint64_t offset, uint64_t len, + uint64_t trunc_size, map >& extents, uint64_t buffer_offset=0); static void file_to_extents(CephContext *cct, const char *object_format, const ceph_file_layout *layout, - uint64_t offset, uint64_t len, uint64_t trunc_size, + uint64_t offset, uint64_t len, + uint64_t trunc_size, vector& extents, uint64_t buffer_offset=0); static void file_to_extents(CephContext *cct, inodeno_t ino, const ceph_file_layout *layout, - uint64_t offset, uint64_t len, uint64_t trunc_size, + uint64_t offset, uint64_t len, + uint64_t trunc_size, vector& extents) { // generate prefix/format char buf[32]; @@ -51,8 +54,9 @@ class CephContext; file_to_extents(cct, buf, layout, offset, len, trunc_size, extents); } - static void assimilate_extents(map >& object_extents, - vector& extents); + static void assimilate_extents( + map >& object_extents, + vector& extents); /** * reverse map an object extent to file extents @@ -61,32 +65,36 @@ class CephContext; uint64_t objectno, uint64_t off, uint64_t len, vector >& extents); - static uint64_t object_truncate_size(CephContext *cct, const ceph_file_layout *layout, - uint64_t objectno, uint64_t trunc_size); + static uint64_t object_truncate_size( + CephContext *cct, const ceph_file_layout *layout, + uint64_t objectno, uint64_t trunc_size); - static uint64_t get_num_objects(const ceph_file_layout& layout, uint64_t size); + static uint64_t get_num_objects(const ceph_file_layout& layout, + uint64_t size); /* * helper to assemble a striped result */ class StripedReadResult { - map > partial; // offset -> (data, intended length) + // offset -> (data, intended length) + map > partial; public: - void add_partial_result(CephContext *cct, - bufferlist& bl, - const vector >& buffer_extents); + void add_partial_result( + CephContext *cct, bufferlist& bl, + const vector >& buffer_extents); /** * add sparse read into results * * @param bl buffer * @param bl_map map of which logical source extents this covers - * @param bl_off logical buffer offset (e.g., first bl_map key if the buffer is not sparse) + * @param bl_off logical buffer offset (e.g., first bl_map key + * if the buffer is not sparse) * @param buffer_extents output buffer extents the data maps to */ - void add_partial_sparse_result(CephContext *cct, - bufferlist& bl, const map& bl_map, - uint64_t bl_off, - const vector >& buffer_extents); + void add_partial_sparse_result( + CephContext *cct, bufferlist& bl, + const map& bl_map, uint64_t bl_off, + const vector >& buffer_extents); void assemble_result(CephContext *cct, bufferlist& bl, bool zero_tail); }; diff --git a/src/osdc/WritebackHandler.h b/src/osdc/WritebackHandler.h index 0e51cb003c212..0bc7c2d55489b 100644 --- a/src/osdc/WritebackHandler.h +++ b/src/osdc/WritebackHandler.h @@ -19,17 +19,19 @@ class WritebackHandler { /** * check if a given extent read result may change due to a write * - * Check if the content we see at the given read offset may change due to a write to - * this object. + * Check if the content we see at the given read offset may change + * due to a write to this object. * * @param oid object * @param read_off read offset * @param read_len read length * @param snapid read snapid */ - virtual bool may_copy_on_write(const object_t& oid, uint64_t read_off, uint64_t read_len, snapid_t snapid) = 0; + virtual bool may_copy_on_write(const object_t& oid, uint64_t read_off, + uint64_t read_len, snapid_t snapid) = 0; virtual ceph_tid_t write(const object_t& oid, const object_locator_t& oloc, - uint64_t off, uint64_t len, const SnapContext& snapc, + uint64_t off, uint64_t len, + const SnapContext& snapc, const bufferlist &bl, utime_t mtime, uint64_t trunc_size, __u32 trunc_seq, ceph_tid_t journal_tid, Context *oncommit) = 0; -- 2.39.5