#include "messages/MClientCapRelease.h"
#include "messages/MClientLease.h"
#include "messages/MClientSnap.h"
+#include "messages/MCommandReply.h"
#include "messages/MGenericMessage.h"
#include "mon/MonClient.h"
-#include "mds/MDSMap.h"
#include "osd/OSDMap.h"
#include "mon/MonMap.h"
objecter_finisher(m->cct),
tick_event(NULL),
monclient(mc), messenger(m), whoami(m->get_myname().num()),
- initialized(false), mounted(false), unmounting(false),
+ initialized(false), authenticated(false),
+ mounted(false), unmounting(false),
local_osd(-1), local_osd_epoch(0),
unsafe_sync_write(0),
client_lock("Client::client_lock")
case CEPH_MSG_CLIENT_LEASE:
handle_lease(static_cast<MClientLease*>(m));
break;
-
+ case MSG_COMMAND_REPLY:
+ if (m->get_source().type() == CEPH_ENTITY_TYPE_MDS) {
+ handle_command_reply(static_cast<MCommandReply*>(m));
+ } else {
+ return false;
+ }
+ break;
default:
return false;
}
mdsmap = new MDSMap;
mdsmap->decode(m->get_encoded());
+ // Cancel any commands for missing or laggy GIDs
+ std::list<ceph_tid_t> cancel_ops;
+ for (std::map<ceph_tid_t, CommandOp>::iterator i = commands.begin();
+ i != commands.end(); ++i) {
+ const mds_gid_t op_mds_gid = i->second.mds_gid;
+ if (mdsmap->is_dne_gid(op_mds_gid) || mdsmap->is_laggy_gid(op_mds_gid)) {
+ ldout(cct, 1) << __func__ << ": cancelling command op " << i->first << dendl;
+ cancel_ops.push_back(i->first);
+ if (i->second.outs) {
+ std::ostringstream ss;
+ ss << "MDS " << op_mds_gid << " went away";
+ *(i->second.outs) = ss.str();
+ }
+ i->second.con->mark_down();
+ if (i->second.on_finish) {
+ i->second.on_finish->complete(-ETIMEDOUT);
+ }
+ }
+ }
+
+ for (std::list<ceph_tid_t>::iterator i = cancel_ops.begin();
+ i != cancel_ops.end(); ++i) {
+ commands.erase(*i);
+ }
+
// reset session
for (map<mds_rank_t,MetaSession*>::iterator p = mds_sessions.begin();
p != mds_sessions.end();
return in->ino;
}
+
+/**
+ * Resolve an MDS spec to a list of MDS daemon GIDs.
+ *
+ * The spec is a string representing a GID, rank or name/id. It may
+ * be * in which case it matches all GIDs.
+ *
+ * If no error is returned, the `targets` vector will be populated with at least
+ * one MDS.
+ */
+int Client::resolve_mds(
+ const std::string &mds_spec,
+ std::vector<mds_gid_t> *targets)
+{
+ std::string strtol_err;
+ long long rank_or_gid = strict_strtoll(mds_spec.c_str(), 10, &strtol_err);
+ if (strtol_err.empty()) {
+ // If it parses as an integer, it's GID or a rank
+ if (rank_or_gid >= 0 && rank_or_gid < MAX_MDS) {
+ const mds_rank_t mds_rank = mds_rank_t(rank_or_gid);
+
+ if (mdsmap->is_dne(mds_rank)) {
+ lderr(cct) << __func__ << ": MDS rank " << mds_rank << " does not exist" << dendl;
+ return -ENOENT;
+ }
+
+ if (!mdsmap->is_up(mds_rank)) {
+ lderr(cct) << __func__ << ": MDS rank " << mds_rank << " is not up" << dendl;
+ return -EAGAIN;
+ }
+
+ const mds_gid_t mds_gid = mdsmap->get_info(mds_rank).global_id;
+ ldout(cct, 10) << __func__ << ": resolved rank " << mds_rank << " to GID " << mds_gid << dendl;
+ targets->push_back(mds_gid);
+ } else {
+ const mds_gid_t mds_gid = mds_gid_t(rank_or_gid);
+ if (mdsmap->is_dne_gid(mds_gid)) {
+ lderr(cct) << __func__ << ": GID " << mds_gid << " not in MDS map" << dendl;
+ return -ENOENT;
+ } else {
+ ldout(cct, 10) << __func__ << ": validated GID " << mds_gid << dendl;
+ targets->push_back(mds_gid);
+ }
+ }
+ } else if (mds_spec == "*") {
+ // It is a wildcard: use all MDSs
+ const std::map<mds_gid_t, MDSMap::mds_info_t> &mds_info = mdsmap->get_mds_info();
+
+ if (mds_info.size() == 0) {
+ lderr(cct) << __func__ << ": * passed but no MDS daemons found" << dendl;
+ return -ENOENT;
+ }
+
+ for (std::map<mds_gid_t, MDSMap::mds_info_t>::const_iterator i = mds_info.begin();
+ i != mds_info.end(); ++i) {
+ targets->push_back(i->first);
+ }
+ } else {
+ // It did not parse as an integer, it is not a wildcard, it must be a name
+ const mds_gid_t mds_gid = mdsmap->find_mds_gid_by_name(mds_spec);
+ if (mds_gid == 0) {
+ lderr(cct) << "MDS ID '" << mds_spec << "' not found" << dendl;
+ return -ENOENT;
+ } else {
+ ldout(cct, 10) << __func__ << ": resolved ID '" << mds_spec << "' to GID " << mds_gid << dendl;
+ targets->push_back(mds_gid);
+ }
+ }
+
+ return 0;
+}
+
+
+/**
+ * Authenticate with mon and establish global ID
+ */
+int Client::authenticate()
+{
+ assert(client_lock.is_locked_by_me());
+
+ if (authenticated) {
+ return 0;
+ }
+
+ client_lock.Unlock();
+ int r = monclient->authenticate(cct->_conf->client_mount_timeout);
+ client_lock.Lock();
+ if (r < 0) {
+ return r;
+ }
+
+ whoami = monclient->get_global_id();
+ messenger->set_myname(entity_name_t::CLIENT(whoami.v));
+ authenticated = true;
+
+ return 0;
+}
+
+
+/**
+ *
+ * @mds_spec one of ID, rank, GID, "*"
+ *
+ */
+int Client::mds_command(
+ const std::string &mds_spec,
+ const vector<string>& cmd,
+ const bufferlist& inbl,
+ bufferlist *outbl,
+ string *outs,
+ Context *onfinish)
+{
+ Mutex::Locker lock(client_lock);
+
+ assert(initialized);
+
+ int r;
+ r = authenticate();
+ if (r < 0) {
+ return r;
+ }
+
+ // Block until we have an MDSMap to resolve IDs
+ if (mdsmap->get_epoch() == 0) {
+ wait_on_list(waiting_for_mdsmap);
+ }
+
+ // Look up MDS target(s) of the command
+ std::vector<mds_gid_t> targets;
+ r = resolve_mds(mds_spec, &targets);
+ if (r < 0) {
+ return r;
+ }
+
+ // If daemons are laggy, we won't send them commands. If all
+ // are laggy then we fail.
+ std::vector<mds_gid_t> non_laggy;
+ for (std::vector<mds_gid_t>::iterator target = targets.begin();
+ target != targets.end(); ++target) {
+ if (!mdsmap->is_laggy_gid(*target)) {
+ non_laggy.push_back(*target);
+ }
+ }
+ if (non_laggy.size() == 0) {
+ *outs = "All targeted MDS daemons are laggy";
+ return -ENOENT;
+ }
+
+ // Send commands to targets
+ C_GatherBuilder gather(cct, onfinish);
+ for (std::vector<mds_gid_t>::iterator target = non_laggy.begin();
+ target != non_laggy.end(); ++target) {
+ ceph_tid_t tid = ++last_tid;
+
+ // Open a connection to the target MDS
+ entity_inst_t inst = mdsmap->get_info_gid(*target).get_inst();
+ ConnectionRef conn = messenger->get_connection(inst);
+
+ // Generate CommandOp state
+ CommandOp op;
+ op.tid = tid;
+ op.on_finish = gather.new_sub();
+ op.outbl = outbl;
+ op.outs = outs;
+ op.mds_gid = *target;
+ op.con = conn;
+ commands[op.tid] = op;
+
+ ldout(cct, 4) << __func__ << ": new command op to " << *target
+ << " tid=" << op.tid << cmd << dendl;
+
+ // Construct and send MCommand
+ MCommand *m = new MCommand(monclient->get_fsid());
+ m->cmd = cmd;
+ m->set_data(inbl);
+ m->set_tid(tid);
+ conn->send_message(m);
+ }
+ gather.activate();
+
+ return 0;
+}
+
+void Client::handle_command_reply(MCommandReply *m)
+{
+ ceph_tid_t const tid = m->get_tid();
+
+ ldout(cct, 10) << __func__ << ": tid=" << m->get_tid() << dendl;
+
+ map<ceph_tid_t, CommandOp>::iterator opiter = commands.find(tid);
+ if (opiter == commands.end()) {
+ ldout(cct, 1) << __func__ << ": unknown tid " << tid << ", dropping" << dendl;
+ m->put();
+ return;
+ }
+
+ CommandOp const &op = opiter->second;
+ if (op.outbl) {
+ op.outbl->claim(m->get_data());
+ }
+ if (op.outs) {
+ *op.outs = m->rs;
+ }
+
+ op.con->mark_down();
+
+ if (op.on_finish) {
+ op.on_finish->complete(m->r);
+ }
+
+ m->put();
+}
+
// -------------------
// MOUNT
return 0;
}
- client_lock.Unlock();
- int r = monclient->authenticate(cct->_conf->client_mount_timeout);
- client_lock.Lock();
- if (r < 0)
+ int r = authenticate();
+ if (r < 0) {
return r;
-
- whoami = monclient->get_global_id();
- messenger->set_myname(entity_name_t::CLIENT(whoami.v));
+ }
mounted = true;
//#include "barrier.h"
#include "mds/mdstypes.h"
+#include "mds/MDSMap.h"
#include "msg/Message.h"
#include "msg/Dispatcher.h"
};
+struct CommandOp
+{
+ ConnectionRef con;
+ mds_gid_t mds_gid;
+ ceph_tid_t tid;
+ Context *on_finish;
+ bufferlist *outbl;
+ std::string *outs;
+};
+
// ============================================
// types for my local metadata cache
map<mds_rank_t, MetaSession*> mds_sessions; // mds -> push seq
list<Cond*> waiting_for_mdsmap;
+ // MDS command state
+ std::map<ceph_tid_t, CommandOp> commands;
+ void handle_command_reply(MCommandReply *m);
+ int resolve_mds(
+ const std::string &mds_spec,
+ std::vector<mds_gid_t> *targets);
+
void get_session_metadata(std::map<std::string, std::string> *meta) const;
bool have_open_session(mds_rank_t mds);
void got_mds_push(MetaSession *s);
void handle_client_reply(MClientReply *reply);
bool initialized;
+ bool authenticated;
bool mounted;
bool unmounting;
void ms_handle_remote_reset(Connection *con);
bool ms_get_authorizer(int dest_type, AuthAuthorizer **authorizer, bool force_new);
+ int authenticate();
public:
void set_filer_flags(int flags);
int mount(const std::string &mount_root);
void unmount();
+ int mds_command(
+ const std::string &mds_spec,
+ const std::vector<std::string>& cmd,
+ const bufferlist& inbl,
+ bufferlist *poutbl, std::string *prs, Context *onfinish);
+
// these shoud (more or less) mirror the actual system calls.
int statfs(const char *path, struct statvfs *stbuf);