cfuse: cfuse.cc client.o osdc.o client/fuse.o msg/SimpleMessenger.o common.o
${CC} ${CFLAGS} ${LIBS} -lfuse $^ -o $@
-activemaster: active/activemaster.cc client.o osdc.o msg/SimpleMessenger.o common.o
- ${CC} ${CFLAGS} ${LIBS} $^ -o $@
-
-activeslave: active/activeslave.cc client.o osdc.o msg/SimpleMessenger.o common.o
- ${CC} ${CFLAGS} ${LIBS} $^ -o $@
-
-echotestclient: active/echotestclient.cc client.o osdc.o msg/SimpleMessenger.o common.o
- ${CC} ${CFLAGS} ${LIBS} $^ -o $@
-
-msgtestclient: active/msgtestclient.cc client.o osdc.o msg/SimpleMessenger.o common.o
- ${CC} ${CFLAGS} ${LIBS} $^ -o $@
-
-
# misc
dout(10) << "target resend_mds specified as mds" << mds << endl;
} else {
mds = choose_target_mds(req);
- dout(10) << "chose target mds" << mds << " based on hierarchy" << endl;
+ if (mds >= 0) {
+ dout(10) << "chose target mds" << mds << " based on hierarchy" << endl;
+ } else {
+ mds = mdsmap->get_random_in_mds();
+ dout(10) << "chose random target mds" << mds << " for lack of anything better" << endl;
+ }
}
// open a session?
if (res == 0) {
assert(in);
fill_stat(in->inode,stbuf);
- dout(10) << "stat sez size = " << in->inode.size << " ino = " << stbuf->st_ino << endl;
+ dout(10) << "stat sez size = " << in->inode.size << " mode = " << oct << stbuf->st_mode << dec << " ino = " << stbuf->st_ino << endl;
}
trim_cache();
}
// contents to caller too!
+ dout(15) << "getdir including " << *pdn << " to " << in->inode.ino << endl;
contents[*pdn] = in->inode;
}
if (dir->is_empty())
MClientReply *reply = make_request(req);
assert(reply);
- dout(3) << "op: open_files[" << reply->get_result() << "] = fh; // fh = " << reply->get_result() << endl;
- tout << reply->get_result() << endl;
insert_trace(reply);
int result = reply->get_result();
#include "msg/Message.h"
#include "msg/Dispatcher.h"
#include "msg/Messenger.h"
-#include "msg/SerialMessenger.h"
#include "messages/MClientReply.h"
if (dir_replicated || ino() == 1) {
//cout << "num_mds is " << mdcluster->get_num_mds() << endl;
- return rand() % mdsmap->get_num_mds(); // huh.. pick a random mds!
+ return mdsmap->get_random_in_mds();
+ //return rand() % mdsmap->get_num_mds(); // huh.. pick a random mds!
}
else
return authority(mdsmap);
+++ /dev/null
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-/*
- * Ceph - scalable distributed file system
- *
- * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
- *
- * This is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License version 2.1, as published by the Free Software
- * Foundation. See file COPYING.
- *
- */
-
-
-#include "msg/Message.h"
-
-// send the message, expecting no response. threads other than the
-// MPI thread use this function; if the MPI thread uses this function
-// it could deadlock: this function could wait for the out queue to be
-// emptied, but only the MPI thread can empty it.
-void obfsmpi_send(Message *m)
-
-// send the message to a server and wait for the response. threads
-// other than the MPI thread use this function.
-Message *obfsmpi_sendrecv(Message *m)
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+
+
+#ifndef _RWLock_Posix_
+#define _RWLock_Posix_
+
+#include <pthread.h>
+
+// Thin wrapper around a POSIX read-write lock (pthread_rwlock_t).
+//
+// get_read()/get_write() acquire the lock in shared/exclusive mode and
+// put_read()/put_write() (both forward to unlock()) release it.  Return codes
+// of the pthread calls are not inspected.  The lock must not be held by
+// anyone when the object is destroyed.
+class RWLock
+{
+ mutable pthread_rwlock_t L;
+
+ public:
+
+ RWLock() {
+ pthread_rwlock_init(&L, NULL);
+ }
+
+ virtual ~RWLock() {
+ // Do not unlock here: releasing a rwlock the calling thread does not hold
+ // is undefined behaviour, and a lock being destroyed must be unheld anyway.
+ pthread_rwlock_destroy(&L);
+ }
+
+ // release a hold taken by get_read() or get_write()
+ void unlock() {
+ pthread_rwlock_unlock(&L);
+ }
+ // acquire in shared (reader) mode
+ void get_read() {
+ pthread_rwlock_rdlock(&L);
+ }
+ void put_read() { unlock(); }
+ // acquire in exclusive (writer) mode
+ void get_write() {
+ pthread_rwlock_wrlock(&L);
+ }
+ void put_write() { unlock(); }
+};
+
+#endif // !_RWLock_Posix_
// ioctl block device
uint64_t bytes = 0;
r = ioctl(fd, BLKGETSIZE64, &bytes);
- num_blocks = bytes / 4096;
+ num_blocks = bytes / (uint64_t)EBOFS_BLOCK_SIZE;
if (r == 0) {
dout(1) << "get_num_blocks ioctl BLKGETSIZE64 reports "
- << bytes << " bytes, "
- << num_blocks << " 4k blocks"
+ << num_blocks << " 4k blocks, "
+ << bytes << " bytes"
<< endl;
#else
// hrm, try the 32 bit ioctl?
unsigned long sectors = 0;
r = ioctl(fd, BLKGETSIZE, &sectors);
- num_blocks = sectors/8;
+ num_blocks = sectors/8ULL;
+ bytes = sectors*512ULL;
if (r == 0) {
dout(1) << "get_num_blocks ioctl BLKGETSIZE reports " << sectors << " sectors, "
- << num_blocks << " 4k blocks, " << (num_blocks*4096) << " bytes" << endl;
+ << num_blocks << " 4k blocks, " << bytes << " bytes" << endl;
#endif
} else {
// hmm, try stat!
dout(10) << "get_num_blocks ioctl(2) failed with " << errno << " " << strerror(errno) << ", using stat(2)" << endl;
struct stat st;
fstat(fd, &st);
- num_blocks = st.st_size;
- dout(1) << "get_num_blocks stat reports " << num_blocks << " 4k blocks, " << (num_blocks*4096) << " bytes" << endl;
+ uint64_t bytes = st.st_size;
+ num_blocks = bytes / EBOFS_BLOCK_SIZE;
+ dout(1) << "get_num_blocks stat reports " << num_blocks << " 4k blocks, " << bytes << " bytes" << endl;
}
- num_blocks /= (uint64_t)EBOFS_BLOCK_SIZE;
-
if (g_conf.bdev_fake_mb) {
num_blocks = g_conf.bdev_fake_mb * 256;
dout(0) << "faking dev size " << g_conf.bdev_fake_mb << " mb" << endl;
release();
}
+ void swap(ptr& other) {
+ // exchange the three fields pairwise: the raw handle, then offset, then length
+ raw *const mine_raw = _raw;
+ _raw = other._raw;
+ other._raw = mine_raw;
+
+ const unsigned mine_off = _off;
+ _off = other._off;
+ other._off = mine_off;
+
+ const unsigned mine_len = _len;
+ _len = other._len;
+ other._len = mine_len;
+ }
+
void release() {
if (_raw) {
_raw->lock.Lock();
const std::list<ptr>& buffers() const { return _buffers; }
+ void swap(list& other) {
+ // hand the members off to their own swap() first, then trade the _len fields
+ _buffers.swap(other._buffers);
+ append_buffer.swap(other.append_buffer);
+ const unsigned mine = _len;
+ _len = other._len;
+ other._len = mine;
+ }
+
unsigned length() const {
#if 1
// DEBUG: verify _len
// project update
inode_t *pi = cur->project_inode();
- pi->mode = req->args.chmod.mode & 04777;
+ pi->mode =
+ (pi->mode & ~04777) |
+ (req->args.chmod.mode & 04777);
pi->version = cur->pre_dirty();
pi->ctime = g_clock.real_now();
int numfiles = encode_dir_contents(dir, inls, dnls);
// . too
- dnls.push_back(".");
- inls.push_back(new InodeStat(diri, mds->get_nodeid()));
- ++numfiles;
+ //dnls.push_back(".");
+ //inls.push_back(new InodeStat(diri, mds->get_nodeid()));
+ //++numfiles;
// yay, reply
MClientReply *reply = new MClientReply(req);
+++ /dev/null
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-/*
- * Ceph - scalable distributed file system
- *
- * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
- *
- * This is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License version 2.1, as published by the Free Software
- * Foundation. See file COPYING.
- *
- */
-
-
-
-#include "HostMonitor.h"
-
-#include "msg/Message.h"
-#include "msg/Messenger.h"
-
-#include "messages/MPing.h"
-#include "messages/MPingAck.h"
-#include "messages/MFailure.h"
-#include "messages/MFailureAck.h"
-
-#include "common/Timer.h"
-#include "common/Clock.h"
-
-#define DBL 10
-
-
-#include "config.h"
-#undef dout
-#define dout(l) if (l<=g_conf.debug) cout << whoami << " hostmon: "
-
-
-// timer contexts
-
-class C_HM_InitiateHeartbeat : public Context {
- HostMonitor *hm;
-public:
- C_HM_InitiateHeartbeat(HostMonitor *hm) {
- this->hm = hm;
- }
- void finish(int r) {
- //cout << "HEARTBEAT" << endl;
- hm->pending_events.erase(this);
- hm->initiate_heartbeat();
- }
-};
-
-class C_HM_CheckHeartbeat : public Context {
- HostMonitor *hm;
-public:
- C_HM_CheckHeartbeat(HostMonitor *hm) {
- this->hm = hm;
- }
- void finish(int r) {
- //cout << "CHECK" << endl;
- hm->pending_events.erase(this);
- hm->check_heartbeat();
- }
-};
-
-
-
-// startup/shutdown
-
-void HostMonitor::init()
-{
- dout(DBL) << "init" << endl;
-
- // hack params for now
- heartbeat_interval = 10;
- max_ping_time = 2;
- max_heartbeat_misses = 3;
- notify_retry_interval = 10;
-
- // schedule first hb
- schedule_heartbeat();
-}
-
-
-void HostMonitor::shutdown()
-{
- // cancel any events
- for (set<Context*>::iterator it = pending_events.begin();
- it != pending_events.end();
- it++) {
- g_timer.cancel_event(*it);
- delete *it;
- }
- pending_events.clear();
-}
-
-
-// schedule next heartbeat
-
-void HostMonitor::schedule_heartbeat()
-{
- dout(DBL) << "schedule_heartbeat" << endl;
- Context *e = new C_HM_InitiateHeartbeat(this);
- pending_events.insert(e);
- g_timer.add_event_after(heartbeat_interval, e);
-}
-
-
-// take note of a live host
-
-void HostMonitor::host_is_alive(entity_name_t host)
-{
- if (hosts.count(host))
- status[host].last_heard_from = g_clock.gettime();
-}
-
-
-// do heartbeat
-
-void HostMonitor::initiate_heartbeat()
-{
- time_t now = g_clock.gettime();
-
- // send out pings
- inflight_pings.clear();
- for (set<entity_name_t>::iterator it = hosts.begin();
- it != hosts.end();
- it++) {
- // have i heard from them recently?
- if (now - status[*it].last_heard_from < heartbeat_interval) {
- dout(DBL) << "skipping " << *it << ", i heard from them recently" << endl;
- } else {
- dout(DBL) << "pinging " << *it << endl;
- status[*it].last_pinged = now;
- inflight_pings.insert(*it);
-
- messenger->send_message(new MPing(1), *it, 0);
- }
- }
-
- // set timer to check results
- Context *e = new C_HM_CheckHeartbeat(this);
- pending_events.insert(e);
- g_timer.add_event_after(max_ping_time, e);
- dout(10) << "scheduled check " << e << endl;
-
- schedule_heartbeat(); // schedule next heartbeat
-}
-
-
-// check results
-
-void HostMonitor::check_heartbeat()
-{
- dout(DBL) << "check_heartbeat()" << endl;
-
- // check inflight pings
- for (set<entity_name_t>::iterator it = inflight_pings.begin();
- it != inflight_pings.end();
- it++) {
- status[*it].num_heartbeats_missed++;
-
- dout(DBL) << "no response from " << *it << " for " << status[*it].num_heartbeats_missed << " beats" << endl;
-
- if (status[*it].num_heartbeats_missed >= max_heartbeat_misses) {
- if (acked_failures.count(*it)) {
- dout(DBL) << *it << " is already failed" << endl;
- } else {
- if (unacked_failures.count(*it)) {
- dout(DBL) << *it << " is already failed, but unacked, sending another failure message" << endl;
- } else {
- dout(DBL) << "failing " << *it << endl;
- unacked_failures.insert(*it);
- }
-
- /*if (false) // do this in NewMessenger for now! FIXME
- for (set<msg_addr_t>::iterator nit = notify.begin();
- nit != notify.end();
- nit++) {
- messenger->send_message(new MFailure(*it, messenger->get_inst(*it)),
- *nit, notify_port, 0);
- }
- */
- }
- }
- }
-
- // forget about the pings.
- inflight_pings.clear();
-}
-
-
-// incoming messages
-
-void HostMonitor::proc_message(Message *m)
-{
- switch (m->get_type()) {
-
- case MSG_PING_ACK:
- handle_ping_ack((MPingAck*)m);
- break;
-
- case MSG_FAILURE_ACK:
- handle_failure_ack((MFailureAck*)m);
- break;
-
- }
-}
-
-void HostMonitor::handle_ping_ack(MPingAck *m)
-{
- entity_name_t from = m->get_source();
-
- dout(DBL) << "ping ack from " << from << endl;
- status[from].last_pinged = g_clock.gettime();
- status[from].num_heartbeats_missed = 0;
- inflight_pings.erase(from);
-
- delete m;
-}
-
-void HostMonitor::handle_failure_ack(MFailureAck *m)
-{
-
- // FIXME: this doesn't handle failed -> alive transitions gracefully at all..
-
- // the higher-up's acknowledged our failure notification, we can stop resending it.
- entity_name_t failed = m->get_failed();
- dout(DBL) << "handle_failure_ack " << failed << endl;
- unacked_failures.erase(failed);
- acked_failures.insert(failed);
-
- delete m;
-}
-
-
+++ /dev/null
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-/*
- * Ceph - scalable distributed file system
- *
- * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
- *
- * This is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License version 2.1, as published by the Free Software
- * Foundation. See file COPYING.
- *
- */
-
-
-#ifndef __HOSTMONITOR_H
-#define __HOSTMONITOR_H
-
-#include <time.h>
-
-#include <map>
-#include <set>
-using namespace std;
-
-#include "include/Context.h"
-#include "msg/Message.h"
-
-class Message;
-class Messenger;
-
-typedef struct {
- time_t last_heard_from;
- time_t last_pinged;
- int num_heartbeats_missed;
-} monitor_rec_t;
-
-class HostMonitor {
- Messenger *messenger;
- string whoami;
-
- // hosts i monitor
- set<entity_name_t> hosts;
-
- // who i tell when they fail
- set<entity_name_t> notify;
- int notify_port;
-
- // their status
- map<entity_name_t,monitor_rec_t> status;
-
- set<entity_name_t> inflight_pings; // pings we sent that haven't replied yet
-
- set<entity_name_t> unacked_failures; // failed hosts that haven't been acked yet.
- set<entity_name_t> acked_failures; // these failures have been acked.
-
- float heartbeat_interval; // how often to do a heartbeat
- float max_ping_time; // how long before it's a miss
- int max_heartbeat_misses; // how many misses before i tell
- float notify_retry_interval; // how often to retry failure notification
-
- public:
- set<Context*> pending_events;
-
- private:
- void schedule_heartbeat();
-
- public:
- HostMonitor(Messenger *m, string& whoami) {
- this->messenger = m;
- this->whoami = whoami;
- notify_port = 0;
- }
- set<entity_name_t>& get_hosts() { return hosts; }
- set<entity_name_t>& get_notify() { return notify; }
- void set_notify_port(int p) { notify_port = p; }
-
- void remove_host(entity_name_t h) {
- hosts.erase(h);
- status.erase(h);
- unacked_failures.erase(h);
- acked_failures.erase(h);
- }
-
- void init();
- void shutdown();
-
- void host_is_alive(entity_name_t who);
-
- void proc_message(Message *m);
- void handle_ping_ack(class MPingAck *m);
- void handle_failure_ack(class MFailureAck *m);
-
- void initiate_heartbeat();
- void check_heartbeat();
-
-};
-
-#endif
+++ /dev/null
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-/*
- * Ceph - scalable distributed file system
- *
- * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
- *
- * This is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License version 2.1, as published by the Free Software
- * Foundation. See file COPYING.
- *
- */
-
-
-
-#ifndef _RWLock_Posix_
-#define _RWLock_Posix_
-
-#include <pthread.h>
-
-class RWLock
-{
- mutable pthread_rwlock_t L;
-
- public:
-
- RWLock() {
- pthread_rwlock_init(&L, NULL);
- }
-
- virtual ~RWLock() {
- pthread_rwlock_unlock(&L);
- pthread_rwlock_destroy(&L);
- }
-
- void unlock() {
- pthread_rwlock_unlock(&L);
- }
- void get_read() {
- pthread_rwlock_rdlock(&L);
- }
- void put_read() { unlock(); }
- void get_write() {
- pthread_rwlock_wrlock(&L);
- }
- void put_write() { unlock(); }
-};
-
-#endif // !_Mutex_Posix_
+++ /dev/null
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-/*
- * Ceph - scalable distributed file system
- *
- * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
- *
- * This is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License version 2.1, as published by the Free Software
- * Foundation. See file COPYING.
- *
- */
-
-
-#ifndef __SERIAL_MESSENGER_H
-#define __SERIAL_MESSENGER_H
-
-#include "Dispatcher.h"
-#include "Message.h"
-
-class SerialMessenger : public Dispatcher {
- public:
- virtual void dispatch(Message *m) = 0; // i receive my messages here
- virtual void send(Message *m, entity_name_t dest, int port=0, int fromport=0) = 0; // doesn't block
- virtual Message *sendrecv(Message *m, entity_name_t dest, int port=0, int fromport=0) = 0; // blocks for matching reply
-};
-
-#endif
+++ /dev/null
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-/*
- * Ceph - scalable distributed file system
- *
- * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
- *
- * This is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License version 2.1, as published by the Free Software
- * Foundation. See file COPYING.
- *
- */
-
-
-
-#include <mpi.h>
-
-#include "TCPMessenger.h"
-
-/*
- * start up TCPMessenger via MPI.
- */
-
-pair<int,int> mpi_bootstrap_tcp(int& argc, char**& argv)
-{
- tcpmessenger_init();
- tcpmessenger_start();
-
- // exchnage addresses with other nodes
- MPI_Init(&argc, &argv);
-
- int mpi_world;
- int mpi_rank;
- MPI_Comm_size(MPI_COMM_WORLD, &mpi_world);
- MPI_Comm_rank(MPI_COMM_WORLD, &mpi_rank);
-
- //dout(1) << "i am " << mpi_rank << " of " << mpi_world << endl;
-
- // start up directory?
- tcpaddr_t ta;
- if (mpi_rank == 0) {
- dout(30) << "i am rank 0, starting ns directory" << endl;
- tcpmessenger_start_nameserver(ta);
- } else {
- memset(&ta, 0, sizeof(ta));
- }
-
- // distribute tcpaddr
- int r = MPI_Bcast(&ta, sizeof(ta), MPI_CHAR,
- 0, MPI_COMM_WORLD);
-
- dout(30) << "r = " << r << " ns tcpaddr is " << ta << endl;
- tcpmessenger_start_rankserver(ta);
-
- MPI_Barrier(MPI_COMM_WORLD);
- //g_clock.tare();
- MPI_Finalize();
-
- return pair<int,int>(mpi_rank, mpi_world);
-}
-
-
+++ /dev/null
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-#include <mpi.h>
-#include "NewMessenger.h"
-
-/*
- * start up NewMessenger via MPI.
- */
-
-pair<int,int> mpi_bootstrap_new(int& argc, char**& argv)
-{
- MPI_Init(&argc, &argv);
-
- int mpi_world;
- int mpi_rank;
- MPI_Comm_size(MPI_COMM_WORLD, &mpi_world);
- MPI_Comm_rank(MPI_COMM_WORLD, &mpi_rank);
-
- tcpaddr_t nsaddr;
- memset(&nsaddr, 0, sizeof(nsaddr));
-
- if (mpi_rank == 0) {
- // i am root.
- rank.my_rank = 0;
- rank.start_rank(nsaddr);
- nsaddr = rank.get_listen_addr();
- }
-
- int r = MPI_Bcast(&nsaddr, sizeof(nsaddr), MPI_CHAR,
- 0, MPI_COMM_WORLD);
-
- dout(30) << "r = " << r << " ns tcpaddr is " << nsaddr << endl;
-
- if (mpi_rank != 0) {
- rank.start_rank(nsaddr);
- }
-
- MPI_Barrier(MPI_COMM_WORLD);
-
- //g_clock.tare();
-
- MPI_Finalize();
-
- return pair<int,int>(mpi_rank, mpi_world);
-}
+++ /dev/null
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-/*
- * Ceph - scalable distributed file system
- *
- * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
- *
- * This is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License version 2.1, as published by the Free Software
- * Foundation. See file COPYING.
- *
- */
-
-
-//
-//
-// rush.cc
-//
-// $Id$
-//
-
-#include <stdlib.h>
-#include <stdio.h>
-#include <cassert>
-#include "rush.h"
-
-
-static
-unsigned int
-myhash (unsigned int n)
-{
- unsigned int v = (n ^ 0xdead1234) * (884811920 * 3 + 1);
- return (v);
-}
-
-Rush::Rush ()
-{
- nClusters = 0;
- totalServers = 0;
-}
-
-//----------------------------------------------------------------------
-//
-// Rush::AddCluster
-//
-// Add a cluster. The number of servers in the cluster and
-// the weight of each server is passed. The current number of
-// clusters is returned.
-//
-//----------------------------------------------------------------------
-int
-Rush::AddCluster (int nServers, double weight)
-{
- clusterSize[nClusters] = nServers;
- clusterWeight[nClusters] = weight;
- if (nClusters == 0) {
- serversInPrevious[0] = 0;
- totalWeightBefore[0] = 0.0;
- } else {
- serversInPrevious[nClusters] = serversInPrevious[nClusters-1] +
- clusterSize[nClusters-1];
- totalWeightBefore[nClusters] =
- totalWeightBefore[nClusters-1] + (double)clusterSize[nClusters-1] *
- clusterWeight[nClusters-1];
- }
- nClusters += 1;
- totalServers += nServers;
-#if 0
- for (int i = 0; i < nClusters; i++) {
- fprintf (stderr, "size=%-3d prev=%-3d weight=%-6.2f prevWeight=%-8.2f\n",
- clusterSize[i], serversInPrevious[i], clusterWeight[i],
- totalWeightBefore[i]);
- }
-#endif
- return (nClusters);
-}
-
-
-//----------------------------------------------------------------------
-//
-// Rush::GetServersByKey
-//
-// This function returns a list of servers on which an object
-// should be placed. The servers array must be large enough to
-// contain the list.
-//
-//----------------------------------------------------------------------
-void
-Rush::GetServersByKey (int key, int nReplicas, int servers[])
-{
- int replicasLeft = nReplicas;
- int cluster;
- int mustAssign, numberAssigned;
- int i, toDraw;
- int *srv = servers;
- double myWeight;
- RushRNG rng;
-
- // There may not be more replicas than servers!
- assert (nReplicas <= totalServers);
-
- for (cluster = nClusters-1; (cluster >= 0) && (replicasLeft > 0); cluster--) {
- if (serversInPrevious[cluster] < replicasLeft) {
- mustAssign = replicasLeft - serversInPrevious[cluster];
- } else {
- mustAssign = 0;
- }
- toDraw = replicasLeft - mustAssign;
- if (toDraw > (clusterSize[cluster] - mustAssign)) {
- toDraw = clusterSize[cluster] - mustAssign;
- }
- myWeight = (double)clusterSize[cluster] * clusterWeight[cluster];
- rng.Seed (myhash (key)^cluster, cluster^0xb90738);
- numberAssigned = mustAssign +
- rng.HyperGeometricWeighted (toDraw, myWeight,
- totalWeightBefore[cluster] + myWeight,
- clusterWeight[cluster]);
- if (numberAssigned > 0) {
- rng.Seed (myhash (key)^cluster ^ 11, cluster^0xfea937);
- rng.DrawKofN (srv, numberAssigned, clusterSize[cluster]);
- for (i = 0; i < numberAssigned; i++) {
- srv[i] += serversInPrevious[cluster];
- }
- replicasLeft -= numberAssigned;
- srv += numberAssigned;
- }
- }
-}
-
-\f
-
-//----------------------------------------------------------------------
-//
-// RushRNG::HyperGeometricWeighted
-//
-// Use an iterative method to generate a hypergeometric random
-// variable. This approach guarantees that, if the number of draws
-// is reduced, the number of successes must be as well as long as
-// the seed for the RNG is the same.
-//
-//----------------------------------------------------------------------
-int
-RushRNG::HyperGeometricWeighted (int nDraws, double yesWeighted,
- double totalWeighted, double weightOne)
-{
- int positives = 0, i;
- double curRand;
-
- // If the weight is too small (or is negative), choose zero objects.
- if (weightOne <= 1e-9 || nDraws == 0) {
- return (0);
- }
-
- // Draw nDraws items from the "bag". For each positive, subtract off
- // the weight of an object from the weight of positives remaining. For
- // each draw, subtract off the weight of an object from the total weight
- // remaining.
- for (i = 0; i < nDraws; i++) {
- curRand = RandomDouble ();
- if (curRand < (yesWeighted / totalWeighted)) {
- positives += 1;
- yesWeighted -= weightOne;
- }
- totalWeighted -= weightOne;
- }
- return (positives);
-}
-
-//----------------------------------------------------------------------
-//
-// RushRNG::DrawKofN
-//
-//----------------------------------------------------------------------
-void
-RushRNG::DrawKofN (int vals[], int nToDraw, int setSize)
-{
- int deck[setSize];
- int i, pick;
-
- assert(nToDraw <= setSize);
-
- for (i = 0; i < setSize; i++) {
- deck[i] = i;
- }
-
- for (i = 0; i < nToDraw; i++) {
- pick = (int)(RandomDouble () * (double)(setSize - i));
- if (pick >= setSize-i) pick = setSize-i-1; // in case
- // assert(i >= 0 && i < nToDraw);
- // assert(pick >= 0 && pick < setSize);
- vals[i] = deck[pick];
- deck[pick] = deck[setSize-i-1];
- }
-}
-
-#define SEED_X 521288629
-#define SEED_Y 362436069
-RushRNG::RushRNG ()
-{
- Seed (0, 0);
-}
-
-void
-RushRNG::Seed (unsigned int seed1, unsigned int seed2)
-{
- state1 = ((seed1 == 0) ? SEED_X : seed1);
- state2 = ((seed2 == 0) ? SEED_Y : seed2);
-}
-
-unsigned int
-RushRNG::RandomInt ()
-{
- const unsigned int a = 18000;
- const unsigned int b = 18879;
- unsigned int rndValue;
-
- state1 = a * (state1 & 0xffff) + (state1 >> 16);
- state2 = b * (state2 & 0xffff) + (state2 >> 16);
- rndValue = (state1 << 16) + (state2 & 0xffff);
- return (rndValue);
-}
-
-double
-RushRNG::RandomDouble ()
-{
- double v;
-
- v = (double)RandomInt() / (65536.0*65536.0);
- return (v);
-}
+++ /dev/null
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-/*
- * Ceph - scalable distributed file system
- *
- * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
- *
- * This is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License version 2.1, as published by the Free Software
- * Foundation. See file COPYING.
- *
- */
-
-
-//
-//
-// rush.h
-//
-// Classes and definitions for the RUSH algorithm.
-//
-// $Id$
-//
-//
-
-#ifndef _rush_h_
-#define _rush_h_
-
-#define RUSH_MAX_CLUSTERS 100
-
-class RushRNG {
-public:
- unsigned int RandomInt ();
- double RandomDouble ();
- void Seed (unsigned int a, unsigned int b);
- int HyperGeometricWeighted (int nDraws, double yesWeighted,
- double totalWeighted, double weightOne);
- void DrawKofN (int vals[], int nToDraw, int setSize);
- RushRNG();
-private:
- unsigned int state1, state2;
-};
-
-class Rush {
-public:
- void GetServersByKey (int key, int nReplicas, int servers[]);
- int AddCluster (int nServers, double weight);
- int Clusters () {return (nClusters);}
- int Servers () {return (totalServers);}
- Rush ();
-private:
- int DrawKofN (int *servers, int n, int clusterSize, RushRNG *g);
- int nClusters;
- int totalServers;
- int clusterSize[RUSH_MAX_CLUSTERS];
- int serversInPrevious[RUSH_MAX_CLUSTERS];
- double clusterWeight[RUSH_MAX_CLUSTERS];
- double totalWeightBefore[RUSH_MAX_CLUSTERS];
-};
-
-#endif /* _rush_h_ */
+++ /dev/null
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-/*
- * Ceph - scalable distributed file system
- *
- * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
- *
- * This is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License version 2.1, as published by the Free Software
- * Foundation. See file COPYING.
- *
- */
-
-
-
-#include <iostream>
-#include <string>
-
-using namespace std;
-
-#include "common/Mutex.h"
-#include "common/ThreadPool.h"
-// #include <thread.h>
-
-class Op {
- int i;
-
-public:
-
- Op(int i)
- {
- this->i = i;
- }
-
- int get()
- {
- return i;
- }
-};
-
-void foop(class TP *t, class Op *o);
-
-class TP {
-public:
-
- void foo(Op *o)
- {
- cout << "Thread "<< pthread_self() << ": " << o->get() << "\n";
- usleep(1);
-
- // sched_yield();
- }
-
- int main(int argc, char *argv)
- {
- ThreadPool<TP,Op> *t = new ThreadPool<TP,Op>(10, (void (*)(TP*, Op*))foop, this);
-
- for(int i = 0; i < 100; i++) {
- Op *o = new Op(i);
- t->put_op(o);
- }
-
- sleep(1);
-
- delete(t);
-
- return 0;
- }
-};
-
-void foop(class TP *t, class Op *o) {
- t->foo(o);
-}
-
-int main(int argc, char *argv) {
- TP t;
-
- t.main(argc,argv);
-}
-
#define dout(l) if (l<=g_conf.debug || l<=g_conf.debug_objectcacher) cout << g_clock.now() << " " << oc->objecter->messenger->get_myname() << ".objectcacher.object(" << oid << ") "
-ObjectCacher::BufferHead *ObjectCacher::Object::split(BufferHead *bh, off_t off)
+// Split buffer head `left` at byte offset `off`.
+//
+// A new BufferHead is allocated for the range starting at `off`; it copies
+// left's last_write_tid and state, left is shortened to end at `off`, any
+// buffered data is divided between the two, and read waiters keyed at or
+// beyond `off` are moved to the new head.  Returns the new (right) head,
+// which has already been registered via oc->bh_add().
+ObjectCacher::BufferHead *ObjectCacher::Object::split(BufferHead *left, off_t off)
{
- dout(20) << "split " << *bh << " at " << off << endl;
+ dout(20) << "split " << *left << " at " << off << endl;
// split off right
ObjectCacher::BufferHead *right = new BufferHead(this);
- right->last_write_tid = bh->last_write_tid;
- right->set_state(bh->get_state());
+ right->last_write_tid = left->last_write_tid;
+ right->set_state(left->get_state());
- off_t newleftlen = off - bh->start();
- right->set_start( off );
- right->set_length( bh->length() - newleftlen );
+ off_t newleftlen = off - left->start();
+ right->set_start(off);
+ right->set_length(left->length() - newleftlen);
// shorten left
- oc->bh_stat_sub(bh);
- bh->set_length( newleftlen );
- oc->bh_stat_add(bh);
+ oc->bh_stat_sub(left);
+ left->set_length(newleftlen);
+ oc->bh_stat_add(left);
// add right
oc->bh_add(this, right);
// split buffers too
bufferlist bl;
- bl.claim(bh->bl);
+ bl.claim(left->bl);
if (bl.length()) {
- assert(bl.length() == (bh->length() + right->length()));
- right->bl.substr_of(bl, bh->length(), right->length());
- bh->bl.substr_of(bl, 0, bh->length());
+ assert(bl.length() == (left->length() + right->length()));
+ right->bl.substr_of(bl, left->length(), right->length());
+ left->bl.substr_of(bl, 0, left->length());
}
// move read waiters
- if (!bh->waitfor_read.empty()) {
- map<off_t, list<Context*> >::iterator o, p = bh->waitfor_read.end();
- p--;
- while (p != bh->waitfor_read.begin()) {
- if (p->first < right->start()) break;
+ // Every waiter keyed at or beyond the split point goes to right.  Starting
+ // from lower_bound() also covers the map's first entry, which the previous
+ // backwards walk (stopping at begin()) never examined.
+ map<off_t, list<Context*> >::iterator p = left->waitfor_read.lower_bound(right->start());
+ while (p != left->waitfor_read.end()) {
dout(0) << "split moving waiters at byte " << p->first << " to right bh" << endl;
right->waitfor_read[p->first].swap( p->second );
- o = p;
- p--;
- bh->waitfor_read.erase(o);
- }
+ // post-increment: p has advanced before the old position is erased
+ left->waitfor_read.erase(p++);
}
- dout(20) << "split left is " << *bh << endl;
+ dout(20) << "split left is " << *left << endl;
dout(20) << "split right is " << *right << endl;
return right;
}
size_t bhoff = bh->start() - opos;
assert(f_it->second <= bh->length() - bhoff);
+ // get the frag we're mapping in
bufferlist frag;
frag.substr_of(wr->bl,
f_it->first, f_it->second);
- bh->bl.claim_append(frag);
+ // keep anything left of bhoff
+ bufferlist newbl;
+ if (bhoff)
+ newbl.substr_of(bh->bl, 0, bhoff);
+ newbl.claim_append(frag);
+ bh->bl.swap(newbl);
+
opos += f_it->second;
}
// recombine with left?
map<off_t,BufferHead*>::iterator p = o->data.find(bh->start());
if (p != o->data.begin()) {
+ assert(p->second == bh);
p--;
if (p->second->is_dirty()) {
- o->merge_left(p->second,bh);
+ o->merge_left(p->second, bh);
bh = p->second;
}
}
// right?
- p = o->data.find(bh->start());
- p++;
- if (p != o->data.end() &&
- p->second->is_dirty())
- o->merge_left(p->second,bh);
+ while (1) {
+ p = o->data.find(bh->start());
+ assert(p->second == bh);
+ p++;
+ if (p == o->data.end() || !p->second->is_dirty()) break;
+ o->merge_left(bh, p->second);
+ }
}
delete wr;