]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
ReplicatedPG: persist write errors in the pg log
authorJosh Durgin <jdurgin@redhat.com>
Mon, 23 May 2016 23:20:11 +0000 (16:20 -0700)
committerJosh Durgin <jdurgin@redhat.com>
Sat, 9 Jul 2016 01:33:13 +0000 (18:33 -0700)
This is required to prevent re-ordering of guarded writes or deletes
in the presence of network failures and resends.

Use the existing submit_log_entries() method to initiate a repop that
only updates the pg log.

Keep the write error semantics close to the existing implementation -
if we have a buffer, return it, but do not persist the buffer for now.

Refs: http://tracker.ceph.com/issues/14468
Signed-off-by: Josh Durgin <jdurgin@redhat.com>
src/osd/ReplicatedPG.cc
src/osd/ReplicatedPG.h

index b26759294fd8cb2e9f82e9b4a7b7087389ae01d9..993e59b9e13148cf4091045996d7ff3db4c5d54b 100644 (file)
@@ -1913,7 +1913,11 @@ void ReplicatedPG::do_op(OpRequestRef& op)
       return;
     }
     dout(20) << __func__ << "find_object_context got error " << r << dendl;
-    osd->reply_op_error(op, r);
+    if (op->may_write()) {
+      record_write_error(op, oid, nullptr, r);
+    } else {
+      osd->reply_op_error(op, r);
+    }
     return;
   }
 
@@ -2084,7 +2088,12 @@ void ReplicatedPG::do_op(OpRequestRef& op)
 
   if (r) {
     dout(20) << __func__ << " returned an error: " << r << dendl;
-    reply_ctx(ctx, r);
+    close_op_ctx(ctx);
+    if (op->may_write()) {
+      record_write_error(op, oid, nullptr, r);
+    } else {
+      osd->reply_op_error(op, r);
+    }
     return;
   }
 
@@ -2135,6 +2144,37 @@ void ReplicatedPG::do_op(OpRequestRef& op)
   }
 }
 
