cfuse: cfuse.cc client.o osdc.o client/fuse.o msg/SimpleMessenger.o common.o
${CC} ${CFLAGS} ${LIBS} -lfuse $^ -o $@
-activemaster: active/activemaster.cc client.o osdc.o msg/SimpleMessenger.o common.o
- ${CC} ${CFLAGS} ${LIBS} $^ -o $@
-
-activeslave: active/activeslave.cc client.o osdc.o msg/SimpleMessenger.o common.o
- ${CC} ${CFLAGS} ${LIBS} $^ -o $@
-
-echotestclient: active/echotestclient.cc client.o osdc.o msg/SimpleMessenger.o common.o
- ${CC} ${CFLAGS} ${LIBS} $^ -o $@
-
-msgtestclient: active/msgtestclient.cc client.o osdc.o msg/SimpleMessenger.o common.o
- ${CC} ${CFLAGS} ${LIBS} $^ -o $@
-
-
# misc
#include "msg/Message.h"
#include "msg/Dispatcher.h"
#include "msg/Messenger.h"
-#include "msg/SerialMessenger.h"
#include "messages/MClientReply.h"
+++ /dev/null
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-/*
- * Ceph - scalable distributed file system
- *
- * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
- *
- * This is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License version 2.1, as published by the Free Software
- * Foundation. See file COPYING.
- *
- */
-
-
-#include "msg/Message.h"
-
-// send the message, expecting no response. threads other than the
-// MPI thread use this function; if the MPI thread uses this function
-// it could deadlock: this function could wait for the out queue to be
-// emptied, but only the MPI thread can empty it.
-void obfsmpi_send(Message *m)
-
-// send the message to a server and wait for the response. threads
-// other than the MPI thread use this function.
-Message *obfsmpi_sendrecv(Message *m)
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+
+
+#ifndef _RWLock_Posix_
+#define _RWLock_Posix_
+
+#include <pthread.h>
+
+class RWLock
+{
+ mutable pthread_rwlock_t L;
+
+ public:
+
+ RWLock() {
+ pthread_rwlock_init(&L, NULL);
+ }
+
+ virtual ~RWLock() {
+ pthread_rwlock_unlock(&L);
+ pthread_rwlock_destroy(&L);
+ }
+
+ void unlock() {
+ pthread_rwlock_unlock(&L);
+ }
+ void get_read() {
+ pthread_rwlock_rdlock(&L);
+ }
+ void put_read() { unlock(); }
+ void get_write() {
+ pthread_rwlock_wrlock(&L);
+ }
+ void put_write() { unlock(); }
+};
+
+#endif // !_Mutex_Posix_
+++ /dev/null
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-/*
- * Ceph - scalable distributed file system
- *
- * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
- *
- * This is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License version 2.1, as published by the Free Software
- * Foundation. See file COPYING.
- *
- */
-
-
-
-#include "HostMonitor.h"
-
-#include "msg/Message.h"
-#include "msg/Messenger.h"
-
-#include "messages/MPing.h"
-#include "messages/MPingAck.h"
-#include "messages/MFailure.h"
-#include "messages/MFailureAck.h"
-
-#include "common/Timer.h"
-#include "common/Clock.h"
-
-#define DBL 10
-
-
-#include "config.h"
-#undef dout
-#define dout(l) if (l<=g_conf.debug) cout << whoami << " hostmon: "
-
-
-// timer contexts
-
-class C_HM_InitiateHeartbeat : public Context {
- HostMonitor *hm;
-public:
- C_HM_InitiateHeartbeat(HostMonitor *hm) {
- this->hm = hm;
- }
- void finish(int r) {
- //cout << "HEARTBEAT" << endl;
- hm->pending_events.erase(this);
- hm->initiate_heartbeat();
- }
-};
-
-class C_HM_CheckHeartbeat : public Context {
- HostMonitor *hm;
-public:
- C_HM_CheckHeartbeat(HostMonitor *hm) {
- this->hm = hm;
- }
- void finish(int r) {
- //cout << "CHECK" << endl;
- hm->pending_events.erase(this);
- hm->check_heartbeat();
- }
-};
-
-
-
-// startup/shutdown
-
-void HostMonitor::init()
-{
- dout(DBL) << "init" << endl;
-
- // hack params for now
- heartbeat_interval = 10;
- max_ping_time = 2;
- max_heartbeat_misses = 3;
- notify_retry_interval = 10;
-
- // schedule first hb
- schedule_heartbeat();
-}
-
-
-void HostMonitor::shutdown()
-{
- // cancel any events
- for (set<Context*>::iterator it = pending_events.begin();
- it != pending_events.end();
- it++) {
- g_timer.cancel_event(*it);
- delete *it;
- }
- pending_events.clear();
-}
-
-
-// schedule next heartbeat
-
-void HostMonitor::schedule_heartbeat()
-{
- dout(DBL) << "schedule_heartbeat" << endl;
- Context *e = new C_HM_InitiateHeartbeat(this);
- pending_events.insert(e);
- g_timer.add_event_after(heartbeat_interval, e);
-}
-
-
-// take note of a live host
-
-void HostMonitor::host_is_alive(entity_name_t host)
-{
- if (hosts.count(host))
- status[host].last_heard_from = g_clock.gettime();
-}
-
-
-// do heartbeat
-
-void HostMonitor::initiate_heartbeat()
-{
- time_t now = g_clock.gettime();
-
- // send out pings
- inflight_pings.clear();
- for (set<entity_name_t>::iterator it = hosts.begin();
- it != hosts.end();
- it++) {
- // have i heard from them recently?
- if (now - status[*it].last_heard_from < heartbeat_interval) {
- dout(DBL) << "skipping " << *it << ", i heard from them recently" << endl;
- } else {
- dout(DBL) << "pinging " << *it << endl;
- status[*it].last_pinged = now;
- inflight_pings.insert(*it);
-
- messenger->send_message(new MPing(1), *it, 0);
- }
- }
-
- // set timer to check results
- Context *e = new C_HM_CheckHeartbeat(this);
- pending_events.insert(e);
- g_timer.add_event_after(max_ping_time, e);
- dout(10) << "scheduled check " << e << endl;
-
- schedule_heartbeat(); // schedule next heartbeat
-}
-
-
-// check results
-
-void HostMonitor::check_heartbeat()
-{
- dout(DBL) << "check_heartbeat()" << endl;
-
- // check inflight pings
- for (set<entity_name_t>::iterator it = inflight_pings.begin();
- it != inflight_pings.end();
- it++) {
- status[*it].num_heartbeats_missed++;
-
- dout(DBL) << "no response from " << *it << " for " << status[*it].num_heartbeats_missed << " beats" << endl;
-
- if (status[*it].num_heartbeats_missed >= max_heartbeat_misses) {
- if (acked_failures.count(*it)) {
- dout(DBL) << *it << " is already failed" << endl;
- } else {
- if (unacked_failures.count(*it)) {
- dout(DBL) << *it << " is already failed, but unacked, sending another failure message" << endl;
- } else {
- dout(DBL) << "failing " << *it << endl;
- unacked_failures.insert(*it);
- }
-
- /*if (false) // do this in NewMessenger for now! FIXME
- for (set<msg_addr_t>::iterator nit = notify.begin();
- nit != notify.end();
- nit++) {
- messenger->send_message(new MFailure(*it, messenger->get_inst(*it)),
- *nit, notify_port, 0);
- }
- */
- }
- }
- }
-
- // forget about the pings.
- inflight_pings.clear();
-}
-
-
-// incoming messages
-
-void HostMonitor::proc_message(Message *m)
-{
- switch (m->get_type()) {
-
- case MSG_PING_ACK:
- handle_ping_ack((MPingAck*)m);
- break;
-
- case MSG_FAILURE_ACK:
- handle_failure_ack((MFailureAck*)m);
- break;
-
- }
-}
-
-void HostMonitor::handle_ping_ack(MPingAck *m)
-{
- entity_name_t from = m->get_source();
-
- dout(DBL) << "ping ack from " << from << endl;
- status[from].last_pinged = g_clock.gettime();
- status[from].num_heartbeats_missed = 0;
- inflight_pings.erase(from);
-
- delete m;
-}
-
-void HostMonitor::handle_failure_ack(MFailureAck *m)
-{
-
- // FIXME: this doesn't handle failed -> alive transitions gracefully at all..
-
- // the higher-up's acknowledged our failure notification, we can stop resending it.
- entity_name_t failed = m->get_failed();
- dout(DBL) << "handle_failure_ack " << failed << endl;
- unacked_failures.erase(failed);
- acked_failures.insert(failed);
-
- delete m;
-}
-
-
+++ /dev/null
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-/*
- * Ceph - scalable distributed file system
- *
- * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
- *
- * This is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License version 2.1, as published by the Free Software
- * Foundation. See file COPYING.
- *
- */
-
-
-#ifndef __HOSTMONITOR_H
-#define __HOSTMONITOR_H
-
-#include <time.h>
-
-#include <map>
-#include <set>
-using namespace std;
-
-#include "include/Context.h"
-#include "msg/Message.h"
-
-class Message;
-class Messenger;
-
-typedef struct {
- time_t last_heard_from;
- time_t last_pinged;
- int num_heartbeats_missed;
-} monitor_rec_t;
-
-class HostMonitor {
- Messenger *messenger;
- string whoami;
-
- // hosts i monitor
- set<entity_name_t> hosts;
-
- // who i tell when they fail
- set<entity_name_t> notify;
- int notify_port;
-
- // their status
- map<entity_name_t,monitor_rec_t> status;
-
- set<entity_name_t> inflight_pings; // pings we sent that haven't replied yet
-
- set<entity_name_t> unacked_failures; // failed hosts that haven't been acked yet.
- set<entity_name_t> acked_failures; // these failures have been acked.
-
- float heartbeat_interval; // how often to do a heartbeat
- float max_ping_time; // how long before it's a miss
- int max_heartbeat_misses; // how many misses before i tell
- float notify_retry_interval; // how often to retry failure notification
-
- public:
- set<Context*> pending_events;
-
- private:
- void schedule_heartbeat();
-
- public:
- HostMonitor(Messenger *m, string& whoami) {
- this->messenger = m;
- this->whoami = whoami;
- notify_port = 0;
- }
- set<entity_name_t>& get_hosts() { return hosts; }
- set<entity_name_t>& get_notify() { return notify; }
- void set_notify_port(int p) { notify_port = p; }
-
- void remove_host(entity_name_t h) {
- hosts.erase(h);
- status.erase(h);
- unacked_failures.erase(h);
- acked_failures.erase(h);
- }
-
- void init();
- void shutdown();
-
- void host_is_alive(entity_name_t who);
-
- void proc_message(Message *m);
- void handle_ping_ack(class MPingAck *m);
- void handle_failure_ack(class MFailureAck *m);
-
- void initiate_heartbeat();
- void check_heartbeat();
-
-};
-
-#endif
+++ /dev/null
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-/*
- * Ceph - scalable distributed file system
- *
- * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
- *
- * This is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License version 2.1, as published by the Free Software
- * Foundation. See file COPYING.
- *
- */
-
-
-
-#ifndef _RWLock_Posix_
-#define _RWLock_Posix_
-
-#include <pthread.h>
-
-class RWLock
-{
- mutable pthread_rwlock_t L;
-
- public:
-
- RWLock() {
- pthread_rwlock_init(&L, NULL);
- }
-
- virtual ~RWLock() {
- pthread_rwlock_unlock(&L);
- pthread_rwlock_destroy(&L);
- }
-
- void unlock() {
- pthread_rwlock_unlock(&L);
- }
- void get_read() {
- pthread_rwlock_rdlock(&L);
- }
- void put_read() { unlock(); }
- void get_write() {
- pthread_rwlock_wrlock(&L);
- }
- void put_write() { unlock(); }
-};
-
-#endif // !_Mutex_Posix_
+++ /dev/null
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-/*
- * Ceph - scalable distributed file system
- *
- * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
- *
- * This is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License version 2.1, as published by the Free Software
- * Foundation. See file COPYING.
- *
- */
-
-
-#ifndef __SERIAL_MESSENGER_H
-#define __SERIAL_MESSENGER_H
-
-#include "Dispatcher.h"
-#include "Message.h"
-
-class SerialMessenger : public Dispatcher {
- public:
- virtual void dispatch(Message *m) = 0; // i receive my messages here
- virtual void send(Message *m, entity_name_t dest, int port=0, int fromport=0) = 0; // doesn't block
- virtual Message *sendrecv(Message *m, entity_name_t dest, int port=0, int fromport=0) = 0; // blocks for matching reply
-};
-
-#endif
+++ /dev/null
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-/*
- * Ceph - scalable distributed file system
- *
- * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
- *
- * This is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License version 2.1, as published by the Free Software
- * Foundation. See file COPYING.
- *
- */
-
-
-
-#include <mpi.h>
-
-#include "TCPMessenger.h"
-
-/*
- * start up TCPMessenger via MPI.
- */
-
-pair<int,int> mpi_bootstrap_tcp(int& argc, char**& argv)
-{
- tcpmessenger_init();
- tcpmessenger_start();
-
- // exchnage addresses with other nodes
- MPI_Init(&argc, &argv);
-
- int mpi_world;
- int mpi_rank;
- MPI_Comm_size(MPI_COMM_WORLD, &mpi_world);
- MPI_Comm_rank(MPI_COMM_WORLD, &mpi_rank);
-
- //dout(1) << "i am " << mpi_rank << " of " << mpi_world << endl;
-
- // start up directory?
- tcpaddr_t ta;
- if (mpi_rank == 0) {
- dout(30) << "i am rank 0, starting ns directory" << endl;
- tcpmessenger_start_nameserver(ta);
- } else {
- memset(&ta, 0, sizeof(ta));
- }
-
- // distribute tcpaddr
- int r = MPI_Bcast(&ta, sizeof(ta), MPI_CHAR,
- 0, MPI_COMM_WORLD);
-
- dout(30) << "r = " << r << " ns tcpaddr is " << ta << endl;
- tcpmessenger_start_rankserver(ta);
-
- MPI_Barrier(MPI_COMM_WORLD);
- //g_clock.tare();
- MPI_Finalize();
-
- return pair<int,int>(mpi_rank, mpi_world);
-}
-
-
+++ /dev/null
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-#include <mpi.h>
-#include "NewMessenger.h"
-
-/*
- * start up NewMessenger via MPI.
- */
-
-pair<int,int> mpi_bootstrap_new(int& argc, char**& argv)
-{
- MPI_Init(&argc, &argv);
-
- int mpi_world;
- int mpi_rank;
- MPI_Comm_size(MPI_COMM_WORLD, &mpi_world);
- MPI_Comm_rank(MPI_COMM_WORLD, &mpi_rank);
-
- tcpaddr_t nsaddr;
- memset(&nsaddr, 0, sizeof(nsaddr));
-
- if (mpi_rank == 0) {
- // i am root.
- rank.my_rank = 0;
- rank.start_rank(nsaddr);
- nsaddr = rank.get_listen_addr();
- }
-
- int r = MPI_Bcast(&nsaddr, sizeof(nsaddr), MPI_CHAR,
- 0, MPI_COMM_WORLD);
-
- dout(30) << "r = " << r << " ns tcpaddr is " << nsaddr << endl;
-
- if (mpi_rank != 0) {
- rank.start_rank(nsaddr);
- }
-
- MPI_Barrier(MPI_COMM_WORLD);
-
- //g_clock.tare();
-
- MPI_Finalize();
-
- return pair<int,int>(mpi_rank, mpi_world);
-}
+++ /dev/null
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-/*
- * Ceph - scalable distributed file system
- *
- * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
- *
- * This is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License version 2.1, as published by the Free Software
- * Foundation. See file COPYING.
- *
- */
-
-
-//
-//
-// rush.cc
-//
-// $Id$
-//
-
-#include <stdlib.h>
-#include <stdio.h>
-#include <cassert>
-#include "rush.h"
-
-
-static
-unsigned int
-myhash (unsigned int n)
-{
- unsigned int v = (n ^ 0xdead1234) * (884811920 * 3 + 1);
- return (v);
-}
-
-Rush::Rush ()
-{
- nClusters = 0;
- totalServers = 0;
-}
-
-//----------------------------------------------------------------------
-//
-// Rush::AddCluster
-//
-// Add a cluster. The number of servers in the cluster and
-// the weight of each server is passed. The current number of
-// clusters is returned.
-//
-//----------------------------------------------------------------------
-int
-Rush::AddCluster (int nServers, double weight)
-{
- clusterSize[nClusters] = nServers;
- clusterWeight[nClusters] = weight;
- if (nClusters == 0) {
- serversInPrevious[0] = 0;
- totalWeightBefore[0] = 0.0;
- } else {
- serversInPrevious[nClusters] = serversInPrevious[nClusters-1] +
- clusterSize[nClusters-1];
- totalWeightBefore[nClusters] =
- totalWeightBefore[nClusters-1] + (double)clusterSize[nClusters-1] *
- clusterWeight[nClusters-1];
- }
- nClusters += 1;
- totalServers += nServers;
-#if 0
- for (int i = 0; i < nClusters; i++) {
- fprintf (stderr, "size=%-3d prev=%-3d weight=%-6.2f prevWeight=%-8.2f\n",
- clusterSize[i], serversInPrevious[i], clusterWeight[i],
- totalWeightBefore[i]);
- }
-#endif
- return (nClusters);
-}
-
-
-//----------------------------------------------------------------------
-//
-// Rush::GetServersByKey
-//
-// This function returns a list of servers on which an object
-// should be placed. The servers array must be large enough to
-// contain the list.
-//
-//----------------------------------------------------------------------
-void
-Rush::GetServersByKey (int key, int nReplicas, int servers[])
-{
- int replicasLeft = nReplicas;
- int cluster;
- int mustAssign, numberAssigned;
- int i, toDraw;
- int *srv = servers;
- double myWeight;
- RushRNG rng;
-
- // There may not be more replicas than servers!
- assert (nReplicas <= totalServers);
-
- for (cluster = nClusters-1; (cluster >= 0) && (replicasLeft > 0); cluster--) {
- if (serversInPrevious[cluster] < replicasLeft) {
- mustAssign = replicasLeft - serversInPrevious[cluster];
- } else {
- mustAssign = 0;
- }
- toDraw = replicasLeft - mustAssign;
- if (toDraw > (clusterSize[cluster] - mustAssign)) {
- toDraw = clusterSize[cluster] - mustAssign;
- }
- myWeight = (double)clusterSize[cluster] * clusterWeight[cluster];
- rng.Seed (myhash (key)^cluster, cluster^0xb90738);
- numberAssigned = mustAssign +
- rng.HyperGeometricWeighted (toDraw, myWeight,
- totalWeightBefore[cluster] + myWeight,
- clusterWeight[cluster]);
- if (numberAssigned > 0) {
- rng.Seed (myhash (key)^cluster ^ 11, cluster^0xfea937);
- rng.DrawKofN (srv, numberAssigned, clusterSize[cluster]);
- for (i = 0; i < numberAssigned; i++) {
- srv[i] += serversInPrevious[cluster];
- }
- replicasLeft -= numberAssigned;
- srv += numberAssigned;
- }
- }
-}
-
-\f
-
-//----------------------------------------------------------------------
-//
-// RushRNG::HyperGeometricWeighted
-//
-// Use an iterative method to generate a hypergeometric random
-// variable. This approach guarantees that, if the number of draws
-// is reduced, the number of successes must be as well as long as
-// the seed for the RNG is the same.
-//
-//----------------------------------------------------------------------
-int
-RushRNG::HyperGeometricWeighted (int nDraws, double yesWeighted,
- double totalWeighted, double weightOne)
-{
- int positives = 0, i;
- double curRand;
-
- // If the weight is too small (or is negative), choose zero objects.
- if (weightOne <= 1e-9 || nDraws == 0) {
- return (0);
- }
-
- // Draw nDraws items from the "bag". For each positive, subtract off
- // the weight of an object from the weight of positives remaining. For
- // each draw, subtract off the weight of an object from the total weight
- // remaining.
- for (i = 0; i < nDraws; i++) {
- curRand = RandomDouble ();
- if (curRand < (yesWeighted / totalWeighted)) {
- positives += 1;
- yesWeighted -= weightOne;
- }
- totalWeighted -= weightOne;
- }
- return (positives);
-}
-
-//----------------------------------------------------------------------
-//
-// RushRNG::DrawKofN
-//
-//----------------------------------------------------------------------
-void
-RushRNG::DrawKofN (int vals[], int nToDraw, int setSize)
-{
- int deck[setSize];
- int i, pick;
-
- assert(nToDraw <= setSize);
-
- for (i = 0; i < setSize; i++) {
- deck[i] = i;
- }
-
- for (i = 0; i < nToDraw; i++) {
- pick = (int)(RandomDouble () * (double)(setSize - i));
- if (pick >= setSize-i) pick = setSize-i-1; // in case
- // assert(i >= 0 && i < nToDraw);
- // assert(pick >= 0 && pick < setSize);
- vals[i] = deck[pick];
- deck[pick] = deck[setSize-i-1];
- }
-}
-
-#define SEED_X 521288629
-#define SEED_Y 362436069
-RushRNG::RushRNG ()
-{
- Seed (0, 0);
-}
-
-void
-RushRNG::Seed (unsigned int seed1, unsigned int seed2)
-{
- state1 = ((seed1 == 0) ? SEED_X : seed1);
- state2 = ((seed2 == 0) ? SEED_Y : seed2);
-}
-
-unsigned int
-RushRNG::RandomInt ()
-{
- const unsigned int a = 18000;
- const unsigned int b = 18879;
- unsigned int rndValue;
-
- state1 = a * (state1 & 0xffff) + (state1 >> 16);
- state2 = b * (state2 & 0xffff) + (state2 >> 16);
- rndValue = (state1 << 16) + (state2 & 0xffff);
- return (rndValue);
-}
-
-double
-RushRNG::RandomDouble ()
-{
- double v;
-
- v = (double)RandomInt() / (65536.0*65536.0);
- return (v);
-}
+++ /dev/null
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-/*
- * Ceph - scalable distributed file system
- *
- * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
- *
- * This is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License version 2.1, as published by the Free Software
- * Foundation. See file COPYING.
- *
- */
-
-
-//
-//
-// rush.h
-//
-// Classes and definitions for the RUSH algorithm.
-//
-// $Id$
-//
-//
-
-#ifndef _rush_h_
-#define _rush_h_
-
-#define RUSH_MAX_CLUSTERS 100
-
-class RushRNG {
-public:
- unsigned int RandomInt ();
- double RandomDouble ();
- void Seed (unsigned int a, unsigned int b);
- int HyperGeometricWeighted (int nDraws, double yesWeighted,
- double totalWeighted, double weightOne);
- void DrawKofN (int vals[], int nToDraw, int setSize);
- RushRNG();
-private:
- unsigned int state1, state2;
-};
-
-class Rush {
-public:
- void GetServersByKey (int key, int nReplicas, int servers[]);
- int AddCluster (int nServers, double weight);
- int Clusters () {return (nClusters);}
- int Servers () {return (totalServers);}
- Rush ();
-private:
- int DrawKofN (int *servers, int n, int clusterSize, RushRNG *g);
- int nClusters;
- int totalServers;
- int clusterSize[RUSH_MAX_CLUSTERS];
- int serversInPrevious[RUSH_MAX_CLUSTERS];
- double clusterWeight[RUSH_MAX_CLUSTERS];
- double totalWeightBefore[RUSH_MAX_CLUSTERS];
-};
-
-#endif /* _rush_h_ */
+++ /dev/null
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-/*
- * Ceph - scalable distributed file system
- *
- * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
- *
- * This is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License version 2.1, as published by the Free Software
- * Foundation. See file COPYING.
- *
- */
-
-
-
-#include <iostream>
-#include <string>
-
-using namespace std;
-
-#include "common/Mutex.h"
-#include "common/ThreadPool.h"
-// #include <thread.h>
-
-class Op {
- int i;
-
-public:
-
- Op(int i)
- {
- this->i = i;
- }
-
- int get()
- {
- return i;
- }
-};
-
-void foop(class TP *t, class Op *o);
-
-class TP {
-public:
-
- void foo(Op *o)
- {
- cout << "Thread "<< pthread_self() << ": " << o->get() << "\n";
- usleep(1);
-
- // sched_yield();
- }
-
- int main(int argc, char *argv)
- {
- ThreadPool<TP,Op> *t = new ThreadPool<TP,Op>(10, (void (*)(TP*, Op*))foop, this);
-
- for(int i = 0; i < 100; i++) {
- Op *o = new Op(i);
- t->put_op(o);
- }
-
- sleep(1);
-
- delete(t);
-
- return 0;
- }
-};
-
-void foop(class TP *t, class Op *o) {
- t->foo(o);
-}
-
-int main(int argc, char *argv) {
- TP t;
-
- t.main(argc,argv);
-}
-