]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
merged r1543:1561 from trunk into branches/sage/mds
authorsageweil <sageweil@29311d96-e01e-0410-9327-a35deaab8ce9>
Fri, 27 Jul 2007 19:43:35 +0000 (19:43 +0000)
committersageweil <sageweil@29311d96-e01e-0410-9327-a35deaab8ce9>
Fri, 27 Jul 2007 19:43:35 +0000 (19:43 +0000)
git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@1562 29311d96-e01e-0410-9327-a35deaab8ce9

18 files changed:
branches/sage/mds/Makefile
branches/sage/mds/client/Client.cc
branches/sage/mds/client/Client.h
branches/sage/mds/client/msgthread.h [deleted file]
branches/sage/mds/common/RWLock.h [new file with mode: 0644]
branches/sage/mds/ebofs/BlockDevice.cc
branches/sage/mds/include/buffer.h
branches/sage/mds/mds/Server.cc
branches/sage/mds/msg/HostMonitor.cc [deleted file]
branches/sage/mds/msg/HostMonitor.h [deleted file]
branches/sage/mds/msg/RWLock.h [deleted file]
branches/sage/mds/msg/SerialMessenger.h [deleted file]
branches/sage/mds/msg/mpistarter.cc [deleted file]
branches/sage/mds/msg/new_mpistarter.cc [deleted file]
branches/sage/mds/osd/rush.cc [deleted file]
branches/sage/mds/osd/rush.h [deleted file]
branches/sage/mds/osd/tp.cc [deleted file]
branches/sage/mds/osdc/ObjectCacher.cc

index 6f5cf9d5fb5bed981ffa4b7470c6fdf7c18fdde6..3840877d6617380d28d7178116b3d9af1768a3bf 100644 (file)
@@ -140,19 +140,6 @@ csyn: csyn.cc client.o osdc.o msg/SimpleMessenger.o common.o
 cfuse: cfuse.cc client.o osdc.o client/fuse.o msg/SimpleMessenger.o common.o
        ${CC} ${CFLAGS} ${LIBS} -lfuse $^ -o $@
 
-activemaster: active/activemaster.cc client.o osdc.o msg/SimpleMessenger.o common.o
-       ${CC} ${CFLAGS} ${LIBS} $^ -o $@
-
-activeslave: active/activeslave.cc client.o osdc.o msg/SimpleMessenger.o common.o
-       ${CC} ${CFLAGS} ${LIBS} $^ -o $@
-
-echotestclient: active/echotestclient.cc client.o osdc.o msg/SimpleMessenger.o common.o
-       ${CC} ${CFLAGS} ${LIBS} $^ -o $@
-
-msgtestclient: active/msgtestclient.cc client.o osdc.o msg/SimpleMessenger.o common.o
-       ${CC} ${CFLAGS} ${LIBS} $^ -o $@
-
-
 
 
 # misc
index 290b0af500fa93a69b0441ec444fe78e570cf226..5c28abf5906101a1c9ac04370a5041bd2c8f2915 100644 (file)
@@ -601,7 +601,12 @@ MClientReply *Client::make_request(MClientRequest *req,
       dout(10) << "target resend_mds specified as mds" << mds << endl;
     } else {
       mds = choose_target_mds(req);
-      dout(10) << "chose target mds" << mds << " based on hierarchy" << endl;
+      if (mds >= 0) {
+       dout(10) << "chose target mds" << mds << " based on hierarchy" << endl;
+      } else {
+       mds = mdsmap->get_random_in_mds();
+       dout(10) << "chose random target mds" << mds << " for lack of anything better" << endl;
+      }
     }
     
     // open a session?
@@ -1845,7 +1850,7 @@ int Client::lstat(const char *relpath, struct stat *stbuf)
   if (res == 0) {
     assert(in);
     fill_stat(in->inode,stbuf);
-    dout(10) << "stat sez size = " << in->inode.size << " ino = " << stbuf->st_ino << endl;
+    dout(10) << "stat sez size = " << in->inode.size << " mode = " << oct << stbuf->st_mode << dec << " ino = " << stbuf->st_ino << endl;
   }
 
   trim_cache();
@@ -2121,6 +2126,7 @@ int Client::getdir(const char *relpath, map<string,inode_t>& contents)
        }
         
         // contents to caller too!
+       dout(15) << "getdir including " << *pdn << " to " << in->inode.ino << endl;
         contents[*pdn] = in->inode;
       }
       if (dir->is_empty())
@@ -2356,8 +2362,6 @@ int Client::open(const char *relpath, int flags, mode_t mode)
   MClientReply *reply = make_request(req);
   
   assert(reply);
-  dout(3) << "op: open_files[" << reply->get_result() << "] = fh;  // fh = " << reply->get_result() << endl;
-  tout << reply->get_result() << endl;
 
   insert_trace(reply);  
   int result = reply->get_result();
index 9492a3f4517257b03b94749dffad81409555325a..457efca53ef2d75a1f98a08a16fc4eec1c3b8636 100644 (file)
@@ -24,7 +24,6 @@
 #include "msg/Message.h"
 #include "msg/Dispatcher.h"
 #include "msg/Messenger.h"
-#include "msg/SerialMessenger.h"
 
 #include "messages/MClientReply.h"
 
