// FUSE will chdir("/"); be ready.
g_conf.use_abspaths = true;
+ if (g_conf.clock_tare) g_clock.tare();
+
// load monmap
MonMap monmap;
int r = monmap.read(".ceph_monmap");
}
}
+ if (g_conf.clock_tare) g_clock.tare();
// load monmap
MonMap monmap;
return -1;
}
}
-
+
+ if (g_conf.clock_tare) g_clock.tare();
+
MonMap monmap;
if (whoami < 0) {
public:
Clock() {
// The zero offset is no longer set here: start-up code calls tare() explicitly when g_conf.clock_tare is set.
- tare();
+ //tare();
}
// real time.
// Record the current wall-clock time as this clock's zero point.
void tare() {
  struct timeval *origin = &zero.timeval();  // storage backing `zero`
  gettimeofday(origin, NULL);
}
+ void tare(utime_t z) {  // overload: adopt the caller-supplied zero point z instead of sampling gettimeofday()
+ zero = z;
+ }
utime_t now() {
//lock.Lock();
utime_t n;
//cout << "log " << filename << endl;
interval = g_conf.log_interval;
- //start = g_clock.now(); // time 0!
+ if (!g_conf.clock_tare)
+ start = g_clock.now(); // time 0! otherwise g_clock does it for us.
+
last_logged = 0;
wrote_header = -1;
open = false;
// --- clock ---
clock_lock: false,
+ clock_tare: true,
// --- messenger ---
ms_single_dispatch: false,
else if (strcmp(args[i], "--clock_lock") == 0)
g_conf.clock_lock = atoi(args[++i]);
+ else if (strcmp(args[i], "--clock_tare") == 0)
+ g_conf.clock_tare = atoi(args[++i]);
else if (strcmp(args[i], "--objecter_buffer_uncommitted") == 0)
g_conf.objecter_buffer_uncommitted = atoi(args[++i]);
// clock
bool clock_lock;
+ bool clock_tare;
// messenger
if (g_conf.debug_after)
g_timer.add_event_after(g_conf.debug_after, new C_Debug);
+ if (g_conf.clock_tare) g_clock.tare();
+
// osd specific args
char *dev;
int whoami = -1;
// args for fuse
vec_to_argv(args, argc, argv);
+ if (g_conf.clock_tare) g_clock.tare();
+
// load monmap
MonMap monmap;
int r = monmap.read(".ceph_monmap");
// FUSE will chdir("/"); be ready.
g_conf.use_abspaths = true;
+ if (g_conf.clock_tare) g_clock.tare();
+
MonMap *monmap = new MonMap(g_conf.num_mon);
Monitor *mon[g_conf.num_mon];
+++ /dev/null
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-/*
- * Ceph - scalable distributed file system
- *
- * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
- *
- * This is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License version 2.1, as published by the Free Software
- * Foundation. See file COPYING.
- *
- */
-
-
-
-#include <sys/stat.h>
-#include <iostream>
-#include <string>
-using namespace std;
-
-#include "config.h"
-
-#include "mds/MDCluster.h"
-
-#include "mds/MDS.h"
-#include "osd/OSD.h"
-#include "mon/Monitor.h"
-#include "client/Client.h"
-
-#include "client/SyntheticClient.h"
-
-#include "msg/FakeMessenger.h"
-
-#include "common/Timer.h"
-
-#define NUMMDS g_conf.num_mds
-#define NUMOSD g_conf.num_osd
-#define NUMCLIENT g_conf.num_client
-
-class C_Test : public Context {  // Context whose finish() only prints the code it was given
-public:
- void finish(int r) {  // r is written to cout verbatim; it is not interpreted
- cout << "C_Test->finish(" << r << ")" << endl;
- }
-};
-
-
-int main(int argc, char **argv)  // fakesyn: builds mons, MDSs, OSDs and synthetic clients in one process on FakeMessenger, runs the clients, then tears down
-{
- cerr << "fakesyn start" << endl;
-
- //cerr << "inode_t " << sizeof(inode_t) << endl;
-
- vector<char*> args;
- argv_to_vec(argc, argv, args);
-
- parse_config_options(args);
-
- int start = 0;  // incremented once per MDS/OSD/client created below; never read in this function
-
- parse_syn_options(args);
-
- vector<char*> nargs;
-
- for (unsigned i=0; i<args.size(); i++) {  // every element still in args at this point is reported as stray
- // unknown arg, pass it on.
- cerr << " stray arg " << args[i] << endl;
- nargs.push_back(args[i]);
- }
- assert(nargs.empty());  // so any leftover argument aborts the run here
-
-
- MDCluster *mdc = new MDCluster(NUMMDS, NUMOSD);
-
-
- char hostname[100];  // filled in but only referenced by the commented-out cerr lines below
- gethostname(hostname,100);
- //int pid = getpid();
-
- // create mon
- Monitor *mon[g_conf.num_mon];  // NOTE(review): these Monitor objects are not deleted in the cleanup section
- for (int i=0; i<g_conf.num_mon; i++) {
- mon[i] = new Monitor(i, new FakeMessenger(MSG_ADDR_MON(i)));
- }
-
- // create mds
- MDS *mds[NUMMDS];
- OSD *mdsosd[NUMMDS];  // entries are assigned only when g_conf.mds_local_osd is set; never deleted
- for (int i=0; i<NUMMDS; i++) {
- //cerr << "mds" << i << " on rank " << myrank << " " << hostname << "." << pid << endl;
- mds[i] = new MDS(mdc, i, new FakeMessenger(MSG_ADDR_MDS(i)));
- if (g_conf.mds_local_osd)
- mdsosd[i] = new OSD(i+10000, new FakeMessenger(MSG_ADDR_OSD(i+10000)));  // MDS-local OSD uses id i+10000
- start++;
- }
-
- // create osd
- OSD *osd[NUMOSD];
- for (int i=0; i<NUMOSD; i++) {
- //cerr << "osd" << i << " on rank " << myrank << " " << hostname << "." << pid << endl;
- osd[i] = new OSD(i, new FakeMessenger(MSG_ADDR_OSD(i)));
- start++;
- }
-
- // create client
- Client *client[NUMCLIENT];
- SyntheticClient *syn[NUMCLIENT];  // populated later, in the init/mount loop
- for (int i=0; i<NUMCLIENT; i++) {
- //cerr << "client" << i << " on rank " << myrank << " " << hostname << "." << pid << endl;
- client[i] = new Client(new FakeMessenger(MSG_ADDR_CLIENT(i)));
- start++;
- }
-
-
- // start message loop
- fakemessenger_startthread();
-
- // init
- for (int i=0; i<g_conf.num_mon; i++) {
- mon[i]->init();
- }
- for (int i=0; i<NUMMDS; i++) {
- mds[i]->init();
- if (g_conf.mds_local_osd)
- mdsosd[i]->init();
- }
-
- for (int i=0; i<NUMOSD; i++) {
- osd[i]->init();
- }
-
-
- // create client(s)
- for (int i=0; i<NUMCLIENT; i++) {
- client[i]->init();
-
- // use my argc, argv (make sure you pass a mount point!)
- //cout << "mounting" << endl;
- client[i]->mount();
-
- //cout << "starting synthetic client " << endl;
- syn[i] = new SyntheticClient(client[i]);
-
- syn[i]->start_thread();
- }
-
-
- for (int i=0; i<NUMCLIENT; i++) {  // join each synthetic client in index order, then unmount and shut down its Client
-
- cout << "waiting for synthetic client " << i << " to finish" << endl;
- syn[i]->join_thread();
- delete syn[i];
-
- client[i]->unmount();
- //cout << "unmounted" << endl;
- client[i]->shutdown();
- }
-
-
- // wait for it to finish
- fakemessenger_wait();
-
- // cleanup
- for (int i=0; i<NUMMDS; i++) {
- delete mds[i];
- }
- for (int i=0; i<NUMOSD; i++) {
- delete osd[i];
- }
- for (int i=0; i<NUMCLIENT; i++) {
- delete client[i];
- }
- delete mdc;
-
- cout << "fakesyn done" << endl;
- return 0;
-}
-
if (g_conf.kill_after)
g_timer.add_event_after(g_conf.kill_after, new C_Die);
-
- g_clock.tare();
+ if (g_conf.clock_tare) g_clock.tare();
MonMap *monmap = new MonMap(g_conf.num_mon);
entity_addr_t a;
MPI_Comm_rank(MPI_COMM_WORLD, &mpi_rank);
// first, synchronize clocks.
- MPI_Barrier(MPI_COMM_WORLD);
- //dout(-10) << "tare" << endl;
- g_clock.tare();
+ if (g_conf.clock_tare) {
+ if (1) {
+ // use an MPI barrier. probably not terribly precise.
+ MPI_Barrier(MPI_COMM_WORLD);
+ g_clock.tare();
+ } else {
+ // use wall clock; assume NTP has all nodes synchronized already.
+ // FIXME: this path hangs for an undiagnosed reason, so it is disabled by the if (1) above.
+ utime_t z = g_clock.now();
+ MPI_Bcast( &z, sizeof(z), MPI_CHAR,
+ 0, MPI_COMM_WORLD);
+ cout << "z is " << z << endl;
+ g_clock.tare(z);
+ }
+ }
// start up all monitors at known addresses.
entity_inst_t moninst[mpi_world]; // only care about first g_conf.num_mon of these.
+++ /dev/null
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-/*
- * Ceph - scalable distributed file system
- *
- * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
- *
- * This is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License version 2.1, as published by the Free Software
- * Foundation. See file COPYING.
- *
- */
-
-
-
-#include <sys/stat.h>
-#include <iostream>
-#include <string>
-using namespace std;
-
-#include "config.h"
-
-#include "mds/MDCluster.h"
-#include "mds/MDS.h"
-#include "osd/OSD.h"
-#include "client/Client.h"
-#include "client/fuse.h"
-
-#include "msg/TCPMessenger.h"
-
-#include "common/Timer.h"
-
-#include <envz.h>
-
-#include <sys/types.h>
-#include <sys/stat.h>
-#include <fcntl.h>
-
-int main(int argc, char **argv, char *envp[]) {  // tcpfuse: start the TCP messenger, mount one Client and serve it through FUSE; envp is not used
-
- //cerr << "tcpfuse starting " << myrank << "/" << world << endl;
- vector<char*> args;
- argv_to_vec(argc, argv, args);
- parse_config_options(args);
-
- // args for fuse
- vec_to_argv(args, argc, argv);  // argc/argv are rebuilt from args and later passed to ceph_fuse_main
-
- // start up tcpmessenger
- tcpaddr_t nsa;
- if (tcpmessenger_findns(nsa) < 0) exit(1);  // give up immediately when tcpmessenger_findns reports failure
- tcpmessenger_init();
- tcpmessenger_start();
- tcpmessenger_start_rankserver(nsa);
-
- Client *client = new Client(new TCPMessenger(MSG_ADDR_CLIENT_NEW));
- client->init();
-
- // start up fuse
- // use my argc, argv (make sure you pass a mount point!)
- cout << "mounting" << endl;
- client->mount();
-
- cerr << "starting fuse on pid " << getpid() << endl;
- ceph_fuse_main(client, argc, argv);  // unmount/shutdown below run only after this call returns
- cerr << "fuse finished on pid " << getpid() << endl;
-
- client->unmount();
- cout << "unmounted" << endl;
- client->shutdown();
-
- delete client;
-
- // wait for it to finish
- tcpmessenger_wait();
- tcpmessenger_shutdown(); // shut down the TCP messenger. NOTE(review): this comment used to read "shutdown MPI", likely a leftover; confirm
-
- return 0;
-}
-
+++ /dev/null
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-/*
- * Ceph - scalable distributed file system
- *
- * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
- *
- * This is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License version 2.1, as published by the Free Software
- * Foundation. See file COPYING.
- *
- */
-
-
-#include <sys/stat.h>
-#include <iostream>
-#include <string>
-using namespace std;
-
-#include "config.h"
-
-#include "mds/MDCluster.h"
-#include "mds/MDS.h"
-#include "osd/OSD.h"
-#include "mon/Monitor.h"
-#include "client/Client.h"
-#include "client/SyntheticClient.h"
-
-#include "msg/TCPMessenger.h"
-
-#include "common/Timer.h"
-
-#define NUMMDS g_conf.num_mds
-#define NUMOSD g_conf.num_osd
-#define NUMCLIENT g_conf.num_client
-
-class C_Test : public Context {  // Context whose finish() only prints the code it was given
-public:
- void finish(int r) {  // r is written to cout verbatim; it is not interpreted
- cout << "C_Test->finish(" << r << ")" << endl;
- }
-};
-
-
-#include "msg/mpistarter.cc"
-
-utime_t tick_start;  // reference time for ticks; set from g_clock.now() in main() when g_conf.tick is non-zero
-int tick_count = 0;  // seconds of ticking scheduled so far; grows by g_conf.tick per callback
-
-class C_Tick : public Context {  // self-rescheduling timer callback that logs a periodic "tick" line
-public:
- void finish(int) {
- utime_t now = g_clock.now() - tick_start;  // elapsed time since tick_start, used only for the log line
- dout(0) << "tick +" << g_conf.tick << " -> " << now << " (" << tick_count << ")" << endl;
- tick_count += g_conf.tick;
- utime_t next = tick_start;  // next deadline is computed from tick_start, not from the current time
- next.sec_ref() += tick_count;
- g_timer.add_event_at(next, new C_Tick);  // queue a fresh C_Tick for the next deadline
- }
-};
-
-class C_Die : public Context {  // timer callback scheduled by main() when g_conf.kill_after is set
-public:
- void finish(int) {  // the int argument is ignored
- cerr << "die" << endl;
- exit(1);  // terminates the whole process with status 1
- }
-};
-
-// Timer callback (scheduled by main() when g_conf.debug_after is set) that
-// switches to the "debug_after" settings: it overwrites the run of g_conf
-// fields starting at g_conf.debug and ending just before g_conf.debug_after
-// with the same span taken from g_debug_after_conf.
-class C_Debug : public Context {
- public:
- void finish(int) {
- // Span length in BYTES. Subtracting the typed field pointers directly gives
- // an element count rather than a byte count, so memcpy copied too little
- // whenever the field type is wider than one byte.
- int size = (char*)&g_conf.debug_after - (char*)&g_conf.debug;
- memcpy((char*)&g_conf.debug, (char*)&g_debug_after_conf.debug, size);
- dout(0) << "debug_after flipping debug settings" << endl;
- }
-};
-
-
-int main(int argc, char **argv)  // tcpsyn: each MPI rank hosts the mon/MDS/OSD/client roles assigned to it by rank arithmetic, talking over TCPMessenger
-{
- vector<char*> args;
- argv_to_vec(argc, argv, args);
-
- parse_config_options(args);
-
- parse_syn_options(args);
-
- if (g_conf.kill_after)
- g_timer.add_event_after(g_conf.kill_after, new C_Die);
- if (g_conf.debug_after)
- g_timer.add_event_after(g_conf.debug_after, new C_Debug);
-
- if (g_conf.tick) {
- tick_start = g_clock.now();
- g_timer.add_event_after(g_conf.tick, new C_Tick);
- }
-
- vector<char*> nargs;
- for (unsigned i=0; i<args.size(); i++) {  // copies every remaining element of args unchanged
- //cout << "a " << args[i] << endl;
- // unknown arg, pass it on.
- nargs.push_back(args[i]);
- }
-
- args = nargs;
- if (!args.empty()) {
- for (unsigned i=0; i<args.size(); i++)
- cerr << "stray arg " << args[i] << endl;
- }
- assert(args.empty());  // any argument left over after both parsers aborts the run
-
-
- // start up tcp messenger via MPI
- pair<int,int> mpiwho = mpi_bootstrap_tcp(argc, argv);  // unpacked below as (rank, world size)
- int myrank = mpiwho.first;
- int world = mpiwho.second;
-
- int need = 0;  // minimum number of ranks this configuration requires
- if (g_conf.tcp_skip_rank0) need++;
- need += NUMMDS;
- need += NUMOSD;
- if (NUMCLIENT) {
- if (!g_conf.tcp_overlay_clients)
- need += 1;  // at least one dedicated client rank unless clients are overlaid
- }
- assert(need <= world);
-
- if (myrank == 0)
- cerr << "nummds " << NUMMDS << " numosd " << NUMOSD << " numclient " << NUMCLIENT << " .. need " << need << ", have " << world << endl;
-
- MDCluster *mdc = new MDCluster(NUMMDS, NUMOSD);
-
-
- char hostname[100];
- gethostname(hostname,100);
- int pid = getpid();
-
- int started = 0;  // number of MDS/OSD/client instances started on this rank (the monitor is not counted)
-
- //if (myrank == 0) g_conf.debug = 20;
-
- // create mon
- if (myrank == 0) {
- Monitor *mon = new Monitor(0, new TCPMessenger(MSG_ADDR_MON(0)));  // pointer is local to this scope, so the Monitor is never deleted here
- mon->init();
- }
-
- // create mds
- MDS *mds[NUMMDS];  // only the slot whose rank matches this process is assigned
- OSD *mdsosd[NUMMDS];
- for (int i=0; i<NUMMDS; i++) {
- if (myrank != g_conf.tcp_skip_rank0+i) continue;  // mds i lives on rank tcp_skip_rank0 + i
- TCPMessenger *m = new TCPMessenger(MSG_ADDR_MDS(i));
- cerr << "mds" << i << " on tcprank " << tcpmessenger_get_rank() << " " << hostname << "." << pid << endl;
- mds[i] = new MDS(mdc, i, m);
- mds[i]->init();
- started++;
-
- if (g_conf.mds_local_osd) {
- mdsosd[i] = new OSD(i+10000, new TCPMessenger(MSG_ADDR_OSD(i+10000)));  // MDS-local OSD uses id i+10000
- mdsosd[i]->init();
- }
- }
-
- // create osd
- OSD *osd[NUMOSD];
- for (int i=0; i<NUMOSD; i++) {
- if (myrank != g_conf.tcp_skip_rank0+NUMMDS + i) continue;  // osd i lives on the rank after all MDS ranks, offset by i
- TCPMessenger *m = new TCPMessenger(MSG_ADDR_OSD(i));
- cerr << "osd" << i << " on tcprank " << tcpmessenger_get_rank() << " " << hostname << "." << pid << endl;
- osd[i] = new OSD(i, m);
- osd[i]->init();
- started++;
- }
-
- if (g_conf.tcp_overlay_clients) sleep(5);  // NOTE(review): presumably gives the OSDs time to come up before overlaid clients start; confirm
-
- // create client
- int skip_osd = NUMOSD;
- if (g_conf.tcp_overlay_clients)
- skip_osd = 0; // put clients with osds too!
- int client_nodes = world - NUMMDS - skip_osd - g_conf.tcp_skip_rank0;  // ranks available to host clients
- int clients_per_node = 1;
- if (NUMCLIENT) clients_per_node = (NUMCLIENT-1) / client_nodes + 1;  // ceil(NUMCLIENT / client_nodes). NOTE(review): client_nodes is not checked for zero; verify it cannot be 0 with overlay and no OSDs
- set<int> clientlist;  // indices of the clients hosted by this rank
- Client *client[NUMCLIENT];
- SyntheticClient *syn[NUMCLIENT];
- for (int i=0; i<NUMCLIENT; i++) {
- //if (myrank != NUMMDS + NUMOSD + i % client_nodes) continue;
- if (myrank != g_conf.tcp_skip_rank0+NUMMDS + skip_osd + i / clients_per_node) continue;  // clients are assigned to ranks in consecutive groups of clients_per_node
- clientlist.insert(i);
- client[i] = new Client(new TCPMessenger(MSG_ADDR_CLIENT_NEW));//(i)) );
-
- // logger?
- if (client_logger == 0) {  // create the shared client logger only once per process
- char s[80];
- sprintf(s,"clnode.%d", myrank);
- client_logger = new Logger(s, &client_logtype);
-
- client_logtype.add_inc("lsum");
- client_logtype.add_inc("lnum");
- client_logtype.add_inc("lwsum");
- client_logtype.add_inc("lwnum");
- client_logtype.add_inc("lrsum");
- client_logtype.add_inc("lrnum");
- client_logtype.add_inc("trsum");
- client_logtype.add_inc("trnum");
- client_logtype.add_inc("wrlsum");
- client_logtype.add_inc("wrlnum");
- client_logtype.add_inc("lstatsum");
- client_logtype.add_inc("lstatnum");
- client_logtype.add_inc("ldirsum");
- client_logtype.add_inc("ldirnum");
- client_logtype.add_inc("readdir");
- client_logtype.add_inc("stat");
- }
-
- client[i]->init();
- started++;
-
- syn[i] = new SyntheticClient(client[i]);
- }
-
- if (!clientlist.empty()) dout(2) << "i have " << clientlist << endl;
-
- int nclients = 0;
- for (set<int>::iterator it = clientlist.begin();
- it != clientlist.end();
- it++) {  // mount and launch every client hosted on this rank
- int i = *it;
-
- //cerr << "starting synthetic client" << i << " on rank " << myrank << endl;
- client[i]->mount();
- syn[i]->start_thread();
-
- nclients++;
- }
- if (nclients) {
- cerr << nclients << " clients on tcprank " << tcpmessenger_get_rank() << " " << hostname << "." << pid << endl;
- }
-
- for (set<int>::iterator it = clientlist.begin();
- it != clientlist.end();
- it++) {  // join each local synthetic client, then unmount and shut down its Client
- int i = *it;
-
- // cout << "waiting for synthetic client" << i << " to finish" << endl;
- syn[i]->join_thread();
- delete syn[i];
-
- client[i]->unmount();
- //cout << "client" << i << " unmounted" << endl;
- client[i]->shutdown();
- }
-
-
- if (myrank && !started) {  // non-zero rank that hosts no MDS, OSD or client
- //dout(1) << "IDLE" << endl;
- cerr << "idle on tcprank " << tcpmessenger_get_rank() << " " << hostname << "." << pid << endl;
- tcpmessenger_stop_rankserver();
- }
-
- // wait for everything to finish
- tcpmessenger_wait();
-
- if (started) cerr << "tcpsyn finishing" << endl;
-
- tcpmessenger_shutdown();
-
-
- /*
- // cleanup
- for (int i=0; i<NUMMDS; i++) {
- if (myrank != MPI_DEST_TO_RANK(MSG_ADDR_MDS(i),world)) continue;
- delete mds[i];
- }
- for (int i=0; i<NUMOSD; i++) {
- if (myrank != MPI_DEST_TO_RANK(MSG_ADDR_OSD(i),world)) continue;
- delete osd[i];
- }
- for (int i=0; i<NUMCLIENT; i++) {
- if (myrank != MPI_DEST_TO_RANK(MSG_ADDR_CLIENT(i),world)) continue;
- delete client[i];
- }
- */
- delete mdc;  // the only object released explicitly; the per-daemon cleanup above is commented out
-
-
- return 0;
-}
-