#include "CDir.h"
#include "MDCache.h"
#include "Migrator.h"
+#include "Mantle.h"
#include "include/Context.h"
#include "msg/Messenger.h"
if (mds_load.size() == cluster_size) {
// let's go!
//export_empties(); // no!
+
+ int r = mantle_prep_rebalance();
+ if (!r) {
+ mds->clog->info() << "mantle succeeded; "
+ << "balancer=" << mds->mdsmap->get_balancer();
+ return;
+ }
+
+ mds->clog->warn() << "mantle failed (falling back to original balancer); "
+ << "balancer=" << mds->mdsmap->get_balancer()
+ << " : " << cpp_strerror(r);
prep_rebalance(m->get_beat());
}
}
+int MDBalancer::mantle_prep_rebalance()
+{
+ /* hard-code lua balancer */
+ string script = "BAL_LOG(0, \"I am mds \"..whoami)\n return {11, 12, 3}";
+
+ /* prepare for balancing */
+ int cluster_size = mds->get_mds_map()->get_num_in_mds();
+ rebalance_time = ceph_clock_now(g_ceph_context);
+ my_targets.clear();
+ imported.clear();
+ exported.clear();
+ mds->mdcache->migrator->clear_export_queue();
+
+ /* fill in the metrics for each mds by grabbing load struct */
+ vector < map<string, double> > metrics (cluster_size);
+ for (mds_rank_t i=mds_rank_t(0);
+ i < mds_rank_t(cluster_size);
+ i++) {
+ map<mds_rank_t, mds_load_t>::value_type val(i, mds_load_t(ceph_clock_now(g_ceph_context)));
+ std::pair < map<mds_rank_t, mds_load_t>::iterator, bool > r(mds_load.insert(val));
+ mds_load_t &load(r.first->second);
+
+ metrics[i].insert(make_pair("auth.meta_load", load.auth.meta_load()));
+ metrics[i].insert(make_pair("all.meta_load", load.all.meta_load()));
+ metrics[i].insert(make_pair("req_rate", load.req_rate));
+ metrics[i].insert(make_pair("queue_len", load.queue_len));
+ metrics[i].insert(make_pair("cpu_load_avg", load.cpu_load_avg));
+ }
+
+ /* execute the balancer */
+ Mantle *mantle = new Mantle();
+ int ret = mantle->balance(script, mds->get_nodeid(), metrics, my_targets);
+ delete mantle;
+ dout(2) << " mantle decided that new targets=" << my_targets << dendl;
+
+ /* mantle doesn't know about cluster size, so check target len here */
+ if ((int) my_targets.size() != cluster_size)
+ return -EINVAL;
+ else if (ret)
+ return ret;
+
+ try_rebalance();
+ return 0;
+}
+
+
+
void MDBalancer::try_rebalance()
{
if (!check_targets())
--- /dev/null
+#include "mdstypes.h"
+#include "MDSRank.h"
+#include "Mantle.h"
+#include "msg/Messenger.h"
+
+#include <fstream>
+
+#define dout_subsys ceph_subsys_mds_balancer
+#undef DOUT_COND
+#define DOUT_COND(cct, l) l<=cct->_conf->debug_mds || l <= cct->_conf->debug_mds_balancer
+#undef dout_prefix
+#define dout_prefix *_dout << "mds.mantle "
+
+int dout_wrapper(lua_State *L)
+{
+ #undef dout_prefix
+ #define dout_prefix *_dout << "lua.balancer "
+
+ /* Lua indexes the stack from the bottom up */
+ int bottom = -1 * lua_gettop(L);
+ if (!lua_isinteger(L, bottom) || bottom == 0) {
+ dout(0) << "WARNING: BAL_LOG has no message" << dendl;
+ return -EINVAL;
+ }
+
+ /* bottom of the stack is the log level */
+ int level = lua_tointeger(L, bottom);
+
+ /* rest of the stack is the message */
+ string s = "";
+ for (int i = bottom + 1; i < 0; i++)
+ lua_isstring(L, i) ? s.append(lua_tostring(L, i)) : s.append("<empty>");
+
+ dout(level) << s << dendl;
+ return 0;
+}
+
+int Mantle::start()
+{
+ /* build lua vm state */
+ L = luaL_newstate();
+ if (!L) {
+ dout(0) << "WARNING: mantle could not load Lua state" << dendl;
+ return -ENOEXEC;
+ }
+
+ /* balancer policies can use basic Lua functions */
+ luaopen_base(L);
+
+ /* setup debugging */
+ lua_register(L, "BAL_LOG", dout_wrapper);
+
+ return 0;
+}
+
+int Mantle::execute(string script)
+{
+ if (L == NULL) {
+ dout(0) << "ERROR: mantle was not started" << dendl;
+ return -ENOENT;
+ }
+
+ /* load the balancer */
+ if (luaL_loadstring(L, script.c_str())) {
+ dout(0) << "WARNING: mantle could not load balancer: "
+ << lua_tostring(L, -1) << dendl;
+ return -EINVAL;
+ }
+
+ /* compile/execute balancer */
+ int ret = lua_pcall(L, 0, LUA_MULTRET, 0);
+
+ if (ret) {
+ dout(0) << "WARNING: mantle could not execute script: "
+ << lua_tostring(L, -1) << dendl;
+ return -EINVAL;
+ }
+
+ return 0;
+}
+
+int Mantle::balance(string script,
+ mds_rank_t whoami,
+ vector < map<string, double> > metrics,
+ map<mds_rank_t,double> &my_targets)
+{
+ if (start() != 0)
+ return -ENOEXEC;
+
+ /* tell the balancer which mds is making the decision */
+ lua_pushinteger(L, int(whoami));
+ lua_setfield(L, -2, "whoami");
+
+ /* global mds metrics to hold all dictionaries */
+ lua_newtable(L);
+
+ /* push name of mds (i) and its metrics onto Lua stack */
+ for (unsigned i=0; i < metrics.size(); i++) {
+ lua_pushinteger(L, i);
+ lua_newtable(L);
+
+ /* push values into this mds's table; setfield assigns key/pops val */
+ for (map<string, double>::iterator it = metrics[i].begin();
+ it != metrics[i].end();
+ it++) {
+ lua_pushnumber(L, it->second);
+ lua_setfield(L, -2, it->first.c_str());
+ }
+
+ /* in global mds table at stack[-3], set k=stack[-1] to v=stack[-2] */
+ lua_rawset(L, -3);
+ }
+
+ /* set the name of the global mds table */
+ lua_setglobal(L, "mds");
+
+ int ret = execute(script);
+ if (ret != 0) {
+ lua_close(L);
+ return ret;
+ }
+
+ /* parse response by iterating over Lua stack */
+ if (lua_istable(L, -1) == 0) {
+ dout(0) << "WARNING: mantle script returned a malformed response" << dendl;
+ lua_close(L);
+ return -EINVAL;
+ }
+
+ /* fill in return value */
+ mds_rank_t it = mds_rank_t(0);
+ lua_pushnil(L);
+ while (lua_next(L, -2) != 0) {
+ if (!lua_isnumber(L, -1)) {
+ dout(0) << "WARNING: mantle script returned a malformed response" << dendl;
+ lua_close(L);
+ return -EINVAL;
+ }
+ my_targets[it] = (lua_tonumber(L, -1));
+ lua_pop(L, 1);
+ it++;
+ }
+
+ lua_close(L);
+ return 0;
+}