--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+
+#ifndef __MOSDPGTRIM_H
+#define __MOSDPGTRIM_H
+
+#include "msg/Message.h"
+
+class MOSDPGTrim : public Message {
+public:
+ epoch_t epoch;
+ pg_t pgid;
+ eversion_t trim_to;
+
+ epoch_t get_epoch() { return epoch; }
+
+ MOSDPGTrim() {}
+ MOSDPGTrim(version_t mv, pg_t p, eversion_t tt) :
+ Message(MSG_OSD_PG_TRIM),
+ epoch(mv), pgid(p), trim_to(tt) { }
+
+ const char *get_type_name() { return "pg_trim"; }
+ void print(ostream& out) {
+ out << "pg_trim(" << pgid << " to " << trim_to << " e" << epoch << ")";
+ }
+
+ void encode_payload() {
+ ::encode(epoch, payload);
+ ::encode(pgid, payload);
+ ::encode(trim_to, payload);
+ }
+ void decode_payload() {
+ bufferlist::iterator p = payload.begin();
+ ::decode(epoch, p);
+ ::decode(pgid, p);
+ ::decode(trim_to, p);
+ }
+};
+
+#endif
#include "messages/MOSDPGRemove.h"
#include "messages/MOSDPGInfo.h"
#include "messages/MOSDPGCreate.h"
+#include "messages/MOSDPGTrim.h"
#include "messages/MOSDScrub.h"
#include "messages/MRemoveSnaps.h"
case MSG_OSD_PG_CREATE:
m = new MOSDPGCreate;
break;
+ case MSG_OSD_PG_TRIM:
+ m = new MOSDPGTrim;
+ break;
case MSG_OSD_SCRUB:
m = new MOSDScrub;
#define MSG_OSD_PG_LOG 83
#define MSG_OSD_PG_REMOVE 84
#define MSG_OSD_PG_INFO 85
+#define MSG_OSD_PG_TRIM 86
-#define MSG_PGSTATS 86
-#define MSG_PGSTATSACK 87
+#define MSG_PGSTATS 87
+#define MSG_PGSTATSACK 88
-#define MSG_OSD_PG_CREATE 88
-#define MSG_REMOVE_SNAPS 89
+#define MSG_OSD_PG_CREATE 89
+#define MSG_REMOVE_SNAPS 90
-#define MSG_OSD_SCRUB 90
+#define MSG_OSD_SCRUB 91
#include "messages/MOSDPGRemove.h"
#include "messages/MOSDPGInfo.h"
#include "messages/MOSDPGCreate.h"
+#include "messages/MOSDPGTrim.h"
#include "messages/MOSDAlive.h"
case MSG_OSD_PG_INFO:
handle_pg_info((MOSDPGInfo*)m);
break;
+ case MSG_OSD_PG_TRIM:
+ handle_pg_trim((MOSDPGTrim*)m);
+ break;
// client ops
case CEPH_MSG_OSD_OP:
delete m;
}
+void OSD::handle_pg_trim(MOSDPGTrim *m)
+{
+ dout(7) << "handle_pg_trim " << *m << " from " << m->get_source() << dendl;
+
+ int from = m->get_source().num();
+ if (!require_same_or_newer_map(m, m->epoch)) return;
+
+ if (!_have_pg(m->pgid)) {
+ dout(10) << " don't have pg " << m->pgid << dendl;
+ } else {
+ PG *pg = _lookup_lock_pg(m->pgid);
+ if (m->epoch < pg->info.history.same_since) {
+ dout(10) << *pg << " got old trim to " << m->trim_to << ", ignoring" << dendl;
+ pg->unlock();
+ goto out;
+ }
+ assert(pg);
+ assert(from == pg->acting[0]);
+
+ ObjectStore::Transaction t;
+ pg->trim(t, m->trim_to);
+ pg->write_info(t);
+ pg->unlock();
+
+ store->apply_transaction(t);
+ }
+
+ out:
+ delete m;
+}
+
/** PGQuery
* from primary to replica | stray
void handle_pg_notify(class MOSDPGNotify *m);
void handle_pg_log(class MOSDPGLog *m);
void handle_pg_info(class MOSDPGInfo *m);
+ void handle_pg_trim(class MOSDPGTrim *m);
void handle_pg_remove(class MOSDPGRemove *m);
// helper for handle_pg_log and handle_pg_info
dirty_log = false;
}
+void PG::trim(ObjectStore::Transaction& t, eversion_t trim_to)
+{
+ // trim?
+ if (trim_to > log.bottom) {
+ dout(10) << "trim " << log << " to " << trim_to << dendl;
+ log.trim(t, trim_to);
+ info.log_bottom = log.bottom;
+ info.log_backlog = log.backlog;
+ trim_ondisklog_to(t, trim_to);
+ }
+}
+
void PG::trim_ondisklog_to(ObjectStore::Transaction& t, eversion_t v)
{
dout(15) << "trim_ondisk_log_to v " << v << dendl;
void PG::append_log(ObjectStore::Transaction &t, bufferlist& bl,
- eversion_t log_version, eversion_t trim_to)
+ eversion_t log_version)
{
dout(10) << "append_log " << ondisklog.bottom << "~" << ondisklog.length()
<< " adding " << bl.length() << dendl;
bufferlist blb(sizeof(ondisklog));
::encode(ondisklog, blb);
t.collection_setattr(info.pgid.to_coll(), "ondisklog", blb);
-
-
- // trim?
- if (is_complete() && trim_to > log.bottom) {
- dout(10) << " trimming " << log << " to " << trim_to << dendl;
- log.trim(t, trim_to);
- info.log_bottom = log.bottom;
- info.log_backlog = log.backlog;
- trim_ondisklog_to(t, trim_to);
- }
- dout(10) << " ondisklog " << ondisklog.bottom << "~" << ondisklog.length() << dendl;
+ dout(10) << "append_log now " << ondisklog.bottom << "~" << ondisklog.length() << dendl;
}
void PG::read_log(ObjectStore *store)
void write_info(ObjectStore::Transaction& t);
void write_log(ObjectStore::Transaction& t);
void append_log(ObjectStore::Transaction &t, bufferlist& bl,
- eversion_t log_version, eversion_t trim_to);
+ eversion_t log_version);
void read_log(ObjectStore *store);
+ void trim(ObjectStore::Transaction& t, eversion_t v);
void trim_ondisklog_to(ObjectStore::Transaction& t, eversion_t v);
void read_state(ObjectStore *store);
#include "messages/MOSDPGNotify.h"
#include "messages/MOSDPGRemove.h"
+#include "messages/MOSDPGTrim.h"
#include "messages/MOSDPing.h"
p != logv.end();
p++)
add_log_entry(*p, log_bl);
- append_log(t, log_bl, logv[0].version, trim_to);
+ append_log(t, log_bl, logv[0].version);
+ trim(t, trim_to);
// update the local pg, pg log
write_info(t);
repop->waitfor_disk.erase(fromosd);
repop->waitfor_nvram.erase(fromosd);
repop->waitfor_ack.erase(fromosd);
- repop->pg_complete_thru[fromosd] = pg_complete_thru;
+ peer_info[fromosd].last_complete;
}
} else if (ack_type & CEPH_OSD_FLAG_ONNVRAM) {
// nvram
tls.push_back(&ctx->op_t);
tls.push_back(&ctx->local_t);
}
+ } else {
+ // just trim the log
+ if (op->pg_trim_to != eversion_t()) {
+ trim(ctx->local_t, op->pg_trim_to);
+ tls.push_back(&ctx->local_t);
+ }
}
C_OSD_RepModifyCommit *oncommit = new C_OSD_RepModifyCommit(this, op, ackerosd,
if (is_all_uptodate()) {
dout(-7) << "recover_primary complete" << dendl;
finish_recovery();
+ trim_replicas();
} else {
dout(-10) << "recover_primary primary now complete, starting peer recovery" << dendl;
}
// nothing to do!
dout(-10) << "recover_replicas - nothing to do!" << dendl;
- if (is_all_uptodate())
+ if (is_all_uptodate()) {
finish_recovery();
- else {
+ trim_replicas();
+ } else {
dout(10) << "recover_replicas not all uptodate, acting " << acting << ", uptodate " << uptodate_set << dendl;
}
}
+void ReplicatedPG::trim_replicas()
+{
+ dout(10) << "trim_replicas" << dendl;
+
+ return; // hmm FIXME
+
+
+ // trim myself
+ eversion_t trim_to;
+
+ for (unsigned i=1; i<acting.size(); i++)
+ osd->messenger->send_message(new MOSDPGTrim(osd->osdmap->get_epoch(), info.pgid, trim_to),
+ osd->osdmap->get_inst(acting[i]));
+}
+
/** clean_up_local
* remove any objects that we're storing but shouldn't.
void finish_recovery_op();
int recover_primary(int max);
int recover_replicas(int max);
+ void trim_replicas();
void sub_op_modify(MOSDSubOp *op);
void sub_op_modify_ondisk(MOSDSubOp *op, int ackerosd, eversion_t last_complete);