]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
osd: always include osd op result, result code in the first reply
authorSage Weil <sage@newdream.net>
Tue, 29 Jun 2010 21:31:12 +0000 (14:31 -0700)
committerSage Weil <sage@newdream.net>
Tue, 29 Jun 2010 21:31:12 +0000 (14:31 -0700)
src/messages/MOSDOpReply.h
src/osd/ReplicatedPG.cc
src/osd/ReplicatedPG.h

index a3ef19f84c057dc7b8fc94b766ee09bdd3b237c9..484e42b2d6b48dae087d39151d427693648e878b 100644 (file)
@@ -51,6 +51,8 @@ class MOSDOpReply : public Message {
   void set_result(int r) { head.result = r; }
   void set_version(eversion_t v) { head.reassert_version = v; }
 
+  void add_flags(int f) { head.flags = (int)head.flags | f; }
+
   // osdmap
   epoch_t get_map_epoch() { return head.osdmap_epoch; }
 
index bef5c6053f2d385215f3376f9147f8a422bc3d8f..57241370accf6d83fb22ae5bc3a7d0ea54550d32 100644 (file)
@@ -599,19 +599,18 @@ void ReplicatedPG::do_op(MOSDOp *op)
     if (result == -EAGAIN)
       return;
 
+    // prepare the reply
+    ctx->reply = new MOSDOpReply(op, 0, osd->osdmap->get_epoch(), 0); 
+    ctx->reply->set_data(ctx->outdata);
+    ctx->reply->get_header().data_off = ctx->data_off;
+    ctx->reply->set_result(result);
+
     // read or error?
     if (ctx->op_t.empty() || result < 0) {
-      MOSDOpReply *reply = new MOSDOpReply(op, 0, osd->osdmap->get_epoch(),
-                                          CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK); 
-      reply->set_data(ctx->outdata);
-      reply->get_header().data_off = ctx->data_off;
-      reply->set_result(result);
-      //if the message came from an OSD, it needs to go back to originator,
-      //but if the connection ISN't an OSD that connection is the originator
-      if (op->get_connection()->get_peer_type() != CEPH_ENTITY_TYPE_OSD)
-       osd->messenger->send_message(reply, op->get_connection());
-      else
-       osd->messenger->send_message(reply, op->get_orig_source_inst());
+      MOSDOpReply *reply = ctx->reply;
+      ctx->reply = NULL;
+      reply->add_flags(CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK);
+      osd->messenger->send_message(reply, op->get_connection());
       op->put();
       delete ctx;
       put_object_context(obc);
@@ -2190,14 +2189,14 @@ void ReplicatedPG::eval_repop(RepGather *repop)
     // disk?
     if (repop->can_send_disk() && op->wants_ondisk()) {
       // send commit.
-      MOSDOpReply *reply = new MOSDOpReply(op, 0, osd->osdmap->get_epoch(), CEPH_OSD_FLAG_ONDISK);
-      dout(10) << " sending commit on " << *repop << " " << reply << dendl;
-      //if the message came from an OSD, it needs to go back to originator,
-      //but if the connection ISN't an OSD that connection is the originator
-      if (op->get_connection()->get_peer_type() != CEPH_ENTITY_TYPE_OSD)
-       osd->messenger->send_message(reply, op->get_connection());
+      MOSDOpReply *reply = repop->ctx->reply;
+      if (reply)
+       repop->ctx->reply = NULL;
       else
-       osd->messenger->send_message(reply, op->get_orig_source_inst());
+       reply = new MOSDOpReply(op, 0, osd->osdmap->get_epoch(), 0);
+      reply->add_flags(CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK);
+      dout(10) << " sending commit on " << *repop << " " << reply << dendl;
+      osd->messenger->send_message(reply, op->get_connection());
       repop->sent_disk = true;
     }
 
@@ -2217,14 +2216,14 @@ void ReplicatedPG::eval_repop(RepGather *repop)
     else if (repop->can_send_ack()) {
       if (op->wants_ack()) {
        // send ack
-       MOSDOpReply *reply = new MOSDOpReply(op, 0, osd->osdmap->get_epoch(), CEPH_OSD_FLAG_ACK);
-       dout(10) << " sending ack on " << *repop << " " << reply << dendl;
-       //if the message came from an OSD, it needs to go back to originator,
-       //but if the connection ISN't an OSD that connection is the originator
-       if (op->get_connection()->get_peer_type() != CEPH_ENTITY_TYPE_OSD)
-         osd->messenger->send_message(reply, op->get_connection());
+       MOSDOpReply *reply = repop->ctx->reply;
+       if (reply)
+         repop->ctx->reply = NULL;
        else
-         osd->messenger->send_message(reply, op->get_orig_source_inst());
+         reply = new MOSDOpReply(op, 0, osd->osdmap->get_epoch(), 0);
+       reply->add_flags(CEPH_OSD_FLAG_ACK);
+       dout(10) << " sending ack on " << *repop << " " << reply << dendl;
+       osd->messenger->send_message(reply, op->get_connection());
        repop->sent_ack = true;
       }
 
index fc1cb1655e96db49a4a1e364a4cf106a74905009..c5ea432fa733dd5aab70cab0c80fb0d9bc99ce10 100644 (file)
@@ -18,6 +18,7 @@
 #include "PG.h"
 
 #include "messages/MOSDOp.h"
+#include "messages/MOSDOpReply.h"
 class MOSDSubOp;
 class MOSDSubOpReply;
 
@@ -300,14 +301,18 @@ public:
 
     int data_off;        // FIXME: we may want to kill this msgr hint off at some point!
 
+    MOSDOpReply *reply;
+
     ReplicatedPG *pg;
 
     OpContext(Message *_op, osd_reqid_t _reqid, vector<OSDOp>& _ops, bufferlist& _data,
              ObjectState *_obs, ReplicatedPG *_pg) :
       op(_op), reqid(_reqid), ops(_ops), indata(_data), obs(_obs),
-      clone_obc(0), snapset_obc(0), data_off(0), pg(_pg) {}
+      clone_obc(0), snapset_obc(0), data_off(0), reply(NULL), pg(_pg) {}
     ~OpContext() {
       assert(!clone_obc);
+      if (reply)
+       reply->put();
     }
   };