]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
messages/,osd_types: add messages for Push, PushReply, Pull
authorSamuel Just <sam.just@inktank.com>
Mon, 17 Jun 2013 22:41:36 +0000 (15:41 -0700)
committerSamuel Just <sam.just@inktank.com>
Mon, 8 Jul 2013 23:43:31 +0000 (16:43 -0700)
Signed-off-by: Samuel Just <sam.just@inktank.com>
src/Makefile.am
src/common/config_opts.h
src/messages/MOSDPGPull.h [new file with mode: 0644]
src/messages/MOSDPGPush.h [new file with mode: 0644]
src/messages/MOSDPGPushReply.h [new file with mode: 0644]
src/msg/Message.cc
src/msg/Message.h
src/osd/OSD.cc
src/osd/osd_types.cc
src/osd/osd_types.h

index c1a7e8020ef3d2b1e81b4bae3105a4b49df12baf..241a83393095d5eacbfbd8024f616ee3b1abb6a4 100644 (file)
@@ -1949,6 +1949,9 @@ noinst_HEADERS = \
         messages/MOSDOpReply.h\
        messages/MOSDPGBackfill.h\
         messages/MOSDPGCreate.h\
+        messages/MOSDPGPush.h\
+        messages/MOSDPGPull.h\
+        messages/MOSDPGPushReply.h\
         messages/MOSDPGInfo.h\
         messages/MOSDPGLog.h\
         messages/MOSDPGMissing.h\
index 775d54cac4872fd7c35947cb85b81fd6c3c55b7f..15e65e3bf98a50cfab17c34728a8998c352198e3 100644 (file)
@@ -425,6 +425,7 @@ OPTION(osd_auto_mark_unfound_lost, OPT_BOOL, false)
 OPTION(osd_recovery_delay_start, OPT_FLOAT, 0)
 OPTION(osd_recovery_max_active, OPT_INT, 5)
 OPTION(osd_recovery_max_chunk, OPT_U64, 8<<20)  // max size of push chunk
+OPTION(osd_push_per_object_cost, OPT_U64, 1000)  // push cost per object
 OPTION(osd_recovery_forget_lost_objects, OPT_BOOL, false)   // off for now
 OPTION(osd_max_scrubs, OPT_INT, 1)
 OPTION(osd_scrub_load_threshold, OPT_FLOAT, 0.5)
