From: John Spray
Date: Thu, 18 Sep 2014 17:56:42 +0000 (+0100)
Subject: client: add mds_command operation
X-Git-Tag: v0.88~97^2~11
X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=688396ac652bdd22c0092f2860605d2907b2e193;p=ceph.git

client: add mds_command operation

akin to Objecter::osd_command

Signed-off-by: John Spray
---

diff --git a/src/client/Client.cc b/src/client/Client.cc
index b0092d8816eaf..fa79829e8db73 100644
--- a/src/client/Client.cc
+++ b/src/client/Client.cc
@@ -47,6 +47,7 @@ using namespace std;
 #include "messages/MClientCapRelease.h"
 #include "messages/MClientLease.h"
 #include "messages/MClientSnap.h"
+#include "messages/MCommandReply.h"
 
 #include "messages/MGenericMessage.h"
 
@@ -54,7 +55,6 @@ using namespace std;
 
 #include "mon/MonClient.h"
 
-#include "mds/MDSMap.h"
 #include "osd/OSDMap.h"
 #include "mon/MonMap.h"
 
@@ -166,7 +166,8 @@ Client::Client(Messenger *m, MonClient *mc)
     objecter_finisher(m->cct),
     tick_event(NULL),
     monclient(mc), messenger(m), whoami(m->get_myname().num()),
-    initialized(false), mounted(false), unmounting(false),
+    initialized(false), authenticated(false),
+    mounted(false), unmounting(false),
     local_osd(-1), local_osd_epoch(0),
     unsafe_sync_write(0),
     client_lock("Client::client_lock")
@@ -2006,7 +2007,13 @@ bool Client::ms_dispatch(Message *m)
   case CEPH_MSG_CLIENT_LEASE:
     handle_lease(static_cast<MClientLease*>(m));
     break;
-
+  case MSG_COMMAND_REPLY:
+    if (m->get_source().type() == CEPH_ENTITY_TYPE_MDS) {
+      handle_command_reply(static_cast<MCommandReply*>(m));
+    } else {
+      return false;
+    }
+    break;
   default:
     return false;
   }
@@ -2045,6 +2052,31 @@ void Client::handle_mds_map(MMDSMap* m)
   mdsmap = new MDSMap;
   mdsmap->decode(m->get_encoded());
 
+  // Cancel any commands for missing or laggy GIDs
+  std::list<ceph_tid_t> cancel_ops;
+  for (std::map<ceph_tid_t, CommandOp>::iterator i = commands.begin();
+       i != commands.end(); ++i) {
+    const mds_gid_t op_mds_gid = i->second.mds_gid;
+    if (mdsmap->is_dne_gid(op_mds_gid) || mdsmap->is_laggy_gid(op_mds_gid)) {
+      ldout(cct, 1) << __func__ << ": cancelling command op " << i->first << dendl;
+      cancel_ops.push_back(i->first);
+      if (i->second.outs) {
+        std::ostringstream ss;
+        ss << "MDS " << op_mds_gid << " went away";
+        *(i->second.outs) = ss.str();
+      }
+      i->second.con->mark_down();
+      if (i->second.on_finish) {
+        i->second.on_finish->complete(-ETIMEDOUT);
+      }
+    }
+  }
+
+  for (std::list<ceph_tid_t>::iterator i = cancel_ops.begin();
+       i != cancel_ops.end(); ++i) {
+    commands.erase(*i);
+  }
+
   // reset session
   for (map<mds_rank_t, MetaSession*>::iterator p = mds_sessions.begin();
        p != mds_sessions.end();
@@ -4062,6 +4094,219 @@ inodeno_t Client::_get_inodeno(Inode *in)
   return in->ino;
 }
 
