out << " state=" << dn.get_state();
if (dn.is_new()) out << "|new";
if (dn.state_test(CDentry::STATE_BOTTOMLRU)) out << "|bottomlru";
+ if (dn.state_test(CDentry::STATE_UNLINKING)) out << "|unlinking";
+ if (dn.state_test(CDentry::STATE_REINTEGRATING)) out << "|reintegrating";
if (dn.get_num_ref()) {
out << " |";
static const int STATE_PURGINGPINNED = (1<<5);
static const int STATE_BOTTOMLRU = (1<<6);
static const int STATE_UNLINKING = (1<<7);
+ static const int STATE_REINTEGRATING = (1<<8);
// stray dentry needs notification of releasing reference
static const int STATE_STRAY = STATE_NOTIFYREF;
static const int MASK_STATE_IMPORT_KEPT = STATE_BOTTOMLRU;
static const unsigned EXPORT_NONCE = 1;
- const static uint64_t WAIT_UNLINK_STATE = (1<<0);
- const static uint64_t WAIT_UNLINK_FINISH = (1<<1);
+ const static uint64_t WAIT_UNLINK_STATE = (1<<0);
+ const static uint64_t WAIT_UNLINK_FINISH = (1<<1);
+ const static uint64_t WAIT_REINTEGRATE_FINISH = (1<<2);
uint32_t replica_unlinking_ref = 0;
CDentry(std::string_view n, __u32 h,
type == CEPH_MSG_CLIENT_RECONNECT ||
type == CEPH_MSG_CLIENT_RECLAIM ||
type == CEPH_MSG_CLIENT_REQUEST ||
+ type == CEPH_MSG_CLIENT_REPLY ||
type == MSG_MDS_PEER_REQUEST ||
type == MSG_MDS_HEARTBEAT ||
type == MSG_MDS_TABLE_REQUEST ||
ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_CLIENT);
// fall-thru
case CEPH_MSG_CLIENT_REQUEST:
+ case CEPH_MSG_CLIENT_REPLY:
server->dispatch(m);
break;
case MSG_MDS_PEER_REQUEST:
class ScrubStack;
class C_ExecAndReply;
+/**
+ * Record of a request that this MDS has sent out itself and for which it
+ * is waiting on a reply, keyed by tid in the owner's request map.
+ *
+ * When a dentry is supplied, a PIN_PURGING reference is taken on it for
+ * the lifetime of the record and dropped again in the destructor.  Because
+ * the destructor releases that reference, the type is neither copyable nor
+ * movable: a duplicated record would put() the same pin twice.  Instances
+ * must therefore be constructed in place (e.g. map emplace with
+ * std::piecewise_construct).
+ */
+struct MDSMetaRequest {
+private:
+  int _op;
+  CDentry *_dentry;   // may be nullptr when no dentry needs to be pinned
+  ceph_tid_t _tid;
+public:
+  explicit MDSMetaRequest(int op, CDentry *dn, ceph_tid_t tid) :
+    _op(op), _dentry(dn), _tid(tid) {
+    if (_dentry) {
+      _dentry->get(CDentry::PIN_PURGING);
+    }
+  }
+  ~MDSMetaRequest() {
+    if (_dentry) {
+      _dentry->put(CDentry::PIN_PURGING);
+    }
+  }
+
+  // The pin is owned by exactly one record; forbid duplicating it.
+  MDSMetaRequest(const MDSMetaRequest&) = delete;
+  MDSMetaRequest& operator=(const MDSMetaRequest&) = delete;
+  MDSMetaRequest(MDSMetaRequest&&) = delete;
+  MDSMetaRequest& operator=(MDSMetaRequest&&) = delete;
+
+  CDentry *get_dentry() const { return _dentry; }
+  int get_op() const { return _op; }
+  ceph_tid_t get_tid() const { return _tid; }
+};
+
/**
* The public part of this class's interface is what's exposed to all
* the various subsystems (server, mdcache, etc), such as pointers
PerfCounters *logger = nullptr, *mlogger = nullptr;
OpTracker op_tracker;
+ std::map<ceph_tid_t, MDSMetaRequest> internal_client_requests;
+
// The last different state I held before current
MDSMap::DaemonState last_state = MDSMap::STATE_BOOT;
// The state assigned to me by the MDSMap
#include "Mutation.h"
#include "MetricsHandler.h"
#include "cephfs_features.h"
+#include "MDSContext.h"
#include "msg/Messenger.h"
case CEPH_MSG_CLIENT_REQUEST:
handle_client_request(ref_cast<MClientRequest>(m));
return;
+ case CEPH_MSG_CLIENT_REPLY:
+ handle_client_reply(ref_cast<MClientReply>(m));
+ return;
case CEPH_MSG_CLIENT_RECLAIM:
handle_client_reclaim(ref_cast<MClientReclaim>(m));
return;
mds->send_message_client(reply, session);
}
+ if (client_inst.name.is_mds() && reply->get_op() == CEPH_MDS_OP_RENAME) {
+ mds->send_message(reply, mdr->client_request->get_connection());
+ }
+
if (req->is_queued_for_replay() &&
(mdr->has_completed || reply->get_result() < 0)) {
if (reply->get_result() < 0) {
return;
}
+/**
+ * Handle an MClientReply addressed to this MDS.
+ *
+ * The reply is matched by tid against mds->internal_client_requests.  A
+ * reply with no matching entry is logged and ignored.  For a RENAME reply
+ * whose record carries a dentry, the dentry's REINTEGRATING state is
+ * cleared and everything waiting on WAIT_REINTEGRATE_FINISH is queued to
+ * run.  The record is removed from the map in every matched case,
+ * including unknown ops.
+ *
+ * @param reply the reply message; asserted to be a safe reply
+ */
+void Server::handle_client_reply(const cref_t<MClientReply> &reply)
+{
+  dout(4) << "handle_client_reply " << *reply << dendl;
+
+  ceph_assert(reply->is_safe());
+  ceph_tid_t tid = reply->get_tid();
+
+  // Look the request up once and reuse the iterator for the erase below.
+  auto it = mds->internal_client_requests.find(tid);
+  if (it == mds->internal_client_requests.end()) {
+    dout(1) << " no pending request on tid " << tid << dendl;
+    return;
+  }
+
+  CDentry *dn = it->second.get_dentry();
+
+  switch (reply->get_op()) {
+  case CEPH_MDS_OP_RENAME:
+    if (dn) {
+      dn->state_clear(CDentry::STATE_REINTEGRATING);
+
+      // wake up whoever parked on this dentry's reintegration
+      MDSContext::vec finished;
+      dn->take_waiting(CDentry::WAIT_REINTEGRATE_FINISH, finished);
+      mds->queue_waiters(finished);
+    }
+    break;
+  default:
+    dout(5) << " unknown client op " << reply->get_op() << dendl;
+    break;
+  }
+
+  // Destroying the record also releases the dentry reference it holds,
+  // so dn must not be used past this point.
+  mds->internal_client_requests.erase(it);
+}
+
void Server::handle_osd_map()
{
/* Note that we check the OSDMAP_FULL flag directly rather than
dn->add_waiter(CDentry::WAIT_UNLINK_FINISH, new C_WaitUnlinkToFinish(mdcache, dn, fin));
}
+/**
+ * Context that forwards its completion to another context and then drops
+ * a PIN_PURGING reference on a dentry.
+ *
+ * The constructor does not take that reference itself; whoever creates
+ * this object is expected to have pinned the dentry beforehand.
+ */
+struct C_WaitReintegrateToFinish : public MDSContext {
+protected:
+  MDCache *mdcache;
+  CDentry *dn;
+  MDSContext *fin;
+
+  MDSRank *get_mds() override
+  {
+    ceph_assert(mdcache != nullptr);
+    return mdcache->mds;
+  }
+
+public:
+  C_WaitReintegrateToFinish(MDCache *cache, CDentry *dentry, MDSContext *next)
+    : mdcache(cache),
+      dn(dentry),
+      fin(next)
+  {}
+
+  void finish(int r) override
+  {
+    // run the wrapped context first, then let go of the dentry
+    fin->complete(r);
+    dn->put(CDentry::PIN_PURGING);
+  }
+};
+
+/**
+ * Tell whether a dentry is still being reintegrated.
+ *
+ * @param dn dentry to inspect
+ * @return true when dn's projected linkage is non-null and dn carries
+ *         STATE_REINTEGRATING; false otherwise
+ */
+bool Server::is_reintegrate_pending(CDentry *dn)
+{
+  CDentry::linkage_t *projected = dn->get_projected_linkage();
+  return !projected->is_null() &&
+         dn->state_test(CDentry::STATE_REINTEGRATING);
+}
+
+/**
+ * Park a request on a dentry until its WAIT_REINTEGRATE_FINISH waiters
+ * are released.
+ *
+ * All locks held by mdr are dropped, and a C_MDS_RetryRequest for mdr is
+ * wrapped in a C_WaitReintegrateToFinish and added to dn's waiter list.
+ * dn is pinned with PIN_PURGING here; the wrapper releases that pin when
+ * it finishes.
+ *
+ * @param dn  dentry to wait on
+ * @param mdr request to park
+ */
+void Server::wait_for_pending_reintegrate(CDentry *dn, MDRequestRef& mdr)
+{
+  dout(20) << __func__ << " dn " << *dn << dendl;
+
+  mds->locker->drop_locks(mdr.get());
+
+  MDSContext *retry = new C_MDS_RetryRequest(mdcache, mdr);
+
+  // keep the dentry pinned for as long as the waiter is queued on it
+  dn->get(CDentry::PIN_PURGING);
+  MDSContext *waiter = new C_WaitReintegrateToFinish(mdcache, dn, retry);
+  dn->add_waiter(CDentry::WAIT_REINTEGRATE_FINISH, waiter);
+}
+
// MKNOD
class C_MDS_mknod_finish : public ServerLogContext {
if (!dn)
return;
+ if (is_reintegrate_pending(dn)) {
+ wait_for_pending_reintegrate(dn, mdr);
+ return;
+ }
+
// notify replica MDSes the dentry is under unlink
if (!dn->state_test(CDentry::STATE_UNLINKING)) {
dn->state_set(CDentry::STATE_UNLINKING);
// -- requests --
void handle_client_request(const cref_t<MClientRequest> &m);
+ void handle_client_reply(const cref_t<MClientReply> &m);
void journal_and_reply(MDRequestRef& mdr, CInode *tracei, CDentry *tracedn,
LogEvent *le, MDSLogContextBase *fin);
bool is_unlink_pending(CDentry *dn);
void wait_for_pending_unlink(CDentry *dn, MDRequestRef& mdr);
+ bool is_reintegrate_pending(CDentry *dn);
+ void wait_for_pending_reintegrate(CDentry *dn, MDRequestRef& mdr);
+
// open
void handle_client_open(MDRequestRef& mdr);
void handle_client_openc(MDRequestRef& mdr); // O_CREAT variant.
dout(10) << __func__ << " " << *straydn << " to " << *rdn << dendl;
logger->inc(l_mdc_strays_reintegrated);
-
+
// rename it to remote linkage .
filepath src(straydn->get_name(), straydn->get_dir()->ino());
filepath dst(rdn->get_name(), rdn->get_dir()->ino());
+ ceph_tid_t tid = mds->issue_tid();
+
auto req = make_message<MClientRequest>(CEPH_MDS_OP_RENAME);
req->set_filepath(dst);
req->set_filepath2(src);
- req->set_tid(mds->issue_tid());
+ req->set_tid(tid);
+
+ rdn->state_set(CDentry::STATE_REINTEGRATING);
+ mds->internal_client_requests.emplace(std::piecewise_construct,
+ std::make_tuple(tid),
+ std::make_tuple(CEPH_MDS_OP_RENAME,
+ rdn, tid));
mds->send_message_mds(req, rdn->authority().first);
}
-
+
void StrayManager::migrate_stray(CDentry *dn, mds_rank_t to)
{
dout(10) << __func__ << " " << *dn << " to mds." << to << dendl;
filepath src(dn->get_name(), dirino);
filepath dst(dn->get_name(), MDS_INO_STRAY(to, MDS_INO_STRAY_INDEX(dirino)));
+ ceph_tid_t tid = mds->issue_tid();
+
auto req = make_message<MClientRequest>(CEPH_MDS_OP_RENAME);
req->set_filepath(dst);
req->set_filepath2(src);
- req->set_tid(mds->issue_tid());
+ req->set_tid(tid);
+
+ mds->internal_client_requests.emplace(std::piecewise_construct,
+ std::make_tuple(tid),
+ std::make_tuple(CEPH_MDS_OP_RENAME,
+ nullptr, tid));
mds->send_message_mds(req, to);
}