diff --git a/src/messages/MOSDPGPull.h b/src/messages/MOSDPGPull.h
new file mode 100644 (file)
index 0000000..870db7f
--- /dev/null
@@ -0,0 +1,75 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2013 Inktank Storage, Inc.
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation.  See file COPYING.
+ *
+ */
+
+#ifndef MOSDPGPULL_H
+#define MOSDPGPULL_H
+
+#include "msg/Message.h"
+#include "osd/osd_types.h"
+
+class MOSDPGPull : public Message {
+  static const int HEAD_VERSION = 1;
+  static const int COMPAT_VERSION = 1;
+
+
+public:
+  pg_t pgid;
+  epoch_t map_epoch;
+  vector<PullOp> pulls;
+  uint64_t cost;
+
+  MOSDPGPull() :
+    Message(MSG_OSD_PG_PULL, HEAD_VERSION, COMPAT_VERSION),
+    cost(0)
+    {}
+
+  void compute_cost(CephContext *cct) {
+    cost = 0;
+    for (vector<PullOp>::iterator i = pulls.begin();
+        i != pulls.end();
+        ++i) {
+      cost += i->cost(cct);
+    }
+  }
+
+  int get_cost() const {
+    return cost;
+  }
+
+  virtual void decode_payload() {
+    bufferlist::iterator p = payload.begin();
+    ::decode(pgid, p);
+    ::decode(map_epoch, p);
+    ::decode(pulls, p);
+    ::decode(cost, p);
+  }
+
+  virtual void encode_payload(uint64_t features) {
+    ::encode(pgid, payload);
+    ::encode(map_epoch, payload);
+    ::encode(pulls, payload);
+    ::encode(cost, payload);
+  }
+
+  const char *get_type_name() const { return "MOSDPGPull"; }
+
+  void print(ostream& out) const {
+    out << "MOSDPGPull(" << pgid
+       << " " << map_epoch
+       << " " << pulls;
+    out << ")";
+  }
+};
+
+#endif
diff --git a/src/messages/MOSDPGPush.h b/src/messages/MOSDPGPush.h
new file mode 100644 (file)
index 0000000..acc0d2a
--- /dev/null
@@ -0,0 +1,75 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2013 Inktank Storage, Inc.
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation.  See file COPYING.
+ *
+ */
+
+#ifndef MOSDPGPUSH_H
+#define MOSDPGPUSH_H
+
+#include "msg/Message.h"
+#include "osd/osd_types.h"
+
+class MOSDPGPush : public Message {
+  static const int HEAD_VERSION = 1;
+  static const int COMPAT_VERSION = 1;
+
+
+public:
+  pg_t pgid;
+  epoch_t map_epoch;
+  vector<PushOp> pushes;
+  uint64_t cost;
+
+  void compute_cost(CephContext *cct) {
+    cost = 0;
+    for (vector<PushOp>::iterator i = pushes.begin();
+        i != pushes.end();
+        ++i) {
+      cost += i->cost(cct);
+    }
+  }
+
+  int get_cost() const {
+    return cost;
+  }
+
+  MOSDPGPush() :
+    Message(MSG_OSD_PG_PUSH, HEAD_VERSION, COMPAT_VERSION),
+    cost(0)
+    {}
+
+  virtual void decode_payload() {
+    bufferlist::iterator p = payload.begin();
+    ::decode(pgid, p);
+    ::decode(map_epoch, p);
+    ::decode(pushes, p);
+    ::decode(cost, p);
+  }
+
+  virtual void encode_payload(uint64_t features) {
+    ::encode(pgid, payload);
+    ::encode(map_epoch, payload);
+    ::encode(pushes, payload);
+    ::encode(cost, payload);
+  }
+
+  const char *get_type_name() const { return "MOSDPGPush"; }
+
+  void print(ostream& out) const {
+    out << "MOSDPGPush(" << pgid
+       << " " << map_epoch
+       << " " << pushes;
+    out << ")";
+  }
+};
+
+#endif
diff --git a/src/messages/MOSDPGPushReply.h b/src/messages/MOSDPGPushReply.h
new file mode 100644 (file)
index 0000000..192dc2c
--- /dev/null
@@ -0,0 +1,74 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2013 Inktank Storage, Inc.
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation.  See file COPYING.
+ *
+ */
+
+#ifndef MOSDPGPUSHREPLY_H
+#define MOSDPGPUSHREPLY_H
+
+#include "msg/Message.h"
+#include "osd/osd_types.h"
+
+class MOSDPGPushReply : public Message {
+  static const int HEAD_VERSION = 1;
+  static const int COMPAT_VERSION = 1;
+
+public:
+  pg_t pgid;
+  epoch_t map_epoch;
+  vector<PushReplyOp> replies;
+  uint64_t cost;
+
+  MOSDPGPushReply() :
+    Message(MSG_OSD_PG_PUSH_REPLY, HEAD_VERSION, COMPAT_VERSION),
+    cost(0)
+    {}
+
+  void compute_cost(CephContext *cct) {
+    cost = 0;
+    for (vector<PushReplyOp>::iterator i = replies.begin();
+        i != replies.end();
+        ++i) {
+      cost += i->cost(cct);
+    }
+  }
+
+  int get_cost() const {
+    return cost;
+  }
+
+  virtual void decode_payload() {
+    bufferlist::iterator p = payload.begin();
+    ::decode(pgid, p);
+    ::decode(map_epoch, p);
+    ::decode(replies, p);
+    ::decode(cost, p);
+  }
+
+  virtual void encode_payload(uint64_t features) {
+    ::encode(pgid, payload);
+    ::encode(map_epoch, payload);
+    ::encode(replies, payload);
+    ::encode(cost, payload);
+  }
+
+  void print(ostream& out) const {
+    out << "MOSDPGPushReply(" << pgid
+       << " " << map_epoch
+       << " " << replies;
+    out << ")";
+  }
+
+  const char *get_type_name() const { return "MOSDPGPushReply"; }
+};
+
+#endif
index a6889d39fdf376088653afb185bfbc2e9978ce40..f65bc2be11ad33552319645131de12a05b021c57 100644 (file)
@@ -154,6 +154,10 @@ using namespace std;
 
 #include "common/config.h"
 
