From 918d2ad6b538e89215db10d594453520c7586858 Mon Sep 17 00:00:00 2001 From: sageweil Date: Tue, 24 Jul 2007 21:54:02 +0000 Subject: [PATCH] some housecleaning git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@1546 29311d96-e01e-0410-9327-a35deaab8ce9 --- trunk/ceph/Makefile | 13 -- trunk/ceph/client/Client.h | 1 - trunk/ceph/client/msgthread.h | 26 --- trunk/ceph/{msg => common}/RWLock.h | 0 trunk/ceph/msg/HostMonitor.cc | 236 ---------------------------- trunk/ceph/msg/HostMonitor.h | 98 ------------ trunk/ceph/msg/SerialMessenger.h | 29 ---- trunk/ceph/msg/mpistarter.cc | 63 -------- trunk/ceph/msg/new_mpistarter.cc | 45 ------ trunk/ceph/osd/rush.cc | 231 --------------------------- trunk/ceph/osd/rush.h | 61 ------- trunk/ceph/osd/tp.cc | 81 ---------- 12 files changed, 884 deletions(-) delete mode 100644 trunk/ceph/client/msgthread.h rename trunk/ceph/{msg => common}/RWLock.h (100%) delete mode 100644 trunk/ceph/msg/HostMonitor.cc delete mode 100644 trunk/ceph/msg/HostMonitor.h delete mode 100644 trunk/ceph/msg/SerialMessenger.h delete mode 100644 trunk/ceph/msg/mpistarter.cc delete mode 100644 trunk/ceph/msg/new_mpistarter.cc delete mode 100644 trunk/ceph/osd/rush.cc delete mode 100644 trunk/ceph/osd/rush.h delete mode 100644 trunk/ceph/osd/tp.cc diff --git a/trunk/ceph/Makefile b/trunk/ceph/Makefile index 6f5cf9d5fb5be..3840877d66173 100644 --- a/trunk/ceph/Makefile +++ b/trunk/ceph/Makefile @@ -140,19 +140,6 @@ csyn: csyn.cc client.o osdc.o msg/SimpleMessenger.o common.o cfuse: cfuse.cc client.o osdc.o client/fuse.o msg/SimpleMessenger.o common.o ${CC} ${CFLAGS} ${LIBS} -lfuse $^ -o $@ -activemaster: active/activemaster.cc client.o osdc.o msg/SimpleMessenger.o common.o - ${CC} ${CFLAGS} ${LIBS} $^ -o $@ - -activeslave: active/activeslave.cc client.o osdc.o msg/SimpleMessenger.o common.o - ${CC} ${CFLAGS} ${LIBS} $^ -o $@ - -echotestclient: active/echotestclient.cc client.o osdc.o msg/SimpleMessenger.o common.o - ${CC} ${CFLAGS} ${LIBS} $^ -o $@ - -msgtestclient: active/msgtestclient.cc client.o osdc.o msg/SimpleMessenger.o common.o - ${CC} ${CFLAGS} ${LIBS} $^ -o $@ - - # misc diff --git a/trunk/ceph/client/Client.h b/trunk/ceph/client/Client.h index 9492a3f451725..fff60ed96b33f 100644 --- a/trunk/ceph/client/Client.h +++ b/trunk/ceph/client/Client.h @@ -24,7 +24,6 @@ #include "msg/Message.h" #include "msg/Dispatcher.h" #include "msg/Messenger.h" -#include "msg/SerialMessenger.h" #include "messages/MClientReply.h" diff --git a/trunk/ceph/client/msgthread.h b/trunk/ceph/client/msgthread.h deleted file mode 100644 index 1e1af025b0d57..0000000000000 --- a/trunk/ceph/client/msgthread.h +++ /dev/null @@ -1,26 +0,0 @@ -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -// vim: ts=8 sw=2 smarttab -/* - * Ceph - scalable distributed file system - * - * Copyright (C) 2004-2006 Sage Weil - * - * This is free software; you can redistribute it and/or - * modify it under the terms of the GNU Lesser General Public - * License version 2.1, as published by the Free Software - * Foundation. See file COPYING. - * - */ - - -#include "msg/Message.h" - -// send the message, expecting no response. threads other than the -// MPI thread use this function; if the MPI thread uses this function -// it could deadlock: this function could wait for the out queue to be -// emptied, but only the MPI thread can empty it. -void obfsmpi_send(Message *m) - -// send the message to a server and wait for the response. threads -// other than the MPI thread use this function. -Message *obfsmpi_sendrecv(Message *m) diff --git a/trunk/ceph/msg/RWLock.h b/trunk/ceph/common/RWLock.h similarity index 100% rename from trunk/ceph/msg/RWLock.h rename to trunk/ceph/common/RWLock.h diff --git a/trunk/ceph/msg/HostMonitor.cc b/trunk/ceph/msg/HostMonitor.cc deleted file mode 100644 index 969edadd424d6..0000000000000 --- a/trunk/ceph/msg/HostMonitor.cc +++ /dev/null @@ -1,236 +0,0 @@ -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -// vim: ts=8 sw=2 smarttab -/* - * Ceph - scalable distributed file system - * - * Copyright (C) 2004-2006 Sage Weil - * - * This is free software; you can redistribute it and/or - * modify it under the terms of the GNU Lesser General Public - * License version 2.1, as published by the Free Software - * Foundation. See file COPYING. - * - */ - - - -#include "HostMonitor.h" - -#include "msg/Message.h" -#include "msg/Messenger.h" - -#include "messages/MPing.h" -#include "messages/MPingAck.h" -#include "messages/MFailure.h" -#include "messages/MFailureAck.h" - -#include "common/Timer.h" -#include "common/Clock.h" - -#define DBL 10 - - -#include "config.h" -#undef dout -#define dout(l) if (l<=g_conf.debug) cout << whoami << " hostmon: " - - -// timer contexts - -class C_HM_InitiateHeartbeat : public Context { - HostMonitor *hm; -public: - C_HM_InitiateHeartbeat(HostMonitor *hm) { - this->hm = hm; - } - void finish(int r) { - //cout << "HEARTBEAT" << endl; - hm->pending_events.erase(this); - hm->initiate_heartbeat(); - } -}; - -class C_HM_CheckHeartbeat : public Context { - HostMonitor *hm; -public: - C_HM_CheckHeartbeat(HostMonitor *hm) { - this->hm = hm; - } - void finish(int r) { - //cout << "CHECK" << endl; - hm->pending_events.erase(this); - hm->check_heartbeat(); - } -}; - - - -// startup/shutdown - -void HostMonitor::init() -{ - dout(DBL) << "init" << endl; - - // hack params for now - heartbeat_interval = 10; - max_ping_time = 2; - max_heartbeat_misses = 3; - notify_retry_interval = 10; - - // schedule first hb - schedule_heartbeat(); -} - - -void HostMonitor::shutdown() -{ - // cancel any events - for (set::iterator it = pending_events.begin(); - it != pending_events.end(); - it++) { - g_timer.cancel_event(*it); - delete *it; - } - pending_events.clear(); -} - - -// schedule next heartbeat - -void HostMonitor::schedule_heartbeat() -{ - dout(DBL) << "schedule_heartbeat" << endl; - Context *e = new C_HM_InitiateHeartbeat(this); - pending_events.insert(e); - g_timer.add_event_after(heartbeat_interval, e); -} - - -// take note of a live host - -void HostMonitor::host_is_alive(entity_name_t host) -{ - if (hosts.count(host)) - status[host].last_heard_from = g_clock.gettime(); -} - - -// do heartbeat - -void HostMonitor::initiate_heartbeat() -{ - time_t now = g_clock.gettime(); - - // send out pings - inflight_pings.clear(); - for (set::iterator it = hosts.begin(); - it != hosts.end(); - it++) { - // have i heard from them recently? - if (now - status[*it].last_heard_from < heartbeat_interval) { - dout(DBL) << "skipping " << *it << ", i heard from them recently" << endl; - } else { - dout(DBL) << "pinging " << *it << endl; - status[*it].last_pinged = now; - inflight_pings.insert(*it); - - messenger->send_message(new MPing(1), *it, 0); - } - } - - // set timer to check results - Context *e = new C_HM_CheckHeartbeat(this); - pending_events.insert(e); - g_timer.add_event_after(max_ping_time, e); - dout(10) << "scheduled check " << e << endl; - - schedule_heartbeat(); // schedule next heartbeat -} - - -// check results - -void HostMonitor::check_heartbeat() -{ - dout(DBL) << "check_heartbeat()" << endl; - - // check inflight pings - for (set::iterator it = inflight_pings.begin(); - it != inflight_pings.end(); - it++) { - status[*it].num_heartbeats_missed++; - - dout(DBL) << "no response from " << *it << " for " << status[*it].num_heartbeats_missed << " beats" << endl; - - if (status[*it].num_heartbeats_missed >= max_heartbeat_misses) { - if (acked_failures.count(*it)) { - dout(DBL) << *it << " is already failed" << endl; - } else { - if (unacked_failures.count(*it)) { - dout(DBL) << *it << " is already failed, but unacked, sending another failure message" << endl; - } else { - dout(DBL) << "failing " << *it << endl; - unacked_failures.insert(*it); - } - - /*if (false) // do this in NewMessenger for now! FIXME - for (set::iterator nit = notify.begin(); - nit != notify.end(); - nit++) { - messenger->send_message(new MFailure(*it, messenger->get_inst(*it)), - *nit, notify_port, 0); - } - */ - } - } - } - - // forget about the pings. - inflight_pings.clear(); -} - - -// incoming messages - -void HostMonitor::proc_message(Message *m) -{ - switch (m->get_type()) { - - case MSG_PING_ACK: - handle_ping_ack((MPingAck*)m); - break; - - case MSG_FAILURE_ACK: - handle_failure_ack((MFailureAck*)m); - break; - - } -} - -void HostMonitor::handle_ping_ack(MPingAck *m) -{ - entity_name_t from = m->get_source(); - - dout(DBL) << "ping ack from " << from << endl; - status[from].last_pinged = g_clock.gettime(); - status[from].num_heartbeats_missed = 0; - inflight_pings.erase(from); - - delete m; -} - -void HostMonitor::handle_failure_ack(MFailureAck *m) -{ - - // FIXME: this doesn't handle failed -> alive transitions gracefully at all.. - - // the higher-up's acknowledged our failure notification, we can stop resending it. - entity_name_t failed = m->get_failed(); - dout(DBL) << "handle_failure_ack " << failed << endl; - unacked_failures.erase(failed); - acked_failures.insert(failed); - - delete m; -} - - diff --git a/trunk/ceph/msg/HostMonitor.h b/trunk/ceph/msg/HostMonitor.h deleted file mode 100644 index 35334b7f6a61f..0000000000000 --- a/trunk/ceph/msg/HostMonitor.h +++ /dev/null @@ -1,98 +0,0 @@ -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -// vim: ts=8 sw=2 smarttab -/* - * Ceph - scalable distributed file system - * - * Copyright (C) 2004-2006 Sage Weil - * - * This is free software; you can redistribute it and/or - * modify it under the terms of the GNU Lesser General Public - * License version 2.1, as published by the Free Software - * Foundation. See file COPYING. - * - */ - - -#ifndef __HOSTMONITOR_H -#define __HOSTMONITOR_H - -#include - -#include -#include -using namespace std; - -#include "include/Context.h" -#include "msg/Message.h" - -class Message; -class Messenger; - -typedef struct { - time_t last_heard_from; - time_t last_pinged; - int num_heartbeats_missed; -} monitor_rec_t; - -class HostMonitor { - Messenger *messenger; - string whoami; - - // hosts i monitor - set hosts; - - // who i tell when they fail - set notify; - int notify_port; - - // their status - map status; - - set inflight_pings; // pings we sent that haven't replied yet - - set unacked_failures; // failed hosts that haven't been acked yet. - set acked_failures; // these failures have been acked. - - float heartbeat_interval; // how often to do a heartbeat - float max_ping_time; // how long before it's a miss - int max_heartbeat_misses; // how many misses before i tell - float notify_retry_interval; // how often to retry failure notification - - public: - set pending_events; - - private: - void schedule_heartbeat(); - - public: - HostMonitor(Messenger *m, string& whoami) { - this->messenger = m; - this->whoami = whoami; - notify_port = 0; - } - set& get_hosts() { return hosts; } - set& get_notify() { return notify; } - void set_notify_port(int p) { notify_port = p; } - - void remove_host(entity_name_t h) { - hosts.erase(h); - status.erase(h); - unacked_failures.erase(h); - acked_failures.erase(h); - } - - void init(); - void shutdown(); - - void host_is_alive(entity_name_t who); - - void proc_message(Message *m); - void handle_ping_ack(class MPingAck *m); - void handle_failure_ack(class MFailureAck *m); - - void initiate_heartbeat(); - void check_heartbeat(); - -}; - -#endif diff --git a/trunk/ceph/msg/SerialMessenger.h b/trunk/ceph/msg/SerialMessenger.h deleted file mode 100644 index c17553e2fb88d..0000000000000 --- a/trunk/ceph/msg/SerialMessenger.h +++ /dev/null @@ -1,29 +0,0 @@ -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -// vim: ts=8 sw=2 smarttab -/* - * Ceph - scalable distributed file system - * - * Copyright (C) 2004-2006 Sage Weil - * - * This is free software; you can redistribute it and/or - * modify it under the terms of the GNU Lesser General Public - * License version 2.1, as published by the Free Software - * Foundation. See file COPYING. - * - */ - - -#ifndef __SERIAL_MESSENGER_H -#define __SERIAL_MESSENGER_H - -#include "Dispatcher.h" -#include "Message.h" - -class SerialMessenger : public Dispatcher { - public: - virtual void dispatch(Message *m) = 0; // i receive my messages here - virtual void send(Message *m, entity_name_t dest, int port=0, int fromport=0) = 0; // doesn't block - virtual Message *sendrecv(Message *m, entity_name_t dest, int port=0, int fromport=0) = 0; // blocks for matching reply -}; - -#endif diff --git a/trunk/ceph/msg/mpistarter.cc b/trunk/ceph/msg/mpistarter.cc deleted file mode 100644 index 685c104d8d92d..0000000000000 --- a/trunk/ceph/msg/mpistarter.cc +++ /dev/null @@ -1,63 +0,0 @@ -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -// vim: ts=8 sw=2 smarttab -/* - * Ceph - scalable distributed file system - * - * Copyright (C) 2004-2006 Sage Weil - * - * This is free software; you can redistribute it and/or - * modify it under the terms of the GNU Lesser General Public - * License version 2.1, as published by the Free Software - * Foundation. See file COPYING. - * - */ - - - -#include - -#include "TCPMessenger.h" - -/* - * start up TCPMessenger via MPI. - */ - -pair mpi_bootstrap_tcp(int& argc, char**& argv) -{ - tcpmessenger_init(); - tcpmessenger_start(); - - // exchnage addresses with other nodes - MPI_Init(&argc, &argv); - - int mpi_world; - int mpi_rank; - MPI_Comm_size(MPI_COMM_WORLD, &mpi_world); - MPI_Comm_rank(MPI_COMM_WORLD, &mpi_rank); - - //dout(1) << "i am " << mpi_rank << " of " << mpi_world << endl; - - // start up directory? - tcpaddr_t ta; - if (mpi_rank == 0) { - dout(30) << "i am rank 0, starting ns directory" << endl; - tcpmessenger_start_nameserver(ta); - } else { - memset(&ta, 0, sizeof(ta)); - } - - // distribute tcpaddr - int r = MPI_Bcast(&ta, sizeof(ta), MPI_CHAR, - 0, MPI_COMM_WORLD); - - dout(30) << "r = " << r << " ns tcpaddr is " << ta << endl; - tcpmessenger_start_rankserver(ta); - - MPI_Barrier(MPI_COMM_WORLD); - //g_clock.tare(); - MPI_Finalize(); - - return pair(mpi_rank, mpi_world); -} - - diff --git a/trunk/ceph/msg/new_mpistarter.cc b/trunk/ceph/msg/new_mpistarter.cc deleted file mode 100644 index 72adcf90b5265..0000000000000 --- a/trunk/ceph/msg/new_mpistarter.cc +++ /dev/null @@ -1,45 +0,0 @@ -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -// vim: ts=8 sw=2 smarttab -#include -#include "NewMessenger.h" - -/* - * start up NewMessenger via MPI. - */ - -pair mpi_bootstrap_new(int& argc, char**& argv) -{ - MPI_Init(&argc, &argv); - - int mpi_world; - int mpi_rank; - MPI_Comm_size(MPI_COMM_WORLD, &mpi_world); - MPI_Comm_rank(MPI_COMM_WORLD, &mpi_rank); - - tcpaddr_t nsaddr; - memset(&nsaddr, 0, sizeof(nsaddr)); - - if (mpi_rank == 0) { - // i am root. - rank.my_rank = 0; - rank.start_rank(nsaddr); - nsaddr = rank.get_listen_addr(); - } - - int r = MPI_Bcast(&nsaddr, sizeof(nsaddr), MPI_CHAR, - 0, MPI_COMM_WORLD); - - dout(30) << "r = " << r << " ns tcpaddr is " << nsaddr << endl; - - if (mpi_rank != 0) { - rank.start_rank(nsaddr); - } - - MPI_Barrier(MPI_COMM_WORLD); - - //g_clock.tare(); - - MPI_Finalize(); - - return pair(mpi_rank, mpi_world); -} diff --git a/trunk/ceph/osd/rush.cc b/trunk/ceph/osd/rush.cc deleted file mode 100644 index 733d71aa4b322..0000000000000 --- a/trunk/ceph/osd/rush.cc +++ /dev/null @@ -1,231 +0,0 @@ -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -// vim: ts=8 sw=2 smarttab -/* - * Ceph - scalable distributed file system - * - * Copyright (C) 2004-2006 Sage Weil - * - * This is free software; you can redistribute it and/or - * modify it under the terms of the GNU Lesser General Public - * License version 2.1, as published by the Free Software - * Foundation. See file COPYING. - * - */ - - -// -// -// rush.cc -// -// $Id$ -// - -#include -#include -#include -#include "rush.h" - - -static -unsigned int -myhash (unsigned int n) -{ - unsigned int v = (n ^ 0xdead1234) * (884811920 * 3 + 1); - return (v); -} - -Rush::Rush () -{ - nClusters = 0; - totalServers = 0; -} - -//---------------------------------------------------------------------- -// -// Rush::AddCluster -// -// Add a cluster. The number of servers in the cluster and -// the weight of each server is passed. The current number of -// clusters is returned. -// -//---------------------------------------------------------------------- -int -Rush::AddCluster (int nServers, double weight) -{ - clusterSize[nClusters] = nServers; - clusterWeight[nClusters] = weight; - if (nClusters == 0) { - serversInPrevious[0] = 0; - totalWeightBefore[0] = 0.0; - } else { - serversInPrevious[nClusters] = serversInPrevious[nClusters-1] + - clusterSize[nClusters-1]; - totalWeightBefore[nClusters] = - totalWeightBefore[nClusters-1] + (double)clusterSize[nClusters-1] * - clusterWeight[nClusters-1]; - } - nClusters += 1; - totalServers += nServers; -#if 0 - for (int i = 0; i < nClusters; i++) { - fprintf (stderr, "size=%-3d prev=%-3d weight=%-6.2f prevWeight=%-8.2f\n", - clusterSize[i], serversInPrevious[i], clusterWeight[i], - totalWeightBefore[i]); - } -#endif - return (nClusters); -} - - -//---------------------------------------------------------------------- -// -// Rush::GetServersByKey -// -// This function returns a list of servers on which an object -// should be placed. The servers array must be large enough to -// contain the list. -// -//---------------------------------------------------------------------- -void -Rush::GetServersByKey (int key, int nReplicas, int servers[]) -{ - int replicasLeft = nReplicas; - int cluster; - int mustAssign, numberAssigned; - int i, toDraw; - int *srv = servers; - double myWeight; - RushRNG rng; - - // There may not be more replicas than servers! - assert (nReplicas <= totalServers); - - for (cluster = nClusters-1; (cluster >= 0) && (replicasLeft > 0); cluster--) { - if (serversInPrevious[cluster] < replicasLeft) { - mustAssign = replicasLeft - serversInPrevious[cluster]; - } else { - mustAssign = 0; - } - toDraw = replicasLeft - mustAssign; - if (toDraw > (clusterSize[cluster] - mustAssign)) { - toDraw = clusterSize[cluster] - mustAssign; - } - myWeight = (double)clusterSize[cluster] * clusterWeight[cluster]; - rng.Seed (myhash (key)^cluster, cluster^0xb90738); - numberAssigned = mustAssign + - rng.HyperGeometricWeighted (toDraw, myWeight, - totalWeightBefore[cluster] + myWeight, - clusterWeight[cluster]); - if (numberAssigned > 0) { - rng.Seed (myhash (key)^cluster ^ 11, cluster^0xfea937); - rng.DrawKofN (srv, numberAssigned, clusterSize[cluster]); - for (i = 0; i < numberAssigned; i++) { - srv[i] += serversInPrevious[cluster]; - } - replicasLeft -= numberAssigned; - srv += numberAssigned; - } - } -} - - - -//---------------------------------------------------------------------- -// -// RushRNG::HyperGeometricWeighted -// -// Use an iterative method to generate a hypergeometric random -// variable. This approach guarantees that, if the number of draws -// is reduced, the number of successes must be as well as long as -// the seed for the RNG is the same. -// -//---------------------------------------------------------------------- -int -RushRNG::HyperGeometricWeighted (int nDraws, double yesWeighted, - double totalWeighted, double weightOne) -{ - int positives = 0, i; - double curRand; - - // If the weight is too small (or is negative), choose zero objects. - if (weightOne <= 1e-9 || nDraws == 0) { - return (0); - } - - // Draw nDraws items from the "bag". For each positive, subtract off - // the weight of an object from the weight of positives remaining. For - // each draw, subtract off the weight of an object from the total weight - // remaining. - for (i = 0; i < nDraws; i++) { - curRand = RandomDouble (); - if (curRand < (yesWeighted / totalWeighted)) { - positives += 1; - yesWeighted -= weightOne; - } - totalWeighted -= weightOne; - } - return (positives); -} - -//---------------------------------------------------------------------- -// -// RushRNG::DrawKofN -// -//---------------------------------------------------------------------- -void -RushRNG::DrawKofN (int vals[], int nToDraw, int setSize) -{ - int deck[setSize]; - int i, pick; - - assert(nToDraw <= setSize); - - for (i = 0; i < setSize; i++) { - deck[i] = i; - } - - for (i = 0; i < nToDraw; i++) { - pick = (int)(RandomDouble () * (double)(setSize - i)); - if (pick >= setSize-i) pick = setSize-i-1; // in case - // assert(i >= 0 && i < nToDraw); - // assert(pick >= 0 && pick < setSize); - vals[i] = deck[pick]; - deck[pick] = deck[setSize-i-1]; - } -} - -#define SEED_X 521288629 -#define SEED_Y 362436069 -RushRNG::RushRNG () -{ - Seed (0, 0); -} - -void -RushRNG::Seed (unsigned int seed1, unsigned int seed2) -{ - state1 = ((seed1 == 0) ? SEED_X : seed1); - state2 = ((seed2 == 0) ? SEED_Y : seed2); -} - -unsigned int -RushRNG::RandomInt () -{ - const unsigned int a = 18000; - const unsigned int b = 18879; - unsigned int rndValue; - - state1 = a * (state1 & 0xffff) + (state1 >> 16); - state2 = b * (state2 & 0xffff) + (state2 >> 16); - rndValue = (state1 << 16) + (state2 & 0xffff); - return (rndValue); -} - -double -RushRNG::RandomDouble () -{ - double v; - - v = (double)RandomInt() / (65536.0*65536.0); - return (v); -} diff --git a/trunk/ceph/osd/rush.h b/trunk/ceph/osd/rush.h deleted file mode 100644 index 4b43e1a9a1160..0000000000000 --- a/trunk/ceph/osd/rush.h +++ /dev/null @@ -1,61 +0,0 @@ -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -// vim: ts=8 sw=2 smarttab -/* - * Ceph - scalable distributed file system - * - * Copyright (C) 2004-2006 Sage Weil - * - * This is free software; you can redistribute it and/or - * modify it under the terms of the GNU Lesser General Public - * License version 2.1, as published by the Free Software - * Foundation. See file COPYING. - * - */ - - -// -// -// rush.h -// -// Classes and definitions for the RUSH algorithm. -// -// $Id$ -// -// - -#ifndef _rush_h_ -#define _rush_h_ - -#define RUSH_MAX_CLUSTERS 100 - -class RushRNG { -public: - unsigned int RandomInt (); - double RandomDouble (); - void Seed (unsigned int a, unsigned int b); - int HyperGeometricWeighted (int nDraws, double yesWeighted, - double totalWeighted, double weightOne); - void DrawKofN (int vals[], int nToDraw, int setSize); - RushRNG(); -private: - unsigned int state1, state2; -}; - -class Rush { -public: - void GetServersByKey (int key, int nReplicas, int servers[]); - int AddCluster (int nServers, double weight); - int Clusters () {return (nClusters);} - int Servers () {return (totalServers);} - Rush (); -private: - int DrawKofN (int *servers, int n, int clusterSize, RushRNG *g); - int nClusters; - int totalServers; - int clusterSize[RUSH_MAX_CLUSTERS]; - int serversInPrevious[RUSH_MAX_CLUSTERS]; - double clusterWeight[RUSH_MAX_CLUSTERS]; - double totalWeightBefore[RUSH_MAX_CLUSTERS]; -}; - -#endif /* _rush_h_ */ diff --git a/trunk/ceph/osd/tp.cc b/trunk/ceph/osd/tp.cc deleted file mode 100644 index b52e9a69df050..0000000000000 --- a/trunk/ceph/osd/tp.cc +++ /dev/null @@ -1,81 +0,0 @@ -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -// vim: ts=8 sw=2 smarttab -/* - * Ceph - scalable distributed file system - * - * Copyright (C) 2004-2006 Sage Weil - * - * This is free software; you can redistribute it and/or - * modify it under the terms of the GNU Lesser General Public - * License version 2.1, as published by the Free Software - * Foundation. See file COPYING. - * - */ - - - -#include -#include - -using namespace std; - -#include "common/Mutex.h" -#include "common/ThreadPool.h" -// #include - -class Op { - int i; - -public: - - Op(int i) - { - this->i = i; - } - - int get() - { - return i; - } -}; - -void foop(class TP *t, class Op *o); - -class TP { -public: - - void foo(Op *o) - { - cout << "Thread "<< pthread_self() << ": " << o->get() << "\n"; - usleep(1); - - // sched_yield(); - } - - int main(int argc, char *argv) - { - ThreadPool *t = new ThreadPool(10, (void (*)(TP*, Op*))foop, this); - - for(int i = 0; i < 100; i++) { - Op *o = new Op(i); - t->put_op(o); - } - - sleep(1); - - delete(t); - - return 0; - } -}; - -void foop(class TP *t, class Op *o) { - t->foo(o); -} - -int main(int argc, char *argv) { - TP t; - - t.main(argc,argv); -} - -- 2.39.5