lr::ObjectWriteOperation op;
fifo::op::push_part pp;
+ op.assert_exists();
+
pp.data_bufs = data_bufs;
pp.total_len = 0;
canceled = true;
++retries;
ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
- << " need new head tid=" << tid << dendl;
+ << " need new head tid=" << tid << dendl;
r = _prepare_new_head(dpp, head_part_num + 1, tid, y);
if (r < 0) {
ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
r = 0;
continue;
}
+ if (r == -ENOENT) {
+ ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " racing client trimmed part, rereading metadata "
+ << "tid=" << tid << dendl;
+ canceled = true;
+ ++retries;
+ r = read_meta(dpp, y);
+ if (r < 0) {
+ ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " read_meta failed: r=" << r
+ << " tid=" << tid << dendl;
+ return r;
+ }
+ r = 0;
+ continue;
+ }
if (r < 0) {
ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
<< " push_entries failed: r=" << r
int i = 0;
std::int64_t head_part_num;
std::uint64_t tid;
- bool new_heading = false;
+ enum { pushing, new_heading, meta_reading } state = pushing;
void prep_then_push(const DoutPrefixProvider *dpp, Ptr&& p, const unsigned successes) {
std::unique_lock l(f->m);
}
void new_head(const DoutPrefixProvider *dpp, Ptr&& p) {
- new_heading = true;
+ state = new_heading;
f->_prepare_new_head(dpp, head_part_num + 1, tid, call(std::move(p)));
}
+ void read_meta(const DoutPrefixProvider *dpp, Ptr&& p) {
+ ++i;
+ state = meta_reading;
+ f->read_meta(dpp, tid, call(std::move(p)));
+ }
+
void handle(const DoutPrefixProvider *dpp, Ptr&& p, int r) {
- if (!new_heading) {
+ switch (state) {
+ case pushing:
if (r == -ERANGE) {
ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
<< " need new head tid=" << tid << dendl;
new_head(dpp, std::move(p));
return;
}
+ if (r == -ENOENT) {
+ if (i > MAX_RACE_RETRIES) {
+ ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " racing client deleted part, but we're out"
+ << " of retries: tid=" << tid << dendl;
+ complete(std::move(p), r);
+ }
+ ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " racing client deleted part: tid=" << tid << dendl;
+ read_meta(dpp, std::move(p));
+ return;
+ }
if (r < 0) {
ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
<< " push_entries failed: r=" << r
}
i = 0; // We've made forward progress, so reset the race counter!
prep_then_push(dpp, std::move(p), r);
- } else {
+ break;
+
+ case new_heading:
if (r < 0) {
ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
<< " prepare_new_head failed: r=" << r
complete(std::move(p), r);
return;
}
- new_heading = false;
+ state = pushing;
handle_new_head(dpp, std::move(p), r);
+ break;
+
+ case meta_reading:
+ if (r < 0) {
+ ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " read_meta failed: r=" << r
+ << " tid=" << tid << dendl;
+ complete(std::move(p), r);
+ return;
+ }
+ state = pushing;
+ prep_then_push(dpp, std::move(p), r);
+ break;
}
}