MAR 1
+
+
- aged object stores on googoo
- confirm block dev versus big file
mpiexec -l -n 30 ./tcpsyn --mkfs --ebofs --syn until 100 --syn writefile 1000 65536 --nummds 1 --numclient 100 --numosd 6 --kill_after 300 --file_layout_num_rep 1 --debug_after 110 --debug_osd 15 --debug_filer 15 --debug 5 --mds_shutdown_check 60 --log_name osd/write_sizes.sdb2.ebo.file2/fs=ebofs,writefile_size=65536
+
+
+
- mds
- vary log stripe size, count (on ebofs and fakestore)
? 4k for ebofs, 64k for fakestore
-// -- load balancing stuff --
-
-
-
-
-// popularity
-#define MDS_POP_JUSTME 0 // just me
-#define MDS_POP_NESTED 1 // me + children, auth or not
-#define MDS_POP_CURDOM 2 // me + children in current domain
-#define MDS_POP_ANYDOM 3 // me + children in any (nested) domain
-#define MDS_POP_DIRMOD 4 // just this dir, modifications only
-#define MDS_NPOP 5
-
-class mds_load_t {
- public:
- double root_pop;
- double req_rate, rd_rate, wr_rate;
- double cache_hit_rate;
-
- mds_load_t() :
- root_pop(0), req_rate(0), rd_rate(0), wr_rate(0), cache_hit_rate(0) { }
-
-};
-
// -- io helpers --
set<int> dir_rep_by; // if dir_rep == CDIR_REP_LIST
// popularity
- DecayCounter popularity[MDS_NPOP];
+ meta_load_t popularity[MDS_NPOP];
// friends
friend class CInode;
// for giving to clients
void get_dist_spec(set<int>& ls, int auth) {
- if (( popularity[MDS_POP_CURDOM].get() > g_conf.mds_bal_replicate_threshold)) {
+ if (( popularity[MDS_POP_CURDOM].pop[META_POP_RD].get() > g_conf.mds_bal_replicate_threshold)) {
//if (!cached_by.empty() && inode.ino > 1) dout(1) << "distributed spec for " << *this << endl;
ls = open_by;
if (!ls.empty()) ls.insert(auth);
__uint64_t nden; // num dentries (including null ones)
__uint64_t version;
unsigned state;
- DecayCounter popularity_justme;
- DecayCounter popularity_curdom;
+ meta_load_t popularity_justme;
+ meta_load_t popularity_curdom;
int dir_auth;
int dir_rep;
int nopen_by;
st.popularity_justme.take( dir->popularity[MDS_POP_JUSTME] );
st.popularity_curdom.take( dir->popularity[MDS_POP_CURDOM] );
- dir->popularity[MDS_POP_ANYDOM].adjust_down(st.popularity_curdom);
+ dir->popularity[MDS_POP_ANYDOM] -= st.popularity_curdom;
+ dir->popularity[MDS_POP_NESTED] -= st.popularity_curdom;
rep_by = dir->dir_rep_by;
open_by = dir->open_by;
dir->dir_auth = st.dir_auth;
dir->dir_rep = st.dir_rep;
- double newcurdom = st.popularity_curdom.get() - dir->popularity[MDS_POP_CURDOM].get();
- dir->popularity[MDS_POP_JUSTME].take( st.popularity_justme );
- dir->popularity[MDS_POP_CURDOM].take( st.popularity_curdom );
- dir->popularity[MDS_POP_ANYDOM].adjust(newcurdom);
+ dir->popularity[MDS_POP_JUSTME] += st.popularity_justme;
+ dir->popularity[MDS_POP_CURDOM] += st.popularity_curdom;
+ dir->popularity[MDS_POP_ANYDOM] += st.popularity_curdom;
+ dir->popularity[MDS_POP_NESTED] += st.popularity_curdom;
dir->replica_nonce = 0; // no longer defined
#include "Lock.h"
#include "Capability.h"
+#include "mdstypes.h"
+
#include <cassert>
#include <list>
#include <vector>
+
+
// pins for keeping an item in cache (and debugging)
#define CINODE_PIN_DIR 0
#define CINODE_PIN_CACHED 1
int nested_auth_pins;
public:
- DecayCounter popularity[MDS_NPOP];
+ meta_load_t popularity[MDS_NPOP];
// friends
friend class MDCache;
int w = 0;
for (map<int,Capability>::iterator it = client_caps.begin();
it != client_caps.end();
- it++)
+ it++) {
w |= it->second.wanted();
+ //cout << " get_caps_wanted client " << it->first << " " << cap_string(it->second.wanted()) << endl;
+ }
if (is_auth())
for (map<int,int>::iterator it = mds_caps_wanted.begin();
it != mds_caps_wanted.end();
- it++)
+ it++) {
w |= it->second;
+ //cout << " get_caps_wanted mds " << it->first << " " << cap_string(it->second) << endl;
+ }
return w;
}
typedef struct {
inode_t inode;
__uint64_t version;
- DecayCounter popularity_justme;
- DecayCounter popularity_curdom;
+ meta_load_t popularity_justme;
+ meta_load_t popularity_curdom;
bool is_dirty; // dirty inode?
int ncached_by; // int pairs follow
st.popularity_justme.take( in->popularity[MDS_POP_JUSTME] );
st.popularity_curdom.take( in->popularity[MDS_POP_CURDOM] );
- in->popularity[MDS_POP_ANYDOM].adjust_down(st.popularity_curdom);
+ in->popularity[MDS_POP_ANYDOM] -= st.popularity_curdom;
+ in->popularity[MDS_POP_NESTED] -= st.popularity_curdom;
// steal WRITER caps from inode
in->take_client_caps(cap_map);
in->version = st.version;
- double newcurdom = st.popularity_curdom.get() - in->popularity[MDS_POP_CURDOM].get();
- in->popularity[MDS_POP_JUSTME].take( st.popularity_justme );
- in->popularity[MDS_POP_CURDOM].take( st.popularity_curdom );
- in->popularity[MDS_POP_ANYDOM].adjust(newcurdom);
+ in->popularity[MDS_POP_JUSTME] += st.popularity_justme;
+ in->popularity[MDS_POP_CURDOM] += st.popularity_curdom;
+ in->popularity[MDS_POP_ANYDOM] += st.popularity_curdom;
+ in->popularity[MDS_POP_NESTED] += st.popularity_curdom;
if (st.is_dirty) {
in->mark_dirty();
*
*/
-
-
+#include "mdstypes.h"
#include "MDBalancer.h"
#include "MDS.h"
#define MIN_REEXPORT 5 // will automatically reexport
#define MIN_OFFLOAD 10 // point at which i stop trying, close enough
-ostream& operator<<( ostream& out, mds_load_t& load )
-{
- return out << "load<" << load.root_pop << "," << load.req_rate << "," << load.rd_rate << "," << load.wr_rate << ">";
-}
-
-mds_load_t& operator+=( mds_load_t& l, mds_load_t& r )
-{
- l.root_pop += r.root_pop;
- l.req_rate += r.req_rate;
- return l;
-}
-
-mds_load_t operator/( mds_load_t& a, double d )
-{
- mds_load_t r;
- r.root_pop = a.root_pop / d;
- r.req_rate = a.req_rate / d;
- return r;
-}
-
int MDBalancer::proc_message(Message *m)
{
{
mds_load_t load;
if (mds->mdcache->get_root())
- load.root_pop =
- mds->mdcache->get_root()->popularity[MDS_POP_ANYDOM].get() +
- mds->mdcache->get_root()->popularity[MDS_POP_NESTED].get();
- else
- load.root_pop = 0;
+ load.root =
+ //mds->mdcache->get_root()->popularity[MDS_POP_ANYDOM] +
+ mds->mdcache->get_root()->popularity[MDS_POP_NESTED];
+
+ load.queue_len = mds->messenger->get_dispatch_queue_len();
return load;
}
CDir *im = *it;
if (im->inode->is_root()) continue;
int from = im->inode->authority();
- import_map[from] += im->popularity[MDS_POP_CURDOM].get();
+ import_map[from] += im->popularity[MDS_POP_CURDOM].meta_load();
}
mds_import_map[ mds->get_nodeid() ] = import_map;
dout(5) << " do_rebalance: cluster loads are" << endl;
- mds_load_t total_load;
+ double total_load;
multimap<double,int> load_map;
for (int i=0; i<cluster_size; i++) {
- dout(5) << " mds" << i << " load " << mds_load[i] << endl;
- total_load += mds_load[i];
+ double l = mds_load[i].mds_load();
+ dout(5) << " mds" << i << " load " << mds_load[i] << " -> " << l << endl;
+ total_load += l;
- load_map.insert(pair<double,int>( mds_load[i].root_pop, i ));
+ load_map.insert(pair<double,int>( l, i ));
}
dout(5) << " total load " << total_load << endl;
// target load
target_load = total_load / (double)cluster_size;
dout(5) << " target load " << target_load << endl;
-
+
// under or over?
- if (mds_load[whoami].root_pop < target_load.root_pop) {
+ if (mds_load[whoami].mds_load() < target_load) {
dout(5) << " i am underloaded, doing nothing." << endl;
show_imports();
return;
for (multimap<double,int>::iterator it = load_map.begin();
it != load_map.end();
it++) {
- if (it->first < target_load.root_pop) {
+ if (it->first < target_load) {
//dout(5) << " mds" << it->second << " is importer" << endl;
importers.insert(pair<double,int>(it->first,it->second));
importer_set.insert(it->second);
for (set<CDir*>::iterator it = mds->mdcache->imports.begin();
it != mds->mdcache->imports.end();
it++) {
- double pop = (*it)->popularity[MDS_POP_CURDOM].get();
+ double pop = (*it)->popularity[MDS_POP_CURDOM].meta_load();
if (pop < g_conf.mds_bal_idle_threshold &&
(*it)->inode != mds->mdcache->get_root()) {
dout(5) << " exporting idle import " << **it << endl;
if (dir->inode->is_root()) continue;
if (dir->is_freezing() || dir->is_frozen()) continue; // export pbly already in progress
- double pop = dir->popularity[MDS_POP_CURDOM].get();
+ double pop = dir->popularity[MDS_POP_CURDOM].meta_load();
assert(dir->inode->authority() == target); // cuz that's how i put it in the map, dummy
if (pop <= amount) {
}
for (list<CDir*>::iterator it = exports.begin(); it != exports.end(); it++) {
- dout(5) << " exporting fragment " << **it << " pop " << (*it)->popularity[MDS_POP_CURDOM].get() << endl;
+ dout(5) << " exporting fragment " << **it << " pop " << (*it)->popularity[MDS_POP_CURDOM].meta_load() << endl;
mds->mdcache->export_dir(*it, target);
}
}
if (in->dir->get_size() == 0) continue; // don't export empty dirs, even if they're not complete. for now!
// how popular?
- double pop = in->dir->popularity[MDS_POP_CURDOM].get();
+ double pop = in->dir->popularity[MDS_POP_CURDOM].meta_load();
//cout << " in " << in->inode.ino << " " << pop << endl;
if (pop < minchunk) continue;
-void MDBalancer::hit_inode(CInode *in)
+void MDBalancer::hit_inode(CInode *in, int type)
{
// hit me
- in->popularity[MDS_POP_JUSTME].hit();
- in->popularity[MDS_POP_NESTED].hit();
+ in->popularity[MDS_POP_JUSTME].pop[type].hit();
+ in->popularity[MDS_POP_NESTED].pop[type].hit();
if (in->is_auth()) {
- in->popularity[MDS_POP_CURDOM].hit();
- in->popularity[MDS_POP_ANYDOM].hit();
+ in->popularity[MDS_POP_CURDOM].pop[type].hit();
+ in->popularity[MDS_POP_ANYDOM].pop[type].hit();
}
// hit auth up to import
CDir *dir = in->get_parent_dir();
- if (dir) hit_recursive(dir);
+ if (dir) hit_recursive(dir, type);
}
-void MDBalancer::hit_dir(CDir *dir, bool modify)
+void MDBalancer::hit_dir(CDir *dir, int type)
{
// hit me
- dir->popularity[MDS_POP_JUSTME].hit();
+ float v = dir->popularity[MDS_POP_JUSTME].pop[type].hit();
// hit modify counter, if this was a modify
- if (modify) {
- float p = dir->popularity[MDS_POP_DIRMOD].hit();
-
- if (g_conf.num_mds > 1 &&
- dir->is_auth()) {
- // hash this dir? (later?)
- if (p > g_conf.mds_bal_hash_threshold &&
- !(dir->is_hashed() || dir->is_hashing()) &&
- hash_queue.count(dir->ino()) == 0) {
- dout(0) << "hit_dir DIRMOD pop is " << p << ", putting in hash_queue: " << *dir << endl;
- hash_queue.insert(dir->ino());
- }
+ if (type == META_POP_WR &&
+ g_conf.num_mds > 1 &&
+ dir->is_auth()) {
+ // hash this dir? (later?)
+ if (v > g_conf.mds_bal_hash_threshold &&
+ !(dir->is_hashed() || dir->is_hashing()) &&
+ hash_queue.count(dir->ino()) == 0) {
+ dout(0) << "hit_dir WR pop is " << v << ", putting in hash_queue: " << *dir << endl;
+ hash_queue.insert(dir->ino());
}
}
-
- hit_recursive(dir);
+
+ hit_recursive(dir, type);
}
-void MDBalancer::hit_recursive(CDir *dir)
+void MDBalancer::hit_recursive(CDir *dir, int type)
{
bool anydom = dir->is_auth();
bool curdom = dir->is_auth();
// replicate?
- float dir_pop = dir->popularity[MDS_POP_CURDOM].get(); // hmm??
+ float dir_pop = dir->popularity[MDS_POP_CURDOM].pop[type].get(); // hmm??
if (dir->is_auth()) {
if (!dir->is_rep() &&
while (dir) {
CInode *in = dir->inode;
- dir->popularity[MDS_POP_NESTED].hit();
- in->popularity[MDS_POP_NESTED].hit();
+ dir->popularity[MDS_POP_NESTED].pop[type].hit();
+ in->popularity[MDS_POP_NESTED].pop[type].hit();
if (anydom) {
- dir->popularity[MDS_POP_ANYDOM].hit();
- in->popularity[MDS_POP_ANYDOM].hit();
+ dir->popularity[MDS_POP_ANYDOM].pop[type].hit();
+ in->popularity[MDS_POP_ANYDOM].pop[type].hit();
}
if (curdom) {
- dir->popularity[MDS_POP_CURDOM].hit();
- in->popularity[MDS_POP_CURDOM].hit();
+ dir->popularity[MDS_POP_CURDOM].pop[type].hit();
+ in->popularity[MDS_POP_CURDOM].pop[type].hit();
}
if (dir->is_import())
*/
void MDBalancer::subtract_export(CDir *dir)
{
- double curdom = -dir->popularity[MDS_POP_CURDOM].get();
-
+ meta_load_t curdom = dir->popularity[MDS_POP_CURDOM];
+
bool in_domain = !dir->is_import();
while (true) {
CInode *in = dir->inode;
- in->popularity[MDS_POP_ANYDOM].adjust(curdom);
- if (in_domain) in->popularity[MDS_POP_CURDOM].adjust(curdom);
+ in->popularity[MDS_POP_ANYDOM] -= curdom;
+ if (in_domain) in->popularity[MDS_POP_CURDOM] -= curdom;
dir = in->get_parent_dir();
if (!dir) break;
if (dir->is_import()) in_domain = false;
- dir->popularity[MDS_POP_ANYDOM].adjust(curdom);
- if (in_domain) dir->popularity[MDS_POP_CURDOM].adjust(curdom);
+ dir->popularity[MDS_POP_ANYDOM] -= curdom;
+ if (in_domain) dir->popularity[MDS_POP_CURDOM] -= curdom;
}
}
void MDBalancer::add_import(CDir *dir)
{
- double curdom = dir->popularity[MDS_POP_CURDOM].get();
+ meta_load_t curdom = dir->popularity[MDS_POP_CURDOM];
bool in_domain = !dir->is_import();
while (true) {
CInode *in = dir->inode;
- in->popularity[MDS_POP_ANYDOM].adjust(curdom);
- if (in_domain) in->popularity[MDS_POP_CURDOM].adjust(curdom);
+ in->popularity[MDS_POP_ANYDOM] += curdom;
+ if (in_domain) in->popularity[MDS_POP_CURDOM] += curdom;
dir = in->get_parent_dir();
if (!dir) break;
if (dir->is_import()) in_domain = false;
- dir->popularity[MDS_POP_ANYDOM].adjust(curdom);
- if (in_domain) dir->popularity[MDS_POP_CURDOM].adjust(curdom);
+ dir->popularity[MDS_POP_ANYDOM] += curdom;
+ if (in_domain) dir->popularity[MDS_POP_CURDOM] += curdom;
}
}
CDir *im = *it;
if (im->is_import()) {
- dout(db) << " + import (" << im->popularity[MDS_POP_CURDOM].get() << "/" << im->popularity[MDS_POP_ANYDOM].get() << ") " << *im << endl;
+ dout(db) << " + import (" << im->popularity[MDS_POP_CURDOM] << "/" << im->popularity[MDS_POP_ANYDOM] << ") " << *im << endl;
assert( im->is_auth() );
}
else if (im->is_hashed()) {
if (im->is_import()) continue; // if import AND hash, list as import.
- dout(db) << " + hash (" << im->popularity[MDS_POP_CURDOM].get() << "/" << im->popularity[MDS_POP_ANYDOM].get() << ") " << *im << endl;
+ dout(db) << " + hash (" << im->popularity[MDS_POP_CURDOM] << "/" << im->popularity[MDS_POP_ANYDOM] << ") " << *im << endl;
}
for (set<CDir*>::iterator p = mds->mdcache->nested_exports[im].begin();
CDir *exp = *p;
if (exp->is_hashed()) {
assert(0); // we don't do it this way actually
- dout(db) << " - hash (" << exp->popularity[MDS_POP_NESTED].get() << ", " << exp->popularity[MDS_POP_ANYDOM].get() << ") " << *exp << " to " << exp->dir_auth << endl;
+ dout(db) << " - hash (" << exp->popularity[MDS_POP_NESTED] << ", " << exp->popularity[MDS_POP_ANYDOM] << ") " << *exp << " to " << exp->dir_auth << endl;
assert( exp->is_auth() );
} else {
- dout(db) << " - ex (" << exp->popularity[MDS_POP_NESTED].get() << ", " << exp->popularity[MDS_POP_ANYDOM].get() << ") " << *exp << " to " << exp->dir_auth << endl;
+ dout(db) << " - ex (" << exp->popularity[MDS_POP_NESTED] << ", " << exp->popularity[MDS_POP_ANYDOM] << ") " << *exp << " to " << exp->dir_auth << endl;
assert( exp->is_export() );
assert( !exp->is_auth() );
}
*/
+
#ifndef __MDBALANCER_H
#define __MDBALANCER_H
#include "include/types.h"
#include "common/Clock.h"
+#include "CInode.h"
+
class MDS;
class Message;
map<int, map<int, float> > mds_import_map;
// per-epoch state
- mds_load_t target_load;
+ double target_load;
map<int,double> my_targets;
map<int,double> imported;
map<int,double> exported;
double try_match(int ex, double& maxex,
int im, double& maxim);
double get_maxim(int im) {
- return target_load.root_pop - mds_load[im].root_pop - imported[im];
+ return target_load - mds_load[im].mds_load() - imported[im];
}
double get_maxex(int ex) {
- return mds_load[ex].root_pop - target_load.root_pop - exported[ex];
+ return mds_load[ex].mds_load() - target_load - exported[ex];
}
public:
void subtract_export(class CDir *ex);
void add_import(class CDir *im);
- void hit_inode(class CInode *in);
- void hit_dir(class CDir *dir, bool modify=false);
- void hit_recursive(class CDir *dir);
+ void hit_inode(class CInode *in, int type=0);
+ void hit_dir(class CDir *dir, int type=0);
+ void hit_recursive(class CDir *dir, int type=0);
void show_imports(bool external=false);
};
-ostream& operator<<( ostream& out, mds_load_t& load );
#endif
}
if (onfail == MDS_TRAVERSE_FORWARD) {
// forward
- dout(0) << "traverse: not auth for " << path[depth] << ", fwd to mds" << dauth << endl;
+ dout(7) << "traverse: not auth for " << path[depth] << ", fwd to mds" << dauth << endl;
if (is_client_req && cur->dir->is_rep()) {
dout(15) << "traverse: REP fw to mds" << dauth << ", requesting rep under " << *cur->dir << endl;
// replica
// fw to auth
int auth = in->authority();
- dout(0) << "inode_hard_write_start " << *in << " on replica, fw to auth " << auth << endl;
+ dout(7) << "inode_hard_write_start " << *in << " on replica, fw to auth " << auth << endl;
assert(auth != mds->get_nodeid());
request_forward(m, auth);
return false;
mds_logtype.add_inc("fw");
mds_logtype.add_inc("cfw");
+ mds_logtype.add_set("l");
+ mds_logtype.add_set("q");
mds_logtype.add_set("popanyd");
mds_logtype.add_set("popnest");
// log
last_log = now;
- //mds_load_t load = balancer->get_load();
+ mds_load_t load = balancer->get_load();
+
+ logger->set("l", (int)load.mds_load());
+ logger->set("q", messenger->get_dispatch_queue_len());
if (mdcache->get_root()) {
- logger->set("popanyd", (int)mdcache->get_root()->popularity[MDS_POP_ANYDOM].get());
- logger->set("popnest", (int)mdcache->get_root()->popularity[MDS_POP_NESTED].get());
+ logger->set("popanyd", (int)mdcache->get_root()->popularity[MDS_POP_ANYDOM].meta_load());
+ logger->set("popnest", (int)mdcache->get_root()->popularity[MDS_POP_NESTED].meta_load());
}
logger->set("c", mdcache->lru.lru_get_size());
logger->set("cpin", mdcache->lru.lru_get_num_pinned());
mdcache->inode_file_read_finish(ref);
- balancer->hit_inode(ref);
+ balancer->hit_inode(ref, META_POP_RD);
// reply
reply_request(req, reply, ref);
mdcache->inode_file_write_finish(cur);
- balancer->hit_inode(cur);
+ balancer->hit_inode(cur, META_POP_WR);
// init reply
MClientReply *reply = new MClientReply(req, 0);
mdcache->inode_hard_write_finish(cur);
- balancer->hit_inode(cur);
+ balancer->hit_inode(cur, META_POP_WR);
// start reply
MClientReply *reply = new MClientReply(req, 0);
mdcache->inode_hard_write_finish(cur);
- balancer->hit_inode(cur);
+ balancer->hit_inode(cur, META_POP_WR);
// start reply
MClientReply *reply = new MClientReply(req, 0);
newi->inode.mode &= ~INODE_TYPE_MASK;
newi->inode.mode |= INODE_MODE_FILE;
- balancer->hit_inode(newi);
+ balancer->hit_inode(newi, META_POP_WR);
// commit
commit_request(req, new MClientReply(req, 0), ref,
// set target
newi->symlink = req->get_sarg();
- balancer->hit_inode(newi);
+ balancer->hit_inode(newi, META_POP_WR);
// commit
commit_request(req, new MClientReply(req, 0), diri,
mdcache->inode_file_write_finish(cur);
- balancer->hit_inode(cur);
+ balancer->hit_inode(cur, META_POP_WR);
// start reply
MClientReply *reply = new MClientReply(req, 0);
dout(12) << "open gets caps " << cap_string(cap->pending()) << endl;
- balancer->hit_inode(cur);
+ balancer->hit_inode(cur, META_POP_RD);
// reply
MClientReply *reply = new MClientReply(req, 0); // fh # is return code
--- /dev/null
+#ifndef __MDSTYPES_H
+#define __MDSTYPES_H
+
+
+#include <math.h>
+#include <ostream>
+using namespace std;
+
+#include "common/DecayCounter.h"
+
+
+/* meta_load_t
+ * hierarchical load for an inode/dir and it's children
+ */
+#define META_POP_RD 0
+#define META_POP_WR 1
+#define META_POP_LOG 2
+#define META_POP_FDIR 3
+#define META_POP_CDIR 4
+#define META_NPOP 5
+
+class meta_load_t {
+ public:
+ DecayCounter pop[META_NPOP];
+
+ double meta_load() {
+ return pop[META_POP_RD].get() + pop[META_POP_WR].get();
+ }
+
+ void take(meta_load_t& other) {
+ for (int i=0; i<META_NPOP; i++) {
+ pop[i] = other.pop[i];
+ other.pop[i].reset();
+ }
+ }
+};
+
+inline ostream& operator<<( ostream& out, meta_load_t& load )
+{
+ return out << "metaload<rd " << load.pop[META_POP_RD].get()
+ << ", wr " << load.pop[META_POP_WR].get()
+ << ">";
+}
+
+
+inline meta_load_t& operator-=(meta_load_t& l, meta_load_t& r)
+{
+ for (int i=0; i<META_NPOP; i++)
+ l.pop[i].adjust(- r.pop[i].get());
+ return l;
+}
+
+inline meta_load_t& operator+=(meta_load_t& l, meta_load_t& r)
+{
+ for (int i=0; i<META_NPOP; i++)
+ l.pop[i].adjust(r.pop[i].get());
+ return l;
+}
+
+
+
+/* mds_load_t
+ * mds load
+ */
+
+// popularity classes
+#define MDS_POP_JUSTME 0 // just me (this dir or inode)
+#define MDS_POP_NESTED 1 // me + children, auth or not
+#define MDS_POP_CURDOM 2 // me + children in current auth domain
+#define MDS_POP_ANYDOM 3 // me + children in any (nested) auth domain
+//#define MDS_POP_DIRMOD 4 // just this dir, modifications only
+#define MDS_NPOP 4
+
+class mds_load_t {
+ public:
+ meta_load_t root;
+
+ double req_rate;
+ double cache_hit_rate;
+ double queue_len;
+
+ mds_load_t() :
+ req_rate(0), cache_hit_rate(0), queue_len(0) { }
+
+ double mds_load() {
+ return root.pop[META_POP_RD].get()
+ + root.pop[META_POP_WR].get()
+ + 100*queue_len;
+ }
+
+};
+
+
+inline ostream& operator<<( ostream& out, mds_load_t& load )
+{
+ return out << "mdsload<" << load.root
+ << ", req " << load.req_rate
+ << ", hr " << load.cache_hit_rate
+ << ", qlen " << load.queue_len
+ << ">";
+}
+
+/*
+inline mds_load_t& operator+=( mds_load_t& l, mds_load_t& r )
+{
+ l.root_pop += r.root_pop;
+ l.req_rate += r.req_rate;
+ l.queue_len += r.queue_len;
+ return l;
+}
+
+inline mds_load_t operator/( mds_load_t& a, double d )
+{
+ mds_load_t r;
+ r.root_pop = a.root_pop / d;
+ r.req_rate = a.req_rate / d;
+ r.queue_len = a.queue_len / d;
+ return r;
+}
+*/
+
+
+#endif
g_timer.set_messenger(this);
+ qlen = 0;
+
/*
string name;
name = "m.";
class Logger *logger;
+ int qlen;
list<Message*> incoming; // incoming queue
public:
// events
//virtual void trigger_timer(Timer *t);
+ int get_dispatch_queue_len() { return qlen; }
// -- incoming queue --
// (that nothing uses)
if (!incoming.empty()) {
Message *m = incoming.front();
incoming.pop_front();
+ qlen--;
return m;
}
return NULL;
}
bool queue_incoming(Message *m) {
incoming.push_back(m);
+ qlen++;
return true;
}
int num_incoming() {
- return incoming.size();
+ //return incoming.size();
+ return qlen;
}
};
void queue_callback(Context *c);
+ virtual int get_dispatch_queue_len() { return 0; };
+
// setup
void set_dispatcher(Dispatcher *d) { dispatcher = d; ready(); }
Dispatcher *get_dispatcher() { return dispatcher; }
}
+int TCPMessenger::get_dispatch_queue_len()
+{
+ return stat_inq;
+}
+
int TCPMessenger::shutdown()
{
void map_entity_rank(msg_addr_t e, int r);
void map_rank_addr(int r, tcpaddr_t a);
+ int get_dispatch_queue_len();
+
// init, shutdown MPI and associated event loop thread.
virtual int shutdown();