]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
messages/: include shard information in various pg messages
authorSamuel Just <sam.just@inktank.com>
Thu, 16 Jan 2014 23:27:36 +0000 (15:27 -0800)
committerSamuel Just <sam.just@inktank.com>
Tue, 18 Feb 2014 04:11:06 +0000 (20:11 -0800)
We can no longer use the messenger source information to determine
the origin of the message since an osd might have more than one
shard of a particular pg.  Thus, we need to include a pg_shard_t
from field to indicate origin.  Similarly, pg_t is no longer
sufficient to specify the destination pg, we instead use spg_t.
In the event that we get a message from an old peer, we default
from to pg_shard_t(get_source().num(), ghobject_t::no_shard())
and spg_t to spg_t(pgid, ghobject_t::no_shard()).  This suffices
because non-NO_SHARD shards can only appear once ec pools have
been enabled -- and doing that bans unenlightened osds.

Signed-off-by: Samuel Just <sam.just@inktank.com>
18 files changed:
src/common/hobject.h
src/include/encoding.h
src/messages/MBackfillReserve.h
src/messages/MOSDPGBackfill.h
src/messages/MOSDPGInfo.h
src/messages/MOSDPGLog.h
src/messages/MOSDPGNotify.h
src/messages/MOSDPGPull.h
src/messages/MOSDPGPush.h
src/messages/MOSDPGPushReply.h
src/messages/MOSDPGQuery.h
src/messages/MOSDPGRemove.h
src/messages/MOSDPGScan.h
src/messages/MOSDPGTrim.h
src/messages/MOSDRepScrub.h
src/messages/MOSDSubOp.h
src/messages/MOSDSubOpReply.h
src/messages/MRecoveryReserve.h