@@ -270,7 +269,8 @@ class Inode {
 
     if (dir_replicated || ino() == 1) {
       //cout << "num_mds is " << mdcluster->get_num_mds() << endl;
-      return rand() % mdsmap->get_num_mds();  // huh.. pick a random mds!
+      return mdsmap->get_random_in_mds();
+      //return rand() % mdsmap->get_num_mds();  // huh.. pick a random mds!
     }
     else
       return authority(mdsmap);
diff --git a/branches/sage/mds/client/msgthread.h b/branches/sage/mds/client/msgthread.h
deleted file mode 100644 (file)
index 1e1af02..0000000
+++ /dev/null
@@ -1,26 +0,0 @@
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
-// vim: ts=8 sw=2 smarttab
-/*
- * Ceph - scalable distributed file system
- *
- * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
- *
- * This is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License version 2.1, as published by the Free Software 
- * Foundation.  See file COPYING.
- * 
- */
-
-
-#include "msg/Message.h"
-
-// send the message, expecting no response.  threads other than the
-// MPI thread use this function; if the MPI thread uses this function
-// it could deadlock: this function could wait for the out queue to be
-// emptied, but only the MPI thread can empty it.
-void obfsmpi_send(Message *m)
-
-// send the message to a server and wait for the response.  threads
-// other than the MPI thread use this function.
-Message *obfsmpi_sendrecv(Message *m)
diff --git a/branches/sage/mds/common/RWLock.h b/branches/sage/mds/common/RWLock.h
new file mode 100644 (file)
index 0000000..14e158a
--- /dev/null
@@ -0,0 +1,50 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software 
+ * Foundation.  See file COPYING.
+ * 
+ */
+
+
+
+#ifndef _RWLock_Posix_
+#define _RWLock_Posix_
+
+#include <pthread.h>
+
+class RWLock
+{
+  mutable pthread_rwlock_t L;
+
+  public:
+
+  RWLock() {
+    pthread_rwlock_init(&L, NULL);
+  }
+
+  virtual ~RWLock() {
+    pthread_rwlock_unlock(&L);
+    pthread_rwlock_destroy(&L);
+  }
+
+  void unlock() {
+    pthread_rwlock_unlock(&L);
+  }
+  void get_read() {
+    pthread_rwlock_rdlock(&L);    
+  }
+  void put_read() { unlock(); }
+  void get_write() {
+    pthread_rwlock_wrlock(&L);
+  }
+  void put_write() { unlock(); }
+};
+
+#endif // !_Mutex_Posix_
index 2efb4246c92e223d2f6bed07d5c856c040a871ec..8fcb6bf549a8eb77675b06199e20ab88353c35ee 100644 (file)
@@ -273,32 +273,32 @@ block_t BlockDevice::get_num_blocks()
     // ioctl block device
     uint64_t bytes = 0;
     r = ioctl(fd, BLKGETSIZE64, &bytes);
-    num_blocks = bytes / 4096;
+    num_blocks = bytes / (uint64_t)EBOFS_BLOCK_SIZE;
     if (r == 0) {
       dout(1) << "get_num_blocks ioctl BLKGETSIZE64 reports "
-             << bytes << " bytes, " 
-             << num_blocks << " 4k blocks" 
+             << num_blocks << " 4k blocks, " 
+             << bytes << " bytes" 
              << endl;
 #else
     // hrm, try the 32 bit ioctl?
     unsigned long sectors = 0;
     r = ioctl(fd, BLKGETSIZE, &sectors);
-    num_blocks = sectors/8;
+    num_blocks = sectors/8ULL;
+    bytes = sectors*512ULL;
     if (r == 0) {
       dout(1) << "get_num_blocks ioctl BLKGETSIZE reports " << sectors << " sectors, "
-             << num_blocks << " 4k blocks, " << (num_blocks*4096) << " bytes" << endl;
+             << num_blocks << " 4k blocks, " << bytes << " bytes" << endl;
 #endif
     } else {
       // hmm, try stat!
       dout(10) << "get_num_blocks ioctl(2) failed with " << errno << " " << strerror(errno) << ", using stat(2)" << endl;
       struct stat st;
       fstat(fd, &st);
-      num_blocks = st.st_size;
-      dout(1) << "get_num_blocks stat reports " << num_blocks << " 4k blocks, " << (num_blocks*4096) << " bytes" << endl;
+      uint64_t bytes = st.st_size;
+      num_blocks = bytes / EBOFS_BLOCK_SIZE;
+      dout(1) << "get_num_blocks stat reports " << num_blocks << " 4k blocks, " << bytes << " bytes" << endl;
     }
     
-    num_blocks /= (uint64_t)EBOFS_BLOCK_SIZE;
-
     if (g_conf.bdev_fake_mb) {
       num_blocks = g_conf.bdev_fake_mb * 256;
       dout(0) << "faking dev size " << g_conf.bdev_fake_mb << " mb" << endl;
index 5c96f0f006f37d19064627faddbea52691393e24..df5d58f4fbb8684ed026d2415fdcd178e1a03dc4 100644 (file)
@@ -245,6 +245,18 @@ public:
       release();
     }
 
+    void swap(ptr& other) {
+      raw *r = _raw;
+      unsigned o = _off;
+      unsigned l = _len;
+      _raw = other._raw;
+      _off = other._off;
+      _len = other._len;
+      other._raw = r;
+      other._off = o;
+      other._len = l;
+    }
+
     void release() {
       if (_raw) {
        _raw->lock.Lock();
@@ -359,6 +371,14 @@ public:
 
     const std::list<ptr>& buffers() const { return _buffers; }
     
+    void swap(list& other) {
+      unsigned t = _len;
+      _len = other._len;
+      other._len = t;
+      _buffers.swap(other._buffers);
+      append_buffer.swap(other.append_buffer);
+    }
+
     unsigned length() const {
 #if 1
       // DEBUG: verify _len
index 56a7ee69405cf836808f96748105ef69281f324f..3478311061e068ac0ef47f4e381d2d2bd8a418bb 100644 (file)
@@ -1391,7 +1391,9 @@ void Server::handle_client_chmod(MDRequest *mdr)
 
   // project update
   inode_t *pi = cur->project_inode();
-  pi->mode = req->args.chmod.mode & 04777;
+  pi->mode = 
+    (pi->mode & ~04777) | 
+    (req->args.chmod.mode & 04777);
   pi->version = cur->pre_dirty();
   pi->ctime = g_clock.real_now();
 
@@ -1530,9 +1532,9 @@ void Server::handle_client_readdir(MDRequest *mdr)
   int numfiles = encode_dir_contents(dir, inls, dnls);
   
   // . too
-  dnls.push_back(".");
-  inls.push_back(new InodeStat(diri, mds->get_nodeid()));
-  ++numfiles;
+  //dnls.push_back(".");
+  //inls.push_back(new InodeStat(diri, mds->get_nodeid()));
+  //++numfiles;
   
   // yay, reply
   MClientReply *reply = new MClientReply(req);
diff --git a/branches/sage/mds/msg/HostMonitor.cc b/branches/sage/mds/msg/HostMonitor.cc
deleted file mode 100644 (file)
index 969edad..0000000
+++ /dev/null
@@ -1,236 +0,0 @@
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
-// vim: ts=8 sw=2 smarttab
-/*
- * Ceph - scalable distributed file system
- *
- * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
- *
- * This is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License version 2.1, as published by the Free Software 
- * Foundation.  See file COPYING.
- * 
- */
-
-
-
-#include "HostMonitor.h"
-
-#include "msg/Message.h"
-#include "msg/Messenger.h"
-
-#include "messages/MPing.h"
-#include "messages/MPingAck.h"
-#include "messages/MFailure.h"
-#include "messages/MFailureAck.h"
-
-#include "common/Timer.h"
-#include "common/Clock.h"
-
-#define DBL  10
-
-
-#include "config.h"
-#undef dout
-#define  dout(l)    if (l<=g_conf.debug) cout << whoami << " hostmon: "
-
-
-// timer contexts
-
-class C_HM_InitiateHeartbeat : public Context {
-  HostMonitor *hm;
-public:
-  C_HM_InitiateHeartbeat(HostMonitor *hm) {
-     this->hm = hm;
-  }
-  void finish(int r) {
-    //cout << "HEARTBEAT" << endl;
-    hm->pending_events.erase(this);
-    hm->initiate_heartbeat();
-  }
-};
-
-class C_HM_CheckHeartbeat : public Context {
-  HostMonitor *hm;
-public:
-  C_HM_CheckHeartbeat(HostMonitor *hm) {
-    this->hm = hm;
-  }
-  void finish(int r) {
-    //cout << "CHECK" << endl;
-    hm->pending_events.erase(this);
-    hm->check_heartbeat();
-  }
-};
-
-
-
-// startup/shutdown
-
-void HostMonitor::init()
-{
-  dout(DBL) << "init" << endl;
-
-  // hack params for now
-  heartbeat_interval = 10;
-  max_ping_time = 2;
-  max_heartbeat_misses = 3;
-  notify_retry_interval = 10;
-  
-  // schedule first hb
-  schedule_heartbeat();
-}
-
-
-void HostMonitor::shutdown()
-{
-  // cancel any events
-  for (set<Context*>::iterator it = pending_events.begin();
-       it != pending_events.end();
-       it++) {
-    g_timer.cancel_event(*it);
-    delete *it;
-  }
-  pending_events.clear();
-}
-
-
-// schedule next heartbeat
-
-void HostMonitor::schedule_heartbeat()
-{
-  dout(DBL) << "schedule_heartbeat" << endl;
-  Context *e = new C_HM_InitiateHeartbeat(this);
-  pending_events.insert(e);
-  g_timer.add_event_after(heartbeat_interval, e);
-}
-
-
-// take note of a live host
-
-void HostMonitor::host_is_alive(entity_name_t host)
-{
-  if (hosts.count(host))
-    status[host].last_heard_from = g_clock.gettime();
-}
-
-
-// do heartbeat
-
-void HostMonitor::initiate_heartbeat()
-{
-  time_t now = g_clock.gettime();
-  
-  // send out pings
-  inflight_pings.clear();
-  for (set<entity_name_t>::iterator it = hosts.begin();
-       it != hosts.end();
-       it++) {
-    // have i heard from them recently?
-    if (now - status[*it].last_heard_from < heartbeat_interval) {
-      dout(DBL) << "skipping " << *it << ", i heard from them recently" << endl;
-    } else {
-      dout(DBL) << "pinging " << *it << endl;
-      status[*it].last_pinged = now;
-      inflight_pings.insert(*it);
-
-      messenger->send_message(new MPing(1), *it, 0);
-    }
-  }
-  
-  // set timer to check results
-  Context *e = new C_HM_CheckHeartbeat(this);
-  pending_events.insert(e);
-  g_timer.add_event_after(max_ping_time, e);
-  dout(10) << "scheduled check " << e << endl;
-
-  schedule_heartbeat();  // schedule next heartbeat
-}
-
-
-// check results
-
-void HostMonitor::check_heartbeat()
-{
-  dout(DBL) << "check_heartbeat()" << endl;
-
-  // check inflight pings
-  for (set<entity_name_t>::iterator it = inflight_pings.begin();
-       it != inflight_pings.end();
-       it++) {
-    status[*it].num_heartbeats_missed++;
-
-    dout(DBL) << "no response from " << *it << " for " << status[*it].num_heartbeats_missed << " beats" << endl;
-    
-    if (status[*it].num_heartbeats_missed >= max_heartbeat_misses) {
-      if (acked_failures.count(*it)) {
-        dout(DBL) << *it << " is already failed" << endl;
-      } else {
-        if (unacked_failures.count(*it)) {
-          dout(DBL) << *it << " is already failed, but unacked, sending another failure message" << endl;
-        } else {
-          dout(DBL) << "failing " << *it << endl;
-          unacked_failures.insert(*it);
-        }
-        
-        /*if (false)   // do this in NewMessenger for now!  FIXME
-        for (set<msg_addr_t>::iterator nit = notify.begin();
-             nit != notify.end();
-             nit++) {
-          messenger->send_message(new MFailure(*it, messenger->get_inst(*it)),
-                                  *nit, notify_port, 0);
-        }
-        */
-      }
-    }
-  }
-  // forget about the pings.
-  inflight_pings.clear();
-}
-
-
-// incoming messages
-
-void HostMonitor::proc_message(Message *m)
-{
-  switch (m->get_type()) {
-
-  case MSG_PING_ACK:
-    handle_ping_ack((MPingAck*)m);
-    break;
-
-  case MSG_FAILURE_ACK:
-    handle_failure_ack((MFailureAck*)m);
-    break;
-
-  }
-}
-
-void HostMonitor::handle_ping_ack(MPingAck *m)
-{
-  entity_name_t from = m->get_source();
-
-  dout(DBL) << "ping ack from " << from << endl;
-  status[from].last_pinged = g_clock.gettime();
-  status[from].num_heartbeats_missed = 0;
-  inflight_pings.erase(from);
-
-  delete m;
-}
-
-void HostMonitor::handle_failure_ack(MFailureAck *m)
-{
-
-  // FIXME: this doesn't handle failed -> alive transitions gracefully at all..
-
-  // the higher-up's acknowledged our failure notification, we can stop resending it.
-  entity_name_t failed = m->get_failed();
-  dout(DBL) << "handle_failure_ack " << failed << endl;
-  unacked_failures.erase(failed);
-  acked_failures.insert(failed);
-
-  delete m;
-}
-
-
diff --git a/branches/sage/mds/msg/HostMonitor.h b/branches/sage/mds/msg/HostMonitor.h
deleted file mode 100644 (file)
index 35334b7..0000000
+++ /dev/null
@@ -1,98 +0,0 @@
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
-// vim: ts=8 sw=2 smarttab
-/*
- * Ceph - scalable distributed file system
- *
- * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
- *
- * This is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License version 2.1, as published by the Free Software 
- * Foundation.  See file COPYING.
- * 
- */
-
-
-#ifndef __HOSTMONITOR_H
-#define __HOSTMONITOR_H
-
-#include <time.h>
-
-#include <map>
-#include <set>
-using namespace std;
-
-#include "include/Context.h"
-#include "msg/Message.h"
-
-class Message;
-class Messenger;
-
-typedef struct {
-  time_t last_heard_from;
-  time_t last_pinged;
-  int    num_heartbeats_missed;
-} monitor_rec_t;
-
-class HostMonitor {
-  Messenger *messenger;
-  string whoami;
-
-  // hosts i monitor
-  set<entity_name_t>  hosts;
-
-  // who i tell when they fail
-  set<entity_name_t>  notify;
-  int              notify_port;
-
-  // their status
-  map<entity_name_t,monitor_rec_t>  status;
-
-  set<entity_name_t>  inflight_pings;    // pings we sent that haven't replied yet
-
-  set<entity_name_t>  unacked_failures;  // failed hosts that haven't been acked yet.
-  set<entity_name_t>  acked_failures;    // these failures have been acked.
-
-  float heartbeat_interval;    // how often to do a heartbeat
-  float max_ping_time;         // how long before it's a miss
-  int   max_heartbeat_misses;  // how many misses before i tell
-  float notify_retry_interval; // how often to retry failure notification
-
- public:
-  set<Context*>  pending_events;
-
- private:
-  void schedule_heartbeat();
-
- public:
-  HostMonitor(Messenger *m, string& whoami) {
-    this->messenger = m;
-    this->whoami = whoami;
-    notify_port = 0;
-  }
-  set<entity_name_t>& get_hosts() { return hosts; }
-  set<entity_name_t>& get_notify() { return notify; }
-  void set_notify_port(int p) { notify_port = p; }
-
-  void remove_host(entity_name_t h) {
-    hosts.erase(h);
-    status.erase(h);
-    unacked_failures.erase(h);
-    acked_failures.erase(h);
-  }
-
-  void init();
-  void shutdown();
-  
-  void host_is_alive(entity_name_t who);
-
-  void proc_message(Message *m);
-  void handle_ping_ack(class MPingAck *m);
-  void handle_failure_ack(class MFailureAck *m);
-
-  void initiate_heartbeat();
-  void check_heartbeat();
-
-};
-
-#endif
diff --git a/branches/sage/mds/msg/RWLock.h b/branches/sage/mds/msg/RWLock.h
deleted file mode 100644 (file)
index 14e158a..0000000
+++ /dev/null
@@ -1,50 +0,0 @@
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
-// vim: ts=8 sw=2 smarttab
-/*
- * Ceph - scalable distributed file system
- *
- * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
- *
- * This is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License version 2.1, as published by the Free Software 
- * Foundation.  See file COPYING.
- * 
- */
-
-
-
-#ifndef _RWLock_Posix_
-#define _RWLock_Posix_
-
-#include <pthread.h>
-
-class RWLock
-{
-  mutable pthread_rwlock_t L;
-
-  public:
-
-  RWLock() {
-    pthread_rwlock_init(&L, NULL);
-  }
-
-  virtual ~RWLock() {
-    pthread_rwlock_unlock(&L);
-    pthread_rwlock_destroy(&L);
-  }
-
-  void unlock() {
-    pthread_rwlock_unlock(&L);
-  }
-  void get_read() {
-    pthread_rwlock_rdlock(&L);    
-  }
-  void put_read() { unlock(); }
-  void get_write() {
-    pthread_rwlock_wrlock(&L);
-  }
-  void put_write() { unlock(); }
-};
-
-#endif // !_Mutex_Posix_
diff --git a/branches/sage/mds/msg/SerialMessenger.h b/branches/sage/mds/msg/SerialMessenger.h
deleted file mode 100644 (file)
index c17553e..0000000
+++ /dev/null
@@ -1,29 +0,0 @@
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
-// vim: ts=8 sw=2 smarttab
-/*
- * Ceph - scalable distributed file system
- *
- * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
- *
- * This is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License version 2.1, as published by the Free Software 
- * Foundation.  See file COPYING.
- * 
- */
-
-
-#ifndef __SERIAL_MESSENGER_H
-#define __SERIAL_MESSENGER_H
-
-#include "Dispatcher.h"
-#include "Message.h"
-
-class SerialMessenger : public Dispatcher {
- public:
-  virtual void dispatch(Message *m) = 0;      // i receive my messages here
-  virtual void send(Message *m, entity_name_t dest, int port=0, int fromport=0) = 0;          // doesn't block
-  virtual Message *sendrecv(Message *m, entity_name_t dest, int port=0, int fromport=0) = 0;  // blocks for matching reply
-};
-
-#endif
diff --git a/branches/sage/mds/msg/mpistarter.cc b/branches/sage/mds/msg/mpistarter.cc
deleted file mode 100644 (file)
index 685c104..0000000
+++ /dev/null
@@ -1,63 +0,0 @@
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
-// vim: ts=8 sw=2 smarttab
-/*
- * Ceph - scalable distributed file system
- *
- * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
- *
- * This is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License version 2.1, as published by the Free Software 
- * Foundation.  See file COPYING.
- * 
- */
-
-
-
-#include <mpi.h>
-
-#include "TCPMessenger.h"
-
-/*
- * start up TCPMessenger via MPI.
- */ 
-
-pair<int,int> mpi_bootstrap_tcp(int& argc, char**& argv)
-{
-  tcpmessenger_init();
-  tcpmessenger_start();
-
-  // exchnage addresses with other nodes
-  MPI_Init(&argc, &argv);
-  
-  int mpi_world;
-  int mpi_rank;
-  MPI_Comm_size(MPI_COMM_WORLD, &mpi_world);
-  MPI_Comm_rank(MPI_COMM_WORLD, &mpi_rank);
-
-  //dout(1) << "i am " << mpi_rank << " of " << mpi_world << endl;
-  
-  // start up directory?
-  tcpaddr_t ta;
-  if (mpi_rank == 0) {
-    dout(30) << "i am rank 0, starting ns directory" << endl;
-    tcpmessenger_start_nameserver(ta);
-  } else {
-    memset(&ta, 0, sizeof(ta));
-  }
-
-  // distribute tcpaddr
-  int r = MPI_Bcast(&ta, sizeof(ta), MPI_CHAR,
-                    0, MPI_COMM_WORLD);
-
-  dout(30) << "r = " << r << " ns tcpaddr is " << ta << endl;
-  tcpmessenger_start_rankserver(ta);
-  
-  MPI_Barrier(MPI_COMM_WORLD);
-  //g_clock.tare();
-  MPI_Finalize();
-
-  return pair<int,int>(mpi_rank, mpi_world);
-}
-
-
diff --git a/branches/sage/mds/msg/new_mpistarter.cc b/branches/sage/mds/msg/new_mpistarter.cc
deleted file mode 100644 (file)
index 72adcf9..0000000
+++ /dev/null
@@ -1,45 +0,0 @@
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
-// vim: ts=8 sw=2 smarttab
-#include <mpi.h>
-#include "NewMessenger.h"
-
-/*
- * start up NewMessenger via MPI.
- */ 
-
-pair<int,int> mpi_bootstrap_new(int& argc, char**& argv)
-{
-  MPI_Init(&argc, &argv);
-  
-  int mpi_world;
-  int mpi_rank;
-  MPI_Comm_size(MPI_COMM_WORLD, &mpi_world);
-  MPI_Comm_rank(MPI_COMM_WORLD, &mpi_rank);
-
-  tcpaddr_t nsaddr;
-  memset(&nsaddr, 0, sizeof(nsaddr));
-  
-  if (mpi_rank == 0) {
-    // i am root.
-    rank.my_rank = 0;  
-    rank.start_rank(nsaddr);
-    nsaddr = rank.get_listen_addr();
-  }
-
-  int r = MPI_Bcast(&nsaddr, sizeof(nsaddr), MPI_CHAR,
-                    0, MPI_COMM_WORLD);
-
-  dout(30) << "r = " << r << " ns tcpaddr is " << nsaddr << endl;
-
-  if (mpi_rank != 0) {
-    rank.start_rank(nsaddr);
-  }
-
-  MPI_Barrier(MPI_COMM_WORLD);
-
-  //g_clock.tare();
-
-  MPI_Finalize();
-
-  return pair<int,int>(mpi_rank, mpi_world);
-}
diff --git a/branches/sage/mds/osd/rush.cc b/branches/sage/mds/osd/rush.cc
deleted file mode 100644 (file)
index 733d71a..0000000
+++ /dev/null
@@ -1,231 +0,0 @@
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
-// vim: ts=8 sw=2 smarttab
-/*
- * Ceph - scalable distributed file system
- *
- * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
- *
- * This is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License version 2.1, as published by the Free Software 
- * Foundation.  See file COPYING.
- * 
- */
-
-
-//
-//
-//    rush.cc
-//
-// $Id$
-//
-
-#include <stdlib.h>
-#include <stdio.h>
-#include <cassert>
-#include "rush.h"
-
-
-static
-unsigned int
-myhash (unsigned int n)
-{
-  unsigned int v = (n ^ 0xdead1234) * (884811920 * 3  + 1);
-  return (v);
-}
-
-Rush::Rush ()
-{
-  nClusters = 0;
-  totalServers = 0;
-}
-
-//----------------------------------------------------------------------
-//
-//    Rush::AddCluster
-//
-//    Add a cluster.  The number of servers in the cluster and
-//    the weight of each server is passed.  The current number of
-//    clusters is returned.
-//
-//----------------------------------------------------------------------
-int
-Rush::AddCluster (int nServers, double weight)
-{
-  clusterSize[nClusters] = nServers;
-  clusterWeight[nClusters] = weight;
-  if (nClusters == 0) {
-    serversInPrevious[0] = 0;
-    totalWeightBefore[0] = 0.0;
-  } else {
-    serversInPrevious[nClusters] = serversInPrevious[nClusters-1] +
-      clusterSize[nClusters-1];
-    totalWeightBefore[nClusters] =
-      totalWeightBefore[nClusters-1] + (double)clusterSize[nClusters-1] *
-      clusterWeight[nClusters-1];
-  }
-  nClusters += 1;
-  totalServers += nServers;
-#if 0
-  for (int i = 0; i < nClusters; i++) {
-    fprintf (stderr, "size=%-3d prev=%-3d weight=%-6.2f prevWeight=%-8.2f\n",
-        clusterSize[i], serversInPrevious[i], clusterWeight[i],
-        totalWeightBefore[i]);
-  }
-#endif
-  return (nClusters);
-}
-
-
-//----------------------------------------------------------------------
-//
-//    Rush::GetServersByKey
-//
-//    This function returns a list of servers on which an object
-//    should be placed.  The servers array must be large enough to
-//    contain the list.
-//
-//----------------------------------------------------------------------
-void
-Rush::GetServersByKey (int key, int nReplicas, int servers[])
-{
-  int    replicasLeft = nReplicas;
-  int    cluster;
-  int    mustAssign, numberAssigned;
-  int    i, toDraw;
-  int    *srv = servers;
-  double    myWeight;
-  RushRNG    rng;
-
-  // There may not be more replicas than servers!
-  assert (nReplicas <= totalServers);
-  
-  for (cluster = nClusters-1; (cluster >= 0) && (replicasLeft > 0); cluster--) {
-    if (serversInPrevious[cluster] < replicasLeft) {
-      mustAssign = replicasLeft - serversInPrevious[cluster];
-    } else {
-      mustAssign = 0;
-    }
-    toDraw = replicasLeft - mustAssign;
-    if (toDraw > (clusterSize[cluster] - mustAssign)) {
-      toDraw = clusterSize[cluster] - mustAssign;
-    }
-    myWeight = (double)clusterSize[cluster] * clusterWeight[cluster];
-    rng.Seed (myhash (key)^cluster, cluster^0xb90738);
-    numberAssigned = mustAssign +
-      rng.HyperGeometricWeighted (toDraw, myWeight,
-                                  totalWeightBefore[cluster] + myWeight,
-                                  clusterWeight[cluster]);
-    if (numberAssigned > 0) {
-      rng.Seed (myhash (key)^cluster ^ 11, cluster^0xfea937);
-      rng.DrawKofN (srv, numberAssigned, clusterSize[cluster]);
-      for (i = 0; i < numberAssigned; i++) {
-        srv[i] += serversInPrevious[cluster];
-      }
-      replicasLeft -= numberAssigned;
-      srv += numberAssigned;
-    }
-  }
-}
-
-\f
-
-//----------------------------------------------------------------------
-//
-//    RushRNG::HyperGeometricWeighted
-//
-//    Use an iterative method to generate a hypergeometric random
-//    variable.  This approach guarantees that, if the number of draws
-//    is reduced, the number of successes must be as well as long as
-//    the seed for the RNG is the same.
-//
-//----------------------------------------------------------------------
-int
-RushRNG::HyperGeometricWeighted (int nDraws, double yesWeighted,
-                 double totalWeighted, double weightOne)
-{
-  int    positives = 0, i;
-  double    curRand;
-
-  // If the weight is too small (or is negative), choose zero objects.
-  if (weightOne <= 1e-9 || nDraws == 0) {
-    return (0);
-  }
-
-  // Draw nDraws items from the "bag".  For each positive, subtract off
-  // the weight of an object from the weight of positives remaining.  For
-  // each draw, subtract off the weight of an object from the total weight
-  // remaining.
-  for (i = 0; i < nDraws; i++) {
-    curRand = RandomDouble ();
-    if (curRand < (yesWeighted / totalWeighted)) {
-      positives += 1;
-      yesWeighted -= weightOne;
-    }
-    totalWeighted -= weightOne;
-  }
-  return (positives);
-}
-
-//----------------------------------------------------------------------
-//
-//    RushRNG::DrawKofN
-//
-//----------------------------------------------------------------------
-void
-RushRNG::DrawKofN (int vals[], int nToDraw, int setSize)
-{
-  int    deck[setSize];
-  int    i, pick;
-
-  assert(nToDraw <= setSize);
-
-  for (i = 0; i < setSize; i++) {
-    deck[i] = i;
-  }
-
-  for (i = 0; i < nToDraw; i++) {
-    pick = (int)(RandomDouble () * (double)(setSize - i));
-    if (pick >= setSize-i) pick = setSize-i-1;  // in case
-    //    assert(i >= 0 && i < nToDraw);
-    //    assert(pick >= 0 && pick < setSize);
-    vals[i] = deck[pick];
-    deck[pick] = deck[setSize-i-1];
-  }
-}
-
-#define    SEED_X 521288629
-#define    SEED_Y 362436069
-RushRNG::RushRNG ()
-{
-  Seed (0, 0);
-}
-
-void
-RushRNG::Seed (unsigned int seed1, unsigned int seed2)
-{
-  state1 = ((seed1 == 0) ? SEED_X : seed1);
-  state2 = ((seed2 == 0) ? SEED_Y : seed2);
-}
-
-unsigned int
-RushRNG::RandomInt ()
-{
-  const unsigned int a = 18000;
-  const unsigned int b = 18879;
-  unsigned int    rndValue;
-
-  state1 = a * (state1 & 0xffff) + (state1 >> 16);
-  state2 = b * (state2 & 0xffff) + (state2 >> 16);
-  rndValue = (state1 << 16) + (state2 & 0xffff);
-  return (rndValue);
-}
-
-double
-RushRNG::RandomDouble ()
-{
-  double    v;
-
-  v = (double)RandomInt() / (65536.0*65536.0);
-  return (v);
-}
diff --git a/branches/sage/mds/osd/rush.h b/branches/sage/mds/osd/rush.h
deleted file mode 100644 (file)
index 4b43e1a..0000000
+++ /dev/null
@@ -1,61 +0,0 @@
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
-// vim: ts=8 sw=2 smarttab
-/*
- * Ceph - scalable distributed file system
- *
- * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
- *
- * This is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License version 2.1, as published by the Free Software 
- * Foundation.  See file COPYING.
- * 
- */
-
-
-//
-//
-//    rush.h
-//
-//    Classes and definitions for the RUSH algorithm.
-//
-// $Id$
-//
-//
-
-#ifndef    _rush_h_
-#define    _rush_h_
-
-#define    RUSH_MAX_CLUSTERS    100
-
-class RushRNG {
-public:
-  unsigned int    RandomInt ();
-  double RandomDouble ();
-  void    Seed (unsigned int a, unsigned int b);
-  int    HyperGeometricWeighted (int nDraws, double yesWeighted,
-                double totalWeighted, double weightOne);
-  void    DrawKofN (int vals[], int nToDraw, int setSize);
-  RushRNG();
-private:
-  unsigned int state1, state2;
-};
-
-class Rush {
-public:
-  void    GetServersByKey (int key, int nReplicas, int servers[]);
-  int    AddCluster (int nServers, double weight);
-  int    Clusters () {return (nClusters);}
-  int    Servers () {return (totalServers);}
-  Rush ();
-private:
-  int    DrawKofN (int *servers, int n, int clusterSize, RushRNG *g);
-  int    nClusters;
-  int    totalServers;
-  int    clusterSize[RUSH_MAX_CLUSTERS];
-  int    serversInPrevious[RUSH_MAX_CLUSTERS];
-  double clusterWeight[RUSH_MAX_CLUSTERS];
-  double totalWeightBefore[RUSH_MAX_CLUSTERS];
-};
-
-#endif    /* _rush_h_ */
diff --git a/branches/sage/mds/osd/tp.cc b/branches/sage/mds/osd/tp.cc
deleted file mode 100644 (file)
index b52e9a6..0000000
+++ /dev/null
@@ -1,81 +0,0 @@
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
-// vim: ts=8 sw=2 smarttab
-/*
- * Ceph - scalable distributed file system
- *
- * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
- *
- * This is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License version 2.1, as published by the Free Software 
- * Foundation.  See file COPYING.
- * 
- */
-
-
-
-#include <iostream>
-#include <string>
-
-using namespace std;
-
-#include "common/Mutex.h"
-#include "common/ThreadPool.h"
-// #include <thread.h>
-
-class Op {
-  int i;
-
-public:
-  Op(int i)
-  {
-    this->i = i;
-  }
-
-  int get()
-  {
-    return i;
-  }
-};
-
-void foop(class TP *t, class Op *o);
-
-class TP {
-public:
-
-  void foo(Op *o)
-  {
-    cout << "Thread "<< pthread_self() << ": " << o->get() << "\n";
-    usleep(1);
-    
-    //  sched_yield();
-  }
-
-  int main(int argc, char *argv)
-  {
-    ThreadPool<TP,Op> *t = new ThreadPool<TP,Op>(10, (void (*)(TP*, Op*))foop, this);
-    
-    for(int i = 0; i < 100; i++) {
-      Op *o = new Op(i); 
-      t->put_op(o);
-    }
-    
-    sleep(1);
-    
-    delete(t);
-    
-    return 0;
-  }
-};
-
-void foop(class TP *t, class Op *o) {
-  t->foo(o);
-}
-
-int main(int argc, char *argv) {
-  TP t;
-
-  t.main(argc,argv);
-}
-
index f8d7d970c453e0f51c488ca51977f66421c4697e..592c8116b5b32d0a902330bb5f846074b5b6ad61 100644 (file)
 #define dout(l)    if (l<=g_conf.debug || l<=g_conf.debug_objectcacher) cout << g_clock.now() << " " << oc->objecter->messenger->get_myname() << ".objectcacher.object(" << oid << ") "
 
 
-ObjectCacher::BufferHead *ObjectCacher::Object::split(BufferHead *bh, off_t off)
+ObjectCacher::BufferHead *ObjectCacher::Object::split(BufferHead *left, off_t off)
 {
-  dout(20) << "split " << *bh << " at " << off << endl;
+  dout(20) << "split " << *left << " at " << off << endl;
   
   // split off right
   ObjectCacher::BufferHead *right = new BufferHead(this);
-  right->last_write_tid = bh->last_write_tid;
-  right->set_state(bh->get_state());
+  right->last_write_tid = left->last_write_tid;
+  right->set_state(left->get_state());
   
-  off_t newleftlen = off - bh->start();
-  right->set_start( off );
-  right->set_length( bh->length() - newleftlen );
+  off_t newleftlen = off - left->start();
+  right->set_start(off);
+  right->set_length(left->length() - newleftlen);
   
   // shorten left
-  oc->bh_stat_sub(bh);
-  bh->set_length( newleftlen );
-  oc->bh_stat_add(bh);
+  oc->bh_stat_sub(left);
+  left->set_length(newleftlen);
+  oc->bh_stat_add(left);
   
   // add right
   oc->bh_add(this, right);
   
   // split buffers too
   bufferlist bl;
-  bl.claim(bh->bl);
+  bl.claim(left->bl);
   if (bl.length()) {
-    assert(bl.length() == (bh->length() + right->length()));
-    right->bl.substr_of(bl, bh->length(), right->length());
-    bh->bl.substr_of(bl, 0, bh->length());
+    assert(bl.length() == (left->length() + right->length()));
+    right->bl.substr_of(bl, left->length(), right->length());
+    left->bl.substr_of(bl, 0, left->length());
   }
   
   // move read waiters
-  if (!bh->waitfor_read.empty()) {
-    map<off_t, list<Context*> >::iterator o, p = bh->waitfor_read.end();
+  if (!left->waitfor_read.empty()) {
+    map<off_t, list<Context*> >::iterator o, p = left->waitfor_read.end();
     p--;
-    while (p != bh->waitfor_read.begin()) {
+    while (p != left->waitfor_read.begin()) {
       if (p->first < right->start()) break;      
       dout(0) << "split  moving waiters at byte " << p->first << " to right bh" << endl;
       right->waitfor_read[p->first].swap( p->second );
       o = p;
       p--;
-      bh->waitfor_read.erase(o);
+      left->waitfor_read.erase(o);
     }
   }
   
-  dout(20) << "split    left is " << *bh << endl;
+  dout(20) << "split    left is " << *left << endl;
   dout(20) << "split   right is " << *right << endl;
   return right;
 }
@@ -850,11 +850,18 @@ int ObjectCacher::writex(Objecter::OSDWrite *wr, inodeno_t ino)
       size_t bhoff = bh->start() - opos;
       assert(f_it->second <= bh->length() - bhoff);
 
+      // get the frag we're mapping in
       bufferlist frag; 
       frag.substr_of(wr->bl, 
                      f_it->first, f_it->second);
 
-      bh->bl.claim_append(frag);
+      // keep anything left of bhoff
+      bufferlist newbl;
+      if (bhoff)
+       newbl.substr_of(bh->bl, 0, bhoff);
+      newbl.claim_append(frag);
+      bh->bl.swap(newbl);
+
       opos += f_it->second;
     }
 
@@ -866,18 +873,21 @@ int ObjectCacher::writex(Objecter::OSDWrite *wr, inodeno_t ino)
     // recombine with left?
     map<off_t,BufferHead*>::iterator p = o->data.find(bh->start());
     if (p != o->data.begin()) {
+      assert(p->second == bh);
       p--;
       if (p->second->is_dirty()) {
-        o->merge_left(p->second,bh);
+        o->merge_left(p->second, bh);
         bh = p->second;
       }
     }
     // right?
-    p = o->data.find(bh->start());
-    p++;
-    if (p != o->data.end() &&
-        p->second->is_dirty()) 
-      o->merge_left(p->second,bh);
+    while (1) {
+      p = o->data.find(bh->start());
+      assert(p->second == bh);
+      p++;
+      if (p == o->data.end() || !p->second->is_dirty()) break;
+      o->merge_left(bh, p->second);
+    }
   }
 
   delete wr;