return;
}
dout(20) << __func__ << "find_object_context got error " << r << dendl;
- osd->reply_op_error(op, r);
+ if (op->may_write()) {
+ record_write_error(op, oid, nullptr, r);
+ } else {
+ osd->reply_op_error(op, r);
+ }
return;
}
if (r) {
dout(20) << __func__ << " returned an error: " << r << dendl;
- reply_ctx(ctx, r);
+ close_op_ctx(ctx);
+ if (op->may_write()) {
+ record_write_error(op, oid, nullptr, r);
+ } else {
+ osd->reply_op_error(op, r);
+ }
return;
}
}
}
+void ReplicatedPG::record_write_error(OpRequestRef op, const hobject_t &soid,
+ MOSDOpReply *orig_reply, int r)
+{
+ dout(20) << __func__ << " r=" << r << dendl;
+ assert(op->may_write());
+ const osd_reqid_t &reqid = static_cast<MOSDOp*>(op->get_req())->get_reqid();
+ ObjectContextRef obc;
+ list<pg_log_entry_t> entries;
+ entries.push_back(pg_log_entry_t(pg_log_entry_t::ERROR, soid,
+ get_next_version(), eversion_t(), 0,
+ reqid, utime_t(), r));
+ ObcLockManager lock_manager;
+ submit_log_entries(
+ entries,
+ std::move(lock_manager),
+ boost::optional<std::function<void(void)> >(
+ [=]() {
+ dout(20) << "finished " << __func__ << " r=" << r << dendl;
+ int flags = CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK;
+ MOSDOp *m = static_cast<MOSDOp*>(op->get_req());
+ MOSDOpReply *reply = orig_reply;
+ if (reply == nullptr) {
+ reply = new MOSDOpReply(m, r, get_osdmap()->get_epoch(),
+ flags, true);
+ }
+ dout(10) << " sending commit on " << *m << " " << reply << dendl;
+ osd->send_message_osd_client(reply, m->get_connection());
+ }
+ ));
+}
+
ReplicatedPG::cache_result_t ReplicatedPG::maybe_handle_cache_detail(
OpRequestRef op,
bool write_ordered,
{
dout(10) << __func__ << " " << ctx << dendl;
ctx->reset_obs(ctx->obc);
+ ctx->update_log_only = false; // reset in case finish_copyfrom() is re-running execute_ctx
OpRequestRef op = ctx->op;
MOSDOp *m = static_cast<MOSDOp*>(op->get_req());
ObjectContextRef obc = ctx->obc;
ctx->reply->set_result(result);
// read or error?
- if (ctx->op_t->empty() || result < 0) {
+ if ((ctx->op_t->empty() || result < 0) && !ctx->update_log_only) {
// finish side-effects
if (result >= 0)
do_osd_op_effects(ctx, m->get_connection());
}
}
+ if (ctx->update_log_only) {
+ dout(20) << __func__ << " update_log_only -- result=" << result << dendl;
+ assert(result < 0);
+ // save just what we need from ctx
+ MOSDOpReply *reply = ctx->reply;
+ ctx->reply = nullptr;
+ reply->claim_op_out_data(ctx->ops);
+ reply->get_header().data_off = ctx->data_off;
+ close_op_ctx(ctx);
+
+ if (result == -ENOENT) {
+ reply->set_enoent_reply_versions(info.last_update,
+ info.last_user_version);
+ }
+ reply->add_flags(CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK);
+ // append to pg log for dup detection - don't save buffers for now
+ record_write_error(op, soid, reply, result);
+ return;
+ }
+
// no need to capture PG ref, repop cancel will handle that
// Can capture the ctx by pointer, it's owned by the repop
ctx->register_on_applied(
// prepare the actual mutation
int result = do_osd_ops(ctx, ctx->ops);
- if (result < 0)
+ if (result < 0) {
+ if (ctx->op->may_write()) {
+ // need to save the error code in the pg log, to detect dup ops,
+ // but do nothing else
+ ctx->update_log_only = true;
+ }
return result;
+ }
// read-op? done?
if (ctx->op_t->empty() && !ctx->modify) {
bool cache_evict; ///< true if this is a cache eviction
bool ignore_cache; ///< true if IGNORE_CACHE flag is set
bool ignore_log_op_stats; // don't log op stats
+ bool update_log_only; ///< this is a write that returned an error - just record in pg log for dup detection
// side effects
list<pair<watch_info_t,bool> > watch_connects; ///< new watch + will_ping flag
snapset(0),
new_obs(obs->oi, obs->exists),
modify(false), user_modify(false), undirty(false), cache_evict(false),
- ignore_cache(false), ignore_log_op_stats(false),
+ ignore_cache(false), ignore_log_op_stats(false), update_log_only(false),
bytes_written(0), bytes_read(0), user_at_version(0),
current_osd_subop_num(0),
obc(obc),
vector<OSDOp>& _ops, ReplicatedPG *_pg) :
op(_op), reqid(_reqid), ops(_ops), obs(NULL), snapset(0),
modify(false), user_modify(false), undirty(false), cache_evict(false),
- ignore_cache(false), ignore_log_op_stats(false),
+ ignore_cache(false), ignore_log_op_stats(false), update_log_only(false),
bytes_written(0), bytes_read(0), user_at_version(0),
current_osd_subop_num(0),
data_off(0), reply(NULL), pg(_pg),
OpRequestRef& op,
ThreadPool::TPHandle &handle);
void do_op(OpRequestRef& op);
+ void record_write_error(OpRequestRef op, const hobject_t &soid,
+ MOSDOpReply *orig_reply, int r);
bool pg_op_must_wait(MOSDOp *op);
void do_pg_op(OpRequestRef op);
void do_sub_op(OpRequestRef op);