index 8d974115847698d80041c127d10010da6ce7e54c..f31604b43bc3483e146642d57e0f9c1630128851 100644 (file)
@@ -247,6 +247,7 @@ struct ghobject_t {
 
 public:
   static const shard_t NO_SHARD = UINT8_MAX;
+  static shard_t no_shard() { return NO_SHARD; }
   static const gen_t NO_GEN = UINT64_MAX;
 
   ghobject_t() : generation(NO_GEN), shard_id(NO_SHARD) {}
index ddb94ecb45d1d9e5ad13bdb402d9257e3b5f65f5..d0974823c1fbc6c33ff902705ab3e61dd01aab97 100644 (file)
@@ -320,6 +320,12 @@ inline void decode(boost::optional<T> &p, bufferlist::iterator &bp)
 
 // pair
 template<class A, class B>
+inline void encode(const std::pair<A,B> &p, bufferlist &bl, uint64_t features)
+{
+  encode(p.first, bl, features);
+  encode(p.second, bl, features);
+}
+template<class A, class B>
 inline void encode(const std::pair<A,B> &p, bufferlist &bl)
 {
   encode(p.first, bl);
index ce35ce76efe948d8585fdfb5ac5cc0705dc25a7e..d30e285cc5dcb34bfc9c27397dadb20373b380a3 100644 (file)
 #include "msg/Message.h"
 
 class MBackfillReserve : public Message {
-  static const int HEAD_VERSION = 2;
+  static const int HEAD_VERSION = 3;
   static const int COMPAT_VERSION = 1;
 public:
-  pg_t pgid;
+  spg_t pgid;
   epoch_t query_epoch;
   enum {
     REQUEST = 0,
@@ -35,7 +35,7 @@ public:
     : Message(MSG_OSD_BACKFILL_RESERVE, HEAD_VERSION, COMPAT_VERSION),
       query_epoch(0), type(-1), priority(-1) {}
   MBackfillReserve(int type,
-                  pg_t pgid,
+                  spg_t pgid,
                   epoch_t query_epoch, unsigned prio = -1)
     : Message(MSG_OSD_BACKFILL_RESERVE, HEAD_VERSION, COMPAT_VERSION),
       pgid(pgid), query_epoch(query_epoch),
@@ -65,20 +65,26 @@ public:
 
   void decode_payload() {
     bufferlist::iterator p = payload.begin();
-    ::decode(pgid, p);
+    ::decode(pgid.pgid, p);
     ::decode(query_epoch, p);
     ::decode(type, p);
     if (header.version > 1)
       ::decode(priority, p);
     else
       priority = 0;
+    if (header.version >= 3)
+      ::decode(pgid.shard, p);
+    else
+      pgid.shard = ghobject_t::no_shard();
+
   }
 
   void encode_payload(uint64_t features) {
-    ::encode(pgid, payload);
+    ::encode(pgid.pgid, payload);
     ::encode(query_epoch, payload);
     ::encode(type, payload);
     ::encode(priority, payload);
+    ::encode(pgid.shard, payload);
   }
 };
 
index 5700f9675268f00ba1a396b4276dc7361e076a63..e9ec661cbb5d873668a2b25587982231821decae 100644 (file)
@@ -19,7 +19,7 @@
 #include "osd/osd_types.h"
 
 class MOSDPGBackfill : public Message {
-  static const int HEAD_VERSION = 2;
+  static const int HEAD_VERSION = 3;
   static const int COMPAT_VERSION = 1;
 public:
   enum {
@@ -38,7 +38,7 @@ public:
 
   __u32 op;
   epoch_t map_epoch, query_epoch;
-  pg_t pgid;
+  spg_t pgid;
   hobject_t last_backfill;
   bool compat_stat_sum;
   pg_stat_t stats;
@@ -48,7 +48,7 @@ public:
     ::decode(op, p);
     ::decode(map_epoch, p);
     ::decode(query_epoch, p);
-    ::decode(pgid, p);
+    ::decode(pgid.pgid, p);
     ::decode(last_backfill, p);
 
     // For compatibility with version 1
@@ -64,25 +64,31 @@ public:
     if (!last_backfill.is_max() &&
        last_backfill.pool == -1)
       last_backfill.pool = pgid.pool();
+    if (header.version >= 3)
+      ::decode(pgid.shard, p);
+    else
+      pgid.shard = ghobject_t::no_shard();
   }
 
   virtual void encode_payload(uint64_t features) {
     ::encode(op, payload);
     ::encode(map_epoch, payload);
     ::encode(query_epoch, payload);
-    ::encode(pgid, payload);
+    ::encode(pgid.pgid, payload);
     ::encode(last_backfill, payload);
 
     // For compatibility with version 1
     ::encode(stats.stats, payload);
 
     ::encode(stats, payload);
+
+    ::encode(pgid.shard, payload);
   }
 
   MOSDPGBackfill() :
     Message(MSG_OSD_PG_BACKFILL, HEAD_VERSION, COMPAT_VERSION),
     compat_stat_sum(false) {}
-  MOSDPGBackfill(__u32 o, epoch_t e, epoch_t qe, pg_t p)
+  MOSDPGBackfill(__u32 o, epoch_t e, epoch_t qe, spg_t p)
     : Message(MSG_OSD_PG_BACKFILL, HEAD_VERSION, COMPAT_VERSION),
       op(o),
       map_epoch(e), query_epoch(e),
index 448b43bff32c24afd1cf5b32d9849d9548509525..83e74fb8c36bd6eb93fa8e6cb558ff367d0d097e 100644 (file)
@@ -20,7 +20,7 @@
 #include "osd/osd_types.h"
 
 class MOSDPGInfo : public Message {
-  static const int HEAD_VERSION = 3;
+  static const int HEAD_VERSION = 4;
   static const int COMPAT_VERSION = 1;
 
   epoch_t epoch;
@@ -79,6 +79,14 @@ public:
         p++)
       ::encode(pair<epoch_t, epoch_t>(
                 p->first.epoch_sent, p->first.query_epoch), payload);
+
+    // v4 needs from, to
+    for (vector<pair<pg_notify_t, pg_interval_map_t> >::iterator p = pg_list.begin();
+        p != pg_list.end();
+        ++p) {
+      ::encode(p->first.from, payload);
+      ::encode(p->first.to, payload);
+    }
   }
   void decode_payload() {
     bufferlist::iterator p = payload.begin();
@@ -113,6 +121,16 @@ public:
        i->first.query_epoch = epoch;
       }
     }
+
+    // v4 needs from and to
+    if (header.version >= 4) {
+      for (vector<pair<pg_notify_t, pg_interval_map_t> >::iterator i = pg_list.begin();
+          i != pg_list.end();
+          i++) {
+       ::decode(i->first.from, p);
+       ::decode(i->first.to, p);
+      }
+    }
   }
 };
 
