git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
osd/PrimaryLogPG: include op_returns in dup replies
author Sage Weil <sage@redhat.com>
Sun, 29 Sep 2019 16:34:31 +0000 (11:34 -0500)
committer Sage Weil <sage@redhat.com>
Mon, 30 Sep 2019 15:04:48 +0000 (10:04 -0500)
We are storing the return metadata; actually use it when sending dup
replies!

Signed-off-by: Sage Weil <sage@redhat.com>
src/messages/MOSDOpReply.h
src/osd/OSD.cc
src/osd/OSD.h
src/osd/PG.cc
src/osd/PG.h
src/osd/PGLog.h
src/osd/PrimaryLogPG.cc
src/test/osd/TestPGLog.cc

index ed79f6587a8014a23a46679ce059f0bb763877d5..206b5df74afa9ad9635fc42e1a2732fb2174ecd9 100644 (file)
@@ -106,6 +106,16 @@ public:
     o.swap(ops);
     bdata_encode = false;
   }
+  void set_op_returns(const vector<pg_log_op_return_item_t>& op_returns) {
+    if (op_returns.size()) {
+      ceph_assert(ops.empty() || ops.size() == op_returns.size());
+      ops.resize(op_returns.size());
+      for (unsigned i = 0; i < op_returns.size(); ++i) {
+       ops[i].rval = op_returns[i].rval;
+       ops[i].outdata = op_returns[i].bl;
+      }
+    }
+  }
 
   /**
    * get retry attempt
index 8429118770b8f1da5a9df2b5164c7bd88c4471d2..fc7f5654e521e98a3f0f5c624d781c1ffe71244d 100644 (file)
@@ -1600,19 +1600,22 @@ OSDMapRef OSDService::try_get_map(epoch_t epoch)
 
 void OSDService::reply_op_error(OpRequestRef op, int err)
 {
-  reply_op_error(op, err, eversion_t(), 0);
+  reply_op_error(op, err, eversion_t(), 0, {});
 }
 
 void OSDService::reply_op_error(OpRequestRef op, int err, eversion_t v,
-                                version_t uv)
+                                version_t uv,
+                               vector<pg_log_op_return_item_t> op_returns)
 {
   auto m = op->get_req<MOSDOp>();
   ceph_assert(m->get_type() == CEPH_MSG_OSD_OP);
   int flags;
   flags = m->get_flags() & (CEPH_OSD_FLAG_ACK|CEPH_OSD_FLAG_ONDISK);
 
-  MOSDOpReply *reply = new MOSDOpReply(m, err, osdmap->get_epoch(), flags, err >= 0);
+  MOSDOpReply *reply = new MOSDOpReply(m, err, osdmap->get_epoch(), flags,
+                                      !m->has_flag(CEPH_OSD_FLAG_RETURNVEC));
   reply->set_reply_versions(v, uv);
+  reply->set_op_returns(op_returns);
   m->get_connection()->send_message(reply);
 }
 
index 1a79ce3b2de5f4a2fdde97f372d9d3e4c263a5a4..26b6ec80dba3047b87f8a5d8a9e426d13bc7a435 100644 (file)
@@ -358,7 +358,8 @@ public:
   void dump_scrub_reservations(Formatter *f);
 
   void reply_op_error(OpRequestRef op, int err);
-  void reply_op_error(OpRequestRef op, int err, eversion_t v, version_t uv);
+  void reply_op_error(OpRequestRef op, int err, eversion_t v, version_t uv,
+                     vector<pg_log_op_return_item_t> op_returns);
   void handle_misdirected_op(PG *pg, OpRequestRef op);
 
 
index e6a0ffca6be3ac68e11381237b32e93cfc9d3443..949e8507d6660bdad6c72906a5c7f9bb40c5a097 100644 (file)
@@ -819,12 +819,15 @@ bool PG::check_in_progress_op(
   const osd_reqid_t &r,
   eversion_t *version,
   version_t *user_version,
-  int *return_code) const
+  int *return_code,
+  vector<pg_log_op_return_item_t> *op_returns
+  ) const
 {
   return (
-    projected_log.get_request(r, version, user_version, return_code) ||
+    projected_log.get_request(r, version, user_version, return_code,
+                             op_returns) ||
     recovery_state.get_pg_log().get_log().get_request(
-      r, version, user_version, return_code));
+      r, version, user_version, return_code, op_returns));
 }
 
 void PG::publish_stats_to_osd()
index 93b9d5d590fd46a59cdbe6c77b74fe352496d37f..f831925fd7686616403274affe95ee55d6a983b7 100644 (file)
@@ -974,7 +974,9 @@ protected:
   map<hobject_t, list<Context*>> callbacks_for_degraded_object;
 
   map<eversion_t,
-      list<tuple<OpRequestRef, version_t, int> > > waiting_for_ondisk;
+      list<
+       tuple<OpRequestRef, version_t, int,
+             vector<pg_log_op_return_item_t>>>> waiting_for_ondisk;
 
   void requeue_object_waiters(map<hobject_t, list<OpRequestRef>>& m);
   void requeue_op(OpRequestRef op);
@@ -1442,7 +1444,8 @@ protected:
     const osd_reqid_t &r,
     eversion_t *version,
     version_t *user_version,
-    int *return_code) const;
+    int *return_code,
+    vector<pg_log_op_return_item_t> *op_returns) const;
   eversion_t projected_last_update;
   eversion_t get_next_version() const {
     eversion_t at_version(
index 50a0ed42beda30e7c90309a4b03b4b661e027da8..c63f8c28b9d04f00e583f6ce488c2e1b8b99eaa0 100644 (file)
@@ -267,7 +267,8 @@ public:
       const osd_reqid_t &r,
       eversion_t *version,
       version_t *user_version,
-      int *return_code) const
+      int *return_code,
+      vector<pg_log_op_return_item_t> *op_returns) const
     {
       ceph_assert(version);
       ceph_assert(user_version);
@@ -281,6 +282,7 @@ public:
        *version = p->second->version;
        *user_version = p->second->user_version;
        *return_code = p->second->return_code;
+       *op_returns = p->second->op_returns;
        return true;
       }
 
@@ -299,6 +301,7 @@ public:
            *version = p->second->version;
            *user_version = i->second;
            *return_code = p->second->return_code;
+           *op_returns = p->second->op_returns;
            if (*return_code >= 0) {
              auto it = p->second->extra_reqid_return_codes.find(idx);
              if (it != p->second->extra_reqid_return_codes.end()) {
@@ -319,6 +322,7 @@ public:
        *version = q->second->version;
        *user_version = q->second->user_version;
        *return_code = q->second->return_code;
+       *op_returns = q->second->op_returns;
        return true;
       }
 
index 684deab2ae66c6b78a3f0a0d96472926cb269d08..695efd54099531badd1d181bfa5ad6c04ece77d2 100644 (file)
@@ -1990,17 +1990,19 @@ void PrimaryLogPG::do_op(OpRequestRef& op)
     eversion_t version;
     version_t user_version;
     int return_code = 0;
+    vector<pg_log_op_return_item_t> op_returns;
     bool got = check_in_progress_op(
-      m->get_reqid(), &version, &user_version, &return_code);
+      m->get_reqid(), &version, &user_version, &return_code, &op_returns);
     if (got) {
       dout(3) << __func__ << " dup " << m->get_reqid()
              << " version " << version << dendl;
       if (already_complete(version)) {
-       osd->reply_op_error(op, return_code, version, user_version);
+       osd->reply_op_error(op, return_code, version, user_version, op_returns);
       } else {
        dout(10) << " waiting for " << version << " to commit" << dendl;
         // always queue ondisk waiters, so that we can requeue if needed
-       waiting_for_ondisk[version].emplace_back(op, user_version, return_code);
+       waiting_for_ondisk[version].emplace_back(op, user_version, return_code,
+                                                op_returns);
        op->mark_delayed("waiting for ondisk");
       }
       return;
@@ -3560,7 +3562,8 @@ void PrimaryLogPG::finish_proxy_write(hobject_t oid, ceph_tid_t tid, int r)
   if (!pwop->sent_reply) {
     // send commit.
     assert(pwop->ctx->reply == nullptr);
-    MOSDOpReply *reply = new MOSDOpReply(m, r, get_osdmap_epoch(), 0, true);
+    MOSDOpReply *reply = new MOSDOpReply(m, r, get_osdmap_epoch(), 0,
+                                        true /* we claim it below */);
     reply->set_reply_versions(eversion_t(), pwop->user_version);
     reply->add_flags(CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK);
     reply->claim_op_out_data(pwop->ops);
