if (is_active()) {
balancer->tick();
+ if (snapserver)
+ snapserver->check_osd_map(false);
}
}
break;
case CEPH_MSG_OSD_MAP:
objecter->handle_osd_map((MOSDMap*)m);
+ if (snapserver)
+ snapserver->check_osd_map(true);
break;
// MDS
#include "SnapServer.h"
#include "MDS.h"
+#include "osd/OSDMap.h"
#include "include/types.h"
#include "messages/MMDSTableRequest.h"
+#include "messages/MRemoveSnaps.h"
+
+#include "msg/Messenger.h"
#include "config.h"
{
last_snap = 0;
snaps.clear();
- pending_removal.clear();
-}
-
-snapid_t SnapServer::create(inodeno_t base, const string& name, utime_t stamp, version_t *psnapv)
-{
- assert(is_active());
-
- snapid_t sn = ++last_snap;
- snaps[sn].snapid = sn;
- snaps[sn].ino = base;
- snaps[sn].name = name;
- snaps[sn].stamp = stamp;
- *psnapv = ++version;
-
- dout(10) << "create(" << base << "," << name << ") = " << sn << dendl;
-
- return sn;
-}
-
-void SnapServer::remove(snapid_t sn)
-{
- assert(is_active());
-
- snaps.erase(sn);
- pending_removal.insert(sn);
- version++;
+ pending_purge.clear();
}
void SnapServer::_prepare(bufferlist &bl, __u64 reqid, int bymds)
{
bufferlist::iterator p = bl.begin();
- __u32 what;
- ::decode(what, p);
+ __u32 op;
+ ::decode(op, p);
- switch (what) {
+ switch (op) {
case TABLE_OP_CREATE:
{
version++;
::decode(info.stamp, p);
info.snapid = version;
pending_create[version] = info;
+ dout(10) << "prepare v" << version << " create " << info << dendl;
} else {
pending_noop.insert(version);
+ dout(10) << "prepare v" << version << " noop" << dendl;
}
}
break;
case TABLE_OP_DESTROY:
{
+ inodeno_t ino;
snapid_t snapid;
+ ::decode(ino, p); // not used, currently.
::decode(snapid, p);
version++;
pending_destroy[version] = snapid;
+ dout(10) << "prepare v" << version << " destroy " << snapid << dendl;
}
break;
}
else if (pending_destroy.count(tid)) {
- dout(7) << "commit " << tid << " destroy " << pending_destroy[tid] << dendl;
- snaps.erase(pending_destroy[tid]);
+ snapid_t sn = pending_destroy[tid];
+ dout(7) << "commit " << tid << " destroy " << sn << dendl;
+ snaps.erase(sn);
+ pending_purge.insert(sn);
pending_destroy.erase(tid);
}
else if (pending_noop.count(tid)) {
}
+
+// Reconcile pending_purge against the current osdmap: snaps the osdmap
+// already reports as removed are dropped from the set, the rest are sent to
+// a monitor in an MRemoveSnaps request.  When 'force' is false the whole
+// pass is skipped if the table version equals last_checked_osdmap.
+void SnapServer::check_osd_map(bool force)
+{
+  if (!force && version == last_checked_osdmap) {
+    dout(10) << "check_osd_map - version unchanged" << dendl;
+    return;
+  }
+  dout(10) << "check_osd_map pending_purge=" << pending_purge << dendl;
+
+  vector<snapid_t> still_pending;  // snaps the osdmap does not list as removed yet
+  bool dropped_any = false;
+
+  for (set<snapid_t>::iterator it = pending_purge.begin();
+       it != pending_purge.end(); ) {
+    // advance first, so erasing the current element cannot invalidate 'it'
+    set<snapid_t>::iterator cur = it++;
+    if (!mds->osdmap->is_removed_snap(*cur)) {
+      still_pending.push_back(*cur);
+      continue;
+    }
+    dout(10) << " osdmap marks " << *cur << " as removed" << dendl;
+    pending_purge.erase(cur);
+    dropped_any = true;
+  }
+
+  // pending_purge shrank, so the table version moves forward
+  if (dropped_any)
+    version++;
+
+  if (!still_pending.empty()) {
+    dout(10) << "requesting removal of " << still_pending << dendl;
+    // the MRemoveSnaps constructor swaps the vector's contents into the message
+    MRemoveSnaps *req = new MRemoveSnaps(still_pending);
+    int target = mds->monmap->pick_mon();
+    mds->messenger->send_message(req, mds->monmap->get_inst(target));
+  }
+
+  last_checked_osdmap = version;
+}
protected:
snapid_t last_snap;
map<snapid_t, SnapInfo> snaps;
- set<snapid_t> pending_removal;
+ set<snapid_t> pending_purge;
map<version_t, SnapInfo> pending_create;
map<version_t, snapid_t> pending_destroy;
set<version_t> pending_noop;
+ version_t last_checked_osdmap;
+
public:
- SnapServer(MDS *m) : MDSTableServer(m, TABLE_SNAP) { }
+ // last_checked_osdmap starts at 0: check_osd_map() compares it against
+ // 'version' before it ever assigns it, and nothing visible in this patch
+ // (reset_state() included) gives it a value earlier.
+ SnapServer(MDS *m) : MDSTableServer(m, TABLE_SNAP), last_checked_osdmap(0) { }
-
- // alloc or reclaim ids
- snapid_t create(inodeno_t base, const string& name, utime_t stamp, version_t *snapv);
- void remove(snapid_t sn);
-
+
void init_inode();
void reset_state();
void encode_state(bufferlist& bl) {
::encode(last_snap, bl);
::encode(snaps, bl);
- ::encode(pending_removal, bl);
+ ::encode(pending_purge, bl);
::encode(pending_create, bl);
::encode(pending_destroy, bl);
::encode(pending_noop, bl);
void decode_state(bufferlist::iterator& bl) {
::decode(last_snap, bl);
::decode(snaps, bl);
- ::decode(pending_removal, bl);
+ ::decode(pending_purge, bl);
::decode(pending_create, bl);
::decode(pending_destroy, bl);
::decode(pending_noop, bl);
void _commit(version_t tid);
void _rollback(version_t tid);
void handle_query(MMDSTableRequest *m);
+
+ void check_osd_map(bool force);
};
#endif
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#ifndef __MREMOVESNAPS_H
+#define __MREMOVESNAPS_H
+
+#include "msg/Message.h"
+
+// Message of type MSG_REMOVE_SNAPS whose payload is a list of snap ids.
+struct MRemoveSnaps : public Message {
+  vector<snapid_t> snaps;
+
+  MRemoveSnaps() : Message(MSG_REMOVE_SNAPS) { }
+
+  // Takes over the contents of 's' via swap; on return 's' holds whatever
+  // this message's (freshly constructed, empty) vector held.
+  MRemoveSnaps(vector<snapid_t>& s) : Message(MSG_REMOVE_SNAPS) {
+    s.swap(snaps);
+  }
+
+  const char *get_type_name() {
+    return "remove_snaps";
+  }
+
+  void print(ostream& out) {
+    out << "remove_snaps(";
+    out << snaps;
+    out << ")";
+  }
+
+  // payload <- snaps
+  void encode_payload() {
+    ::encode(snaps, payload);
+  }
+
+  // snaps <- payload; the payload must be fully consumed by the decode
+  void decode_payload() {
+    bufferlist::iterator pos = payload.begin();
+    ::decode(snaps, pos);
+    assert(pos.end());
+  }
+
+};
+
+#endif
case MSG_OSD_IN:
case MSG_OSD_OUT:
case MSG_OSD_ALIVE:
+ case MSG_REMOVE_SNAPS:
osdmon->dispatch(m);
break;
#include "messages/MOSDBoot.h"
#include "messages/MOSDAlive.h"
#include "messages/MMonCommand.h"
+#include "messages/MRemoveSnaps.h"
#include "common/Timer.h"
case MSG_OSD_OUT:
return preprocess_out((MOSDOut*)m);
*/
+
+ case MSG_REMOVE_SNAPS:
+ return preprocess_remove_snaps((MRemoveSnaps*)m);
default:
assert(0);
return prepare_out((MOSDOut*)m);
*/
+ case MSG_REMOVE_SNAPS:
+ return prepare_remove_snaps((MRemoveSnaps*)m);
+
default:
assert(0);
delete m;
}
+// ---
+
+// Read-only pass over a remove-snaps request.  Returns false (leaving 'm'
+// alive) as soon as one requested snap is not already recorded as removed in
+// the current osdmap; otherwise frees 'm' and returns true.
+bool OSDMonitor::preprocess_remove_snaps(MRemoveSnaps *m)
+{
+  dout(7) << "preprocess_remove_snaps " << *m << dendl;
+
+  vector<snapid_t>::iterator it = m->snaps.begin();
+  while (it != m->snaps.end()) {
+    // a snap counts as already removed only if it is within max_snap
+    // and present in removed_snaps
+    const bool already_removed =
+      *it <= osdmap.max_snap && osdmap.removed_snaps.contains(*it);
+    if (!already_removed)
+      return false;
+    it++;
+  }
+
+  // every requested snap is already removed; nothing left to do
+  delete m;
+  return true;
+}
+
+// Stage a remove-snaps request in pending_inc: every requested snap that the
+// current osdmap does not already list in removed_snaps is added to
+// pending_inc.removed_snaps, and pending_inc.new_max_snap is raised when the
+// largest requested id exceeds both osdmap.max_snap and the value already
+// pending.  Frees 'm' and always returns true.
+bool OSDMonitor::prepare_remove_snaps(MRemoveSnaps *m)
+{
+  dout(7) << "prepare_remove_snaps " << *m << dendl;
+
+  // Largest snap id seen in the request.  It must start from a defined
+  // value: it is compared on the first loop iteration and again after the
+  // loop, including when the request carries no snaps at all.
+  snapid_t max = 0;
+  for (vector<snapid_t>::iterator p = m->snaps.begin();
+       p != m->snaps.end();
+       ++p) {
+    if (*p > max)
+      max = *p;
+
+    if (!osdmap.removed_snaps.contains(*p)) {
+      dout(10) << " adding " << *p << " to removed_snaps" << dendl;
+      pending_inc.removed_snaps.insert(*p);
+    }
+  }
+
+  // NOTE(review): this patch changes new_max_snap's declared type to
+  // snapid_t; the (__s64) cast is kept as-is -- confirm the comparison still
+  // behaves as intended for new_max_snap's initial value.
+  if (max > osdmap.max_snap &&
+      (__s64)max > pending_inc.new_max_snap) {
+    dout(10) << " new_max_snap " << max << dendl;
+    pending_inc.new_max_snap = max;
+  } else {
+    dout(10) << " max_snap " << osdmap.max_snap << " still >= " << max << dendl;
+  }
+
+  delete m;
+  return true;
+}
+
+
// ---------------
// map helpers
bool preprocess_out(class MOSDOut *m);
bool prepare_out(class MOSDOut *m);
+ bool preprocess_remove_snaps(class MRemoveSnaps *m);
+ bool prepare_remove_snaps(class MRemoveSnaps *m);
+
public:
OSDMonitor(Monitor *mn, Paxos *p) :
PaxosService(mn, p) { }
#include "messages/MOSDPGInfo.h"
#include "messages/MOSDPGCreate.h"
+#include "messages/MRemoveSnaps.h"
+
#include "messages/MMonMap.h"
#include "messages/MMonGetMap.h"
m = new MOSDPGCreate;
break;
+ case MSG_REMOVE_SNAPS:
+ m = new MRemoveSnaps;
+ break;
+
// clients
case CEPH_MSG_CLIENT_MOUNT:
#define MSG_PGSTATSACK 87
#define MSG_OSD_PG_CREATE 88
-
+#define MSG_REMOVE_SNAPS 89
map<pg_t,uint32_t> new_pg_swap_primary;
list<pg_t> old_pg_swap_primary;
- int64_t new_max_snap;
+ snapid_t new_max_snap;
interval_set<snapid_t> removed_snaps;
void encode(bufferlist& bl) {
return last_pg_change;
}
+ // Accessor for max_snap.
+ snapid_t get_max_snap() {
+   return max_snap;
+ }
+ // True when 'sn' is no greater than max_snap and removed_snaps contains it.
+ // Ids above max_snap are reported as not removed without consulting
+ // removed_snaps.
+ bool is_removed_snap(snapid_t sn) {
+   return sn <= max_snap && removed_snaps.contains(sn);
+ }
+
/***** cluster state *****/
/* osds */
int get_max_osd() const { return max_osd; }
return -1;
}
- // snaps
-
- // returns true if a snap has been deleted.
- bool snap_removed(snapid_t s) {
- if (s > max_snap)
- return false;
- return removed_snaps.contains(s);
- }
void apply_incremental(Incremental &inc) {
if (inc.epoch == 1)
do
$CEPH_BIN/cosd --mkfs_for_osd $osd dev/osd$osd # initialize empty object store
# echo valgrind --leak-check=full --show-reachable=yes $CEPH_BIN/cosd dev/osd$osd --debug_ms 1 --debug_osd 20 --debug_filestore 10 --debug_ebofs 20 #1>out/o$osd #& #--debug_osd 40
- $CEPH_BIN/cosd -m $IP:12345 dev/osd$osd -d --debug_ms 1 --debug_osd 20 # --debug_filestore 10
+ $CEPH_BIN/cosd -m $IP:12345 dev/osd$osd -d --debug_ms 1 --debug_osd 20 --debug_filestore 20
done
# mds