+#include "messages/MOSDPGPush.h"
+#include "messages/MOSDPGPushReply.h"
+#include "messages/MOSDPGPull.h"
+
 #define DEBUGLVL  10    // debug level of output
 
 #define dout_subsys ceph_subsys_ms
@@ -441,6 +445,15 @@ Message *decode_message(CephContext *cct, ceph_msg_header& header, ceph_msg_foot
   case MSG_OSD_PG_BACKFILL:
     m = new MOSDPGBackfill;
     break;
+  case MSG_OSD_PG_PUSH:
+    m = new MOSDPGPush;
+    break;
+  case MSG_OSD_PG_PULL:
+    m = new MOSDPGPull;
+    break;
+  case MSG_OSD_PG_PUSH_REPLY:
+    m = new MOSDPGPushReply;
+    break;
    // auth
   case CEPH_MSG_AUTH:
     m = new MAuth;
index 2e3d59b886d7c5b8b17777fee3641d8204c2632f..1d2cf7d041be4c32dcfb870c99a98fb8937686f8 100644 (file)
 #define MSG_OSD_BACKFILL_RESERVE 99
 #define MSG_OSD_RECOVERY_RESERVE 150
 
+#define MSG_OSD_PG_PUSH        105
+#define MSG_OSD_PG_PULL        106
+#define MSG_OSD_PG_PUSH_REPLY  107
+
 // *** MDS ***
 
 #define MSG_MDS_BEACON             100  // to monitor
index bab345324d0254e77a39207b7389a125881419dd..c15e93e2047520db16b1abed3efb015757c5dde5 100644 (file)
@@ -90,6 +90,9 @@
 #include "messages/MPGStatsAck.h"
 
 #include "messages/MWatchNotify.h"
+#include "messages/MOSDPGPush.h"
+#include "messages/MOSDPGPushReply.h"
+#include "messages/MOSDPGPull.h"
 
 #include "common/perf_counters.h"
 #include "common/Timer.h"
index 2f05bb9c24685bd8ebf9f60820646db395c73c54..7d1ed3108a0054511622533fd8d9bb0844353342 100644 (file)
@@ -2859,6 +2859,222 @@ ostream &ObjectRecoveryInfo::print(ostream &out) const
             << ")";
 }
 
