common/secret.h\
common/strtol.h\
common/static_assert.h\
+ common/AsyncReserver.h\
crush/CrushCompiler.h\
crush/CrushTester.h\
crush/CrushWrapper.h\
messages/MOSDPGQuery.h\
messages/MOSDPGRemove.h\
messages/MOSDPGScan.h\
+ messages/MBackfillReserve.h\
messages/MOSDPGTemp.h\
messages/MOSDPGTrim.h\
messages/MOSDPing.h\
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#ifndef ASYNC_RESERVER_H
+#define ASYNC_RESERVER_H
+
+#include <list>
+#include <map>
+#include <set>
+#include <utility>
+
+#include "common/Mutex.h"
+#include "common/Finisher.h"
+
+/**
+ * Manages a configurable number of asynchronous reservations.
+ */
+template <typename T>
+class AsyncReserver {
+  Finisher *f;                 ///< granted callbacks are queued on this finisher
+  const unsigned max_allowed;  ///< upper bound on entries in in_progress
+  Mutex lock;                  ///< taken by every public method
+
+  /// Requests still waiting for a slot, in arrival order.
+  list<pair<T, Context*> > queue;
+  /// Key -> position in queue, so a waiting request can be removed without a scan.
+  map<T, typename list<pair<T, Context*> >::iterator > queue_pointers;
+  /// Keys whose callback has been handed to the finisher and not yet cancelled.
+  set<T> in_progress;
+
+  /**
+   * Grants waiting requests, oldest first, while slots are available.
+   *
+   * Both callers hold lock when they call this.
+   */
+  void do_queues() {
+    while (in_progress.size() < max_allowed && !queue.empty()) {
+      pair<T, Context*> p = queue.front();
+      queue_pointers.erase(p.first);
+      queue.pop_front();
+      f->queue(p.second);
+      in_progress.insert(p.first);
+    }
+  }
+public:
+  AsyncReserver(
+    Finisher *f,
+    unsigned max_allowed)
+    : f(f), max_allowed(max_allowed), lock("AsyncReserver::lock") {}
+
+  /**
+   * Requests a reservation
+   *
+   * The key must be neither waiting nor already granted (asserted).
+   * The reserver takes ownership of on_reserved: it is either handed to
+   * the finisher when a slot is granted, or destroyed uncalled if the
+   * request is cancelled while still waiting.
+   *
+   * Note, on_reserved may be called following cancel_reservation.  Thus,
+   * the callback must be safe in that case.  Callback will be called
+   * with no locks held.  cancel_reservation must be called to release the
+   * reservation slot.
+   */
+  void request_reservation(
+    T item,                   ///< [in] reservation key
+    Context *on_reserved      ///< [in] callback to be called on reservation
+    ) {
+    Mutex::Locker l(lock);
+    assert(!queue_pointers.count(item) &&
+           !in_progress.count(item));
+    queue.push_back(make_pair(item, on_reserved));
+    queue_pointers.insert(make_pair(item, --queue.end()));
+    do_queues();
+  }
+
+  /**
+   * Cancels reservation
+   *
+   * Frees the reservation under key for use.  A request that is still
+   * waiting is dropped and its callback is deleted without being called;
+   * a granted one only gives its slot back.  Unknown keys are ignored.
+   * Note, after cancel_reservation, the reservation_callback may or
+   * may not still be called.
+   */
+  void cancel_reservation(
+    T item                   ///< [in] key for reservation to cancel
+    ) {
+    Mutex::Locker l(lock);
+    typename map<T, typename list<pair<T, Context*> >::iterator >::iterator i =
+      queue_pointers.find(item);
+    if (i != queue_pointers.end()) {
+      // The callback was never handed to the finisher, so nobody else
+      // will free it; erasing the entry alone would leak it.
+      delete i->second->second;
+      queue.erase(i->second);
+      queue_pointers.erase(i);
+    } else {
+      in_progress.erase(item);
+    }
+    do_queues();
+  }
+};
+
+#endif
// If true, TMAPPUT sets uses_tmap DEBUGGING ONLY
OPTION(osd_tmapput_sets_uses_tmap, OPT_BOOL, false)
+// Maximum number of backfills to or from a single osd
+OPTION(osd_max_backfills, OPT_U64, 5)
+
OPTION(osd_uuid, OPT_UUID, uuid_d())
OPTION(osd_data, OPT_STR, "/var/lib/ceph/osd/$cluster-$id")
OPTION(osd_journal, OPT_STR, "/var/lib/ceph/osd/$cluster-$id/journal")
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#ifndef CEPH_MBACKFILL_H
+#define CEPH_MBACKFILL_H
+
+#include "msg/Message.h"
+
+/**
+ * Backfill reservation message exchanged between OSDs.
+ *
+ * Carries the pg it concerns, the epoch it was sent in and whether it is
+ * a REQUEST for a reservation or a GRANT of one.
+ */
+class MBackfillReserve : public Message {
+  static const int HEAD_VERSION = 1;
+  static const int COMPAT_VERSION = 1;
+public:
+  pg_t pgid;             ///< pg the reservation is for
+  epoch_t query_epoch;   ///< epoch stamped by the sender
+  enum {
+    REQUEST = 0,
+    GRANT = 1
+  };
+  int type;              ///< REQUEST or GRANT; -1 until set or decoded
+
+  MBackfillReserve(int type,
+                   pg_t pgid,
+                   epoch_t query_epoch)
+    : Message(MSG_OSD_BACKFILL_RESERVE, HEAD_VERSION, COMPAT_VERSION),
+      pgid(pgid), query_epoch(query_epoch),
+      type(type) {}
+
+  /// Used when a message is about to be decoded; fields start out in a
+  /// defined state so print() is safe before decode_payload() runs.
+  MBackfillReserve() :
+    Message(MSG_OSD_BACKFILL_RESERVE, HEAD_VERSION, COMPAT_VERSION),
+    query_epoch(), type(-1) {}
+
+  const char *get_type_name() const {
+    return "MBackfillReserve";
+  }
+
+  /// Writes a one-line description; an unrecognised type prints no label.
+  void print(ostream& out) const {
+    out << "MBackfillReserve ";
+    switch (type) {
+    case REQUEST:
+      out << "REQUEST ";
+      break;
+    case GRANT:
+      out << "GRANT ";
+      break;
+    }
+    out << " pgid: " << pgid << ", query_epoch: " << query_epoch;
+  }
+
+  /// Reads pgid, query_epoch and type, in the order encode_payload wrote them.
+  void decode_payload() {
+    bufferlist::iterator p = payload.begin();
+    ::decode(pgid, p);
+    ::decode(query_epoch, p);
+    ::decode(type, p);
+  }
+
+  /// Appends pgid, query_epoch and type to payload; features is unused.
+  void encode_payload(uint64_t features) {
+    ::encode(pgid, payload);
+    ::encode(query_epoch, payload);
+    ::encode(type, payload);
+  }
+};
+
+#endif
#include "messages/MCommand.h"
#include "messages/MCommandReply.h"
+#include "messages/MBackfillReserve.h"
#include "messages/MRoute.h"
#include "messages/MForward.h"
case MSG_COMMAND_REPLY:
m = new MCommandReply;
break;
+ case MSG_OSD_BACKFILL_RESERVE:
+ m = new MBackfillReserve;
+ break;
case MSG_ROUTE:
m = new MRoute;
#define MSG_COMMAND 97
#define MSG_COMMAND_REPLY 98
+#define MSG_OSD_BACKFILL_RESERVE 99
+
// *** MDS ***
#define MSG_MDS_BEACON 100 // to monitor
#include "messages/MOSDPGScan.h"
#include "messages/MOSDPGBackfill.h"
#include "messages/MOSDPGMissing.h"
+#include "messages/MBackfillReserve.h"
#include "messages/MOSDAlive.h"
watch(NULL),
last_tid(0),
tid_lock("OSDService::tid_lock"),
+ reserver_finisher(g_ceph_context),
+ local_reserver(&reserver_finisher, g_conf->osd_max_backfills),
+ remote_reserver(&reserver_finisher, g_conf->osd_max_backfills),
pg_temp_lock("OSDService::pg_temp_lock"),
map_cache_lock("OSDService::map_lock"),
map_cache(g_conf->osd_map_cache_size),
void OSDService::shutdown()
{
+ reserver_finisher.stop();
watch_lock.Lock();
watch_timer.shutdown();
watch_lock.Unlock();
void OSDService::init()
{
+ reserver_finisher.start();
watch_timer.init();
watch = new Watch();
}
handle_pg_backfill(op);
break;
+ case MSG_OSD_BACKFILL_RESERVE:
+ handle_pg_backfill_reserve(op);
+ break;
+
// client ops
case CEPH_MSG_OSD_OP:
handle_op(op);
pg->unlock();
}
+/**
+ * Turns an incoming MBackfillReserve into a peering event on its pg.
+ *
+ * REQUEST queues RequestBackfill and GRANT queues RemoteBackfillReserved,
+ * both stamped with the message's query_epoch.  The op is dropped if a
+ * precondition check rejects it or the pg is not present here.
+ */
+void OSD::handle_pg_backfill_reserve(OpRequestRef op)
+{
+  MBackfillReserve *msg = static_cast<MBackfillReserve*>(op->request);
+  assert(msg->get_header().type == MSG_OSD_BACKFILL_RESERVE);
+
+  // Checks run in this order and stop at the first failure.
+  if (!require_osd_peer(op) ||
+      !require_same_or_newer_map(op, msg->query_epoch) ||
+      !_have_pg(msg->pgid))
+    return;
+
+  PG *pg = _lookup_lock_pg(msg->pgid);
+  assert(pg);
+
+  switch (msg->type) {
+  case MBackfillReserve::REQUEST:
+    pg->queue_peering_event(
+      PG::CephPeeringEvtRef(
+        new PG::CephPeeringEvt(
+          msg->query_epoch,
+          msg->query_epoch,
+          PG::RequestBackfill())));
+    break;
+  case MBackfillReserve::GRANT:
+    pg->queue_peering_event(
+      PG::CephPeeringEvtRef(
+        new PG::CephPeeringEvt(
+          msg->query_epoch,
+          msg->query_epoch,
+          PG::RemoteBackfillReserved())));
+    break;
+  default:
+    assert(0);
+  }
+  pg->unlock();
+}
/** PGQuery
* from primary to replica | stray
#include "common/Timer.h"
#include "common/WorkQueue.h"
#include "common/LogClient.h"
+#include "common/AsyncReserver.h"
#include "os/ObjectStore.h"
#include "OSDCap.h"
return t;
}
+ // -- backfill_reservation --
+ Finisher reserver_finisher;
+ AsyncReserver<pg_t> local_reserver;
+ AsyncReserver<pg_t> remote_reserver;
+
// -- pg_temp --
Mutex pg_temp_lock;
map<pg_t, vector<int> > pg_temp_wanted;
void handle_pg_scan(OpRequestRef op);
void handle_pg_backfill(OpRequestRef op);
+ void handle_pg_backfill_reserve(OpRequestRef op);
void handle_pg_remove(OpRequestRef op);
void _remove_pg(PG *pg);
#include "messages/MOSDPGTrim.h"
#include "messages/MOSDPGScan.h"
#include "messages/MOSDPGBackfill.h"
+#include "messages/MBackfillReserve.h"
#include "messages/MOSDSubOp.h"
#include "messages/MOSDSubOpReply.h"
last_peering_reset(0),
heartbeat_peer_lock("PG::heartbeat_peer_lock"),
backfill_target(-1),
+ backfill_reserved(0),
+ backfill_reserving(0),
pg_stats_lock("PG::pg_stats_lock"),
pg_stats_valid(false),
osr(osd->osr_registry.lookup_or_create(p, (stringify(p)))),
pg->clear_probe_targets();
}
+/*------Backfilling-------*/
+/// On entry: log the state, mark the pg as holding its backfill
+/// reservation and queue it for recovery.
+PG::RecoveryState::Backfilling::Backfilling(my_context ctx)
+  : my_base(ctx)
+{
+  state_name = "Started/Primary/Active/Backfilling";
+  RecoveryMachine &machine = context< RecoveryMachine >();
+  machine.log_enter(state_name);
+  PG *pg = machine.pg;
+  pg->backfill_reserved = true;
+  pg->osd->queue_for_recovery(pg);
+}
+
+/// On exit: log, clear both reservation flags and give the local
+/// reservation slot back.
+void PG::RecoveryState::Backfilling::exit()
+{
+  RecoveryMachine &machine = context< RecoveryMachine >();
+  machine.log_exit(state_name, enter_time);
+  PG *pg = machine.pg;
+  pg->backfill_reserved = false;
+  pg->backfill_reserving = false;
+  pg->osd->local_reserver.cancel_reservation(pg->info.pgid);
+}
+
+/*--WaitRemoteBackfillReserved--*/
+
+/// On entry: log the state and send a REQUEST for this pg, stamped with
+/// the current osdmap epoch, to the backfill target.
+PG::RecoveryState::WaitRemoteBackfillReserved::WaitRemoteBackfillReserved(my_context ctx)
+  : my_base(ctx)
+{
+  state_name = "Started/Primary/Active/WaitRemoteBackfillReserved";
+  RecoveryMachine &machine = context< RecoveryMachine >();
+  machine.log_enter(state_name);
+  PG *pg = machine.pg;
+  MBackfillReserve *request = new MBackfillReserve(
+    MBackfillReserve::REQUEST,
+    pg->info.pgid,
+    pg->get_osdmap()->get_epoch());
+  pg->osd->cluster_messenger->send_message(
+    request,
+    pg->get_osdmap()->get_cluster_inst(pg->backfill_target));
+}
+
+/// On exit: only the state exit is logged.
+void PG::RecoveryState::WaitRemoteBackfillReserved::exit()
+{
+  RecoveryMachine &machine = context< RecoveryMachine >();
+  machine.log_exit(state_name, enter_time);
+}
+
+
+/*--WaitLocalBackfillReserved--*/
+/**
+ * Completion that queues a peering event on a pg.
+ *
+ * Holds a reference to the pg; finish() locks it, queues evt with epoch
+ * used for both epoch arguments, and unlocks it again.
+ */
+template <class EVT>
+struct C_QueuePeeringEvt : Context {
+  boost::intrusive_ptr<PG> pg;
+  epoch_t epoch;
+  EVT evt;
+
+  C_QueuePeeringEvt(PG *pg, epoch_t epoch, EVT evt)
+    : pg(pg), epoch(epoch), evt(evt) {}
+
+  void finish(int r) {
+    pg->lock();
+    PG::CephPeeringEvtRef queued(new PG::CephPeeringEvt(epoch, epoch, evt));
+    pg->queue_peering_event(queued);
+    pg->unlock();
+  }
+};
+
+/// On entry: log the state and ask the local reserver for a slot; the
+/// callback queues LocalBackfillReserved on this pg.
+PG::RecoveryState::WaitLocalBackfillReserved::WaitLocalBackfillReserved(my_context ctx)
+  : my_base(ctx)
+{
+  state_name = "Started/Primary/Active/WaitLocalBackfillReserved";
+  RecoveryMachine &machine = context< RecoveryMachine >();
+  machine.log_enter(state_name);
+  PG *pg = machine.pg;
+  Context *on_reserved = new C_QueuePeeringEvt<LocalBackfillReserved>(
+    pg, pg->get_osdmap()->get_epoch(),
+    LocalBackfillReserved());
+  pg->osd->local_reserver.request_reservation(pg->info.pgid, on_reserved);
+}
+
+/// On exit: only the state exit is logged.
+void PG::RecoveryState::WaitLocalBackfillReserved::exit()
+{
+  RecoveryMachine &machine = context< RecoveryMachine >();
+  machine.log_exit(state_name, enter_time);
+}
+
+/*----NotBackfilling------*/
+/// On entry: only the state entry is logged.
+PG::RecoveryState::NotBackfilling::NotBackfilling(my_context ctx)
+  : my_base(ctx)
+{
+  state_name = "Started/Primary/Active/NotBackfilling";
+  RecoveryMachine &machine = context< RecoveryMachine >();
+  machine.log_enter(state_name);
+}
+
+/// On exit: only the state exit is logged.
+void PG::RecoveryState::NotBackfilling::exit()
+{
+  RecoveryMachine &machine = context< RecoveryMachine >();
+  machine.log_exit(state_name, enter_time);
+}
+
+/*---RepNotBackfilling----*/
+/// On entry: only the state entry is logged.
+PG::RecoveryState::RepNotBackfilling::RepNotBackfilling(my_context ctx)
+  : my_base(ctx)
+{
+  // This state is nested in ReplicaActive (a child of Started), not in
+  // Primary/Active, so the reported path has to say so.
+  state_name = "Started/ReplicaActive/RepNotBackfilling";
+  context< RecoveryMachine >().log_enter(state_name);
+}
+
+/// On exit: only the state exit is logged.
+void PG::RecoveryState::RepNotBackfilling::exit()
+{
+  RecoveryMachine &machine = context< RecoveryMachine >();
+  machine.log_exit(state_name, enter_time);
+}
+
+/*-RepWaitBackfillReserved*/
+/// On entry: log the state and ask the remote reserver for a slot; the
+/// callback queues RemoteBackfillReserved on this pg.
+PG::RecoveryState::RepWaitBackfillReserved::RepWaitBackfillReserved(my_context ctx)
+  : my_base(ctx)
+{
+  // This state is nested in ReplicaActive (a child of Started), not in
+  // Primary/Active, so the reported path has to say so.
+  state_name = "Started/ReplicaActive/RepWaitBackfillReserved";
+  context< RecoveryMachine >().log_enter(state_name);
+  PG *pg = context< RecoveryMachine >().pg;
+  pg->osd->remote_reserver.request_reservation(
+    pg->info.pgid,
+    new C_QueuePeeringEvt<RemoteBackfillReserved>(
+      pg, pg->get_osdmap()->get_epoch(),
+      RemoteBackfillReserved()));
+}
+
+/// On exit: only the state exit is logged.
+void PG::RecoveryState::RepWaitBackfillReserved::exit()
+{
+  RecoveryMachine &machine = context< RecoveryMachine >();
+  machine.log_exit(state_name, enter_time);
+}
+
+/// Reservation obtained: send a GRANT for this pg, stamped with the
+/// current osdmap epoch, to acting[0] and move to RepBackfilling.
+boost::statechart::result
+PG::RecoveryState::RepWaitBackfillReserved::react(const RemoteBackfillReserved &evt)
+{
+  PG *pg = context< RecoveryMachine >().pg;
+  MBackfillReserve *grant = new MBackfillReserve(
+    MBackfillReserve::GRANT,
+    pg->info.pgid,
+    pg->get_osdmap()->get_epoch());
+  pg->osd->cluster_messenger->send_message(
+    grant,
+    pg->get_osdmap()->get_cluster_inst(pg->acting[0]));
+  return transit<RepBackfilling>();
+}
+
+/*---RepBackfilling-------*/
+/// On entry: only the state entry is logged.
+PG::RecoveryState::RepBackfilling::RepBackfilling(my_context ctx)
+  : my_base(ctx)
+{
+  // This state is nested in ReplicaActive (a child of Started), not in
+  // Primary/Active, so the reported path has to say so.
+  state_name = "Started/ReplicaActive/RepBackfilling";
+  context< RecoveryMachine >().log_enter(state_name);
+}
+
+/// On exit: log and give the remote reservation slot back.
+void PG::RecoveryState::RepBackfilling::exit()
+{
+  RecoveryMachine &machine = context< RecoveryMachine >();
+  machine.log_exit(state_name, enter_time);
+  PG *pg = machine.pg;
+  pg->osd->remote_reserver.cancel_reservation(pg->info.pgid);
+}
+
/*---------Active---------*/
PG::RecoveryState::Active::Active(my_context ctx)
: my_base(ctx)
context< RecoveryMachine >().log_enter(state_name);
PG *pg = context< RecoveryMachine >().pg;
+ assert(!pg->backfill_reserving);
+ assert(!pg->backfill_reserved);
assert(pg->is_primary());
dout(10) << "In Active, about to call activate" << dendl;
pg->activate(*context< RecoveryMachine >().get_cur_transaction(),
{
context< RecoveryMachine >().log_exit(state_name, enter_time);
PG *pg = context< RecoveryMachine >().pg;
+ pg->osd->local_reserver.cancel_reservation(pg->info.pgid);
+ pg->backfill_reserved = false;
+ pg->backfill_reserving = false;
pg->state_clear(PG_STATE_DEGRADED);
pg->state_clear(PG_STATE_BACKFILL);
pg->state_clear(PG_STATE_REPLAY);
void PG::RecoveryState::ReplicaActive::exit()
{
context< RecoveryMachine >().log_exit(state_name, enter_time);
+ PG *pg = context< RecoveryMachine >().pg;
+ pg->osd->remote_reserver.cancel_reservation(pg->info.pgid);
}
/*-------Stray---*/
BackfillInterval backfill_info;
BackfillInterval peer_backfill_info;
int backfill_target;
+ bool backfill_reserved;
+ bool backfill_reserving;
friend class OSD;
TrivialEvent(CheckRepops)
TrivialEvent(NullEvt)
TrivialEvent(FlushedEvt)
+ TrivialEvent(Backfilled)
+ TrivialEvent(LocalBackfillReserved)
+ TrivialEvent(RemoteBackfillReserved)
+ TrivialEvent(RequestBackfill)
/* Encapsulates PG recovery process */
class RecoveryState {
boost::statechart::result react(const AdvMap &advmap);
};
- struct Active : boost::statechart::state< Active, Primary >, NamedState {
+ struct NotBackfilling;
+ struct Active : boost::statechart::state< Active, Primary, NotBackfilling >, NamedState {
Active(my_context ctx);
void exit();
boost::statechart::custom_reaction< MInfoRec >,
boost::statechart::custom_reaction< MNotifyRec >,
boost::statechart::custom_reaction< MLogRec >,
- boost::statechart::custom_reaction< RecoveryComplete >
+ boost::statechart::custom_reaction< RecoveryComplete >,
+ boost::statechart::custom_reaction< Backfilled >
> reactions;
boost::statechart::result react(const QueryState& q);
boost::statechart::result react(const ActMap&);
boost::statechart::result react(const MInfoRec& infoevt);
boost::statechart::result react(const MNotifyRec& notevt);
boost::statechart::result react(const MLogRec& logevt);
+ boost::statechart::result react(const Backfilled&) {
+ return discard_event();
+ }
boost::statechart::result react(const RecoveryComplete&);
};
- struct ReplicaActive : boost::statechart::state< ReplicaActive, Started >, NamedState {
+ struct Backfilling : boost::statechart::state< Backfilling, Active >, NamedState { // substate of Active entered once RemoteBackfillReserved arrives
+ typedef boost::mpl::list<
+ boost::statechart::transition< Backfilled, NotBackfilling > // Backfilled returns to NotBackfilling
+ > reactions;
+ Backfilling(my_context ctx); // sets pg->backfill_reserved and queues the pg for recovery
+ void exit(); // clears backfill_reserved/backfill_reserving and cancels the local reservation
+ };
+
+ struct WaitRemoteBackfillReserved : boost::statechart::state< WaitRemoteBackfillReserved, Active >, NamedState { // substate of Active entered after the local reservation is obtained
+ typedef boost::mpl::list<
+ boost::statechart::transition< RemoteBackfillReserved, Backfilling > // RemoteBackfillReserved moves on to Backfilling
+ > reactions;
+ WaitRemoteBackfillReserved(my_context ctx); // sends an MBackfillReserve REQUEST to pg->backfill_target
+ void exit(); // only logs the exit
+ };
+
+ struct WaitLocalBackfillReserved : boost::statechart::state< WaitLocalBackfillReserved, Active >, NamedState { // substate of Active entered on RequestBackfill
+ typedef boost::mpl::list<
+ boost::statechart::transition< LocalBackfillReserved, WaitRemoteBackfillReserved > // LocalBackfillReserved moves on to WaitRemoteBackfillReserved
+ > reactions;
+ WaitLocalBackfillReserved(my_context ctx); // requests a slot from osd->local_reserver; the callback queues LocalBackfillReserved
+ void exit(); // only logs the exit
+ };
+
+ struct NotBackfilling : boost::statechart::state< NotBackfilling, Active>, NamedState { // initial substate of Active (named in Active's state<> arguments)
+ typedef boost::mpl::list<
+ boost::statechart::transition< RequestBackfill, WaitLocalBackfillReserved> // RequestBackfill starts the reservation sequence
+ > reactions;
+ NotBackfilling(my_context ctx); // only logs the entry
+ void exit(); // only logs the exit
+ };
+
+ struct RepNotBackfilling;
+ struct ReplicaActive : boost::statechart::state< ReplicaActive, Started, RepNotBackfilling >, NamedState {
ReplicaActive(my_context ctx);
void exit();
boost::statechart::result react(const Activate&);
};
+ struct RepBackfilling : boost::statechart::state< RepBackfilling, ReplicaActive >, NamedState { // substate of ReplicaActive entered after the GRANT has been sent
+ typedef boost::mpl::list<
+ boost::statechart::transition< Backfilled, RepNotBackfilling > // Backfilled returns to RepNotBackfilling
+ > reactions;
+ RepBackfilling(my_context ctx); // only logs the entry
+ void exit(); // cancels this pg's reservation in osd->remote_reserver
+ };
+
+ struct RepWaitBackfillReserved : boost::statechart::state< RepWaitBackfillReserved, ReplicaActive >, NamedState { // substate of ReplicaActive entered on RequestBackfill
+ typedef boost::mpl::list<
+ boost::statechart::custom_reaction< RemoteBackfillReserved > // handled by react() below
+ > reactions;
+ RepWaitBackfillReserved(my_context ctx); // requests a slot from osd->remote_reserver; the callback queues RemoteBackfillReserved
+ void exit(); // only logs the exit
+ boost::statechart::result react(const RemoteBackfillReserved &evt); // sends a GRANT to pg->acting[0], then transits to RepBackfilling
+ };
+
+ struct RepNotBackfilling : boost::statechart::state< RepNotBackfilling, ReplicaActive>, NamedState { // initial substate of ReplicaActive (named in ReplicaActive's state<> arguments)
+ typedef boost::mpl::list<
+ boost::statechart::transition< RequestBackfill, RepWaitBackfillReserved > // RequestBackfill starts waiting for a remote reservation slot
+ > reactions;
+ RepNotBackfilling(my_context ctx); // only logs the entry
+ void exit(); // only logs the exit
+ };
+
struct Stray : boost::statechart::state< Stray, Started >, NamedState {
map<int, pair<pg_query_t, epoch_t> > pending_queries;
get_osdmap()->get_epoch(), m->query_epoch,
info.pgid);
osd->cluster_messenger->send_message(reply, m->get_connection());
+ queue_peering_event(
+ CephPeeringEvtRef(
+ new CephPeeringEvt(
+ get_osdmap()->get_epoch(),
+ get_osdmap()->get_epoch(),
+ Backfilled())));
}
// fall-thru
if (get_osdmap()->test_flag(CEPH_OSDMAP_NOBACKFILL)) {
dout(10) << "deferring backfill due to NOBACKFILL" << dendl;
deferred_backfill = true;
+ } else if (!backfill_reserved) {
+ dout(10) << "deferring backfill due to !backfill_reserved" << dendl;
+ if (!backfill_reserving) {
+ dout(10) << "queueing RequestBackfill" << dendl;
+ backfill_reserving = true;
+ queue_peering_event(
+ CephPeeringEvtRef(
+ new CephPeeringEvt(
+ get_osdmap()->get_epoch(),
+ get_osdmap()->get_epoch(),
+ RequestBackfill())));
+ }
+ deferred_backfill = true;
} else {
started += recover_backfill(max - started);
}
return started;
}
+ queue_peering_event(
+ CephPeeringEvtRef(
+ new CephPeeringEvt(
+ get_osdmap()->get_epoch(),
+ get_osdmap()->get_epoch(),
+ Backfilled())));
handle_recovery_complete(prctx);
return 0;