+
+/**
+ * Resolve an MDS spec to a list of MDS daemon GIDs.
+ *
+ * The spec is a string representing a GID, rank or name/id. It may
+ * be * in which case it matches all GIDs.
+ *
+ * If no error is returned, the `targets` vector will be populated with at least
+ * one MDS.
+ */
+int Client::resolve_mds(
+    const std::string &mds_spec,
+    std::vector<mds_gid_t> *targets)
+{
+  std::string strtol_err;
+  long long rank_or_gid = strict_strtoll(mds_spec.c_str(), 10, &strtol_err);
+  if (strtol_err.empty()) {
+    // If it parses as an integer, it's GID or a rank
+    if (rank_or_gid >= 0 && rank_or_gid < MAX_MDS) {
+      const mds_rank_t mds_rank = mds_rank_t(rank_or_gid);
+
+      if (mdsmap->is_dne(mds_rank)) {
+        lderr(cct) << __func__ << ": MDS rank " << mds_rank << " does not exist" << dendl;
+        return -ENOENT;
+      }
+
+      if (!mdsmap->is_up(mds_rank)) {
+        lderr(cct) << __func__ << ": MDS rank " << mds_rank << " is not up" << dendl;
+        return -EAGAIN;
+      }
+
+      const mds_gid_t mds_gid = mdsmap->get_info(mds_rank).global_id;
+      ldout(cct, 10) << __func__ << ": resolved rank " << mds_rank << " to GID " << mds_gid << dendl;
+      targets->push_back(mds_gid);
+    } else {
+      const mds_gid_t mds_gid = mds_gid_t(rank_or_gid);
+      if (mdsmap->is_dne_gid(mds_gid)) {
+        lderr(cct) << __func__ << ": GID " << mds_gid << " not in MDS map" << dendl;
+        return -ENOENT;
+      } else {
+        ldout(cct, 10) << __func__ << ": validated GID " << mds_gid << dendl;
+        targets->push_back(mds_gid);
+      }
+    }
+  } else if (mds_spec == "*") {
+    // It is a wildcard: use all MDSs
+    const std::map<mds_gid_t, MDSMap::mds_info_t> &mds_info = mdsmap->get_mds_info();
+
+    if (mds_info.size() == 0) {
+      lderr(cct) << __func__ << ": * passed but no MDS daemons found" << dendl;
+      return -ENOENT;
+    }
+
+    for (std::map<mds_gid_t, MDSMap::mds_info_t>::const_iterator i = mds_info.begin();
+         i != mds_info.end(); ++i) {
+      targets->push_back(i->first);
+    }
+  } else {
+    // It did not parse as an integer, it is not a wildcard, it must be a name
+    const mds_gid_t mds_gid = mdsmap->find_mds_gid_by_name(mds_spec);
+    if (mds_gid == 0) {
+      lderr(cct) << "MDS ID '" << mds_spec << "' not found" << dendl;
+      return -ENOENT;
+    } else {
+      ldout(cct, 10) << __func__ << ": resolved ID '" << mds_spec << "' to GID " << mds_gid << dendl;
+      targets->push_back(mds_gid);
+    }
+  }
+
+  return 0;
+}
+
+
+/**
+ * Authenticate with mon and establish global ID
+ */
+int Client::authenticate()
+{
+  assert(client_lock.is_locked_by_me());
+
+  if (authenticated) {
+    return 0;
+  }
+
+  client_lock.Unlock();
+  int r = monclient->authenticate(cct->_conf->client_mount_timeout);
+  client_lock.Lock();
+  if (r < 0) {
+    return r;
+  }
+
+  whoami = monclient->get_global_id();
+  messenger->set_myname(entity_name_t::CLIENT(whoami.v));
+  authenticated = true;
+
+  return 0;
+}
+
+
+/**
+ *
+ * @mds_spec one of ID, rank, GID, "*"
+ *
+ */
+int Client::mds_command(
+    const std::string &mds_spec,
+    const vector<string>& cmd,
+    const bufferlist& inbl,
+    bufferlist *outbl,
+    string *outs,
+    Context *onfinish)
+{
+  Mutex::Locker lock(client_lock);
+
+  assert(initialized);
+
+  int r;
+  r = authenticate();
+  if (r < 0) {
+    return r;
+  }
+
+  // Block until we have an MDSMap to resolve IDs
+  if (mdsmap->get_epoch() == 0) {
+    wait_on_list(waiting_for_mdsmap);
+  }
+
+  // Look up MDS target(s) of the command
+  std::vector<mds_gid_t> targets;
+  r = resolve_mds(mds_spec, &targets);
+  if (r < 0) {
+    return r;
+  }
+
+  // If daemons are laggy, we won't send them commands. If all
+  // are laggy then we fail.
+  std::vector<mds_gid_t> non_laggy;
+  for (std::vector<mds_gid_t>::iterator target = targets.begin();
+       target != targets.end(); ++target) {
+    if (!mdsmap->is_laggy_gid(*target)) {
+      non_laggy.push_back(*target);
+    }
+  }
+  if (non_laggy.size() == 0) {
+    *outs = "All targeted MDS daemons are laggy";
+    return -ENOENT;
+  }
+
+  // Send commands to targets
+  C_GatherBuilder gather(cct, onfinish);
+  for (std::vector<mds_gid_t>::iterator target = non_laggy.begin();
+       target != non_laggy.end(); ++target) {
+    ceph_tid_t tid = ++last_tid;
+
+    // Open a connection to the target MDS
+    entity_inst_t inst = mdsmap->get_info_gid(*target).get_inst();
+    ConnectionRef conn = messenger->get_connection(inst);
+
+    // Generate CommandOp state
+    CommandOp op;
+    op.tid = tid;
+    op.on_finish = gather.new_sub();
+    op.outbl = outbl;
+    op.outs = outs;
+    op.mds_gid = *target;
+    op.con = conn;
+    commands[op.tid] = op;
+
+    ldout(cct, 4) << __func__ << ": new command op to " << *target
+      << " tid=" << op.tid << cmd << dendl;
+
+    // Construct and send MCommand
+    MCommand *m = new MCommand(monclient->get_fsid());
+    m->cmd = cmd;
+    m->set_data(inbl);
+    m->set_tid(tid);
+    conn->send_message(m);
+  }
+  gather.activate();
+
+  return 0;
+}
+
+void Client::handle_command_reply(MCommandReply *m)
+{
+  ceph_tid_t const tid = m->get_tid();
+
+  ldout(cct, 10) << __func__ << ": tid=" << m->get_tid() << dendl;
+
+  map<ceph_tid_t, CommandOp>::iterator opiter = commands.find(tid);
+  if (opiter == commands.end()) {
+    ldout(cct, 1) << __func__ << ": unknown tid " << tid << ", dropping" << dendl;
+    m->put();
+    return;
+  }
+
+  CommandOp const &op = opiter->second;
+  if (op.outbl) {
+    op.outbl->claim(m->get_data());
+  }
+  if (op.outs) {
+    *op.outs = m->rs;
+  }
+
+  op.con->mark_down();
+
+  if (op.on_finish) {
+    op.on_finish->complete(m->r);
+  }
+
+  m->put();
+}
+
 // -------------------
 // MOUNT
 
