o.swap(ops);
bdata_encode = false;
}
+ void set_op_returns(const vector<pg_log_op_return_item_t>& op_returns) {
+ if (op_returns.size()) {
+ ceph_assert(ops.empty() || ops.size() == op_returns.size());
+ ops.resize(op_returns.size());
+ for (unsigned i = 0; i < op_returns.size(); ++i) {
+ ops[i].rval = op_returns[i].rval;
+ ops[i].outdata = op_returns[i].bl;
+ }
+ }
+ }
/**
* get retry attempt
void OSDService::reply_op_error(OpRequestRef op, int err)
{
- reply_op_error(op, err, eversion_t(), 0);
+ reply_op_error(op, err, eversion_t(), 0, {});
}
void OSDService::reply_op_error(OpRequestRef op, int err, eversion_t v,
- version_t uv)
+ version_t uv,
+ vector<pg_log_op_return_item_t> op_returns)
{
auto m = op->get_req<MOSDOp>();
ceph_assert(m->get_type() == CEPH_MSG_OSD_OP);
int flags;
flags = m->get_flags() & (CEPH_OSD_FLAG_ACK|CEPH_OSD_FLAG_ONDISK);
- MOSDOpReply *reply = new MOSDOpReply(m, err, osdmap->get_epoch(), flags, err >= 0);
+ MOSDOpReply *reply = new MOSDOpReply(m, err, osdmap->get_epoch(), flags,
+ !m->has_flag(CEPH_OSD_FLAG_RETURNVEC));
reply->set_reply_versions(v, uv);
+ reply->set_op_returns(op_returns);
m->get_connection()->send_message(reply);
}
void dump_scrub_reservations(Formatter *f);
void reply_op_error(OpRequestRef op, int err);
- void reply_op_error(OpRequestRef op, int err, eversion_t v, version_t uv);
+ void reply_op_error(OpRequestRef op, int err, eversion_t v, version_t uv,
+ vector<pg_log_op_return_item_t> op_returns);
void handle_misdirected_op(PG *pg, OpRequestRef op);
const osd_reqid_t &r,
eversion_t *version,
version_t *user_version,
- int *return_code) const
+ int *return_code,
+ vector<pg_log_op_return_item_t> *op_returns
+ ) const
{
return (
- projected_log.get_request(r, version, user_version, return_code) ||
+ projected_log.get_request(r, version, user_version, return_code,
+ op_returns) ||
recovery_state.get_pg_log().get_log().get_request(
- r, version, user_version, return_code));
+ r, version, user_version, return_code, op_returns));
}
void PG::publish_stats_to_osd()
map<hobject_t, list<Context*>> callbacks_for_degraded_object;
map<eversion_t,
- list<tuple<OpRequestRef, version_t, int> > > waiting_for_ondisk;
+ list<
+ tuple<OpRequestRef, version_t, int,
+ vector<pg_log_op_return_item_t>>>> waiting_for_ondisk;
void requeue_object_waiters(map<hobject_t, list<OpRequestRef>>& m);
void requeue_op(OpRequestRef op);
const osd_reqid_t &r,
eversion_t *version,
version_t *user_version,
- int *return_code) const;
+ int *return_code,
+ vector<pg_log_op_return_item_t> *op_returns) const;
eversion_t projected_last_update;
eversion_t get_next_version() const {
eversion_t at_version(
const osd_reqid_t &r,
eversion_t *version,
version_t *user_version,
- int *return_code) const
+ int *return_code,
+ vector<pg_log_op_return_item_t> *op_returns) const
{
ceph_assert(version);
ceph_assert(user_version);
*version = p->second->version;
*user_version = p->second->user_version;
*return_code = p->second->return_code;
+ *op_returns = p->second->op_returns;
return true;
}
*version = p->second->version;
*user_version = i->second;
*return_code = p->second->return_code;
+ *op_returns = p->second->op_returns;
if (*return_code >= 0) {
auto it = p->second->extra_reqid_return_codes.find(idx);
if (it != p->second->extra_reqid_return_codes.end()) {
*version = q->second->version;
*user_version = q->second->user_version;
*return_code = q->second->return_code;
+ *op_returns = q->second->op_returns;
return true;
}
eversion_t version;
version_t user_version;
int return_code = 0;
+ vector<pg_log_op_return_item_t> op_returns;
bool got = check_in_progress_op(
- m->get_reqid(), &version, &user_version, &return_code);
+ m->get_reqid(), &version, &user_version, &return_code, &op_returns);
if (got) {
dout(3) << __func__ << " dup " << m->get_reqid()
<< " version " << version << dendl;
if (already_complete(version)) {
- osd->reply_op_error(op, return_code, version, user_version);
+ osd->reply_op_error(op, return_code, version, user_version, op_returns);
} else {
dout(10) << " waiting for " << version << " to commit" << dendl;
// always queue ondisk waiters, so that we can requeue if needed
- waiting_for_ondisk[version].emplace_back(op, user_version, return_code);
+ waiting_for_ondisk[version].emplace_back(op, user_version, return_code,
+ op_returns);
op->mark_delayed("waiting for ondisk");
}
return;
if (!pwop->sent_reply) {
// send commit.
assert(pwop->ctx->reply == nullptr);
- MOSDOpReply *reply = new MOSDOpReply(m, r, get_osdmap_epoch(), 0, true);
+ MOSDOpReply *reply = new MOSDOpReply(m, r, get_osdmap_epoch(), 0,
+ true /* we claim it below */);
reply->set_reply_versions(eversion_t(), pwop->user_version);
reply->add_flags(CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK);
reply->claim_op_out_data(pwop->ops);
return_code = std::get<2>(i);
}
osd->reply_op_error(std::get<0>(i), return_code, repop->v,
- std::get<1>(i));
+ std::get<1>(i), std::get<3>(i));
}
waiting_for_ondisk.erase(it);
}
eversion_t replay_version;
version_t user_version;
int return_code = 0;
+ vector<pg_log_op_return_item_t> op_returns;
bool got = log.get_request(
- entry.reqid, &replay_version, &user_version, &return_code);
+ entry.reqid, &replay_version, &user_version, &return_code, &op_returns);
EXPECT_TRUE(got);
EXPECT_EQ(entry.return_code, return_code);
EXPECT_EQ(entry.version, replay_version);
eversion_t version;
version_t user_version;
int return_code;
+ vector<pg_log_op_return_item_t> op_returns;
osd_reqid_t log_reqid = osd_reqid_t(client, 8, 5);
osd_reqid_t dup_reqid = osd_reqid_t(client, 8, 3);
bool result;
- result = log.get_request(log_reqid, &version, &user_version, &return_code);
+ result = log.get_request(log_reqid, &version, &user_version, &return_code,
+ &op_returns);
EXPECT_EQ(true, result);
EXPECT_EQ(mk_evt(21, 165), version);
- result = log.get_request(dup_reqid, &version, &user_version, &return_code);
+ result = log.get_request(dup_reqid, &version, &user_version, &return_code,
+ &op_returns);
EXPECT_EQ(true, result);
EXPECT_EQ(mk_evt(15, 155), version);
- result = log.get_request(bad_reqid, &version, &user_version, &return_code);
+ result = log.get_request(bad_reqid, &version, &user_version, &return_code,
+ &op_returns);
EXPECT_FALSE(result);
}