when client create file, if early_reply is set true, the metadata has not write to journal and the file data is successfully written to the journal of osd, and then mds is crushed.
fix : https://tracker.ceph.com/issues/43208
Signed-off-by: simon gao <simon29rock@gmail.com>
}
-void InoTable::project_release_ids(interval_set<inodeno_t>& ids)
+void InoTable::project_release_ids(const interval_set<inodeno_t>& ids)
{
dout(10) << "project_release_ids " << ids << " to " << projected_free << "/" << free << dendl;
projected_free.insert(ids);
++projected_version;
}
-void InoTable::apply_release_ids(interval_set<inodeno_t>& ids)
+void InoTable::apply_release_ids(const interval_set<inodeno_t>& ids)
{
dout(10) << "apply_release_ids " << ids << " to " << projected_free << "/" << free << dendl;
free.insert(ids);
void project_alloc_ids(interval_set<inodeno_t>& inos, int want);
void apply_alloc_ids(interval_set<inodeno_t>& inos);
- void project_release_ids(interval_set<inodeno_t>& inos);
- void apply_release_ids(interval_set<inodeno_t>& inos);
+ void project_release_ids(const interval_set<inodeno_t>& inos);
+ void apply_release_ids(const interval_set<inodeno_t>& inos);
void replay_alloc_id(inodeno_t ino);
void replay_alloc_ids(interval_set<inodeno_t>& inos);
#include "events/ESlaveUpdate.h"
#include "events/EOpen.h"
#include "events/ECommitted.h"
+#include "events/EPurged.h"
#include "events/ETableClient.h"
#include "events/ETableServer.h"
case EVENT_SLAVEUPDATE: return "SLAVEUPDATE";
case EVENT_OPEN: return "OPEN";
case EVENT_COMMITTED: return "COMMITTED";
+ case EVENT_PURGED: return "PURGED";
case EVENT_TABLECLIENT: return "TABLECLIENT";
case EVENT_TABLESERVER: return "TABLESERVER";
case EVENT_NOOP: return "NOOP";
{"SLAVEUPDATE", EVENT_SLAVEUPDATE},
{"OPEN", EVENT_OPEN},
{"COMMITTED", EVENT_COMMITTED},
+ {"PURGED", EVENT_PURGED},
{"TABLECLIENT", EVENT_TABLECLIENT},
{"TABLESERVER", EVENT_TABLESERVER},
{"NOOP", EVENT_NOOP}
case EVENT_COMMITTED:
le = std::make_unique<ECommitted>();
break;
+ case EVENT_PURGED:
+ le = std::make_unique<EPurged>();
+ break;
case EVENT_TABLECLIENT:
le = std::make_unique<ETableClient>();
break;
#define EVENT_SLAVEUPDATE 21
#define EVENT_OPEN 22
#define EVENT_COMMITTED 23
+#define EVENT_PURGED 24
#define EVENT_TABLECLIENT 42
#define EVENT_TABLESERVER 43
{}
void try_to_expire(MDSRank *mds, MDSGatherBuilder &gather_bld, int op_prio);
-
+ void purge_inodes_finish(interval_set<inodeno_t>& inos){
+ purge_inodes.subtract(inos);
+ if (NULL != purged_cb &&
+ purge_inodes.empty())
+ purged_cb->complete(0);
+ }
+ void set_purged_cb(MDSContext* c){
+ ceph_assert(purged_cb == NULL);
+ purged_cb = c;
+ }
void wait_for_expiry(MDSContext *c)
{
ceph_assert(c != NULL);
elist<MDSlaveUpdate*> slave_updates{0}; // passed to begin() manually
set<CInode*> truncating_inodes;
+ interval_set<inodeno_t> purge_inodes;
+ MDSContext* purged_cb = nullptr;
map<int, ceph::unordered_set<version_t> > pending_commit_tids; // mdstable
set<metareqid_t> uncommitted_masters;
#include "events/EImportFinish.h"
#include "events/EFragment.h"
#include "events/ECommitted.h"
+#include "events/EPurged.h"
#include "events/ESessions.h"
#include "InoTable.h"
}
+class C_MDS_purge_completed_finish : public MDCacheLogContext {
+ interval_set<inodeno_t> inos;
+ version_t inotablev;
+ LogSegment *ls;
+public:
+ C_MDS_purge_completed_finish(MDCache *m,
+ interval_set<inodeno_t> i,
+ version_t iv,
+ LogSegment *_ls)
+ : MDCacheLogContext(m),
+ inos(std::move(i)),
+ inotablev(iv),
+ ls(_ls) {}
+ void finish(int r) override {
+ assert(r == 0);
+ if (inotablev) {
+ ls->purge_inodes_finish(inos);
+ mdcache->mds->inotable->apply_release_ids(inos);
+ assert(mdcache->mds->inotable->get_version() == inotablev);
+ }
+ }
+};
+void MDCache::start_purge_inodes(){
+ dout(10) << "start_purge_inodes" << dendl;
+ for (auto& p : mds->mdlog->segments){
+ LogSegment *ls = p.second;
+ if (ls->purge_inodes.size()){
+ purge_inodes(ls->purge_inodes, ls);
+ }
+ }
+}
-
+void MDCache::purge_inodes(const interval_set<inodeno_t>& inos, LogSegment *ls)
+{
+ auto cb = new LambdaContext([this, inos, ls](int r){
+ assert(r == 0 || r == -2);
+ mds->inotable->project_release_ids(inos);
+ version_t piv = mds->inotable->get_projected_version();
+ assert(piv != 0);
+ mds->mdlog->start_submit_entry(new EPurged(inos, piv, ls->seq),
+ new C_MDS_purge_completed_finish(this, inos, piv, ls));
+ mds->mdlog->flush();
+ });
+
+ dout(10) << __func__ << " start purge data : " << inos << dendl;
+ C_GatherBuilder gather(g_ceph_context,
+ new C_OnFinisher( new MDSIOContextWrapper(mds, cb), mds->finisher));
+ SnapContext nullsnapc;
+ uint64_t num = Striper::get_num_objects(default_file_layout, default_file_layout.get_period());
+ for (auto p = inos.begin();
+ p != inos.end();
+ ++p){
+ dout(10) << __func__
+ << " prealloc_inos : " << inos.size()
+ << " start : " << p.get_start().val
+ << " length : " << p.get_len() << " "
+ << " seq : " << ls->seq << dendl;
+
+ for (_inodeno_t i = 0; i < p.get_len(); i++){
+ dout(20) << __func__ << " : " << p.get_start() + i << dendl;
+ filer.purge_range(p.get_start() + i,
+ &default_file_layout,
+ nullsnapc,
+ 0, num,
+ ceph::real_clock::now(),
+ 0, gather.new_sub());
+ }
+ }
+ gather.activate();
+}
// ================================================================================
// cache trimming
void remove_recovered_truncate(CInode *in, LogSegment *ls);
void start_recovered_truncates();
+ // purge unsafe inodes
+ void start_purge_inodes();
+ void purge_inodes(const interval_set<inodeno_t>& i, LogSegment *ls);
+
CDir *get_auth_container(CDir *in);
CDir *get_export_container(CDir *dir);
void find_nested_exports(CDir *dir, set<CDir*>& s);
return;
mdcache->start_recovered_truncates();
+ mdcache->start_purge_inodes();
mdcache->do_file_recover();
// tell connected clients
#include "events/ESession.h"
#include "events/EOpen.h"
#include "events/ECommitted.h"
+#include "events/EPurged.h"
#include "include/stringify.h"
#include "include/filepath.h"
version_t cmapv;
interval_set<inodeno_t> inos;
version_t inotablev;
+ interval_set<inodeno_t> purge_inos;
+ LogSegment *ls = nullptr;
Context *fin;
public:
C_MDS_session_finish(Server *srv, Session *se, uint64_t sseq, bool s, version_t mv, Context *fin_ = NULL) :
ServerLogContext(srv), session(se), state_seq(sseq), open(s), cmapv(mv), inotablev(0), fin(fin_) { }
- C_MDS_session_finish(Server *srv, Session *se, uint64_t sseq, bool s, version_t mv, interval_set<inodeno_t>& i, version_t iv, Context *fin_ = NULL) :
- ServerLogContext(srv), session(se), state_seq(sseq), open(s), cmapv(mv), inos(i), inotablev(iv), fin(fin_) { }
+ C_MDS_session_finish(Server *srv, Session *se, uint64_t sseq, bool s, version_t mv, interval_set<inodeno_t> i, version_t iv, Context *fin_ = NULL) :
+ ServerLogContext(srv), session(se), state_seq(sseq), open(s), cmapv(mv), inos(std::move(i)), inotablev(iv), fin(fin_) { }
+ C_MDS_session_finish(Server *srv, Session *se, uint64_t sseq, bool s, version_t mv, interval_set<inodeno_t> i, version_t iv,
+ interval_set<inodeno_t> _purge_inos, LogSegment *_ls, Context *fin_ = NULL) :
+ ServerLogContext(srv), session(se), state_seq(sseq), open(s), cmapv(mv), inos(std::move(i)), inotablev(iv), purge_inos(std::move(_purge_inos)), ls(_ls), fin(fin_){}
void finish(int r) override {
ceph_assert(r == 0);
- server->_session_logged(session, state_seq, open, cmapv, inos, inotablev);
+ server->_session_logged(session, state_seq, open, cmapv, inos, inotablev, purge_inos, ls);
if (fin) {
fin->complete(r);
}
}
void Server::_session_logged(Session *session, uint64_t state_seq, bool open, version_t pv,
- interval_set<inodeno_t>& inos, version_t piv)
+ const interval_set<inodeno_t>& inos, version_t piv,
+ const interval_set<inodeno_t>& purge_inos, LogSegment *ls)
{
- dout(10) << "_session_logged " << session->info.inst << " state_seq " << state_seq << " " << (open ? "open":"close")
- << " " << pv << dendl;
-
+ dout(10) << "_session_logged " << session->info.inst
+ << " state_seq " << state_seq
+ << " " << (open ? "open":"close")
+ << " " << pv
+ << " purge_inos : " << purge_inos << dendl;
+
+ if (NULL != ls) {
+ dout(10) << "_session_logged seq : " << ls->seq << dendl;
+ if (purge_inos.size()){
+ ls->purge_inodes.insert(purge_inos);
+ mdcache->purge_inodes(purge_inos, ls);
+ }
+ }
+
if (piv) {
ceph_assert(session->is_closing() || session->is_killing() ||
session->is_opening()); // re-open closing session
* XXX bump in the interface here, not using an MDSContext here
* because all the callers right now happen to use a SaferCond
*/
-void Server::kill_session(Session *session, Context *on_safe)
+void Server::kill_session(Session *session, Context *on_safe, bool need_purge_inos)
{
ceph_assert(ceph_mutex_is_locked_by_me(mds->mds_lock));
session->is_stale()) &&
!session->is_importing()) {
dout(10) << "kill_session " << session << dendl;
- journal_close_session(session, Session::STATE_KILLING, on_safe);
+ journal_close_session(session, Session::STATE_KILLING, on_safe, need_purge_inos);
} else {
dout(10) << "kill_session importing or already closing/killing " << session << dendl;
if (session->is_closing() ||
return victims.size();
}
-void Server::journal_close_session(Session *session, int state, Context *on_safe)
+void Server::journal_close_session(Session *session, int state, Context *on_safe, bool need_purge_inos)
{
+ dout(10) << __func__ << " : "
+ << "("<< need_purge_inos << ")"
+ << session->info.inst
+ << "(" << session->info.prealloc_inos.size() << "|" << session->pending_prealloc_inos.size() << ")" << dendl;
+
uint64_t sseq = mds->sessionmap.set_state(session, state);
version_t pv = mds->sessionmap.mark_projected(session);
version_t piv = 0;
// release alloc and pending-alloc inos for this session
// and wipe out session state, in case the session close aborts for some reason
interval_set<inodeno_t> both;
- both.insert(session->info.prealloc_inos);
both.insert(session->pending_prealloc_inos);
+ if (!need_purge_inos)
+ both.insert(session->info.prealloc_inos);
if (both.size()) {
mds->inotable->project_release_ids(both);
piv = mds->inotable->get_projected_version();
} else
piv = 0;
-
- mdlog->start_submit_entry(new ESession(session->info.inst, false, pv, both, piv),
- new C_MDS_session_finish(this, session, sseq, false, pv, both, piv, on_safe));
+
+ if(need_purge_inos && session->info.prealloc_inos.size()) {
+ dout(10) << "start purge indoes " << session->info.prealloc_inos << dendl;
+ LogSegment* ls = mdlog->get_current_segment();
+ LogEvent* e = new ESession(session->info.inst, false, pv, both, piv, session->info.prealloc_inos);
+ MDSLogContextBase* c = new C_MDS_session_finish(this, session, sseq, false, pv, both, piv,
+ session->info.prealloc_inos, ls, on_safe);
+ mdlog->start_submit_entry(e, c);
+ } else {
+ interval_set<inodeno_t> empty;
+ LogEvent* e = new ESession(session->info.inst, false, pv, both, piv, empty);
+ MDSLogContextBase* c = new C_MDS_session_finish(this, session, sseq, false, pv, both, piv, on_safe);
+ mdlog->start_submit_entry(e, c);
+ }
mdlog->flush();
// clean up requests, too
mds->evict_client(session->get_client().v, false, true, ss,
gather.new_sub());
} else {
- kill_session(session, NULL);
+ kill_session(session, NULL, true);
}
failed_reconnects++;
if (cap && (cmode & CEPH_FILE_MODE_WR)) {
in->inode.client_ranges[client].range.first = 0;
- in->inode.client_ranges[client].range.last = in->inode.get_layout_size_increment();
+ in->inode.client_ranges[client].range.last = in->inode.layout.stripe_unit;
in->inode.client_ranges[client].follows = follows;
cap->mark_clientwriteable();
}
dout(15) << " setting a client_range too, since this is a regular file" << dendl;
newi->inode.client_ranges[client].range.first = 0;
- newi->inode.client_ranges[client].range.last = newi->inode.get_layout_size_increment();
+ newi->inode.client_ranges[client].range.last = newi->inode.layout.stripe_unit;
newi->inode.client_ranges[client].follows = follows;
cap->mark_clientwriteable();
}
void handle_client_session(const cref_t<MClientSession> &m);
void _session_logged(Session *session, uint64_t state_seq,
- bool open, version_t pv, interval_set<inodeno_t>& inos,version_t piv);
+ bool open, version_t pv, const interval_set<inodeno_t>& inos,version_t piv,
+ const interval_set<inodeno_t>& purge_inos, LogSegment *ls);
version_t prepare_force_open_sessions(map<client_t,entity_inst_t> &cm,
map<client_t,client_metadata_t>& cmm,
map<client_t,pair<Session*,uint64_t> >& smap);
void finish_flush_session(Session *session, version_t seq);
void terminate_sessions();
void find_idle_sessions();
- void kill_session(Session *session, Context *on_safe);
+ void kill_session(Session *session, Context *on_safe, bool need_purge_inos = false);
size_t apply_blacklist(const std::set<entity_addr_t> &blacklist);
- void journal_close_session(Session *session, int state, Context *on_safe);
+ void journal_close_session(Session *session, int state, Context *on_safe, bool need_purge_inos = false);
set<client_t> client_reclaim_gather;
size_t get_num_pending_reclaim() const { return client_reclaim_gather.size(); }
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#ifndef CEPH_MDS_EPURGE_H
+#define CEPH_MDS_EPURGE_H
+
+#include "common/config.h"
+#include "include/types.h"
+
+#include "../LogEvent.h"
+
+class EPurged : public LogEvent {
+
+ protected:
+ interval_set<inodeno_t> inos;
+ version_t inotablev{0};
+ LogSegment::seq_t seq;
+ public:
+ EPurged() : LogEvent(EVENT_PURGED) {
+ }
+ EPurged(interval_set<inodeno_t> i, version_t iv, LogSegment::seq_t _seq = 0) :
+ LogEvent(EVENT_PURGED), inos(std::move(i)), inotablev(iv), seq(_seq) {
+ }
+ void encode(bufferlist& bl, uint64_t features) const override;
+ void decode(bufferlist::const_iterator& bl) override;
+ void dump(Formatter *f) const override;
+ void print(ostream& out) const override {
+
+ if (inotablev)
+ out << "Eurge complete";
+ else
+ out << "Eurge inodes ";
+ }
+
+ void update_segment() override;
+ void replay(MDSRank *mds) override;
+};
+WRITE_CLASS_ENCODER_FEATURES(EPurged)
+
+#endif // CEPH_MDS_EPURGE_H
interval_set<inodeno_t> inos;
version_t inotablev{0};
+ interval_set<inodeno_t> purge_inos;
+
// Client metadata stored during open
client_metadata_t client_metadata;
client_inst(inst), open(o), cmapv(v), inotablev(0),
client_metadata(cm) { }
ESession(const entity_inst_t& inst, bool o, version_t v,
- const interval_set<inodeno_t>& i, version_t iv) :
+ interval_set<inodeno_t> i, version_t iv,
+ interval_set<inodeno_t> _purge_inos) :
LogEvent(EVENT_SESSION),
- client_inst(inst), open(o), cmapv(v), inos(i), inotablev(iv) { }
+ client_inst(inst), open(o), cmapv(v), inos(std::move(i)), inotablev(iv),
+ purge_inos(std::move(_purge_inos)) {}
void encode(bufferlist& bl, uint64_t features) const override;
void decode(bufferlist::const_iterator& bl) override;
#include "events/ESlaveUpdate.h"
#include "events/EOpen.h"
#include "events/ECommitted.h"
+#include "events/EPurged.h"
#include "events/EExport.h"
#include "events/EImportStart.h"
dout(10) << "try_to_expire waiting for truncate of " << **p << dendl;
(*p)->add_waiter(CInode::WAIT_TRUNC, gather_bld.new_sub());
}
+ // purge inodes
+ dout(10) << "try_to_expire waiting for purge of " << purge_inodes << dendl;
+ if (purge_inodes.size())
+ set_purged_cb(gather_bld.new_sub());
if (gather_bld.has_subs()) {
dout(6) << "LogSegment(" << seq << "/" << offset << ").try_to_expire waiting" << dendl;
}
}
-
// -----------------------
// EMetaBlob
ceph_assert(g_conf()->mds_kill_journal_replay_at != 4);
}
+// -----------------------
+// EPurged
+void EPurged::update_segment()
+{
+ if (inos.size() && inotablev)
+ get_segment()->inotablev = inotablev;
+ return;
+}
+
+void EPurged::replay(MDSRank *mds)
+{
+ if (inos.size()) {
+ LogSegment *ls = mds->mdlog->get_segment(seq);
+ if (ls) {
+ ls->purge_inodes.subtract(inos);
+ }
+ if (mds->inotable->get_version() >= inotablev) {
+ dout(10) << "EPurged.replay inotable " << mds->inotable->get_version()
+ << " >= " << inotablev << ", noop" << dendl;
+ } else {
+ dout(10) << "EPurged.replay inotable " << mds->inotable->get_version()
+ << " < " << inotablev << " " << dendl;
+ mds->inotable->replay_release_ids(inos);
+ assert(mds->inotable->get_version() == inotablev);
+ }
+ }
+ update_segment();
+}
+
+void EPurged::encode(bufferlist& bl, uint64_t features) const
+{
+ ENCODE_START(1, 1, bl);
+ encode(inos, bl);
+ encode(inotablev, bl);
+ encode(seq, bl);
+ ENCODE_FINISH(bl);
+}
+
+void EPurged::decode(bufferlist::const_iterator& bl)
+{
+ DECODE_START(1, bl);
+ decode(inos, bl);
+ decode(inotablev, bl);
+ decode(seq, bl);
+ DECODE_FINISH(bl);
+}
+
+void EPurged::dump(Formatter *f) const
+{
+ f->dump_stream("inos") << inos;
+ f->dump_int("inotable version", inotablev);
+ f->dump_int("segment seq", seq);
+}
+
// -----------------------
// ESession
void ESession::replay(MDSRank *mds)
{
+ if (purge_inos.size())
+ get_segment()->purge_inodes.insert(purge_inos);
+
if (mds->sessionmap.get_version() >= cmapv) {
dout(10) << "ESession.replay sessionmap " << mds->sessionmap.get_version()
<< " >= " << cmapv << ", noop" << dendl;
void ESession::encode(bufferlist &bl, uint64_t features) const
{
- ENCODE_START(5, 5, bl);
+ ENCODE_START(6, 5, bl);
encode(stamp, bl);
encode(client_inst, bl, features);
encode(open, bl);
encode(inos, bl);
encode(inotablev, bl);
encode(client_metadata, bl);
+ encode(purge_inos, bl);
ENCODE_FINISH(bl);
}
void ESession::decode(bufferlist::const_iterator &bl)
{
- DECODE_START_LEGACY_COMPAT_LEN(5, 3, 3, bl);
+ DECODE_START_LEGACY_COMPAT_LEN(6, 3, 3, bl);
if (struct_v >= 2)
decode(stamp, bl);
decode(client_inst, bl);
} else if (struct_v >= 5) {
decode(client_metadata, bl);
}
+ if (struct_v >= 6){
+ decode(purge_inos, bl);
+ }
+
DECODE_FINISH(bl);
}
if (num_obj == 1) {
object_t oid = file_object_t(ino, first_obj);
object_locator_t oloc = OSDMap::file_to_object_locator(*layout);
+ ldout(cct, 10) << "purge_range removing " << oid << dendl;
objecter->remove(oid, oloc, snapc, mtime, flags, oncommit);
return 0;
}