+// -- PushReplyOp --
+void PushReplyOp::generate_test_instances(list<PushReplyOp*> &o)
+{
+  o.push_back(new PushReplyOp);
+  o.push_back(new PushReplyOp);
+  o.back()->soid = hobject_t(sobject_t("asdf", 2));
+  o.push_back(new PushReplyOp);
+  o.back()->soid = hobject_t(sobject_t("asdf", CEPH_NOSNAP));
+}
+
+void PushReplyOp::encode(bufferlist &bl) const
+{
+  ENCODE_START(1, 1, bl);
+  ::encode(soid, bl);
+  ENCODE_FINISH(bl);
+}
+
+void PushReplyOp::decode(bufferlist::iterator &bl)
+{
+  DECODE_START(1, bl);
+  ::decode(soid, bl);
+  DECODE_FINISH(bl);
+}
+
+void PushReplyOp::dump(Formatter *f) const
+{
+  f->dump_stream("soid") << soid;
+}
+
+ostream &PushReplyOp::print(ostream &out) const
+{
+  return out
+    << "PushReplyOp(" << soid
+    << ")";
+}
+
+ostream& operator<<(ostream& out, const PushReplyOp &op)
+{
+  return op.print(out);
+}
+
+uint64_t PushReplyOp::cost(CephContext *cct) const
+{
+
+  return cct->_conf->osd_push_per_object_cost +
+    cct->_conf->osd_recovery_max_chunk;
+}
+
+// -- PullOp --
+void PullOp::generate_test_instances(list<PullOp*> &o)
+{
+  o.push_back(new PullOp);
+  o.push_back(new PullOp);
+  o.back()->soid = hobject_t(sobject_t("asdf", 2));
+  o.back()->recovery_info.version = eversion_t(3, 10);
+  o.push_back(new PullOp);
+  o.back()->soid = hobject_t(sobject_t("asdf", CEPH_NOSNAP));
+  o.back()->recovery_info.version = eversion_t(0, 0);
+}
+
+void PullOp::encode(bufferlist &bl) const
+{
+  ENCODE_START(1, 1, bl);
+  ::encode(soid, bl);
+  ::encode(recovery_info, bl);
+  ::encode(recovery_progress, bl);
+  ENCODE_FINISH(bl);
+}
+
+void PullOp::decode(bufferlist::iterator &bl)
+{
+  DECODE_START(1, bl);
+  ::decode(soid, bl);
+  ::decode(recovery_info, bl);
+  ::decode(recovery_progress, bl);
+  DECODE_FINISH(bl);
+}
+
+void PullOp::dump(Formatter *f) const
+{
+  f->dump_stream("soid") << soid;
+  {
+    f->open_object_section("recovery_info");
+    recovery_info.dump(f);
+    f->close_section();
+  }
+  {
+    f->open_object_section("recovery_progress");
+    recovery_progress.dump(f);
+    f->close_section();
+  }
+}
+
+ostream &PullOp::print(ostream &out) const
+{
+  return out
+    << "PullOp(" << soid
+    << ", recovery_info: " << recovery_info
+    << ", recovery_progress: " << recovery_progress
+    << ")";
+}
+
+ostream& operator<<(ostream& out, const PullOp &op)
+{
+  return op.print(out);
+}
+
+uint64_t PullOp::cost(CephContext *cct) const
+{
+  return cct->_conf->osd_push_per_object_cost +
+    cct->_conf->osd_recovery_max_chunk;
+}
+
+// -- PushOp --
+void PushOp::generate_test_instances(list<PushOp*> &o)
+{
+  o.push_back(new PushOp);
+  o.push_back(new PushOp);
+  o.back()->soid = hobject_t(sobject_t("asdf", 2));
+  o.back()->version = eversion_t(3, 10);
+  o.push_back(new PushOp);
+  o.back()->soid = hobject_t(sobject_t("asdf", CEPH_NOSNAP));
+  o.back()->version = eversion_t(0, 0);
+}
+
+void PushOp::encode(bufferlist &bl) const
+{
+  ENCODE_START(1, 1, bl);
+  ::encode(soid, bl);
+  ::encode(version, bl);
+  ::encode(data, bl);
+  ::encode(data_included, bl);
+  ::encode(omap_header, bl);
+  ::encode(omap_entries, bl);
+  ::encode(attrset, bl);
+  ::encode(recovery_info, bl);
+  ::encode(after_progress, bl);
+  ::encode(before_progress, bl);
+  ENCODE_FINISH(bl);
+}
+
+void PushOp::decode(bufferlist::iterator &bl)
+{
+  DECODE_START(1, bl);
+  ::decode(soid, bl);
+  ::decode(version, bl);
+  ::decode(data, bl);
+  ::decode(data_included, bl);
+  ::decode(omap_header, bl);
+  ::decode(omap_entries, bl);
+  ::decode(attrset, bl);
+  ::decode(recovery_info, bl);
+  ::decode(after_progress, bl);
+  ::decode(before_progress, bl);
+  DECODE_FINISH(bl);
+}
+
+void PushOp::dump(Formatter *f) const
+{
+  f->dump_stream("soid") << soid;
+  f->dump_stream("version") << version;
+  f->dump_int("data_len", data.length());
+  f->dump_stream("data_included") << data_included;
+  f->dump_int("omap_header_len", omap_header.length());
+  f->dump_int("omap_entries_len", omap_entries.size());
+  f->dump_int("attrset_len", attrset.size());
+  {
+    f->open_object_section("recovery_info");
+    recovery_info.dump(f);
+    f->close_section();
+  }
+  {
+    f->open_object_section("after_progress");
+    after_progress.dump(f);
+    f->close_section();
+  }
+  {
+    f->open_object_section("before_progress");
+    before_progress.dump(f);
+    f->close_section();
+  }
+}
+
+ostream &PushOp::print(ostream &out) const
+{
+  return out
+    << "PushOp(" << soid
+    << ", version: " << version
+    << ", data_included: " << data_included
+    << ", omap_header_size: " << omap_header.length()
+    << ", omap_entries_size: " << omap_entries.size()
+    << ", attrset_size: " << attrset.size()
+    << ", recovery_info: " << recovery_info
+    << ", after_progress: " << after_progress
+    << ", before_progress: " << before_progress
+    << ")";
+}
+
+ostream& operator<<(ostream& out, const PushOp &op)
+{
+  return op.print(out);
+}
+
+uint64_t PushOp::cost(CephContext *cct) const
+{
+  uint64_t cost = data_included.size();
+  for (map<string, bufferlist>::const_iterator i =
+        omap_entries.begin();
+       i != omap_entries.end();
+       ++i) {
+    cost += i->second.length();
+  }
+  cost += cct->_conf->osd_push_per_object_cost;
+  return cost;
+}
+
 // -- ScrubMap --
 
 void ScrubMap::merge_incr(const ScrubMap &l)
