From: Samuel Just Date: Mon, 17 Jun 2013 22:41:36 +0000 (-0700) Subject: messages/,osd_types: add messages for Push, PushReply, Pull X-Git-Tag: v0.67-rc1~138^2~1^2~3 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=264dbf3f9e590b8404e8a89edf4e7fd1853a8a48;p=ceph.git messages/,osd_types: add messages for Push, PushReply, Pull Signed-off-by: Samuel Just --- diff --git a/src/Makefile.am b/src/Makefile.am index c1a7e8020ef3..241a83393095 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -1949,6 +1949,9 @@ noinst_HEADERS = \ messages/MOSDOpReply.h\ messages/MOSDPGBackfill.h\ messages/MOSDPGCreate.h\ + messages/MOSDPGPush.h\ + messages/MOSDPGPull.h\ + messages/MOSDPGPushReply.h\ messages/MOSDPGInfo.h\ messages/MOSDPGLog.h\ messages/MOSDPGMissing.h\ diff --git a/src/common/config_opts.h b/src/common/config_opts.h index 775d54cac487..15e65e3bf98a 100644 --- a/src/common/config_opts.h +++ b/src/common/config_opts.h @@ -425,6 +425,7 @@ OPTION(osd_auto_mark_unfound_lost, OPT_BOOL, false) OPTION(osd_recovery_delay_start, OPT_FLOAT, 0) OPTION(osd_recovery_max_active, OPT_INT, 5) OPTION(osd_recovery_max_chunk, OPT_U64, 8<<20) // max size of push chunk +OPTION(osd_push_per_object_cost, OPT_U64, 1000) // push cost per object OPTION(osd_recovery_forget_lost_objects, OPT_BOOL, false) // off for now OPTION(osd_max_scrubs, OPT_INT, 1) OPTION(osd_scrub_load_threshold, OPT_FLOAT, 0.5) diff --git a/src/messages/MOSDPGPull.h b/src/messages/MOSDPGPull.h new file mode 100644 index 000000000000..870db7f1a19c --- /dev/null +++ b/src/messages/MOSDPGPull.h @@ -0,0 +1,75 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2013 Inktank Storage, Inc. + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#ifndef MOSDPGPULL_H +#define MOSDPGPULL_H + +#include "msg/Message.h" +#include "osd/osd_types.h" + +class MOSDPGPull : public Message { + static const int HEAD_VERSION = 1; + static const int COMPAT_VERSION = 1; + + +public: + pg_t pgid; + epoch_t map_epoch; + vector pulls; + uint64_t cost; + + MOSDPGPull() : + Message(MSG_OSD_PG_PULL, HEAD_VERSION, COMPAT_VERSION), + cost(0) + {} + + void compute_cost(CephContext *cct) { + cost = 0; + for (vector::iterator i = pulls.begin(); + i != pulls.end(); + ++i) { + cost += i->cost(cct); + } + } + + int get_cost() const { + return cost; + } + + virtual void decode_payload() { + bufferlist::iterator p = payload.begin(); + ::decode(pgid, p); + ::decode(map_epoch, p); + ::decode(pulls, p); + ::decode(cost, p); + } + + virtual void encode_payload(uint64_t features) { + ::encode(pgid, payload); + ::encode(map_epoch, payload); + ::encode(pulls, payload); + ::encode(cost, payload); + } + + const char *get_type_name() const { return "MOSDPGPull"; } + + void print(ostream& out) const { + out << "MOSDPGPull(" << pgid + << " " << map_epoch + << " " << pulls; + out << ")"; + } +}; + +#endif diff --git a/src/messages/MOSDPGPush.h b/src/messages/MOSDPGPush.h new file mode 100644 index 000000000000..acc0d2aee8e0 --- /dev/null +++ b/src/messages/MOSDPGPush.h @@ -0,0 +1,75 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2013 Inktank Storage, Inc. + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#ifndef MOSDPGPUSH_H +#define MOSDPGPUSH_H + +#include "msg/Message.h" +#include "osd/osd_types.h" + +class MOSDPGPush : public Message { + static const int HEAD_VERSION = 1; + static const int COMPAT_VERSION = 1; + + +public: + pg_t pgid; + epoch_t map_epoch; + vector pushes; + uint64_t cost; + + void compute_cost(CephContext *cct) { + cost = 0; + for (vector::iterator i = pushes.begin(); + i != pushes.end(); + ++i) { + cost += i->cost(cct); + } + } + + int get_cost() const { + return cost; + } + + MOSDPGPush() : + Message(MSG_OSD_PG_PUSH, HEAD_VERSION, COMPAT_VERSION), + cost(0) + {} + + virtual void decode_payload() { + bufferlist::iterator p = payload.begin(); + ::decode(pgid, p); + ::decode(map_epoch, p); + ::decode(pushes, p); + ::decode(cost, p); + } + + virtual void encode_payload(uint64_t features) { + ::encode(pgid, payload); + ::encode(map_epoch, payload); + ::encode(pushes, payload); + ::encode(cost, payload); + } + + const char *get_type_name() const { return "MOSDPGPush"; } + + void print(ostream& out) const { + out << "MOSDPGPush(" << pgid + << " " << map_epoch + << " " << pushes; + out << ")"; + } +}; + +#endif diff --git a/src/messages/MOSDPGPushReply.h b/src/messages/MOSDPGPushReply.h new file mode 100644 index 000000000000..192dc2c1f81c --- /dev/null +++ b/src/messages/MOSDPGPushReply.h @@ -0,0 +1,74 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2013 Inktank Storage, Inc. + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#ifndef MOSDPGPUSHREPLY_H +#define MOSDPGPUSHREPLY_H + +#include "msg/Message.h" +#include "osd/osd_types.h" + +class MOSDPGPushReply : public Message { + static const int HEAD_VERSION = 1; + static const int COMPAT_VERSION = 1; + +public: + pg_t pgid; + epoch_t map_epoch; + vector replies; + uint64_t cost; + + MOSDPGPushReply() : + Message(MSG_OSD_PG_PUSH_REPLY, HEAD_VERSION, COMPAT_VERSION), + cost(0) + {} + + void compute_cost(CephContext *cct) { + cost = 0; + for (vector::iterator i = replies.begin(); + i != replies.end(); + ++i) { + cost += i->cost(cct); + } + } + + int get_cost() const { + return cost; + } + + virtual void decode_payload() { + bufferlist::iterator p = payload.begin(); + ::decode(pgid, p); + ::decode(map_epoch, p); + ::decode(replies, p); + ::decode(cost, p); + } + + virtual void encode_payload(uint64_t features) { + ::encode(pgid, payload); + ::encode(map_epoch, payload); + ::encode(replies, payload); + ::encode(cost, payload); + } + + void print(ostream& out) const { + out << "MOSDPGPushReply(" << pgid + << " " << map_epoch + << " " << replies; + out << ")"; + } + + const char *get_type_name() const { return "MOSDPGPushReply"; } +}; + +#endif diff --git a/src/msg/Message.cc b/src/msg/Message.cc index a6889d39fdf3..f65bc2be11ad 100644 --- a/src/msg/Message.cc +++ b/src/msg/Message.cc @@ -154,6 +154,10 @@ using namespace std; #include "common/config.h" +#include "messages/MOSDPGPush.h" +#include "messages/MOSDPGPushReply.h" +#include "messages/MOSDPGPull.h" + #define DEBUGLVL 10 // debug level of output #define dout_subsys ceph_subsys_ms @@ -441,6 +445,15 @@ Message *decode_message(CephContext *cct, ceph_msg_header& header, ceph_msg_foot case MSG_OSD_PG_BACKFILL: m = new MOSDPGBackfill; break; + case MSG_OSD_PG_PUSH: + m = new MOSDPGPush; + break; + case MSG_OSD_PG_PULL: + m = new MOSDPGPull; + break; + case MSG_OSD_PG_PUSH_REPLY: + m = new MOSDPGPushReply; + break; // auth case CEPH_MSG_AUTH: m = new MAuth; diff --git a/src/msg/Message.h b/src/msg/Message.h index 2e3d59b886d7..1d2cf7d041be 100644 --- a/src/msg/Message.h +++ b/src/msg/Message.h @@ -102,6 +102,10 @@ #define MSG_OSD_BACKFILL_RESERVE 99 #define MSG_OSD_RECOVERY_RESERVE 150 +#define MSG_OSD_PG_PUSH 105 +#define MSG_OSD_PG_PULL 106 +#define MSG_OSD_PG_PUSH_REPLY 107 + // *** MDS *** #define MSG_MDS_BEACON 100 // to monitor diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc index bab345324d02..c15e93e20475 100644 --- a/src/osd/OSD.cc +++ b/src/osd/OSD.cc @@ -90,6 +90,9 @@ #include "messages/MPGStatsAck.h" #include "messages/MWatchNotify.h" +#include "messages/MOSDPGPush.h" +#include "messages/MOSDPGPushReply.h" +#include "messages/MOSDPGPull.h" #include "common/perf_counters.h" #include "common/Timer.h" diff --git a/src/osd/osd_types.cc b/src/osd/osd_types.cc index 2f05bb9c2468..7d1ed3108a00 100644 --- a/src/osd/osd_types.cc +++ b/src/osd/osd_types.cc @@ -2859,6 +2859,222 @@ ostream &ObjectRecoveryInfo::print(ostream &out) const << ")"; } +// -- PushReplyOp -- +void PushReplyOp::generate_test_instances(list &o) +{ + o.push_back(new PushReplyOp); + o.push_back(new PushReplyOp); + o.back()->soid = hobject_t(sobject_t("asdf", 2)); + o.push_back(new PushReplyOp); + o.back()->soid = hobject_t(sobject_t("asdf", CEPH_NOSNAP)); +} + +void PushReplyOp::encode(bufferlist &bl) const +{ + ENCODE_START(1, 1, bl); + ::encode(soid, bl); + ENCODE_FINISH(bl); +} + +void PushReplyOp::decode(bufferlist::iterator &bl) +{ + DECODE_START(1, bl); + ::decode(soid, bl); + DECODE_FINISH(bl); +} + +void PushReplyOp::dump(Formatter *f) const +{ + f->dump_stream("soid") << soid; +} + +ostream &PushReplyOp::print(ostream &out) const +{ + return out + << "PushReplyOp(" << soid + << ")"; +} + +ostream& operator<<(ostream& out, const PushReplyOp &op) +{ + return op.print(out); +} + +uint64_t PushReplyOp::cost(CephContext *cct) const +{ + + return cct->_conf->osd_push_per_object_cost + + cct->_conf->osd_recovery_max_chunk; +} + +// -- PullOp -- +void PullOp::generate_test_instances(list &o) +{ + o.push_back(new PullOp); + o.push_back(new PullOp); + o.back()->soid = hobject_t(sobject_t("asdf", 2)); + o.back()->recovery_info.version = eversion_t(3, 10); + o.push_back(new PullOp); + o.back()->soid = hobject_t(sobject_t("asdf", CEPH_NOSNAP)); + o.back()->recovery_info.version = eversion_t(0, 0); +} + +void PullOp::encode(bufferlist &bl) const +{ + ENCODE_START(1, 1, bl); + ::encode(soid, bl); + ::encode(recovery_info, bl); + ::encode(recovery_progress, bl); + ENCODE_FINISH(bl); +} + +void PullOp::decode(bufferlist::iterator &bl) +{ + DECODE_START(1, bl); + ::decode(soid, bl); + ::decode(recovery_info, bl); + ::decode(recovery_progress, bl); + DECODE_FINISH(bl); +} + +void PullOp::dump(Formatter *f) const +{ + f->dump_stream("soid") << soid; + { + f->open_object_section("recovery_info"); + recovery_info.dump(f); + f->close_section(); + } + { + f->open_object_section("recovery_progress"); + recovery_progress.dump(f); + f->close_section(); + } +} + +ostream &PullOp::print(ostream &out) const +{ + return out + << "PullOp(" << soid + << ", recovery_info: " << recovery_info + << ", recovery_progress: " << recovery_progress + << ")"; +} + +ostream& operator<<(ostream& out, const PullOp &op) +{ + return op.print(out); +} + +uint64_t PullOp::cost(CephContext *cct) const +{ + return cct->_conf->osd_push_per_object_cost + + cct->_conf->osd_recovery_max_chunk; +} + +// -- PushOp -- +void PushOp::generate_test_instances(list &o) +{ + o.push_back(new PushOp); + o.push_back(new PushOp); + o.back()->soid = hobject_t(sobject_t("asdf", 2)); + o.back()->version = eversion_t(3, 10); + o.push_back(new PushOp); + o.back()->soid = hobject_t(sobject_t("asdf", CEPH_NOSNAP)); + o.back()->version = eversion_t(0, 0); +} + +void PushOp::encode(bufferlist &bl) const +{ + ENCODE_START(1, 1, bl); + ::encode(soid, bl); + ::encode(version, bl); + ::encode(data, bl); + ::encode(data_included, bl); + ::encode(omap_header, bl); + ::encode(omap_entries, bl); + ::encode(attrset, bl); + ::encode(recovery_info, bl); + ::encode(after_progress, bl); + ::encode(before_progress, bl); + ENCODE_FINISH(bl); +} + +void PushOp::decode(bufferlist::iterator &bl) +{ + DECODE_START(1, bl); + ::decode(soid, bl); + ::decode(version, bl); + ::decode(data, bl); + ::decode(data_included, bl); + ::decode(omap_header, bl); + ::decode(omap_entries, bl); + ::decode(attrset, bl); + ::decode(recovery_info, bl); + ::decode(after_progress, bl); + ::decode(before_progress, bl); + DECODE_FINISH(bl); +} + +void PushOp::dump(Formatter *f) const +{ + f->dump_stream("soid") << soid; + f->dump_stream("version") << version; + f->dump_int("data_len", data.length()); + f->dump_stream("data_included") << data_included; + f->dump_int("omap_header_len", omap_header.length()); + f->dump_int("omap_entries_len", omap_entries.size()); + f->dump_int("attrset_len", attrset.size()); + { + f->open_object_section("recovery_info"); + recovery_info.dump(f); + f->close_section(); + } + { + f->open_object_section("after_progress"); + after_progress.dump(f); + f->close_section(); + } + { + f->open_object_section("before_progress"); + before_progress.dump(f); + f->close_section(); + } +} + +ostream &PushOp::print(ostream &out) const +{ + return out + << "PushOp(" << soid + << ", version: " << version + << ", data_included: " << data_included + << ", omap_header_size: " << omap_header.length() + << ", omap_entries_size: " << omap_entries.size() + << ", attrset_size: " << attrset.size() + << ", recovery_info: " << recovery_info + << ", after_progress: " << after_progress + << ", before_progress: " << before_progress + << ")"; +} + +ostream& operator<<(ostream& out, const PushOp &op) +{ + return op.print(out); +} + +uint64_t PushOp::cost(CephContext *cct) const +{ + uint64_t cost = data_included.size(); + for (map::const_iterator i = + omap_entries.begin(); + i != omap_entries.end(); + ++i) { + cost += i->second.length(); + } + cost += cct->_conf->osd_push_per_object_cost; + return cost; +} + // -- ScrubMap -- void ScrubMap::merge_incr(const ScrubMap &l) diff --git a/src/osd/osd_types.h b/src/osd/osd_types.h index 467124ea6142..a039a3dd5801 100644 --- a/src/osd/osd_types.h +++ b/src/osd/osd_types.h @@ -1989,14 +1989,34 @@ ostream& operator<<(ostream& out, const ObjectRecoveryProgress &prog); struct PushReplyOp { hobject_t soid; + + static void generate_test_instances(list& o); + void encode(bufferlist &bl) const; + void decode(bufferlist::iterator &bl); + ostream &print(ostream &out) const; + void dump(Formatter *f) const; + + uint64_t cost(CephContext *cct) const; }; +WRITE_CLASS_ENCODER(PushReplyOp) +ostream& operator<<(ostream& out, const PushReplyOp &op); struct PullOp { hobject_t soid; ObjectRecoveryInfo recovery_info; ObjectRecoveryProgress recovery_progress; + + static void generate_test_instances(list& o); + void encode(bufferlist &bl) const; + void decode(bufferlist::iterator &bl); + ostream &print(ostream &out) const; + void dump(Formatter *f) const; + + uint64_t cost(CephContext *cct) const; }; +WRITE_CLASS_ENCODER(PullOp) +ostream& operator<<(ostream& out, const PullOp &op); struct PushOp { hobject_t soid; @@ -2010,7 +2030,17 @@ struct PushOp { ObjectRecoveryInfo recovery_info; ObjectRecoveryProgress before_progress; ObjectRecoveryProgress after_progress; + + static void generate_test_instances(list& o); + void encode(bufferlist &bl) const; + void decode(bufferlist::iterator &bl); + ostream &print(ostream &out) const; + void dump(Formatter *f) const; + + uint64_t cost(CephContext *cct) const; }; +WRITE_CLASS_ENCODER(PushOp) +ostream& operator<<(ostream& out, const PushOp &op); /*