messages/MOSDOpReply.h\
messages/MOSDPGBackfill.h\
messages/MOSDPGCreate.h\
+ messages/MOSDPGPush.h\
+ messages/MOSDPGPull.h\
+ messages/MOSDPGPushReply.h\
messages/MOSDPGInfo.h\
messages/MOSDPGLog.h\
messages/MOSDPGMissing.h\
OPTION(osd_recovery_delay_start, OPT_FLOAT, 0)
OPTION(osd_recovery_max_active, OPT_INT, 5)
OPTION(osd_recovery_max_chunk, OPT_U64, 8<<20) // max size of push chunk; also added to the cost of each PullOp and PushReplyOp
+OPTION(osd_push_per_object_cost, OPT_U64, 1000) // fixed per-object amount added to the cost of every PushOp, PullOp and PushReplyOp
OPTION(osd_recovery_forget_lost_objects, OPT_BOOL, false) // off for now
OPTION(osd_max_scrubs, OPT_INT, 1)
OPTION(osd_scrub_load_threshold, OPT_FLOAT, 0.5)
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2013 Inktank Storage, Inc.
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#ifndef MOSDPGPULL_H
+#define MOSDPGPULL_H
+
+#include "msg/Message.h"
+#include "osd/osd_types.h"
+
+/**
+ * MOSDPGPull - message of type MSG_OSD_PG_PULL carrying a batch of PullOp
+ * entries.
+ *
+ * The payload is, in order: pgid, map_epoch, pulls, cost.
+ */
+class MOSDPGPull : public Message {
+  static const int HEAD_VERSION = 1;
+  static const int COMPAT_VERSION = 1;
+
+public:
+  pg_t pgid;             ///< pg identifier carried in the payload
+  epoch_t map_epoch;     ///< epoch value carried in the payload
+  vector<PullOp> pulls;  ///< the batched pull operations
+  uint64_t cost;         ///< cached total, refreshed by compute_cost()
+
+  MOSDPGPull()
+    : Message(MSG_OSD_PG_PULL, HEAD_VERSION, COMPAT_VERSION),
+      cost(0)
+  {}
+
+  /// Set `cost` to the sum of cost(cct) over every entry of `pulls`
+  /// (0 when `pulls` is empty).
+  void compute_cost(CephContext *cct) {
+    uint64_t total = 0;
+    vector<PullOp>::iterator op = pulls.begin();
+    while (op != pulls.end()) {
+      total += op->cost(cct);
+      ++op;
+    }
+    cost = total;
+  }
+
+  /// Return the cached cost.
+  /// NOTE(review): `cost` is uint64_t but the return type is int, so a
+  /// very large total is narrowed on return.
+  int get_cost() const {
+    return cost;
+  }
+
+  /// Read the fields back from `payload` in the order encode_payload()
+  /// wrote them.
+  virtual void decode_payload() {
+    bufferlist::iterator cursor = payload.begin();
+    ::decode(pgid, cursor);
+    ::decode(map_epoch, cursor);
+    ::decode(pulls, cursor);
+    ::decode(cost, cursor);
+  }
+
+  /// Append pgid, map_epoch, pulls and cost to `payload`.
+  /// `features` is not consulted.
+  virtual void encode_payload(uint64_t features) {
+    ::encode(pgid, payload);
+    ::encode(map_epoch, payload);
+    ::encode(pulls, payload);
+    ::encode(cost, payload);
+  }
+
+  const char *get_type_name() const {
+    return "MOSDPGPull";
+  }
+
+  /// Write "MOSDPGPull(<pgid> <map_epoch> <pulls>)" to `out`.
+  void print(ostream& out) const {
+    out << "MOSDPGPull(" << pgid << " " << map_epoch << " " << pulls << ")";
+  }
+};
+
+#endif
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2013 Inktank Storage, Inc.
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#ifndef MOSDPGPUSH_H
+#define MOSDPGPUSH_H
+
+#include "msg/Message.h"
+#include "osd/osd_types.h"
+
+/**
+ * MOSDPGPush - message of type MSG_OSD_PG_PUSH carrying a batch of PushOp
+ * entries.
+ *
+ * The payload is, in order: pgid, map_epoch, pushes, cost.
+ */
+class MOSDPGPush : public Message {
+  static const int HEAD_VERSION = 1;
+  static const int COMPAT_VERSION = 1;
+
+public:
+  pg_t pgid;              ///< pg identifier carried in the payload
+  epoch_t map_epoch;      ///< epoch value carried in the payload
+  vector<PushOp> pushes;  ///< the batched push operations
+  uint64_t cost;          ///< cached total, refreshed by compute_cost()
+
+  MOSDPGPush()
+    : Message(MSG_OSD_PG_PUSH, HEAD_VERSION, COMPAT_VERSION),
+      cost(0)
+  {}
+
+  /// Set `cost` to the sum of cost(cct) over every entry of `pushes`
+  /// (0 when `pushes` is empty).
+  void compute_cost(CephContext *cct) {
+    uint64_t total = 0;
+    vector<PushOp>::iterator op = pushes.begin();
+    while (op != pushes.end()) {
+      total += op->cost(cct);
+      ++op;
+    }
+    cost = total;
+  }
+
+  /// Return the cached cost.
+  /// NOTE(review): `cost` is uint64_t but the return type is int, so a
+  /// very large total is narrowed on return.
+  int get_cost() const {
+    return cost;
+  }
+
+  /// Read the fields back from `payload` in the order encode_payload()
+  /// wrote them.
+  virtual void decode_payload() {
+    bufferlist::iterator cursor = payload.begin();
+    ::decode(pgid, cursor);
+    ::decode(map_epoch, cursor);
+    ::decode(pushes, cursor);
+    ::decode(cost, cursor);
+  }
+
+  /// Append pgid, map_epoch, pushes and cost to `payload`.
+  /// `features` is not consulted.
+  virtual void encode_payload(uint64_t features) {
+    ::encode(pgid, payload);
+    ::encode(map_epoch, payload);
+    ::encode(pushes, payload);
+    ::encode(cost, payload);
+  }
+
+  const char *get_type_name() const {
+    return "MOSDPGPush";
+  }
+
+  /// Write "MOSDPGPush(<pgid> <map_epoch> <pushes>)" to `out`.
+  void print(ostream& out) const {
+    out << "MOSDPGPush(" << pgid << " " << map_epoch << " " << pushes << ")";
+  }
+};
+
+#endif
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2013 Inktank Storage, Inc.
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#ifndef MOSDPGPUSHREPLY_H
+#define MOSDPGPUSHREPLY_H
+
+#include "msg/Message.h"
+#include "osd/osd_types.h"
+
+/**
+ * MOSDPGPushReply - message of type MSG_OSD_PG_PUSH_REPLY carrying a batch
+ * of PushReplyOp entries.
+ *
+ * The payload is, in order: pgid, map_epoch, replies, cost.
+ */
+class MOSDPGPushReply : public Message {
+  static const int HEAD_VERSION = 1;
+  static const int COMPAT_VERSION = 1;
+
+public:
+  pg_t pgid;                    ///< pg identifier carried in the payload
+  epoch_t map_epoch;            ///< epoch value carried in the payload
+  vector<PushReplyOp> replies;  ///< the batched reply entries
+  uint64_t cost;                ///< cached total, refreshed by compute_cost()
+
+  MOSDPGPushReply()
+    : Message(MSG_OSD_PG_PUSH_REPLY, HEAD_VERSION, COMPAT_VERSION),
+      cost(0)
+  {}
+
+  /// Set `cost` to the sum of cost(cct) over every entry of `replies`
+  /// (0 when `replies` is empty).
+  void compute_cost(CephContext *cct) {
+    uint64_t total = 0;
+    vector<PushReplyOp>::iterator reply = replies.begin();
+    while (reply != replies.end()) {
+      total += reply->cost(cct);
+      ++reply;
+    }
+    cost = total;
+  }
+
+  /// Return the cached cost.
+  /// NOTE(review): `cost` is uint64_t but the return type is int, so a
+  /// very large total is narrowed on return.
+  int get_cost() const {
+    return cost;
+  }
+
+  /// Read the fields back from `payload` in the order encode_payload()
+  /// wrote them.
+  virtual void decode_payload() {
+    bufferlist::iterator cursor = payload.begin();
+    ::decode(pgid, cursor);
+    ::decode(map_epoch, cursor);
+    ::decode(replies, cursor);
+    ::decode(cost, cursor);
+  }
+
+  /// Append pgid, map_epoch, replies and cost to `payload`.
+  /// `features` is not consulted.
+  virtual void encode_payload(uint64_t features) {
+    ::encode(pgid, payload);
+    ::encode(map_epoch, payload);
+    ::encode(replies, payload);
+    ::encode(cost, payload);
+  }
+
+  /// Write "MOSDPGPushReply(<pgid> <map_epoch> <replies>)" to `out`.
+  void print(ostream& out) const {
+    out << "MOSDPGPushReply(" << pgid << " " << map_epoch << " " << replies << ")";
+  }
+
+  const char *get_type_name() const {
+    return "MOSDPGPushReply";
+  }
+};
+
+#endif
#include "common/config.h"
+#include "messages/MOSDPGPush.h"
+#include "messages/MOSDPGPushReply.h"
+#include "messages/MOSDPGPull.h"
+
#define DEBUGLVL 10 // debug level of output
#define dout_subsys ceph_subsys_ms
case MSG_OSD_PG_BACKFILL:
m = new MOSDPGBackfill;
break;
+ case MSG_OSD_PG_PUSH:
+ m = new MOSDPGPush;
+ break;
+ case MSG_OSD_PG_PULL:
+ m = new MOSDPGPull;
+ break;
+ case MSG_OSD_PG_PUSH_REPLY:
+ m = new MOSDPGPushReply;
+ break;
// auth
case CEPH_MSG_AUTH:
m = new MAuth;
#define MSG_OSD_BACKFILL_RESERVE 99
#define MSG_OSD_RECOVERY_RESERVE 150
+#define MSG_OSD_PG_PUSH 105 // message type used by MOSDPGPush
+#define MSG_OSD_PG_PULL 106 // message type used by MOSDPGPull
+#define MSG_OSD_PG_PUSH_REPLY 107 // message type used by MOSDPGPushReply
+
// *** MDS ***
#define MSG_MDS_BEACON 100 // to monitor
#include "messages/MPGStatsAck.h"
#include "messages/MWatchNotify.h"
+#include "messages/MOSDPGPush.h"
+#include "messages/MOSDPGPushReply.h"
+#include "messages/MOSDPGPull.h"
#include "common/perf_counters.h"
#include "common/Timer.h"
<< ")";
}
+// -- PushReplyOp --
+// Append three heap-allocated sample PushReplyOps to o: one
+// default-constructed, one whose soid is built from sobject_t("asdf", 2),
+// and one whose soid is built from sobject_t("asdf", CEPH_NOSNAP).
+void PushReplyOp::generate_test_instances(list<PushReplyOp*> &o)
+{
+  o.push_back(new PushReplyOp);
+
+  PushReplyOp *snapped = new PushReplyOp;
+  snapped->soid = hobject_t(sobject_t("asdf", 2));
+  o.push_back(snapped);
+
+  PushReplyOp *nosnap = new PushReplyOp;
+  nosnap->soid = hobject_t(sobject_t("asdf", CEPH_NOSNAP));
+  o.push_back(nosnap);
+}
+
+void PushReplyOp::encode(bufferlist &bl) const // Append the encoded form of this op to bl.
+{
+ ENCODE_START(1, 1, bl); // presumably (struct version, compat version) = (1, 1) — confirm against the macro
+ ::encode(soid, bl); // soid is the only field written
+ ENCODE_FINISH(bl);
+}
+
+void PushReplyOp::decode(bufferlist::iterator &bl) // Read this op back from bl; mirrors encode().
+{
+ DECODE_START(1, bl); // presumably the newest struct version understood (1) — confirm against the macro
+ ::decode(soid, bl); // soid is the only field read
+ DECODE_FINISH(bl);
+}
+
+void PushReplyOp::dump(Formatter *f) const // Emit this op's single field to the formatter.
+{
+ f->dump_stream("soid") << soid; // soid is streamed under the key "soid"
+}
+
+// Write "PushReplyOp(<soid>)" to out and return the stream.
+ostream &PushReplyOp::print(ostream &out) const
+{
+  return out << "PushReplyOp(" << soid << ")";
+}
+
+ostream& operator<<(ostream& out, const PushReplyOp &op) // Stream insertion for PushReplyOp.
+{
+ return op.print(out); // all formatting lives in PushReplyOp::print()
+}
+
+// Cost of one push reply: the configured per-object push cost plus the
+// configured maximum recovery chunk size. Does not depend on this op's
+// contents.
+uint64_t PushReplyOp::cost(CephContext *cct) const
+{
+  const uint64_t per_object = cct->_conf->osd_push_per_object_cost;
+  const uint64_t max_chunk = cct->_conf->osd_recovery_max_chunk;
+  return per_object + max_chunk;
+}
+
+// -- PullOp --
+// Append three heap-allocated sample PullOps to o: one default-constructed,
+// one with soid from sobject_t("asdf", 2) and recovery_info.version (3, 10),
+// and one with soid from sobject_t("asdf", CEPH_NOSNAP) and
+// recovery_info.version (0, 0).
+void PullOp::generate_test_instances(list<PullOp*> &o)
+{
+  o.push_back(new PullOp);
+
+  PullOp *snapped = new PullOp;
+  snapped->soid = hobject_t(sobject_t("asdf", 2));
+  snapped->recovery_info.version = eversion_t(3, 10);
+  o.push_back(snapped);
+
+  PullOp *nosnap = new PullOp;
+  nosnap->soid = hobject_t(sobject_t("asdf", CEPH_NOSNAP));
+  nosnap->recovery_info.version = eversion_t(0, 0);
+  o.push_back(nosnap);
+}
+
+void PullOp::encode(bufferlist &bl) const // Append the encoded form of this op to bl.
+{
+ ENCODE_START(1, 1, bl); // presumably (struct version, compat version) = (1, 1) — confirm against the macro
+ ::encode(soid, bl); // field order here must match PullOp::decode()
+ ::encode(recovery_info, bl);
+ ::encode(recovery_progress, bl);
+ ENCODE_FINISH(bl);
+}
+
+void PullOp::decode(bufferlist::iterator &bl) // Read this op back from bl; mirrors encode().
+{
+ DECODE_START(1, bl); // presumably the newest struct version understood (1) — confirm against the macro
+ ::decode(soid, bl); // fields are read in the order PullOp::encode() wrote them
+ ::decode(recovery_info, bl);
+ ::decode(recovery_progress, bl);
+ DECODE_FINISH(bl);
+}
+
+// Emit soid, followed by nested "recovery_info" and "recovery_progress"
+// object sections, to the formatter.
+void PullOp::dump(Formatter *f) const
+{
+  f->dump_stream("soid") << soid;
+
+  f->open_object_section("recovery_info");
+  recovery_info.dump(f);
+  f->close_section();
+
+  f->open_object_section("recovery_progress");
+  recovery_progress.dump(f);
+  f->close_section();
+}
+
+// Write a one-line summary of the form
+// "PullOp(<soid>, recovery_info: ..., recovery_progress: ...)" to out.
+ostream &PullOp::print(ostream &out) const
+{
+  out << "PullOp(" << soid;
+  out << ", recovery_info: " << recovery_info;
+  out << ", recovery_progress: " << recovery_progress;
+  return out << ")";
+}
+
+ostream& operator<<(ostream& out, const PullOp &op) // Stream insertion for PullOp.
+{
+ return op.print(out); // all formatting lives in PullOp::print()
+}
+
+// Cost of one pull: the configured per-object push cost plus the configured
+// maximum recovery chunk size. Does not depend on this op's contents.
+uint64_t PullOp::cost(CephContext *cct) const
+{
+  const uint64_t per_object = cct->_conf->osd_push_per_object_cost;
+  const uint64_t max_chunk = cct->_conf->osd_recovery_max_chunk;
+  return per_object + max_chunk;
+}
+
+// -- PushOp --
+// Append three heap-allocated sample PushOps to o: one default-constructed,
+// one with soid from sobject_t("asdf", 2) and version (3, 10), and one with
+// soid from sobject_t("asdf", CEPH_NOSNAP) and version (0, 0).
+void PushOp::generate_test_instances(list<PushOp*> &o)
+{
+  o.push_back(new PushOp);
+
+  PushOp *snapped = new PushOp;
+  snapped->soid = hobject_t(sobject_t("asdf", 2));
+  snapped->version = eversion_t(3, 10);
+  o.push_back(snapped);
+
+  PushOp *nosnap = new PushOp;
+  nosnap->soid = hobject_t(sobject_t("asdf", CEPH_NOSNAP));
+  nosnap->version = eversion_t(0, 0);
+  o.push_back(nosnap);
+}
+
+void PushOp::encode(bufferlist &bl) const // Append the encoded form of this op to bl.
+{
+ ENCODE_START(1, 1, bl); // presumably (struct version, compat version) = (1, 1) — confirm against the macro
+ ::encode(soid, bl); // field order here must match PushOp::decode()
+ ::encode(version, bl);
+ ::encode(data, bl);
+ ::encode(data_included, bl);
+ ::encode(omap_header, bl);
+ ::encode(omap_entries, bl);
+ ::encode(attrset, bl);
+ ::encode(recovery_info, bl);
+ ::encode(after_progress, bl); // note: after_progress is written before before_progress
+ ::encode(before_progress, bl);
+ ENCODE_FINISH(bl);
+}
+
+void PushOp::decode(bufferlist::iterator &bl) // Read this op back from bl; mirrors encode().
+{
+ DECODE_START(1, bl); // presumably the newest struct version understood (1) — confirm against the macro
+ ::decode(soid, bl); // fields are read in the order PushOp::encode() wrote them
+ ::decode(version, bl);
+ ::decode(data, bl);
+ ::decode(data_included, bl);
+ ::decode(omap_header, bl);
+ ::decode(omap_entries, bl);
+ ::decode(attrset, bl);
+ ::decode(recovery_info, bl);
+ ::decode(after_progress, bl); // note: after_progress is read before before_progress
+ ::decode(before_progress, bl);
+ DECODE_FINISH(bl);
+}
+
+// Emit a summary of this op to the formatter: soid, version and
+// data_included are streamed; data, omap_header, omap_entries and attrset
+// are reported only by length/size; recovery_info, after_progress and
+// before_progress are dumped as nested object sections.
+void PushOp::dump(Formatter *f) const
+{
+  f->dump_stream("soid") << soid;
+  f->dump_stream("version") << version;
+  f->dump_int("data_len", data.length());
+  f->dump_stream("data_included") << data_included;
+  f->dump_int("omap_header_len", omap_header.length());
+  f->dump_int("omap_entries_len", omap_entries.size());
+  f->dump_int("attrset_len", attrset.size());
+
+  f->open_object_section("recovery_info");
+  recovery_info.dump(f);
+  f->close_section();
+
+  f->open_object_section("after_progress");
+  after_progress.dump(f);
+  f->close_section();
+
+  f->open_object_section("before_progress");
+  before_progress.dump(f);
+  f->close_section();
+}
+
+// Write a one-line summary of this op to out. Payload contents are not
+// printed; omap_header, omap_entries and attrset appear only as sizes.
+ostream &PushOp::print(ostream &out) const
+{
+  out << "PushOp(" << soid;
+  out << ", version: " << version;
+  out << ", data_included: " << data_included;
+  out << ", omap_header_size: " << omap_header.length();
+  out << ", omap_entries_size: " << omap_entries.size();
+  out << ", attrset_size: " << attrset.size();
+  out << ", recovery_info: " << recovery_info;
+  out << ", after_progress: " << after_progress;
+  out << ", before_progress: " << before_progress;
+  return out << ")";
+}
+
+ostream& operator<<(ostream& out, const PushOp &op) // Stream insertion for PushOp.
+{
+ return op.print(out); // all formatting lives in PushOp::print()
+}
+
+// Cost of one push: data_included.size() (presumably the number of data
+// bytes covered — confirm against interval_set), plus the length of every
+// omap entry value, plus the configured per-object push cost. omap keys,
+// omap_header and attrset are not counted.
+uint64_t PushOp::cost(CephContext *cct) const
+{
+  uint64_t total = data_included.size();
+
+  map<string, bufferlist>::const_iterator entry = omap_entries.begin();
+  while (entry != omap_entries.end()) {
+    total += entry->second.length();
+    ++entry;
+  }
+
+  total += cct->_conf->osd_push_per_object_cost;
+  return total;
+}
+
// -- ScrubMap --
void ScrubMap::merge_incr(const ScrubMap &l)
struct PushReplyOp { // One per-object entry of an MOSDPGPushReply (see its `replies` vector).
hobject_t soid;
+
+ static void generate_test_instances(list<PushReplyOp*>& o); // appends heap-allocated sample instances to o
+ void encode(bufferlist &bl) const; // appends the encoded form (soid) to bl
+ void decode(bufferlist::iterator &bl); // reads back what encode() wrote
+ ostream &print(ostream &out) const; // writes "PushReplyOp(<soid>)" to out
+ void dump(Formatter *f) const; // emits soid to the formatter
+
+ uint64_t cost(CephContext *cct) const; // osd_push_per_object_cost + osd_recovery_max_chunk from cct's config
};
+WRITE_CLASS_ENCODER(PushReplyOp)
+ostream& operator<<(ostream& out, const PushReplyOp &op); // forwards to op.print(out)
struct PullOp { // One per-object entry of an MOSDPGPull (see its `pulls` vector).
hobject_t soid;
ObjectRecoveryInfo recovery_info;
ObjectRecoveryProgress recovery_progress;
+
+ static void generate_test_instances(list<PullOp*>& o); // appends heap-allocated sample instances to o
+ void encode(bufferlist &bl) const; // appends soid, recovery_info, recovery_progress to bl
+ void decode(bufferlist::iterator &bl); // reads back what encode() wrote
+ ostream &print(ostream &out) const; // writes a one-line "PullOp(...)" summary to out
+ void dump(Formatter *f) const; // emits soid and the two recovery sections to the formatter
+
+ uint64_t cost(CephContext *cct) const; // osd_push_per_object_cost + osd_recovery_max_chunk from cct's config
};
+WRITE_CLASS_ENCODER(PullOp)
+ostream& operator<<(ostream& out, const PullOp &op); // forwards to op.print(out)
struct PushOp { // One per-object entry of an MOSDPGPush (see its `pushes` vector).
hobject_t soid;
ObjectRecoveryInfo recovery_info;
ObjectRecoveryProgress before_progress;
ObjectRecoveryProgress after_progress;
+
+ static void generate_test_instances(list<PushOp*>& o); // appends heap-allocated sample instances to o
+ void encode(bufferlist &bl) const; // appends the encoded form of this op to bl
+ void decode(bufferlist::iterator &bl); // reads back what encode() wrote
+ ostream &print(ostream &out) const; // writes a one-line "PushOp(...)" summary to out
+ void dump(Formatter *f) const; // emits a field summary to the formatter
+
+ uint64_t cost(CephContext *cct) const; // data_included.size() + omap entry value lengths + osd_push_per_object_cost
};
+WRITE_CLASS_ENCODER(PushOp)
+ostream& operator<<(ostream& out, const PushOp &op); // forwards to op.print(out)
/*