public:
Cond() {
- pthread_cond_init(&C,NULL);
+ int r = pthread_cond_init(&C,NULL);
+ assert(r == 0);
}
virtual ~Cond() {
}
int Signal() {
- int r = pthread_cond_signal(&C);
+ //int r = pthread_cond_signal(&C);
+ int r = pthread_cond_broadcast(&C);
return r;
}
};
//pthread_exit(0);
return 0; // like this, i think!
}
- //tpdout(DBLVL) << "Thread "<< pthread_self() << " calling the function\n";
+ tpdout(DBLVL) << "Thread "<< pthread_self() << " calling the function on " << op << endl;
func(u, op);
}
return 0;
void put_op(T* op)
{
+ tpdout(DBLVL) << "put_op " << op << endl;
q_lock.Lock();
q.push(op);
num_ops++;
#undef dout
#define dout(x) if (x <= g_conf.debug) cout << "Timer: "
-#define DBL 20
+#define DBL 10
#include <signal.h>
#include <sys/time.h>
// single global instance
Timer g_timer;
-Context *messenger_kicker = 0;
+//Context *messenger_kicker = 0;
+Messenger *messenger = 0;
utime_t t = it->first;
dout(DBL) << "queuing event(s) scheduled at " << t << endl;
- pending[t] = it->second;
+ if (messenger) {
+ for (set<Context*>::iterator cit = it->second.begin();
+ cit != it->second.end();
+ cit++)
+ messenger->queue_callback(*cit);
+ }
+
+ //pending[t] = it->second;
it++;
scheduled.erase(t);
}
- if (messenger_kicker) {
- dout(DBL) << "kicking messenger" << endl;
- messenger_kicker->finish(0);
- } else {
- dout(DBL) << "no messenger ot kick!" << endl;
- }
-
}
else {
// sleep
if (event) {
dout(DBL) << "sleeping until " << next << endl;
- cond.Wait(lock, next); // wait for waker or time
+ timed_sleep = true;
+ timeout_cond.Wait(lock, next); // wait for waker or time
+ utime_t now = g_clock.now();
+ dout(DBL) << "kicked or timed out at " << now << endl;
} else {
dout(DBL) << "sleeping" << endl;
- cond.Wait(lock); // wait for waker
+
+ //wtf this isn't waking up!
+ timed_sleep = false;
+ sleep_cond.Wait(lock); // wait for waker
+ // setting a 1s limit works tho
+ //utime_t next = g_clock.now();
+ //next.sec_ref() += 10;
+ //cond.Wait(lock, next); // wait for waker
+
+ utime_t now = g_clock.now();
+ dout(DBL) << "kicked at " << now << endl;
}
}
}
* Timer bits
*/
-
+/*
void Timer::set_messenger_kicker(Context *c)
{
dout(10) << "messenger kicker is " << c << endl;
}
cancel_timer();
}
+*/
+
+// Register the active Messenger with the (global) Timer so that expired
+// events can be handed off via messenger->queue_callback() instead of the
+// old messenger_kicker Context mechanism.
+void Timer::set_messenger(Messenger *m)
+{
+ dout(10) << "set messenger " << m << endl;
+ messenger = m;
+}
+// Clear the Timer's Messenger pointer; after this, expired events are no
+// longer forwarded as messenger callbacks (see the messenger check in the
+// timer loop).
+void Timer::unset_messenger()
+{
+ dout(10) << "unset messenger" << endl;
+ messenger = 0;
+}
void Timer::register_timer()
{
if (thread_id) {
dout(DBL) << "register_timer kicking thread" << endl;
- cond.Signal();
+ if (timed_sleep)
+ timeout_cond.Signal();
+ else
+ sleep_cond.Signal();
} else {
dout(DBL) << "register_timer starting thread" << endl;
pthread_create(&thread_id, NULL, timer_thread_entrypoint, (void*)this);
dout(10) << "setting thread_stop flag" << endl;
lock.Lock();
thread_stop = true;
- cond.Signal();
+ if (timed_sleep)
+ timeout_cond.Signal();
+ else
+ sleep_cond.Signal();
lock.Unlock();
dout(10) << "waiting for thread to finish" << endl;
lock.Lock();
scheduled[ when ].insert(callback);
event_times[callback] = when;
- lock.Unlock();
// make sure i wake up
register_timer();
+
+ lock.Unlock();
}
bool Timer::cancel_event(Context *callback)
* this should be called by the Messenger in the proper thread (usually same as incoming messages)
*/
+/*
void Timer::execute_pending()
{
lock.Lock();
lock.Unlock();
}
+
+*/
pthread_t thread_id;
bool thread_stop;
Mutex lock;
- Cond cond;
+ bool timed_sleep;
+ Cond sleep_cond;
+ Cond timeout_cond;
public:
void timer_thread(); // waiter thread (that wakes us up)
void set_messenger_kicker(Context *c);
void unset_messenger_kicker();
+ void set_messenger(Messenger *m);
+ void unset_messenger();
+
+
// schedule events
void add_event_after(float seconds,
Context *callback);
-FileLayout g_OSD_FileLayout( 1<<20, 1, 1<<20 ); // stripe files over whole objects
+FileLayout g_OSD_FileLayout( 1<<20, 1, 1<<20, 3 ); // stripe files over whole objects
//FileLayout g_OSD_FileLayout( 1<<17, 4, 1<<20 ); // 128k stripes over sets of 4
// ??
-FileLayout g_OSD_MDDirLayout( 1<<14, 1<<2, 1<<19 );
+FileLayout g_OSD_MDDirLayout( 1<<14, 1<<2, 1<<19, 3 );
// stripe mds log over 128 byte bits (see mds_log_pad_entry below to match!)
-FileLayout g_OSD_MDLogLayout( 1<<7, 32, 1<<20 ); // new (good?) way
+FileLayout g_OSD_MDLogLayout( 1<<7, 32, 1<<20, 3 ); // new (good?) way
//FileLayout g_OSD_MDLogLayout( 57, 32, 1<<20 ); // pathological case to test striping buffer mapping
//FileLayout g_OSD_MDLogLayout( 1<<20, 1, 1<<20 ); // old way
fake_clock: false,
fakemessenger_serialize: true,
fake_osdmap_expand: 0,
+ fake_osd_sync: true,
debug: 1,
debug_mds_balancer: 1,
osd_writesync: false,
osd_maxthreads: 0, // 0 == no threading!
+ osd_fakestore_syncthreads: 4,
// --- fakeclient (mds regression testing) ---
bool fakemessenger_serialize;
int fake_osdmap_expand;
+ bool fake_osd_sync;
int debug;
int debug_mds_balancer;
bool osd_writesync;
int osd_maxthreads;
+ int osd_fakestore_syncthreads;
+
// fake client
int num_fakeclient;
unsigned fakeclient_requests;
#include <string>
#include <set>
+#include <map>
#include <vector>
#include <iostream>
using namespace std;
return out;
}
-inline ostream& operator<<(ostream& out, set<__uint64_t>& iset) {
- for (set<__uint64_t>::iterator it = iset.begin();
+template<class A>
+inline ostream& operator<<(ostream& out, set<A>& iset) {
+ for (typename set<A>::iterator it = iset.begin();
it != iset.end();
it++) {
if (it != iset.begin()) out << ",";
return out;
}
-inline ostream& operator<<(ostream& out, multiset<int>& iset) {
- for (multiset<int>::iterator it = iset.begin();
+template<class A>
+inline ostream& operator<<(ostream& out, multiset<A>& iset) {
+ for (typename multiset<A>::iterator it = iset.begin();
it != iset.end();
it++) {
if (it != iset.begin()) out << ",";
return out;
}
+// Debug pretty-printer for any std::map: emits "{k1=v1,k2=v2,...}".
+// Generalizes the adjacent set/multiset printers to key/value pairs.
+template<class A,class B>
+inline ostream& operator<<(ostream& out, map<A,B>& m)
+{
+ out << "{";
+ for (typename map<A,B>::const_iterator it = m.begin();
+ it != m.end();
+ it++) {
+ if (it != m.begin()) out << ",";
+ out << it->first << "=" << it->second;
+ }
+ out << "}";
+ return out;
+}
+
// <HACK set up OSDMap from g_conf>
osdmap = new OSDMap();
osdmap->set_pg_bits(g_conf.osd_pg_bits);
+ osdmap->inc_version();
Bucket *b = new UniformBucket(1, 0);
for (int i=0; i<g_conf.num_osd; i++) {
osdmap->crush.rules[i].steps.push_back(RuleStep(CRUSH_RULE_EMIT));
}
- /*OSDGroup osdg;
- osdg.num_osds = g_conf.num_osd;
- for (int i=0; i<osdg.num_osds; i++) osdg.osds.push_back(i);
- osdg.weight = 100;
- osdg.osd_size = 100; // not used yet?
- osdmap->add_group(osdg);
- */
-
// </HACK>
filer = new Filer(messenger, osdmap);
friend class MOSDOpReply;
public:
- long get_tid() { return st.tid; }
- msg_addr_t get_asker() { return st.asker; }
+ const long get_tid() { return st.tid; }
+ const msg_addr_t get_asker() { return st.asker; }
- object_t get_oid() { return st.oid; }
- pg_t get_pg() { return st.pg; }
- version_t get_map_version() { return st.map_version; }
+ const object_t get_oid() { return st.oid; }
+ const pg_t get_pg() { return st.pg; }
+ const version_t get_map_version() { return st.map_version; }
- int get_pg_role() { return st.pg_role; } // who am i asking for?
- version_t get_version() { return st.version; }
- version_t get_old_version() { return st.old_version; }
+ const int get_pg_role() { return st.pg_role; } // who am i asking for?
+ const version_t get_version() { return st.version; }
+ const version_t get_old_version() { return st.old_version; }
- int get_op() { return st.op; }
- size_t get_length() { return st.length; }
- size_t get_offset() { return st.offset; }
+ const int get_op() { return st.op; }
+ const size_t get_length() { return st.length; }
+ const size_t get_offset() { return st.offset; }
void set_data(bufferlist &d) {
data.claim(d);
__uint64_t map_version;
list<pg_t> pg_list;
+ bool complete;
+
public:
__uint64_t get_version() { return map_version; }
list<pg_t>& get_pg_list() { return pg_list; }
+ bool get_complete() { return complete; }
MOSDPGPeer() {}
- MOSDPGPeer(__uint64_t v, list<pg_t>& l) :
+ MOSDPGPeer(__uint64_t v, list<pg_t>& l, bool c=false) :
Message(MSG_OSD_PG_PEER) {
this->map_version = v;
+ this->complete = c;
pg_list.splice(pg_list.begin(), l);
}
void encode_payload() {
payload.append((char*)&map_version, sizeof(map_version));
+ payload.append((char*)&complete, sizeof(complete));
_encode(pg_list, payload);
}
void decode_payload() {
int off = 0;
payload.copy(off, sizeof(map_version), (char*)&map_version);
off += sizeof(map_version);
+ payload.copy(off, sizeof(complete), (char*)&complete);
+ off += sizeof(complete);
_decode(pg_list, payload, off);
}
};
version_t map_version;
pg_t pgid;
//pginfo_t info;
- bool complete;
+ bool complete;
+ version_t last_any_complete;
public:
version_t get_version() { return map_version; }
pg_t get_pgid() { return pgid; }
//pginfo_t& get_pginfo() { return info; }
bool is_complete() { return complete; }
+ version_t get_last_any_complete() { return last_any_complete; }
MOSDPGUpdate() {}
- MOSDPGUpdate(version_t mv, pg_t pgid, bool complete) :
+ MOSDPGUpdate(version_t mv, pg_t pgid, bool complete, version_t last_any_complete) :
Message(MSG_OSD_PG_UPDATE) {
this->map_version = mv;
this->pgid = pgid;
this->complete = complete;
+ this->last_any_complete = last_any_complete;
}
char *get_type_name() { return "PGUp"; }
payload.append((char*)&map_version, sizeof(map_version));
payload.append((char*)&pgid, sizeof(pgid));
payload.append((char*)&complete, sizeof(complete));
+ payload.append((char*)&last_any_complete, sizeof(last_any_complete));
}
void decode_payload() {
int off = 0;
off += sizeof(pgid);
payload.copy(off, sizeof(complete), (char*)&complete);
off += sizeof(complete);
+ payload.copy(off, sizeof(last_any_complete), (char*)&last_any_complete);
+ off += sizeof(last_any_complete);
}
};
hash_map<int, Logger*> loggers;
LogType fakemsg_logtype;
+set<FakeMessenger*> shutdown_set;
+
Mutex lock;
Cond cond;
void finish(int r) {
dout(18) << "timer kick" << endl;
pending_timer = true;
+ lock.Lock();
cond.Signal(); // why not
+ lock.Unlock();
}
};
-
void *fakemessenger_thread(void *ptr)
{
- dout(1) << "thread start, setting timer kicker" << endl;
- g_timer.set_messenger_kicker(new C_FakeKicker());
+ //dout(1) << "thread start, setting timer kicker" << endl;
+ //g_timer.set_messenger_kicker(new C_FakeKicker());
+ msgr_callback_kicker = new C_FakeKicker();
lock.Lock();
while (1) {
}
lock.Unlock();
- cout << "unsetting messenger kicker" << endl;
- g_timer.unset_messenger_kicker();
+ cout << "unsetting messenger" << endl;
+ //g_timer.unset_messenger_kicker();
+ g_timer.unset_messenger();
+ msgr_callback_kicker = 0;
dout(1) << "thread finish (i woke up but no messages, bye)" << endl;
return 0;
dout(18) << "do_loop top" << endl;
- // timer?
+ /*// timer?
if (pending_timer) {
pending_timer = false;
dout(5) << "pending timer" << endl;
g_timer.execute_pending();
}
+ */
+
+ // callbacks
+ lock.Unlock();
+ messenger_do_callbacks();
+ lock.Lock();
// messages
map<int, FakeMessenger*>::iterator it = directory.begin();
}
}
+ // deal with shutdowns.. delayed to avoid concurrent directory modification
+ if (!shutdown_set.empty()) {
+ for (set<FakeMessenger*>::iterator it = shutdown_set.begin();
+ it != shutdown_set.end();
+ it++) {
+ dout(7) << "fakemessenger: removing " << MSG_ADDR_NICE((*it)->get_myaddr()) << " from directory" << endl;
+ assert(directory.count((*it)->get_myaddr()));
+ directory.erase((*it)->get_myaddr());
+ if (directory.empty()) {
+ dout(1) << "fakemessenger: last shutdown" << endl;
+ ::shutdown = true;
+ }
+ }
+ shutdown_set.clear();
+ }
if (!didone)
break;
}
+
dout(18) << "do_loop end (no more messages)." << endl;
//lock.Unlock();
return 0;
cout << "fakemessenger " << whoami << " messenger is " << this << endl;
+ g_timer.set_messenger(this);
+
/*
string name;
name = "m.";
{
//cout << "shutdown on messenger " << this << " has " << num_incoming() << " queued" << endl;
lock.Lock();
-
assert(directory.count(whoami) == 1);
+ shutdown_set.insert(this);
+
+ /*
directory.erase(whoami);
if (directory.empty()) {
dout(1) << "fakemessenger: last shutdown" << endl;
::shutdown = true;
cond.Signal(); // why not
}
+ */
/*
if (loggers[whoami]) {
// queue
FakeMessenger *dm = directory[dest];
- assert(dm);
+ if (!dm) {
+ dout(1) << "** destination " << MSG_ADDR_NICE(dest) << " (" << dest << ") dne" << endl;
+ assert(dm);
+ }
dm->queue_incoming(m);
dout(5) << "--> sending " << m << " to " << MSG_ADDR_NICE(dest) << endl;//" m " << dm << " has " << dm->num_incoming() << " queued" << endl;
}
+// --------
+// callbacks
+
+Mutex msgr_callback_lock;
+list<Context*> msgr_callback_queue;
+Context* msgr_callback_kicker = 0;
+
+// Queue a Context to be executed by the messenger loop (via
+// messenger_do_callbacks), then kick the loop awake through the
+// registered kicker Context.
+// NOTE(review): msgr_callback_kicker is dereferenced without a null
+// check — if no kicker has been installed yet this will crash; confirm
+// callers only run after fakemessenger_thread sets the kicker.
+void Messenger::queue_callback(Context *c) {
+ msgr_callback_lock.Lock();
+ msgr_callback_queue.push_back(c);
+ msgr_callback_lock.Unlock();
+
+ msgr_callback_kicker->finish(0);
+}
+
+// Drain and execute all queued messenger callbacks.
+// The pending list is spliced out under the lock (O(1), leaves the queue
+// empty) and the callbacks are then run and deleted *outside* the lock,
+// so a callback may safely queue further callbacks without deadlocking.
+void messenger_do_callbacks() {
+ // take list
+ msgr_callback_lock.Lock();
+ list<Context*> ls;
+ ls.splice(ls.begin(), msgr_callback_queue);
+ msgr_callback_lock.Unlock();
+
+ // do them
+ for (list<Context*>::iterator it = ls.begin();
+ it != ls.end();
+ it++) {
+ dout(10) << "--- doing callback " << *it << endl;
+ (*it)->finish(0);
+ delete *it;
+ }
+}
+
// ---------
// incoming messages
#include "Dispatcher.h"
#include "common/Mutex.h"
#include "common/Cond.h"
+#include "include/Context.h"
typedef __uint64_t lamport_t;
msg_addr_t _myaddr;
lamport_t lamport_clock;
+ // callbacks
+
// procedure call fun
long _last_pcid;
Mutex _lock; // protect call_sem, call_reply
virtual int shutdown() = 0;
+ void queue_callback(Context *c);
+
// setup
void set_dispatcher(Dispatcher *d) { dispatcher = d; ready(); }
Dispatcher *get_dispatcher() { return dispatcher; }
virtual Message* sendrecv(Message *m, msg_addr_t dest, int port=0);
};
+// callbacks
+void messenger_do_callbacks();
+extern Context *msgr_callback_kicker;
+
extern Message *decode_message(msg_envelope_t &env, bufferlist& bl);
while (1) {
+ // callbacks?
+ Messenger::do_callbacks();
+
// timer events?
if (pending_timer) {
pending_timer = false;
return r;
}
+ {
+ char name[80];
+ sprintf(name,"osd%d.fakestore.threadpool", whoami);
+ fsync_threadpool = new ThreadPool<FakeStore, pair<int,Context*> >(name, g_conf.osd_fakestore_syncthreads,
+ (void (*)(FakeStore*, pair<int,Context*>*))dofsync,
+ this);
+ }
+
// all okay.
return 0;
}
// close collections db files
close_collections();
+ delete fsync_threadpool;
+
// nothing
return 0;
}
+///////////
+
+// Threadpool worker: fsync the given fd, close it (this function owns the
+// fd handed over by the async write path), then fire and delete the
+// completion Context.
+// NOTE(review): the return values of ::fsync and ::close are ignored, so a
+// failed sync still reports success (c->finish(0)) — confirm intended for
+// this fake/testing store.
+void FakeStore::do_fsync(int fd, Context *c)
+{
+ ::fsync(fd);
+ ::close(fd);
+ dout(10) << "do_fsync finished on " << fd << " context " << c << endl;
+ c->finish(0);
+ delete c;
+}
+
////
int FakeStore::read(object_t oid,
size_t len, off_t offset,
- char *buffer) {
+ bufferlist& bl) {
dout(20) << "read " << oid << " len " << len << " off " << offset << endl;
string fn;
off_t actual = lseek(fd, offset, SEEK_SET);
size_t got = 0;
if (actual == offset) {
- got = ::read(fd, buffer, len);
+ bufferptr bptr = new buffer(len); // prealloc space for entire read
+ got = ::read(fd, bptr.c_str(), len);
+ bptr.set_length(got); // properly size the buffer
+ bl.push_back( bptr ); // put it in the target bufferlist
}
::flock(fd, LOCK_UN);
::close(fd);
int FakeStore::write(object_t oid,
size_t len, off_t offset,
- char *buffer,
+ bufferlist& bl,
bool do_fsync) {
dout(20) << "write " << oid << " len " << len << " off " << offset << endl;
::flock(fd, LOCK_EX); // lock for safety
//::fchmod(fd, 0664);
+ // seek
off_t actual = lseek(fd, offset, SEEK_SET);
int did = 0;
assert(actual == offset);
- did = ::write(fd, buffer, len);
+
+ // write buffers
+ for (list<bufferptr>::iterator it = bl.buffers().begin();
+ it != bl.buffers().end();
+ it++) {
+ int r = ::write(fd, (*it).c_str(), (*it).length());
+ if (r > 0)
+ did += r;
+ else {
+ dout(1) << "couldn't write to " << fn.c_str() << " len " << len << " off " << offset << " errno " << errno << " " << strerror(errno) << endl;
+ }
+ }
+
if (did < 0) {
dout(1) << "couldn't write to " << fn.c_str() << " len " << len << " off " << offset << " errno " << errno << " " << strerror(errno) << endl;
}
return did;
}
+// Asynchronous-safety variant of write(): writes the bufferlist at the
+// given offset, then queues the fd on the fsync threadpool; `onsafe` is
+// finished (and deleted) by do_fsync once the data is synced to disk.
+// Returns bytes written, or the negative fd/open error on failure.
+// Ownership: the open fd is intentionally NOT closed here — do_fsync
+// closes it after the fsync completes.
+int FakeStore::write(object_t oid,
+ size_t len, off_t offset,
+ bufferlist& bl,
+ Context *onsafe)
+{
+ dout(20) << "write " << oid << " len " << len << " off " << offset << endl;
+
+ string fn;
+ get_oname(oid,fn);
+
+ ::mknod(fn.c_str(), 0644, 0); // in case it doesn't exist yet.
+
+ int flags = O_WRONLY;//|O_CREAT;
+ int fd = ::open(fn.c_str(), flags);
+ if (fd < 0) {
+ dout(1) << "write couldn't open " << fn.c_str() << " flags " << flags << " errno " << errno << " " << strerror(errno) << endl;
+ return fd;
+ }
+ ::flock(fd, LOCK_EX); // lock for safety
+ //::fchmod(fd, 0664);
+
+ // seek
+ off_t actual = lseek(fd, offset, SEEK_SET);
+ int did = 0;
+ assert(actual == offset);
+
+ // write buffers
+ // each bufferptr in the list is written in order; short/failed writes are
+ // logged but do not abort the loop.
+ for (list<bufferptr>::iterator it = bl.buffers().begin();
+ it != bl.buffers().end();
+ it++) {
+ int r = ::write(fd, (*it).c_str(), (*it).length());
+ if (r > 0)
+ did += r;
+ else {
+ dout(1) << "couldn't write to " << fn.c_str() << " len " << len << " off " << offset << " errno " << errno << " " << strerror(errno) << endl;
+ }
+ }
+
+ // NOTE(review): `did` only ever accumulates positive byte counts above,
+ // so this error branch is unreachable as written.
+ if (did < 0) {
+ dout(1) << "couldn't write to " << fn.c_str() << " len " << len << " off " << offset << " errno " << errno << " " << strerror(errno) << endl;
+ }
+
+ ::flock(fd, LOCK_UN);
+
+ // schedule sync
+ // hands fd + onsafe to the fsync threadpool; do_fsync closes the fd.
+ queue_fsync(fd, onsafe);
+
+ return did;
+}
#include "ObjectStore.h"
#include "BDBMap.h"
+#include "common/ThreadPool.h"
#include <map>
using namespace std;
+
class FakeStore : public ObjectStore {
string basedir;
int whoami;
void wipe_dir(string mydir);
+ // async fsync
+ class ThreadPool<class FakeStore, pair<int, class Context*> > *fsync_threadpool;
+ // Enqueue (fd, completion Context) on the fsync threadpool; the worker
+ // (dofsync -> do_fsync) syncs and closes the fd and fires the Context.
+ void queue_fsync(int fd, class Context *c) {
+ fsync_threadpool->put_op(new pair<int, class Context*>(fd,c));
+ }
+ public:
+ void do_fsync(int fd, class Context *c);
+ // Static threadpool trampoline: unpacks the queued (fd, Context) pair,
+ // forwards to the instance's do_fsync, and frees the pair allocated by
+ // queue_fsync.
+ static void dofsync(class FakeStore *f, pair<int, class Context*> *af) {
+ f->do_fsync(af->first, af->second);
+ delete af;
+ }
+
+
public:
FakeStore(char *base, int whoami);
int truncate(object_t oid, off_t size);
int read(object_t oid,
size_t len, off_t offset,
- char *buffer);
+ bufferlist& bl);
int write(object_t oid,
size_t len, off_t offset,
- char *buffer,
+ bufferlist& bl,
bool fsync);
+ int write(object_t oid,
+ size_t len, off_t offset,
+ bufferlist& bl,
+ Context *onsafe);
int setattr(object_t oid, const char *name,
void *value, size_t size);
#include "messages/MOSDPGPeerAck.h"
#include "messages/MOSDPGUpdate.h"
-//#include "messages/MOSDPGQuery.h"
-//#include "messages/MOSDPGQueryReply.h"
-
#include "common/Logger.h"
#include "common/LogType.h"
-
+#include "common/Timer.h"
#include "common/ThreadPool.h"
#include <iostream>
#define ROLE_TYPE(x) ((x)>0 ? 1:(x))
+
// cons/des
LogType osd_logtype;
handle_pg_update((MOSDPGUpdate*)m);
break;
- /*
- case MSG_OSD_PG_QUERYREPLY:
- handle_pg_query_reply((MOSDPGQueryReply*)m);
- break;
- */
-
case MSG_OSD_OP:
monitor->host_is_alive(m->get_source());
handle_op((MOSDOp*)m);
void OSD::handle_op_reply(MOSDOpReply *m)
{
+ // did i get a new osdmap?
+ if (m->get_map_version() > osdmap->get_version()) {
+ dout(3) << "replica op reply includes a new osd map" << endl;
+ update_map(m->get_osdmap());
+ }
+
+ // handle op
switch (m->get_op()) {
case OSD_OP_REP_PULL:
op_rep_pull_reply(m);
case OSD_OP_REP_WRITE:
case OSD_OP_REP_TRUNCATE:
case OSD_OP_REP_DELETE:
- ack_replica_op(m->get_tid(), m->get_result(), MSG_ADDR_NUM(m->get_source()));
+ ack_replica_op(m->get_tid(), m->get_result(), m->get_safe(), MSG_ADDR_NUM(m->get_source()));
delete m;
break;
}
}
-void OSD::ack_replica_op(__uint64_t tid, int result, int fromosd)
+void OSD::ack_replica_op(__uint64_t tid, int result, bool safe, int fromosd)
{
- replica_write_lock.Lock();
+ //replica_write_lock.Lock();
+
+ if (!replica_ops.count(tid)) {
+ dout(7) << "not waiting for tid " << tid << " replica op reply, map must have changed, dropping." << endl;
+ return;
+ }
- if (replica_writes.count(tid)) {
- MOSDOp *op = replica_writes[tid];
- dout(7) << "rep_write_reply ack tid " << tid << " orig op " << op << endl;
+
+ OSDReplicaOp *repop = replica_ops[tid];
+ MOSDOp *op = repop->op;
+ pg_t pgid = op->get_pg();
+
+ dout(7) << "ack_replica_op " << tid << " op " << op << " result " << result << " safe " << safe << " from osd" << fromosd << endl;
+ dout(15) << " repop was: op " << repop->op << " waitfor ack=" << repop->waitfor_ack << " sync=" << repop->waitfor_sync << " localsync=" << repop->local_sync << " cancel=" << repop->cancel << " osd=" << repop->osds << endl;
+
+
+ if (result >= 0) {
+ // success
- replica_writes.erase(tid);
- replica_write_tids[op].erase(tid);
-
- pg_t pgid = op->get_pg();
+ if (safe) {
+ // sync
+ repop->waitfor_sync.erase(tid);
+ repop->waitfor_ack.erase(tid);
+ replica_ops.erase(tid);
+
+ replica_pg_osd_tids[pgid][fromosd].erase(tid);
+ if (replica_pg_osd_tids[pgid][fromosd].empty()) replica_pg_osd_tids[pgid].erase(fromosd);
+ if (replica_pg_osd_tids[pgid].empty()) replica_pg_osd_tids.erase(pgid);
+
+ // send 'safe' to client?
+ if (repop->can_send_sync()) {
+ MOSDOpReply *reply = new MOSDOpReply(op, 0, osdmap, true);
+ messenger->send_message(reply, op->get_asker());
+ delete op;
+ delete repop;
+ }
+ } else {
+ // ack
+ repop->waitfor_ack.erase(tid);
+
+ // send 'ack' to client?
+ if (repop->can_send_ack()) {
+ MOSDOpReply *reply = new MOSDOpReply(op, 0, osdmap, false);
+ messenger->send_message(reply, op->get_asker());
+ }
+ }
+
+ } else {
+ // failure
+
+ // forget about this failed attempt..
+ repop->osds.erase(fromosd);
+ repop->waitfor_ack.erase(tid);
+ repop->waitfor_sync.erase(tid);
+
+ replica_ops.erase(tid);
replica_pg_osd_tids[pgid][fromosd].erase(tid);
if (replica_pg_osd_tids[pgid][fromosd].empty()) replica_pg_osd_tids[pgid].erase(fromosd);
if (replica_pg_osd_tids[pgid].empty()) replica_pg_osd_tids.erase(pgid);
-
- if (replica_write_tids[op].empty()) {
- // reply?
- if (replica_write_local.count(op)) {
- replica_write_local.erase(op);
-
- if (result >= 0) {
- dout(7) << "last one, replying to write op" << endl;
-
- // written locally too, reply
- MOSDOpReply *reply = new MOSDOpReply(op, 0, osdmap, true);
- messenger->send_message(reply, op->get_asker());
- delete op;
- } else {
- dout(7) << "last one, but replica write failed, resubmit" << endl;
- finished.push_back(op); // handle_op will fw this to new primary, probably!
- }
- replica_write_result.erase(op);
- } else {
- // not yet written locally.
- dout(9) << "not yet written locally, still waiting for that" << endl;
- replica_write_result[op] = -1;
+
+ bool did = false;
+ PG *pg = get_pg(pgid);
+
+ // am i no longer the primary?
+ if (pg->get_primary() != whoami) {
+ // oh, it wasn't a replica.. primary must have changed
+ dout(4) << "i'm no longer the primary for " << *pg << endl;
+
+ // retry the whole thing
+ finished.push_back(repop->op);
+
+ // clean up
+ for (map<__uint64_t,int>::iterator it = repop->waitfor_ack.begin();
+ it != repop->waitfor_ack.end();
+ it++) {
+ replica_ops.erase(it->first);
+ replica_pg_osd_tids[pgid][it->second].erase(it->first);
+ if (replica_pg_osd_tids[pgid][it->second].empty()) replica_pg_osd_tids[pgid].erase(it->second);
+ }
+ for (map<__uint64_t,int>::iterator it = repop->waitfor_sync.begin();
+ it != repop->waitfor_sync.end();
+ it++) {
+ replica_ops.erase(it->first);
+ replica_pg_osd_tids[pgid][it->second].erase(it->first);
+ if (replica_pg_osd_tids[pgid][it->second].empty()) replica_pg_osd_tids[pgid].erase(it->second);
}
- replica_write_tids.erase(op);
+ if (replica_pg_osd_tids[pgid].empty()) replica_pg_osd_tids.erase(pgid);
+
+ if (repop->local_sync)
+ delete repop;
+ else {
+ repop->op = 0; // we're forwarding it
+ repop->cancel = true; // will get deleted by local sync callback
+ }
+ did = true;
}
-
- } else {
- dout(7) << "not waiting for tid " << tid << " rep_write reply, map must have changed, dropping." << endl;
+
+ /* no! don't do this, not without checking complete/clean-ness
+ else {
+ // i am still primary.
+ // re-issue replica op to a moved replica?
+ for (unsigned i=1; i<pg->acting.size(); i++) {
+ if (repop->osds.count(pg->acting[i])) continue;
+ issue_replica_op(pg, repop, pg->acting[i]);
+ did = true;
+ }
+ }
+ */
+
+ if (!did) {
+ // an osd musta just gone down or somethin. are we "done" now?
+
+ // send 'safe' to client?
+ if (repop->can_send_sync()) {
+ MOSDOpReply *reply = new MOSDOpReply(op, 0, osdmap, true);
+ messenger->send_message(reply, op->get_asker());
+ delete op;
+ delete repop;
+ }
+
+ // send 'ack' to client?
+ else if (repop->can_send_ack()) {
+ MOSDOpReply *reply = new MOSDOpReply(op, 0, osdmap, false);
+ messenger->send_message(reply, op->get_asker());
+ }
+ }
+
}
-
- replica_write_lock.Unlock();
+
+
}
/** update_map
* assimilate a new OSDMap. scan pgs.
*/
-/*
-void OSD::update_map(bufferlist& state)
+void OSD::update_map(bufferlist& state, bool mkfs)
{
// decode new map
- if (!osdmap) osdmap = new OSDMap();
+ osdmap = new OSDMap();
osdmap->decode(state);
- dout(7) << "update_map version " << osdmap->get_version() << endl;
-
osdmaps[osdmap->get_version()] = osdmap;
+ dout(7) << "got osd map version " << osdmap->get_version() << endl;
+
+ // pg list
+ list<pg_t> pg_list;
+
+ if (mkfs) {
+ // create PGs
+ for (int nrep = 2; nrep <= g_conf.osd_max_rep; nrep++) {
+ ps_t maxps = 1LL << osdmap->get_pg_bits();
+ for (pg_t ps = 0; ps < maxps; ps++) {
+ pg_t pgid = osdmap->ps_nrep_to_pg(ps, nrep);
+ vector<int> acting;
+ osdmap->pg_to_acting_osds(pgid, acting);
+
+
+ if (acting[0] == whoami) {
+ PG *pg = create_pg(pgid);
+ pg->acting = acting;
+ pg->set_role(0);
+ pg->set_primary_since(osdmap->get_version());
+ pg->mark_complete( osdmap->get_version() );
+
+ dout(7) << "created " << *pg << endl;
+
+ pg_list.push_back(pgid);
+ }
+ }
+ }
+ } else {
+ // get pg list
+ get_pg_list(pg_list);
+ }
+
+ // use our new map(s)
+ advance_map(pg_list);
+ activate_map(pg_list);
+
+ if (mkfs) {
+ // mark all peers complete
+ for (list<pg_t>::iterator pgid = pg_list.begin();
+ pgid != pg_list.end();
+ pgid++) {
+ PG *pg = get_pg(*pgid);
+ for (map<int,PGPeer*>::iterator it = pg->peers.begin();
+ it != pg->peers.end();
+ it++) {
+ PGPeer *p = it->second;
+ //dout(7) << " " << *pg << " telling peer osd" << p->get_peer() << " they are complete" << endl;
+ messenger->send_message(new MOSDPGUpdate(osdmap->get_version(), pg->get_pgid(), true, osdmap->get_version()),
+ MSG_ADDR_OSD(p->get_peer()));
+ }
+ }
+ }
- // FIXME mutliple maps?
-
- // scan PGs
- list<pg_t> ls;
- get_pg_list(ls);
-
- advance_map(ls);
- activate_map(ls);
+ // process waiters
+ take_waiters(waiting_for_osdmap);
}
-*/
void OSD::handle_osd_map(MOSDMap *m)
{
dout(3) << "handle_osd_map got osd map version " << m->get_version() << endl;
}
- // decode new map
- osdmap = new OSDMap();
- osdmap->decode(m->get_osdmap());
- osdmaps[osdmap->get_version()] = osdmap;
- dout(7) << "got osd map version " << osdmap->get_version() << endl;
-
- // pg list
- list<pg_t> pg_list;
-
- if (m->is_mkfs()) {
- // create PGs
- for (int nrep = 2; nrep <= g_conf.osd_max_rep; nrep++) {
- ps_t maxps = 1LL << osdmap->get_pg_bits();
- for (pg_t ps = 0; ps < maxps; ps++) {
- pg_t pgid = osdmap->ps_nrep_to_pg(ps, nrep);
- vector<int> acting;
- osdmap->pg_to_acting_osds(pgid, acting);
-
-
- if (acting[0] == whoami) {
- PG *pg = create_pg(pgid);
- pg->acting = acting;
- pg->set_role(0);
- pg->set_primary_since(osdmap->get_version());
- pg->state_set(PG_STATE_COMPLETE);
-
- dout(7) << "created " << *pg << endl;
-
- pg_list.push_back(pgid);
- }
- }
- }
- } else {
- // get pg list
- get_pg_list(pg_list);
- }
-
- // use our new map(s)
- advance_map(pg_list);
- activate_map(pg_list);
-
- // process waiters
- take_waiters(waiting_for_osdmap);
+ update_map(m->get_osdmap(), m->is_mkfs());
} else {
dout(3) << "handle_osd_map ignoring osd map version " << m->get_version() << " <= " << osdmap->get_version() << endl;
PG *pg = new PG(whoami, pgid);
//pg->info.created = osdmap->get_version();
- pg->last_complete = osdmap->get_version();
pg->store(store);
pg_map[pgid] = pg;
if (pg->get_role() == 0) {
// drop peers
take_waiters(pg->waiting_for_peered);
+ for (hash_map<object_t, list<Message*> >::iterator it = pg->waiting_for_missing_object.begin();
+ it != pg->waiting_for_missing_object.end();
+ it++)
+ take_waiters(it->second);
+ pg->waiting_for_missing_object.clear();
+
+ for (hash_map<object_t, list<Message*> >::iterator it = pg->waiting_for_clean_object.begin();
+ it != pg->waiting_for_clean_object.end();
+ it++)
+ take_waiters(it->second);
+ pg->waiting_for_clean_object.clear();
+
pg->drop_peers();
pg->state_clear(PG_STATE_CLEAN);
pg->discard_recovery_plan();
for (set<__uint64_t>::iterator tid = s.begin();
tid != s.end();
tid++)
- ack_replica_op(*tid, -1, *down);
+ ack_replica_op(*tid, -1, false, *down);
}
}
{
dout(7) << "activate_map version " << osdmap->get_version() << endl;
- map< int, map<pg_t, version_t> > notify_list; // primary -> pgid -> last_complete
+ map< int, map<pg_t, version_t> > notify_list; // primary -> pgid -> last_any_complete
map< int, map<PG*,int> > start_map; // peer -> PG -> peer_role
// scan pg's
}
else if (pg->is_stray()) {
// i am residual|replica
- notify_list[pg->get_primary()][pgid] = pg->get_last_complete();
+ notify_list[pg->get_primary()][pgid] = pg->get_last_any_complete();
}
}
void OSD::start_peers(PG *pg, map< int, map<PG*,int> >& start_map)
{
- dout(10) << " " << *pg << " was last_complete " << pg->get_last_complete() << endl;
+ dout(10) << " " << *pg << " last_any_complete " << pg->get_last_any_complete() << endl;
// determine initial peer set
map<int,int> peerset; // peer -> role
// prior map(s), if OSDs are still up
- for (version_t epoch = pg->get_last_complete();
+ for (version_t epoch = pg->get_last_any_complete();
epoch < osdmap->get_version();
epoch++) {
OSDMap *omap = get_osd_map(epoch);
!pg->is_peered()) {
dout(10) << " " << *pg << " already has necessary peers, analyzing" << endl;
pg->mark_peered();
- pg->plan_recovery(store, osdmap->get_version());
+ take_waiters(pg->waiting_for_peered);
+
+ plan_recovery(pg);
do_recovery(pg);
}
}
// down?
if (osdmap->is_down(from)) {
- dout(7) << " from down OSD osd" << from << ", pinging" << endl;
+ dout(7) << " from down OSD osd" << from << ", dropping" << endl;
// FIXME
return false;
}
assert(pg->acting[0] == whoami);
pg->set_role(0);
pg->set_primary_since( osdmap->get_version() ); // FIXME: this may miss a few epochs!
- pg->set_last_complete( 0 );
+ pg->mark_any_complete( it->second );
dout(10) << " " << *pg << " is new, nrep=" << nrep << endl;
assert(0);
}
+ if (it->second > pg->get_last_any_complete())
+ pg->mark_any_complete( it->second );
+
// peered with this guy specifically?
PGPeer *pgp = pg->get_peer(from);
if (!pgp &&
pg->acting = acting;
pg->set_role(role);
+ //if (m->get_version() == 1) pg->mark_complete(); // hack... need a more elegant solution
+
dout(10) << " " << *pg << " dne (before), but i am role " << role << endl;
// take any waiters
// report back state and pg content
ack->pg_state[pgid].state = pg->get_state();
ack->pg_state[pgid].last_complete = pg->get_last_complete();
+ ack->pg_state[pgid].last_any_complete = pg->get_last_any_complete();
pg->scan_local_objects(ack->pg_state[pgid].objects, store); // list my objects
// i am now peered
assert(pg);
dout(10) << " " << *pg << " osd" << from << " remote state " << it->second.state
- << " w/ " << it->second.objects.size() << " objects" << endl;
+ << " w/ " << it->second.objects.size() << " objects"
+ << ", last_complete " << it->second.last_complete
+ << ", last_any_complete " << it->second.last_any_complete
+ << endl;
PGPeer *pgp = pg->get_peer(from);
assert(pgp);
+ pg->mark_any_complete( it->second.last_any_complete );
+
pgp->last_complete = it->second.last_complete;
pgp->objects = it->second.objects;
pgp->state_set(PG_PEER_STATE_ACTIVE);
take_waiters(pg->waiting_for_peered);
dout(10) << " " << *pg << " fully peered, analyzing" << endl;
- pg->plan_recovery(store, osdmap->get_version());
+ plan_recovery(pg);
do_recovery(pg);
} else {
// we're already peered.
void OSD::handle_pg_update(MOSDPGUpdate *m)
{
int from = MSG_ADDR_NUM(m->get_source());
- dout(7) << "handle_pg_update on " << hex << m->get_pgid() << dec << " from osd" << from << endl;
+ dout(7) << "handle_pg_update on " << hex << m->get_pgid() << dec << " from osd" << from
+ << " complete=" << m->is_complete()
+ << " last_any_complete=" << m->get_last_any_complete()
+ << endl;
PG *pg = get_pg(m->get_pgid());
if (!require_current_pg_primary(m, m->get_version(), pg)) return;
//pg->assim_info( m->get_pginfo() );
// complete?
- if (m->is_complete())
- pg->mark_complete();
+ if (m->is_complete()) {
+ pg->mark_complete( osdmap->get_version() );
+ }
+
+ if (m->get_last_any_complete())
+ pg->mark_any_complete( m->get_last_any_complete() );
pg->store(store);
}
// RECOVERY
+// Compute this PG's recovery plan against the current osdmap version,
+// then tell every peer the plan found to be complete (empty missing +
+// stray lists; see PG::plan_recovery) that it is complete, via an
+// MOSDPGUpdate with complete=true and last_any_complete=current version.
+void OSD::plan_recovery(PG *pg)
+{
+ version_t current_version = osdmap->get_version();
+
+ // filled by PG::plan_recovery with peers needing no pushes/pulls
+ list<PGPeer*> complete_peers;
+ pg->plan_recovery(store, current_version, complete_peers);
+
+ for (list<PGPeer*>::iterator it = complete_peers.begin();
+ it != complete_peers.end();
+ it++) {
+ PGPeer *p = *it;
+ dout(7) << " " << *pg << " telling peer osd" << p->get_peer() << " they are complete" << endl;
+ messenger->send_message(new MOSDPGUpdate(osdmap->get_version(), pg->get_pgid(), true, osdmap->get_version()),
+ MSG_ADDR_OSD(p->get_peer()));
+ }
+}
+
void OSD::do_recovery(PG *pg)
{
// recover
- if (!pg->is_complete()) {
+ if (!pg->is_complete(osdmap->get_version())) {
pg_pull(pg, max_recovery_ops);
}
// replicate
- if (pg->is_complete()) {
+ if (pg->is_complete( osdmap->get_version() )) {
if (!pg->objects_unrep.empty())
pg_push(pg, max_recovery_ops);
if (!pg->objects_stray.empty())
dout(7) << "pull_replica " << hex << oid << dec << " v " << v << " from osd" << p->get_peer() << endl;
// add to fetching list
- p->pull(oid, v);
- pg->objects_pulling[oid] = p;
+ pg->pulling(oid, v, p);
// send op
__uint64_t tid = ++last_tid;
assert(v == op->get_version());
// read
- bufferptr bptr = new buffer(st.st_size); // prealloc space for entire read
+ bufferlist bl;
long got = store->read(op->get_oid(),
st.st_size, 0,
- bptr.c_str());
+ bl);
assert(got == st.st_size);
// reply
MOSDOpReply *reply = new MOSDOpReply(op, 0, osdmap, true);
- bptr.set_length(got); // properly size the buffer
- bufferlist bl;
- bl.push_back( bptr );
reply->set_result(0);
reply->set_data(bl);
reply->set_length(got);
osd_lock.Unlock();
// write it and add it to the PG
- store->write(o, op->get_length(), 0, op->get_data().c_str());
+ store->write(o, op->get_length(), 0, op->get_data());
p->pg->add_object(store, o);
store->setattr(o, "version", &v, sizeof(v));
pull_ops.erase(op->get_tid());
pg->pulled(o, v, p);
-
+
+ // now complete?
+ if (pg->objects_missing.empty()) {
+ pg->mark_complete(osdmap->get_version());
+
+ // distribute new last_any_complete
+ dout(7) << " " << *pg << " now complete, updating last_any_complete on peers" << endl;
+ for (map<int,PGPeer*>::iterator it = pg->peers.begin();
+ it != pg->peers.end();
+ it++) {
+ PGPeer *p = it->second;
+ messenger->send_message(new MOSDPGUpdate(osdmap->get_version(), pg->get_pgid(), false, osdmap->get_version()),
+ MSG_ADDR_OSD(p->get_peer()));
+ }
+ }
+
// finish waiters
if (pg->waiting_for_missing_object.count(o))
take_waiters(pg->waiting_for_missing_object[o]);
push_replica(pg, oid);
ops++;
}
-
}
void OSD::push_replica(PG *pg, object_t oid)
set<int>& peers = pg->objects_unrep[oid];
- dout(7) << "push_replica " << hex << oid << dec << " v " << v << " to osds " << peers << endl;
-
// load object content
struct stat st;
store->stat(oid, &st);
- bufferptr b = new buffer(st.st_size);
- store->read(oid, st.st_size, 0, b.c_str());
+ bufferlist bl;
+ store->read(oid, st.st_size, 0, bl);
+ assert(bl.length() == st.st_size);
+
+ dout(7) << "push_replica " << hex << oid << dec << " v " << v << " to osds " << peers << " size " << st.st_size << endl;
for (set<int>::iterator pit = peers.begin();
pit != peers.end();
assert(p);
// add to list
- p->push(oid, v);
- pg->objects_pushing[oid].insert(p);
+ pg->pushing(oid, v, p);
// send op
__uint64_t tid = ++last_tid;
op->set_pg_role(-1); // whatever, not 0
// include object content
- op->get_data().append(b);
+ //op->set_data(bl); // no no bad, will modify bl
+ op->get_data() = bl; // _copy_ bufferlist, we may have multiple destinations!
op->set_length(st.st_size);
op->set_offset(0);
dout(7) << "rep_push on " << hex << op->get_oid() << dec << " v " << op->get_version() << endl;
lock_object(op->get_oid());
+ PG *pg = get_pg(op->get_pg());
+ assert(pg);
+
// exists?
if (store->exists(op->get_oid())) {
store->truncate(op->get_oid(), 0);
}
// write out buffers
- bufferlist bl;
- bl.claim( op->get_data() );
-
- off_t off = 0;
- for (list<bufferptr>::iterator it = bl.buffers().begin();
- it != bl.buffers().end();
- it++) {
- int r = store->write(op->get_oid(),
- (*it).length(), off,
- (*it).c_str(),
- false); // write async, no rush
- assert((unsigned)r == (*it).length());
- off += (*it).length();
- }
+ int r = store->write(op->get_oid(),
+ op->get_length(), 0,
+ op->get_data(),
+ false); // FIXME
+ pg->add_object(store, op->get_oid());
+ assert(r >= 0);
// set version
version_t v = op->get_version();
if (p->is_complete()) {
dout(7) << " telling replica they are complete" << endl;
- messenger->send_message(new MOSDPGUpdate(osdmap->get_version(), pg->get_pgid(), true),
+ messenger->send_message(new MOSDPGUpdate(osdmap->get_version(), pg->get_pgid(), true, osdmap->get_version()),
MSG_ADDR_OSD(p->get_peer()));
}
assert(p);
const version_t v = it->second;
- p->remove(oid, v);
- pg->objects_removing[oid][p] = v;
-
+ // add to list
+ pg->removing(oid, v, p);
+
// send op
__uint64_t tid = ++last_tid;
MOSDOp *op = new MOSDOp(tid, messenger->get_myaddr(),
if (p->is_complete()) {
dout(7) << " telling replica they are complete" << endl;
- messenger->send_message(new MOSDPGUpdate(osdmap->get_version(), pg->get_pgid(), true),
+ messenger->send_message(new MOSDPGUpdate(osdmap->get_version(), pg->get_pgid(), true, osdmap->get_version()),
MSG_ADDR_OSD(p->get_peer()));
}
}
+// Completion callback for a replica-side modify: when the local write
+// has synced to disk (see apply_write's onsync path), finish() calls
+// back into OSD::op_rep_modify_sync(op) to send the final safe ack.
+class C_OSD_RepModifySync : public Context {
+public:
+ OSD *osd;
+ MOSDOp *op; // the replicated op being acknowledged; owned until sync
+ C_OSD_RepModifySync(OSD *o, MOSDOp *oo) : osd(o), op(oo) { }
+ void finish(int r) {
+ osd->op_rep_modify_sync(op);
+ }
+};
+
+// Invoked (via C_OSD_RepModifySync) once a replicated modify is safe on
+// disk: sends the final MOSDOpReply (commit=true, matching the earlier
+// pre-ack sent from op_rep_modify with commit=false) and deletes the op.
+// osd_lock is held around the send; the messenger send is the reply path
+// back to the primary (op->get_asker()).
+void OSD::op_rep_modify_sync(MOSDOp *op)
+{
+ osd_lock.Lock();
+ dout(2) << "rep_modify_sync on op " << op << endl;
+ MOSDOpReply *ack2 = new MOSDOpReply(op, 0, osdmap, true);
+ messenger->send_message(ack2, op->get_asker());
+ delete op;
+ osd_lock.Unlock();
+}
+
void OSD::op_rep_modify(MOSDOp *op)
{
// when we introduce unordered messaging.. FIXME
// PG
PG *pg = get_pg(op->get_pg());
assert(pg);
-
- dout(12) << "rep_modify " << hex << oid << dec << " v " << op->get_version() << " (i have " << ov << ")" << endl;
-
- // pre-ack
- //MOSDOpReply *ack1 = new MOSDOpReply(op, 0, osdmap);
- //messenger->send_message(ack1, op->get_asker());
-
- /*
- // update pg stamp(s)
- pg->last_modify_stamp = op->get_version();
- if (pg->is_complete())
- pg->last_complete_stamp = pg->last_modify_stamp;
- */
+ dout(12) << "rep_modify in " << *pg << " o " << hex << oid << dec << " v " << op->get_version() << " (i have " << ov << ")" << endl;
int r = 0;
+ Context *onsync = 0;
if (op->get_op() == OSD_OP_REP_WRITE) {
// write
- r = apply_write(op, false, op->get_version());
+ assert(op->get_data().length() == op->get_length());
+ onsync = new C_OSD_RepModifySync(this, op);
+ r = apply_write(op, op->get_version(), onsync);
if (ov == 0) pg->add_object(store, oid);
} else if (op->get_op() == OSD_OP_REP_DELETE) {
// delete
r = store->truncate(oid, op->get_offset());
} else assert(0);
- // ack
- MOSDOpReply *ack2 = new MOSDOpReply(op, 0, osdmap, true);
- messenger->send_message(ack2, op->get_asker());
-
- delete op;
+ if (onsync) {
+ // ack
+ MOSDOpReply *ack = new MOSDOpReply(op, 0, osdmap, false);
+ messenger->send_message(ack, op->get_asker());
+ } else {
+ // sync, safe
+ MOSDOpReply *ack = new MOSDOpReply(op, 0, osdmap, true);
+ messenger->send_message(ack, op->get_asker());
+ delete op;
+ }
}
void OSD::handle_op(MOSDOp *op)
{
+ osd_lock.Lock();
+
pg_t pgid = op->get_pg();
PG *pg = get_pg(pgid);
// op's is newer
dout(7) << "op map " << op->get_map_version() << " > " << osdmap->get_version() << endl;
wait_for_new_map(op);
+ osd_lock.Unlock();
return;
}
op->get_asker());
delete op;
}
+ osd_lock.Unlock();
return;
}
if (!pg) {
dout(7) << "hit non-existent pg " << hex << op->get_pg() << dec << ", waiting" << endl;
waiting_for_pg[pgid].push_back(op);
+ osd_lock.Unlock();
return;
}
else {
- if (!pg->is_complete()) {
+ dout(7) << "handle_op " << op << " in " << *pg << endl;
+
+ // must be peered.
+ if (!pg->is_peered()) {
+ dout(7) << "op_write " << *pg << " not peered (yet)" << endl;
+ pg->waiting_for_peered.push_back(op);
+ osd_lock.Unlock();
+ return;
+ }
+
+ const object_t oid = op->get_oid();
+
+ if (!pg->is_complete( osdmap->get_version() )) {
// consult PG object map
- if (pg->objects_missing.count(op->get_oid())) {
+ if (pg->objects_missing.count(oid)) {
// need to pull
- dout(7) << "need to pull object " << hex << op->get_oid() << dec << endl;
- if (!pg->objects_pulling.count(op->get_oid()))
- pull_replica(pg, op->get_oid());
- pg->waiting_for_missing_object[op->get_oid()].push_back(op);
+ dout(7) << "need to pull object " << hex << oid << dec << endl;
+ if (!pg->objects_pulling.count(oid))
+ pull_replica(pg, oid);
+ pg->waiting_for_missing_object[oid].push_back(op);
+ osd_lock.Unlock();
+ return;
+ }
+ }
+
+ if (!pg->is_clean() &&
+ (op->get_op() == OSD_OP_WRITE ||
+ op->get_op() == OSD_OP_TRUNCATE ||
+ op->get_op() == OSD_OP_DELETE)) {
+ // exists but not replicated?
+ if (pg->objects_unrep.count(oid)) {
+ dout(7) << "object " << hex << oid << dec << " in " << *pg
+ << " exists but not clean" << endl;
+ pg->waiting_for_clean_object[oid].push_back(op);
+ if (pg->objects_pushing.count(oid) == 0)
+ push_replica(pg, oid);
+ osd_lock.Unlock();
+ return;
+ }
+
+ // just stray?
+ // FIXME: this is a bit too aggressive; includes inactive peers
+ if (pg->objects_stray.count(oid)) {
+ dout(7) << "object " << hex << oid << dec << " in " << *pg
+ << " dne but is not clean" << endl;
+ pg->waiting_for_clean_object[oid].push_back(op);
+ if (pg->objects_removing.count(oid) == 0)
+ remove_replica(pg, oid);
+ osd_lock.Unlock();
return;
}
}
-
}
} else {
// REPLICATION OP
-
+ if (pg) {
+ dout(7) << "handle_rep_op " << op << " in " << *pg << endl;
+ } else {
+ dout(7) << "handle_rep_op " << op << " in pgid " << op->get_pg() << endl;
+ }
// check osd map
if (op->get_map_version() != osdmap->get_version()) {
// make sure source is still primary
dout(5) << "op map " << op->get_map_version() << " != " << osdmap->get_version() << ", primary changed on pg " << hex << op->get_pg() << dec << endl;
MOSDOpReply *fail = new MOSDOpReply(op, -1, osdmap, false);
messenger->send_message(fail, op->get_asker());
+ osd_lock.Unlock();
return;
} else {
dout(5) << "op map " << op->get_map_version() << " != " << osdmap->get_version() << ", primary same on pg " << hex << op->get_pg() << dec << endl;
do_op(op);
} else
queue_op(op);
+
+ osd_lock.Unlock();
}
void OSD::queue_op(MOSDOp *op) {
}
} else {
// regular op
- pg_t pgid = op->get_pg();
- PG *pg = get_pg(pgid);
-
- // PG must be peered for all client ops.
- if (!pg) {
- dout(7) << "op_write pg " << hex << pgid << dec << " dne (yet)" << endl;
- waiting_for_pg[pgid].push_back(op);
- }
- else if (!pg->is_peered()) {
- dout(7) << "op_write " << *pg << " not peered (yet)" << endl;
- pg->waiting_for_peered.push_back(op);
- }
- else {
- // do op
- switch (op->get_op()) {
- case OSD_OP_READ:
- op_read(op, pg);
- break;
- case OSD_OP_STAT:
- op_stat(op, pg);
- break;
- case OSD_OP_WRITE:
- case OSD_OP_DELETE:
- case OSD_OP_TRUNCATE:
- op_modify(op, pg);
- break;
- default:
- assert(0);
- }
+ switch (op->get_op()) {
+ case OSD_OP_READ:
+ op_read(op);
+ break;
+ case OSD_OP_STAT:
+ op_stat(op);
+ break;
+ case OSD_OP_WRITE:
+ case OSD_OP_DELETE:
+ case OSD_OP_TRUNCATE:
+ op_modify(op);
+ break;
+ default:
+ assert(0);
}
}
// READ OPS
-bool OSD::object_complete(PG *pg, object_t oid, Message *op)
-{
- if (!pg->is_complete()) {
- if (pg->objects_missing.count(oid)) {
- dout(7) << "object " << hex << oid << dec << /*" v " << v << */" in " << *pg
- << " exists but not local (yet)" << endl;
- pg->waiting_for_missing_object[oid].push_back(op);
- return false;
- }
- }
- return true;
-}
-
-void OSD::op_read(MOSDOp *op, PG *pg)
+void OSD::op_read(MOSDOp *op)
{
object_t oid = op->get_oid();
lock_object(oid);
- // version? clean?
- if (!object_complete(pg, oid, op)) {
- unlock_object(oid);
- return;
- }
-
// read into a buffer
- bufferptr bptr = new buffer(op->get_length()); // prealloc space for entire read
+ bufferlist bl;
long got = store->read(oid,
op->get_length(), op->get_offset(),
- bptr.c_str());
+ bl);
// set up reply
MOSDOpReply *reply = new MOSDOpReply(op, 0, osdmap, true);
if (got >= 0) {
- bptr.set_length(got); // properly size the buffer
-
- // give it to the reply in a bufferlist
- bufferlist bl;
- bl.push_back( bptr );
-
reply->set_result(0);
reply->set_data(bl);
reply->set_length(got);
} else {
- bptr.set_length(0);
reply->set_result(got); // error
reply->set_length(0);
}
unlock_object(oid);
}
-void OSD::op_stat(MOSDOp *op, PG *pg)
+void OSD::op_stat(MOSDOp *op)
{
object_t oid = op->get_oid();
lock_object(oid);
-
- // version? clean?
- if (!object_complete(pg, oid, op)) {
- unlock_object(oid);
- return;
- }
struct stat st;
memset(&st, sizeof(st), 0);
// WRITE OPS
-int OSD::apply_write(MOSDOp *op, bool write_sync, version_t v)
+int OSD::apply_write(MOSDOp *op, version_t v, Context *onsync)
{
// take buffers from the message
bufferlist bl;
- bl.claim( op->get_data() );
+ bl = op->get_data();
+ //bl.claim( op->get_data() );
- // write out buffers
- off_t off = op->get_offset();
- for (list<bufferptr>::iterator it = bl.buffers().begin();
- it != bl.buffers().end();
- it++) {
-
- int r = store->write(op->get_oid(),
- (*it).length(), off,
- (*it).c_str(),
- write_sync); // write synchronously
- off += (*it).length();
- if (r < 0) {
- dout(1) << "write error on " << hex << op->get_oid() << dec << " len " << (*it).length() << " off " << off << " r = " << r << endl;
- assert(r >= 0);
+ // write
+ if (onsync) {
+ if (g_conf.fake_osd_sync) {
+ // fake a delayed sync
+ store->write(op->get_oid(),
+ op->get_length(),
+ op->get_offset(),
+ bl,
+ false);
+ g_timer.add_event_after(1.0,
+ onsync);
+ } else {
+ // for real
+ store->write(op->get_oid(),
+ op->get_length(),
+ op->get_offset(),
+ bl,
+ onsync);
}
+ } else {
+ // normal business
+ store->write(op->get_oid(),
+ op->get_length(),
+ op->get_offset(),
+ bl,
+ false);
}
// set version
}
-bool OSD::object_clean(PG *pg, object_t oid, version_t& v, Message *op)
+
+void OSD::issue_replica_op(PG *pg, OSDReplicaOp *repop, int osd)
{
- v = 0;
+ MOSDOp *op = repop->op;
+ object_t oid = op->get_oid();
+
+ dout(7) << " issue_replica_op in " << *pg << " o " << hex << oid << dec << " to osd" << osd << endl;
- if (pg->is_complete() && pg->is_clean()) {
- // PG is complete+clean, easy shmeasy!
- if (store->exists(oid)) {
- store->getattr(oid, "version", &v, sizeof(v));
- assert(v>0);
- }
- } else {
- // PG is recovering, blech.
-
- // does oid exist, and what version.
- if (pg->is_complete()) {
- // pg !clean, complete
- if (store->exists(oid)) {
- store->getattr(oid, "version", &v, sizeof(v));
- assert(v > 0);
- }
- } else {
- // pg !clean, !complete
- if (pg->objects_missing.count(oid))
- v = pg->objects_missing_v[oid];
- }
+ // forward the write
+ __uint64_t tid = ++last_tid;
+ MOSDOp *wr = new MOSDOp(tid,
+ messenger->get_myaddr(),
+ oid,
+ pg->get_pgid(),
+ osdmap->get_version(),
+ 100+op->get_op());
+ wr->get_data() = op->get_data(); // copy bufferlist
+ wr->set_length(op->get_length());
+ wr->set_offset(op->get_offset());
+ wr->set_version(repop->new_version);
+ wr->set_old_version(repop->old_version);
+ wr->set_pg_role(1); // replica
+ messenger->send_message(wr, MSG_ADDR_OSD(osd));
+
+ repop->osds.insert(osd);
+ repop->waitfor_ack[tid] = osd;
+ repop->waitfor_sync[tid] = osd;
- if (v > 0) {
- dout(10) << " pg not clean, checking if " << hex << oid << dec << " v " << v << " is specifically clean yet! " << *pg << endl;
- // object (logically) exists
- if (!pg->existant_object_is_clean(oid, v)) {
- dout(7) << "object " << hex << oid << dec << " v " << v << " in " << *pg
- << " exists but is not clean" << endl;
- pg->waiting_for_clean_object[oid].push_back(op);
- if (pg->objects_pushing.count(oid) == 0)
- push_replica(pg, oid);
- return false;
- }
- } else {
- // object (logically) dne
- if (store->exists(oid) ||
- !pg->nonexistant_object_is_clean(oid)) {
- dout(7) << "object " << hex << oid << dec << " v " << v << " in " << *pg
- << " dne but is not clean" << endl;
- pg->waiting_for_clean_object[oid].push_back(op);
- if (pg->objects_removing.count(oid) == 0)
- remove_replica(pg, oid);
- return false;
- }
- }
+ replica_ops[tid] = repop;
+ replica_pg_osd_tids[pg->get_pgid()][osd].insert(tid);
+}
+
+
+// Completion callback for the primary's local write: when the write
+// hits disk, finish() forwards to OSD::op_modify_sync(repop), which
+// records local_sync and may send the client the final (safe) reply.
+class C_OSD_WriteSync : public Context {
+public:
+ OSD *osd;
+ OSDReplicaOp *repop; // shared replica-op state; freed in op_modify_sync
+ C_OSD_WriteSync(OSD *o, OSDReplicaOp *op) : osd(o), repop(op) {}
+ void finish(int r) {
+ osd->op_modify_sync(repop);
 }
+};
- return true;
+// Primary-side sync completion for a client modify. Marks the local
+// write as synced; if all replica syncs have also arrived
+// (repop->can_send_sync()), sends the final safe MOSDOpReply to the
+// client and deletes the original op. Frees the OSDReplicaOp once
+// nothing further is outstanding (repop->can_delete()).
+void OSD::op_modify_sync(OSDReplicaOp *repop)
+{
+ dout(2) << "op_modify_sync on op " << repop->op << endl;
+
+ osd_lock.Lock();
+ {
+ repop->local_sync = true;
+ if (repop->can_send_sync()) {
+ dout(2) << "op_modify_sync on " << hex << repop->op->get_oid() << dec << " op " << repop->op << endl;
+ MOSDOpReply *reply = new MOSDOpReply(repop->op, 0, osdmap, true);
+ messenger->send_message(reply, repop->op->get_asker());
+ delete repop->op;
+ }
+ if (repop->can_delete()) {
+ delete repop;
+ }
+ }
+ osd_lock.Unlock();
}
-void OSD::op_modify(MOSDOp *op, PG *pg)
+void OSD::op_modify(MOSDOp *op)
{
object_t oid = op->get_oid();
// version? clean?
version_t ov = 0; // 0 == dne (yet)
- if (!object_clean(pg, oid, ov, op)) {
- unlock_object(oid);
- return;
- }
+ store->getattr(oid, "version", &ov, sizeof(ov));
+ version_t nv = messenger->peek_lamport();
+ assert(nv > ov);
- version_t v = messenger->peek_lamport();
-
- dout(12) << opname << " " << hex << oid << dec << " v " << v << endl;
+ dout(12) << opname << " " << hex << oid << dec << " v " << nv << " off " << op->get_offset() << " len " << op->get_length() << endl;
// issue replica writes
- replica_write_lock.Lock();
- assert(replica_write_tids.count(op) == 0);
- assert(replica_write_local.count(op) == 0);
+ OSDReplicaOp *repop = new OSDReplicaOp(op, nv, ov);
+ osd_lock.Lock();
+ PG *pg = get_pg(op->get_pg());
for (unsigned i=1; i<pg->acting.size(); i++) {
- int osd = pg->acting[i];
- dout(7) << " replica write in " << *pg << " o " << hex << oid << dec << " to osd" << osd << endl;
-
- // forward the write
- __uint64_t tid = ++last_tid;
- MOSDOp *wr = new MOSDOp(tid,
- messenger->get_myaddr(),
- oid,
- op->get_pg(),
- osdmap->get_version(),
- 100+op->get_op());
- wr->get_data() = op->get_data(); // copy bufferlist
- wr->set_length(op->get_length());
- wr->set_offset(op->get_offset());
- wr->set_version(v);
- wr->set_old_version(ov);
- wr->set_pg_role(1); // replica
- messenger->send_message(wr, MSG_ADDR_OSD(osd));
-
- replica_write_tids[op].insert(tid);
- replica_writes[tid] = op;
- replica_pg_osd_tids[pg->get_pgid()][osd].insert(tid);
+ issue_replica_op(pg, repop, pg->acting[i]);
}
- replica_write_lock.Unlock();
+ osd_lock.Unlock();
// pre-ack
- MOSDOpReply *reply = new MOSDOpReply(op, 0, osdmap, false);
- messenger->send_message(reply, op->get_asker());
+ //MOSDOpReply *reply = new MOSDOpReply(op, 0, osdmap, false);
+ //messenger->send_message(reply, op->get_asker());
- /*
- // update pg stamp(s)
- pg->last_modify_stamp = v;
- if (pg->is_complete())
- pg->last_complete_stamp = v;
- */
-
// do it
int r;
if (op->get_op() == OSD_OP_WRITE) {
// write
- r = apply_write(op, true, v);
+ assert(op->get_data().length() == op->get_length());
+ Context *onsync = new C_OSD_WriteSync(this, repop);
+ r = apply_write(op, nv, onsync);
// put new object in proper collection
if (ov == 0) pg->add_object(store, oid);
+
+ repop->local_ack = true;
}
else if (op->get_op() == OSD_OP_TRUNCATE) {
// truncate
r = store->truncate(oid, op->get_offset());
+ repop->local_ack = true;
+ repop->local_sync = true;
}
else if (op->get_op() == OSD_OP_DELETE) {
// delete
- store->collection_remove(pg->get_pgid(), op->get_oid());
+ pg->remove_object(store, op->get_oid());
r = store->remove(oid);
+ repop->local_ack = true;
+ repop->local_sync = true;
}
else assert(0);
- // reply?
- replica_write_lock.Lock();
- if (replica_write_tids.count(op) == 0) {
- // all replica writes completed.
- if (replica_write_result[op] == 0) {
- dout(10) << opname << " wrote locally: rep writes already finished, replying" << endl;
+ // can we reply yet?
+ osd_lock.Lock();
+ {
+ if (repop->can_send_sync()) {
+ dout(10) << opname << " sending sync on " << op << endl;
MOSDOpReply *reply = new MOSDOpReply(op, 0, osdmap, true);
messenger->send_message(reply, op->get_asker());
- delete op;
- } else {
- dout(10) << opname << " wrote locally, but rep writes failed, fw to new primary" << endl;
- //finished.push_back(op);
- messenger->send_message(op, MSG_ADDR_OSD(pg->get_primary()));
}
- replica_write_result.erase(op);
- } else {
- // note that it's written locally
- dout(10) << opname << " wrote locally: rep writes not yet finished, waiting" << endl;
- replica_write_local.insert(op);
+ else if (repop->can_send_ack()) {
+ dout(10) << opname << " sending ack on " << op << endl;
+ MOSDOpReply *reply = new MOSDOpReply(op, 0, osdmap, false);
+ messenger->send_message(reply, op->get_asker());
+ }
}
- replica_write_lock.Unlock();
+ osd_lock.Unlock();
unlock_object(oid);
}
class Message;
+// Tracks a client modify that the primary has fanned out to replicas.
+// Replaces the old replica_write_* map tangle: one object carries the
+// original op, which replica tids we still await (ack vs sync), and
+// whether our own local write has acked/synced. The client gets an
+// (unsafe) ack when everyone has acked, and the final safe reply when
+// everyone has synced; see issue_replica_op / op_modify / op_modify_sync.
+class OSDReplicaOp {
+ public:
+ class MOSDOp *op;
+ map<__uint64_t,int> waitfor_ack; // tid -> osd, replicas yet to ack
+ map<__uint64_t,int> waitfor_sync; // tid -> osd, replicas yet to sync
+ bool local_ack;
+ bool local_sync;
+ bool cancel; // set if the op is aborted; suppresses replies
+
+ set<int> osds; // all replica osds this op was sent to
+ version_t new_version, old_version;
+
+ OSDReplicaOp(class MOSDOp *o, version_t nv, version_t ov) :
+ op(o),
+ local_ack(false), local_sync(false), cancel(false),
+ new_version(nv), old_version(ov)
+ { }
+ // ready to ack client: local write acked and no replica acks pending
+ bool can_send_ack() { return !cancel && local_ack && waitfor_ack.empty(); }
+ // ready for final safe reply: local + all replica syncs done
+ bool can_send_sync() { return !cancel && local_sync && waitfor_sync.empty(); }
+ bool can_delete() { return cancel || can_send_sync(); }
+};
class OSD : public Dispatcher {
//int read_onode(onode_t& onode);
//int write_onode(onode_t& onode);
- bool object_complete(PG *pg, object_t oid, Message *op);
- bool object_clean(PG *pg, object_t oid, version_t& v, Message *op);
-
// -- ops --
class ThreadPool<class OSD, class MOSDOp> *threadpool;
void queue_op(class MOSDOp *m);
void wait_for_no_ops();
- int apply_write(MOSDOp *op, bool write_sync, version_t v); // for op_write and op_rep_write
+ int apply_write(MOSDOp *op, version_t v,
+ Context *onsync = 0);
+
public:
list<class Message*> waiting_for_osdmap;
map<version_t, OSDMap*> osdmaps;
- void update_map(bufferlist& state);
+ void update_map(bufferlist& state, bool mkfs=false);
void wait_for_new_map(Message *m);
void handle_osd_map(class MOSDMap *m);
OSDMap *get_osd_map(version_t v);
- // <old replica hack>
- Mutex replica_write_lock;
- map<__uint64_t, MOSDOp*> replica_writes;
- map<MOSDOp*, set<__uint64_t> > replica_write_tids;
- set<MOSDOp*> replica_write_local;
- map<MOSDOp*, int> replica_write_result;
- map<pg_t, map<int, set<__uint64_t> > > replica_pg_osd_tids; // pg -> osd -> tid
- // </hack>
+ void advance_map(list<pg_t>& ls);
+ void activate_map(list<pg_t>& ls);
+
// -- replication --
// PG
hash_map<pg_t, PG*> pg_map;
- void get_pg_list(list<pg_t>& ls);
- bool pg_exists(pg_t pg);
- PG *create_pg(pg_t pg); // create new PG
- PG *get_pg(pg_t pg); // return existing PG, load state from store (if needed)
- void close_pg(pg_t pg); // close in-memory state
- void remove_pg(pg_t pg); // remove state from store
+
+ void get_pg_list(list<pg_t>& ls);
+ bool pg_exists(pg_t pg);
+ PG *create_pg(pg_t pg); // create new PG
+ PG *get_pg(pg_t pg); // return existing PG, load state from store (if needed)
+ void close_pg(pg_t pg); // close in-memory state
+ void remove_pg(pg_t pg); // remove state from store
__uint64_t last_tid;
- map<__uint64_t,PGPeer*> pull_ops; // tid -> PGPeer*
- map<__uint64_t,PGPeer*> push_ops; // tid -> PGPeer*
- map<__uint64_t,PGPeer*> remove_ops; // tid -> PGPeer*
- hash_map<pg_t, list<Message*> > waiting_for_pg;
+ hash_map<pg_t, list<Message*> > waiting_for_pg;
+ // replica ops
+ map<__uint64_t, OSDReplicaOp*> replica_ops;
+ map<pg_t, map<int, set<__uint64_t> > > replica_pg_osd_tids; // pg -> osd -> tid
+
+ void issue_replica_op(PG *pg, OSDReplicaOp *repop, int osd);
+ void ack_replica_op(__uint64_t tid, int result, bool safe, int fromosd);
- void advance_map(list<pg_t>& ls);
- void activate_map(list<pg_t>& ls);
+ // recovery
+ map<__uint64_t,PGPeer*> pull_ops; // tid -> PGPeer*
+ map<__uint64_t,PGPeer*> push_ops; // tid -> PGPeer*
+ map<__uint64_t,PGPeer*> remove_ops; // tid -> PGPeer*
void start_peers(PG *pg, map< int, map<PG*,int> >& start_map);
void peer_notify(int primary, map<pg_t,version_t>& pg_list);
void peer_start(int replica, map<PG*,int>& pg_map);
+ void plan_recovery(PG *pg);
void do_recovery(PG *pg);
void pg_pull(PG *pg, int maxops);
void pg_push(PG *pg, int maxops);
void op_rep_remove_reply(class MOSDOpReply *op);
void op_rep_modify(class MOSDOp *op); // write, trucnate, delete
- void ack_replica_op(__uint64_t tid, int result, int fromosd);
+ void op_rep_modify_sync(class MOSDOp *op);
+ friend class C_OSD_RepModifySync;
public:
OSD(int id, Messenger *m);
void handle_ping(class MPing *m);
void handle_op(class MOSDOp *m);
- void op_read(class MOSDOp *m, PG *pg);
- void op_stat(class MOSDOp *m, PG *pg);
- void op_modify(class MOSDOp *m, PG *pg);
- //void op_delete(class MOSDOp *m, PG *pg);
- //void op_truncate(class MOSDOp *m, PG *pg);
-
- //void op_mkfs(class MOSDOp *m);
+ void op_read(class MOSDOp *m);
+ void op_stat(class MOSDOp *m);
+ void op_modify(class MOSDOp *m);
+ void op_modify_sync(class OSDReplicaOp *repop);
// for replication
void handle_op_reply(class MOSDOpReply *m);
#define __OBJECTSTORE_H
#include "include/types.h"
+#include "include/Context.h"
+#include "include/bufferlist.h"
#include <sys/stat.h>
virtual int read(object_t oid,
size_t len, off_t offset,
- char *buffer) = 0;
+ bufferlist& bl) = 0;
+
virtual int write(object_t oid,
size_t len, off_t offset,
- char *buffer,
- bool fsync=true) = 0;
-
+ bufferlist& bl,
+ bool fsync=true) = 0;
+ virtual int write(object_t oid,
+ size_t len, off_t offset,
+ bufferlist& bl,
+ Context *onsafe) { return -1; }
+
virtual int setattr(object_t oid, const char *name,
void *value, size_t size) {return 0;} //= 0;
virtual int getattr(object_t oid, const char *name,
void *value, size_t size) {return 0;} //= 0;
virtual int collection_listattr(object_t oid, char *attrs, size_t size) {return 0;} //= 0;
+
+
};
#endif
#define dout(l) if (l<=g_conf.debug || l<=g_conf.debug_osd) cout << "osd" << whoami << " " << *this << " "
-void PG::mark_peered()
-{
- dout(10) << "mark_peered" << endl;
- state_set(PG_STATE_PEERED);
-}
-
void PG::pulled(object_t oid, version_t v, PGPeer *p)
{
dout(10) << "pulled o " << hex << oid << dec << " v " << v << " from osd" << p->get_peer() << endl;
// object is now local
objects_missing.erase(oid);
objects_missing_v.erase(oid);
-
- if (objects_missing.empty()) {
- assert(!is_complete());
- mark_complete();
- }
}
-void PG::mark_complete()
-{
- dout(10) << "mark_complete" << endl;
-
- // done pulling objects!
- state_set(PG_STATE_COMPLETE);
-}
-
-void PG::mark_clean()
-{
- dout(10) << "mark_clean" << endl;
- state_set(PG_STATE_CLEAN);
-
- // drop residual peers
-
- // discard peer state
-
-}
void PG::pushed(object_t oid, version_t v, PGPeer *p)
{
}
}
+/*
bool PG::existant_object_is_clean(object_t o, version_t v)
{
assert(is_peered() && !is_clean());
return true;
}
+*/
-void PG::plan_recovery(ObjectStore *store, version_t current_version)
+void PG::plan_recovery(ObjectStore *store, version_t current_version,
+ list<PGPeer*>& complete_peers)
{
- dout(10) << "plan_recovery" << endl;
+ dout(10) << "plan_recovery " << current_version << endl;
assert(is_peered());
// choose newest last_complete epoch
for (map<int, PGPeer*>::iterator pit = peers.begin();
pit != peers.end();
pit++) {
+ dout(10) << " osd" << pit->first << " "
+ << pit->second->objects.size() << " objects, last_complete " << pit->second->last_complete << endl;
if (pit->second->last_complete > last)
last = pit->second->last_complete;
}
dout(10) << " combined last_complete epoch is " << last << endl;
- if (last < current_version-1) {
+ if (last+1 < current_version) {
dout(1) << "WARNING: last_complete skipped one or more epochs, we're possibly missing something" << endl;
}
- if (!last) {
+ if (!last) { // bootstrap!
dout(1) << "WARNING: no complete peers available (yet), pg is crashed" << endl;
return;
}
scan_local_objects(local_objects, store);
dout(10) << " " << local_objects.size() << " local objects" << endl;
- if (last_complete == last) {
- assert(is_complete());
+ if (last_complete == last)
master = local_objects;
- }
for (map<int, PGPeer*>::iterator pit = peers.begin();
pit != peers.end();
dout(7) << " master list has " << master.size() << " objects" << endl;
// local cleanup?
- if (!is_complete()) {
+ if (!is_complete(current_version)) {
// just cleanup old local objects
// FIXME: do this async?
assert(local || !objects_missing[o].empty()); // pull
}
+ if (objects_missing.empty()) {
+ mark_complete(current_version);
+ }
+
// plan clean -> objects_stray
for (map<int, PGPeer*>::iterator pit = peers.begin();
pit != peers.end();
PGPeer *p = pit->second;
assert(p->is_active());
+ if (p->missing.empty() && p->stray.empty()) {
+ p->state_set(PG_PEER_STATE_COMPLETE);
+ complete_peers.push_back(p);
+ }
+
if (p->is_complete()) {
dout(12) << " peer osd" << pit->first << " is complete" << endl;
} else {
}
}
+ if (objects_unrep.empty() && objects_stray.empty())
+ mark_clean();
+
// clear peer content lists
for (map<int, PGPeer*>::iterator pit = peers.begin();
pit != peers.end();
struct PGReplicaInfo {
int state;
version_t last_complete;
+ version_t last_any_complete;
map<object_t,version_t> objects; // remote object list
void _encode(bufferlist& blist) {
blist.append((char*)&state, sizeof(state));
+ blist.append((char*)&last_complete, sizeof(last_complete));
+ blist.append((char*)&last_any_complete, sizeof(last_any_complete));
::_encode(objects, blist);
//::_encode(deleted, blist);
}
void _decode(bufferlist& blist, int& off) {
blist.copy(off, sizeof(state), (char*)&state);
off += sizeof(state);
+ blist.copy(off, sizeof(last_complete), (char*)&last_complete);
+ off += sizeof(last_complete);
+ blist.copy(off, sizeof(last_any_complete), (char*)&last_any_complete);
+ off += sizeof(last_any_complete);
::_decode(objects, blist, off);
//::_decode(deleted, blist, off);
}
-
+/*
// a task list for moving objects around
class PGQueue {
list<object_t> objects;
}
bool empty() { return objects.empty(); }
};
+*/
*/
// any
-#define PG_STATE_COMPLETE 1 // i have full PG contents locally
+//#define PG_STATE_COMPLETE 1 // i have full PG contents locally
#define PG_STATE_PEERED 2 // primary: peered with everybody
// replica: peered with auth
int state; // see bit defns above
version_t primary_since; // (only defined if role==0)
+ version_t last_complete; // me
+ version_t last_any_complete; // anybody in the set
+
+ public:
map<int, PGPeer*> peers; // primary: (soft state) active peers
public:
vector<int> acting;
//pginfo_t info;
- version_t last_complete; // epoch
/*
lamport_t last_complete_stamp; // lamport timestamp of last complete op
if (objects_missing.empty()) return false;
if (objects_missing.size() == objects_pulling.size()) return false;
+ if (objects_pulling.empty() || pull_pos == objects_missing.end())
+ pull_pos = objects_missing.begin();
while (objects_pulling.count(pull_pos->first)) {
pull_pos++;
if (pull_pos == objects_missing.end())
if (objects_unrep.empty()) return false;
if (objects_unrep.size() == objects_pushing.size()) return false;
+ if (objects_pushing.empty() || push_pos == objects_unrep.end())
+ push_pos = objects_unrep.begin();
while (objects_pushing.count(push_pos->first)) {
push_pos++;
if (push_pos == objects_unrep.end())
if (objects_stray.empty()) return false;
if (objects_stray.size() == objects_removing.size()) return false;
+ if (objects_removing.empty() || remove_pos == objects_stray.end())
+ remove_pos = objects_stray.begin();
while (objects_removing.count(remove_pos->first)) {
remove_pos++;
if (remove_pos == objects_stray.end())
return true;
}
+ void pulling(object_t oid, version_t v, PGPeer *p) {
+ p->pull(oid, v);
+ objects_pulling[oid] = p;
+ }
void pulled(object_t oid, version_t v, PGPeer *p);
+
+ void pushing(object_t oid, version_t v, PGPeer *p) {
+ p->push(oid, v);
+ objects_pushing[oid].insert(p);
+ }
void pushed(object_t oid, version_t v, PGPeer *p);
+
+ void removing(object_t oid, version_t v, PGPeer *p) {
+ p->remove(oid, v);
+ objects_removing[oid][p] = v;
+ }
void removed(object_t oid, version_t v, PGPeer *p);
public:
- void plan_recovery(ObjectStore *store, version_t current_version);
+ void plan_recovery(ObjectStore *store, version_t current_version,
+ list<PGPeer*>& complete_peers);
void discard_recovery_plan() {
assert(waiting_for_peered.empty());
PG(int osd, pg_t p) : whoami(osd), pgid(p),
role(0),
state(0),
- last_complete(0)
+ primary_since(0),
+ last_complete(0), last_any_complete(0)
//last_complete_stamp(0), last_modify_stamp(0), last_clean_stamp(0)
{ }
int get_nrep() { return acting.size(); }
version_t get_last_complete() { return last_complete; }
- void set_last_complete(version_t v) { last_complete = v; }
+ //void set_last_complete(version_t v) { last_complete = v; }
+ version_t get_last_any_complete() { return last_any_complete; }
+ //void set_last_any_complete(version_t v) { last_any_complete = v; }
version_t get_primary_since() { return primary_since; }
void set_primary_since(version_t v) { primary_since = v; }
bool is_primary() { return role == 0; }
bool is_residual() { return role < 0; }
- bool existant_object_is_clean(object_t o, version_t v);
- bool nonexistant_object_is_clean(object_t o);
-
int get_state() { return state; }
bool state_test(int m) { return (state & m) != 0; }
void set_state(int s) { state = s; }
void state_set(int m) { state |= m; }
void state_clear(int m) { state &= ~m; }
- bool is_complete() { return state_test(PG_STATE_COMPLETE); }
+ bool is_complete(version_t v) {
+ //return state_test(PG_STATE_COMPLETE);
+ return v == last_complete;
+ }
bool is_peered() { return state_test(PG_STATE_PEERED); }
//bool is_crowned() { return state_test(PG_STATE_CROWNED); }
bool is_clean() { return state_test(PG_STATE_CLEAN); }
//bool is_flushing() { return state_test(PG_STATE_FLUSHING); }
bool is_stray() { return state_test(PG_STATE_STRAY); }
- void mark_peered();
- void mark_complete();
- void mark_clean();
+ void mark_peered() {
+ state_set(PG_STATE_PEERED);
+ }
+ void mark_complete(version_t v) {
+ last_complete = v;
+ if (v > last_any_complete) last_any_complete = v;
+ }
+ void mark_any_complete(version_t v) {
+ if (v > last_any_complete) last_any_complete = v;
+ }
+ void mark_clean() {
+ state_set(PG_STATE_CLEAN);
+ }
int num_active_ops() {
int o = 0;
store->collection_getattr(pgid, "state", &state, sizeof(state));
}
- void add_object(ObjectStore *store, object_t oid) {
+ void add_object(ObjectStore *store, const object_t oid) {
store->collection_add(pgid, oid);
}
- void remove_object(ObjectStore *store, object_t oid) {
+ void remove_object(ObjectStore *store, const object_t oid) {
store->collection_remove(pgid, oid);
}
void list_objects(ObjectStore *store, list<object_t>& ls) {
inline ostream& operator<<(ostream& out, PG& pg)
{
out << "pg[" << hex << pg.get_pgid() << dec << " " << pg.get_role();
- if (pg.is_complete()) out << " complete";
+ //if (pg.is_complete()) out << " complete";
if (pg.is_peered()) out << " peered";
if (pg.is_clean()) out << " clean";
+ out << " lc=" << pg.get_last_complete();
out << "]";
return out;
}
}
+bool Filer::is_active()
+{
+ if (!op_reads.empty() ||
+ !op_modify.empty() ||
+ !op_probes.empty()) {
+ for (hash_map<tid_t,PendingOSDRead_t*>::iterator it = op_reads.begin();
+ it != op_reads.end();
+ it++) dout(10) << " pending read op " << it->first << endl;
+ for (hash_map<tid_t,PendingOSDOp_t*>::iterator it = op_modify.begin();
+ it != op_modify.end();
+ it++) dout(10) << " pending modify op " << it->first << endl;
+ return true;
+ }
+ return false;
+}
void Filer::handle_osd_map(MOSDMap *m)
{
// map buffer into OSD extents
file_to_extents(inode, len, offset, p->extents);
- dout(7) << "osd read ino " << inode.ino << " len " << len << " off " << offset << " in " << p->extents.size() << " object extents" << endl;
+ dout(7) << "osd read ino " << hex << inode.ino << dec << " len " << len << " off " << offset << " in " << p->extents.size() << " object extents" << endl;
// issue reads
for (list<OSDExtent>::iterator it = p->extents.begin();
OSD_OP_READ);
m->set_length(it->len);
m->set_offset(it->offset);
- dout(15) << " read on " << last_tid << " from oid " << it->oid << " off " << it->offset << " len " << it->len << " (" << it->buffer_extents.size() << " buffer bits)" << endl;
+ dout(15) << " read on " << last_tid << " from oid " << hex << it->oid << dec << " off " << it->offset << " len " << it->len << " (" << it->buffer_extents.size() << " buffer bits)" << endl;
messenger->send_message(m, MSG_ADDR_OSD(it->osd), 0);
// add to gather set
for (map<size_t,size_t>::iterator bit = eit->buffer_extents.begin();
bit != eit->buffer_extents.end();
bit++) {
- dout(21) << "object " << eit->oid << " extent " << eit->offset << " len " << eit->len << " : ox offset " << ox_off << " -> buffer extent " << bit->first << " len " << bit->second << endl;
+ dout(21) << "object " << hex << eit->oid << dec << " extent " << eit->offset << " len " << eit->len << " : ox offset " << ox_off << " -> buffer extent " << bit->first << " len " << bit->second << endl;
by_off[bit->first] = new bufferlist;
if (ox_off + bit->second <= ox_len) {
list<OSDExtent> extents;
file_to_extents(inode, len, offset, extents);
- dout(7) << "osd write ino " << inode.ino << " len " << len << " off " << offset << " in " << extents.size() << " extents" << endl;
+ dout(7) << "osd write ino " << hex << inode.ino << dec << " len " << len << " off " << offset << " in " << extents.size() << " extents" << endl;
size_t off = 0; // ptr into buffer
op_modify[last_tid] = p;
// send
- dout(15) << " write on " << last_tid << endl;
+ dout(15) << " write on " << last_tid << " oid " << hex << it->oid << dec << " off " << it->offset << " len " << it->len << endl;
messenger->send_message(m, MSG_ADDR_OSD(it->osd), 0);
}
list<OSDExtent> extents;
file_to_extents(inode, old_size, new_size, extents);
- dout(7) << "osd truncate ino " << inode.ino << " to new size " << new_size << " from old_size " << old_size << " in " << extents.size() << " extents" << endl;
+ dout(7) << "osd truncate ino " << hex << inode.ino << dec << " to new size " << new_size << " from old_size " << old_size << " in " << extents.size() << " extents" << endl;
int n = 0;
for (list<OSDExtent>::iterator it = extents.begin();
void dispatch(Message *m);
- bool is_active() {
- if (!op_reads.empty() ||
- !op_modify.empty() ||
- !op_probes.empty()) return true;
- return false;
- }
+ bool is_active();
// osd fun
int read(inode_t& inode,