+void ReplicatedPG::record_write_error(OpRequestRef op, const hobject_t &soid,
+                                     MOSDOpReply *orig_reply, int r)
+{
+  dout(20) << __func__ << " r=" << r << dendl;
+  assert(op->may_write());
+  const osd_reqid_t &reqid = static_cast<MOSDOp*>(op->get_req())->get_reqid();
+  ObjectContextRef obc;
+  list<pg_log_entry_t> entries;
+  entries.push_back(pg_log_entry_t(pg_log_entry_t::ERROR, soid,
+                                  get_next_version(), eversion_t(), 0,
+                                  reqid, utime_t(), r));
+  ObcLockManager lock_manager;
+  submit_log_entries(
+    entries,
+    std::move(lock_manager),
+    boost::optional<std::function<void(void)> >(
+      [=]() {
+       dout(20) << "finished " << __func__ << " r=" << r << dendl;
+       int flags = CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK;
+       MOSDOp *m = static_cast<MOSDOp*>(op->get_req());
+       MOSDOpReply *reply = orig_reply;
+       if (reply == nullptr) {
+         reply = new MOSDOpReply(m, r, get_osdmap()->get_epoch(),
+                                 flags, true);
+       }
+       dout(10) << " sending commit on " << *m << " " << reply << dendl;
+       osd->send_message_osd_client(reply, m->get_connection());
+      }
+      ));
+}
+
 ReplicatedPG::cache_result_t ReplicatedPG::maybe_handle_cache_detail(
   OpRequestRef op,
   bool write_ordered,
@@ -2843,6 +2883,7 @@ void ReplicatedPG::execute_ctx(OpContext *ctx)
 {
   dout(10) << __func__ << " " << ctx << dendl;
   ctx->reset_obs(ctx->obc);
+  ctx->update_log_only = false; // reset in case finish_copyfrom() is re-running execute_ctx
   OpRequestRef op = ctx->op;
   MOSDOp *m = static_cast<MOSDOp*>(op->get_req());
   ObjectContextRef obc = ctx->obc;
@@ -2959,7 +3000,7 @@ void ReplicatedPG::execute_ctx(OpContext *ctx)
   ctx->reply->set_result(result);
 
   // read or error?
-  if (ctx->op_t->empty() || result < 0) {
+  if ((ctx->op_t->empty() || result < 0) && !ctx->update_log_only) {
     // finish side-effects
     if (result >= 0)
       do_osd_op_effects(ctx, m->get_connection());
@@ -3001,6 +3042,26 @@ void ReplicatedPG::execute_ctx(OpContext *ctx)
     }
   }
 
+  if (ctx->update_log_only) {
+    dout(20) << __func__ << " update_log_only -- result=" << result << dendl;
+    assert(result < 0);
+    // save just what we need from ctx
+    MOSDOpReply *reply = ctx->reply;
+    ctx->reply = nullptr;
+    reply->claim_op_out_data(ctx->ops);
+    reply->get_header().data_off = ctx->data_off;
+    close_op_ctx(ctx);
+
+    if (result == -ENOENT) {
+      reply->set_enoent_reply_versions(info.last_update,
+                                      info.last_user_version);
+    }
+    reply->add_flags(CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK);
+    // append to pg log for dup detection - don't save buffers for now
+    record_write_error(op, soid, reply, result);
+    return;
+  }
+
   // no need to capture PG ref, repop cancel will handle that
   // Can capture the ctx by pointer, it's owned by the repop
   ctx->register_on_applied(
@@ -6597,8 +6658,14 @@ int ReplicatedPG::prepare_transaction(OpContext *ctx)
 
   // prepare the actual mutation
   int result = do_osd_ops(ctx, ctx->ops);
-  if (result < 0)
+  if (result < 0) {
+    if (ctx->op->may_write()) {
+      // need to save the error code in the pg log, to detect dup ops,
+      // but do nothing else
+      ctx->update_log_only = true;
+    }
     return result;
+  }
 
   // read-op?  done?
   if (ctx->op_t->empty() && !ctx->modify) {
index e0635996c14843f65bd40d13b107fd0c3eb8ae64..d5a88d9ae06a04d091c54b7cbec8df9952ca50fa 100644 (file)
@@ -509,6 +509,7 @@ public:
     bool cache_evict;     ///< true if this is a cache eviction
     bool ignore_cache;    ///< true if IGNORE_CACHE flag is set
     bool ignore_log_op_stats;  // don't log op stats
+    bool update_log_only; ///< this is a write that returned an error - just record in pg log for dup detection
 
     // side effects
     list<pair<watch_info_t,bool> > watch_connects; ///< new watch + will_ping flag
@@ -640,7 +641,7 @@ public:
       snapset(0),
       new_obs(obs->oi, obs->exists),
       modify(false), user_modify(false), undirty(false), cache_evict(false),
-      ignore_cache(false), ignore_log_op_stats(false),
+      ignore_cache(false), ignore_log_op_stats(false), update_log_only(false),
       bytes_written(0), bytes_read(0), user_at_version(0),
       current_osd_subop_num(0),
       obc(obc),
@@ -661,7 +662,7 @@ public:
               vector<OSDOp>& _ops, ReplicatedPG *_pg) :
       op(_op), reqid(_reqid), ops(_ops), obs(NULL), snapset(0),
       modify(false), user_modify(false), undirty(false), cache_evict(false),
-      ignore_cache(false), ignore_log_op_stats(false),
+      ignore_cache(false), ignore_log_op_stats(false), update_log_only(false),
       bytes_written(0), bytes_read(0), user_at_version(0),
       current_osd_subop_num(0),
       data_off(0), reply(NULL), pg(_pg),
@@ -1470,6 +1471,8 @@ public:
     OpRequestRef& op,
     ThreadPool::TPHandle &handle);
   void do_op(OpRequestRef& op);
+  void record_write_error(OpRequestRef op, const hobject_t &soid,
+                         MOSDOpReply *orig_reply, int r);
   bool pg_op_must_wait(MOSDOp *op);
   void do_pg_op(OpRequestRef op);
   void do_sub_op(OpRequestRef op);