void finish(int r) override {
if (r == -ENOENT) {
r = 0;
- assert(size == 0);
+ ceph_assert(size == 0);
}
bool probe_complete;
}
probe_complete = filer->_probed(probe, oid, size, mtime, pl);
- assert(!pl.owns_lock());
+ ceph_assert(!pl.owns_lock());
}
if (probe_complete) {
probe->onfinish->complete(probe->err);
<< " starting from " << start_from
<< dendl;
- assert(snapid); // (until there is a non-NOSNAP write)
+ ceph_assert(snapid); // (until there is a non-NOSNAP write)
Probe *probe = new Probe(ino, *layout, snapid, start_from, end, pmtime,
flags, fwd, onfinish);
<< " starting from " << start_from
<< dendl;
- assert(snapid); // (until there is a non-NOSNAP write)
+ ceph_assert(snapid); // (until there is a non-NOSNAP write)
Probe *probe = new Probe(ino, *layout, snapid, start_from, end, pmtime,
flags, fwd, onfinish);
if (start_from % period)
probe->probing_len += period - (start_from % period);
} else {
- assert(start_from > *end);
+ ceph_assert(start_from > *end);
if (start_from % period)
probe->probing_len -= period - (start_from % period);
probe->probing_off -= probe->probing_len;
Probe::unique_lock pl(probe->lock);
_probe(probe, pl);
- assert(!pl.owns_lock());
+ ceph_assert(!pl.owns_lock());
return 0;
}
*/
void Filer::_probe(Probe *probe, Probe::unique_lock& pl)
{
- assert(pl.owns_lock() && pl.mutex() == &probe->lock);
+ ceph_assert(pl.owns_lock() && pl.mutex() == &probe->lock);
ldout(cct, 10) << "_probe " << hex << probe->ino << dec
<< " " << probe->probing_off << "~" << probe->probing_len
bool Filer::_probed(Probe *probe, const object_t& oid, uint64_t size,
ceph::real_time mtime, Probe::unique_lock& pl)
{
- assert(pl.owns_lock() && pl.mutex() == &probe->lock);
+ ceph_assert(pl.owns_lock() && pl.mutex() == &probe->lock);
ldout(cct, 10) << "_probed " << probe->ino << " object " << oid
<< " has size " << size << " mtime " << mtime << dendl;
if (mtime > probe->max_mtime)
probe->max_mtime = mtime;
- assert(probe->ops.count(oid));
+ ceph_assert(probe->ops.count(oid));
probe->ops.erase(oid);
if (!probe->ops.empty()) {
<< dendl;
if (!probe->found_size) {
- assert(probe->known_size[p->oid] <= shouldbe);
+ ceph_assert(probe->known_size[p->oid] <= shouldbe);
if ((probe->fwd && probe->known_size[p->oid] == shouldbe) ||
(!probe->fwd && probe->known_size[p->oid] == 0 &&
uint64_t period = probe->layout.get_period();
if (probe->fwd) {
probe->probing_off += probe->probing_len;
- assert(probe->probing_off % period == 0);
+ ceph_assert(probe->probing_off % period == 0);
probe->probing_len = period;
} else {
// previous period.
- assert(probe->probing_off % period == 0);
+ ceph_assert(probe->probing_off % period == 0);
probe->probing_len = period;
probe->probing_off -= period;
}
_probe(probe, pl);
- assert(!pl.owns_lock());
+ ceph_assert(!pl.owns_lock());
return false;
} else if (probe->pmtime) {
ldout(cct, 10) << "_probed found mtime " << probe->max_mtime << dendl;
int flags,
Context *oncommit)
{
- assert(num_obj > 0);
+ ceph_assert(num_obj > 0);
// single object? easy!
if (num_obj == 1) {
int flags,
Context *onfinish,
int op_flags = 0) {
- assert(snap); // (until there is a non-NOSNAP write)
+ ceph_assert(snap); // (until there is a non-NOSNAP write)
vector<ObjectExtent> extents;
Striper::file_to_extents(cct, ino, layout, offset, len, 0, extents);
objecter->sg_read(extents, snap, bl, flags, onfinish, op_flags);
__u32 truncate_seq,
Context *onfinish,
int op_flags = 0) {
- assert(snap); // (until there is a non-NOSNAP write)
+ ceph_assert(snap); // (until there is a non-NOSNAP write)
vector<ObjectExtent> extents;
Striper::file_to_extents(cct, ino, layout, offset, len, truncate_size,
extents);
{
lock_guard lk(lock);
- assert(!readonly);
+ ceph_assert(!readonly);
state = STATE_ACTIVE;
stream_format = sf;
}
ldout(cct, 1) << "recover start" << dendl;
- assert(state != STATE_ACTIVE);
- assert(readonly);
+ ceph_assert(state != STATE_ACTIVE);
+ ceph_assert(readonly);
if (onread)
waitfor_recover.push_back(wrap_finisher(onread));
void Journaler::_read_head(Context *on_finish, bufferlist *bl)
{
// lock is locked
- assert(state == STATE_READHEAD || state == STATE_REREADHEAD);
+ ceph_assert(state == STATE_READHEAD || state == STATE_REREADHEAD);
object_t oid = file_object_t(ino, 0);
object_locator_t oloc(pg_pool);
void Journaler::_reread_head(Context *onfinish)
{
ldout(cct, 10) << "reread_head" << dendl;
- assert(state == STATE_ACTIVE);
+ ceph_assert(state == STATE_ACTIVE);
state = STATE_REREADHEAD;
C_RereadHead *fin = new C_RereadHead(this, onfinish);
}
//read on-disk header into
- assert(bl.length() || r < 0 );
+ ceph_assert(bl.length() || r < 0 );
// unpack header
if (r == 0) {
if (is_stopping())
return;
- assert(state == STATE_READHEAD);
+ ceph_assert(state == STATE_READHEAD);
if (r!=0) {
ldout(cct, 0) << "error getting journal off disk" << dendl;
{
// lock is locked
ldout(cct, 1) << "probing for end of the log" << dendl;
- assert(state == STATE_PROBING || state == STATE_REPROBING);
+ ceph_assert(state == STATE_PROBING || state == STATE_REPROBING);
// probe the log
filer.probe(ino, &layout, CEPH_NOSNAP,
write_pos, end, true, 0, wrap_finisher(finish));
void Journaler::_reprobe(C_OnFinisher *finish)
{
ldout(cct, 10) << "reprobe" << dendl;
- assert(state == STATE_ACTIVE);
+ ceph_assert(state == STATE_ACTIVE);
state = STATE_REPROBING;
C_ReProbe *fin = new C_ReProbe(this, finish);
return;
}
- assert(new_end >= write_pos || r < 0);
+ ceph_assert(new_end >= write_pos || r < 0);
ldout(cct, 1) << "_finish_reprobe new_end = " << new_end
<< " (header had " << write_pos << ")."
<< dendl;
if (is_stopping())
return;
- assert(state == STATE_PROBING);
+ ceph_assert(state == STATE_PROBING);
if (r < 0) { // error in probing
goto out;
}
<< write_pos << "). log was empty. recovered." << dendl;
ceph_abort(); // hrm.
} else {
- assert(end >= write_pos);
+ ceph_assert(end >= write_pos);
ldout(cct, 1) << "_finish_probe_end write_pos = " << end
<< " (header had " << write_pos << "). recovered."
<< dendl;
{
lock_guard l(lock);
- assert(state == STATE_ACTIVE);
+ ceph_assert(state == STATE_ACTIVE);
_reread_head(new C_RereadHeadProbe(this, wrap_finisher(onfinish)));
}
return;
}
- assert(!r); //if we get an error, we're boned
+ ceph_assert(!r); //if we get an error, we're boned
_reprobe(onfinish);
}
void Journaler::_write_head(Context *oncommit)
{
- assert(!readonly);
- assert(state == STATE_ACTIVE);
+ ceph_assert(!readonly);
+ ceph_assert(state == STATE_ACTIVE);
last_written.trimmed_pos = trimmed_pos;
last_written.expire_pos = expire_pos;
last_written.unused_field = expire_pos;
ldout(cct, 10) << "write_head " << last_written << dendl;
// Avoid persisting bad pointers in case of bugs
- assert(last_written.write_pos >= last_written.expire_pos);
- assert(last_written.expire_pos >= last_written.trimmed_pos);
+ ceph_assert(last_written.write_pos >= last_written.expire_pos);
+ ceph_assert(last_written.expire_pos >= last_written.trimmed_pos);
last_wrote_head = ceph::real_clock::now();
handle_write_error(r);
return;
}
- assert(!readonly);
+ ceph_assert(!readonly);
ldout(cct, 10) << "_finish_write_head " << wrote << dendl;
last_committed = wrote;
if (oncommit) {
void Journaler::_finish_flush(int r, uint64_t start, ceph::real_time stamp)
{
lock_guard l(lock);
- assert(!readonly);
+ ceph_assert(!readonly);
if (r < 0) {
lderr(cct) << "_finish_flush got " << cpp_strerror(r) << dendl;
return;
}
- assert(start < flush_pos);
+ ceph_assert(start < flush_pos);
// calc latency?
if (logger) {
// adjust safe_pos
auto it = pending_safe.find(start);
- assert(it != pending_safe.end());
+ ceph_assert(it != pending_safe.end());
pending_safe.erase(it);
if (pending_safe.empty())
safe_pos = next_safe_pos;
{
unique_lock l(lock);
- assert(!readonly);
+ ceph_assert(!readonly);
uint32_t s = bl.length();
// append
// flush previous object?
uint64_t su = get_layout_period();
- assert(su > 0);
+ ceph_assert(su > 0);
uint64_t write_off = write_pos % su;
uint64_t write_obj = write_pos / su;
uint64_t flush_obj = flush_pos / su;
return;
if (write_pos == flush_pos)
return;
- assert(write_pos > flush_pos);
- assert(!readonly);
+ ceph_assert(write_pos > flush_pos);
+ ceph_assert(!readonly);
// flush
uint64_t len = write_pos - flush_pos;
- assert(len == write_buf.length());
+ ceph_assert(len == write_buf.length());
if (amount && amount < len)
len = amount;
wrap_finisher(onsafe), write_iohint);
flush_pos += len;
- assert(write_buf.length() == write_pos - flush_pos);
+ ceph_assert(write_buf.length() == write_pos - flush_pos);
write_buf_throttle.put(len);
ldout(cct, 20) << "write_buf_throttle put, len " << len << dendl;
void Journaler::_wait_for_flush(Context *onsafe)
{
- assert(!readonly);
+ ceph_assert(!readonly);
// all flushed and safe?
if (write_pos == safe_pos) {
- assert(write_buf.length() == 0);
+ ceph_assert(write_buf.length() == 0);
ldout(cct, 10)
<< "flush nothing to flush, (prezeroing/prezero)/write/flush/safe "
"pointers at " << "(" << prezeroing_pos << "/" << prezero_pos << ")/"
void Journaler::_flush(C_OnFinisher *onsafe)
{
- assert(!readonly);
+ ceph_assert(!readonly);
if (write_pos == flush_pos) {
- assert(write_buf.length() == 0);
+ ceph_assert(write_buf.length() == 0);
ldout(cct, 10) << "flush nothing to flush, (prezeroing/prezero)/write/"
"flush/safe pointers at " << "(" << prezeroing_pos << "/" << prezero_pos
<< ")/" << write_pos << "/" << flush_pos << "/" << safe_pos
void Journaler::_issue_prezero()
{
- assert(prezeroing_pos >= flush_pos);
+ ceph_assert(prezeroing_pos >= flush_pos);
uint64_t num_periods = cct->_conf.get_val<uint64_t>("journaler_prezero_periods");
/*
return;
}
- assert(r == 0 || r == -ENOENT);
+ ceph_assert(r == 0 || r == -ENOENT);
if (start == prezero_pos) {
prezero_pos += len;
void Journaler::wait_for_prezero(Context *onfinish)
{
- assert(onfinish);
+ ceph_assert(onfinish);
lock_guard l(lock);
if (prezero_pos == prezeroing_pos) {
<< p->second.length() << dendl;
received_pos += p->second.length();
read_buf.claim_append(p->second);
- assert(received_pos <= requested_pos);
+ ceph_assert(received_pos <= requested_pos);
prefetch_buf.erase(p);
got_any = true;
}
{
// stuck at safe_pos? (this is needed if we are reading the tail of
// a journal we are also writing to)
- assert(requested_pos <= safe_pos);
+ ceph_assert(requested_pos <= safe_pos);
if (requested_pos == safe_pos) {
ldout(cct, 10) << "_issue_read requested_pos = safe_pos = " << safe_pos
<< ", waiting" << dendl;
- assert(write_pos > requested_pos);
+ ceph_assert(write_pos > requested_pos);
if (pending_safe.empty()) {
_flush(NULL);
}
// adjust write_pos
prezeroing_pos = prezero_pos = write_pos = flush_pos = safe_pos = next_safe_pos = read_pos;
- assert(write_buf.length() == 0);
- assert(waitfor_safe.empty());
+ ceph_assert(write_buf.length() == 0);
+ ceph_assert(waitfor_safe.empty());
// reset read state
requested_pos = received_pos = read_pos;
try {
consumed = journal_stream.read(read_buf, &bl, &start_ptr);
if (stream_format >= JOURNAL_FORMAT_RESILIENT) {
- assert(start_ptr == read_pos);
+ ceph_assert(start_ptr == read_pos);
}
} catch (const buffer::error &e) {
lderr(cct) << __func__ << ": decode error from journal_stream" << dendl;
return;
}
- assert(on_readable == 0);
+ ceph_assert(on_readable == 0);
if (!readable) {
ldout(cct, 10) << "wait_for_readable at " << read_pos << " onreadable "
<< onreadable << dendl;
if (is_stopping())
return;
- assert(!readonly);
+ ceph_assert(!readonly);
uint64_t period = get_layout_period();
uint64_t trim_to = last_committed.expire_pos;
trim_to -= trim_to % period;
}
// trim
- assert(trim_to <= write_pos);
- assert(trim_to <= expire_pos);
- assert(trim_to > trimming_pos);
+ ceph_assert(trim_to <= write_pos);
+ ceph_assert(trim_to <= expire_pos);
+ ceph_assert(trim_to > trimming_pos);
ldout(cct, 10) << "trim trimming to " << trim_to
<< ", trimmed/trimming/expire are "
<< trimmed_pos << "/" << trimming_pos << "/" << expire_pos
{
lock_guard l(lock);
- assert(!readonly);
+ ceph_assert(!readonly);
ldout(cct, 10) << "_finish_trim trimmed_pos was " << trimmed_pos
<< ", trimmed/trimming/expire now "
<< to << "/" << trimming_pos << "/" << expire_pos
return;
}
- assert(r >= 0 || r == -ENOENT);
+ ceph_assert(r >= 0 || r == -ENOENT);
- assert(to <= trimming_pos);
- assert(to > trimmed_pos);
+ ceph_assert(to <= trimming_pos);
+ ceph_assert(to > trimmed_pos);
trimmed_pos = to;
}
lderr(cct) << __func__ << ": multiple write errors, handler already called"
<< dendl;
} else {
- assert(0 == "unhandled write error");
+ ceph_assert(0 == "unhandled write error");
}
}
*/
bool JournalStream::readable(bufferlist &read_buf, uint64_t *need) const
{
- assert(need != NULL);
+ ceph_assert(need != NULL);
uint32_t entry_size = 0;
uint64_t entry_sentinel = 0;
size_t JournalStream::read(bufferlist &from, bufferlist *entry,
uint64_t *start_ptr)
{
- assert(start_ptr != NULL);
- assert(entry != NULL);
- assert(entry->length() == 0);
+ ceph_assert(start_ptr != NULL);
+ ceph_assert(entry != NULL);
+ ceph_assert(entry->length() == 0);
uint32_t entry_size = 0;
decode(entry_sentinel, from_ptr);
// Assertion instead of clean check because of precondition of this
// fn is that readable() already passed
- assert(entry_sentinel == sentinel);
+ ceph_assert(entry_sentinel == sentinel);
}
decode(entry_size, from_ptr);
size_t JournalStream::write(bufferlist &entry, bufferlist *to,
uint64_t const &start_ptr)
{
- assert(to != NULL);
+ ceph_assert(to != NULL);
uint32_t const entry_size = entry.length();
if (format >= JOURNAL_FORMAT_RESILIENT) {
*/
void Journaler::set_write_error_handler(Context *c) {
lock_guard l(lock);
- assert(!on_write_error);
+ ceph_assert(!on_write_error);
on_write_error = wrap_finisher(c);
called_write_error = false;
}
*/
void _do_delayed_flush()
{
- assert(delay_flush_event != NULL);
+ ceph_assert(delay_flush_event != NULL);
lock_guard l(lock);
delay_flush_event = NULL;
_do_flush();
// only init_headers when following or first reading off-disk
void init_headers(Header& h) {
- assert(readonly ||
+ ceph_assert(readonly ||
state == STATE_READHEAD ||
state == STATE_REREADHEAD);
last_written = last_committed = h;
*/
void reset() {
lock_guard l(lock);
- assert(state == STATE_ACTIVE);
+ ceph_assert(state == STATE_ACTIVE);
readonly = true;
delay_flush_event = NULL;
requested_pos = 0;
received_pos = 0;
fetch_len = 0;
- assert(!on_readable);
+ ceph_assert(!on_readable);
expire_pos = 0;
trimming_pos = 0;
trimmed_pos = 0;
void set_read_pos(uint64_t p) {
lock_guard l(lock);
// we can't cope w/ in-progress read right now.
- assert(requested_pos == received_pos);
+ ceph_assert(requested_pos == received_pos);
read_pos = requested_pos = received_pos = p;
read_buf.clear();
}
void trim_tail() {
lock_guard l(lock);
- assert(!readonly);
+ ceph_assert(!readonly);
_issue_prezero();
}
ObjectCacher::BufferHead *ObjectCacher::Object::split(BufferHead *left,
loff_t off)
{
- assert(oc->lock.is_locked());
+ ceph_assert(oc->lock.is_locked());
ldout(oc->cct, 20) << "split " << *left << " at " << off << dendl;
// split off right
bufferlist bl;
bl.claim(left->bl);
if (bl.length()) {
- assert(bl.length() == (left->length() + right->length()));
+ ceph_assert(bl.length() == (left->length() + right->length()));
right->bl.substr_of(bl, left->length(), right->length());
left->bl.substr_of(bl, 0, left->length());
}
ldout(oc->cct, 20) << "split moving waiters at byte " << p->first
<< " to right bh" << dendl;
right->waitfor_read[p->first].swap( p->second );
- assert(p->second.empty());
+ ceph_assert(p->second.empty());
}
left->waitfor_read.erase(start_remove, left->waitfor_read.end());
}
void ObjectCacher::Object::merge_left(BufferHead *left, BufferHead *right)
{
- assert(oc->lock.is_locked());
+ ceph_assert(oc->lock.is_locked());
ldout(oc->cct, 10) << "merge_left " << *left << " + " << *right << dendl;
if (left->get_journal_tid() == 0) {
void ObjectCacher::Object::try_merge_bh(BufferHead *bh)
{
- assert(oc->lock.is_locked());
+ ceph_assert(oc->lock.is_locked());
ldout(oc->cct, 10) << "try_merge_bh " << *bh << dendl;
// do not merge rx buffers; last_read_tid may not match
// to the left?
map<loff_t,BufferHead*>::iterator p = data.find(bh->start());
- assert(p->second == bh);
+ ceph_assert(p->second == bh);
if (p != data.begin()) {
--p;
if (can_merge_bh(p->second, bh)) {
}
}
// to the right?
- assert(p->second == bh);
+ ceph_assert(p->second == bh);
++p;
if (p != data.end() && can_merge_bh(bh, p->second))
merge_left(bh, p->second);
*/
bool ObjectCacher::Object::is_cached(loff_t cur, loff_t left) const
{
- assert(oc->lock.is_locked());
+ ceph_assert(oc->lock.is_locked());
map<loff_t, BufferHead*>::const_iterator p = data_lower_bound(cur);
while (left > 0) {
if (p == data.end())
*/
bool ObjectCacher::Object::include_all_cached_data(loff_t off, loff_t len)
{
- assert(oc->lock.is_locked());
+ ceph_assert(oc->lock.is_locked());
if (data.empty())
return true;
map<loff_t, BufferHead*>::iterator first = data.begin();
map<loff_t, BufferHead*>& rx,
map<loff_t, BufferHead*>& errors)
{
- assert(oc->lock.is_locked());
+ ceph_assert(oc->lock.is_locked());
ldout(oc->cct, 10) << "map_read " << ex.oid << " "
<< ex.offset << "~" << ex.length << dendl;
ldout(oc->cct, 20) << "map_read miss " << left << " left, " << *n << dendl;
}
cur += left;
- assert(cur == (loff_t)ex.offset + (loff_t)ex.length);
+ ceph_assert(cur == (loff_t)ex.offset + (loff_t)ex.length);
break; // no more.
}
lderr(oc->cct) << "AUDIT FAILURE: map position " << it->first
<< " does not match bh start position: "
<< *it->second << dendl;
- assert(it->first == it->second->start());
+ ceph_assert(it->first == it->second->start());
}
if (it->first < offset) {
lderr(oc->cct) << "AUDIT FAILURE: " << it->first << " " << *it->second
<< " overlaps with previous bh " << *((--it)->second)
<< dendl;
- assert(it->first >= offset);
+ ceph_assert(it->first >= offset);
}
BufferHead *bh = it->second;
map<loff_t, list<Context*> >::const_iterator w_it;
w_it->first >= bh->start() + bh->length()) {
lderr(oc->cct) << "AUDIT FAILURE: waiter at " << w_it->first
<< " is not within bh " << *bh << dendl;
- assert(w_it->first >= bh->start());
- assert(w_it->first < bh->start() + bh->length());
+ ceph_assert(w_it->first >= bh->start());
+ ceph_assert(w_it->first < bh->start() + bh->length());
}
}
offset = it->first + it->second->length();
ObjectCacher::BufferHead *ObjectCacher::Object::map_write(ObjectExtent &ex,
ceph_tid_t tid)
{
- assert(oc->lock.is_locked());
+ ceph_assert(oc->lock.is_locked());
BufferHead *final = 0;
ldout(oc->cct, 10) << "map_write oex " << ex.oid
ldout(oc->cct, 10) << "map_write bh " << *bh << " intersected" << dendl;
if (p->first < cur) {
- assert(final == 0);
+ ceph_assert(final == 0);
if (cur + max >= bh->end()) {
// we want right bit (one splice)
final = split(bh, cur); // just split it, take right half.
replace_journal_tid(final, tid);
++p;
- assert(p->second == final);
+ ceph_assert(p->second == final);
} else {
// we want middle bit (two splices)
final = split(bh, cur);
++p;
- assert(p->second == final);
+ ceph_assert(p->second == final);
split(final, cur+max);
replace_journal_tid(final, tid);
}
} else {
- assert(p->first == cur);
+ ceph_assert(p->first == cur);
if (bh->length() <= max) {
// whole bufferhead, piece of cake.
} else {
oc->mark_dirty(bh);
oc->mark_dirty(final);
--p; // move iterator back to final
- assert(p->second == final);
+ ceph_assert(p->second == final);
replace_journal_tid(bh, tid);
merge_left(final, bh);
} else {
}
// set version
- assert(final);
- assert(final->get_journal_tid() == tid);
+ ceph_assert(final);
+ ceph_assert(final->get_journal_tid() == tid);
ldout(oc->cct, 10) << "map_write final is " << *final << dendl;
return final;
ceph_tid_t tid) {
ceph_tid_t bh_tid = bh->get_journal_tid();
- assert(tid == 0 || bh_tid <= tid);
+ ceph_assert(tid == 0 || bh_tid <= tid);
if (bh_tid != 0 && bh_tid != tid) {
// inform journal that it should not expect a writeback from this extent
oc->writeback_handler.overwrite_extent(get_oid(), bh->start(),
void ObjectCacher::Object::truncate(loff_t s)
{
- assert(oc->lock.is_locked());
+ ceph_assert(oc->lock.is_locked());
ldout(oc->cct, 10) << "truncate " << *this << " to " << s << dendl;
while (!data.empty()) {
}
// remove bh entirely
- assert(bh->start() >= s);
- assert(bh->waitfor_read.empty());
+ ceph_assert(bh->start() >= s);
+ ceph_assert(bh->waitfor_read.empty());
replace_journal_tid(bh, 0);
oc->bh_remove(this, bh);
delete bh;
void ObjectCacher::Object::discard(loff_t off, loff_t len,
C_GatherBuilder* commit_gather)
{
- assert(oc->lock.is_locked());
+ ceph_assert(oc->lock.is_locked());
ldout(oc->cct, 10) << "discard " << *this << " " << off << "~" << len
<< dendl;
continue;
}
- assert(bh->start() >= off);
+ ceph_assert(bh->start() >= off);
if (bh->end() > off + len) {
split(bh, off + len);
}
// we should mark all Rx bh to zero
continue;
} else {
- assert(bh->waitfor_read.empty());
+ ceph_assert(bh->waitfor_read.empty());
}
oc->bh_remove(this, bh);
= objects.begin();
i != objects.end();
++i)
- assert(i->empty());
- assert(bh_lru_rest.lru_get_size() == 0);
- assert(bh_lru_dirty.lru_get_size() == 0);
- assert(ob_lru.lru_get_size() == 0);
- assert(dirty_or_tx_bh.empty());
+ ceph_assert(i->empty());
+ ceph_assert(bh_lru_rest.lru_get_size() == 0);
+ ceph_assert(bh_lru_dirty.lru_get_size() == 0);
+ ceph_assert(ob_lru.lru_get_size() == 0);
+ ceph_assert(dirty_or_tx_bh.empty());
}
void ObjectCacher::perf_start()
void ObjectCacher::perf_stop()
{
- assert(perfcounter);
+ ceph_assert(perfcounter);
cct->get_perfcounters_collection()->remove(perfcounter);
delete perfcounter;
}
uint64_t truncate_seq)
{
// XXX: Add handling of nspace in object_locator_t in cache
- assert(lock.is_locked());
+ ceph_assert(lock.is_locked());
// have it?
if ((uint32_t)l.pool < objects.size()) {
if (objects[l.pool].count(oid)) {
void ObjectCacher::close_object(Object *ob)
{
- assert(lock.is_locked());
+ ceph_assert(lock.is_locked());
ldout(cct, 10) << "close_object " << *ob << dendl;
- assert(ob->can_close());
+ ceph_assert(ob->can_close());
// ok!
ob_lru.lru_remove(ob);
void ObjectCacher::bh_read(BufferHead *bh, int op_flags,
const ZTracer::Trace &parent_trace)
{
- assert(lock.is_locked());
+ ceph_assert(lock.is_locked());
ldout(cct, 7) << "bh_read on " << *bh << " outstanding reads "
<< reads_outstanding << dendl;
uint64_t length, bufferlist &bl, int r,
bool trust_enoent)
{
- assert(lock.is_locked());
+ ceph_assert(lock.is_locked());
ldout(cct, 7) << "bh_read_finish "
<< oid
<< " tid " << tid
continue;
}
- assert(opos >= bh->start());
- assert(bh->start() == opos); // we don't merge rx bh's... yet!
- assert(bh->length() <= start+(loff_t)length-opos);
+ ceph_assert(opos >= bh->start());
+ ceph_assert(bh->start() == opos); // we don't merge rx bh's... yet!
+ ceph_assert(bh->length() <= start+(loff_t)length-opos);
if (bh->error < 0)
err = bh->error;
int count = 0;
int64_t total_len = 0;
set<BufferHead*, BufferHead::ptr_lt>::iterator it = dirty_or_tx_bh.find(bh);
- assert(it != dirty_or_tx_bh.end());
+ ceph_assert(it != dirty_or_tx_bh.end());
for (set<BufferHead*, BufferHead::ptr_lt>::iterator p = it;
p != dirty_or_tx_bh.end();
++p) {
};
void ObjectCacher::bh_write_scattered(list<BufferHead*>& blist)
{
- assert(lock.is_locked());
+ ceph_assert(lock.is_locked());
Object *ob = blist.front()->ob;
ob->get();
for (list<BufferHead*>::iterator p = blist.begin(); p != blist.end(); ++p) {
BufferHead *bh = *p;
ldout(cct, 7) << "bh_write_scattered " << *bh << dendl;
- assert(bh->ob == ob);
- assert(bh->bl.length() == bh->length());
+ ceph_assert(bh->ob == ob);
+ ceph_assert(bh->bl.length() == bh->length());
ranges.push_back(pair<loff_t, uint64_t>(bh->start(), bh->length()));
int n = io_vec.size();
void ObjectCacher::bh_write(BufferHead *bh, const ZTracer::Trace &parent_trace)
{
- assert(lock.is_locked());
+ ceph_assert(lock.is_locked());
ldout(cct, 7) << "bh_write " << *bh << dendl;
bh->ob->get();
vector<pair<loff_t, uint64_t> >& ranges,
ceph_tid_t tid, int r)
{
- assert(lock.is_locked());
+ ceph_assert(lock.is_locked());
ldout(cct, 7) << "bh_write_commit " << oid << " tid " << tid
<< " ranges " << ranges << " returned " << r << dendl;
// make sure bh tid matches
if (bh->last_write_tid != tid) {
- assert(bh->last_write_tid > tid);
+ ceph_assert(bh->last_write_tid > tid);
ldout(cct, 10) << "bh_write_commit newer tid on " << *bh << dendl;
continue;
}
// we don't merge tx buffers. tx buffer should be within the range
- assert(bh->start() >= start);
- assert(bh->end() <= start+(loff_t)length);
+ ceph_assert(bh->start() >= start);
+ ceph_assert(bh->end() <= start+(loff_t)length);
if (r >= 0) {
// ok! mark bh clean and error-free
}
// update last_commit.
- assert(ob->last_commit_tid < tid);
+ ceph_assert(ob->last_commit_tid < tid);
ob->last_commit_tid = tid;
// waiters?
void ObjectCacher::flush(ZTracer::Trace *trace, loff_t amount)
{
- assert(trace != nullptr);
- assert(lock.is_locked());
+ ceph_assert(trace != nullptr);
+ ceph_assert(lock.is_locked());
ceph::real_time cutoff = ceph::real_clock::now();
ldout(cct, 10) << "flush " << amount << dendl;
void ObjectCacher::trim()
{
- assert(lock.is_locked());
+ ceph_assert(lock.is_locked());
ldout(cct, 10) << "trim start: bytes: max " << max_size << " clean "
<< get_stat_clean() << ", objects: max " << max_objects
<< " current " << ob_lru.lru_get_size() << dendl;
break;
ldout(cct, 10) << "trim trimming " << *bh << dendl;
- assert(bh->is_clean() || bh->is_zero() || bh->is_error());
+ ceph_assert(bh->is_clean() || bh->is_zero() || bh->is_error());
Object *ob = bh->ob;
bh_remove(ob, bh);
bool ObjectCacher::is_cached(ObjectSet *oset, vector<ObjectExtent>& extents,
snapid_t snapid)
{
- assert(lock.is_locked());
+ ceph_assert(lock.is_locked());
for (vector<ObjectExtent>::iterator ex_it = extents.begin();
ex_it != extents.end();
++ex_it) {
int ObjectCacher::_readx(OSDRead *rd, ObjectSet *oset, Context *onfinish,
bool external_call, ZTracer::Trace *trace)
{
- assert(trace != nullptr);
- assert(lock.is_locked());
+ ceph_assert(trace != nullptr);
+ ceph_assert(lock.is_locked());
bool success = true;
int error = 0;
uint64_t bytes_in_cache = 0;
* passed in a single ObjectExtent. Any caller who wants ENOENT instead of
* zeroed buffers needs to feed single extents into readx().
*/
- assert(!oset->return_enoent || rd->extents.size() == 1);
+ ceph_assert(!oset->return_enoent || rd->extents.size() == 1);
for (vector<ObjectExtent>::iterator ex_it = rd->extents.begin();
ex_it != rd->extents.end();
touch_bh(bh_it->second);
} else {
- assert(!hits.empty());
+ ceph_assert(!hits.empty());
// make a plain list
for (map<loff_t, BufferHead*>::iterator bh_it = hits.begin();
// - the buffer frags need not be (and almost certainly aren't)
loff_t opos = ex_it->offset;
map<loff_t, BufferHead*>::iterator bh_it = hits.begin();
- assert(bh_it->second->start() <= opos);
+ ceph_assert(bh_it->second->start() <= opos);
uint64_t bhoff = opos - bh_it->second->start();
vector<pair<uint64_t,uint64_t> >::iterator f_it
= ex_it->buffer_extents.begin();
uint64_t foff = 0;
while (1) {
BufferHead *bh = bh_it->second;
- assert(opos == (loff_t)(bh->start() + bhoff));
+ ceph_assert(opos == (loff_t)(bh->start() + bhoff));
uint64_t len = std::min(f_it->second - foff, bh->length() - bhoff);
ldout(cct, 10) << "readx rmap opos " << opos << ": " << *bh << " +"
if (f_it == ex_it->buffer_extents.end())
break;
}
- assert(f_it == ex_it->buffer_extents.end());
- assert(opos == (loff_t)ex_it->offset + (loff_t)ex_it->length);
+ ceph_assert(f_it == ex_it->buffer_extents.end());
+ ceph_assert(opos == (loff_t)ex_it->offset + (loff_t)ex_it->length);
}
if (dontneed && o->include_all_cached_data(ex_it->offset, ex_it->length))
for (map<uint64_t,bufferlist>::iterator i = stripe_map.begin();
i != stripe_map.end();
++i) {
- assert(pos == i->first);
+ ceph_assert(pos == i->first);
ldout(cct, 10) << "readx adding buffer len " << i->second.length()
<< " at " << pos << dendl;
pos += i->second.length();
rd->bl->claim_append(i->second);
- assert(rd->bl->length() == pos);
+ ceph_assert(rd->bl->length() == pos);
}
ldout(cct, 10) << "readx result is " << rd->bl->length() << dendl;
} else if (!error) {
// done with read.
int ret = error ? error : pos;
ldout(cct, 20) << "readx done " << rd << " " << ret << dendl;
- assert(pos <= (uint64_t) INT_MAX);
+ ceph_assert(pos <= (uint64_t) INT_MAX);
delete rd;
int ObjectCacher::writex(OSDWrite *wr, ObjectSet *oset, Context *onfreespace,
ZTracer::Trace *parent_trace)
{
- assert(lock.is_locked());
+ ceph_assert(lock.is_locked());
ceph::real_time now = ceph::real_clock::now();
uint64_t bytes_written = 0;
uint64_t bytes_written_in_flush = 0;
<< f_it->second << " into " << *bh << " at " << opos
<< dendl;
uint64_t bhoff = opos - bh->start();
- assert(f_it->second <= bh->length() - bhoff);
+ ceph_assert(f_it->second <= bh->length() - bhoff);
// get the frag we're mapping in
bufferlist frag;
void ObjectCacher::maybe_wait_for_writeback(uint64_t len,
ZTracer::Trace *trace)
{
- assert(lock.is_locked());
+ ceph_assert(lock.is_locked());
ceph::mono_time start = ceph::mono_clock::now();
int blocked = 0;
// wait for writeback?
int ObjectCacher::_wait_for_write(OSDWrite *wr, uint64_t len, ObjectSet *oset,
ZTracer::Trace *trace, Context *onfreespace)
{
- assert(lock.is_locked());
- assert(trace != nullptr);
+ ceph_assert(lock.is_locked());
+ ceph_assert(trace != nullptr);
int ret = 0;
if (max_dirty > 0 && !(wr->fadvise_flags & LIBRADOS_OP_FLAG_FADVISE_FUA)) {
if (onfreespace)
onfreespace->complete(0);
} else {
- assert(onfreespace);
+ ceph_assert(onfreespace);
finisher.queue(new C_WaitForWrite(this, len, *trace, onfreespace));
}
} else {
bool done = false;
Context *fin = block_writes_upfront ?
new C_Cond(&cond, &done, &ret) : onfreespace;
- assert(fin);
+ ceph_assert(fin);
bool flushed = flush_set(oset, wr->extents, trace, fin);
- assert(!flushed); // we just dirtied it, and didn't drop our lock!
+ ceph_assert(!flushed); // we just dirtied it, and didn't drop our lock!
ldout(cct, 10) << "wait_for_write waiting on write-thru of " << len
<< " bytes" << dendl;
if (block_writes_upfront) {
bool ObjectCacher::set_is_empty(ObjectSet *oset)
{
- assert(lock.is_locked());
+ ceph_assert(lock.is_locked());
if (oset->objects.empty())
return true;
bool ObjectCacher::set_is_cached(ObjectSet *oset)
{
- assert(lock.is_locked());
+ ceph_assert(lock.is_locked());
if (oset->objects.empty())
return false;
bool ObjectCacher::set_is_dirty_or_committing(ObjectSet *oset)
{
- assert(lock.is_locked());
+ ceph_assert(lock.is_locked());
if (oset->objects.empty())
return false;
// purge. non-blocking. violently removes dirty buffers from cache.
void ObjectCacher::purge(Object *ob)
{
- assert(lock.is_locked());
+ ceph_assert(lock.is_locked());
ldout(cct, 10) << "purge " << *ob << dendl;
ob->truncate(0);
bool ObjectCacher::flush(Object *ob, loff_t offset, loff_t length,
ZTracer::Trace *trace)
{
- assert(trace != nullptr);
- assert(lock.is_locked());
+ ceph_assert(trace != nullptr);
+ ceph_assert(lock.is_locked());
list<BufferHead*> blist;
bool clean = true;
ldout(cct, 10) << "flush " << *ob << " " << offset << "~" << length << dendl;
bool ObjectCacher::_flush_set_finish(C_GatherBuilder *gather,
Context *onfinish)
{
- assert(lock.is_locked());
+ ceph_assert(lock.is_locked());
if (gather->has_subs()) {
gather->set_finisher(onfinish);
gather->activate();
// returns true if already flushed
bool ObjectCacher::flush_set(ObjectSet *oset, Context *onfinish)
{
- assert(lock.is_locked());
- assert(onfinish != NULL);
+ ceph_assert(lock.is_locked());
+ ceph_assert(onfinish != NULL);
if (oset->objects.empty()) {
ldout(cct, 10) << "flush_set on " << oset << " dne" << dendl;
onfinish->complete(0);
bool ObjectCacher::flush_set(ObjectSet *oset, vector<ObjectExtent>& exv,
ZTracer::Trace *trace, Context *onfinish)
{
- assert(lock.is_locked());
- assert(trace != nullptr);
- assert(onfinish != NULL);
+ ceph_assert(lock.is_locked());
+ ceph_assert(trace != nullptr);
+ ceph_assert(onfinish != NULL);
if (oset->objects.empty()) {
ldout(cct, 10) << "flush_set on " << oset << " dne" << dendl;
onfinish->complete(0);
// returns true if already flushed
bool ObjectCacher::flush_all(Context *onfinish)
{
- assert(lock.is_locked());
- assert(onfinish != NULL);
+ ceph_assert(lock.is_locked());
+ ceph_assert(onfinish != NULL);
ldout(cct, 10) << "flush_all " << dendl;
void ObjectCacher::purge_set(ObjectSet *oset)
{
- assert(lock.is_locked());
+ ceph_assert(lock.is_locked());
if (oset->objects.empty()) {
ldout(cct, 10) << "purge_set on " << oset << " dne" << dendl;
return;
// Although we have purged rather than flushed, caller should still
// drop any resources associate with dirty data.
- assert(oset->dirty_or_tx == 0);
+ ceph_assert(oset->dirty_or_tx == 0);
if (flush_set_callback && were_dirty) {
flush_set_callback(flush_set_callback_arg, oset);
}
loff_t ObjectCacher::release(Object *ob)
{
- assert(lock.is_locked());
+ ceph_assert(lock.is_locked());
list<BufferHead*> clean;
loff_t o_unclean = 0;
if (ob->can_close()) {
ldout(cct, 10) << "release trimming " << *ob << dendl;
close_object(ob);
- assert(o_unclean == 0);
+ ceph_assert(o_unclean == 0);
return 0;
}
loff_t ObjectCacher::release_set(ObjectSet *oset)
{
- assert(lock.is_locked());
+ ceph_assert(lock.is_locked());
// return # bytes not clean (and thus not released).
loff_t unclean = 0;
uint64_t ObjectCacher::release_all()
{
- assert(lock.is_locked());
+ ceph_assert(lock.is_locked());
ldout(cct, 10) << "release_all" << dendl;
uint64_t unclean = 0;
void ObjectCacher::clear_nonexistence(ObjectSet *oset)
{
- assert(lock.is_locked());
+ ceph_assert(lock.is_locked());
ldout(cct, 10) << "clear_nonexistence() " << oset << dendl;
for (xlist<Object*>::iterator p = oset->objects.begin();
*/
void ObjectCacher::discard_set(ObjectSet *oset, const vector<ObjectExtent>& exls)
{
- assert(lock.is_locked());
+ ceph_assert(lock.is_locked());
bool was_dirty = oset->dirty_or_tx > 0;
_discard(oset, exls, nullptr);
const vector<ObjectExtent>& exls,
Context* on_finish)
{
- assert(lock.is_locked());
+ ceph_assert(lock.is_locked());
bool was_dirty = oset->dirty_or_tx > 0;
C_GatherBuilder gather(cct);
bool flushed = was_dirty && oset->dirty_or_tx == 0;
gather.set_finisher(new FunctionContext(
[this, oset, flushed, on_finish](int) {
- assert(lock.is_locked());
+ ceph_assert(lock.is_locked());
if (flushed && flush_set_callback)
flush_set_callback(flush_set_callback_arg, oset);
if (on_finish)
void ObjectCacher::_discard_finish(ObjectSet *oset, bool was_dirty,
Context* on_finish)
{
- assert(lock.is_locked());
+ ceph_assert(lock.is_locked());
// did we truncate off dirty data?
if (flush_set_callback && was_dirty && oset->dirty_or_tx == 0) {
void ObjectCacher::verify_stats() const
{
- assert(lock.is_locked());
+ ceph_assert(lock.is_locked());
ldout(cct, 10) << "verify_stats" << dendl;
loff_t clean = 0, zero = 0, dirty = 0, rx = 0, tx = 0, missing = 0,
ldout(cct, 10) << " clean " << clean << " rx " << rx << " tx " << tx
<< " dirty " << dirty << " missing " << missing
<< " error " << error << dendl;
- assert(clean == stat_clean);
- assert(rx == stat_rx);
- assert(tx == stat_tx);
- assert(dirty == stat_dirty);
- assert(missing == stat_missing);
- assert(zero == stat_zero);
- assert(error == stat_error);
+ ceph_assert(clean == stat_clean);
+ ceph_assert(rx == stat_rx);
+ ceph_assert(tx == stat_tx);
+ ceph_assert(dirty == stat_dirty);
+ ceph_assert(missing == stat_missing);
+ ceph_assert(zero == stat_zero);
+ ceph_assert(error == stat_error);
}
void ObjectCacher::bh_stat_add(BufferHead *bh)
{
- assert(lock.is_locked());
+ ceph_assert(lock.is_locked());
switch (bh->get_state()) {
case BufferHead::STATE_MISSING:
stat_missing += bh->length();
stat_error += bh->length();
break;
default:
- assert(0 == "bh_stat_add: invalid bufferhead state");
+ ceph_assert(0 == "bh_stat_add: invalid bufferhead state");
}
if (get_stat_dirty_waiting() > 0)
stat_cond.Signal();
void ObjectCacher::bh_stat_sub(BufferHead *bh)
{
- assert(lock.is_locked());
+ ceph_assert(lock.is_locked());
switch (bh->get_state()) {
case BufferHead::STATE_MISSING:
stat_missing -= bh->length();
stat_error -= bh->length();
break;
default:
- assert(0 == "bh_stat_sub: invalid bufferhead state");
+ ceph_assert(0 == "bh_stat_sub: invalid bufferhead state");
}
}
void ObjectCacher::bh_set_state(BufferHead *bh, int s)
{
- assert(lock.is_locked());
+ ceph_assert(lock.is_locked());
int state = bh->get_state();
// move between lru lists?
if (s == BufferHead::STATE_DIRTY && state != BufferHead::STATE_DIRTY) {
void ObjectCacher::bh_add(Object *ob, BufferHead *bh)
{
- assert(lock.is_locked());
+ ceph_assert(lock.is_locked());
ldout(cct, 30) << "bh_add " << *ob << " " << *bh << dendl;
ob->add_bh(bh);
if (bh->is_dirty()) {
void ObjectCacher::bh_remove(Object *ob, BufferHead *bh)
{
- assert(lock.is_locked());
- assert(bh->get_journal_tid() == 0);
+ ceph_assert(lock.is_locked());
+ ceph_assert(bh->get_journal_tid() == 0);
ldout(cct, 30) << "bh_remove " << *ob << " " << *bh << dendl;
ob->remove_bh(bh);
if (bh->is_dirty()) {
// reference counting
int get() {
- assert(ref >= 0);
+ ceph_assert(ref >= 0);
if (ref == 0) lru_pin();
return ++ref;
}
int put() {
- assert(ref > 0);
+ ceph_assert(ref > 0);
if (ref == 1) lru_unpin();
--ref;
return ref;
}
~Object() {
reads.clear();
- assert(ref == 0);
- assert(data.empty());
- assert(dirty_or_tx == 0);
+ ceph_assert(ref == 0);
+ ceph_assert(data.empty());
+ ceph_assert(dirty_or_tx == 0);
set_item.remove_myself();
}
bool can_close() const {
if (lru_is_expireable()) {
- assert(data.empty());
- assert(waitfor_commit.empty());
+ ceph_assert(data.empty());
+ ceph_assert(waitfor_commit.empty());
return true;
}
return false;
void add_bh(BufferHead *bh) {
if (data.empty())
get();
- assert(data.count(bh->start()) == 0);
+ ceph_assert(data.count(bh->start()) == 0);
data[bh->start()] = bh;
}
void remove_bh(BufferHead *bh) {
- assert(data.count(bh->start()));
+ ceph_assert(data.count(bh->start()));
data.erase(bh->start());
if (data.empty())
put();
// reference counting
int get() {
- assert(ref >= 0);
+ ceph_assert(ref >= 0);
if (ref == 0) lru_pin();
return ++ref;
}
int put() {
- assert(ref > 0);
+ ceph_assert(ref > 0);
if (ref == 1) lru_unpin();
--ref;
return ref;
flusher_thread.create("flusher");
}
void stop() {
- assert(flusher_thread.is_started());
+ ceph_assert(flusher_thread.is_started());
lock.Lock(); // hmm.. watch out for deadlock!
flusher_stop = true;
flusher_cond.Signal();
*/
void Objecter::init()
{
- assert(!initialized);
+ ceph_assert(!initialized);
if (!logger) {
PerfCountersBuilder pcb(cct, "objecter", l_osdc_first, l_osdc_last);
void Objecter::shutdown()
{
- assert(initialized);
+ ceph_assert(initialized);
unique_lock wl(rwlock);
void Objecter::_send_linger(LingerOp *info,
shunique_lock& sul)
{
- assert(sul.owns_lock() && sul.mutex() == &rwlock);
+ ceph_assert(sul.owns_lock() && sul.mutex() == &rwlock);
vector<OSDOp> opv;
Context *oncommit = NULL;
linger_ops.erase(info->linger_id);
linger_ops_set.erase(info);
- assert(linger_ops.size() == linger_ops_set.size());
+ ceph_assert(linger_ops.size() == linger_ops_set.size());
info->canceled = true;
info->put();
<< dendl;
linger_ops[info->linger_id] = info;
linger_ops_set.insert(info);
- assert(linger_ops.size() == linger_ops_set.size());
+ ceph_assert(linger_ops.size() == linger_ops_set.size());
info->get(); // for the caller
return info;
void Objecter::_linger_submit(LingerOp *info, shunique_lock& sul)
{
- assert(sul.owns_lock() && sul.mutex() == &rwlock);
- assert(info->linger_id);
- assert(info->ctx_budget != -1); // caller needs to have taken budget already!
+ ceph_assert(sul.owns_lock() && sul.mutex() == &rwlock);
+ ceph_assert(info->linger_id);
+ ceph_assert(info->ctx_budget != -1); // caller needs to have taken budget already!
// Populate Op::target
OSDSession *s = NULL;
// Create LingerOp<->OSDSession relation
int r = _get_session(info->target.osd, &s, sul);
- assert(r == 0);
+ ceph_assert(r == 0);
OSDSession::unique_lock sl(s->lock);
_session_linger_op_assign(s, info);
sl.unlock();
ldout(cct, 10) << __func__ << " " << *m << dendl;
shared_lock l(rwlock);
- assert(initialized);
+ ceph_assert(initialized);
if (info->canceled) {
l.unlock();
}
// notify completion?
- assert(info->is_watch);
- assert(info->watch_context);
- assert(m->opcode != CEPH_WATCH_EVENT_DISCONNECT);
+ ceph_assert(info->is_watch);
+ ceph_assert(info->watch_context);
+ ceph_assert(m->opcode != CEPH_WATCH_EVENT_DISCONNECT);
l.unlock();
shunique_lock& sul,
const mempool::osdmap::map<int64_t,OSDMap::snap_interval_set_t> *gap_removed_snaps)
{
- assert(sul.owns_lock() && sul.mutex() == &rwlock);
+ ceph_assert(sul.owns_lock() && sul.mutex() == &rwlock);
list<LingerOp*> unregister_lingers;
map<ceph_tid_t,LingerOp*>::iterator lp = s->linger_ops.begin();
while (lp != s->linger_ops.end()) {
LingerOp *op = lp->second;
- assert(op->session == s);
+ ceph_assert(op->session == s);
// check_linger_pool_dne() may touch linger_ops; prevent iterator
// invalidation
++lp;
if (!initialized)
return;
- assert(osdmap);
+ ceph_assert(osdmap);
if (m->fsid != monc->get_fsid()) {
ldout(cct, 0) << "handle_osd_map fsid " << m->fsid
}
}
- assert(e == osdmap->get_epoch());
+ ceph_assert(e == osdmap->get_epoch());
}
} else {
bool mapped_session = false;
if (!s) {
int r = _map_session(&op->target, &s, sul);
- assert(r == 0);
+ ceph_assert(r == 0);
mapped_session = true;
} else {
get_session(s);
_calc_target(&op->target, nullptr);
OSDSession *s = NULL;
const int r = _get_session(op->target.osd, &s, sul);
- assert(r == 0);
- assert(s != NULL);
+ ceph_assert(r == 0);
+ ceph_assert(s != NULL);
op->session = s;
put_session(s);
}
OSDSession *s = op->session;
if (s) {
- assert(s != NULL);
- assert(sl->mutex() == &s->lock);
+ ceph_assert(s != NULL);
+ ceph_assert(sl->mutex() == &s->lock);
bool session_locked = sl->owns_lock();
if (!session_locked) {
sl->lock();
*/
int Objecter::_get_session(int osd, OSDSession **session, shunique_lock& sul)
{
- assert(sul && sul.mutex() == &rwlock);
+ ceph_assert(sul && sul.mutex() == &rwlock);
if (osd < 0) {
*session = homeless_session;
void Objecter::get_session(Objecter::OSDSession *s)
{
- assert(s != NULL);
+ ceph_assert(s != NULL);
if (!s->is_homeless()) {
ldout(cct, 20) << __func__ << " s=" << s << " osd=" << s->osd << " "
j != session->linger_ops.end(); ++j) {
LingerOp *op = j->second;
op->get();
- assert(lresend.count(j->first) == 0);
+ ceph_assert(lresend.count(j->first) == 0);
lresend[j->first] = op;
}
void Objecter::_linger_ops_resend(map<uint64_t, LingerOp *>& lresend,
unique_lock& ul)
{
- assert(ul.owns_lock());
+ ceph_assert(ul.owns_lock());
shunique_lock sul(std::move(ul));
while (!lresend.empty()) {
LingerOp *op = lresend.begin()->second;
void Objecter::start_tick()
{
- assert(tick_event == 0);
+ ceph_assert(tick_event == 0);
tick_event =
timer.add_event(ceph::make_timespan(cct->_conf->objecter_tick_interval),
&Objecter::tick, this);
p != s->ops.end();
++p) {
Op *op = p->second;
- assert(op->session);
+ ceph_assert(op->session);
if (op->stamp < cutoff) {
ldout(cct, 2) << " tid " << p->first << " on osd." << op->session->osd
<< " is laggy" << dendl;
++p) {
LingerOp *op = p->second;
LingerOp::unique_lock wl(op->watch_lock);
- assert(op->session);
+ ceph_assert(op->session);
ldout(cct, 10) << " pinging osd that serves lingering tid " << p->first
<< " (osd." << op->session->osd << ")" << dendl;
found = true;
p != s->command_ops.end();
++p) {
CommandOp *op = p->second;
- assert(op->session);
+ ceph_assert(op->session);
ldout(cct, 10) << " pinging osd that serves command tid " << p->first
<< " (osd." << op->session->osd << ")" << dendl;
found = true;
ceph_tid_t *ptid,
int *ctx_budget)
{
- assert(initialized);
+ ceph_assert(initialized);
- assert(op->ops.size() == op->out_bl.size());
- assert(op->ops.size() == op->out_rval.size());
- assert(op->ops.size() == op->out_handler.size());
+ ceph_assert(op->ops.size() == op->out_bl.size());
+ ceph_assert(op->ops.size() == op->out_rval.size());
+ ceph_assert(op->ops.size() == op->out_handler.size());
// throttle. before we look at any state, because
// _take_op_budget() may drop our lock while it blocks.
ldout(cct, 10) << __func__ << " op " << op << dendl;
// pick target
- assert(op->session == NULL);
+ ceph_assert(op->session == NULL);
OSDSession *s = NULL;
bool check_for_latest_map = _calc_target(&op->target, nullptr)
}
}
if (r == -EAGAIN) {
- assert(s == NULL);
+ ceph_assert(s == NULL);
r = _get_session(op->target.osd, &s, sul);
}
- assert(r == 0);
- assert(s); // may be homeless
+ ceph_assert(r == 0);
+ ceph_assert(s); // may be homeless
_send_op_account(op);
// send?
- assert(op->target.flags & (CEPH_OSD_FLAG_READ|CEPH_OSD_FLAG_WRITE));
+ ceph_assert(op->target.flags & (CEPH_OSD_FLAG_READ|CEPH_OSD_FLAG_WRITE));
if (osdmap_full_try) {
op->target.flags |= CEPH_OSD_FLAG_FULL_TRY;
int Objecter::op_cancel(OSDSession *s, ceph_tid_t tid, int r)
{
- assert(initialized);
+ ceph_assert(initialized);
OSDSession::unique_lock sl(s->lock);
int cancel_result = op_cancel(s, *titer, r);
// We hold rwlock across search and cancellation, so cancels
// should always succeed
- assert(cancel_result == 0);
+ ceph_assert(cancel_result == 0);
}
if (!found && to_cancel.size())
found = true;
pg_t pgid;
if (t->precalc_pgid) {
- assert(t->flags & CEPH_OSD_FLAG_IGNORE_OVERLAY);
- assert(t->base_oid.name.empty()); // make sure this is a pg op
- assert(t->base_oloc.pool == (int64_t)t->base_pgid.pool());
+ ceph_assert(t->flags & CEPH_OSD_FLAG_IGNORE_OVERLAY);
+ ceph_assert(t->base_oid.name.empty()); // make sure this is a pg op
+ ceph_assert(t->base_oloc.pool == (int64_t)t->base_pgid.pool());
pgid = t->base_pgid;
} else {
int ret = osdmap->object_locator_to_pg(t->target_oid, t->target_oloc,
t->used_replica = true;
}
}
- assert(best >= 0);
+ ceph_assert(best >= 0);
osd = acting[best];
} else {
osd = acting_primary;
void Objecter::_session_op_assign(OSDSession *to, Op *op)
{
// to->lock is locked
- assert(op->session == NULL);
- assert(op->tid);
+ ceph_assert(op->session == NULL);
+ ceph_assert(op->tid);
get_session(to);
op->session = to;
void Objecter::_session_op_remove(OSDSession *from, Op *op)
{
- assert(op->session == from);
+ ceph_assert(op->session == from);
// from->lock is locked
if (from->is_homeless()) {
void Objecter::_session_linger_op_assign(OSDSession *to, LingerOp *op)
{
// to lock is locked unique
- assert(op->session == NULL);
+ ceph_assert(op->session == NULL);
if (to->is_homeless()) {
num_homeless_ops++;
void Objecter::_session_linger_op_remove(OSDSession *from, LingerOp *op)
{
- assert(from == op->session);
+ ceph_assert(from == op->session);
// from->lock is locked unique
if (from->is_homeless()) {
void Objecter::_session_command_op_remove(OSDSession *from, CommandOp *op)
{
- assert(from == op->session);
+ ceph_assert(from == op->session);
// from->lock is locked
if (from->is_homeless()) {
void Objecter::_session_command_op_assign(OSDSession *to, CommandOp *op)
{
// to->lock is locked
- assert(op->session == NULL);
- assert(op->tid);
+ ceph_assert(op->session == NULL);
+ ceph_assert(op->tid);
if (to->is_homeless()) {
num_homeless_ops++;
OSDSession *s = NULL;
r = _get_session(linger_op->target.osd, &s, sul);
- assert(r == 0);
+ ceph_assert(r == 0);
if (linger_op->session != s) {
// NB locking two sessions (s and linger_op->session) at the
{
ldout(cct, 15) << "cancel_op " << op->tid << dendl;
- assert(!op->should_resend);
+ ceph_assert(!op->should_resend);
if (op->onfinish) {
delete op->onfinish;
num_in_flight--;
logger->dec(l_osdc_op_active);
- assert(check_latest_map_ops.find(op->tid) == check_latest_map_ops.end());
+ ceph_assert(check_latest_map_ops.find(op->tid) == check_latest_map_ops.end());
inflight_ops--;
}
}
- assert(op->tid > 0);
+ ceph_assert(op->tid > 0);
MOSDOp *m = _prepare_osd_op(op);
if (op->target.actual_pgid != m->get_spg()) {
<< dendl;
ConnectionRef con = op->session->con;
- assert(con);
+ ceph_assert(con);
// preallocated rx buffer?
if (op->con) {
shunique_lock& sul,
int op_budget)
{
- assert(sul && sul.mutex() == &rwlock);
+ ceph_assert(sul && sul.mutex() == &rwlock);
bool locked_for_write = sul.owns_lock();
if (!op_budget)
vector<bufferlist*>::iterator pb = op->out_bl.begin();
vector<int*>::iterator pr = op->out_rval.begin();
vector<Context*>::iterator ph = op->out_handler.begin();
- assert(op->out_bl.size() == op->out_rval.size());
- assert(op->out_bl.size() == op->out_handler.size());
+ ceph_assert(op->out_bl.size() == op->out_rval.size());
+ ceph_assert(op->out_bl.size() == op->out_handler.size());
vector<OSDOp>::iterator p = out_ops.begin();
for (unsigned i = 0;
p != out_ops.end() && pb != op->out_bl.end();
<< " [" << b->begin << "," << b->end
<< ")" << dendl;
auto spgp = s->backoffs.find(b->pgid);
- assert(spgp != s->backoffs.end());
+ ceph_assert(spgp != s->backoffs.end());
spgp->second.erase(b->begin);
if (spgp->second.empty()) {
s->backoffs.erase(spgp);
// map epoch changed, probably because a MOSDMap message
// sneaked in. Do caller-specified callback now or else
// we lose it forever.
- assert(op->onfinish);
+ ceph_assert(op->onfinish);
op->onfinish->complete(m->replyCode);
}
} else {
- assert(op->onfinish);
+ ceph_assert(op->onfinish);
op->onfinish->complete(m->replyCode);
}
op->onfinish = NULL;
int Objecter::pool_op_cancel(ceph_tid_t tid, int r)
{
- assert(initialized);
+ ceph_assert(initialized);
unique_lock wl(rwlock);
int Objecter::pool_stat_op_cancel(ceph_tid_t tid, int r)
{
- assert(initialized);
+ ceph_assert(initialized);
unique_lock wl(rwlock);
int Objecter::statfs_op_cancel(ceph_tid_t tid, int r)
{
- assert(initialized);
+ ceph_assert(initialized);
unique_lock wl(rwlock);
int Objecter::_calc_command_target(CommandOp *c, shunique_lock& sul)
{
- assert(sul.owns_lock() && sul.mutex() == &rwlock);
+ ceph_assert(sul.owns_lock() && sul.mutex() == &rwlock);
c->map_check_error = 0;
OSDSession *s;
int r = _get_session(c->target.osd, &s, sul);
- assert(r != -EAGAIN); /* shouldn't happen as we're holding the write lock */
+ ceph_assert(r != -EAGAIN); /* shouldn't happen as we're holding the write lock */
if (c->session != s) {
put_session(s);
void Objecter::_assign_command_session(CommandOp *c,
shunique_lock& sul)
{
- assert(sul.owns_lock() && sul.mutex() == &rwlock);
+ ceph_assert(sul.owns_lock() && sul.mutex() == &rwlock);
OSDSession *s;
int r = _get_session(c->target.osd, &s, sul);
- assert(r != -EAGAIN); /* shouldn't happen as we're holding the write lock */
+ ceph_assert(r != -EAGAIN); /* shouldn't happen as we're holding the write lock */
if (c->session != s) {
if (c->session) {
void Objecter::_send_command(CommandOp *c)
{
ldout(cct, 10) << "_send_command " << c->tid << dendl;
- assert(c->session);
- assert(c->session->con);
+ ceph_assert(c->session);
+ ceph_assert(c->session->con);
MCommand *m = new MCommand(monc->monmap.fsid);
m->cmd = c->cmd;
m->set_data(c->inbl);
int Objecter::command_op_cancel(OSDSession *s, ceph_tid_t tid, int r)
{
- assert(initialized);
+ ceph_assert(initialized);
unique_lock wl(rwlock);
{
// Caller is responsible for re-assigning or
// destroying any ops that were assigned to us
- assert(ops.empty());
- assert(linger_ops.empty());
- assert(command_ops.empty());
+ ceph_assert(ops.empty());
+ ceph_assert(linger_ops.empty());
+ ceph_assert(command_ops.empty());
}
Objecter::~Objecter()
{
delete osdmap;
- assert(homeless_session->get_nref() == 1);
- assert(num_homeless_ops == 0);
+ ceph_assert(homeless_session->get_nref() == 1);
+ ceph_assert(num_homeless_ops == 0);
homeless_session->put();
- assert(osd_sessions.empty());
- assert(poolstat_ops.empty());
- assert(statfs_ops.empty());
- assert(pool_ops.empty());
- assert(waiting_for_map.empty());
- assert(linger_ops.empty());
- assert(check_latest_map_lingers.empty());
- assert(check_latest_map_ops.empty());
- assert(check_latest_map_commands.empty());
+ ceph_assert(osd_sessions.empty());
+ ceph_assert(poolstat_ops.empty());
+ ceph_assert(statfs_ops.empty());
+ ceph_assert(pool_ops.empty());
+ ceph_assert(waiting_for_map.empty());
+ ceph_assert(linger_ops.empty());
+ ceph_assert(check_latest_map_lingers.empty());
+ ceph_assert(check_latest_map_ops.empty());
+ ceph_assert(check_latest_map_commands.empty());
- assert(!m_request_state_hook);
- assert(!logger);
+ ceph_assert(!m_request_state_hook);
+ ceph_assert(!logger);
}
/**
hobject_t *next,
Context *on_finish)
{
- assert(result);
+ ceph_assert(result);
if (!end.is_max() && start > end) {
lderr(cct) << __func__ << ": start " << start << " > end " << end << dendl;
}
shared_lock rl(rwlock);
- assert(osdmap->get_epoch());
+ ceph_assert(osdmap->get_epoch());
if (!osdmap->test_flag(CEPH_OSDMAP_SORTBITWISE)) {
rl.unlock();
lderr(cct) << __func__ << ": SORTBITWISE cluster flag not set" << dendl;
return;
}
- assert(next != NULL);
+ ceph_assert(next != NULL);
// Decode the results
auto iter = bl.cbegin();
{
OSDOp& osd_op = op->add_op(CEPH_OSD_OP_SCRUBLS);
op->flags |= CEPH_OSD_FLAG_PGOP;
- assert(interval);
+ ceph_assert(interval);
arg.encode(osd_op.indata);
unsigned p = op->ops.size() - 1;
auto *h = new C_ObjectOperation_scrub_ls{interval, items, rval};
}
void set_last_op_flags(int flags) {
- assert(!ops.empty());
+ ceph_assert(!ops.empty());
ops.rbegin()->op.flags = flags;
}
}
void finished_async() {
unique_lock l(watch_lock);
- assert(!watch_pending_async.empty());
+ ceph_assert(!watch_pending_async.empty());
watch_pending_async.pop_front();
}
int calc_op_budget(const vector<OSDOp>& ops);
void _throttle_op(Op *op, shunique_lock& sul, int op_size = 0);
int _take_op_budget(Op *op, shunique_lock& sul) {
- assert(sul && sul.mutex() == &rwlock);
+ ceph_assert(sul && sul.mutex() == &rwlock);
int op_budget = calc_op_budget(op->ops);
if (keep_balanced_budget) {
_throttle_op(op, sul, op_budget);
int take_linger_budget(LingerOp *info);
friend class WatchContext; // to invoke put_up_budget_bytes
void put_op_budget_bytes(int op_budget) {
- assert(op_budget >= 0);
+ ceph_assert(op_budget >= 0);
op_throttle_bytes.put(op_budget);
op_throttle_ops.put(1);
}
void osd_command(int osd, const std::vector<string>& cmd,
const bufferlist& inbl, ceph_tid_t *ptid,
bufferlist *poutbl, string *prs, Context *onfinish) {
- assert(osd >= 0);
+ ceph_assert(osd >= 0);
CommandOp *c = new CommandOp(
osd,
cmd,
bit != p->buffer_extents.end();
++bit)
bl.copy(bit->first, bit->second, cur);
- assert(cur.length() == p->length);
+ ceph_assert(cur.length() == p->length);
write_trunc(p->oid, p->oloc, p->offset, p->length,
snapc, cur, mtime, flags, p->truncate_size, trunc_seq,
oncommit ? gcom.new_sub():0,
ldout(cct, 10) << "file_to_extents " << offset << "~" << len
<< " format " << object_format
<< dendl;
- assert(len > 0);
+ ceph_assert(len > 0);
/*
* we want only one extent per object! this means that each extent
__u32 object_size = layout->object_size;
__u32 su = layout->stripe_unit;
__u32 stripe_count = layout->stripe_count;
- assert(object_size >= su);
+ ceph_assert(object_size >= su);
if (stripe_count == 1) {
ldout(cct, 20) << " sc is one, reset su to os" << dendl;
su = object_size;
__u32 object_size = layout->object_size;
__u32 su = layout->stripe_unit;
__u32 stripe_count = layout->stripe_count;
- assert(object_size >= su);
+ ceph_assert(object_size >= su);
uint64_t stripes_per_object = object_size / su;
ldout(cct, 20) << " stripes_per_object " << stripes_per_object << dendl;
__u32 object_size = layout->object_size;
__u32 su = layout->stripe_unit;
__u32 stripe_count = layout->stripe_count;
- assert(object_size >= su);
+ ceph_assert(object_size >= su);
uint64_t stripes_per_object = object_size / su;
uint64_t objectsetno = objectno / stripe_count;
}
}
- assert(s->first <= bl_off);
+ ceph_assert(s->first <= bl_off);
size_t left = (s->first + s->second) - bl_off;
size_t actual = std::min(left, tlen);
void Striper::StripedReadResult::assemble_result(CephContext *cct, char *buffer, size_t length)
{
- assert(buffer && length == total_intended_len);
+ ceph_assert(buffer && length == total_intended_len);
map<uint64_t,pair<bufferlist,uint64_t> >::reverse_iterator p = partial.rbegin();
if (p == partial.rend())
ldout(cct, 20) << "assemble_result(" << this << ") " << p->first << "~" << p->second.second
<< " " << p->second.first.length() << " bytes"
<< dendl;
- assert(p->first == end - p->second.second);
+ ceph_assert(p->first == end - p->second.second);
end = p->first;
size_t len = p->second.first.length();
- assert(curr >= p->second.second);
+ ceph_assert(curr >= p->second.second);
curr -= p->second.second;
if (len < p->second.second) {
if (len)
++p;
}
partial.clear();
- assert(curr == 0);
+ ceph_assert(curr == 0);
}