CEPH_OSD_OP_PUSH = CEPH_OSD_OP_MODE_SUB | 2,
CEPH_OSD_OP_BALANCEREADS = CEPH_OSD_OP_MODE_SUB | 3,
CEPH_OSD_OP_UNBALANCEREADS = CEPH_OSD_OP_MODE_SUB | 4,
+ CEPH_OSD_OP_SCRUB = CEPH_OSD_OP_MODE_SUB | 5,
/* object data */
CEPH_OSD_OP_WRITE = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_DATA | 1,
case CEPH_OSD_OP_PUSH: return "push";
case CEPH_OSD_OP_BALANCEREADS: return "balance-reads";
case CEPH_OSD_OP_UNBALANCEREADS: return "unbalance-reads";
+ case CEPH_OSD_OP_SCRUB: return "scrub";
default: return "???";
}
+++ /dev/null
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-/*
- * Ceph - scalable distributed file system
- *
- * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
- *
- * This is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License version 2.1, as published by the Free Software
- * Foundation. See file COPYING.
- *
- */
-
-
-#ifndef __MOSDPGSCRUB_H
-#define __MOSDPGSCRUB_H
-
-#include "msg/Message.h"
-
-struct MOSDPGScrub : public Message {
- pg_t pgid;
- epoch_t epoch;
- bufferlist map;
-
- MOSDPGScrub() {}
- MOSDPGScrub(pg_t p, epoch_t e) :
- Message(MSG_OSD_PG_SCRUB),
- pgid(p), epoch(e) {}
-
- const char *get_type_name() { return "pg_scrub"; }
- void print(ostream& out) {
- out << "pg_scrub(" << pgid << " e" << epoch;
- if (map.length())
- out << " " << map.length() << " bytes";
- out << ")";
- }
-
- void encode_payload() {
- ::encode(pgid, payload);
- ::encode(epoch, payload);
- ::encode(map, payload);
- }
- void decode_payload() {
- bufferlist::iterator p = payload.begin();
- ::decode(pgid, p);
- ::decode(epoch, p);
- ::decode(map, p);
- }
-};
-
-#endif
const char *get_type_name() { return "osd_sub_op"; }
void print(ostream& out) {
out << "osd_sub_op(" << reqid
+ << " " << pgid
<< " " << poid
<< " " << ops;
if (noop)
*/
class MOSDSubOpReply : public Message {
+public:
epoch_t map_epoch;
// subop metadata
tid_t rep_tid;
pobject_t poid;
-public:
vector<ceph_osd_op> ops;
// result
void print(ostream& out) {
out << "osd_sub_op_reply(" << reqid
+ << " " << pgid
<< " " << poid << " " << ops;
if (ack_type & CEPH_OSD_OP_ONDISK)
out << " ondisk";
#include "messages/MOSDPGRemove.h"
#include "messages/MOSDPGInfo.h"
#include "messages/MOSDPGCreate.h"
-#include "messages/MOSDPGScrub.h"
#include "messages/MOSDScrub.h"
#include "messages/MRemoveSnaps.h"
case MSG_OSD_PG_CREATE:
m = new MOSDPGCreate;
break;
- case MSG_OSD_PG_SCRUB:
- m = new MOSDPGScrub;
- break;
case MSG_OSD_SCRUB:
m = new MOSDScrub;
#define MSG_REMOVE_SNAPS 89
#define MSG_OSD_SCRUB 90
-#define MSG_OSD_PG_SCRUB 91
#include "messages/MOSDPGRemove.h"
#include "messages/MOSDPGInfo.h"
#include "messages/MOSDPGCreate.h"
-#include "messages/MOSDPGScrub.h"
#include "messages/MOSDAlive.h"
case MSG_OSD_PG_INFO:
handle_pg_info((MOSDPGInfo*)m);
break;
- case MSG_OSD_PG_SCRUB:
- handle_pg_scrub((MOSDPGScrub*)m);
- break;
// client ops
case CEPH_MSG_OSD_OP:
delete m;
}
-void OSD::handle_pg_scrub(MOSDPGScrub *m)
-{
- dout(7) << "handle_pg_scrub " << *m << " from " << m->get_source() << dendl;
- int from = m->get_source().num();
- if (!require_same_or_newer_map(m, m->epoch)) return;
-
- PG *pg = _lookup_lock_pg(m->pgid);
- if (pg) {
- if (m->epoch < pg->info.history.same_since) {
- dout(10) << *pg << " has changed since " << m->epoch << dendl;
- } else {
- if (pg->is_primary()) {
- if (pg->peer_scrub_map.count(from)) {
- dout(10) << *pg << " already had osd" << from << " scrub map" << dendl;
- } else {
- dout(10) << *pg << " got osd" << from << " scrub map" << dendl;
- bufferlist::iterator p = m->map.begin();
- pg->peer_scrub_map[from].decode(p);
- pg->kick();
- }
- } else {
- // replica, reply
- dout(10) << *pg << " building scrub map for primary" << dendl;
-
- // do this is a separate thread.. FIXME
- ScrubMap map;
- pg->build_scrub_map(map);
-
- MOSDPGScrub *reply = new MOSDPGScrub(pg->info.pgid, osdmap->get_epoch());
- ::encode(map, reply->map);
- messenger->send_message(reply, m->get_source_inst());
- }
- }
- pg->unlock();
- } else {
- dout(10) << " pg " << m->pgid << " not found" << dendl;
- }
- delete m;
-}
-
/** PGQuery
* from primary to replica | stray
void handle_pg_log(class MOSDPGLog *m);
void handle_pg_info(class MOSDPGInfo *m);
void handle_pg_remove(class MOSDPGRemove *m);
- void handle_pg_scrub(class MOSDPGScrub *m);
// helper for handle_pg_log and handle_pg_info
void _process_pg_info(epoch_t epoch, int from,
#include "messages/MOSDPGLog.h"
#include "messages/MOSDPGRemove.h"
#include "messages/MOSDPGInfo.h"
-#include "messages/MOSDPGScrub.h"
+
+#include "messages/MOSDSubOp.h"
+#include "messages/MOSDSubOpReply.h"
#include <sstream>
// ==========================================================================================
// SCRUB
+void PG::sub_op_scrub(MOSDSubOp *op)
+{
+  // Answer a CEPH_OSD_OP_SCRUB sub-op: build this PG's scrub map and send
+  // it back to the sender as the data payload of an MOSDSubOpReply.
+  // The message is deleted here on every path.
+  dout(7) << "sub_op_scrub" << dendl;
+
+  // Only serve requests that are not older than same_primary_since.
+  if (op->map_epoch >= info.history.same_primary_since) {
+    ScrubMap scrubmap;
+    build_scrub_map(scrubmap);
+
+    MOSDSubOpReply *ack = new MOSDSubOpReply(op, 0, osd->osdmap->get_epoch(), CEPH_OSD_OP_ACK);
+    ::encode(scrubmap, ack->get_data());
+    osd->messenger->send_message(ack, op->get_source_inst());
+  } else {
+    dout(10) << "sub_op_scrub discarding old sub_op from "
+	     << op->map_epoch << " < " << info.history.same_primary_since << dendl;
+  }
+
+  delete op;
+}
+
+void PG::sub_op_scrub_reply(MOSDSubOpReply *op)
+{
+  // Handle the reply to a CEPH_OSD_OP_SCRUB sub-op: decode the sender's
+  // scrub map from the reply's data payload and store it in peer_scrub_map,
+  // keyed by the sending OSD's number.  The message is deleted here on
+  // every path.
+  dout(7) << "sub_op_scrub_reply" << dendl;
+
+  // Drop replies that are older than same_primary_since.
+  if (op->map_epoch < info.history.same_primary_since) {
+    dout(10) << "sub_op_scrub_reply discarding old sub_op_reply from "
+	     << op->map_epoch << " < " << info.history.same_primary_since << dendl;
+    delete op;
+    return;
+  }
+
+  int from = op->get_source().num();
+
+  if (peer_scrub_map.count(from)) {
+    // keep the first map received from this osd; ignore duplicates
+    dout(10) << " already had osd" << from << " scrub map" << dendl;
+  } else {
+    dout(10) << " got osd" << from << " scrub map" << dendl;
+    bufferlist::iterator p = op->get_data().begin();
+    peer_scrub_map[from].decode(p);
+    // NOTE(review): presumably wakes the scrub() caller waiting for peer
+    // maps -- confirm against PG::kick()/wait().
+    kick();
+  }
+
+  delete op;
+}
+
/*
* build a (sorted) summary of pg content for purposes of scrubbing
void PG::scrub()
{
- osd->map_lock.get_read();
+ stringstream ss;
+ ScrubMap scrubmap;
+ osd->map_lock.get_read();
lock();
-
+
epoch_t epoch = info.history.same_since;
if (!is_primary()) {
// request maps from replicas
for (unsigned i=1; i<acting.size(); i++) {
dout(10) << "scrub requesting scrubmap from osd" << acting[i] << dendl;
- osd->messenger->send_message(new MOSDPGScrub(info.pgid, osd->osdmap->get_epoch()),
+ vector<ceph_osd_op> scrub(1);
+ scrub[0].op = CEPH_OSD_OP_SCRUB;
+ pobject_t poid;
+ eversion_t v;
+ osd_reqid_t reqid;
+ MOSDSubOp *subop = new MOSDSubOp(reqid, info.pgid, poid, scrub, false, 0,
+ osd->osdmap->get_epoch(), osd->get_tid(), 0, v);
+ osd->messenger->send_message(subop, //new MOSDPGScrub(info.pgid, osd->osdmap->get_epoch()),
osd->osdmap->get_inst(acting[i]));
}
-
osd->map_lock.put_read();
+
// wait for any ops in progress
while (is_write_in_progress()) {
dout(10) << "scrub write(s) in progress, waiting" << dendl;
wait();
}
+
//unlock();
dout(10) << "scrub building my map" << dendl;
- ScrubMap scrubmap;
build_scrub_map(scrubmap);
/*
lock();
if (epoch != info.history.same_since) {
dout(10) << "scrub pg changed, aborting" << dendl;
- unlock();
- return;
+ goto out;
}
*/
if (epoch != info.history.same_since) {
dout(10) << "scrub pg changed, aborting" << dendl;
- unlock();
- return;
+ goto out;
}
}
unlock();
*/
- stringstream ss;
-
if (acting.size() > 1) {
dout(10) << "scrub comparing replica scrub maps" << dendl;
lock();
if (epoch != info.history.same_since) {
dout(10) << "scrub pg changed, aborting" << dendl;
- unlock();
- return;
+ goto out;
}
*/
lock();
if (epoch != info.history.same_since) {
dout(10) << "scrub pg changed, aborting" << dendl;
- unlock();
- return;
+ goto out;
}
*/
// finish up
info.stats.last_scrub = info.last_update;
info.stats.last_scrub_stamp = g_clock.now();
+
+
+ out:
state_clear(PG_STATE_SCRUBBING);
update_stats();
void build_scrub_map(ScrubMap &map);
virtual void _scrub(ScrubMap &map) {}
+ void sub_op_scrub(class MOSDSubOp *op);
+ void sub_op_scrub_reply(class MOSDSubOpReply *op);
+
public:
PG(OSD *o, pg_t p) :
case CEPH_OSD_OP_PUSH:
sub_op_push(op);
return;
+ case CEPH_OSD_OP_SCRUB:
+ sub_op_scrub(op);
+ return;
}
}
sub_op_push_reply(r);
return;
}
+ if (first.op == CEPH_OSD_OP_SCRUB) {
+ sub_op_scrub_reply(r);
+ return;
+ }
}
sub_op_modify_reply(r);
// push it back!
push(poid, op->get_source().num(), op->data_subset, op->clone_subsets);
+ delete op;
}
// ==========================================================================================
// SCRUB
+
void ReplicatedPG::_scrub(ScrubMap& scrubmap)
{
dout(10) << "_scrub" << dendl;