index 906a8595100cd2243f9495adf95c441abeff6964..44cd98962b87001cbb9a739eb79d174511a63f39 100644 (file)
@@ -20,7 +20,7 @@
 
 class MOSDPGLog : public Message {
 
-  static const int HEAD_VERSION = 3;
+  static const int HEAD_VERSION = 4;
   static const int COMPAT_VERSION = 2;
 
   epoch_t epoch;
@@ -31,22 +31,29 @@ class MOSDPGLog : public Message {
   epoch_t query_epoch;
 
 public:
+  shard_id_t to;
+  shard_id_t from;
   pg_info_t info;
   pg_log_t log;
   pg_missing_t missing;
   pg_interval_map_t past_intervals;
 
   epoch_t get_epoch() { return epoch; }
-  pg_t get_pgid() { return info.pgid; }
+  spg_t get_pgid() { return spg_t(info.pgid.pgid, to); }
   epoch_t get_query_epoch() { return query_epoch; }
 
   MOSDPGLog() : Message(MSG_OSD_PG_LOG, HEAD_VERSION, COMPAT_VERSION) { }
-  MOSDPGLog(version_t mv, pg_info_t& i)
+  MOSDPGLog(shard_id_t to, shard_id_t from, version_t mv, pg_info_t& i)
     : Message(MSG_OSD_PG_LOG, HEAD_VERSION, COMPAT_VERSION),
-      epoch(mv), query_epoch(mv), info(i)  { }
-  MOSDPGLog(version_t mv, pg_info_t& i, epoch_t query_epoch)
+      epoch(mv), query_epoch(mv),
+      to(to), from(from),
+      info(i)  { }
+  MOSDPGLog(shard_id_t to, shard_id_t from,
+           version_t mv, pg_info_t& i, epoch_t query_epoch)
     : Message(MSG_OSD_PG_LOG, HEAD_VERSION, COMPAT_VERSION),
-      epoch(mv), query_epoch(query_epoch), info(i)  { }
+      epoch(mv), query_epoch(query_epoch),
+      to(to), from(from),
+      info(i)  { }
 
 private:
   ~MOSDPGLog() {}
@@ -66,6 +73,8 @@ public:
     ::encode(missing, payload);
     ::encode(query_epoch, payload);
     ::encode(past_intervals, payload);
+    ::encode(to, payload);
+    ::encode(from, payload);
   }
   void decode_payload() {
     bufferlist::iterator p = payload.begin();
@@ -79,6 +88,13 @@ public:
     if (header.version >= 3) {
       ::decode(past_intervals, p);
     }
+    if (header.version >= 4) {
+      ::decode(to, p);
+      ::decode(from, p);
+    } else {
+      to = ghobject_t::NO_SHARD;
+      from = ghobject_t::NO_SHARD;
+    }
   }
 };
 
index 3d2b269f8354a60ed60185d34bfb0d17bea98d21..6b9bdb33a825687747a6a157ef54a01f897b9afc 100644 (file)
@@ -25,7 +25,7 @@
 
 class MOSDPGNotify : public Message {
 
-  static const int HEAD_VERSION = 4;
+  static const int HEAD_VERSION = 5;
   static const int COMPAT_VERSION = 2;
 
   epoch_t epoch;
@@ -83,6 +83,14 @@ public:
       ::encode(pair<epoch_t, epoch_t>(
          p->first.epoch_sent, p->first.query_epoch),
        payload);
+
+    // v5 needs from, to
+    for (vector<pair<pg_notify_t, pg_interval_map_t> >::iterator p = pg_list.begin();
+        p != pg_list.end();
+        ++p) {
+      ::encode(p->first.from, payload);
+      ::encode(p->first.to, payload);
+    }
   }
   void decode_payload() {
     epoch_t query_epoch;
@@ -120,6 +128,16 @@ public:
        i->first.query_epoch = query_epoch;
       }
     }
