# behave just fine... change ${CC} back to mpicxx if you get paranoid.
CC = g++
CFLAGS = -g -Wall -I. -D_FILE_OFFSET_BITS=64 -DMPICH_IGNORE_CXX_SEEK -D_REENTRANT -D_THREAD_SAFE
-LIBS = -lpthread -lrt -ldb
+LIBS = -lpthread -lrt
#for normal mpich2 machines
MPICC = mpicxx
CLIENT_OBJS= \
client/Client.o\
client/SyntheticClient.o\
- client/Trace.o\
- client/Buffercache.o
+ client/Trace.o
TCP_OBJS = \
msg/TCPMessenger.o\
# libceph
-libceph.o: client/ldceph.o client/Client.o client/Buffercache.o ${TCP_OBJS} ${COMMON_OBJS} ${SYN_OBJS}
+libceph.o: client/ldceph.o client/Client.o ${TCP_OBJS} ${COMMON_OBJS} ${SYN_OBJS}
ld -i $^ -o $@
bench/mdtest/mdtest.o: bench/mdtest/mdtest.c
+
+
- stability
- ebofs table.remove() thing
- fakestore crapping out.. missing timer events?
ebofs
+- hey.. why do we drop on->commit_waiters on delete? that's stupid!
- fix NEAR_LAST_FWD (?)
- combine inodes into same blocks?
- delay allocation
osdmap = new OSDMap(); // initially blank.. see mount()
objecter = new Objecter(messenger, osdmap);
objectcacher = new ObjectCacher(objecter);
- filer = new Filer(objecter); //, objectcacher);
+ filer = new Filer(objecter, objectcacher);
}
in->inode = m->get_inode(); // might have updated size... FIXME this is overkill!
// flush buffers?
- if (g_conf.client_bcache &&
+ if (g_conf.client_oc &&
!(in->file_caps() & CAP_FILE_WRBUFFER)) {
// **** write me ****
dout(10) << " flushing dirty buffers on " << hex << in->ino() << dec << endl;
- /*if (g_conf.client_bcache &&
+ /*if (g_conf.client_oc &&
(fc->is_dirty() || fc->is_inflight())) {
// flushing.
dout(10) << " waiting for inflight buffers on " << hex << in->ino() << dec << endl;
// ------------
// read, write
-// ------------------------
// blocking osd interface
-class C_Client_Cond : public Context {
-public:
- Cond *cond;
- bool *done;
- int *rvalue;
- C_Client_Cond(Cond *cond, bool *d, int *rvalue) {
- this->done = d;
- this->cond = cond;
- this->rvalue = rvalue;
- *done = false;
- }
- void finish(int r) {
- *rvalue = r;
- *done = true;
- cond->Signal();
- }
-};
-
-
-class C_Client_LockedCond : public Context {
-public:
- bool *done;
- Cond *cond;
- Mutex *mutex;
- int *rvalue;
- C_Client_LockedCond(Cond *cond, Mutex *mutex, bool *d, int *rvalue) {
- this->done = d;
- this->cond = cond;
- this->mutex = mutex;
- this->rvalue = rvalue;
- *done = false;
- }
- void finish(int r) {
- mutex->Lock();
- *rvalue = r;
- *done = true;
- cond->Signal();
- mutex->Unlock();
- }
-};
-
-
int Client::read(fh_t fh, char *buf, off_t size, off_t offset)
{
client_lock.Lock();
// adjust fd pos
f->pos = offset+size;
- if (!g_conf.client_bcache) {
+ if (!g_conf.client_oc) {
// buffer cache OFF
Cond cond;
bufferlist blist; // data will go here
bool done = false;
- C_Client_Cond *onfinish = new C_Client_Cond(&cond, &done, &rvalue);
+ C_Cond *onfinish = new C_Cond(&cond, &done, &rvalue);
filer->read(in->inode, size, offset, &blist, onfinish);
while (!done)
cond.Wait(client_lock);
// adjust fd pos
f->pos = offset+size;
- if (g_conf.client_bcache && // buffer cache ON?
- (in->file_caps() & CAP_FILE_WRBUFFER)) { // caps buffered write?
+ // time it.
+ utime_t start = g_clock.now();
- // ***
+ if (g_conf.client_oc) { // buffer cache ON?
+ assert(objectcacher);
+
+ bufferlist blist;
+ blist.push_back( new buffer(buf, size) );
+ // wait? (this may block!)
+ objectcacher->wait_for_write(size, client_lock);
+
+ if (in->file_caps() & CAP_FILE_WRBUFFER) { // caps buffered write?
+ // async, caching, non-blocking.
+ filer->caching_write(in->inode, size, offset, blist);
+ } else {
+ // atomic, synchronous, blocking.
+ filer->atomic_sync_write(in->inode, size, offset, blist, client_lock);
+ }
} else {
- // synchronous write
- // FIXME: do not bypass buffercache
- //if (g_conf.client_bcache) {
- // write me
- //} else
- {
- dout(7) << "synchronous write" << endl;
-
- // create a buffer that refers to *buf, but doesn't try to free it when it's done.
- bufferlist blist;
- blist.push_back( new buffer(buf, size, BUFFER_MODE_NOCOPY|BUFFER_MODE_NOFREE) );
-
- // issue write
- Cond cond;
- int rvalue = 0;
+ // legacy, inconsistent synchronous write.
+ dout(7) << "synchronous write" << endl;
- utime_t start = g_clock.now();
-
- bool done = false;
- C_Client_Cond *onfinish = new C_Client_Cond(&cond, &done, &rvalue);
- C_Client_HackUnsafe *onsafe = new C_Client_HackUnsafe(this);
- unsafe_sync_write++;
-
- dout(20) << " sync write start " << onfinish << endl;
-
- filer->write(in->inode, size, offset, blist, 0,
- //NULL,NULL); // no wait hack
- onfinish, onsafe);
+ // create a buffer that refers to *buf, but doesn't try to free it when it's done.
+ bufferlist blist;
+ blist.push_back( new buffer(buf, size, BUFFER_MODE_NOCOPY|BUFFER_MODE_NOFREE) );
- while (!done) {
- cond.Wait(client_lock);
- dout(20) << " sync write bump " << onfinish << endl;
- }
-
- // time
- utime_t lat = g_clock.now();
- lat -= start;
- if (client_logger) {
- client_logger->finc("wrlsum",(double)lat);
- client_logger->inc("wrlnum");
- }
-
- dout(20) << " sync write done " << onfinish << endl;
+ // issue write
+ Cond cond;
+ int rvalue = 0;
+
+ bool done = false;
+ C_Cond *onfinish = new C_Cond(&cond, &done, &rvalue);
+ C_Client_HackUnsafe *onsafe = new C_Client_HackUnsafe(this);
+ unsafe_sync_write++;
+
+ dout(20) << " sync write start " << onfinish << endl;
+
+ filer->write(in->inode, size, offset, blist, 0,
+ onfinish, onsafe);
+
+ while (!done) {
+ cond.Wait(client_lock);
+ dout(20) << " sync write bump " << onfinish << endl;
}
+ dout(20) << " sync write done " << onfinish << endl;
}
+ // time
+ utime_t lat = g_clock.now();
+ lat -= start;
+ if (client_logger) {
+ client_logger->finc("wrlsum",(double)lat);
+ client_logger->inc("wrlnum");
+ }
+
// assume success for now. FIXME.
off_t totalwritten = size;
#include "Mutex.h"
#include "Clock.h"
+#include "include/Context.h"
+
#include <pthread.h>
#include <cassert>
}
};
+class C_Cond : public Context {
+ Cond *cond;
+ bool *done;
+ int *rval;
+public:
+ C_Cond(Cond *c, bool *d, int *r=0) : cond(c), done(d), rval(r) {
+ *done = false;
+ }
+ void finish(int r) {
+ if (rval) *rval = r;
+ *done = true;
+ cond->Signal();
+ }
+};
+
+class C_SafeCond : public Context {
+ Mutex *lock;
+ Cond *cond;
+ bool *done;
+ int *rval;
+public:
+ C_SafeCond(Mutex *l, Cond *c, bool *d, int *r=0) : lock(l), cond(c), done(d), rval(r) {
+ *done = false;
+ }
+ void finish(int r) {
+ lock->Lock();
+ if (rval) *rval = r;
+ *done = false;
+ cond->Signal();
+ lock->Unlock();
+ }
+};
+
#endif // !_Cond_Posix_
client_sync_writes: 0,
+ client_oc: false,
+ client_oc_max_dirty: 1024*1024* 100,
+
+ /*
client_bcache: 0,
client_bcache_alloc_minsize: 1<<10, // 1KB
client_bcache_alloc_maxsize: 1<<18, // 256KB
client_bcache_lowater: 60, // % of size
client_bcache_hiwater: 80, // % of size
client_bcache_align: 1<<10, // 1KB splice alignment
+ */
client_trace: 0,
fuse_direct_io: 0,
else if (strcmp(args[i], "--client_sync_writes") == 0)
g_conf.client_sync_writes = atoi(args[++i]);
- else if (strcmp(args[i], "--client_bcache") == 0)
- g_conf.client_bcache = atoi(args[++i]);
- else if (strcmp(args[i], "--client_bcache_ttl") == 0)
- g_conf.client_bcache_ttl = atoi(args[++i]);
+ else if (strcmp(args[i], "--client_oc") == 0)
+ g_conf.client_oc = atoi(args[++i]);
+ else if (strcmp(args[i], "--client_oc_max_dirty") == 0)
+ g_conf.client_oc_max_dirty = atoi(args[++i]);
else if (strcmp(args[i], "--ebofs") == 0)
bool client_use_random_mds; // debug flag
bool client_sync_writes;
+
+ bool client_oc;
+ int client_oc_max_dirty;
+
+ /*
bool client_bcache;
int client_bcache_alloc_minsize;
int client_bcache_alloc_maxsize;
int client_bcache_lowater;
int client_bcache_hiwater;
size_t client_bcache_align;
+ */
int client_trace;
int fuse_direct_io;
// *** file i/o ***
-bool Ebofs::attempt_read(Onode *on, size_t len, off_t off, bufferlist& bl, Cond *will_wait_on)
+bool Ebofs::attempt_read(Onode *on, size_t len, off_t off, bufferlist& bl,
+ Cond *will_wait_on, bool *will_wait_on_bool)
{
dout(10) << "attempt_read " << *on << " len " << len << " off " << off << endl;
ObjectCache *oc = on->get_oc(&bc);
}
BufferHead *wait_on = missing.begin()->second;
block_t b = MAX(wait_on->start(), bstart);
- wait_on->waitfor_read[b].push_back(new C_Cond(will_wait_on));
+ wait_on->waitfor_read[b].push_back(new C_Cond(will_wait_on, will_wait_on_bool));
return false;
}
if (!i->second->have_partial_range(start, end)) {
if (partials_ok) {
// wait on this one
- Context *c = new C_Cond(will_wait_on);
+ Context *c = new C_Cond(will_wait_on, will_wait_on_bool);
dout(10) << "attempt_read insufficient partial buffer " << *(i->second) << " c " << c << endl;
i->second->waitfor_read[i->second->start()].push_back(c);
}
// wait on rx?
if (!rx.empty()) {
BufferHead *wait_on = rx.begin()->second;
- Context *c = new C_Cond(will_wait_on);
+ Context *c = new C_Cond(will_wait_on, will_wait_on_bool);
dout(1) << "attempt_read waiting for read to finish on " << *wait_on << " c " << c << endl;
block_t b = MAX(wait_on->start(), bstart);
wait_on->waitfor_read[b].push_back(c);
size_t will_read = MIN(off+len, on->object_size) - off;
- if (attempt_read(on, will_read, off, bl, &cond))
+ bool done;
+ if (attempt_read(on, will_read, off, bl, &cond, &done))
break; // yay
// wait
- cond.Wait(ebofs_lock);
+ while (!done)
+ cond.Wait(ebofs_lock);
if (on->deleted) {
dout(7) << "read " << hex << oid << dec << " len " << len << " off " << off << " ... object deleted" << endl;
if (fsync) {
// wait for flush.
Cond cond;
+ bool done;
int flush = 1; // write never returns positive
- Context *c = new C_Cond(&cond, &flush);
+ Context *c = new C_Cond(&cond, &done, &flush);
int r = write(oid, len, off, bl, c);
if (r < 0) return r;
ebofs_lock.Lock();
- if (flush == 1) { // write never returns positive
- cond.Wait(ebofs_lock);
+ {
+ while (!done)
+ cond.Wait(ebofs_lock);
assert(flush <= 0);
}
ebofs_lock.Unlock();
interval_set<block_t>& alloc,
block_t& old_bfirst, block_t& old_blast);
void apply_write(Onode *on, size_t len, off_t off, bufferlist& bl);
- bool attempt_read(Onode *on, size_t len, off_t off, bufferlist& bl, Cond *will_wait_on);
+ bool attempt_read(Onode *on, size_t len, off_t off, bufferlist& bl,
+ Cond *will_wait_on, bool *will_wait_on_bool);
// ** finisher **
// async write notification to users
#endif
-class C_Cond : public Context {
- Cond *cond;
- int *rval;
-public:
- C_Cond(Cond *c, int *r=0) : cond(c), rval(r) {}
- void finish(int r) {
- if (rval) *rval = r;
- //cout << "C_Cond signal " << this << " cond " << (void*)cond << " rval " << (void*)rval << " r " << r << endl;
- cond->Signal();
- }
-};
-
-class C_SafeCond : public Context {
- Mutex *lock;
- Cond *cond;
- int *rval;
-public:
- C_SafeCond(Mutex *l, Cond *c, int *r=0) : lock(l), cond(c), rval(r) {}
- void finish(int r) {
- if (rval) *rval = r;
- lock->Lock();
- //cout << "C_Cond signal " << this << " cond " << (void*)cond << " rval " << (void*)rval << " r " << r << endl;
- cond->Signal();
- lock->Unlock();
- }
-};
/*
// list<T>
template<class T>
-inline void _encode(list<T>& s, bufferlist& bl)
+inline void _encode(const list<T>& s, bufferlist& bl)
{
int n = s.size();
bl.append((char*)&n, sizeof(n));
- for (typename list<T>::iterator it = s.begin();
+ for (typename list<T>::const_iterator it = s.begin();
it != s.end();
it++) {
T v = *it;
// map<T,U>
template<class T, class U>
-inline void _encode(map<T, U>& s, bufferlist& bl)
+inline void _encode(const map<T, U>& s, bufferlist& bl)
{
int n = s.size();
bl.append((char*)&n, sizeof(n));
- for (typename map<T, U>::iterator it = s.begin();
+ for (typename map<T, U>::const_iterator it = s.begin();
it != s.end();
it++) {
T k = it->first;
--- /dev/null
+// -*- mode:C++; tab-width:4; c-basic-offset:2; indent-tabs-mode:t -*-
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+
+#ifndef __MOSDPGLOG_H
+#define __MOSDPGLOG_H
+
+#include "msg/Message.h"
+
+class MOSDPGLog : public Message {
+ epoch_t epoch;
+ pg_t pgid;
+
+public:
+ PG::PGLog log;
+ PG::PGInfo info;
+
+ epoch_t get_epoch() { return epoch; }
+ pg_t get_pgid() { return pgid; }
+
+ MOSDPGLog() {}
+ MOSDPGLog(version_t mv, pg_t pgid) :
+ Message(MSG_OSD_PG_LOG) {
+ this->epoch = mv;
+ this->pgid = pgid;
+ }
+
+ char *get_type_name() { return "PGlog"; }
+
+ void encode_payload() {
+ payload.append((char*)&epoch, sizeof(epoch));
+ payload.append((char*)&pgid, sizeof(pgid));
+ payload.append((char*)&info, sizeof(info));
+ log._encode(payload);
+ }
+ void decode_payload() {
+ int off = 0;
+ payload.copy(off, sizeof(epoch), (char*)&epoch);
+ off += sizeof(epoch);
+ payload.copy(off, sizeof(pgid), (char*)&pgid);
+ off += sizeof(pgid);
+ payload.copy(off, sizeof(info), (char*)&info);
+ off += sizeof(info);
+ log._decode(payload, off);
+ }
+};
+
+#endif
epoch_t get_epoch() { return epoch; }
MOSDPGSummary() {}
- MOSDPGSummary(version_t mv, pg_t pgid, PG::PGContentSummary *sum) :
+ MOSDPGSummary(version_t mv, pg_t pgid, PG::PGSummary &summary) :
Message(MSG_OSD_PG_SUMMARY) {
this->epoch = mv;
this->pgid = pgid;
- sum->_encode(sumbl);
+ summary._encode(sumbl);
}
pg_t get_pgid() { return pgid; }
- PG::PGContentSummary *get_summary() {
- PG::PGContentSummary *sum = new PG::PGContentSummary;
- int off = 0;
- sum->_decode(sumbl,off);
- return sum;
+ bufferlist& get_summary_bl() {
+ return sumbl;
}
char *get_type_name() { return "PGsum"; }
#define MSG_OSD_PG_NOTIFY 50
#define MSG_OSD_PG_QUERY 51
#define MSG_OSD_PG_SUMMARY 52
+#define MSG_OSD_PG_LOG 53
#define MSG_OSD_PG_UPDATE 57
#define MSG_OSD_PG_REMOVE 58
#include "messages/MOSDMap.h"
#include "messages/MOSDPGNotify.h"
#include "messages/MOSDPGQuery.h"
+#include "messages/MOSDPGLog.h"
#include "messages/MOSDPGSummary.h"
//#include "messages/MOSDPGUpdate.h"
case MSG_OSD_PG_QUERY:
m = new MOSDPGQuery();
break;
+ case MSG_OSD_PG_LOG:
+ m = new MOSDPGLog();
+ break;
case MSG_OSD_PG_SUMMARY:
m = new MOSDPGSummary();
break;
#include "messages/MOSDPGNotify.h"
#include "messages/MOSDPGQuery.h"
#include "messages/MOSDPGSummary.h"
+#include "messages/MOSDPGLog.h"
#include "common/Logger.h"
#include "common/LogType.h"
pg->last_epoch_started_any = osdmap->get_epoch();
pg->mark_complete();
pg->mark_active();
-
+
dout(7) << "created " << *pg << endl;
pg_list.push_back(pgid);
}
pg->waiting_for_missing_object.clear();
// drop peers
- pg->drop_peers();
+ pg->clear_content_recovery_state();
pg->state_clear(PG::STATE_CLEAN);
}
// new primary?
if (role == 0) {
+ // i am new primary
pg->state_clear(PG::STATE_ACTIVE);
} else {
- // we need to announce
- pg->state_set(PG::STATE_ACTIVE);
+ // i am now replica|stray. we need to send a notify.
+ pg->state_clear(PG::STATE_ACTIVE);
+ pg->state_set(PG::STATE_STRAY);
if (nrep == 0)
dout(1) << "crashed pg " << *pg << endl;
for (set<int>::const_iterator down = osdmap->get_down_osds().begin();
down != osdmap->get_down_osds().end();
down++) {
- PG::PGPeer *pgp = pg->get_peer(*down);
- if (!pgp) continue;
+ if (!pg->is_acting(*down)) continue;
- dout(10) << " " << *pg << " peer osd" << *down << " is down, removing" << endl;
- pg->remove_peer(*down);
+ dout(10) << " " << *pg << " peer osd" << *down << " is down" << endl;
// NAK any ops to the down osd
if (replica_pg_osd_tids[pgid].count(*down)) {
if (pg->get_role() == 0) {
// i am primary
- repeer(pg, query_map);
+ pg->build_prior();
+ pg->peer(query_map);
}
else if (pg->is_stray()) {
// i am residual|replica
int who = pit->first;
dout(7) << "do_queries querying osd" << who
<< " on " << pit->second.size() << " PGs" << endl;
-
+
MOSDPGQuery *m = new MOSDPGQuery(osdmap->get_epoch(),
pit->second);
messenger->send_message(m,
}
-/** repeer()
- * primary: check, query whatever replicas i need to.
- */
-void OSD::repeer(PG *pg, map< int, map<pg_t,version_t> >& query_map)
-{
- dout(10) << "repeer " << *pg << endl;
-
- // determine initial peer set
- map<int,int> peerset; // peer -> role
-
- // prior map(s), if OSDs are still up
- for (version_t epoch = pg->last_epoch_started_any;
- epoch < osdmap->get_epoch();
- epoch++) {
- OSDMap *omap = get_osd_map(epoch);
- assert(omap);
-
- vector<int> acting;
- omap->pg_to_acting_osds(pg->get_pgid(), acting);
-
- for (unsigned i=0; i<acting.size(); i++)
- if (osdmap->is_up(acting[i]))
- peerset[acting[i]] = -1;
- }
-
- // current map
- for (unsigned i=1; i<pg->acting.size(); i++)
- peerset[pg->acting[i]] = i>0 ? 1:0;
-
-
- // -- query info from everyone.
- bool haveallinfo = true;
- for (map<int,int>::iterator it = peerset.begin();
- it != peerset.end();
- it++) {
- int who = it->first;
- int role = it->second;
- if (who == whoami) continue; // nevermind me
-
- PG::PGPeer *pgp = pg->get_peer(who);
- if (pgp && pgp->have_info()) {
- dout(10) << *pg << " have info from osd" << who << " role " << role << endl;
- continue;
- }
- if (pgp && pgp->state_test(PG::PGPeer::STATE_QINFO)) {
- dout(10) << *pg << " waiting for osd" << who << " role " << role << endl;
- } else {
- dout(10) << *pg << " querying info from osd" << who << " role " << role << endl;
- query_map[who][pg->get_pgid()] = 0;
- }
- haveallinfo = false;
- }
- if (!haveallinfo) return;
-
-
- // -- ok, we have all info. who has latest PG content summary?
- version_t newest_update = pg->info.last_update;
- int newest_update_osd = whoami;
- version_t oldest_update = pg->info.last_update;
- PG::PGPeer *newest_update_peer = 0;
-
- for (map<int,PG::PGPeer*>::iterator it = pg->peers.begin();
- it != pg->peers.end();
- it++) {
- PG::PGPeer *pgp = it->second;
- assert(pgp->have_info());
-
- if (pgp->info.last_update > newest_update) {
- newest_update = pgp->info.last_update;
- newest_update_osd = it->first;
- newest_update_peer = pgp;
- }
- if (pgp->get_role() == 1 &&
- pgp->info.last_update < oldest_update)
- oldest_update = pgp->info.last_update;
- }
-
- if (newest_update_peer) {
- // get contents from newest.
- assert(!newest_update_peer->have_summary());
-
- dout(10) << *pg << " newest PG on osd" << newest_update_osd
- << " v " << newest_update
- << ", querying summary"
- << endl;
- query_map[newest_update_osd][pg->get_pgid()] = 1;
- return;
- } else {
- dout(10) << *pg << " i have the latest: " << pg->info.last_update << endl;
- }
-
-
- // -- find pg contents?
- if (pg->info.last_complete < pg->info.last_update) {
- if (pg->content_summary->missing > 0) {
- // search!
- dout(10) << *pg << " searching for PG contents, querying all peers" << endl;
- bool didquery = false;
- for (map<int,PG::PGPeer*>::iterator it = pg->peers.begin();
- it != pg->peers.end();
- it++) {
- PG::PGPeer *pgp = it->second;
- if (pgp->have_summary()) continue;
- query_map[it->first][pg->get_pgid()] = 1;
- didquery = true;
- }
-
- if (didquery) return;
- } else {
- dout(10) << *pg << " i have located all objects" << endl;
- }
- } else {
- dout(10) << *pg << " i have all objects" << endl;
- }
-
-
- // -- distribute summary?
-
- // does anyone need it?
- //if (oldest_update < pg->info.last_update) {
-
- // generate summary?
- if (pg->content_summary == 0)
- pg->generate_content_summary();
-
- // distribute summary!
- for (map<int,PG::PGPeer*>::iterator it = pg->peers.begin();
- it != pg->peers.end();
- it++) {
- PG::PGPeer *pgp = it->second;
- if (pgp->get_role() != 1) continue;
-
- pgp->state_clear(PG::PGPeer::STATE_WAITING);
- pgp->state_set(PG::PGPeer::STATE_ACTIVE);
-
- //if (pgp->info.last_update < pg->info.last_update) {
- dout(10) << *pg << " sending summary to osd" << it->first << endl;
- MOSDPGSummary *m = new MOSDPGSummary(osdmap->get_epoch(), pg->get_pgid(), pg->content_summary);
- messenger->send_message(m, MSG_ADDR_OSD(it->first));
- //}
- }
- //} else {
- //dout(10) << *pg << " nobody needs the summary" << endl;
- //}
-
- // plan my own recovery
- pg->plan_recovery();
-
- // i am active!
- pg->state_set(PG::STATE_ACTIVE);
-
- take_waiters(pg->waiting_for_active);
-
-}
-
-
-
/** PGNotify
* from non-primary to primary
* includes PGInfo.
assert(pg->acting[0] == whoami);
pg->info.same_primary_since = it->same_primary_since;
pg->set_role(0);
-
- dout(10) << " " << *pg << " is new, nrep=" << nrep << endl;
- // start peers
- repeer(pg, query_map);
+ pg->last_epoch_started_any = it->last_epoch_started;
+ pg->build_prior();
+ dout(10) << " " << *pg << " is new" << endl;
+
// kick any waiters
if (waiting_for_pg.count(pgid)) {
take_waiters(waiting_for_pg[pgid]);
waiting_for_pg.erase(pgid);
}
- } else {
- // already had pg.
+ }
- // peered with this guy specifically?
- PG::PGPeer *pgp = pg->get_peer(from);
- if (!pgp) {
- int role = osdmap->get_pg_role(pg->get_pgid(), from);
- pgp = pg->new_peer(from, role);
- }
+ // save info.
+ pg->peer_info[from] = *it;
- pgp->info = *it;
- pgp->state_set(PG::PGPeer::STATE_INFO);
+ // adjust prior?
+ if (it->last_epoch_started > pg->last_epoch_started_any)
+ pg->adjust_prior();
- repeer(pg, query_map);
- }
+ // peer
+ pg->peer(query_map);
}
do_queries(query_map);
delete m;
}
+void OSD::handle_pg_log(MOSDPGLog *m)
+{
+
+
+}
+
/** PGQuery
* from primary to replica | other
dout(10) << *pg << " dne (before), but i am role " << role << endl;
}
- if (it->second) {
+ if (it->second == 0) {
+ // info
+ dout(10) << *pg << " sending info" << endl;
+ notify_list[from].push_back(pg->info);
+ } else if (it->second == 1) {
// summary
- MOSDPGSummary *m;
- if (pg->content_summary == 0) {
- pg->generate_content_summary();
- m = new MOSDPGSummary(osdmap->get_epoch(), pg->get_pgid(), pg->content_summary);
- delete pg->content_summary;
- pg->content_summary = 0;
- } else {
- m = new MOSDPGSummary(osdmap->get_epoch(), pg->get_pgid(), pg->content_summary);
- }
+ dout(10) << *pg << " sending content summary" << endl;
+ PG::PGSummary summary;
+ pg->generate_summary(summary);
+ MOSDPGSummary *m = new MOSDPGSummary(osdmap->get_epoch(), pg->get_pgid(), summary);
messenger->send_message(m, MSG_ADDR_OSD(from));
} else {
- // notify
- notify_list[from].push_back(pg->info);
+ // log + info
+ dout(10) << *pg << " sending info+log since " << it->second << endl;
+ MOSDPGLog *m = new MOSDPGLog(osdmap->get_epoch(), pg->get_pgid());
+ m->info = pg->info;
+ m->log.copy_after(pg->log, it->second);
+ messenger->send_message(m, MSG_ADDR_OSD(from));
}
}
- do_notifies(notify_list);
+ do_notifies(notify_list);
delete m;
}
+
+
void OSD::handle_pg_summary(MOSDPGSummary *m)
{
+ /*
dout(7) << "handle_pg_summary from " << m->get_source() << endl;
int from = MSG_ADDR_NUM(m->get_source());
map< int, map<pg_t,version_t> > query_map; // peer -> PG -> get_summary_since
pg_t pgid = m->get_pgid();
- PG::PGContentSummary *sum = m->get_summary();
+ PG::PGSummary *sum = m->get_summary();
PG *pg = get_pg(pgid);
assert(pg);
// initiate any recovery?
pg->plan_recovery();
}
-
+ */
delete m;
}
int ops = pg->num_active_ops();
dout(7) << "pg_pull pg " << *pg
- << " " << pg->objects_missing.size() << " to do, "
+ << " " << pg->missing.num_missing() << " to do, "
<< ops << "/" << maxops << " active" << endl;
+ /*
while (ops < maxops &&
!pg->recovery_queue.empty()) {
map<version_t, PG::ObjectInfo>::iterator first = pg->recovery_queue.upper_bound(pg->requested_through);
pg->requested_through = first->first;
ops++;
- }
+ }
+ */
}
-void OSD::pull_replica(PG *pg, PG::ObjectInfo& oi)
+void OSD::pull_replica(PG *pg, object_t oid, version_t v)
{
- // get peer
+/* // get peer
dout(7) << "pull_replica " << hex << oi.oid << dec
<< " v " << oi.version
<< " from osd" << oi.osd << endl;
// take note
pull_ops[tid] = oi;
pg->objects_pulling[oi.oid] = oi;
+*/
}
+
void OSD::op_rep_pull(MOSDOp *op)
{
long got = 0;
void OSD::op_rep_pull_reply(MOSDOpReply *op)
{
+ /*
object_t o = op->get_oid();
version_t v = op->get_version();
take_waiters(pg->waiting_for_missing_object[o]);
delete op;
+ */
}
if (!pg->is_complete()) {
// consult PG object map
- if (pg->objects_missing.count(oid)) {
+ if (pg->missing.missing.count(oid)) {
// need to pull
- version_t v = pg->objects_missing[oid];
+ version_t v = pg->missing.missing[oid];
dout(7) << "need to pull object " << hex << oid << dec
<< " v " << v << endl;
if (!pg->objects_pulling.count(oid))
- pull_replica(pg, pg->recovery_queue[v]);
+ pull_replica(pg, oid, v);
pg->waiting_for_missing_object[oid].push_back(op);
return;
}
// do it
Context *oncommit = new C_OSD_WriteCommit(this, repop);
-
op_apply(op, nv, oncommit);
+ // local ack
get_repop(repop);
assert(repop->waitfor_ack.count(0));
repop->waitfor_ack.erase(0);
void handle_rep_op_ack(__uint64_t tid, int result, bool commit, int fromosd);
// recovery
- map<tid_t,PG::ObjectInfo> pull_ops; // tid -> PGPeer*
+ //map<tid_t,PG::ObjectInfo> pull_ops; // tid -> PGPeer*
void do_notifies(map< int, list<PG::PGInfo> >& notify_list);
void do_queries(map< int, map<pg_t,version_t> >& query_map);
void repeer(PG *pg, map< int, map<pg_t,version_t> >& query_map);
void pg_pull(PG *pg, int maxops);
- void pull_replica(PG *pg, PG::ObjectInfo& oi);
+ void pull_replica(PG *pg, object_t, version_t);
bool require_current_map(Message *m, version_t v);
bool require_same_or_newer_map(Message *m, epoch_t e);
void handle_pg_query(class MOSDPGQuery *m);
void handle_pg_notify(class MOSDPGNotify *m);
void handle_pg_summary(class MOSDPGSummary *m);
+ void handle_pg_log(class MOSDPGLog *m);
void op_rep_pull(class MOSDOp *op);
void op_rep_pull_reply(class MOSDOpReply *op);
#include "config.h"
#include "OSD.h"
+
+#include "messages/MOSDPGLog.h"
+#include "messages/MOSDPGSummary.h"
+
+
#undef dout
#define dout(l) if (l<=g_conf.debug || l<=g_conf.debug_osd) cout << "osd" << osd->whoami << " " << *this << " "
+/******* PGLog ********/
+
+
+void PG::PGLog::trim(version_t s)
+{
+ // trim updated
+ while (!updated.empty()) {
+ map<version_t,object_t>::iterator rit = rupdated.begin();
+ if (rit->first > s) break;
+ updated.erase(rit->second);
+ rupdated.erase(rit);
+ }
+
+ // trim deleted
+ while (!deleted.empty()) {
+ map<version_t, object_t>::iterator rit = rdeleted.begin();
+ if (rit->first > s) break;
+ deleted.erase(rit->second);
+ rdeleted.erase(rit);
+ }
+
+ bottom = s;
+}
+
+
+void PG::PGLog::copy_after(const PGLog &other, version_t v)
+{
+ assert(v >= other.bottom);
+ bottom = top = v;
+ assert(updated.empty() && deleted.empty());
+
+ // updated
+ for (map<version_t, object_t>::const_iterator it = other.rupdated.upper_bound(v); // first > v
+ it != other.updated.end();
+ it++)
+ add_update(it->second, it->first);
+
+ // deleted
+ for (map<version_t, object_t>::const_iterator it = other.rdeleted.upper_bound(v); // first > v
+ it != other.rdeleted.end();
+ it++)
+ add_delete(it->second, it->first);
+
+ assert(top == other.top);
+}
+
+
+void PG::PGLog::merge_after(version_t after, const PGLog &other)
+{
+ // extend on bottom?
+ if (other.bottom < bottom &&
+ after < bottom) {
+ version_t newbottom = after;
+ if (after > other.bottom)
+ newbottom = other.bottom; // skip part of other
+
+ // updated
+ for (map<version_t,object_t>::const_iterator p = other.rupdated.upper_bound(newbottom); // first > newbottom
+ p != other.rupdated.end();
+ p++) {
+ if (p->first > bottom) break;
+ if (updated.count(p->second) || deleted.count(p->second)) continue;
+ updated[p->second] = p->first;
+ rupdated[p->first] = p->second;
+ }
+
+ // deleted
+ for (map<version_t,object_t>::const_iterator p = other.rdeleted.upper_bound(newbottom);
+ p != other.rdeleted.end();
+ p++) {
+ if (p->first > bottom) break;
+ if (updated.count(p->second) || deleted.count(p->second)) continue;
+ deleted[p->second] = p->first;
+ rdeleted[p->first] = p->second;
+ }
+ bottom = newbottom;
+ }
+
+ // extend on top?
+ if (other.top > top) {
+ map<version_t,object_t>::const_iterator pu = other.rupdated.lower_bound(top);
+ map<version_t,object_t>::const_iterator pd = other.rdeleted.lower_bound(top);
+
+ // both
+ while (pu != other.rupdated.end() && pd != other.rdeleted.end()) {
+ assert(pd->first != pu->first);
+ if (pu->first > pd->first) {
+ add_update(pu->second, pu->first);
+ pu++;
+ } else {
+ add_delete(pd->second, pd->first);
+ pd++;
+ }
+ }
+ // tail
+ while (pu != other.rupdated.end()) {
+ add_update(pu->second, pu->first);
+ pu++;
+ }
+ while (pd != other.rdeleted.end()) {
+ add_delete(pd->second, pd->first);
+ pd++;
+ }
+ top = other.top;
+ }
+}
+
+void PG::PGLog::print(ostream& out) const
+{
+ out << " " << bottom << " - " << top << endl;
+
+ map<version_t,object_t>::const_iterator pu = rupdated.begin();
+ map<version_t,object_t>::const_iterator pd = rdeleted.begin();
+
+ // both
+ while (pu != rupdated.end() && pd != rdeleted.end()) {
+ assert(pd->first != pu->first);
+ if (pu->first > pd->first) {
+ out << " " << pu->first << " " << hex << pu->second << dec << endl;
+ pu++;
+ } else {
+ out << " " << pd->first << " - " << hex << pd->second << dec << endl;
+ pd++;
+ }
+ }
+ // tail
+ while (pu != rupdated.end()) {
+ out << " " << pu->first << " " << hex << pu->second << dec << endl;
+ pu++;
+ }
+ while (pd != rdeleted.end()) {
+ out << " " << pd->first << " - " << hex << pd->second << dec << endl;
+ pd++;
+ }
+ out << " " << top << " top" << endl;
+}
+
+
+
+/******* PG ***********/
+void PG::build_prior()
+{
+ // build prior set.
+ prior_set.clear();
+
+ // current
+ for (unsigned i=1; i<acting.size(); i++)
+ prior_set.insert(acting[i]);
+
+ // and prior map(s), if OSDs are still up
+ for (version_t epoch = last_epoch_started_any;
+ epoch < osd->osdmap->get_epoch();
+ epoch++) {
+ OSDMap *omap = osd->get_osd_map(epoch);
+ assert(omap);
+
+ vector<int> acting;
+ omap->pg_to_acting_osds(get_pgid(), acting);
+
+ for (unsigned i=0; i<acting.size(); i++)
+ if (osd->osdmap->is_up(acting[i]) && // is up now
+ omap->is_up(acting[i]) && // and was up then
+ acting[i] != osd->whoami) // and not me
+ prior_set.insert(acting[i]);
+ }
+
+ dout(10) << "build_prior " << prior_set << endl;
+}
+
+void PG::adjust_prior()
+{
+ assert(!prior_set.empty());
+
+ // raise last_epoch_started_any
+ epoch_t max;
+ for (map<int,PGInfo>::iterator it = peer_info.begin();
+ it != peer_info.end();
+ it++) {
+ if (it->second.last_epoch_started > max)
+ max = it->second.last_epoch_started;
+ }
+ assert(max > last_epoch_started_any);
+ last_epoch_started_any = max;
+
+ // rebuild prior set
+ build_prior();
+}
+
+
+void PG::peer(map< int, map<pg_t,version_t> >& query_map)
+{
+ dout(10) << "peer" << endl;
+
+ // -- query info from everyone in prior_set.
+ bool missing_info = false;
+ for (set<int>::iterator it = prior_set.begin();
+ it != prior_set.end();
+ it++) {
+ if (peer_info.count(*it)) {
+ dout(10) << " have info from osd" << *it << endl;
+ continue;
+ }
+ missing_info = true;
+
+ if (peer_info_requested.count(*it)) {
+ dout(10) << " waiting for osd" << *it << endl;
+ continue;
+ }
+
+ dout(10) << " querying info from osd" << *it << endl;
+ query_map[*it][info.pgid] = 0;
+ peer_info_requested.insert(*it);
+ }
+ if (missing_info) return;
+
+
+ // -- ok, we have all (prior_set) info. (and maybe others.)
+ // who (of _everyone_ we're heard from) has the latest PG version?
+ version_t newest_update = info.last_update;
+ int newest_update_osd = osd->whoami;
+ version_t oldest_update_needed = info.last_update; // only of acting (current) osd set
+
+ for (map<int,PGInfo>::iterator it = peer_info.begin();
+ it != peer_info.end();
+ it++) {
+ if (it->second.last_update > newest_update) {
+ newest_update = it->second.last_update;
+ newest_update_osd = it->first;
+ }
+ if (is_acting(it->first) &&
+ it->second.last_update < oldest_update_needed)
+ oldest_update_needed = it->second.last_update;
+ }
+
+ // get log?
+ if (newest_update_osd != osd->whoami) {
+ if (peer_log_requested.count(newest_update_osd)) {
+ dout(10) << " newest update on osd" << newest_update_osd
+ << " v " << newest_update
+ << ", already queried"
+ << endl;
+ } else {
+ dout(10) << " newest update on osd" << newest_update_osd
+ << " v " << newest_update
+ << ", querying since " << oldest_update_needed
+ << endl;
+ query_map[newest_update_osd][info.pgid] = oldest_update_needed;
+ peer_info_requested.insert(newest_update_osd);
+ }
+ return;
+ } else {
+ dout(10) << " i have the most up-to-date log " << info.last_update << endl;
+ }
+
+ // -- is that the whole story? (is my log sufficient?)
+ if (info.last_complete < log.bottom) {
+ // nope. fetch a summary from someone.
+ if (peer_summary.count(newest_update_osd)) {
+ dout(10) << "i am complete thru " << info.last_complete
+ << ", but my log starts at " << log.bottom
+ << ". already waiting for summary from osd" << newest_update_osd
+ << endl;
+ } else {
+ dout(10) << "i am complete thru " << info.last_complete
+ << ", but my log starts at " << log.bottom
+ << ". fetching summary from osd" << newest_update_osd
+ << endl;
+ assert(newest_update_osd != osd->whoami); // can't be me!
+ query_map[newest_update_osd][info.pgid] = 1;
+ peer_summary_requested.insert(newest_update_osd);
+ }
+ return;
+ }
+
+ // -- ok. and have i located all pg contents?
+ if (missing.num_lost()) {
+ dout(10) << "there are still " << missing.num_lost() << " lost objects" << endl;
+
+ // ok, let's get more summaries!
+ bool waiting = false;
+ for (map<int,PGInfo>::iterator it = peer_info.begin();
+ it != peer_info.end();
+ it++) {
+ int peer = it->first;
+ if (peer_summary.count(peer)) {
+ dout(10) << " have summary from osd" << peer << endl;
+ continue;
+ }
+ if (peer_summary_requested.count(peer)) {
+ dout(10) << " already requested summary from osd" << peer << endl;
+ waiting = true;
+ continue;
+ }
+
+ dout(10) << " requesting summary from osd" << peer << endl;
+ query_map[peer][info.pgid] = 1;
+ peer_summary_requested.insert(peer);
+ waiting = true;
+ }
+
+ if (!waiting) {
+ dout(10) << missing.num_lost() << " objects are still lost, waiting+hoping for a notify from someone else!" << endl;
+ }
+ return;
+ }
+
+
+ // -- do i need to generate a larger log for any of my peers?
+ PGSummary summary;
+ if (oldest_update_needed > log.bottom) {
+ dout(10) << "my log isn't long enough for all peers: bottom "
+ << log.bottom << " > " << oldest_update_needed
+ << endl;
+ generate_summary(summary);
+ }
+
+
+ // -- ok, activate!
+ bool allclean = true;
+ for (unsigned i=1; i<acting.size(); i++) {
+ int peer = acting[i];
+ assert(peer_info.count(peer));
+
+ if (peer_info[peer].last_update < log.bottom) {
+ // need full summary
+ dout(10) << "sending complete summary to osd" << peer
+ << ", their last_update was " << peer_info[peer].last_update
+ << endl;
+ MOSDPGSummary *m = new MOSDPGSummary(osd->osdmap->get_epoch(),
+ info.pgid,
+ summary);
+ allclean = false;
+ osd->messenger->send_message(m, MSG_ADDR_OSD(peer));
+ } else {
+ // need incremental (or no) log update.
+ dout(10) << "sending incremental|empty log "
+ << peer_info[peer].last_update << " - " << info.last_update
+ << " to osd" << peer << endl;
+ MOSDPGLog *m = new MOSDPGLog(osd->osdmap->get_epoch(),
+ info.pgid);
+ if (peer_info[peer].last_update < info.last_update) {
+ m->log.copy_after(log, peer_info[peer].last_update);
+ allclean = false;
+ }
+ osd->messenger->send_message(m, MSG_ADDR_OSD(peer));
+ }
+ }
+
+ // do anything about allclean?
+ // ???
+
+ // clean up some
+ clear_content_recovery_state();
+
+ // i am active!
+ state_set(PG::STATE_ACTIVE);
+ osd->take_waiters(waiting_for_active);
+}
+
-void PG::generate_content_summary()
+void PG::generate_summary(PGSummary &summary)
{
dout(10) << "generating summary" << endl;
list<object_t> olist;
osd->store->collection_list(info.pgid, olist);
- content_summary = new PGContentSummary;
-
for (list<object_t>::iterator it = olist.begin();
it != olist.end();
it++) {
- ObjectInfo item(*it);
- osd->store->getattr(item.oid,
+ version_t v;
+ osd->store->getattr(*it,
"version",
- &item.version, sizeof(item.version));
- item.osd = osd->whoami;
- content_summary->ls.push_back(item);
+ &v, sizeof(v));
+ summary.objects[*it] = v;
}
+
+ dout(10) << summary.objects.size() << " local objects. " << endl;
}
dout(10) << "plan_recovery " << endl;
assert(is_active());
- assert(content_summary);
+ /*
// load local contents
list<object_t> olist;
osd->store->collection_list(info.pgid, olist);
<< " v " << it->second << endl;
osd->store->remove(it->first);
}
+
+ */
}
void PG::do_recovery()
class PG {
public:
- /** ObjectInfo
- * summary info about an object (replica)
+ /*
+ * PGInfo - summary of PG statistics.
*/
- struct ObjectInfo {
- object_t oid;
- version_t version;
- int osd; // -1 = unknown. if local, osd == whoami.
- ObjectInfo(object_t o=0, version_t v=0, int os=-1) : oid(o), version(v), osd(os) {}
- };
-
struct PGInfo {
pg_t pgid;
- version_t last_update; // last object version applied.
- version_t last_complete; // last pg version pg was complete.
+ version_t last_update; // last object version logged/updated.
+ version_t last_complete; // last version pg was complete through.
+ version_t log_floor; // oldest log entry.
epoch_t last_epoch_started; // last epoch started.
epoch_t last_epoch_finished; // last epoch finished.
- epoch_t same_primary_since; //
+ epoch_t same_primary_since; // first epoch the current primary was primary.
PGInfo(pg_t p=0) : pgid(p),
last_update(0), last_complete(0),
last_epoch_started(0), last_epoch_finished(0),
same_primary_since(0) {}
};
- struct PGContentSummary {
- //version_t since;
- int remote, missing;
- list<ObjectInfo> ls;
-
+ /*
+ * PGSummary - snapshot of full pg contents
+ */
+ class PGSummary {
+ public:
+ map<object_t, version_t> objects; // objects i currently store.
+ //PGMissing missing; // objects i am missing (to get thru info.last_update).
+
void _encode(bufferlist& blist) {
- //blist.append((char*)&since, sizeof(since));
- blist.append((char*)&remote, sizeof(remote));
- blist.append((char*)&missing, sizeof(missing));
- ::_encode(ls, blist);
+ ::_encode(objects, blist);
+ //missing._encode(blist);
}
void _decode(bufferlist& blist, int& off) {
- //blist.copy(off, sizeof(since), (char*)&since);
- //off += sizeof(since);
- blist.copy(off, sizeof(remote), (char*)&remote);
- off += sizeof(remote);
- blist.copy(off, sizeof(missing), (char*)&missing);
- off += sizeof(missing);
- ::_decode(ls, blist, off);
+ ::_decode(objects, blist, off);
+ //missing._decode(blist, off);
}
-
- PGContentSummary() : remote(0), missing(0) {}
};
-
- /** PGPeer
- * state associated with non-primary OSDS with PG content.
- * only used by primary.
+ /*
+ * PGMissing - summary of missing objects.
+ * kept in memory, as a supplement to PGLog.
+ * also used to pass missing info in messages.
*/
-
- class PGPeer {
+ class PGMissing {
public:
- // bits
- static const int STATE_INFO = 1; // we have info
- static const int STATE_SUMMARY = 2; // we have summary
- static const int STATE_QINFO = 4; // we are querying info|summary.
- static const int STATE_QSUMMARY = 8; // we are querying info|summary.
- static const int STATE_WAITING = 16; // peer is waiting for go.
- static const int STATE_ACTIVE = 32; // peer is active.
- //static const int STATE_COMPLETE = 64; // peer is complete.
-
- class PG *pg;
- private:
- int peer;
- int role;
- int state;
-
+ map<object_t, version_t> missing; // oid -> v
+ map<version_t, object_t> rmissing; // v -> oid
+
+ map<object_t, int> loc; // where i think i can get them.
+
+ int num_lost() { return missing.size() - loc.size(); }
+ int num_missing() { return missing.size(); }
+
+ void _encode(bufferlist& blist) {
+ ::_encode(missing, blist);
+ ::_encode(loc, blist);
+ }
+ void _decode(bufferlist& blist, int& off) {
+ ::_decode(missing, blist, off);
+ ::_decode(loc, blist, off);
+
+ for (map<object_t,version_t>::iterator it = missing.begin();
+ it != missing.end();
+ it++)
+ rmissing[it->second] = it->first;
+ }
+ };
+
+ /*
+ * PGLog - incremental log of recent pg changes.
+ * summary of persistent on-disk copy:
+ * multiply-modified objects are implicitly trimmed from in-memory log.
+ * also, serves as a recovery queue.
+ */
+ class PGLog {
public:
- // peer state
- PGInfo info;
- PGContentSummary *content_summary;
-
- friend class PG;
+ version_t top; // corresponds to newest entry.
+ version_t bottom; // corresponds to entry prio to oldest entry (t=bottom is trimmed).
+ map<object_t, version_t> updated; // oid -> v. items > bottom, + version.
+ map<version_t, object_t> rupdated; // v -> oid.
+ map<object_t, version_t> deleted; // oid -> when. items <= bottom that no longer exist
+ map<version_t, object_t> rdeleted; // when -> oid.
- public:
- PGPeer(class PG *pg, int p, int ro) :
- pg(pg),
- peer(p),
- role(ro),
- state(0),
- content_summary(NULL) { }
- ~PGPeer() {
- if (content_summary) delete content_summary;
+ PGLog() : top(0), bottom(0) {}
+
+ void _reverse(map<object_t, version_t> &fw, map<version_t, object_t> &bw) {
+ for (map<object_t,version_t>::iterator it = fw.begin();
+ it != fw.end();
+ it++)
+ bw[it->second] = it->first;
+ }
+ void _encode(bufferlist& blist) const {
+ blist.append((char*)&top, sizeof(top));
+ blist.append((char*)&bottom, sizeof(bottom));
+ ::_encode(updated, blist);
+ ::_encode(deleted, blist);
+ }
+ void _decode(bufferlist& blist, int& off) {
+ blist.copy(off, sizeof(top), (char*)&top);
+ off += sizeof(top);
+ blist.copy(off, sizeof(bottom), (char*)&bottom);
+ off += sizeof(bottom);
+ ::_decode(updated, blist, off);
+ ::_decode(deleted, blist, off);
+
+ _reverse(updated, rupdated);
+ _reverse(deleted, rdeleted);
}
-
- int get_peer() { return peer; }
- int get_role() { return role; }
-
- int get_state() { return state; }
- bool state_test(int m) { return (state & m) != 0; }
- void state_set(int m) { state |= m; }
- void state_clear(int m) { state &= ~m; }
-
- bool have_info() { return state_test(STATE_INFO); }
- bool have_summary() { return state_test(STATE_SUMMARY); }
- bool is_waiting() { return state_test(STATE_WAITING); }
- bool is_active() { return state_test(STATE_ACTIVE); }
- bool is_complete() { return have_info() &&
- info.last_update == info.last_complete; }
+
+
+ // accessors
+ version_t is_updated(object_t oid) {
+ if (updated.count(oid)) return updated[oid];
+ return 0;
+ }
+ version_t is_deleted(object_t oid) {
+ if (deleted.count(oid)) return deleted[oid];
+ return 0;
+ }
+
+ // actors
+ void add_update(object_t oid, version_t v) {
+ updated[oid] = v;
+ rupdated[v] = oid;
+ if (deleted.count(oid)) {
+ assert(v > deleted[oid]); // future deletions or past mods impossible.
+ rdeleted.erase(deleted[oid]);
+ deleted.erase(oid);
+ }
+ assert(v > top);
+ top = v;
+ }
+ void add_delete(object_t oid, version_t when) {
+ deleted[oid] = when;
+ rdeleted[when] = oid;
+ assert(when > top);
+ top = when;
+ }
+
+ void trim(version_t s);
+ void copy_after(const PGLog &other, version_t v);
+ void merge_after(version_t after, const PGLog &other);
+ void print(ostream& out) const;
};
+
+
/*** PG ****/
public:
// any
protected:
OSD *osd;
- // generic state
public:
+ // pg state
PGInfo info;
- PGContentSummary *content_summary;
+ PGLog log;
+ PGMissing missing;
protected:
int role; // 0 = primary, 1 = replica, -1=none.
int state; // see bit defns above
// primary state
+ public:
+ vector<int> acting;
+ epoch_t last_epoch_started_any;
+
+ protected:
+ // [primary only] content recovery state
+ set<int> prior_set; // current+prior OSDs, as defined by last_epoch_started_any.
+ set<int> stray_set; // non-acting osds that have PG data.
+ map<int, PGInfo> peer_info; // info from peers (stray or prior)
+ set<int> peer_info_requested;
+ map<int, PGLog*> peer_log; // logs from peers (for recovering pg content)
+ map<int, PGMissing*> peer_missing;
+ set<int> peer_log_requested;
+ map<int, PGSummary*> peer_summary; // full contents of peers
+ set<int> peer_summary_requested;
+ friend class OSD;
+
public:
- epoch_t last_epoch_started_any;
- map<int, PGPeer*> peers; // primary: (soft state) active peers
+ void clear_content_recovery_state() {
+ prior_set.clear();
+ stray_set.clear();
+ peer_info.clear();
+ peer_info_requested.clear();
+ peer_log.clear();
+ peer_missing.clear();
+ peer_log_requested.clear();
+ peer_summary.clear();
+ }
public:
- vector<int> acting;
+ bool is_acting(int osd) const {
+ for (unsigned i=0; i<acting.size(); i++)
+ if (acting[i] == osd) return true;
+ return false;
+ }
+ bool is_prior(int osd) const { return prior_set.count(osd); }
+ bool is_stray(int osd) const { return stray_set.count(osd); }
+
+ void build_prior();
+ void adjust_prior(); // based on new peer_info.last_epoch_started_any
// pg waiters
list<class Message*> waiting_for_active;
list<class Message*> > waiting_for_missing_object;
// recovery
- map<object_t, version_t> objects_missing; // objects (versions) i need
- map<version_t, ObjectInfo> recovery_queue; // objects i need to pull (in order)
- version_t requested_through;
- map<object_t, ObjectInfo> objects_pulling; // which objects are currently being pulled
+ version_t requested_thru;
+ map<object_t, version_t> objects_pulling; // which objects are currently being pulled
+ void peer(map< int, map<pg_t,version_t> >& query_map);
+ void generate_summary(PGSummary &summary);
+
void plan_recovery();
- void generate_content_summary();
void do_recovery();
+ void do_clean();
public:
PG(OSD *o, pg_t p) :
osd(o),
- info(p), content_summary(0),
+ info(p),
role(0),
state(0)
{ }
return objects_pulling.size();
}
- // peers
- map<int, PGPeer*>& get_peers() { return peers; }
- PGPeer* get_peer(int p) {
- if (peers.count(p)) return peers[p];
- return 0;
- }
- PGPeer* new_peer(int p, int r) {
- return peers[p] = new PGPeer(this, p, r);
- }
- void remove_peer(int p) {
- assert(peers.count(p));
- delete peers[p];
- peers.erase(p);
- }
- void drop_peers() {
- for (map<int,PGPeer*>::iterator it = peers.begin();
- it != peers.end();
- it++)
- delete it->second;
- peers.clear();
- }
-
+
// pg state storage
/*
};
-inline ostream& operator<<(ostream& out, PG::ObjectInfo& oi)
-{
- return out << "object[" << hex << oi.oid << dec
- << " v " << oi.version
- << " osd" << oi.osd
- << "]";
-}
-
inline ostream& operator<<(ostream& out, PG::PGInfo& pgi)
{
return out << "pgi(" << hex << pgi.pgid << dec
<< ")";
}
+inline ostream& operator<<(ostream& out, PG::PGLog& log)
+{
+ log.print(out);
+ return out;
+}
+
inline ostream& operator<<(ostream& out, PG& pg)
{
out << "pg[" << pg.info
else
x_len = left;
- if (ex->offset + ex->len == x_offset) {
+ if (ex->start + ex->length == x_offset) {
// add to extent
- ex->len += x_len;
+ ex->length += x_len;
} else {
// new extent
- assert(ex->len == 0);
- assert(ex->offset == 0);
- ex->offset = x_offset;
- ex->len = x_len;
+ assert(ex->length == 0);
+ assert(ex->start == 0);
+ ex->start = x_offset;
+ ex->length = x_len;
}
ex->buffer_extents[cur-offset] = x_len;
Objecter::OSDRead *rd = new Objecter::OSDRead(bl);
file_to_extents(inode, len, offset, rd->extents);
- // cacheless async?
- if (oc == 0)
- return objecter->readx(rd, onfinish);
-
- // use cache
- oc->readx(rd, inode.ino, onfinish);
- return 0;
+ return objecter->readx(rd, onfinish);
}
int write(inode_t& inode,
Objecter::OSDWrite *wr = new Objecter::OSDWrite(bl);
file_to_extents(inode, len, offset, wr->extents);
- // cacheles async?
- if (oc == 0)
- return objecter->writex(wr, onack, oncommit);
-
- // use cache
- oc->writex(wr, inode.ino, onack, oncommit);
- return 0;
+ return objecter->writex(wr, onack, oncommit);
}
int zero(inode_t& inode,
Objecter::OSDZero *z = new Objecter::OSDZero;
file_to_extents(inode, len, offset, z->extents);
- // cacheless async?
- if (oc == 0)
- return objecter->zerox(z, onack, oncommit);
+ return objecter->zerox(z, onack, oncommit);
+ }
+
+
+ /*** async+caching (non-blocking) file interface ***/
+ int caching_read(inode_t& inode,
+ size_t len,
+ off_t offset,
+ bufferlist *bl,
+ Context *onfinish) {
+ Objecter::OSDRead *rd = new Objecter::OSDRead(bl);
+ file_to_extents(inode, len, offset, rd->extents);
+ return oc->readx(rd, inode.ino, onfinish);
+ }
- // actually: mds should never do this, and clients don't do zero().
- assert(0);
- return 0;
+ int caching_write(inode_t& inode,
+ size_t len,
+ off_t offset,
+ bufferlist& bl) {
+ Objecter::OSDWrite *wr = new Objecter::OSDWrite(bl);
+ file_to_extents(inode, len, offset, wr->extents);
+ return oc->writex(wr, inode.ino);
}
int atomic_sync_read(inode_t& inode,
size_t len, off_t offset,
- bufferlist *bl) {
+ bufferlist *bl,
+ Mutex &lock) {
Objecter::OSDRead *rd = new Objecter::OSDRead(bl);
file_to_extents(inode, len, offset, rd->extents);
assert(oc);
- int r = oc->atomic_sync_readx(rd, inode.ino,
- 0); // block.
+ int r = oc->atomic_sync_readx(rd, inode.ino, lock);
return r;
}
int atomic_sync_write(inode_t& inode,
size_t len, off_t offset,
bufferlist& bl,
- Context *oncommit) {
+ Mutex &lock) {
Objecter::OSDWrite *wr = new Objecter::OSDWrite(bl);
file_to_extents(inode, len, offset, wr->extents);
assert(oc);
- int r = oc->atomic_sync_writex(wr, inode.ino,
- 0, // block
- oncommit);
+ int r = oc->atomic_sync_writex(wr, inode.ino, lock);
return r;
}
/*** ObjectCacher::Object ***/
-BufferHead *ObjectCacher::Object::split(BufferHead *bh, off_t off)
+ObjectCacher::BufferHead *ObjectCacher::Object::split(BufferHead *bh, off_t off)
{
dout(20) << "split " << *bh << " at " << off << endl;
// split off right
- ObjectCacher::BufferHead *right = new ObjectCacher::BufferHead();
+ ObjectCacher::BufferHead *right = new BufferHead();
right->set_version(bh->get_version());
right->set_state(bh->get_state());
- block_t newleftlen = off - bh->start();
+ off_t newleftlen = off - bh->start();
right->set_start( off );
right->set_length( bh->length() - newleftlen );
// shorten left
- stat_sub(bh);
+ oc->bh_stat_sub(bh);
bh->set_length( newleftlen );
- stat_add(bh);
+ oc->bh_stat_add(bh);
// add right
add_bh(right);
// move read waiters
if (!bh->waitfor_read.empty()) {
- map<block_t, list<Context*> >::iterator o, p = bh->waitfor_read.end();
+ map<off_t, list<Context*> >::iterator o, p = bh->waitfor_read.end();
p--;
while (p != bh->waitfor_read.begin()) {
if (p->first < right->start()) break;
return right;
}
+
+void ObjectCacher::Object::merge(BufferHead *left, BufferHead *right)
+{
+ assert(left->end() == right->start());
+ assert(left->get_state() == right->get_state());
+
+ dout(10) << "merge " << *left << " + " << *right << endl;
+ left->set_length( left->length() + right->length());
+
+ // data
+ left->bl.claim_append(right->bl);
+
+ // version
+ left->set_version( MAX( left->get_version(), right->get_version() ) );
+
+ // waiters
+ for (map<off_t, list<Context*> >::iterator p = right->waitfor_read.begin();
+ p != right->waitfor_read.end();
+ p++)
+ left->waitfor_read[p->first].splice( left->waitfor_read[p->first].begin(),
+ p->second );
+
+ // hose right
+ data.erase(right->start());
+ delete right;
+
+ dout(10) << "merge result " << *left << endl;
+}
+
/*
- * map a range of blocks into buffer_heads.
+ * map a range of bytes into buffer_heads.
* - create missing buffer_heads as necessary.
*/
int ObjectCacher::Object::map_read(Objecter::OSDRead *rd,
- map<block_t, BufferHead*>& hits,
- map<block_t, BufferHead*>& missing,
- map<block_t, BufferHead*>& rx)
+ map<off_t, BufferHead*>& hits,
+ map<off_t, BufferHead*>& missing,
+ map<off_t, BufferHead*>& rx)
{
- for (list<ObjectExtent>::iterator ex_it = wr->extents.begin();
- ex_it != wr->extents.end();
+ for (list<ObjectExtent>::iterator ex_it = rd->extents.begin();
+ ex_it != rd->extents.end();
ex_it++) {
if (ex_it->oid != oid) continue;
- dout(10) << "map_read " << ex_it->oid << " " << ex_it->offset << "~" << ex_it->len << endl;
+ dout(10) << "map_read " << ex_it->oid << " " << ex_it->start << "~" << ex_it->length << endl;
- map<off_t, ObjectCacher::BufferHead*>::iterator p = data.lower_bound(start);
+ map<off_t, BufferHead*>::iterator p = data.lower_bound(ex_it->start);
// p->first >= start
- size_t cur = ex_it->offset;
- size_t left = ex_it->len;
+ off_t cur = ex_it->start;
+ off_t left = ex_it->length;
if (p != data.begin() &&
(p == data.end() || p->first > cur)) {
// at end?
if (p == data.end()) {
// rest is a miss.
- BufferHead *n = new ObjectCacher::BufferHead();
+ BufferHead *n = new BufferHead();
n->set_start( cur );
n->set_length( left );
add_bh(n);
cur += left;
left -= left;
assert(left == 0);
- assert(cur == ex_it->offset + ex_it->len);
- break;
+ assert(cur == ex_it->start + ex_it->length);
+ break; // no more.
}
if (p->first <= cur) {
}
else assert(0);
- size_t lenfromcur = MIN(e->end() - cur, left);
+ off_t lenfromcur = MIN(e->end() - cur, left);
cur += lenfromcur;
left -= lenfromcur;
p++;
} else if (p->first > cur) {
// gap.. miss
- size_t next = p->first;
- BufferHead *n = new ObjectCacher::BufferHead();
+ off_t next = p->first;
+ BufferHead *n = new BufferHead();
n->set_start( cur );
- n->set_length( MIN(next - cur), left );
+ n->set_length( MIN(next - cur, left) );
add_bh(n);
missing[cur] = n;
cur += MIN(left, n->length());
/*
* map a range of extents on an object's buffer cache.
- *
+ * - combine any bh's we're writing into one
* - break up bufferheads that don't fall completely within the range
- * - cancel rx ops we obsolete.
- * - resubmit rx ops if we split bufferheads
- *
- * - leave potentially obsoleted tx ops alone (for now)
+ * - increase the bh version number (to be larger than any it subsumes)
*/
-int ObjectCacher::Object::map_write(Objecter::OSDWrite *wr)
+ObjectCacher::BufferHead *ObjectCacher::Object::map_write(Objecter::OSDWrite *wr)
{
+ BufferHead *final = 0;
+ version_t max_version = 0;
+
for (list<ObjectExtent>::iterator ex_it = wr->extents.begin();
ex_it != wr->extents.end();
ex_it++) {
-
+
if (ex_it->oid != oid) continue;
- dout(10) << "map_write " << ex_it->oid << " " << ex_it->offset << "~" << ex_it->len << endl;
+ dout(10) << "map_write " << ex_it->oid << " " << ex_it->start << "~" << ex_it->length << endl;
- map<off_t, ObjectCacher::BufferHead*>::iterator p = data.lower_bound(start);
+ map<off_t, BufferHead*>::iterator p = data.lower_bound(ex_it->start);
// p->first >= start
- size_t cur = ex_it->offset;
- size_t left = ex_it->len;
+ off_t cur = ex_it->start;
+ off_t left = ex_it->length;
if (p != data.begin() &&
(p == data.end() || p->first > cur)) {
p++; // doesn't overlap.
}
- ObjectCacher::BufferHead *prev = NULL;
while (left > 0) {
- size_t max = left;
-
+ off_t max = left;
+
// at end ?
if (p == data.end()) {
- if (prev == NULL) {
- ObjectCacher::BufferHead *n = new ObjectCacher::BufferHead();
- n->set_start( cur );
- n->set_length( max );
- add_bh(n);
- //hits[cur] = n;
+ if (final == NULL) {
+ final = new BufferHead();
+ final->set_start( cur );
+ final->set_length( max );
+ add_bh(final);
} else {
- prev->set_length( prev->length() + max );
+ final->set_length( final->length() + max );
}
left -= max;
cur += max;
dout(10) << "p is " << *p->second << endl;
+ // note highest version we see
+ if (max_version < p->second->get_version())
+ max_version = p->second->get_version();
+
if (p->first <= cur) {
- ObjectCacher::BufferHead *bh = p->second;
+ BufferHead *bh = p->second;
dout(10) << "map_write bh " << *bh << " intersected" << endl;
if (p->first < cur) {
+ assert(final == 0);
if (cur + max >= p->first + p->second->length()) {
// we want right bit (one splice)
- if (bh->is_rx() && bh_cancel_read(bh)) {
- ObjectCacher::BufferHead *right = split(bh, cur);
- bh_read(this, bh); // reread left bit
- bh = right;
- } else if (bh->is_tx() && bh_cancel_write(bh)) {
- ObjectCacher::BufferHead *right = split(bh, cur);
- bh_write(this, bh); // rewrite left bit
- bh = right;
- } else {
- bh = split(bh, cur); // just split it
- }
- prev = bh; // maybe want to expand right buffer ...
+ final = split(bh, cur); // just split it, take right half.
p++;
- assert(p->second == bh);
+ assert(p->second == final);
} else {
// we want middle bit (two splices)
- if (bh->is_rx() && bh_cancel_read(bh)) {
- ObjectCacher::BufferHead *middle = split(bh, cur);
- bh_read(this, bh); // reread left
- p++;
- assert(p->second == middle);
- ObjectCacher::BufferHead *right = split(middle, cur + max);
- bh_read(this(on, right); // reread right
- bh = middle;
- } else if (bh->is_tx() && bh_cancel_write(bh)) {
- ObjectCacher::BufferHead *middle = split(bh, cur);
- bh_write(this, bh); // redo left
- p++;
- assert(p->second == middle);
- ObjectCacher::BufferHead *right = split(middle, cur + max);
- bh_write(this, right); // redo right
- bh = middle;
- } else {
- ObjectCacher::BufferHead *middle = split(bh, cur);
- p++;
- assert(p->second == middle);
- split(middle, cur+max);
- bh = middle;
- }
- }
+ final = split(bh, cur);
+ p++;
+ assert(p->second == final);
+ split(final, cur+max);
+ }
} else if (p->first == cur) {
if (p->second->length() <= max) {
// whole bufferhead, piece of cake.
} else {
// we want left bit (one splice)
- if (bh->is_rx() && bh_cancel_read(bh)) {
- ObjectCacher::BufferHead *right = split(bh, cur + max);
- bh_read(this, right); // re-rx the right bit
- } else if (bh->is_tx() && bh_cancel_write(bh)) {
- ObjectCacher::BufferHead *right = split(bh, cur + max);
- bh_write(this, right); // re-tx the right bit
- } else {
- split(bh, cur + max); // just split
- }
- }
+ split(bh, cur + max); // just split
+ }
+ if (final)
+ merge(final,bh);
+ else
+ final = bh;
}
- // try to cancel tx?
- if (bh->is_tx()) bh_cancel_write(bh);
-
- // put in our map
- //hits[cur] = bh;
-
// keep going.
- size_t lenfromcur = bh->end() - cur;
+ off_t lenfromcur = final->end() - cur;
cur += lenfromcur;
left -= lenfromcur;
p++;
continue;
} else {
// gap!
- size_t next = p->first;
- size_t glen = MIN(next - cur, max);
+ off_t next = p->first;
+ off_t glen = MIN(next - cur, max);
dout(10) << "map_write gap " << cur << "~" << glen << endl;
- ObjectCacher::BufferHead *n = new ObjectCacher::BufferHead();
- n->set_start( cur );
- n->set_length( glen );
- add_bh(n);
- //hits[cur] = n;
+ if (final) {
+ final->set_length( final->length() + glen );
+ } else {
+ final = new BufferHead();
+ final->set_start( cur );
+ final->set_length( glen );
+ add_bh(final);
+ }
cur += glen;
left -= glen;
}
}
}
- return(0);
+
+ // set versoin
+ assert(final);
+ final->set_version(max_version+1);
+ dout(10) << "map_write final is " << *final << endl;
+
+ return 0;
}
/*** ObjectCacher ***/
/* private */
-bool ObjectCacher::bh_cancel_read(BufferHead *bh)
+void ObjectCacher::bh_read(Object *ob, BufferHead *bh)
{
- assert(0);
- return(0);
-}
+ dout(7) << "bh_read on " << *bh << endl;
-bool ObjectCacher::bh_cancel_write(BufferHead *bh)
-{
- assert(0);
- return(0);
+ // finisher
+ C_ReadFinish *fin = new C_ReadFinish(this, ob->get_oid(), bh->start(), bh->length());
+
+ // read req
+ Objecter::OSDRead *rd = new Objecter::OSDRead(&fin->bl);
+
+ // object extent
+ ObjectExtent ex(ob->get_oid(), bh->start(), bh->length());
+ ex.buffer_extents[0] = bh->length();
+ rd->extents.push_back(ex);
+
+ // go
+ objecter->readx(rd, fin);
}
-void ObjectCacher::bh_read(Object *ob, BufferHead *bh)
+void ObjectCacher::bh_read_finish(object_t oid, off_t start, size_t length, bufferlist &bl)
{
- assert(0);
- return(0);
+ dout(7) << "bh_read_finish "
+ << hex << oid << dec
+ << " " << start << "~" << length
+ << endl;
+
+ if (objects.count(oid) == 0) {
+ dout(7) << "bh_read_finish no object cache" << endl;
+ return;
+ }
+ Object *ob = objects[oid];
+
+ // apply to bh's!
+ off_t opos = start;
+ map<off_t, BufferHead*>::iterator p = ob->data.lower_bound(opos);
+
+ while (p != ob->data.end() &&
+ opos < start+length) {
+ BufferHead *bh = p->second;
+
+ if (bh->start() > opos) {
+ dout(1) << "weirdness: gap when applying read results, "
+ << opos << "~" << bh->start() - opos
+ << endl;
+ opos = bh->start();
+ p++;
+ continue;
+ }
+
+ if (!bh->is_rx()) {
+ dout(10) << "bh_read_finish skipping non-rx " << *bh << endl;
+ continue;
+ }
+
+ assert(bh->start() == opos); // we don't merge rx bh's... yet!
+ assert(bh->length() < start+length-opos);
+
+ bh->bl.substr_of(bl,
+ start+length-opos,
+ bh->length());
+ bh->set_version(1);
+ mark_clean(bh);
+ dout(10) << "bh_read_finish read " << *bh << endl;
+ }
}
+
void ObjectCacher::bh_write(Object *ob, BufferHead *bh)
{
assert(0);
- return(0);
}
/* public */
int ObjectCacher::readx(Objecter::OSDRead *rd, inodeno_t ino, Context *onfinish)
{
+ bool success = true;
+ list<BufferHead*> hit_ls;
+ map<size_t, bufferlist> stripe_map; // final buffer offset -> substring
+
for (list<ObjectExtent>::iterator ex_it = rd->extents.begin();
- ex_it != wr->extents.end();
+ ex_it != rd->extents.end();
ex_it++) {
+ dout(10) << "readx ex " << *ex_it << endl;
+
+ // get Object cache
+ Object *o;
if (objects.count(ex_it->oid) == 0) {
- ObjectCache::Object *o = new ObjectCache::Object(ex_it->oid, ino);
+ // create it.
+ Object *o = new Object(this, ex_it->oid, ino);
objects[ex_it->oid] = o;
- objects_by_ino[ino].add(o);
- }
- map<block_t, BufferHead*> hits, missing, rx;
+ objects_by_ino[ino].insert(o);
+ } else {
+ // had it.
+ o = objects[ex_it->oid];
+ }
+
+ // map extent into bufferheads
+ map<off_t, BufferHead*> hits, missing, rx;
+
o->map_read(rd, hits, missing, rx);
- for (map<off_t, BufferHead*>::iterator bh_it = hits.begin();
- bh_it != hits.end();
- bh_it++) {
- rd->bl.substr_of(bh_it->second->bl, bh_it->first, bh_it->second->length());
- }
- for (map<off_t, BufferHead*>::iterator bh_it = missing.begin();
- bh_it != missing.end();
- bh_it++) {
- bh_read(o, bh_it->second);
- }
- for (map<off_t, BufferHead*>::iterator bh_it = rx.begin();
- bh_it != rx.end();
- bh_it++) {
- //FIXME: need to wait here?
- bh_read(o, bh_it->second);
- }
- }
- return(0);
+
+ if (!missing.empty() && !rx.empty()) {
+ // read missing
+ for (map<off_t, BufferHead*>::iterator bh_it = missing.begin();
+ bh_it != missing.end();
+ bh_it++) {
+ bh_read(o, bh_it->second);
+ if (success) {
+ dout(10) << "readx missed, waiting on " << *bh_it->second
+ << " off " << bh_it->first << endl;
+ success = false;
+ bh_it->second->waitfor_read[bh_it->first].push_back( new C_RetryRead(this, rd, ino, onfinish) );
+ }
+ }
+
+ // bump rx
+ for (map<off_t, BufferHead*>::iterator bh_it = rx.begin();
+ bh_it != rx.end();
+ bh_it++) {
+ touch_bh(bh_it->second); // bump in lru, so we don't lose it.
+ if (success) {
+ dout(10) << "readx missed, waiting on " << *bh_it->second
+ << " off " << bh_it->first << endl;
+ success = false;
+ bh_it->second->waitfor_read[bh_it->first].push_back( new C_RetryRead(this, rd, ino, onfinish) );
+ }
+ }
+ } else {
+ // create reverse map of buffer offset -> object for the eventual result.
+ // this is over a single ObjectExtent, so we know that
+ // - the bh's are contiguous
+ // - the buffer frags need not be (and almost certainly aren't)
+ map<off_t, BufferHead*>::iterator bh_it = hits.begin();
+ size_t bhoff = 0;
+ map<size_t,size_t>::iterator f_it = ex_it->buffer_extents.begin();
+ size_t foff = 0;
+ off_t opos = ex_it->start;
+ while (1) {
+ BufferHead *bh = bh_it->second;
+ assert(opos == bh->start() + bhoff);
+ size_t len = MIN(f_it->second - foff,
+ bh->length() - bhoff);
+ stripe_map[f_it->first].substr_of(bh->bl,
+ opos - bh->start(),
+ len);
+ opos += len;
+ bhoff += len;
+ foff += len;
+ if (opos == bh->end()) {
+ bh_it++;
+ bhoff = 0;
+ if (bh_it == hits.end()) break;
+ }
+ if (foff == f_it->second) {
+ f_it++;
+ foff = 0;
+ if (f_it == ex_it->buffer_extents.end()) break;
+ }
+ }
+ assert(f_it == ex_it->buffer_extents.end());
+ assert(bh_it == hits.end());
+ assert(opos == ex_it->start + ex_it->length);
+ }
+ }
+
+ // bump hits in lru
+ for (list<BufferHead*>::iterator bhit = hit_ls.begin();
+ bhit != hit_ls.end();
+ bhit++)
+ touch_bh(*bhit);
+
+ if (!success)
+ return -1;
+
+ // no misses... success! do the read.
+ assert(!hit_ls.empty());
+ dout(10) << "readx has all buffers" << endl;
+
+ // ok, assemble into result buffer.
+ rd->bl->clear();
+ size_t pos = 0;
+ for (map<size_t,bufferlist>::iterator i = stripe_map.begin();
+ i != stripe_map.end();
+ i++) {
+ assert(pos == i->first);
+ pos += i->second.length();
+ rd->bl->claim_append(i->second);
+ }
+
+ return 0;
}
-int ObjectCacher::writex(Objecter::OSDWrite *wr, inodeno_t ino, Context *onack, Context *oncommit)
+
+int ObjectCacher::writex(Objecter::OSDWrite *wr, inodeno_t ino)
{
for (list<ObjectExtent>::iterator ex_it = wr->extents.begin();
- ex_it != wr->extents.end();
+ ex_it != wr->extents.end();
ex_it++) {
+ // create object cache?
+ Object *o = 0;
if (objects.count(ex_it->oid) == 0) {
- ObjectCache::Object *o = new ObjectCache::Object(ex_it->oid, ino);
+ o = new Object(this, ex_it->oid, ino);
objects[ex_it->oid] = o;
- objects_by_ino[ino].add(o);
- }
- o->map_write(wr);
- for (map<off_t, BufferHead*>::iterator bh_it = o->data.begin();
- bh_it != o->data.end();
- bh_it++) {
- bh_it->second->bl.substr_of(wr->bl, ex_it->offset, ex_it->len);
- bh_set_state(bh_it->second, BufferHead::STATE_DIRTY);
- }
- // FIXME: how to set up contexts for eventual writes?
+ objects_by_ino[ino].insert(o);
+ } else {
+ o = objects[ex_it->oid];
+ }
+
+ // map into a single bufferhead.
+ BufferHead *bh = o->map_write(wr);
+
+ // adjust buffer pointers (ie "copy" data into my cache)
+ // this is over a single ObjectExtent, so we know that
+ // - there is one contiguous bh
+ // - the buffer frags need not be (and almost certainly aren't)
+ // note: i assume striping is monotonic... no jumps backwards, ever!
+ off_t opos = ex_it->start;
+ for (map<size_t,size_t>::iterator f_it = ex_it->buffer_extents.begin();
+ f_it != ex_it->buffer_extents.end();
+ f_it++) {
+ size_t bhoff = bh->start() - opos;
+ assert(f_it->second < bh->length() - bhoff);
+
+ bufferlist frag;
+ frag.substr_of(wr->bl,
+ f_it->first, f_it->second);
+ bh->bl.claim_append(frag);
+ opos += f_it->second;
+ }
+
+ mark_dirty(bh);
}
- return(0);
+ return 0;
}
+
+// blocking wait for write.
+void ObjectCacher::wait_for_write(size_t len, Mutex& lock)
+{
+ while (get_stat_dirty() + len > g_conf.client_oc_max_dirty) {
+ dout(10) << "wait_for_write waiting" << endl;
+ stat_waiter++;
+ stat_cond.Wait(lock);
+ stat_waiter--;
+ dout(10) << "wait_for_write woke up" << endl;
+ }
+}
+
// blocking. atomic+sync.
-int ObjectCacher::atomic_sync_readx(Objecter::OSDRead *rd, inodeno_t ino, Context *onfinish)
+int ObjectCacher::atomic_sync_readx(Objecter::OSDRead *rd, inodeno_t ino, Mutex& lock)
{
assert(0);
return 0;
}
-int ObjectCacher::atomic_sync_writex(Objecter::OSDWrite *wr, inodeno_t ino, Context *onack, Context *oncommit)
+int ObjectCacher::atomic_sync_writex(Objecter::OSDWrite *wr, inodeno_t ino, Mutex& lock)
{
assert(0);
return 0;
#include "include/types.h"
#include "include/lru.h"
+#include "include/Context.h"
#include "common/Cond.h"
class Objecter::OSDWrite;
class ObjectCacher {
- private:
+ public:
// ******* BufferHead *********
class BufferHead : public LRUObject {
public:
class Object {
private:
// ObjectCacher::Object fields
+ ObjectCacher *oc;
object_t oid;
inodeno_t ino;
int lock_state;
public:
- Object(object_t o, inodeno_t i) :
+ Object(ObjectCacher *_oc, object_t o, inodeno_t i) :
+ oc(_oc),
oid(o), ino(i),
lock_state(LOCK_NONE)
{}
bool is_empty() { return data.empty(); }
// mid-level
+ int scan_versions(off_t start, off_t len,
+ version_t& low, version_t& high);
BufferHead *split(BufferHead *bh, off_t off);
- int map_read(Objecter::OSDRead *rd);
- int map_write(Objecter::OSDWrite *wr);
+ void merge(BufferHead *left, BufferHead *right);
+
+ int map_read(Objecter::OSDRead *rd,
+ map<off_t, BufferHead*>& hits,
+ map<off_t, BufferHead*>& missing,
+ map<off_t, BufferHead*>& rx);
+ BufferHead *map_write(Objecter::OSDWrite *wr);
+
};
// ******* ObjectCacher *********
off_t get_stat_dirty() { return stat_dirty; }
off_t get_stat_clean() { return stat_clean; }
+ void touch_bh(BufferHead *bh) {
+ if (bh->is_dirty())
+ lru_dirty.lru_touch(bh);
+ else
+ lru_rest.lru_touch(bh);
+ }
+
// bh states
void bh_set_state(BufferHead *bh, int s) {
// move between lru lists?
//bh->set_dirty_stamp(g_clock.now());
};
+
+
// io
- bool bh_cancel_read(BufferHead *bh);
- bool bh_cancel_write(BufferHead *bh);
void bh_read(Object *ob, BufferHead *bh);
void bh_write(Object *ob, BufferHead *bh);
+ public:
+ void bh_read_finish(object_t oid, off_t offset, size_t length, bufferlist &bl);
+ void bh_write_finish(object_t oid, off_t offset, size_t length, version_t v);
+
+ class C_ReadFinish : public Context {
+ ObjectCacher *oc;
+ object_t oid;
+ off_t start;
+ size_t length;
+ public:
+ bufferlist bl;
+ C_ReadFinish(ObjectCacher *c, object_t o, off_t s, size_t l) : oc(c), oid(o), start(s), length(l) {}
+ void finish(int r) {
+ oc->bh_read_finish(oid, start, length, bl);
+ }
+ };
+
+
public:
ObjectCacher(Objecter *o) :
stat_clean(0), stat_dirty(0), stat_rx(0), stat_tx(0), stat_missing(0)
{}
- // blocking. async.
+
+ class C_RetryRead : public Context {
+ ObjectCacher *oc;
+ Objecter::OSDRead *rd;
+ inodeno_t ino;
+ Context *onfinish;
+ public:
+ C_RetryRead(ObjectCacher *_oc, Objecter::OSDRead *r, inodeno_t i, Context *c) : oc(_oc), rd(r), ino(i), onfinish(c) {}
+ void finish(int) {
+ oc->readx(rd, ino, onfinish);
+ }
+ };
+
+ // non-blocking. async.
int readx(Objecter::OSDRead *rd, inodeno_t ino, Context *onfinish);
- int writex(Objecter::OSDWrite *wr, inodeno_t ino, Context *onack, Context *oncommit);
+ int writex(Objecter::OSDWrite *wr, inodeno_t ino);
+
+ // write blocking
+ void wait_for_write(size_t len, Mutex& lock);
// blocking. atomic+sync.
- int atomic_sync_readx(Objecter::OSDRead *rd, inodeno_t ino, Context *onfinish);
- int atomic_sync_writex(Objecter::OSDWrite *wr, inodeno_t ino, Context *onack, Context *oncommit);
+ int atomic_sync_readx(Objecter::OSDRead *rd, inodeno_t ino, Mutex& lock);
+ int atomic_sync_writex(Objecter::OSDWrite *wr, inodeno_t ino, Mutex& lock);
void flush_set(inodeno_t ino, Context *onfinish=0);
void flush_all(Context *onfinish=0);
};
+inline ostream& operator<<(ostream& out, ObjectCacher::BufferHead &bh)
+{
+ out << "bh["
+ << bh.start() << "~" << bh.end()
+ << " v " << bh.get_version();
+ if (bh.is_tx()) out << " tx";
+ if (bh.is_rx()) out << " rx";
+ if (bh.is_dirty()) out << " dirty";
+ if (bh.is_clean()) out << " clean";
+ if (bh.is_missing()) out << " missing";
+ out << "]";
+ return out;
+}
+
#endif
MOSDOp *m = new MOSDOp(last_tid, messenger->get_myaddr(),
it->oid, it->pgid, osdmap->get_epoch(),
OSD_OP_READ);
- m->set_length(it->len);
- m->set_offset(it->offset);
+ m->set_length(it->length);
+ m->set_offset(it->start);
dout(15) << " read tid " << last_tid << " from osd" << osd
- << " oid " << hex << it->oid << dec << " off " << it->offset << " len " << it->len
+ << " oid " << hex << it->oid << dec << " " << it->start << "~" << it->length
<< " (" << it->buffer_extents.size() << " buffer fragments)" << endl;
messenger->send_message(m, MSG_ADDR_OSD(osd), 0);
bufferlist *ox_buf = rd->read_data[eit->oid];
unsigned ox_len = ox_buf->length();
unsigned ox_off = 0;
- assert(ox_len <= eit->len);
+ assert(ox_len <= eit->length);
// for each buffer extent we're mapping into...
for (map<size_t,size_t>::iterator bit = eit->buffer_extents.begin();
bit != eit->buffer_extents.end();
bit++) {
- dout(21) << " object " << hex << eit->oid << dec << " extent " << eit->offset << " len " << eit->len << " : ox offset " << ox_off << " -> buffer extent " << bit->first << " len " << bit->second << endl;
+ dout(21) << " object " << hex << eit->oid << dec << " extent " << eit->start << "~" << eit->length << " : ox offset " << ox_off << " -> buffer extent " << bit->first << "~" << bit->second << endl;
by_off[bit->first] = new bufferlist;
if (ox_off + bit->second <= ox_len) {
}
ox_off += bit->second;
}
- assert(ox_off == eit->len);
+ assert(ox_off == eit->length);
}
// sort and string bits together
MOSDOp *m = new MOSDOp(last_tid, messenger->get_myaddr(),
it->oid, it->pgid, osdmap->get_epoch(),
OSD_OP_WRITE);
- m->set_length(it->len);
- m->set_offset(it->offset);
+ m->set_length(it->length);
+ m->set_offset(it->start);
// map buffer segments into this extent
// (may be fragmented bc of striping)
thisbit.substr_of(wr->bl, bit->first, bit->second);
cur.claim_append(thisbit);
}
- assert(cur.length() == it->len);
+ assert(cur.length() == it->length);
m->set_data(cur);//.claim(cur);
- off += it->len;
+ off += it->length;
// add to gather set
if (onack)
// send
dout(10) << " write tid " << last_tid << " osd" << osd
- << " oid " << hex << it->oid << dec << " off " << it->offset << " len " << it->len << endl;
+ << " oid " << hex << it->oid << dec << " " << it->start << "~" << it->length << endl;
messenger->send_message(m, MSG_ADDR_OSD(osd), 0);
}
MOSDOp *m = new MOSDOp(last_tid, messenger->get_myaddr(),
it->oid, it->pgid, osdmap->get_epoch(),
OSD_OP_ZERO);
- m->set_length(it->len);
- m->set_offset(it->offset);
+ m->set_length(it->length);
+ m->set_offset(it->start);
- off += it->len;
+ off += it->length;
// add to gather set
if (onack)
// send
dout(10) << " zero tid " << last_tid << " osd" << osd
- << " oid " << hex << it->oid << dec << " off " << it->offset << " len " << it->len << endl;
+ << " oid " << hex << it->oid << dec << " " << it->start << "~" << it->length << endl;
messenger->send_message(m, MSG_ADDR_OSD(osd), 0);
}
// new types
-typedef __uint64_t tid_t;
+typedef __uint64_t tid_t; // transaction id
class ObjectExtent {
public:
object_t oid; // object id
- pg_t pgid;
- size_t offset, len; // extent within the object
+ pg_t pgid;
+ off_t start; // in object
+ size_t length; // in object
map<size_t, size_t> buffer_extents; // off -> len. extents in buffer being mapped (may be fragmented bc of striping!)
-
- ObjectExtent(object_t o=0, off_t off=0, size_t l=0) : oid(o), offset(off), len(l) { }
+
+ ObjectExtent(object_t o=0, off_t s=0, size_t l=0) : oid(o), start(s), length(l) { }
};
+inline ostream& operator<<(ostream& out, ObjectExtent &ex)
+{
+ return out << "extent("
+ << hex << ex.oid << " in " << ex.pgid << dec
+ << " " << ex.start << "~" << ex.length;
+}
+
class Objecter {
public:
Messenger *messenger;