@@ -4074,14 +4319,10 @@ int Client::mount(const std::string &mount_root)
     return 0;
   }
 
-  client_lock.Unlock();
-  int r = monclient->authenticate(cct->_conf->client_mount_timeout);
-  client_lock.Lock();
-  if (r < 0)
+  int r = authenticate();
+  if (r < 0) {
     return r;
-
-  whoami = monclient->get_global_id();
-  messenger->set_myname(entity_name_t::CLIENT(whoami.v));
+  }
 
   mounted = true;
 
diff --git a/src/client/Client.h b/src/client/Client.h
index 1e0bcc7f2de8a..eddf4c00a20f8 100644
--- a/src/client/Client.h
+++ b/src/client/Client.h
@@ -38,6 +38,7 @@ using std::fstream;
 //#include "barrier.h"
 
 #include "mds/mdstypes.h"
+#include "mds/MDSMap.h"
 
 #include "msg/Message.h"
 #include "msg/Dispatcher.h"
@@ -86,6 +87,16 @@ enum {
 };
 
 
+struct CommandOp
+{
+  ConnectionRef con;
+  mds_gid_t mds_gid;
+  ceph_tid_t tid;
+  Context *on_finish;
+  bufferlist *outbl;
+  std::string *outs;
+};
+
 // ============================================
 // types for my local metadata cache
 
@@ -242,6 +253,13 @@ public:
   map<mds_rank_t, MetaSession*> mds_sessions;  // mds -> push seq
   list<Cond*> waiting_for_mdsmap;
 
+  // MDS command state
+  std::map<ceph_tid_t, CommandOp> commands;
+  void handle_command_reply(MCommandReply *m);
+  int resolve_mds(
+      const std::string &mds_spec,
+      std::vector<mds_gid_t> *targets);
+
   void get_session_metadata(std::map<std::string, std::string> *meta) const;
   bool have_open_session(mds_rank_t mds);
   void got_mds_push(MetaSession *s);
@@ -286,6 +304,7 @@ public:
   void handle_client_reply(MClientReply *reply);
 
   bool initialized;
+  bool authenticated;
   bool mounted;
   bool unmounting;
 
@@ -424,6 +443,7 @@ protected:
   void ms_handle_remote_reset(Connection *con);
   bool ms_get_authorizer(int dest_type, AuthAuthorizer **authorizer, bool force_new);
 
+  int authenticate();
 
  public:
   void set_filer_flags(int flags);
@@ -670,6 +690,12 @@ public:
   int mount(const std::string &mount_root);
   void unmount();
 
+  int mds_command(
+    const std::string &mds_spec,
+    const std::vector<std::string>& cmd,
+    const bufferlist& inbl,
+    bufferlist *poutbl, std::string *prs, Context *onfinish);
+
   // these shoud (more or less) mirror the actual system calls.
   int statfs(const char *path, struct statvfs *stbuf);
 