+
+    // v5 needs from and to
+    if (header.version >= 5) {
+      for (vector<pair<pg_notify_t, pg_interval_map_t> >::iterator i = pg_list.begin();
+          i != pg_list.end();
+          i++) {
+       ::decode(i->first.from, p);
+       ::decode(i->first.to, p);
+      }
+    }
   }
   void print(ostream& out) const {
     out << "pg_notify(";
index 870db7f1a19c387e7d88846f3c3949ecf9486614..91072320fe677994127510fbbe9703f5747e0699 100644 (file)
 #include "osd/osd_types.h"
 
 class MOSDPGPull : public Message {
-  static const int HEAD_VERSION = 1;
+  static const int HEAD_VERSION = 2;
   static const int COMPAT_VERSION = 1;
 
 
 public:
-  pg_t pgid;
+  pg_shard_t from;
+  spg_t pgid;
   epoch_t map_epoch;
   vector<PullOp> pulls;
   uint64_t cost;
@@ -49,17 +50,26 @@ public:
 
   virtual void decode_payload() {
     bufferlist::iterator p = payload.begin();
-    ::decode(pgid, p);
+    ::decode(pgid.pgid, p);
     ::decode(map_epoch, p);
     ::decode(pulls, p);
     ::decode(cost, p);
+    if (header.version >= 2) {
+      ::decode(pgid.shard, p);
+      ::decode(from, p);
+    } else {
+      pgid.shard = ghobject_t::NO_SHARD;
+      from = pg_shard_t(get_source().num(), ghobject_t::NO_SHARD);
+    }
   }
 
   virtual void encode_payload(uint64_t features) {
-    ::encode(pgid, payload);
+    ::encode(pgid.pgid, payload);
     ::encode(map_epoch, payload);
     ::encode(pulls, payload);
     ::encode(cost, payload);
+    ::encode(pgid.shard, payload);
+    ::encode(from, payload);
   }
 
   const char *get_type_name() const { return "MOSDPGPull"; }
index acc0d2aee8e07d3dab0615f970666091efc962de..46a8f1becd44afc3a1fc6154b55bce6aa0d0326e 100644 (file)
 #include "osd/osd_types.h"
 
 class MOSDPGPush : public Message {
-  static const int HEAD_VERSION = 1;
+  static const int HEAD_VERSION = 2;
   static const int COMPAT_VERSION = 1;
 
 
 public:
-  pg_t pgid;
+  pg_shard_t from;
+  spg_t pgid;
   epoch_t map_epoch;
   vector<PushOp> pushes;
   uint64_t cost;
@@ -49,17 +50,26 @@ public:
 
   virtual void decode_payload() {
     bufferlist::iterator p = payload.begin();
-    ::decode(pgid, p);
+    ::decode(pgid.pgid, p);
     ::decode(map_epoch, p);
     ::decode(pushes, p);
     ::decode(cost, p);
+    if (header.version >= 2) {
+      ::decode(pgid.shard, p);
+      ::decode(from, p);
+    } else {
+      pgid.shard = ghobject_t::NO_SHARD;
+      from = pg_shard_t(get_source().num(), ghobject_t::NO_SHARD);
+    }
   }
 
   virtual void encode_payload(uint64_t features) {
-    ::encode(pgid, payload);
+    ::encode(pgid.pgid, payload);
     ::encode(map_epoch, payload);
     ::encode(pushes, payload);
     ::encode(cost, payload);
+    ::encode(pgid.shard, payload);
+    ::encode(from, payload);
   }
 
   const char *get_type_name() const { return "MOSDPGPush"; }
index 192dc2c1f81c340f819203fa376e89167ed3b57d..1875235dbb7d480249fbfc46996b8d240cf5d00e 100644 (file)
 #include "osd/osd_types.h"
 
 class MOSDPGPushReply : public Message {
-  static const int HEAD_VERSION = 1;
+  static const int HEAD_VERSION = 2;
   static const int COMPAT_VERSION = 1;
 
 public:
-  pg_t pgid;
+  pg_shard_t from;
+  spg_t pgid;
   epoch_t map_epoch;
   vector<PushReplyOp> replies;
   uint64_t cost;
@@ -48,17 +49,27 @@ public:
 
   virtual void decode_payload() {
     bufferlist::iterator p = payload.begin();
-    ::decode(pgid, p);
+    ::decode(pgid.pgid, p);
     ::decode(map_epoch, p);
     ::decode(replies, p);
     ::decode(cost, p);
+
+    if (header.version >= 2) {
+      ::decode(pgid.shard, p);
+      ::decode(from, p);
+    } else {
+      pgid.shard = ghobject_t::NO_SHARD;
+      from = pg_shard_t(get_source().num(), ghobject_t::NO_SHARD);
+    }
   }
 
   virtual void encode_payload(uint64_t features) {
-    ::encode(pgid, payload);
+    ::encode(pgid.pgid, payload);
     ::encode(map_epoch, payload);
     ::encode(replies, payload);
     ::encode(cost, payload);
+    ::encode(pgid.shard, payload);
+    ::encode(from, payload);
   }
 
   void print(ostream& out) const {
index b637412a38768d8c237e5f45e6e964e96618873c..c2c6f695d39ab4b6fdf710ecf5dfabe8f7e2e2ce 100644 (file)
@@ -16,6 +16,7 @@
 #ifndef CEPH_MOSDPGQUERY_H
 #define CEPH_MOSDPGQUERY_H
 
+#include "common/hobject.h"
 #include "msg/Message.h"
 
 /*
  */
 
 class MOSDPGQuery : public Message {
-  static const int HEAD_VERSION = 2;
+  static const int HEAD_VERSION = 3;
   static const int COMPAT_VERSION = 1;
   version_t       epoch;
 
  public:
   version_t get_epoch() { return epoch; }
-  map<pg_t,pg_query_t>  pg_list;
+  map<spg_t, pg_query_t>  pg_list;
 
   MOSDPGQuery() : Message(MSG_OSD_PG_QUERY,
                          HEAD_VERSION,
                          COMPAT_VERSION) {}
-  MOSDPGQuery(epoch_t e, map<pg_t,pg_query_t>& ls) :
+  MOSDPGQuery(epoch_t e, map<spg_t,pg_query_t>& ls) :
     Message(MSG_OSD_PG_QUERY,
            HEAD_VERSION,
            COMPAT_VERSION),
@@ -48,7 +49,8 @@ public:
   const char *get_type_name() const { return "pg_query"; }
   void print(ostream& out) const {
     out << "pg_query(";
-    for (map<pg_t,pg_query_t>::const_iterator p = pg_list.begin(); p != pg_list.end(); ++p) {
+    for (map<spg_t,pg_query_t>::const_iterator p = pg_list.begin();
+        p != pg_list.end(); ++p) {
       if (p != pg_list.begin())
        out << ",";
       out << p->first;
@@ -58,15 +60,38 @@ public:
 
   void encode_payload(uint64_t features) {
     ::encode(epoch, payload);
-    ::encode(pg_list, payload, features);
+    vector<pair<pg_t, pg_query_t> > _pg_list;
+    _pg_list.reserve(pg_list.size());
+    vector<shard_id_t> _shard_list;
+    _shard_list.reserve(pg_list.size());
+    for (map<spg_t, pg_query_t>::iterator i = pg_list.begin();
+        i != pg_list.end();
+        ++i) {
+      _pg_list.push_back(make_pair(i->first.pgid, i->second));
+      _shard_list.push_back(i->first.shard);
+    }
+    ::encode(_pg_list, payload, features);
+    ::encode(_shard_list, payload);
   }
   void decode_payload() {
     bufferlist::iterator p = payload.begin();
     ::decode(epoch, p);
-    ::decode(pg_list, p);
+    vector<pair<pg_t, pg_query_t> > _pg_list;
+    ::decode(_pg_list, p);
+    vector<shard_id_t> _shard_list(_pg_list.size(), ghobject_t::no_shard());
+    if (header.version >= 3) {
+      _shard_list.clear();
+      ::decode(_shard_list, p);
+    }
+    assert(_pg_list.size() == _shard_list.size());
+    for (unsigned i = 0; i < _pg_list.size(); ++i) {
+      pg_list.insert(
+       make_pair(
+         spg_t(_pg_list[i].first, _shard_list[i]), _pg_list[i].second));
+    }
 
     if (header.version < 2) {
-      for (map<pg_t, pg_query_t>::iterator i = pg_list.begin();
+      for (map<spg_t, pg_query_t>::iterator i = pg_list.begin();
           i != pg_list.end();
           ++i) {
        i->second.epoch_sent = epoch;
index c6ec797466e4e870e83d5c39ebbf9d6a8f1fd604..b55b5d28182decff44141487a763447eb595fa13 100644 (file)
 #ifndef CEPH_MOSDPGREMOVE_H
 #define CEPH_MOSDPGREMOVE_H
 
+#include "common/hobject.h"
 #include "msg/Message.h"
 
 
 class MOSDPGRemove : public Message {
+
+  static const int HEAD_VERSION = 2;
+  static const int COMPAT_VERSION = 1;
+
   epoch_t epoch;
 
  public:
-  vector<pg_t> pg_list;
+  vector<spg_t> pg_list;
 
   epoch_t get_epoch() { return epoch; }
 
-  MOSDPGRemove() : Message(MSG_OSD_PG_REMOVE) {}
-  MOSDPGRemove(epoch_t e, vector<pg_t>& l) :
-    Message(MSG_OSD_PG_REMOVE) {
+  MOSDPGRemove() :
+    Message(MSG_OSD_PG_REMOVE, HEAD_VERSION, COMPAT_VERSION) {}
+  MOSDPGRemove(epoch_t e, vector<spg_t>& l) :
+    Message(MSG_OSD_PG_REMOVE, HEAD_VERSION, COMPAT_VERSION) {
     this->epoch = e;
     pg_list.swap(l);
   }
@@ -41,16 +47,38 @@ public:
 
   void encode_payload(uint64_t features) {
     ::encode(epoch, payload);
-    ::encode(pg_list, payload);
+
+    vector<pg_t> _pg_list;
+    _pg_list.reserve(pg_list.size());
+    vector<shard_id_t> _shard_list;
+    _shard_list.reserve(pg_list.size());
+    for (vector<spg_t>::iterator i = pg_list.begin(); i != pg_list.end(); ++i) {
+      _pg_list.push_back(i->pgid);
+      _shard_list.push_back(i->shard);
+    }
+    ::encode(_pg_list, payload);
+    ::encode(_shard_list, payload);
   }
   void decode_payload() {
     bufferlist::iterator p = payload.begin();
     ::decode(epoch, p);
-    ::decode(pg_list, p);
+    vector<pg_t> _pg_list;
+    ::decode(_pg_list, p);
+
+    vector<shard_id_t> _shard_list(_pg_list.size(), ghobject_t::no_shard());
+    if (header.version >= 2) {
+      _shard_list.clear();
+      ::decode(_shard_list, p);
+    }
+    assert(_shard_list.size() == _pg_list.size());
+    pg_list.reserve(_shard_list.size());
+    for (unsigned i = 0; i < _shard_list.size(); ++i) {
+      pg_list.push_back(spg_t(_pg_list[i], _shard_list[i]));
+    }
   }
   void print(ostream& out) const {
     out << "osd pg remove(" << "epoch " << epoch << "; ";
-    for (vector<pg_t>::const_iterator i = pg_list.begin();
+    for (vector<spg_t>::const_iterator i = pg_list.begin();
          i != pg_list.end();
          ++i) {
       out << "pg" << *i << "; ";
index 4c86a3c7e7aa7b1973b1bc545b8674cccf55516c..2c0c1ad7abc8a4b30f2152d0f78357edec401218 100644 (file)
 #include "osd/osd_types.h"
 
 class MOSDPGScan : public Message {
+
+  static const int HEAD_VERSION = 2;
+  static const int COMPAT_VERSION = 1;
+
 public:
   enum {
     OP_SCAN_GET_DIGEST = 1,      // just objects and versions
@@ -34,7 +38,8 @@ public:
 
   __u32 op;
   epoch_t map_epoch, query_epoch;
-  pg_t pgid;
+  pg_shard_t from;
+  spg_t pgid;
   hobject_t begin, end;
 
   virtual void decode_payload() {
@@ -42,7 +47,7 @@ public:
     ::decode(op, p);
     ::decode(map_epoch, p);
     ::decode(query_epoch, p);
-    ::decode(pgid, p);
+    ::decode(pgid.pgid, p);
     ::decode(begin, p);
     ::decode(end, p);
 
@@ -51,22 +56,36 @@ public:
       begin.pool = pgid.pool();
     if (!end.is_max() && end.pool == -1)
       end.pool = pgid.pool();
+
+    if (header.version >= 2) {
+      ::decode(from, p);
+      ::decode(pgid.shard, p);
+    } else {
+      from = pg_shard_t(
+       get_source().num(),
+       ghobject_t::NO_SHARD);
+      pgid.shard = ghobject_t::NO_SHARD;
+    }
   }
 
   virtual void encode_payload(uint64_t features) {
     ::encode(op, payload);
     ::encode(map_epoch, payload);
     ::encode(query_epoch, payload);
-    ::encode(pgid, payload);
+    ::encode(pgid.pgid, payload);
     ::encode(begin, payload);
     ::encode(end, payload);
+    ::encode(from, payload);
+    ::encode(pgid.shard, payload);
   }
 
-  MOSDPGScan() : Message(MSG_OSD_PG_SCAN) {}
-  MOSDPGScan(__u32 o, epoch_t e, epoch_t qe, pg_t p, hobject_t be, hobject_t en)
-    : Message(MSG_OSD_PG_SCAN),
+  MOSDPGScan() : Message(MSG_OSD_PG_SCAN, HEAD_VERSION, COMPAT_VERSION) {}
+  MOSDPGScan(__u32 o, pg_shard_t from,
+            epoch_t e, epoch_t qe, spg_t p, hobject_t be, hobject_t en)
+    : Message(MSG_OSD_PG_SCAN, HEAD_VERSION, COMPAT_VERSION),
       op(o),
       map_epoch(e), query_epoch(e),
+      from(from),
       pgid(p),
       begin(be), end(en) {
   }
index ad52a7f5fe78a0185bfcc812f67d54b81f0d3c91..12a0e7c824037bf448d98a4f33ec72b753054fe3 100644 (file)
 #include "msg/Message.h"
 
 class MOSDPGTrim : public Message {
+
+  static const int HEAD_VERSION = 2;
+  static const int COMPAT_VERSION = 1;
+
 public:
   epoch_t epoch;
-  pg_t pgid;
+  spg_t pgid;
   eversion_t trim_to;
 
   epoch_t get_epoch() { return epoch; }
 
-  MOSDPGTrim() : Message(MSG_OSD_PG_TRIM) {}
-  MOSDPGTrim(version_t mv, pg_t p, eversion_t tt) :
+  MOSDPGTrim() : Message(MSG_OSD_PG_TRIM, HEAD_VERSION, COMPAT_VERSION) {}
+  MOSDPGTrim(version_t mv, spg_t p, eversion_t tt) :
     Message(MSG_OSD_PG_TRIM),
     epoch(mv), pgid(p), trim_to(tt) { }
 private:
@@ -40,14 +44,19 @@ public:
 
   void encode_payload(uint64_t features) {
     ::encode(epoch, payload);
-    ::encode(pgid, payload);
+    ::encode(pgid.pgid, payload);
     ::encode(trim_to, payload);
+    ::encode(pgid.shard, payload);
   }
   void decode_payload() {
     bufferlist::iterator p = payload.begin();
     ::decode(epoch, p);
-    ::decode(pgid, p);
+    ::decode(pgid.pgid, p);
     ::decode(trim_to, p);
+    if (header.version >= 2)
+      ::decode(pgid.shard, p);
+    else
+      pgid.shard = ghobject_t::no_shard();
   }
 };
 
index 4fae008c17e4dd3c8a4aa7d4167e86c9bf6d5344..3f67021ef14c128d87566b5cce184757246bfbcd 100644 (file)
 
 struct MOSDRepScrub : public Message {
 
-  static const int HEAD_VERSION = 4;
+  static const int HEAD_VERSION = 5;
   static const int COMPAT_VERSION = 2;
 
-  pg_t pgid;             // PG to scrub
+  spg_t pgid;             // PG to scrub
   eversion_t scrub_from; // only scrub log entries after scrub_from
   eversion_t scrub_to;   // last_update_applied when message sent
   epoch_t map_epoch;
@@ -40,7 +40,7 @@ struct MOSDRepScrub : public Message {
       chunky(false),
       deep(false) { }
 
-  MOSDRepScrub(pg_t pgid, eversion_t scrub_from, eversion_t scrub_to,
+  MOSDRepScrub(spg_t pgid, eversion_t scrub_from, eversion_t scrub_to,
               epoch_t map_epoch)
     : Message(MSG_OSD_REP_SCRUB, HEAD_VERSION, COMPAT_VERSION),
       pgid(pgid),
@@ -50,7 +50,7 @@ struct MOSDRepScrub : public Message {
       chunky(false),
       deep(false) { }
 
-  MOSDRepScrub(pg_t pgid, eversion_t scrub_to, epoch_t map_epoch,
+  MOSDRepScrub(spg_t pgid, eversion_t scrub_to, epoch_t map_epoch,
                hobject_t start, hobject_t end, bool deep)
     : Message(MSG_OSD_REP_SCRUB, HEAD_VERSION, COMPAT_VERSION),
       pgid(pgid),
@@ -78,7 +78,7 @@ public:
   }
 
   void encode_payload(uint64_t features) {
-    ::encode(pgid, payload);
+    ::encode(pgid.pgid, payload);
     ::encode(scrub_from, payload);
     ::encode(scrub_to, payload);
     ::encode(map_epoch, payload);
@@ -86,10 +86,11 @@ public:
     ::encode(start, payload);
     ::encode(end, payload);
     ::encode(deep, payload);
+    ::encode(pgid.shard, payload);
   }
   void decode_payload() {
     bufferlist::iterator p = payload.begin();
-    ::decode(pgid, p);
+    ::decode(pgid.pgid, p);
     ::decode(scrub_from, p);
     ::decode(scrub_to, p);
     ::decode(map_epoch, p);
@@ -107,6 +108,12 @@ public:
       chunky = false;
       deep = false;
     }
+
+    if (header.version >= 5) {
+      ::decode(pgid.shard, p);
+    } else {
+      pgid.shard = ghobject_t::no_shard();
+    }
   }
 };
 
index 7e9f087fec809e7141149d4da095a596018f5cb4..74022476f3e3104f9df5089f9871acebfdf6a095 100644 (file)
@@ -25,7 +25,7 @@
 
 class MOSDSubOp : public Message {
 
-  static const int HEAD_VERSION = 8;
+  static const int HEAD_VERSION = 9;
   static const int COMPAT_VERSION = 1;
 
 public:
@@ -35,7 +35,8 @@ public:
   osd_reqid_t reqid;
   
   // subop
-  pg_t pgid;
+  pg_shard_t from;
+  spg_t pgid;
   hobject_t poid;
   object_locator_t oloc;
   
@@ -100,7 +101,7 @@ public:
     bufferlist::iterator p = payload.begin();
     ::decode(map_epoch, p);
     ::decode(reqid, p);
-    ::decode(pgid, p);
+    ::decode(pgid.pgid, p);
     ::decode(poid, p);
 
     __u32 num_ops;
@@ -158,12 +159,22 @@ public:
       ::decode(new_temp_oid, p);
       ::decode(discard_temp_oid, p);
     }
+
+    if (header.version >= 9) {
+      ::decode(from, p);
+      ::decode(pgid.shard, p);
+    } else {
+      from = pg_shard_t(
+       get_source().num(),
+       ghobject_t::NO_SHARD);
+      pgid.shard = ghobject_t::NO_SHARD;
+    }
   }
 
   virtual void encode_payload(uint64_t features) {
     ::encode(map_epoch, payload);
     ::encode(reqid, payload);
-    ::encode(pgid, payload);
+    ::encode(pgid.pgid, payload);
     ::encode(poid, payload);
 
     __u32 num_ops = ops.size();
@@ -204,15 +215,19 @@ public:
     ::encode(omap_header, payload);
     ::encode(new_temp_oid, payload);
     ::encode(discard_temp_oid, payload);
+    ::encode(from, payload);
+    ::encode(pgid.shard, payload);
   }
 
   MOSDSubOp()
     : Message(MSG_OSD_SUBOP, HEAD_VERSION, COMPAT_VERSION) { }
-  MOSDSubOp(osd_reqid_t r, pg_t p, const hobject_t& po, bool noop_, int aw,
+  MOSDSubOp(osd_reqid_t r, pg_shard_t from,
+           spg_t p, const hobject_t& po, bool noop_, int aw,
            epoch_t mape, tid_t rtid, eversion_t v)
     : Message(MSG_OSD_SUBOP, HEAD_VERSION, COMPAT_VERSION),
       map_epoch(mape),
       reqid(r),
+      from(from),
       pgid(p),
       poid(po),
       acks_wanted(aw),
index 6b0738a212f0ef64d481b9b2304f0e21588c771d..270629f91f3d1daf0af5ff6c509fd0659d1fc0f4 100644 (file)
  */
 
 class MOSDSubOpReply : public Message {
+  static const int HEAD_VERSION = 2;
+  static const int COMPAT_VERSION = 1;
 public:
   epoch_t map_epoch;
   
   // subop metadata
   osd_reqid_t reqid;
-  pg_t pgid;
+  pg_shard_t from;
+  spg_t pgid;
   hobject_t poid;
 
   vector<OSDOp> ops;
@@ -54,7 +57,7 @@ public:
     bufferlist::iterator p = payload.begin();
     ::decode(map_epoch, p);
     ::decode(reqid, p);
-    ::decode(pgid, p);
+    ::decode(pgid.pgid, p);
     ::decode(poid, p);
 
     unsigned num_ops;
@@ -71,11 +74,21 @@ public:
 
     if (!poid.is_max() && poid.pool == -1)
       poid.pool = pgid.pool();
+
+    if (header.version >= 2) {
+      ::decode(from, p);
+      ::decode(pgid.shard, p);
+    } else {
+      from = pg_shard_t(
+       get_source().num(),
+       ghobject_t::NO_SHARD);
+      pgid.shard = ghobject_t::NO_SHARD;
+    }
   }
   virtual void encode_payload(uint64_t features) {
     ::encode(map_epoch, payload);
     ::encode(reqid, payload);
-    ::encode(pgid, payload);
+    ::encode(pgid.pgid, payload);
     ::encode(poid, payload);
     __u32 num_ops = ops.size();
     ::encode(num_ops, payload);
@@ -87,11 +100,13 @@ public:
     ::encode(last_complete_ondisk, payload);
     ::encode(peer_stat, payload);
     ::encode(attrset, payload);
+    ::encode(from, payload);
+    ::encode(pgid.shard, payload);
   }
 
   epoch_t get_map_epoch() { return map_epoch; }
 
-  pg_t get_pg() { return pgid; }
+  spg_t get_pg() { return pgid; }
   hobject_t get_poid() { return poid; }
 
   int get_ack_type() { return ack_type; }
@@ -110,11 +125,13 @@ public:
   map<string,bufferptr>& get_attrset() { return attrset; } 
 
 public:
-  MOSDSubOpReply(MOSDSubOp *req, int result_, epoch_t e, int at) :
-    Message(MSG_OSD_SUBOPREPLY),
+  MOSDSubOpReply(
+    MOSDSubOp *req, pg_shard_t from, int result_, epoch_t e, int at) :
+    Message(MSG_OSD_SUBOPREPLY, HEAD_VERSION, COMPAT_VERSION),
     map_epoch(e),
     reqid(req->reqid),
-    pgid(req->pgid),
+    from(from),
+    pgid(req->pgid.pgid, req->from.shard),
     poid(req->poid),
     ops(req->ops),
     ack_type(at),
index e87d8afc87114bebbf2b2a96931f6bd944e493e3..2a88bfdeb9f3bdd6ba4c317c274ae7b64aba67fe 100644 (file)
 #include "msg/Message.h"
 
 class MRecoveryReserve : public Message {
-  static const int HEAD_VERSION = 1;
+  static const int HEAD_VERSION = 2;
   static const int COMPAT_VERSION = 1;
 public:
-  pg_t pgid;
+  spg_t pgid;
   epoch_t query_epoch;
   enum {
     REQUEST = 0,
@@ -34,7 +34,7 @@ public:
     : Message(MSG_OSD_RECOVERY_RESERVE, HEAD_VERSION, COMPAT_VERSION),
       query_epoch(0), type(-1) {}
   MRecoveryReserve(int type,
-                  pg_t pgid,
+                  spg_t pgid,
                   epoch_t query_epoch)
     : Message(MSG_OSD_RECOVERY_RESERVE, HEAD_VERSION, COMPAT_VERSION),
       pgid(pgid), query_epoch(query_epoch),
@@ -63,15 +63,20 @@ public:
 
   void decode_payload() {
     bufferlist::iterator p = payload.begin();
-    ::decode(pgid, p);
+    ::decode(pgid.pgid, p);
     ::decode(query_epoch, p);
     ::decode(type, p);
+    if (header.version >= 2)
+      ::decode(pgid.shard, p);
+    else
+      pgid.shard = ghobject_t::no_shard();
   }
 
   void encode_payload(uint64_t features) {
-    ::encode(pgid, payload);
+    ::encode(pgid.pgid, payload);
     ::encode(query_epoch, payload);
     ::encode(type, payload);
+    ::encode(pgid.shard, payload);
   }
 };