// Cancel any commands for missing or laggy GIDs
std::list<ceph_tid_t> cancel_ops;
- for (std::map<ceph_tid_t, CommandOp>::iterator i = commands.begin();
- i != commands.end(); ++i) {
- const mds_gid_t op_mds_gid = i->second.mds_gid;
+ auto commands = command_table.get_commands();
+ for (const auto &i : commands) {
+ const MDSCommandOp *op = i.second;
+ const mds_gid_t op_mds_gid = op->mds_gid;
if (mdsmap->is_dne_gid(op_mds_gid) || mdsmap->is_laggy_gid(op_mds_gid)) {
- ldout(cct, 1) << __func__ << ": cancelling command op " << i->first << dendl;
- cancel_ops.push_back(i->first);
- if (i->second.outs) {
+ ldout(cct, 1) << __func__ << ": cancelling command op " << i.first << dendl;
+ cancel_ops.push_back(i.first);
+ if (op->outs) {
std::ostringstream ss;
ss << "MDS " << op_mds_gid << " went away";
- *(i->second.outs) = ss.str();
+ *(op->outs) = ss.str();
}
- i->second.con->mark_down();
- if (i->second.on_finish) {
- i->second.on_finish->complete(-ETIMEDOUT);
+ op->con->mark_down();
+ if (op->on_finish) {
+ op->on_finish->complete(-ETIMEDOUT);
}
}
}
for (std::list<ceph_tid_t>::iterator i = cancel_ops.begin();
i != cancel_ops.end(); ++i) {
- commands.erase(*i);
+ command_table.erase(*i);
}
// reset session
// Send commands to targets
C_GatherBuilder gather(cct, onfinish);
for (const auto target_gid : non_laggy) {
- ceph_tid_t tid = ++last_tid;
const auto info = fsmap->get_info_gid(target_gid);
// Open a connection to the target MDS
entity_inst_t inst = info.get_inst();
ConnectionRef conn = messenger->get_connection(inst);
- // Generate CommandOp state
- CommandOp op;
- op.tid = tid;
- op.on_finish = gather.new_sub();
- op.outbl = outbl;
- op.outs = outs;
- op.mds_gid = target_gid;
- op.con = conn;
- commands[op.tid] = op;
+ // Generate MDSCommandOp state
+ MDSCommandOp *op = command_table.start_command();
+
+ op->on_finish = gather.new_sub();
+ op->cmd = cmd;
+ op->outbl = outbl;
+ op->outs = outs;
+ op->inbl = inbl;
+ op->mds_gid = target_gid;
+ op->con = conn;
ldout(cct, 4) << __func__ << ": new command op to " << target_gid
- << " tid=" << op.tid << cmd << dendl;
+ << " tid=" << op->tid << cmd << dendl;
// Construct and send MCommand
- MCommand *m = new MCommand(monclient->get_fsid());
- m->cmd = cmd;
- m->set_data(inbl);
- m->set_tid(tid);
+ MCommand *m = op->get_message(monclient->get_fsid());
conn->send_message(m);
}
gather.activate();
ldout(cct, 10) << __func__ << ": tid=" << m->get_tid() << dendl;
- map<ceph_tid_t, CommandOp>::iterator opiter = commands.find(tid);
- if (opiter == commands.end()) {
+ MDSCommandOp *op = command_table.get_command(tid);
+ if (op == nullptr) {
ldout(cct, 1) << __func__ << ": unknown tid " << tid << ", dropping" << dendl;
m->put();
return;
}
- CommandOp const &op = opiter->second;
- if (op.outbl) {
- op.outbl->claim(m->get_data());
+ if (op->outbl) {
+ op->outbl->claim(m->get_data());
}
- if (op.outs) {
- *op.outs = m->rs;
+ if (op->outs) {
+ *op->outs = m->rs;
}
- if (op.on_finish) {
- op.on_finish->complete(m->r);
+ if (op->on_finish) {
+ op->on_finish->complete(m->r);
}
+ command_table.erase(tid);
+
m->put();
}
Mutex::Locker lock(client_lock);
vinodeno_t vino = ll_get_vino(in);
object_t oid = file_object_t(vino.ino, blockid);
- int r = 0;
- C_SaferCond cond;
+ C_SaferCond onfinish;
bufferlist bl;
objecter->read(oid,
vino.snapid,
&bl,
CEPH_OSD_FLAG_READ,
- &cond);
+ &onfinish);
client_lock.Unlock();
- r = cond.wait();
- client_lock.Lock(); // lock is going to unlock on exit.
+ int r = onfinish.wait();
+ client_lock.Lock();
if (r >= 0) {
bl.copy(0, bl.length(), buf);
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#ifndef COMMAND_TABLE_H_
+#define COMMAND_TABLE_H_
+
+#include "messages/MCommand.h"
+
+class CommandOp
+{
+ public:
+ ConnectionRef con;
+ ceph_tid_t tid;
+
+ std::vector<std::string> cmd;
+ bufferlist inbl;
+ Context *on_finish;
+ bufferlist *outbl;
+ std::string *outs;
+
+ MCommand *get_message(const uuid_d &fsid) const
+ {
+ MCommand *m = new MCommand(fsid);
+ m->cmd = cmd;
+ m->set_data(inbl);
+ m->set_tid(tid);
+
+ return m;
+ }
+
+ CommandOp(const ceph_tid_t t) : tid(t) {}
+};
+
+template<typename T>
+class CommandTable
+{
+protected:
+ ceph_tid_t last_tid;
+ std::map<ceph_tid_t, T*> commands;
+
+public:
+
+ CommandTable()
+ : last_tid(0)
+ {}
+
+ T* start_command()
+ {
+ ceph_tid_t tid = last_tid++;
+ auto cmd = new T(tid);
+ commands[tid] = cmd;
+
+ return cmd;
+ }
+
+ const std::map<ceph_tid_t, T*> &get_commands() const
+ {
+ return commands;
+ }
+
+ T* get_command(ceph_tid_t tid)
+ {
+ auto result = commands.find(tid);
+ if (result == commands.end()) {
+ return nullptr;
+ } else {
+ return result->second;
+ }
+ }
+
+ void erase(ceph_tid_t tid)
+ {
+ auto ptr = commands.at(tid);
+ delete ptr;
+ commands.erase(tid);
+ }
+};
+
+#endif
+