#include "cls_fifo_legacy.h"
namespace rgw::cls::fifo {
+static constexpr auto dout_subsys = ceph_subsys_rgw;
namespace cb = ceph::buffer;
namespace fifo = rados::cls::fifo;
std::optional<fifo::objv> objv, fifo::info* info,
std::uint32_t* part_header_size,
std::uint32_t* part_entry_overhead,
+ uint64_t tid,
optional_yield y)
{
lr::ObjectReadOperation op;
if (part_entry_overhead)
*part_entry_overhead = reply.part_entry_overhead;
} catch (const cb::error& err) {
+ lderr(static_cast<CephContext*>(ioctx.cct()))
+ << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " decode failed: " << err.what()
+ << " tid=" << tid << dendl;
r = from_error_code(err.code());
- }
+ } else {
+ lderr(static_cast<CephContext*>(ioctx.cct()))
+ << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " fifo::op::GET_META failed r=" << r << " tid=" << tid
+ << dendl;
+ }
return r;
};
}
int push_part(lr::IoCtx& ioctx, const std::string& oid, std::string_view tag,
- std::deque<cb::list> data_bufs, optional_yield y)
+ std::deque<cb::list> data_bufs, std::uint64_t tid,
+ optional_yield y)
{
lr::ObjectWriteOperation op;
fifo::op::push_part pp;
auto retval = 0;
op.exec(fifo::op::CLASS, fifo::op::PUSH_PART, in, nullptr, &retval);
auto r = rgw_rados_operate(ioctx, oid, &op, y, lr::OPERATION_RETURNVEC);
- return r < 0 ? r : retval;
+ if (r < 0) {
+ lderr(static_cast<CephContext*>(ioctx.cct()))
+ << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " fifo::op::PUSH_PART failed r=" << r
+ << " tid=" << tid << dendl;
+ return r;
+ }
+ if (retval < 0) {
+ lderr(static_cast<CephContext*>(ioctx.cct()))
+ << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " error handling response retval=" << retval
+ << " tid=" << tid << dendl;
+ }
+ return retval;
}
void trim_part(lr::ObjectWriteOperation* op,
std::uint64_t max_entries,
std::vector<fifo::part_list_entry>* entries,
bool* more, bool* full_part, std::string* ptag,
- optional_yield y)
+ std::uint64_t tid, optional_yield y)
{
lr::ObjectReadOperation op;
fifo::op::list_part lp;
if (full_part) *full_part = reply.full_part;
if (ptag) *ptag = reply.tag;
} catch (const cb::error& err) {
+ lderr(static_cast<CephContext*>(ioctx.cct()))
+ << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " decode failed: " << err.what()
+ << " tid=" << tid << dendl;
r = from_error_code(err.code());
- }
+ } else if (r != -ENOENT) {
+ lderr(static_cast<CephContext*>(ioctx.cct()))
+ << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " fifo::op::LIST_PART failed r=" << r << " tid=" << tid
+ << dendl;
+ }
return r;
}
int get_part_info(lr::IoCtx& ioctx, const std::string& oid,
- fifo::part_header* header, optional_yield y)
+ fifo::part_header* header,
+ std::uint64_t tid, optional_yield y)
{
lr::ObjectReadOperation op;
fifo::op::get_part_info gpi;
decode(reply, iter);
if (header) *header = std::move(reply.header);
} catch (const cb::error& err) {
+ lderr(static_cast<CephContext*>(ioctx.cct()))
+ << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " decode failed: " << err.what()
+ << " tid=" << tid << dendl;
r = from_error_code(err.code());
- }
+ } else {
+ lderr(static_cast<CephContext*>(ioctx.cct()))
+ << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " fifo::op::GET_PART_INFO failed r=" << r << " tid=" << tid
+ << dendl;
+ }
return r;
}
int FIFO::apply_update(fifo::info* info,
const fifo::objv& objv,
- const fifo::update& update)
+ const fifo::update& update,
+ std::uint64_t tid)
{
+ ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " entering: tid=" << tid << dendl;
std::unique_lock l(m);
- auto err = info->apply_update(update);
if (objv != info->version) {
+ lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " version mismatch, canceling: tid=" << tid << dendl;
return -ECANCELED;
}
+ auto err = info->apply_update(update);
if (err) {
+ lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " error applying update: " << *err << " tid=" << tid << dendl;
return -ECANCELED;
}
int FIFO::_update_meta(const fifo::update& update,
fifo::objv version, bool* pcanceled,
- optional_yield y)
+ std::uint64_t tid, optional_yield y)
{
+ ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " entering: tid=" << tid << dendl;
lr::ObjectWriteOperation op;
bool canceled = false;
update_meta(&op, info.version, update);
if (r >= 0 || r == -ECANCELED) {
canceled = (r == -ECANCELED);
if (!canceled) {
- r = apply_update(&info, version, update);
+ r = apply_update(&info, version, update, tid);
if (r < 0) canceled = true;
}
if (canceled) {
- r = read_meta(y);
+ r = read_meta(tid, y);
canceled = r < 0 ? false : true;
}
}
if (pcanceled) *pcanceled = canceled;
+ if (canceled) {
+ ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " canceled: tid=" << tid << dendl;
+ }
+ if (r < 0) {
+ lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " returning error: r=" << r << " tid=" << tid << dendl;
+ }
return r;
}
fifo::objv version;
bool reread = false;
bool* pcanceled = nullptr;
+ std::uint64_t tid;
Updater(FIFO* fifo, lr::AioCompletion* super,
const fifo::update& update, fifo::objv version,
- bool* pcanceled)
+ bool* pcanceled, std::uint64_t tid)
: fifo(fifo), super(super), update(update), version(version),
- pcanceled(pcanceled) {
+ pcanceled(pcanceled), tid(tid) {
super->pc->get();
}
~Updater() {
}
};
-
void FIFO::update_callback(lr::completion_t, void* arg)
{
auto updater = static_cast<Updater*>(arg);
+ auto cct = updater->fifo->cct;
+ auto tid = updater->tid;
+ ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " entering: tid=" << tid << dendl;
if (!updater->reread) {
int r = updater->cur->get_return_value();
if (r < 0 && r != -ECANCELED) {
+ lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " update failed: r=" << r << " tid=" << tid << dendl;
complete(updater->super, r);
delete updater;
return;
}
bool canceled = (r == -ECANCELED);
if (!canceled) {
- int r = updater->fifo->apply_update(&updater->fifo->info, updater->version,
- updater->update);
- if (r < 0)
+ int r = updater->fifo->apply_update(&updater->fifo->info,
+ updater->version,
+ updater->update, tid);
+ if (r < 0) {
+ ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " update failed, marking canceled: r=" << r << " tid="
+ << tid << dendl;
canceled = true;
+ }
}
if (!canceled) {
if (updater->pcanceled)
*updater->pcanceled = false;
+ ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " completing: tid=" << tid << dendl;
complete(updater->super, 0);
} else {
updater->cur->release();
arg, &FIFO::update_callback);
assert(uintptr_t(updater->cur) >= 0x1000);
updater->reread = true;
- auto r = updater->fifo->read_meta(updater->cur);
+ auto r = updater->fifo->read_meta(tid, updater->cur);
if (r < 0) {
+ lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " failed dispatching read_meta: r=" << r << " tid="
+ << tid << dendl;
complete(updater->super, r);
delete updater;
}
} else if (r >= 0 && updater->pcanceled) {
*updater->pcanceled = true;
}
+ if (r < 0) {
+ lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " failed dispatching read_meta: r=" << r << " tid="
+ << tid << dendl;
+ } else {
+ ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " completing: tid=" << tid << dendl;
+ }
complete(updater->super, r);
delete updater;
}
int FIFO::_update_meta(const fifo::update& update,
fifo::objv version, bool* pcanceled,
- lr::AioCompletion* c)
+ std::uint64_t tid, lr::AioCompletion* c)
{
+ ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " entering: tid=" << tid << dendl;
lr::ObjectWriteOperation op;
update_meta(&op, info.version, update);
- auto updater = new Updater(this, c, update, version, pcanceled);
+ auto updater = new Updater(this, c, update, version, pcanceled, tid);
lr::AioCompletion* cur = lr::Rados::aio_create_completion(
static_cast<void*>(updater), &FIFO::update_callback);
updater->cur = cur;
assert(uintptr_t(updater->cur) >= 0x1000);
auto r = ioctx.aio_operate(oid, cur, &op);
if (r < 0) {
+ lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " failed dispatching update_meta: r=" << r << " tid="
+ << tid << dendl;
delete updater;
}
return r;
}
-int FIFO::create_part(int64_t part_num, std::string_view tag, optional_yield y)
+int FIFO::create_part(int64_t part_num, std::string_view tag, std::uint64_t tid,
+ optional_yield y)
{
+ ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " entering: tid=" << tid << dendl;
lr::ObjectWriteOperation op;
op.create(false); /* We don't need exclusivity, part_init ensures
we're creating from the same journal entry. */
part_init(&op, tag, info.params);
auto oid = info.part_oid(part_num);
l.unlock();
- return rgw_rados_operate(ioctx, oid, &op, y);
+ auto r = rgw_rados_operate(ioctx, oid, &op, y);
+ if (r < 0) {
+ lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " part_init failed: r=" << r << " tid="
+ << tid << dendl;
+ }
+ return r;
}
-int FIFO::remove_part(int64_t part_num, std::string_view tag, optional_yield y)
+int FIFO::remove_part(int64_t part_num, std::string_view tag, std::uint64_t tid,
+ optional_yield y)
{
+ ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " entering: tid=" << tid << dendl;
lr::ObjectWriteOperation op;
op.remove();
std::unique_lock l(m);
auto oid = info.part_oid(part_num);
l.unlock();
- return rgw_rados_operate(ioctx, oid, &op, y);
+ auto r = rgw_rados_operate(ioctx, oid, &op, y);
+ if (r < 0) {
+ lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " remove failed: r=" << r << " tid="
+ << tid << dendl;
+ }
+ return r;
}
-int FIFO::process_journal(optional_yield y)
+int FIFO::process_journal(std::uint64_t tid, optional_yield y)
{
+ ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " entering: tid=" << tid << dendl;
std::vector<fifo::journal_entry> processed;
std::unique_lock l(m);
int r = 0;
for (auto& [n, entry] : tmpjournal) {
+ ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " processing entry: entry=" << entry << " tid=" << tid
+ << dendl;
switch (entry.op) {
case fifo::journal_entry::Op::create:
- r = create_part(entry.part_num, entry.part_tag, y);
+ r = create_part(entry.part_num, entry.part_tag, tid, y);
if (entry.part_num > new_max) {
new_max = entry.part_num;
}
}
break;
case fifo::journal_entry::Op::remove:
- r = remove_part(entry.part_num, entry.part_tag, y);
+ r = remove_part(entry.part_num, entry.part_tag, tid, y);
if (r == -ENOENT) r = 0;
if (entry.part_num >= new_tail) {
new_tail = entry.part_num + 1;
}
break;
default:
+ lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " unknown journaled op: entry=" << entry << " tid="
+ << tid << dendl;
return -EIO;
}
if (r < 0) {
- return -EIO;
+ lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " processing entry failed: entry=" << entry
+ << " r=" << r << " tid=" << tid << dendl;
+ return -r;
}
processed.push_back(std::move(entry));
}
// Postprocess
-
bool canceled = true;
for (auto i = 0; canceled && i < MAX_RACE_RETRIES; ++i) {
+ ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " postprocessing: i=" << i << " tid=" << tid << dendl;
+
std::optional<int64_t> tail_part_num;
std::optional<int64_t> head_part_num;
std::optional<int64_t> max_part_num;
if (processed.empty() &&
!tail_part_num &&
!max_part_num) {
- /* nothing to update anymore */
+ ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " nothing to update any more: i=" << i << " tid="
+ << tid << dendl;
canceled = false;
break;
}
- r = _update_meta(fifo::update()
- .tail_part_num(tail_part_num)
- .head_part_num(head_part_num)
- .max_push_part_num(max_part_num)
- .journal_entries_rm(processed),
- objv, &canceled, y);
- if (r < 0) break;
+ auto u = fifo::update().tail_part_num(tail_part_num)
+ .head_part_num(head_part_num).max_push_part_num(max_part_num)
+ .journal_entries_rm(processed);
+ r = _update_meta(u, objv, &canceled, tid, y);
+ if (r < 0) {
+ lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " _update_meta failed: update=" << u
+ << " r=" << r << " tid=" << tid << dendl;
+ break;
+ }
if (canceled) {
std::vector<fifo::journal_entry> new_processed;
std::unique_lock l(m);
+ ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " update canceled, retrying: i=" << i << " tid="
+ << tid << dendl;
for (auto& e : processed) {
auto jiter = info.journal.find(e.part_num);
/* journal entry was already processed */
processed = std::move(new_processed);
}
}
- if (canceled)
+ if (r == 0 && canceled) {
+ lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " canceled too many times, giving up: tid=" << tid << dendl;
r = -ECANCELED;
+ }
+ if (r < 0) {
+ lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " failed, r=: " << r << " tid=" << tid << dendl;
+ }
return r;
}
-int FIFO::_prepare_new_part(bool is_head, optional_yield y)
+int FIFO::_prepare_new_part(bool is_head, std::uint64_t tid, optional_yield y)
{
+ ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " entering: tid=" << tid << dendl;
std::unique_lock l(m);
std::vector jentries = { info.next_journal_entry(generate_tag()) };
std::int64_t new_head_part_num = info.head_part_num;
auto version = info.version;
if (is_head) {
+ ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " needs new head: tid=" << tid << dendl;
auto new_head_jentry = jentries.front();
new_head_jentry.op = fifo::journal_entry::Op::set_head;
new_head_part_num = jentries.front().part_num;
bool canceled = true;
for (auto i = 0; canceled && i < MAX_RACE_RETRIES; ++i) {
canceled = false;
- r = _update_meta(fifo::update{}.journal_entries_add(jentries),
- version, &canceled, y);
+ ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " updating metadata: i=" << i << " tid=" << tid << dendl;
+ auto u = fifo::update{}.journal_entries_add(jentries);
+ r = _update_meta(u, version, &canceled, tid, y);
if (r >= 0 && canceled) {
std::unique_lock l(m);
auto found = (info.journal.find(jentries.front().part_num) !=
info.journal.end());
if ((info.max_push_part_num >= jentries.front().part_num &&
info.head_part_num >= new_head_part_num)) {
- // We don't even need to process the journal.
+ ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " raced, but journaled and processed: i=" << i
+ << " tid=" << tid << dendl;
return 0;
}
if (found) {
- // Journaled, but not processed.
+ ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " raced, journaled but not processed: i=" << i
+ << " tid=" << tid << dendl;
canceled = false;
}
l.unlock();
}
+ if (r < 0) {
+ lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " _update_meta failed: update=" << u << " r=" << r
+ << " tid=" << tid << dendl;
+ return r;
+ }
+ }
+ if (canceled) {
+ lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " canceled too many times, giving up: tid=" << tid << dendl;
+ return -ECANCELED;
+ }
+ r = process_journal(tid, y);
+ if (r < 0) {
+ lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " process_journal failed: r=" << r << " tid=" << tid << dendl;
}
- if (canceled)
- r = -ECANCELED;
- if (r >= 0)
- r = process_journal(y);
return r;
}
-int FIFO::_prepare_new_head(optional_yield y) {
+int FIFO::_prepare_new_head(std::uint64_t tid, optional_yield y)
+{
+ ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " entering: tid=" << tid << dendl;
std::unique_lock l(m);
std::int64_t new_head_num = info.head_part_num + 1;
auto max_push_part_num = info.max_push_part_num;
int r = 0;
if (max_push_part_num < new_head_num) {
- r = _prepare_new_part(true, y);
- if (r >= 0) {
- std::unique_lock l(m);
- if (info.max_push_part_num < new_head_num) {
- r = -EIO;
- }
- l.unlock();
+ ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " need new part: tid=" << tid << dendl;
+ r = _prepare_new_part(true, tid, y);
+ if (r < 0) {
+ lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " _prepare_new_part failed: r=" << r
+ << " tid=" << tid << dendl;
+ return r;
}
- } else {
- bool canceled = true;
- for (auto i = 0; canceled && i < MAX_RACE_RETRIES; ++i) {
- r = _update_meta(fifo::update{}.head_part_num(new_head_num),
- version, &canceled, y);
- if (r < 0)
- break;
- std::unique_lock l(m);
- auto head_part_num = info.head_part_num;
- version = info.version;
- l.unlock();
- if (canceled && (head_part_num >= new_head_num)) {
- // Race, but someone did it for us
- canceled = false;
- }
+ std::unique_lock l(m);
+ if (info.max_push_part_num < new_head_num) {
+ lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " inconsistency, push part less than head part: "
+ << " tid=" << tid << dendl;
+ return -EIO;
+ }
+ l.unlock();
+ return 0;
+ }
+
+ bool canceled = true;
+ for (auto i = 0; canceled && i < MAX_RACE_RETRIES; ++i) {
+ ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " updating head: i=" << i << " tid=" << tid << dendl;
+ auto u = fifo::update{}.head_part_num(new_head_num);
+ r = _update_meta(u, version, &canceled, tid, y);
+ if (r < 0) {
+ lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " _update_meta failed: update=" << u << " r=" << r
+ << " tid=" << tid << dendl;
+ return r;
+ }
+ std::unique_lock l(m);
+ auto head_part_num = info.head_part_num;
+ version = info.version;
+ l.unlock();
+ if (canceled && (head_part_num >= new_head_num)) {
+ ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " raced, but completed by the other caller: i=" << i
+ << " tid=" << tid << dendl;
+ canceled = false;
}
- if (canceled)
- r = -ECANCELED;
+ }
+ if (canceled) {
+ lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " canceled too many times, giving up: tid=" << tid << dendl;
+ return -ECANCELED;
}
return 0;
}
int FIFO::push_entries(const std::deque<cb::list>& data_bufs,
- optional_yield y)
+ std::uint64_t tid, optional_yield y)
{
+ ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " entering: tid=" << tid << dendl;
std::unique_lock l(m);
auto head_part_num = info.head_part_num;
auto tag = info.head_tag;
const auto part_oid = info.part_oid(head_part_num);
l.unlock();
- return push_part(ioctx, part_oid, tag, data_bufs, y);
+ auto r = push_part(ioctx, part_oid, tag, data_bufs, tid, y);
+ if (r < 0) {
+ lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " push_part failed: r=" << r << " tid=" << tid << dendl;
+ }
+ return r;
}
int FIFO::trim_part(int64_t part_num, uint64_t ofs,
std::optional<std::string_view> tag,
- bool exclusive,
+ bool exclusive, std::uint64_t tid,
optional_yield y)
{
+ ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " entering: tid=" << tid << dendl;
lr::ObjectWriteOperation op;
std::unique_lock l(m);
const auto part_oid = info.part_oid(part_num);
l.unlock();
rgw::cls::fifo::trim_part(&op, tag, ofs, exclusive);
- return rgw_rados_operate(ioctx, part_oid, &op, y);
+ auto r = rgw_rados_operate(ioctx, part_oid, &op, y);
+ if (r < 0) {
+ lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " trim_part failed: r=" << r << " tid=" << tid << dendl;
+ }
+ return 0;
}
int FIFO::trim_part(int64_t part_num, uint64_t ofs,
std::optional<std::string_view> tag,
- bool exclusive,
+ bool exclusive, std::uint64_t tid,
lr::AioCompletion* c)
{
+ ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " entering: tid=" << tid << dendl;
lr::ObjectWriteOperation op;
std::unique_lock l(m);
const auto part_oid = info.part_oid(part_num);
l.unlock();
rgw::cls::fifo::trim_part(&op, tag, ofs, exclusive);
- return ioctx.aio_operate(part_oid, c, &op);
+ auto r = ioctx.aio_operate(part_oid, c, &op);
+ if (r < 0) {
+ lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " failed scheduling trim_part: r=" << r
+ << " tid=" << tid << dendl;
+ }
+ return r;
}
int FIFO::open(lr::IoCtx ioctx, std::string oid, std::unique_ptr<FIFO>* fifo,
optional_yield y, std::optional<fifo::objv> objv)
{
+ auto cct = static_cast<CephContext*>(ioctx.cct());
+ ldout(cct, 20)
+ << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " entering" << dendl;
fifo::info info;
std::uint32_t size;
std::uint32_t over;
- int r = get_meta(ioctx, std::move(oid), objv, &info, &size, &over, y);
- if (r >= 0) {
- std::unique_ptr<FIFO> f(new FIFO(std::move(ioctx), oid));
- f->info = info;
- f->part_header_size = size;
- f->part_entry_overhead = over;
- // If there are journal entries, process them, in case
- // someone crashed mid-transaction.
- if (!info.journal.empty()) {
- r = f->process_journal(y);
- if (r >= 0 && fifo)
- *fifo = std::move(f);
- } else {
- *fifo = std::move(f);
+ int r = get_meta(ioctx, std::move(oid), objv, &info, &size, &over, 0, y);
+ if (r < 0) {
+ lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " get_meta failed: r=" << r << dendl;
+ return r;
+ }
+ std::unique_ptr<FIFO> f(new FIFO(std::move(ioctx), oid));
+ f->info = info;
+ f->part_header_size = size;
+ f->part_entry_overhead = over;
+ // If there are journal entries, process them, in case
+ // someone crashed mid-transaction.
+ if (!info.journal.empty()) {
+ ldout(cct, 20)
+ << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " processing leftover journal" << dendl;
+ r = f->process_journal(0, y);
+ if (r < 0) {
+ lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " process_journal failed: r=" << r << dendl;
+ return r;
}
}
- return r;
+ *fifo = std::move(f);
+ return 0;
}
int FIFO::create(lr::IoCtx ioctx, std::string oid, std::unique_ptr<FIFO>* fifo,
bool exclusive, std::uint64_t max_part_size,
std::uint64_t max_entry_size)
{
+ auto cct = static_cast<CephContext*>(ioctx.cct());
+ ldout(cct, 20)
+ << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " entering" << dendl;
lr::ObjectWriteOperation op;
create_meta(&op, oid, objv, oid_prefix, exclusive, max_part_size,
max_entry_size);
- int r = rgw_rados_operate(ioctx, oid, &op, y);
- if (r >= 0) {
- r = open(std::move(ioctx), std::move(oid), fifo, y, objv);
+ auto r = rgw_rados_operate(ioctx, oid, &op, y);
+ if (r < 0) {
+ lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " create_meta failed: r=" << r << dendl;
+ return r;
}
+ r = open(std::move(ioctx), std::move(oid), fifo, y, objv);
return r;
}
-int FIFO::read_meta(optional_yield y) {
+int FIFO::read_meta(std::uint64_t tid, optional_yield y) {
+ ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " entering: tid=" << tid << dendl;
fifo::info _info;
std::uint32_t _phs;
std::uint32_t _peo;
- auto r = get_meta(ioctx, oid, nullopt, &_info, &_phs, &_peo, y);
- if (r >= 0) {
- std::unique_lock l(m);
- // We have a newer version already!
- if (_info.version.same_or_later(this->info.version)) {
- info = std::move(_info);
- part_header_size = _phs;
- part_entry_overhead = _peo;
- }
+ auto r = get_meta(ioctx, oid, nullopt, &_info, &_phs, &_peo, tid, y);
+ if (r < 0) {
+ lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " get_meta failed: r=" << r << " tid=" << tid << dendl;
+ return r;
}
- return r;
+ std::unique_lock l(m);
+ // We have a newer version already!
+ if (_info.version.same_or_later(this->info.version)) {
+ info = std::move(_info);
+ part_header_size = _phs;
+ part_entry_overhead = _peo;
+ }
+ return 0;
+}
+
+int FIFO::read_meta(optional_yield y) {
+ std::unique_lock l(m);
+ auto tid = ++next_tid;
+ l.unlock();
+ return read_meta(tid, y);
}
struct Reader {
FIFO* fifo;
cb::list bl;
lr::AioCompletion* super;
+ std::uint64_t tid;
lr::AioCompletion* cur = lr::Rados::aio_create_completion(
static_cast<void*>(this), &FIFO::read_callback);
- Reader(FIFO* fifo, lr::AioCompletion* super)
- : fifo(fifo), super(super) {
+ Reader(FIFO* fifo, lr::AioCompletion* super, std::uint64_t tid)
+ : fifo(fifo), super(super), tid(tid) {
super->pc->get();
}
~Reader() {
void FIFO::read_callback(lr::completion_t, void* arg)
{
auto reader = static_cast<Reader*>(arg);
+ auto cct = reader->fifo->cct;
+ auto tid = reader->tid;
+ ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " entering: tid=" << tid << dendl;
auto r = reader->cur->get_return_value();
if (r >= 0) try {
fifo::op::get_meta_reply reply;
reader->fifo->part_entry_overhead = reply.part_entry_overhead;
}
} catch (const cb::error& err) {
+ lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " failed to decode response err=" << err.what()
+ << " tid=" << tid << dendl;
r = from_error_code(err.code());
- }
+ } else {
+ lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " read_meta failed r=" << r
+ << " tid=" << tid << dendl;
+ }
complete(reader->super, r);
delete reader;
}
-int FIFO::read_meta(lr::AioCompletion* c)
+int FIFO::read_meta(std::uint64_t tid, lr::AioCompletion* c)
{
+ ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " entering: tid=" << tid << dendl;
lr::ObjectReadOperation op;
fifo::op::get_meta gm;
cb::list in;
encode(gm, in);
- auto reader = new Reader(this, c);
+ auto reader = new Reader(this, c, tid);
auto r = ioctx.aio_exec(oid, reader->cur, fifo::op::CLASS,
fifo::op::GET_META, in, &reader->bl);
if (r < 0) {
+ lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " failed scheduling read_meta r=" << r
+ << " tid=" << tid << dendl;
delete reader;
}
return r;
}
-
const fifo::info& FIFO::meta() const {
return info;
}
int FIFO::push(const std::vector<cb::list>& data_bufs, optional_yield y)
{
std::unique_lock l(m);
+ auto tid = ++next_tid;
auto max_entry_size = info.params.max_entry_size;
auto need_new_head = info.need_new_head();
l.unlock();
+ ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " entering: tid=" << tid << dendl;
if (data_bufs.empty()) {
+ ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " empty push, returning success tid=" << tid << dendl;
return 0;
}
// Validate sizes
- for (const auto& bl : data_bufs)
- if (bl.length() > max_entry_size)
+ for (const auto& bl : data_bufs) {
+ if (bl.length() > max_entry_size) {
+ lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " entry bigger than max_entry_size tid=" << tid << dendl;
return -E2BIG;
+ }
+ }
int r = 0;
if (need_new_head) {
- r = _prepare_new_head(y);
- if (r < 0)
+ ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " need new head tid=" << tid << dendl;
+ r = _prepare_new_head(tid, y);
+ if (r < 0) {
+ lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " _prepare_new_head failed: r=" << r
+ << " tid=" << tid << dendl;
return r;
+ }
}
std::deque<cb::list> remaining(data_bufs.begin(), data_bufs.end());
bool canceled = true;
while ((!remaining.empty() || !batch.empty()) &&
(retries <= MAX_RACE_RETRIES)) {
+ ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " preparing push: remaining=" << remaining.size()
+ << " batch=" << batch.size() << " retries=" << retries
+ << " tid=" << tid << dendl;
std::unique_lock l(m);
auto max_part_size = info.params.max_part_size;
auto overhead = part_entry_overhead;
batch.push_back(std::move(remaining.front()));
remaining.pop_front();
}
-
- auto r = push_entries(batch, y);
- if (r >= 0) {
- // Made forward progress!
- canceled = false;
- retries = 0;
- batch_len = 0;
- if (static_cast<unsigned>(r) == batch.size()) {
- batch.clear();
- } else {
- batch.erase(batch.begin(), batch.begin() + r);
- for (const auto& b : batch) {
- batch_len += b.length() + part_entry_overhead;
- }
- }
- } else if (r == -ERANGE) {
+ ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " prepared push: remaining=" << remaining.size()
+ << " batch=" << batch.size() << " retries=" << retries
+ << " batch_len=" << batch_len
+ << " tid=" << tid << dendl;
+
+ auto r = push_entries(batch, tid, y);
+ if (r == -ERANGE) {
canceled = true;
++retries;
- r = _prepare_new_head(y);
+ ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " need new head tid=" << tid << dendl;
+ r = _prepare_new_head(tid, y);
+ if (r < 0) {
+ lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " prepare_new_head failed: r=" << r
+ << " tid=" << tid << dendl;
+ return r;
+ }
+ r = 0;
+ continue;
+ }
+ if (r < 0) {
+ lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " push_entries failed: r=" << r
+ << " tid=" << tid << dendl;
+ return r;
+ }
+ // Made forward progress!
+ canceled = false;
+ retries = 0;
+ batch_len = 0;
+ if (static_cast<unsigned>(r) == batch.size()) {
+ batch.clear();
+ } else {
+ batch.erase(batch.begin(), batch.begin() + r);
+ for (const auto& b : batch) {
+ batch_len += b.length() + part_entry_overhead;
+ }
}
- if (r < 0)
- break;
}
- if (canceled && r == 0)
- r = -ECANCELED;
- return r;
+ if (canceled) {
+ lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " canceled too many times, giving up: tid=" << tid << dendl;
+ return -ECANCELED;
+ }
+ return 0;
}
int FIFO::list(int max_entries,
optional_yield y)
{
std::unique_lock l(m);
+ auto tid = ++next_tid;
std::int64_t part_num = info.tail_part_num;
l.unlock();
+ ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " entering: tid=" << tid << dendl;
std::uint64_t ofs = 0;
if (markstr) {
auto marker = to_marker(*markstr);
- if (!marker) return -EINVAL;
+ if (!marker) {
+ lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " invalid marker string: " << markstr
+ << " tid= "<< tid << dendl;
+ return -EINVAL;
+ }
part_num = marker->num;
ofs = marker->ofs;
}
std::vector<fifo::part_list_entry> entries;
int r = 0;
while (max_entries > 0) {
+ ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " max_entries=" << max_entries << " tid=" << tid << dendl;
bool part_more = false;
bool part_full = false;
l.unlock();
r = list_part(ioctx, part_oid, {}, ofs, max_entries, &entries,
- &part_more, &part_full, nullptr, y);
+ &part_more, &part_full, nullptr, tid, y);
if (r == -ENOENT) {
- r = read_meta(y);
- if (r < 0) return r;
+ ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " missing part, rereading metadata"
+ << " tid= "<< tid << dendl;
+ r = read_meta(tid, y);
+ if (r < 0) {
+ lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " read_meta failed: r=" << r
+ << " tid= "<< tid << dendl;
+ return r;
+ }
if (part_num < info.tail_part_num) {
/* raced with trim? restart */
+ ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " raced with trim, restarting: tid=" << tid << dendl;
max_entries += result.size();
result.clear();
std::unique_lock l(m);
ofs = 0;
continue;
}
- /* assuming part was not written yet, so end of data */
+ ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " assuming part was not written yet, so end of data: "
+ << "tid=" << tid << dendl;
more = false;
r = 0;
break;
}
if (r < 0) {
- break;
+ lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " list_entries failed: r=" << r
+ << " tid= "<< tid << dendl;
+ return r;
}
more = part_full || part_more;
for (auto& entry : entries) {
}
if (!part_full) {
- /* head part is not full, so we can assume we're done. */
+ ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " head part is not full, so we can assume we're done: "
+ << "tid=" << tid << dendl;
break;
}
if (!part_more) {
ofs = 0;
}
}
- if (r >= 0) {
- if (presult) *presult = std::move(result);
- if (pmore) *pmore = more;
- }
- return r;
+ if (presult)
+ *presult = std::move(result);
+ if (pmore)
+ *pmore = more;
+ return 0;
}
int FIFO::trim(std::string_view markstr, bool exclusive, optional_yield y)
auto part_num = marker->num;
auto ofs = marker->ofs;
std::unique_lock l(m);
+ auto tid = ++next_tid;
auto pn = info.tail_part_num;
l.unlock();
+ ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " entering: tid=" << tid << dendl;
int r = 0;
while (pn < part_num) {
+ ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " pn=" << pn << " tid=" << tid << dendl;
std::unique_lock l(m);
auto max_part_size = info.params.max_part_size;
l.unlock();
- r = trim_part(pn, max_part_size, std::nullopt, false, y);
+ r = trim_part(pn, max_part_size, std::nullopt, false, tid, y);
if (r < 0 && r == -ENOENT) {
+ lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " trim_part failed: r=" << r
+ << " tid= "<< tid << dendl;
return r;
}
++pn;
}
- r = trim_part(part_num, ofs, std::nullopt, exclusive, y);
+ r = trim_part(part_num, ofs, std::nullopt, exclusive, tid, y);
if (r < 0 && r != -ENOENT) {
+ lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " trim_part failed: r=" << r
+ << " tid= "<< tid << dendl;
return r;
}
canceled &&
(retries <= MAX_RACE_RETRIES)) {
r = _update_meta(fifo::update{}.tail_part_num(part_num), objv, &canceled,
- y);
+ tid, y);
+ if (r < 0) {
+ lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " _update_meta failed: r=" << r
+ << " tid= "<< tid << dendl;
+ return r;
+ }
if (canceled) {
+ ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " canceled: retries=" << retries
+ << " tid=" << tid << dendl;
l.lock();
tail_part_num = info.tail_part_num;
objv = info.version;
}
}
if (canceled) {
- r = -EIO;
+ lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " canceled too many times, giving up: tid=" << tid << dendl;
+ return -EIO;
}
- return r;
+ return 0;
}
struct Trimmer {
std::int64_t pn;
bool exclusive;
lr::AioCompletion* super;
+ std::uint64_t tid;
lr::AioCompletion* cur = lr::Rados::aio_create_completion(
static_cast<void*>(this), &FIFO::trim_callback);
bool update = false;
int retries = 0;
Trimmer(FIFO* fifo, std::int64_t part_num, std::uint64_t ofs, std::int64_t pn,
- bool exclusive, lr::AioCompletion* super)
+ bool exclusive, lr::AioCompletion* super, std::uint64_t tid)
: fifo(fifo), part_num(part_num), ofs(ofs), pn(pn), exclusive(exclusive),
- super(super) {
+ super(super), tid(tid) {
super->pc->get();
}
~Trimmer() {
void FIFO::trim_callback(lr::completion_t, void* arg)
{
auto trimmer = static_cast<Trimmer*>(arg);
+ auto cct = trimmer->fifo->cct;
+ auto tid = trimmer->tid;
+ ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " entering: tid=" << tid << dendl;
int r = trimmer->cur->get_return_value();
if (r == -ENOENT) {
r = 0;
}
if (r < 0) {
+ lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " trim failed: r=" << r << " tid=" << tid << dendl;
complete(trimmer->super, r);
delete trimmer;
} else if (!trimmer->update) {
trimmer->cur->release();
trimmer->cur = lr::Rados::aio_create_completion(arg, &FIFO::trim_callback);
r = trimmer->fifo->trim_part(trimmer->pn++, max_part_size, std::nullopt,
- false, trimmer->cur);
+ false, tid, trimmer->cur);
if (r < 0) {
+ lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " trim failed: r=" << r << " tid=" << tid << dendl;
complete(trimmer->super, r);
delete trimmer;
}
trimmer->update = true;
trimmer->canceled = tail_part_num < trimmer->part_num;
r = trimmer->fifo->trim_part(trimmer->part_num, trimmer->ofs,
- std::nullopt, trimmer->exclusive, trimmer->cur);
+ std::nullopt, trimmer->exclusive, tid, trimmer->cur);
if (r < 0) {
+ lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " failed scheduling trim: r=" << r << " tid=" << tid << dendl;
complete(trimmer->super, r);
delete trimmer;
}
if ((tail_part_num < trimmer->part_num) &&
trimmer->canceled) {
if (trimmer->retries > MAX_RACE_RETRIES) {
+ lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " canceled too many times, giving up: tid=" << tid << dendl;
complete(trimmer->super, -EIO);
delete trimmer;
} else {
r = trimmer->fifo->_update_meta(fifo::update{}
.tail_part_num(trimmer->part_num),
objv, &trimmer->canceled,
- trimmer->cur);
+ tid, trimmer->cur);
if (r < 0) {
+ lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " failed scheduling _update_meta: r="
+ << r << " tid=" << tid << dendl;
complete(trimmer->super, r);
delete trimmer;
}
const auto max_part_size = info.params.max_part_size;
const auto pn = info.tail_part_num;
const auto part_oid = info.part_oid(pn);
+ auto tid = ++next_tid;
l.unlock();
- auto trimmer = new Trimmer(this, marker->num, marker->ofs, pn, exclusive, c);
+ auto trimmer = new Trimmer(this, marker->num, marker->ofs, pn, exclusive, c,
+ tid);
++trimmer->pn;
auto ofs = marker->ofs;
if (pn < marker->num) {
} else {
trimmer->update = true;
}
- auto r = trimmer->fifo->trim_part(pn, ofs, std::nullopt, exclusive,
- trimmer->cur);
+ auto r = trim_part(pn, ofs, std::nullopt, exclusive,
+ tid, trimmer->cur);
if (r < 0) {
+ lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " failed scheduling trim_part: r="
+ << r << " tid=" << tid << dendl;
complete(trimmer->super, r);
delete trimmer;
}
{
std::unique_lock l(m);
const auto part_oid = info.part_oid(part_num);
+ auto tid = ++next_tid;
l.unlock();
- return rgw::cls::fifo::get_part_info(ioctx, part_oid, header, y);
+ auto r = rgw::cls::fifo::get_part_info(ioctx, part_oid, header, tid, y);
+ if (r < 0) {
+ lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " get_part_info failed: r="
+ << r << " tid=" << tid << dendl;
+ }
+ return r;
}
}