index 467124ea614254764e47cd35184e1e09116ae648..a039a3dd5801ee58826d74bcc6d28ee9e9aec138 100644 (file)
@@ -1989,14 +1989,34 @@ ostream& operator<<(ostream& out, const ObjectRecoveryProgress &prog);
 
 struct PushReplyOp {
   hobject_t soid;
+
+  static void generate_test_instances(list<PushReplyOp*>& o);
+  void encode(bufferlist &bl) const;
+  void decode(bufferlist::iterator &bl);
+  ostream &print(ostream &out) const;
+  void dump(Formatter *f) const;
+
+  uint64_t cost(CephContext *cct) const;
 };
+WRITE_CLASS_ENCODER(PushReplyOp)
+ostream& operator<<(ostream& out, const PushReplyOp &op);
 
 struct PullOp {
   hobject_t soid;
 
   ObjectRecoveryInfo recovery_info;
   ObjectRecoveryProgress recovery_progress;
+
+  static void generate_test_instances(list<PullOp*>& o);
+  void encode(bufferlist &bl) const;
+  void decode(bufferlist::iterator &bl);
+  ostream &print(ostream &out) const;
+  void dump(Formatter *f) const;
+
+  uint64_t cost(CephContext *cct) const;
 };
+WRITE_CLASS_ENCODER(PullOp)
+ostream& operator<<(ostream& out, const PullOp &op);
 
 struct PushOp {
   hobject_t soid;
@@ -2010,7 +2030,17 @@ struct PushOp {
   ObjectRecoveryInfo recovery_info;
   ObjectRecoveryProgress before_progress;
   ObjectRecoveryProgress after_progress;
+
+  static void generate_test_instances(list<PushOp*>& o);
+  void encode(bufferlist &bl) const;
+  void decode(bufferlist::iterator &bl);
+  ostream &print(ostream &out) const;
+  void dump(Formatter *f) const;
+
+  uint64_t cost(CephContext *cct) const;
 };
+WRITE_CLASS_ENCODER(PushOp)
+ostream& operator<<(ostream& out, const PushOp &op);
 
 
 /*