<< " entering: tid=" << tid << dendl;
std::unique_lock l(m);
if (objv != info->version) {
- lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
- << " version mismatch, canceling: tid=" << tid << dendl;
+ ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " version mismatch, canceling: tid=" << tid << dendl;
return -ECANCELED;
}
<< " entering: tid=" << tid << dendl;
lr::ObjectWriteOperation op;
bool canceled = false;
- update_meta(&op, info.version, update);
+ update_meta(&op, version, update);
auto r = rgw_rados_operate(dpp, ioctx, oid, &op, y);
if (r >= 0 || r == -ECANCELED) {
canceled = (r == -ECANCELED);
return r;
}
-int FIFO::_prepare_new_part(const DoutPrefixProvider *dpp, bool is_head, std::uint64_t tid, optional_yield y)
+int FIFO::_prepare_new_part(const DoutPrefixProvider *dpp,
+ std::int64_t new_part_num, bool is_head,
+ std::uint64_t tid, optional_yield y)
{
ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
<< " entering: tid=" << tid << dendl;
std::unique_lock l(m);
- std::vector<fifo::journal_entry> jentries{{
- fifo::journal_entry::Op::create, info.max_push_part_num + 1
- }};
- if (info.journal.contains(jentries.front())) {
+ std::vector<fifo::journal_entry> jentries{{ fifo::journal_entry::Op::create, new_part_num }};
+ if (info.journal.contains({fifo::journal_entry::Op::create, new_part_num}) &&
+ (!is_head || info.journal.contains({fifo::journal_entry::Op::set_head, new_part_num}))) {
l.unlock();
ldpp_dout(dpp, 5) << __PRETTY_FUNCTION__ << ":" << __LINE__
<< " new part journaled, but not processed: tid="
}
return r;
}
- std::int64_t new_head_part_num = info.head_part_num;
auto version = info.version;
if (is_head) {
ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
<< " needs new head: tid=" << tid << dendl;
- jentries.push_back({ fifo::journal_entry::Op::set_head, jentries.front().part_num });
+ jentries.push_back({ fifo::journal_entry::Op::set_head, new_part_num });
}
l.unlock();
r = _update_meta(dpp, u, version, &canceled, tid, y);
if (r >= 0 && canceled) {
std::unique_lock l(m);
- auto found = (info.journal.contains({fifo::journal_entry::Op::create, jentries.front().part_num}) ||
- info.journal.contains({fifo::journal_entry::Op::set_head, jentries.front().part_num}));
- if ((info.max_push_part_num >= jentries.front().part_num &&
- info.head_part_num >= new_head_part_num)) {
+ version = info.version;
+ auto found = (info.journal.contains({fifo::journal_entry::Op::create, new_part_num}) ||
+ info.journal.contains({fifo::journal_entry::Op::set_head, new_part_num}));
+ if ((info.max_push_part_num >= new_part_num &&
+ info.head_part_num >= new_part_num)) {
ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
<< " raced, but journaled and processed: i=" << i
<< " tid=" << tid << dendl;
return r;
}
-int FIFO::_prepare_new_head(const DoutPrefixProvider *dpp, std::uint64_t tid, optional_yield y)
+int FIFO::_prepare_new_head(const DoutPrefixProvider *dpp,
+ std::int64_t new_head_part_num,
+ std::uint64_t tid, optional_yield y)
{
ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
- << " entering: tid=" << tid << dendl;
+ << " entering: tid=" << tid << dendl;
std::unique_lock l(m);
- std::int64_t new_head_num = info.head_part_num + 1;
auto max_push_part_num = info.max_push_part_num;
auto version = info.version;
l.unlock();
int r = 0;
- if (max_push_part_num < new_head_num) {
+ if (max_push_part_num < new_head_part_num) {
ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
<< " need new part: tid=" << tid << dendl;
- r = _prepare_new_part(dpp, true, tid, y);
+ r = _prepare_new_part(dpp, new_head_part_num, true, tid, y);
if (r < 0) {
ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
<< " _prepare_new_part failed: r=" << r
return r;
}
std::unique_lock l(m);
- if (info.max_push_part_num < new_head_num) {
+ if (info.max_push_part_num < new_head_part_num) {
ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
<< " inconsistency, push part less than head part: "
<< " tid=" << tid << dendl;
return 0;
}
+ fifo::journal_entry jentry;
+ jentry.op = fifo::journal_entry::Op::set_head;
+ jentry.part_num = new_head_part_num;
+
+ r = 0;
bool canceled = true;
for (auto i = 0; canceled && i < MAX_RACE_RETRIES; ++i) {
+ canceled = false;
ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
- << " updating head: i=" << i << " tid=" << tid << dendl;
- auto u = fifo::update{}.head_part_num(new_head_num);
+ << " updating metadata: i=" << i << " tid=" << tid << dendl;
+ auto u = fifo::update{}.journal_entries_add({{ jentry }});
r = _update_meta(dpp, u, version, &canceled, tid, y);
+ if (r >= 0 && canceled) {
+ std::unique_lock l(m);
+ auto found = (info.journal.contains({fifo::journal_entry::Op::create, new_head_part_num}) ||
+ info.journal.contains({fifo::journal_entry::Op::set_head, new_head_part_num}));
+ version = info.version;
+ if ((info.head_part_num >= new_head_part_num)) {
+ ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " raced, but journaled and processed: i=" << i
+ << " tid=" << tid << dendl;
+ return 0;
+ }
+ if (found) {
+ ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " raced, journaled but not processed: i=" << i
+ << " tid=" << tid << dendl;
+ canceled = false;
+ }
+ l.unlock();
+ }
if (r < 0) {
ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
<< " _update_meta failed: update=" << u << " r=" << r
<< " tid=" << tid << dendl;
return r;
}
- std::unique_lock l(m);
- auto head_part_num = info.head_part_num;
- version = info.version;
- l.unlock();
- if (canceled && (head_part_num >= new_head_num)) {
- ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
- << " raced, but completed by the other caller: i=" << i
- << " tid=" << tid << dendl;
- canceled = false;
- }
}
if (canceled) {
ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
<< " canceled too many times, giving up: tid=" << tid << dendl;
return -ECANCELED;
}
- return 0;
+ r = process_journal(dpp, tid, y);
+ if (r < 0) {
+ ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " process_journal failed: r=" << r << " tid=" << tid << dendl;
+ }
+ return r;
}
struct NewPartPreparer : public Completion<NewPartPreparer> {
FIFO* f;
std::vector<fifo::journal_entry> jentries;
int i = 0;
- std::int64_t new_head_part_num;
+ std::int64_t new_part_num;
bool canceled = false;
uint64_t tid;
NewPartPreparer(const DoutPrefixProvider *dpp, FIFO* f, lr::AioCompletion* super,
std::vector<fifo::journal_entry> jentries,
- std::int64_t new_head_part_num,
+ std::int64_t new_part_num,
std::uint64_t tid)
: Completion(dpp, super), f(f), jentries(std::move(jentries)),
- new_head_part_num(new_head_part_num), tid(tid) {}
+ new_part_num(new_part_num), tid(tid) {}
void handle(const DoutPrefixProvider *dpp, Ptr&& p, int r) {
ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
if (canceled) {
std::unique_lock l(f->m);
- auto found = (f->info.journal.contains({fifo::journal_entry::Op::create, jentries.front().part_num}) ||
- f->info.journal.contains({fifo::journal_entry::Op::set_head, jentries.front().part_num}));
+ auto found = (f->info.journal.contains({fifo::journal_entry::Op::create, new_part_num}) ||
+ f->info.journal.contains({fifo::journal_entry::Op::set_head, new_part_num}));
auto max_push_part_num = f->info.max_push_part_num;
auto head_part_num = f->info.head_part_num;
auto version = f->info.version;
l.unlock();
- if ((max_push_part_num >= jentries.front().part_num &&
- head_part_num >= new_head_part_num)) {
+ if ((max_push_part_num >= new_part_num &&
+ head_part_num >= new_part_num)) {
ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
<< " raced, but journaled and processed: i=" << i
<< " tid=" << tid << dendl;
}
};
-void FIFO::_prepare_new_part(const DoutPrefixProvider *dpp, bool is_head, std::uint64_t tid,
- lr::AioCompletion* c)
+void FIFO::_prepare_new_part(const DoutPrefixProvider *dpp, std::int64_t new_part_num,
+ bool is_head, std::uint64_t tid, lr::AioCompletion* c)
{
std::unique_lock l(m);
- std::vector<fifo::journal_entry> jentries{{
- fifo::journal_entry::Op::create, info.max_push_part_num + 1
- }};
- if (info.journal.contains({fifo::journal_entry::Op::create, jentries.front().part_num}) &&
- (!is_head || info.journal.contains({fifo::journal_entry::Op::set_head, jentries.front().part_num}))) {
+ std::vector<fifo::journal_entry> jentries{{fifo::journal_entry::Op::create, new_part_num}};
+ if (info.journal.contains({fifo::journal_entry::Op::create, new_part_num}) &&
+ (!is_head || info.journal.contains({fifo::journal_entry::Op::set_head, new_part_num}))) {
l.unlock();
ldpp_dout(dpp, 5) << __PRETTY_FUNCTION__ << ":" << __LINE__
<< " new part journaled, but not processed: tid="
process_journal(dpp, tid, c);
return;
}
- std::int64_t new_head_part_num = info.head_part_num;
auto version = info.version;
if (is_head) {
- jentries.push_back({ fifo::journal_entry::Op::set_head, jentries.front().part_num });
+ jentries.push_back({ fifo::journal_entry::Op::set_head, new_part_num });
}
l.unlock();
auto n = std::make_unique<NewPartPreparer>(dpp, this, c, jentries,
- new_head_part_num, tid);
+ new_part_num, tid);
auto np = n.get();
_update_meta(dpp, fifo::update{}.journal_entries_add(jentries), version,
&np->canceled, tid, NewPartPreparer::call(std::move(n)));
FIFO* f;
int i = 0;
bool newpart;
- std::int64_t new_head_num;
+ std::int64_t new_head_part_num;
bool canceled = false;
std::uint64_t tid;
NewHeadPreparer(const DoutPrefixProvider *dpp, FIFO* f, lr::AioCompletion* super,
- bool newpart, std::int64_t new_head_num, std::uint64_t tid)
- : Completion(dpp, super), f(f), newpart(newpart), new_head_num(new_head_num),
- tid(tid) {}
+ bool newpart, std::int64_t new_head_part_num,
+ std::uint64_t tid)
+ : Completion(dpp, super), f(f), newpart(newpart),
+ new_head_part_num(new_head_part_num), tid(tid) {}
void handle(const DoutPrefixProvider *dpp, Ptr&& p, int r) {
if (newpart)
return;
}
std::unique_lock l(f->m);
- if (f->info.max_push_part_num < new_head_num) {
+ if (f->info.max_push_part_num < new_head_part_num) {
l.unlock();
lderr(f->cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
<< " _prepare_new_part failed: r=" << r
}
void handle_update(const DoutPrefixProvider *dpp, Ptr&& p, int r) {
- std::unique_lock l(f->m);
- auto head_part_num = f->info.head_part_num;
- auto version = f->info.version;
- l.unlock();
-
+ ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " entering: tid=" << tid << dendl;
if (r < 0) {
ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
- << " _update_meta failed: r=" << r
+ << " _update_meta failed: r=" << r
<< " tid=" << tid << dendl;
complete(std::move(p), r);
return;
}
+
if (canceled) {
+ std::unique_lock l(f->m);
+ auto found = (f->info.journal.contains({fifo::journal_entry::Op::create, new_head_part_num }) ||
+ f->info.journal.contains({fifo::journal_entry::Op::set_head, new_head_part_num }));
+ auto head_part_num = f->info.head_part_num;
+ auto version = f->info.version;
+
+ l.unlock();
+ if ((head_part_num >= new_head_part_num)) {
+ ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " raced, but journaled and processed: i=" << i
+ << " tid=" << tid << dendl;
+ complete(std::move(p), 0);
+ return;
+ }
if (i >= MAX_RACE_RETRIES) {
- ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
- << " canceled too many times, giving up: tid=" << tid << dendl;
complete(std::move(p), -ECANCELED);
return;
}
-
- // Raced, but there's still work to do!
- if (head_part_num < new_head_num) {
- canceled = false;
+ if (!found) {
++i;
- ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
- << " updating head: i=" << i << " tid=" << tid << dendl;
- f->_update_meta(dpp, fifo::update{}.head_part_num(new_head_num),
- version, &this->canceled, tid, call(std::move(p)));
+ fifo::journal_entry jentry;
+ jentry.op = fifo::journal_entry::Op::set_head;
+ jentry.part_num = new_head_part_num;
+ f->_update_meta(dpp, fifo::update{}
+ .journal_entries_add({{jentry}}),
+ version, &canceled, tid, call(std::move(p)));
return;
+ } else {
+ ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " raced, journaled but not processed: i=" << i
+ << " tid=" << tid << dendl;
+ canceled = false;
}
+ // Fall through. We still need to process the journal.
}
- ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
- << " succeeded : i=" << i << " tid=" << tid << dendl;
- complete(std::move(p), 0);
+ f->process_journal(dpp, tid, super());
return;
}
};
-void FIFO::_prepare_new_head(const DoutPrefixProvider *dpp, std::uint64_t tid, lr::AioCompletion* c)
+void FIFO::_prepare_new_head(const DoutPrefixProvider *dpp, std::int64_t new_head_part_num,
+ std::uint64_t tid, lr::AioCompletion* c)
{
ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
<< " entering: tid=" << tid << dendl;
std::unique_lock l(m);
- int64_t new_head_num = info.head_part_num + 1;
auto max_push_part_num = info.max_push_part_num;
auto version = info.version;
l.unlock();
- if (max_push_part_num < new_head_num) {
+ if (max_push_part_num < new_head_part_num) {
ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
<< " need new part: tid=" << tid << dendl;
- auto n = std::make_unique<NewHeadPreparer>(dpp, this, c, true, new_head_num,
+ auto n = std::make_unique<NewHeadPreparer>(dpp, this, c, true, new_head_part_num,
tid);
- _prepare_new_part(dpp, true, tid, NewHeadPreparer::call(std::move(n)));
+ _prepare_new_part(dpp, new_head_part_num, true, tid,
+ NewHeadPreparer::call(std::move(n)));
} else {
ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
<< " updating head: tid=" << tid << dendl;
- auto n = std::make_unique<NewHeadPreparer>(dpp, this, c, false, new_head_num,
+ auto n = std::make_unique<NewHeadPreparer>(dpp, this, c, false, new_head_part_num,
tid);
auto np = n.get();
- _update_meta(dpp, fifo::update{}.head_part_num(new_head_num), version,
+ fifo::journal_entry jentry;
+ jentry.op = fifo::journal_entry::Op::set_head;
+ jentry.part_num = new_head_part_num;
+ _update_meta(dpp, fifo::update{}.journal_entries_add({{jentry}}), version,
&np->canceled, tid, NewHeadPreparer::call(std::move(n)));
}
}
auto tid = ++next_tid;
auto max_entry_size = info.params.max_entry_size;
auto need_new_head = info.need_new_head();
+ auto head_part_num = info.head_part_num;
l.unlock();
ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
<< " entering: tid=" << tid << dendl;
if (need_new_head) {
ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
<< " need new head tid=" << tid << dendl;
- r = _prepare_new_head(dpp, tid, y);
+ r = _prepare_new_head(dpp, head_part_num + 1, tid, y);
if (r < 0) {
ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
<< " _prepare_new_head failed: r=" << r
<< " batch=" << batch.size() << " retries=" << retries
<< " tid=" << tid << dendl;
std::unique_lock l(m);
+ head_part_num = info.head_part_num;
auto max_part_size = info.params.max_part_size;
auto overhead = part_entry_overhead;
l.unlock();
++retries;
ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
<< " need new head tid=" << tid << dendl;
- r = _prepare_new_head(dpp, tid, y);
+ r = _prepare_new_head(dpp, head_part_num + 1, tid, y);
if (r < 0) {
ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
<< " prepare_new_head failed: r=" << r
std::deque<cb::list> remaining;
std::deque<cb::list> batch;
int i = 0;
+ std::int64_t head_part_num;
std::uint64_t tid;
bool new_heading = false;
std::unique_lock l(f->m);
auto max_part_size = f->info.params.max_part_size;
auto part_entry_overhead = f->part_entry_overhead;
+ head_part_num = f->info.head_part_num;
l.unlock();
ldout(f->cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
void new_head(const DoutPrefixProvider *dpp, Ptr&& p) {
new_heading = true;
- f->_prepare_new_head(dpp, tid, call(std::move(p)));
+ f->_prepare_new_head(dpp, head_part_num + 1, tid, call(std::move(p)));
}
void handle(const DoutPrefixProvider *dpp, Ptr&& p, int r) {
}
Pusher(const DoutPrefixProvider *dpp, FIFO* f, std::deque<cb::list>&& remaining,
- std::uint64_t tid, lr::AioCompletion* super)
+ std::int64_t head_part_num, std::uint64_t tid,
+ lr::AioCompletion* super)
: Completion(dpp, super), f(f), remaining(std::move(remaining)),
- tid(tid) {}
+ head_part_num(head_part_num), tid(tid) {}
};
void FIFO::push(const DoutPrefixProvider *dpp, const std::vector<cb::list>& data_bufs,
auto tid = ++next_tid;
auto max_entry_size = info.params.max_entry_size;
auto need_new_head = info.need_new_head();
+ auto head_part_num = info.head_part_num;
l.unlock();
ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
<< " entering: tid=" << tid << dendl;
auto p = std::make_unique<Pusher>(dpp, this, std::deque<cb::list>(data_bufs.begin(), data_bufs.end()),
- tid, c);
+ head_part_num, tid, c);
// Validate sizes
for (const auto& bl : data_bufs) {
if (bl.length() > max_entry_size) {