mds/LogStream.o\
mds/IdAllocator.o\
mds/MDLog.o\
- mds/MDCluster.o\
mds/OSDMonitor.o
COMMON_OBJS= \
osd/OSDMap.o\
osd/PG.o\
osd/ObjectStore.o\
+ mds/MDCluster.o\
ebofs.o\
common/Logger.o\
common/Clock.o\
msg/TCPMessenger.o\
msg/TCPDirectory.o
-TARGETS = fakefuse fakesyn tcpsyn tcpfuse client/ldceph.o
+TARGETS = fakefuse fakesyn tcpsyn tcpfuse libceph.o
SRCS=*.cc */*.cc
ebofs: mkfs.ebofs test.ebofs
+# libceph
+libceph.o: client/ldceph.o client/Client.o client/Buffercache.o ${TCP_OBJS} ${COMMON_OBJS} ${SYN_OBJS}
+ ld -i ${LIBS} $^ -o $@
+
#
%.so: %.cc
%.o: %.cc
${CC} ${CFLAGS} -c $< -o $@
+%.po: %.cc
+ ${CC} -fPIC ${CFLAGS} -c $< -o $@
+
count:
cat *.cc */*.cc */*.h */*/*.h | wc -l
cat *.cc */*.cc */*.h */*/*.h | grep -c \;
MAR 1
+- aged object stores on googoo
+
+- confirm block dev versus big file
+
- ldceph + mdtest
- stability
root->last_updated = now;
root->dir_auth = in_info->dir_auth;
+ assert(root->dir_auth == 0);
root->dir_hashed = in_info->hashed;
root->dir_replicated = in_info->replicated;
if (in_info->spec_defined)
mds = diri->pick_replica(mdcluster);
}
}
+ //cout << "mds is " << mds << endl;
// force use of a particular mds?
if (use_mds >= 0) mds = use_mds;
}
int authority(MDCluster *mdcluster) {
+ //cout << "authority on " << inode.ino << " .. dir_auth is " << dir_auth<< endl;
// parent?
if (dn && dn->dir && dn->dir->parent_inode) {
// parent hashed?
int pick_replica(MDCluster *mdcluster) {
// replicas?
if (dir_contacts.size()) {
+ cout << "dir_contacts if " << dir_contacts << endl;
set<int>::iterator it = dir_contacts.begin();
if (dir_contacts.size() == 1)
return *it;
}
}
- if (dir_replicated)
+ if (dir_replicated) {
+ cout << "num_mds is " << mdcluster->get_num_mds() << endl;
return rand() % mdcluster->get_num_mds(); // huh.. pick a random mds!
+ }
else
return authority(mdcluster);
}
mount_point(0), mount_point_parent(0),
mount_point_len(0),
cwd_above_mp(false), cwd_in_mp(false) {
-
+
+ // args
+ vector<char *> args;
+ env_to_vec(args);
+ parse_config_options(args);
+
+
+ tcpaddr_t nsa;
+ if (tcpmessenger_findns(nsa) < 0)
+ return;
+ tcpmessenger_init();
+ tcpmessenger_start();
+ tcpmessenger_start_rankserver(nsa);
+
client = new Client(new TCPMessenger(MSG_ADDR_CLIENT_NEW));
client->init();
int r = client->mount();
fp_mount_point = mount_point;
cerr << "ldceph mounted on " << mount_point << " as " << client->get_myaddr() << endl;
+
+ refresh_cwd();
}
}
~LdCeph() {
cout << "ldceph fini" << endl;
+ if (false && client) { // no unmount for now..
+ client->unmount();
+ client->shutdown();
+ delete client;
+ client = 0;
+ tcpmessenger_wait();
+ tcpmessenger_shutdown();
+ }
}
} ldceph;
}
int mkdir(const char *pathname, mode_t mode) {
char buf[255];
- if (const char *c = ldceph.get_ceph_path(pathname, buf))
+ if (const char *c = ldceph.get_ceph_path(pathname, buf))
return ldceph.client->mkdir(c, mode);
else
return syscall(SYS_mkdir, pathname, mode);
return syscall(SYS_unlink, pathname);
}
- //int stat(const char *pathname, struct stat *st) {
- int __xstat(int __ver, const char *pathname, struct stat *st) { // stoopid GLIBC
+ int stat(const char *pathname, struct stat *st) {
+ //int __xstat64(int __ver, const char *pathname, struct stat64 *st64) { // stoopid GLIBC
+ //struct stat *st = (struct stat*)st64;
char buf[255];
if (const char *c = ldceph.get_ceph_path(pathname, buf))
return ldceph.client->lstat(c, st); // FIXME
char buf[255];
if (const char *c = ldceph.get_ceph_path(pathname, buf)) {
int r = ldceph.client->chdir(c);
- if (r) {
+ if (r == 0) {
if (!ldceph.cwd_in_mp)
syscall(SYS_chdir, ldceph.mount_point_parent);
ldceph.cwd_in_mp = true;
if (it->first > now) break;
utime_t t = it->first;
- dout(DBL) << "queuing event(s) scheduled at " << t << endl;
+ dout(DBL) << "queueing event(s) scheduled at " << t << endl;
if (messenger) {
for (set<Context*>::iterator cit = it->second.begin();
cit++) {
pending.push_back(*cit);
event_times.erase(*cit);
+ num_event--;
}
}
{ // make sure we're not holding any locks while we talk to the messenger
for (list<Context*>::iterator cit = pending.begin();
cit != pending.end();
- cit++)
+ cit++) {
+ dout(DBL) << "queue callback " << *cit << endl;
messenger->queue_callback(*cit);
+ }
pending.clear();
}
lock.Lock();
scheduled[ when ].insert(callback);
event_times[callback] = when;
+ num_event++;
+
// make sure i wake up
register_timer();
void register_timer(); // make sure i get a callback
void cancel_timer(); // make sure i get a callback
+
pthread_t thread_id;
bool thread_stop;
Mutex lock;
bool timed_sleep;
Cond sleep_cond;
Cond timeout_cond;
+
public:
void timer_thread(); // waiter thread (that wakes us up)
+ int num_event;
+
+
public:
Timer() {
thread_id = 0;
thread_stop = false;
+ num_event = 0;
}
~Timer() {
// scheduled
using namespace std;
+void env_to_vec(vector<char*>& args)
+{
+ const char *p = getenv("CEPH_ARGS");
+ if (!p) return;
+
+ static char buf[1000];
+ int len = strlen(p);
+ memcpy(buf, p, len);
+ buf[len] = 0;
+ cout << "args " << buf << endl;
+
+ int l = 0;
+ for (int i=0; i<len; i++) {
+ if (buf[i] == ' ') {
+ buf[i] = 0;
+ args.push_back(buf+l);
+ cout << "arg " << (buf+l) << endl;
+ l = i+1;
+ }
+ }
+ args.push_back(buf+l);
+ cout << "arg " << (buf+l) << endl;
+}
+
+
void argv_to_vec(int argc, char **argv,
vector<char*>& args)
{
#define dout(x) if ((x) <= g_conf.debug) cout
#define dout2(x) if ((x) <= g_conf.debug) cout
+void env_to_vec(vector<char*>& args);
void argv_to_vec(int argc, char **argv,
vector<char*>& args);
void vec_to_argv(vector<char*>& args,
#include "include/types.h"
#include "include/lru.h"
#include "common/DecayCounter.h"
-#include <sys/stat.h>
+//#include <sys/stat.h>
#include "CDentry.h"
#include "Lock.h"
} else
spec_defined = false;
+ if (in->dir)
+ dir_auth = in->dir->get_dir_auth();
+ else
+ dir_auth = -1;
+
// dir info
- dir_auth = (in->dir && in->dir->get_dir_auth());
hashed = (in->dir && in->dir->is_hashed()); // FIXME not quite right.
replicated = (in->dir && in->dir->is_rep());
}
using namespace __gnu_cxx;
#include <sys/types.h>
-#include <sys/stat.h>
+//#include <sys/stat.h>
#include <fcntl.h>
class TCPDirectory : public Dispatcher {
}
+int tcpmessenger_findns(tcpaddr_t &nsa)
+{
+ char *nsaddr = 0;
+ bool have_nsa = false;
+
+ // env var?
+ /*int e_len = 0;
+ for (int i=0; envp[i]; i++)
+ e_len += strlen(envp[i]) + 1;
+ */
+ nsaddr = getenv("CEPH_NAMESERVER");////envz_entry(*envp, e_len, "CEPH_NAMESERVER");
+ if (nsaddr) {
+ while (nsaddr[0] != '=') nsaddr++;
+ nsaddr++;
+ }
+
+ else {
+ // file?
+ int fd = ::open(".ceph_ns",O_RDONLY);
+ if (fd > 0) {
+ ::read(fd, (void*)&nsa, sizeof(nsa));
+ ::close(fd);
+ have_nsa = true;
+ nsaddr = "from .ceph_ns";
+ }
+ }
+
+ if (!nsaddr && !have_nsa) {
+ cerr << "i need ceph ns addr.. either CEPH_NAMESERVER env var or --ns blah" << endl;
+ return -1;
+ //exit(-1);
+ }
+
+ // look up nsaddr?
+ if (!have_nsa && tcpmessenger_lookup(nsaddr, nsa) < 0) {
+ return -1;
+ }
+
+ cout << "ceph ns is " << nsaddr << " or " << nsa << endl;
+ return 0;
+}
+
+
+
/** rankserver
*
* one per rank. handles entity->rank lookup replies.
extern int tcpmessenger_lookup(char *str, tcpaddr_t& ta);
+extern int tcpmessenger_findns(tcpaddr_t &nsa);
+
extern int tcpmessenger_init();
extern int tcpmessenger_start(); // start thread
extern void tcpmessenger_wait(); // wait for thread to finish.
int OSD::shutdown()
{
- dout(1) << "shutdown" << endl;
+ dout(1) << "shutdown, timer has " << g_timer.num_event << endl;
// finish ops
wait_for_no_ops();
argv_to_vec(argc, argv, args);
parse_config_options(args);
- vector<char*> nargs;
-
- char *nsaddr = 0;
- tcpaddr_t nsa;
- bool have_nsa = false;
-
- for (unsigned i=0; i<args.size(); i++) {
- if (strcmp(args[i], "--ns") == 0) {
- nsaddr = args[++i];
- }
- else {
- // unknown arg, pass it on.
- nargs.push_back(args[i]);
- cout << "fuse arg: " << args[i] << endl;
- }
- }
-
- if (nsaddr == 0) {
- // env var?
- int e_len = 0;
- for (int i=0; envp[i]; i++)
- e_len += strlen(envp[i]) + 1;
- nsaddr = envz_entry(*envp, e_len, "CEPH_NAMESERVER");
- if (nsaddr) {
- while (nsaddr[0] != '=') nsaddr++;
- nsaddr++;
- }
- }
-
- if (!nsaddr) {
- // file?
- int fd = ::open(".ceph_ns",O_RDONLY);
- if (fd > 0) {
- ::read(fd, (void*)&nsa, sizeof(nsa));
- ::close(fd);
- have_nsa = true;
- nsaddr = "from .ceph_ns";
- }
- }
-
- if (!nsaddr && !have_nsa) {
- cerr << "i need ceph ns addr.. either CEPH_NAMESERVER env var or --ns blah" << endl;
- exit(-1);
- }
-
- // look up nsaddr?
- if (!have_nsa && tcpmessenger_lookup(nsaddr, nsa) < 0) {
- return 1;
- }
-
- cout << "ceph ns is " << nsaddr << " or " << nsa << endl;
-
// args for fuse
- args = nargs;
vec_to_argv(args, argc, argv);
-
// start up tcpmessenger
+ tcpaddr_t nsa;
+ if (tcpmessenger_findns(nsa) < 0) exit(1);
tcpmessenger_init();
tcpmessenger_start();
tcpmessenger_start_rankserver(nsa);