@@ -10424,7 +10427,7 @@ void PrimaryLogPG::eval_repop(RepGather *repop)
           return_code = std::get<2>(i);
         }
         osd->reply_op_error(std::get<0>(i), return_code, repop->v,
-                            std::get<1>(i));
+                            std::get<1>(i), std::get<3>(i));
       }
       waiting_for_ondisk.erase(it);
     }
index 684b0b21c23a5e777e8df546d3d9757733c04268..99e22f82df9688cd11c7469969e6490611a5028e 100644 (file)
@@ -2204,8 +2204,9 @@ TEST_F(PGLogTest, get_request) {
     eversion_t replay_version;
     version_t user_version;
     int return_code = 0;
+    vector<pg_log_op_return_item_t> op_returns;
     bool got = log.get_request(
-      entry.reqid, &replay_version, &user_version, &return_code);
+      entry.reqid, &replay_version, &user_version, &return_code, &op_returns);
     EXPECT_TRUE(got);
     EXPECT_EQ(entry.return_code, return_code);
     EXPECT_EQ(entry.version, replay_version);
@@ -2887,6 +2888,7 @@ TEST_F(PGLogTrimTest, TestGetRequest) {
   eversion_t version;
   version_t user_version;
   int return_code;
+  vector<pg_log_op_return_item_t> op_returns;
 
   osd_reqid_t log_reqid = osd_reqid_t(client, 8, 5);
   osd_reqid_t dup_reqid = osd_reqid_t(client, 8, 3);
@@ -2894,15 +2896,18 @@ TEST_F(PGLogTrimTest, TestGetRequest) {
 
   bool result;
 
-  result = log.get_request(log_reqid, &version, &user_version, &return_code);
+  result = log.get_request(log_reqid, &version, &user_version, &return_code,
+                          &op_returns);
   EXPECT_EQ(true, result);
   EXPECT_EQ(mk_evt(21, 165), version);
 
-  result = log.get_request(dup_reqid, &version, &user_version, &return_code);
+  result = log.get_request(dup_reqid, &version, &user_version, &return_code,
+                          &op_returns);
   EXPECT_EQ(true, result);
   EXPECT_EQ(mk_evt(15, 155), version);
 
-  result = log.get_request(bad_reqid, &version, &user_version, &return_code);
+  result = log.get_request(bad_reqid, &version, &user_version, &return_code,
+                          &op_returns);
   EXPECT_FALSE(result);
 }