]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
some housecleaning
authorsageweil <sageweil@29311d96-e01e-0410-9327-a35deaab8ce9>
Tue, 24 Jul 2007 21:54:02 +0000 (21:54 +0000)
committersageweil <sageweil@29311d96-e01e-0410-9327-a35deaab8ce9>
Tue, 24 Jul 2007 21:54:02 +0000 (21:54 +0000)
git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@1546 29311d96-e01e-0410-9327-a35deaab8ce9

13 files changed:
trunk/ceph/Makefile
trunk/ceph/client/Client.h
trunk/ceph/client/msgthread.h [deleted file]
trunk/ceph/common/RWLock.h [new file with mode: 0644]
trunk/ceph/msg/HostMonitor.cc [deleted file]
trunk/ceph/msg/HostMonitor.h [deleted file]
trunk/ceph/msg/RWLock.h [deleted file]
trunk/ceph/msg/SerialMessenger.h [deleted file]
trunk/ceph/msg/mpistarter.cc [deleted file]
trunk/ceph/msg/new_mpistarter.cc [deleted file]
trunk/ceph/osd/rush.cc [deleted file]
trunk/ceph/osd/rush.h [deleted file]
trunk/ceph/osd/tp.cc [deleted file]

index 6f5cf9d5fb5bed981ffa4b7470c6fdf7c18fdde6..3840877d6617380d28d7178116b3d9af1768a3bf 100644 (file)
@@ -140,19 +140,6 @@ csyn: csyn.cc client.o osdc.o msg/SimpleMessenger.o common.o
 cfuse: cfuse.cc client.o osdc.o client/fuse.o msg/SimpleMessenger.o common.o
        ${CC} ${CFLAGS} ${LIBS} -lfuse $^ -o $@
 
-activemaster: active/activemaster.cc client.o osdc.o msg/SimpleMessenger.o common.o
-       ${CC} ${CFLAGS} ${LIBS} $^ -o $@
-
-activeslave: active/activeslave.cc client.o osdc.o msg/SimpleMessenger.o common.o
-       ${CC} ${CFLAGS} ${LIBS} $^ -o $@
-
-echotestclient: active/echotestclient.cc client.o osdc.o msg/SimpleMessenger.o common.o
-       ${CC} ${CFLAGS} ${LIBS} $^ -o $@
-
-msgtestclient: active/msgtestclient.cc client.o osdc.o msg/SimpleMessenger.o common.o
-       ${CC} ${CFLAGS} ${LIBS} $^ -o $@
-
-
 
 
 # misc
index 9492a3f4517257b03b94749dffad81409555325a..fff60ed96b33f8045f51c405c318b43c47bc30b6 100644 (file)
@@ -24,7 +24,6 @@
 #include "msg/Message.h"
 #include "msg/Dispatcher.h"
 #include "msg/Messenger.h"
-#include "msg/SerialMessenger.h"
 
 #include "messages/MClientReply.h"
 
diff --git a/trunk/ceph/client/msgthread.h b/trunk/ceph/client/msgthread.h
deleted file mode 100644 (file)
index 1e1af02..0000000
+++ /dev/null
@@ -1,26 +0,0 @@
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
-// vim: ts=8 sw=2 smarttab
-/*
- * Ceph - scalable distributed file system
- *
- * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
- *
- * This is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License version 2.1, as published by the Free Software 
- * Foundation.  See file COPYING.
- * 
- */
-
-
-#include "msg/Message.h"
-
-// send the message, expecting no response.  threads other than the
-// MPI thread use this function; if the MPI thread uses this function
-// it could deadlock: this function could wait for the out queue to be
-// emptied, but only the MPI thread can empty it.
-void obfsmpi_send(Message *m)
-
-// send the message to a server and wait for the response.  threads
-// other than the MPI thread use this function.
-Message *obfsmpi_sendrecv(Message *m)
diff --git a/trunk/ceph/common/RWLock.h b/trunk/ceph/common/RWLock.h
new file mode 100644 (file)
index 0000000..14e158a
--- /dev/null
@@ -0,0 +1,50 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software 
+ * Foundation.  See file COPYING.
+ * 
+ */
+
+
+
+#ifndef _RWLock_Posix_
+#define _RWLock_Posix_
+
+#include <pthread.h>
+
+class RWLock
+{
+  mutable pthread_rwlock_t L;
+
+  public:
+
+  RWLock() {
+    pthread_rwlock_init(&L, NULL);
+  }
+
+  virtual ~RWLock() {
+    pthread_rwlock_unlock(&L);
+    pthread_rwlock_destroy(&L);
+  }
+
+  void unlock() {
+    pthread_rwlock_unlock(&L);
+  }
+  void get_read() {
+    pthread_rwlock_rdlock(&L);    
+  }
+  void put_read() { unlock(); }
+  void get_write() {
+    pthread_rwlock_wrlock(&L);
+  }
+  void put_write() { unlock(); }
+};
+
+#endif // !_Mutex_Posix_
diff --git a/trunk/ceph/msg/HostMonitor.cc b/trunk/ceph/msg/HostMonitor.cc
deleted file mode 100644 (file)
index 969edad..0000000
+++ /dev/null
@@ -1,236 +0,0 @@
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
-// vim: ts=8 sw=2 smarttab
-/*
- * Ceph - scalable distributed file system
- *
- * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
- *
- * This is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License version 2.1, as published by the Free Software 
- * Foundation.  See file COPYING.
- * 
- */
-
-
-
-#include "HostMonitor.h"
-
-#include "msg/Message.h"
-#include "msg/Messenger.h"
-
-#include "messages/MPing.h"
-#include "messages/MPingAck.h"
-#include "messages/MFailure.h"
-#include "messages/MFailureAck.h"
-
-#include "common/Timer.h"
-#include "common/Clock.h"
-
-#define DBL  10
-
-
-#include "config.h"
-#undef dout
-#define  dout(l)    if (l<=g_conf.debug) cout << whoami << " hostmon: "
-
-
-// timer contexts
-
-class C_HM_InitiateHeartbeat : public Context {
-  HostMonitor *hm;
-public:
-  C_HM_InitiateHeartbeat(HostMonitor *hm) {
-     this->hm = hm;
-  }
-  void finish(int r) {
-    //cout << "HEARTBEAT" << endl;
-    hm->pending_events.erase(this);
-    hm->initiate_heartbeat();
-  }
-};
-
-class C_HM_CheckHeartbeat : public Context {
-  HostMonitor *hm;
-public:
-  C_HM_CheckHeartbeat(HostMonitor *hm) {
-    this->hm = hm;
-  }
-  void finish(int r) {
-    //cout << "CHECK" << endl;
-    hm->pending_events.erase(this);
-    hm->check_heartbeat();
-  }
-};
-
-
-
-// startup/shutdown
-
-void HostMonitor::init()
-{
-  dout(DBL) << "init" << endl;
-
-  // hack params for now
-  heartbeat_interval = 10;
-  max_ping_time = 2;
-  max_heartbeat_misses = 3;
-  notify_retry_interval = 10;
-  
-  // schedule first hb
-  schedule_heartbeat();
-}
-
-
-void HostMonitor::shutdown()
-{
-  // cancel any events
-  for (set<Context*>::iterator it = pending_events.begin();
-       it != pending_events.end();
-       it++) {
-    g_timer.cancel_event(*it);
-    delete *it;
-  }
-  pending_events.clear();
-}
-
-
-// schedule next heartbeat
-
-void HostMonitor::schedule_heartbeat()
-{
-  dout(DBL) << "schedule_heartbeat" << endl;
-  Context *e = new C_HM_InitiateHeartbeat(this);
-  pending_events.insert(e);
-  g_timer.add_event_after(heartbeat_interval, e);
-}
-
-
-// take note of a live host
-
-void HostMonitor::host_is_alive(entity_name_t host)
-{
-  if (hosts.count(host))
-    status[host].last_heard_from = g_clock.gettime();
-}
-
-
-// do heartbeat
-
-void HostMonitor::initiate_heartbeat()
-{
-  time_t now = g_clock.gettime();
-  
-  // send out pings
-  inflight_pings.clear();
-  for (set<entity_name_t>::iterator it = hosts.begin();
-       it != hosts.end();
-       it++) {
-    // have i heard from them recently?
-    if (now - status[*it].last_heard_from < heartbeat_interval) {
-      dout(DBL) << "skipping " << *it << ", i heard from them recently" << endl;
-    } else {
-      dout(DBL) << "pinging " << *it << endl;
-      status[*it].last_pinged = now;
-      inflight_pings.insert(*it);
-
-      messenger->send_message(new MPing(1), *it, 0);
-    }
-  }
-  
-  // set timer to check results
-  Context *e = new C_HM_CheckHeartbeat(this);
-  pending_events.insert(e);
-  g_timer.add_event_after(max_ping_time, e);
-  dout(10) << "scheduled check " << e << endl;
-
-  schedule_heartbeat();  // schedule next heartbeat
-}
-
-
-// check results
-
-void HostMonitor::check_heartbeat()
-{
-  dout(DBL) << "check_heartbeat()" << endl;
-
-  // check inflight pings
-  for (set<entity_name_t>::iterator it = inflight_pings.begin();
-       it != inflight_pings.end();
-       it++) {
-    status[*it].num_heartbeats_missed++;
-
-    dout(DBL) << "no response from " << *it << " for " << status[*it].num_heartbeats_missed << " beats" << endl;
-    
-    if (status[*it].num_heartbeats_missed >= max_heartbeat_misses) {
-      if (acked_failures.count(*it)) {
-        dout(DBL) << *it << " is already failed" << endl;
-      } else {
-        if (unacked_failures.count(*it)) {
-          dout(DBL) << *it << " is already failed, but unacked, sending another failure message" << endl;
-        } else {
-          dout(DBL) << "failing " << *it << endl;
-          unacked_failures.insert(*it);
-        }
-        
-        /*if (false)   // do this in NewMessenger for now!  FIXME
-        for (set<msg_addr_t>::iterator nit = notify.begin();
-             nit != notify.end();
-             nit++) {
-          messenger->send_message(new MFailure(*it, messenger->get_inst(*it)),
-                                  *nit, notify_port, 0);
-        }
-        */
-      }
-    }
-  }
-  // forget about the pings.
-  inflight_pings.clear();
-}
-
-
-// incoming messages
-
-void HostMonitor::proc_message(Message *m)
-{
-  switch (m->get_type()) {
-
-  case MSG_PING_ACK:
-    handle_ping_ack((MPingAck*)m);
-    break;
-
-  case MSG_FAILURE_ACK:
-    handle_failure_ack((MFailureAck*)m);
-    break;
-
-  }
-}
-
-void HostMonitor::handle_ping_ack(MPingAck *m)
-{
-  entity_name_t from = m->get_source();
-
-  dout(DBL) << "ping ack from " << from << endl;
-  status[from].last_pinged = g_clock.gettime();
-  status[from].num_heartbeats_missed = 0;
-  inflight_pings.erase(from);
-
-  delete m;
-}
-
-void HostMonitor::handle_failure_ack(MFailureAck *m)
-{
-
-  // FIXME: this doesn't handle failed -> alive transitions gracefully at all..
-
-  // the higher-up's acknowledged our failure notification, we can stop resending it.
-  entity_name_t failed = m->get_failed();
-  dout(DBL) << "handle_failure_ack " << failed << endl;
-  unacked_failures.erase(failed);
-  acked_failures.insert(failed);
-
-  delete m;
-}
-
-
diff --git a/trunk/ceph/msg/HostMonitor.h b/trunk/ceph/msg/HostMonitor.h
deleted file mode 100644 (file)
index 35334b7..0000000
+++ /dev/null
@@ -1,98 +0,0 @@
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
-// vim: ts=8 sw=2 smarttab
-/*
- * Ceph - scalable distributed file system
- *
- * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
- *
- * This is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License version 2.1, as published by the Free Software 
- * Foundation.  See file COPYING.
- * 
- */
-
-
-#ifndef __HOSTMONITOR_H
-#define __HOSTMONITOR_H
-
-#include <time.h>
-
-#include <map>
-#include <set>
-using namespace std;
-
-#include "include/Context.h"
-#include "msg/Message.h"
-
-class Message;
-class Messenger;
-
-typedef struct {
-  time_t last_heard_from;
-  time_t last_pinged;
-  int    num_heartbeats_missed;
-} monitor_rec_t;
-
-class HostMonitor {
-  Messenger *messenger;
-  string whoami;
-
-  // hosts i monitor
-  set<entity_name_t>  hosts;
-
-  // who i tell when they fail
-  set<entity_name_t>  notify;
-  int              notify_port;
-
-  // their status
-  map<entity_name_t,monitor_rec_t>  status;
-
-  set<entity_name_t>  inflight_pings;    // pings we sent that haven't replied yet
-
-  set<entity_name_t>  unacked_failures;  // failed hosts that haven't been acked yet.
-  set<entity_name_t>  acked_failures;    // these failures have been acked.
-
-  float heartbeat_interval;    // how often to do a heartbeat
-  float max_ping_time;         // how long before it's a miss
-  int   max_heartbeat_misses;  // how many misses before i tell
-  float notify_retry_interval; // how often to retry failure notification
-
- public:
-  set<Context*>  pending_events;
-
- private:
-  void schedule_heartbeat();
-
- public:
-  HostMonitor(Messenger *m, string& whoami) {
-    this->messenger = m;
-    this->whoami = whoami;
-    notify_port = 0;
-  }
-  set<entity_name_t>& get_hosts() { return hosts; }
-  set<entity_name_t>& get_notify() { return notify; }
-  void set_notify_port(int p) { notify_port = p; }
-
-  void remove_host(entity_name_t h) {
-    hosts.erase(h);
-    status.erase(h);
-    unacked_failures.erase(h);
-    acked_failures.erase(h);
-  }
-
-  void init();
-  void shutdown();
-  
-  void host_is_alive(entity_name_t who);
-
-  void proc_message(Message *m);
-  void handle_ping_ack(class MPingAck *m);
-  void handle_failure_ack(class MFailureAck *m);
-
-  void initiate_heartbeat();
-  void check_heartbeat();
-
-};
-
-#endif
diff --git a/trunk/ceph/msg/RWLock.h b/trunk/ceph/msg/RWLock.h
deleted file mode 100644 (file)
index 14e158a..0000000
+++ /dev/null
@@ -1,50 +0,0 @@
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
-// vim: ts=8 sw=2 smarttab
-/*
- * Ceph - scalable distributed file system
- *
- * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
- *
- * This is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License version 2.1, as published by the Free Software 
- * Foundation.  See file COPYING.
- * 
- */
-
-
-
-#ifndef _RWLock_Posix_
-#define _RWLock_Posix_
-
-#include <pthread.h>
-
-class RWLock
-{
-  mutable pthread_rwlock_t L;
-
-  public:
-
-  RWLock() {
-    pthread_rwlock_init(&L, NULL);
-  }
-
-  virtual ~RWLock() {
-    pthread_rwlock_unlock(&L);
-    pthread_rwlock_destroy(&L);
-  }
-
-  void unlock() {
-    pthread_rwlock_unlock(&L);
-  }
-  void get_read() {
-    pthread_rwlock_rdlock(&L);    
-  }
-  void put_read() { unlock(); }
-  void get_write() {
-    pthread_rwlock_wrlock(&L);
-  }
-  void put_write() { unlock(); }
-};
-
-#endif // !_Mutex_Posix_
diff --git a/trunk/ceph/msg/SerialMessenger.h b/trunk/ceph/msg/SerialMessenger.h
deleted file mode 100644 (file)
index c17553e..0000000
+++ /dev/null
@@ -1,29 +0,0 @@
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
-// vim: ts=8 sw=2 smarttab
-/*
- * Ceph - scalable distributed file system
- *
- * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
- *
- * This is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License version 2.1, as published by the Free Software 
- * Foundation.  See file COPYING.
- * 
- */
-
-
-#ifndef __SERIAL_MESSENGER_H
-#define __SERIAL_MESSENGER_H
-
-#include "Dispatcher.h"
-#include "Message.h"
-
-class SerialMessenger : public Dispatcher {
- public:
-  virtual void dispatch(Message *m) = 0;      // i receive my messages here
-  virtual void send(Message *m, entity_name_t dest, int port=0, int fromport=0) = 0;          // doesn't block
-  virtual Message *sendrecv(Message *m, entity_name_t dest, int port=0, int fromport=0) = 0;  // blocks for matching reply
-};
-
-#endif
diff --git a/trunk/ceph/msg/mpistarter.cc b/trunk/ceph/msg/mpistarter.cc
deleted file mode 100644 (file)
index 685c104..0000000
+++ /dev/null
@@ -1,63 +0,0 @@
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
-// vim: ts=8 sw=2 smarttab
-/*
- * Ceph - scalable distributed file system
- *
- * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
- *
- * This is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License version 2.1, as published by the Free Software 
- * Foundation.  See file COPYING.
- * 
- */
-
-
-
-#include <mpi.h>
-
-#include "TCPMessenger.h"
-
-/*
- * start up TCPMessenger via MPI.
- */ 
-
-pair<int,int> mpi_bootstrap_tcp(int& argc, char**& argv)
-{
-  tcpmessenger_init();
-  tcpmessenger_start();
-
-  // exchnage addresses with other nodes
-  MPI_Init(&argc, &argv);
-  
-  int mpi_world;
-  int mpi_rank;
-  MPI_Comm_size(MPI_COMM_WORLD, &mpi_world);
-  MPI_Comm_rank(MPI_COMM_WORLD, &mpi_rank);
-
-  //dout(1) << "i am " << mpi_rank << " of " << mpi_world << endl;
-  
-  // start up directory?
-  tcpaddr_t ta;
-  if (mpi_rank == 0) {
-    dout(30) << "i am rank 0, starting ns directory" << endl;
-    tcpmessenger_start_nameserver(ta);
-  } else {
-    memset(&ta, 0, sizeof(ta));
-  }
-
-  // distribute tcpaddr
-  int r = MPI_Bcast(&ta, sizeof(ta), MPI_CHAR,
-                    0, MPI_COMM_WORLD);
-
-  dout(30) << "r = " << r << " ns tcpaddr is " << ta << endl;
-  tcpmessenger_start_rankserver(ta);
-  
-  MPI_Barrier(MPI_COMM_WORLD);
-  //g_clock.tare();
-  MPI_Finalize();
-
-  return pair<int,int>(mpi_rank, mpi_world);
-}
-
-
diff --git a/trunk/ceph/msg/new_mpistarter.cc b/trunk/ceph/msg/new_mpistarter.cc
deleted file mode 100644 (file)
index 72adcf9..0000000
+++ /dev/null
@@ -1,45 +0,0 @@
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
-// vim: ts=8 sw=2 smarttab
-#include <mpi.h>
-#include "NewMessenger.h"
-
-/*
- * start up NewMessenger via MPI.
- */ 
-
-pair<int,int> mpi_bootstrap_new(int& argc, char**& argv)
-{
-  MPI_Init(&argc, &argv);
-  
-  int mpi_world;
-  int mpi_rank;
-  MPI_Comm_size(MPI_COMM_WORLD, &mpi_world);
-  MPI_Comm_rank(MPI_COMM_WORLD, &mpi_rank);
-
-  tcpaddr_t nsaddr;
-  memset(&nsaddr, 0, sizeof(nsaddr));
-  
-  if (mpi_rank == 0) {
-    // i am root.
-    rank.my_rank = 0;  
-    rank.start_rank(nsaddr);
-    nsaddr = rank.get_listen_addr();
-  }
-
-  int r = MPI_Bcast(&nsaddr, sizeof(nsaddr), MPI_CHAR,
-                    0, MPI_COMM_WORLD);
-
-  dout(30) << "r = " << r << " ns tcpaddr is " << nsaddr << endl;
-
-  if (mpi_rank != 0) {
-    rank.start_rank(nsaddr);
-  }
-
-  MPI_Barrier(MPI_COMM_WORLD);
-
-  //g_clock.tare();
-
-  MPI_Finalize();
-
-  return pair<int,int>(mpi_rank, mpi_world);
-}
diff --git a/trunk/ceph/osd/rush.cc b/trunk/ceph/osd/rush.cc
deleted file mode 100644 (file)
index 733d71a..0000000
+++ /dev/null
@@ -1,231 +0,0 @@
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
-// vim: ts=8 sw=2 smarttab
-/*
- * Ceph - scalable distributed file system
- *
- * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
- *
- * This is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License version 2.1, as published by the Free Software 
- * Foundation.  See file COPYING.
- * 
- */
-
-
-//
-//
-//    rush.cc
-//
-// $Id$
-//
-
-#include <stdlib.h>
-#include <stdio.h>
-#include <cassert>
-#include "rush.h"
-
-
-static
-unsigned int
-myhash (unsigned int n)
-{
-  unsigned int v = (n ^ 0xdead1234) * (884811920 * 3  + 1);
-  return (v);
-}
-
-Rush::Rush ()
-{
-  nClusters = 0;
-  totalServers = 0;
-}
-
-//----------------------------------------------------------------------
-//
-//    Rush::AddCluster
-//
-//    Add a cluster.  The number of servers in the cluster and
-//    the weight of each server is passed.  The current number of
-//    clusters is returned.
-//
-//----------------------------------------------------------------------
-int
-Rush::AddCluster (int nServers, double weight)
-{
-  clusterSize[nClusters] = nServers;
-  clusterWeight[nClusters] = weight;
-  if (nClusters == 0) {
-    serversInPrevious[0] = 0;
-    totalWeightBefore[0] = 0.0;
-  } else {
-    serversInPrevious[nClusters] = serversInPrevious[nClusters-1] +
-      clusterSize[nClusters-1];
-    totalWeightBefore[nClusters] =
-      totalWeightBefore[nClusters-1] + (double)clusterSize[nClusters-1] *
-      clusterWeight[nClusters-1];
-  }
-  nClusters += 1;
-  totalServers += nServers;
-#if 0
-  for (int i = 0; i < nClusters; i++) {
-    fprintf (stderr, "size=%-3d prev=%-3d weight=%-6.2f prevWeight=%-8.2f\n",
-        clusterSize[i], serversInPrevious[i], clusterWeight[i],
-        totalWeightBefore[i]);
-  }
-#endif
-  return (nClusters);
-}
-
-
-//----------------------------------------------------------------------
-//
-//    Rush::GetServersByKey
-//
-//    This function returns a list of servers on which an object
-//    should be placed.  The servers array must be large enough to
-//    contain the list.
-//
-//----------------------------------------------------------------------
-void
-Rush::GetServersByKey (int key, int nReplicas, int servers[])
-{
-  int    replicasLeft = nReplicas;
-  int    cluster;
-  int    mustAssign, numberAssigned;
-  int    i, toDraw;
-  int    *srv = servers;
-  double    myWeight;
-  RushRNG    rng;
-
-  // There may not be more replicas than servers!
-  assert (nReplicas <= totalServers);
-  
-  for (cluster = nClusters-1; (cluster >= 0) && (replicasLeft > 0); cluster--) {
-    if (serversInPrevious[cluster] < replicasLeft) {
-      mustAssign = replicasLeft - serversInPrevious[cluster];
-    } else {
-      mustAssign = 0;
-    }
-    toDraw = replicasLeft - mustAssign;
-    if (toDraw > (clusterSize[cluster] - mustAssign)) {
-      toDraw = clusterSize[cluster] - mustAssign;
-    }
-    myWeight = (double)clusterSize[cluster] * clusterWeight[cluster];
-    rng.Seed (myhash (key)^cluster, cluster^0xb90738);
-    numberAssigned = mustAssign +
-      rng.HyperGeometricWeighted (toDraw, myWeight,
-                                  totalWeightBefore[cluster] + myWeight,
-                                  clusterWeight[cluster]);
-    if (numberAssigned > 0) {
-      rng.Seed (myhash (key)^cluster ^ 11, cluster^0xfea937);
-      rng.DrawKofN (srv, numberAssigned, clusterSize[cluster]);
-      for (i = 0; i < numberAssigned; i++) {
-        srv[i] += serversInPrevious[cluster];
-      }
-      replicasLeft -= numberAssigned;
-      srv += numberAssigned;
-    }
-  }
-}
-
-\f
-
-//----------------------------------------------------------------------
-//
-//    RushRNG::HyperGeometricWeighted
-//
-//    Use an iterative method to generate a hypergeometric random
-//    variable.  This approach guarantees that, if the number of draws
-//    is reduced, the number of successes must be as well as long as
-//    the seed for the RNG is the same.
-//
-//----------------------------------------------------------------------
-int
-RushRNG::HyperGeometricWeighted (int nDraws, double yesWeighted,
-                 double totalWeighted, double weightOne)
-{
-  int    positives = 0, i;
-  double    curRand;
-
-  // If the weight is too small (or is negative), choose zero objects.
-  if (weightOne <= 1e-9 || nDraws == 0) {
-    return (0);
-  }
-
-  // Draw nDraws items from the "bag".  For each positive, subtract off
-  // the weight of an object from the weight of positives remaining.  For
-  // each draw, subtract off the weight of an object from the total weight
-  // remaining.
-  for (i = 0; i < nDraws; i++) {
-    curRand = RandomDouble ();
-    if (curRand < (yesWeighted / totalWeighted)) {
-      positives += 1;
-      yesWeighted -= weightOne;
-    }
-    totalWeighted -= weightOne;
-  }
-  return (positives);
-}
-
-//----------------------------------------------------------------------
-//
-//    RushRNG::DrawKofN
-//
-//----------------------------------------------------------------------
-void
-RushRNG::DrawKofN (int vals[], int nToDraw, int setSize)
-{
-  int    deck[setSize];
-  int    i, pick;
-
-  assert(nToDraw <= setSize);
-
-  for (i = 0; i < setSize; i++) {
-    deck[i] = i;
-  }
-
-  for (i = 0; i < nToDraw; i++) {
-    pick = (int)(RandomDouble () * (double)(setSize - i));
-    if (pick >= setSize-i) pick = setSize-i-1;  // in case
-    //    assert(i >= 0 && i < nToDraw);
-    //    assert(pick >= 0 && pick < setSize);
-    vals[i] = deck[pick];
-    deck[pick] = deck[setSize-i-1];
-  }
-}
-
-#define    SEED_X 521288629
-#define    SEED_Y 362436069
-RushRNG::RushRNG ()
-{
-  Seed (0, 0);
-}
-
-void
-RushRNG::Seed (unsigned int seed1, unsigned int seed2)
-{
-  state1 = ((seed1 == 0) ? SEED_X : seed1);
-  state2 = ((seed2 == 0) ? SEED_Y : seed2);
-}
-
-unsigned int
-RushRNG::RandomInt ()
-{
-  const unsigned int a = 18000;
-  const unsigned int b = 18879;
-  unsigned int    rndValue;
-
-  state1 = a * (state1 & 0xffff) + (state1 >> 16);
-  state2 = b * (state2 & 0xffff) + (state2 >> 16);
-  rndValue = (state1 << 16) + (state2 & 0xffff);
-  return (rndValue);
-}
-
-double
-RushRNG::RandomDouble ()
-{
-  double    v;
-
-  v = (double)RandomInt() / (65536.0*65536.0);
-  return (v);
-}
diff --git a/trunk/ceph/osd/rush.h b/trunk/ceph/osd/rush.h
deleted file mode 100644 (file)
index 4b43e1a..0000000
+++ /dev/null
@@ -1,61 +0,0 @@
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
-// vim: ts=8 sw=2 smarttab
-/*
- * Ceph - scalable distributed file system
- *
- * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
- *
- * This is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License version 2.1, as published by the Free Software 
- * Foundation.  See file COPYING.
- * 
- */
-
-
-//
-//
-//    rush.h
-//
-//    Classes and definitions for the RUSH algorithm.
-//
-// $Id$
-//
-//
-
-#ifndef    _rush_h_
-#define    _rush_h_
-
-#define    RUSH_MAX_CLUSTERS    100
-
-class RushRNG {
-public:
-  unsigned int    RandomInt ();
-  double RandomDouble ();
-  void    Seed (unsigned int a, unsigned int b);
-  int    HyperGeometricWeighted (int nDraws, double yesWeighted,
-                double totalWeighted, double weightOne);
-  void    DrawKofN (int vals[], int nToDraw, int setSize);
-  RushRNG();
-private:
-  unsigned int state1, state2;
-};
-
-class Rush {
-public:
-  void    GetServersByKey (int key, int nReplicas, int servers[]);
-  int    AddCluster (int nServers, double weight);
-  int    Clusters () {return (nClusters);}
-  int    Servers () {return (totalServers);}
-  Rush ();
-private:
-  int    DrawKofN (int *servers, int n, int clusterSize, RushRNG *g);
-  int    nClusters;
-  int    totalServers;
-  int    clusterSize[RUSH_MAX_CLUSTERS];
-  int    serversInPrevious[RUSH_MAX_CLUSTERS];
-  double clusterWeight[RUSH_MAX_CLUSTERS];
-  double totalWeightBefore[RUSH_MAX_CLUSTERS];
-};
-
-#endif    /* _rush_h_ */
diff --git a/trunk/ceph/osd/tp.cc b/trunk/ceph/osd/tp.cc
deleted file mode 100644 (file)
index b52e9a6..0000000
+++ /dev/null
@@ -1,81 +0,0 @@
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
-// vim: ts=8 sw=2 smarttab
-/*
- * Ceph - scalable distributed file system
- *
- * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
- *
- * This is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License version 2.1, as published by the Free Software 
- * Foundation.  See file COPYING.
- * 
- */
-
-
-
-#include <iostream>
-#include <string>
-
-using namespace std;
-
-#include "common/Mutex.h"
-#include "common/ThreadPool.h"
-// #include <thread.h>
-
-class Op {
-  int i;
-
-public:
-  Op(int i)
-  {
-    this->i = i;
-  }
-
-  int get()
-  {
-    return i;
-  }
-};
-
-void foop(class TP *t, class Op *o);
-
-class TP {
-public:
-
-  void foo(Op *o)
-  {
-    cout << "Thread "<< pthread_self() << ": " << o->get() << "\n";
-    usleep(1);
-    
-    //  sched_yield();
-  }
-
-  int main(int argc, char *argv)
-  {
-    ThreadPool<TP,Op> *t = new ThreadPool<TP,Op>(10, (void (*)(TP*, Op*))foop, this);
-    
-    for(int i = 0; i < 100; i++) {
-      Op *o = new Op(i); 
-      t->put_op(o);
-    }
-    
-    sleep(1);
-    
-    delete(t);
-    
-    return 0;
-  }
-};
-
-void foop(class TP *t, class Op *o) {
-  t->foo(o);
-}
-
-int main(int argc, char *argv) {
-  TP t;
-
-  t.main(argc,argv);
-}
-