From 7fc860c308f34178c8f5374be1c02977248bb251 Mon Sep 17 00:00:00 2001 From: sage Date: Fri, 4 Aug 2006 23:15:35 +0000 Subject: [PATCH] stabilized new rados logging stuff git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@787 29311d96-e01e-0410-9327-a35deaab8ce9 --- ceph/Makefile | 31 +++-- ceph/TODO | 28 ++-- ceph/client/Client.cc | 14 +- ceph/config.cc | 2 +- ceph/config.h | 3 +- ceph/include/types.h | 2 +- ceph/messages/MOSDOp.h | 6 + ceph/msg/NewMessenger.cc | 16 ++- ceph/osd/OSD.cc | 271 +++++++++++++++++++++----------------- ceph/osd/OSD.h | 21 ++- ceph/osd/PG.cc | 263 +++++++++++++++++++++++++++--------- ceph/osd/PG.h | 27 ++-- ceph/osdc/Filer.h | 6 +- ceph/osdc/ObjectCacher.cc | 51 ++++++- ceph/osdc/ObjectCacher.h | 3 + ceph/osdc/Objecter.cc | 87 +++++++----- ceph/osdc/Objecter.h | 12 +- 17 files changed, 560 insertions(+), 283 deletions(-) diff --git a/ceph/Makefile b/ceph/Makefile index 762dc9ae97b0f..60089928dfda4 100644 --- a/ceph/Makefile +++ b/ceph/Makefile @@ -49,21 +49,24 @@ OSD_OBJS= \ ebofs.o\ osd/OSD.o +OSDC_OBJS= \ + client/Filer.o\ + client/ObjectCacher.o\ + client/Objecter.o + COMMON_OBJS= \ msg/Messenger.o\ msg/Dispatcher.o\ msg/HostMonitor.o\ - client/Filer.o\ - client/FileCache.o\ - client/ObjectCacher.o\ - client/Objecter.o\ mds/MDCluster.o\ common/Logger.o\ common/Clock.o\ common/Timer.o\ config.o + CLIENT_OBJS= \ + client/FileCache.o\ client/Client.o\ client/SyntheticClient.o\ client/Trace.o @@ -87,7 +90,10 @@ obfs: depend obfstest cosd: cosd.cc osd.o msg/NewMessenger.o common.o ${CC} ${CFLAGS} ${MPILIBS} $^ -o $@ -cfuse: cfuse.cc client.o client/fuse.o msg/NewMessenger.o common.o +cmds: cmds.cc mds.o osdc.o msg/NewMessenger.o common.o + ${CC} ${CFLAGS} ${MPILIBS} $^ -o $@ + +cfuse: cfuse.cc client.o osdc.o client/fuse.o msg/NewMessenger.o common.o ${CC} ${CFLAGS} ${LIBS} -lfuse $^ -o $@ @@ -109,16 +115,16 @@ mpifuse: mpifuse.cc mds.o client.o client/fuse.o ${TCP_OBJS} common.o # synthetic workload -fakesyn: fakesyn.cc mds.o client.o osd.o msg/FakeMessenger.o common.o +fakesyn: fakesyn.cc mds.o client.o osd.o osdc.o msg/FakeMessenger.o common.o ${CC} -pg ${CFLAGS} ${LIBS} $^ -o $@ -mpisyn: mpisyn.cc mds.o client.o osd.o msg/MPIMessenger.cc common.o +mpisyn: mpisyn.cc mds.o client.o osd.o osdc.o msg/MPIMessenger.cc common.o ${MPICC} ${MPICFLAGS} ${MPILIBS} $^ -o $@ -tcpsyn: tcpsyn.cc mds.o client.o osd.o ${TCP_OBJS} common.o +tcpsyn: tcpsyn.cc mds.o client.o osd.o osdc.o ${TCP_OBJS} common.o ${MPICC} ${MPICFLAGS} ${MPILIBS} $^ -o $@ -newsyn: newsyn.cc mds.o client.o osd.o msg/NewMessenger.o common.o +newsyn: newsyn.cc mds.o client.o osd.o osdc.o msg/NewMessenger.o common.o ${MPICC} ${MPICFLAGS} ${MPILIBS} $^ -o $@ # + obfs @@ -142,7 +148,7 @@ ebofs: mkfs.ebofs test.ebofs # libceph -libceph.o: client/ldceph.o client/Client.o ${TCP_OBJS} ${COMMON_OBJS} ${SYN_OBJS} +libceph.o: client/ldceph.o client/Client.o ${TCP_OBJS} ${COMMON_OBJS} ${SYN_OBJS} ${OSDC_OBJS} ld -i $^ -o $@ bench/mdtest/mdtest.o: bench/mdtest/mdtest.c @@ -173,12 +179,15 @@ common.o: ${COMMON_OBJS} ebofs.o: ${EBOFS_OBJS} ld -i -o $@ $^ -client.o: ${CLIENT_OBJS} +client.o: ${CLIENT_OBJS} ld -i -o $@ $^ osd.o: ${OSD_OBJS} ld -i -o $@ $^ +osdc.o: ${OSDC_OBJS} + ld -i -o $@ $^ + osd_obfs.o: osd/OBFSStore.o osd/OSD.ccosd/PG.o osd/ObjectStore.o osd/FakeStore.o ${MPICC} -DUSE_OBFS ${MPICFLAGS} ${MPILIBS} $^ -o $@ ../uofs/uofs.a diff --git a/ceph/TODO b/ceph/TODO index 9747f883145b5..330d962d8276c 100644 --- a/ceph/TODO +++ b/ceph/TODO @@ -1,6 +1,6 @@ -FAST rados paper +== FAST rados paper cluster map @@ -31,6 +31,10 @@ ebofs + + +== rados client nodes + why do we want client op ordering? - simpler logic in objectcacher - can pipeline lock + write + unlock, etc. @@ -69,7 +73,6 @@ Objecter: - accept acks from current|prior primary. - only accept commits from current primary. - - need pg map - to detect primary changes, - pg crashes @@ -77,26 +80,26 @@ Objecter: +== todo + -- gzip in messenger? -remaining hard problems -- how to cope with file size changes and read/write sharing -- how to choose osd monitor peer sets -- mds failure recovery (of course) +- gzip in messenger? osd/rados /- add caller to log /- make modify ops idempotent - rdlocks +- test wrnoop - pg_bit changes - use pg->info.same_role_since wrt replication ops. - report crashed pgs? - recover period for crashed pgs osdmonitor +- monitor pgs - watch osd utilization; adjust overload in cluster map objecter @@ -104,12 +107,11 @@ objecter /- detect, cope with crashed PGs, wait for map updates, etc. /- filer read/write: flip offset,size args - replay ops (in order) after pg crash -- sequential lock acquisition objectcacher -- locking -- atomic read/write ops -- flusher +- ocacher caps transitions vs locks +- ocacher flushing, pin Objects* while locks held +- test read locks reliability @@ -126,6 +128,10 @@ bugs/stability general - timer needs cancel sets, schedulers need to cancel outstanding events on shutdown +remaining hard problems +- how to cope with file size changes and read/write sharing +- mds failure recovery (of course) + crush - more efficient failure when all/too many osds are down diff --git a/ceph/client/Client.cc b/ceph/client/Client.cc index 64b3e4b698cfb..0d22030847f78 100644 --- a/ceph/client/Client.cc +++ b/ceph/client/Client.cc @@ -1770,15 +1770,11 @@ int Client::read(fh_t fh, char *buf, off_t size, off_t offset) r = filer->read(in->inode, offset, size, &blist, onfinish); - if (r == 0) { - // wait! - while (!done) - cond.Wait(client_lock); - } else { - // had it cached, apparently! - assert(r > 0); - delete onfinish; - } + assert(r >= 0); + + // wait! + while (!done) + cond.Wait(client_lock); } // copy data into caller's char* buf diff --git a/ceph/config.cc b/ceph/config.cc index 0464d26670536..d1c0bf18fcaa8 100644 --- a/ceph/config.cc +++ b/ceph/config.cc @@ -174,9 +174,9 @@ md_config_t g_conf = { osd_maxthreads: 2, // 0 == no threading osd_max_opq: 10, osd_mkfs: false, - osd_fake_sync: false, osd_age: .8, osd_age_time: 0, + osd_heartbeat_interval: 10, // --- fakestore --- fakestore_fake_sync: 2, // 2 seconds diff --git a/ceph/config.h b/ceph/config.h index 620053eebb9ea..8b7f533ee648b 100644 --- a/ceph/config.h +++ b/ceph/config.h @@ -147,10 +147,9 @@ struct md_config_t { int osd_maxthreads; int osd_max_opq; bool osd_mkfs; - bool osd_fake_sync; float osd_age; int osd_age_time; - + int osd_heartbeat_interval; int fakestore_fake_sync; bool fakestore_fsync; diff --git a/ceph/include/types.h b/ceph/include/types.h index 1a4e60835566c..829242b7aa1a9 100644 --- a/ceph/include/types.h +++ b/ceph/include/types.h @@ -263,7 +263,7 @@ inline bool operator>=(const eversion_t& l, const eversion_t& r) { return (l.epoch == r.epoch) ? (l.version >= r.version):(l.epoch >= r.epoch); } inline ostream& operator<<(ostream& out, const eversion_t e) { - return out << e.epoch << "." << e.version; + return out << e.epoch << "'" << e.version; } diff --git a/ceph/messages/MOSDOp.h b/ceph/messages/MOSDOp.h index fd9ae0669d7bf..8d73570c68289 100644 --- a/ceph/messages/MOSDOp.h +++ b/ceph/messages/MOSDOp.h @@ -58,6 +58,8 @@ #define OSD_OP_REP_WRUNLOCK (100+OSD_OP_WRUNLOCK) #define OSD_OP_REP_RDLOCK (100+OSD_OP_RDLOCK) #define OSD_OP_REP_RDUNLOCK (100+OSD_OP_RDUNLOCK) +#define OSD_OP_REP_UPLOCK (100+OSD_OP_UPLOCK) +#define OSD_OP_REP_DNLOCK (100+OSD_OP_DNLOCK) #define OSD_OP_REP_PULL 30 // whole object read //#define OSD_OP_REP_PUSH 31 // whole object write @@ -103,6 +105,10 @@ public: case OSD_OP_TRUNCATE: return "truncate"; case OSD_OP_WRLOCK: return "wrlock"; case OSD_OP_WRUNLOCK: return "wrunlock"; + case OSD_OP_RDLOCK: return "rdlock"; + case OSD_OP_RDUNLOCK: return "rdunlock"; + case OSD_OP_UPLOCK: return "uplock"; + case OSD_OP_DNLOCK: return "dnlock"; case OSD_OP_WRNOOP: return "wrnoop"; default: assert(0); } diff --git a/ceph/msg/NewMessenger.cc b/ceph/msg/NewMessenger.cc index 6a6c11f958e57..9bca8c6ef2c42 100644 --- a/ceph/msg/NewMessenger.cc +++ b/ceph/msg/NewMessenger.cc @@ -401,7 +401,7 @@ void *Rank::Receiver::entry() << " inst " << m->get_source_inst() << " > " << rank.entity_map[m->get_source()] << ", WATCH OUT " << *m << endl; - //rank.entity_map[m->get_source()] = m->get_source_inst(); + rank.entity_map[m->get_source()] = m->get_source_inst(); } if (m->get_dest().type() == MSG_ADDR_RANK_BASE) { @@ -1512,9 +1512,17 @@ void Rank::mark_up(msg_addr_t a, entity_inst_t& i) assert(i.rank != my_rank); // hrm? - assert(entity_map.count(a) == 0); - entity_map[a] = i; - connect_rank(i); + if (entity_map.count(a) == 0 || + entity_map[a] < i) { + entity_map[a] = i; + connect_rank(i); + } else if (entity_map[a] == i) { + dout(10) << "mark_up " << a << " inst " << i << " ... knew it" << endl; + derr(10) << "mark_up " << a << " inst " << i << " ... knew it" << endl; + } else { + dout(-10) << "mark_up " << a << " inst " << i << " < " << entity_map[a] << endl; + derr(-10) << "mark_up " << a << " inst " << i << " < " << entity_map[a] << endl; + } //if (waiting_for_lookup.count(a)) //lookup(a); diff --git a/ceph/osd/OSD.cc b/ceph/osd/OSD.cc index 55ebd25e61a5b..dc0f13c041017 100644 --- a/ceph/osd/OSD.cc +++ b/ceph/osd/OSD.cc @@ -32,11 +32,12 @@ #include "msg/Messenger.h" #include "msg/Message.h" -#include "msg/HostMonitor.h" +//#include "msg/HostMonitor.h" #include "messages/MGenericMessage.h" #include "messages/MPing.h" #include "messages/MPingAck.h" +#include "messages/MOSDPing.h" #include "messages/MOSDOp.h" #include "messages/MOSDOpReply.h" #include "messages/MOSDBoot.h" @@ -118,6 +119,9 @@ OSD::OSD(int id, Messenger *m, char *dev) if (g_conf.osd_remount_at) g_timer.add_event_after(g_conf.osd_remount_at, new C_Remount(this)); + next_heartbeat = new C_Heartbeat(this); + g_timer.add_event_after(g_conf.osd_heartbeat_interval, next_heartbeat); + // init object store // try in this order: @@ -159,7 +163,7 @@ OSD::~OSD() { if (threadpool) { delete threadpool; threadpool = 0; } if (osdmap) { delete osdmap; osdmap = 0; } - if (monitor) { delete monitor; monitor = 0; } + //if (monitor) { delete monitor; monitor = 0; } if (messenger) { delete messenger; messenger = 0; } if (logger) { delete logger; logger = 0; } if (store) { delete store; store = 0; } @@ -206,7 +210,8 @@ int OSD::init() } // monitor - char s[80]; + /* + char s[80]; sprintf(s, "osd%d", whoami); string st = s; monitor = new HostMonitor(messenger, st); @@ -223,6 +228,7 @@ int OSD::init() monitor->get_notify().insert(MSG_ADDR_MON(0)); // + */ // log char name[80]; @@ -270,6 +276,8 @@ int OSD::shutdown() { dout(1) << "shutdown, timer has " << g_timer.num_event << endl; + if (next_heartbeat) g_timer.cancel_event(next_heartbeat); + // finish ops wait_for_no_ops(); @@ -286,7 +294,7 @@ int OSD::shutdown() pg_map.clear(); // shut everything else down - monitor->shutdown(); + //monitor->shutdown(); messenger->shutdown(); osd_lock.Unlock(); @@ -418,28 +426,108 @@ void OSD::_remove_pg(pg_t pgid) } +// ------------------------------------- + +void OSD::heartbeat() +{ + osd_lock.Lock(); + + utime_t now = g_clock.now(); + utime_t since = now; + since.sec_ref() -= g_conf.osd_heartbeat_interval; + + dout(15) << "heartbeat " << now << endl; + + // send pings + set pingset; + for (hash_map::iterator i = pg_map.begin(); + i != pg_map.end(); + i++) { + PG *pg = i->second; + + // we want to ping the primary. + if (pg->get_role() <= 0) continue; + if (pg->acting.size() < 1) continue; + + if (pg->last_heartbeat < since) { + pg->last_heartbeat = now; + pingset.insert(pg->acting[0]); + } + } + for (set::iterator i = pingset.begin(); + i != pingset.end(); + i++) + messenger->send_message(new MOSDPing(osdmap->get_epoch()), + MSG_ADDR_OSD(*i)); + + + // schedule next! + next_heartbeat = new C_Heartbeat(this); + g_timer.add_event_after(g_conf.osd_heartbeat_interval, next_heartbeat); + + osd_lock.Unlock(); +} + // -------------------------------------- // dispatch +void OSD::share_map(msg_addr_t who, epoch_t epoch) +{ + // does client have old map? + if (who.is_client()) { + if (epoch < osdmap->get_epoch()) { + dout(10) << who << " has old map " << epoch << " < " << osdmap->get_epoch() << endl; + send_incremental_map(epoch, who, true); + } + } + + // does peer have old map? + if (who.is_osd()) { + // remember + if (peer_map_epoch[who] < epoch) + peer_map_epoch[who] = epoch; + + // older? + if (peer_map_epoch[who] < osdmap->get_epoch()) { + dout(10) << who << " has old map " << epoch << " < " << osdmap->get_epoch() << endl; + send_incremental_map(epoch, who, true); + peer_map_epoch[who] = osdmap->get_epoch(); // so we don't send it again. + } + } +} + void OSD::dispatch(Message *m) { // check clock regularly - utime_t now = g_clock.now(); + //utime_t now = g_clock.now(); //dout(-20) << now << endl; + // -- don't need lock -- + switch (m->get_type()) { + case MSG_PING: + dout(10) << "ping from " << m->get_source() << endl; + delete m; + return; + + } + + + // lock! osd_lock.Lock(); switch (m->get_type()) { // -- don't need OSDMap -- + /* // host monitor case MSG_PING_ACK: case MSG_FAILURE_ACK: monitor->proc_message(m); break; + */ // map and replication case MSG_OSD_MAP: @@ -453,11 +541,6 @@ void OSD::dispatch(Message *m) break; - case MSG_PING: - // take note. - monitor->host_is_alive(m->get_source()); - handle_ping((MPing*)m); - break; // -- need OSDMap -- @@ -481,8 +564,18 @@ void OSD::dispatch(Message *m) break; } + + + + // need OSDMap switch (m->get_type()) { + + case MSG_OSD_PING: + // take note. + dout(20) << "osdping from " << m->get_source() << endl; + share_map(m->get_source(), ((MOSDPing*)m)->map_epoch); + break; case MSG_OSD_PG_NOTIFY: handle_pg_notify((MOSDPGNotify*)m); @@ -498,13 +591,11 @@ void OSD::dispatch(Message *m) break; case MSG_OSD_OP: - monitor->host_is_alive(m->get_source()); handle_op((MOSDOp*)m); break; // for replication etc. case MSG_OSD_OPREPLY: - monitor->host_is_alive(m->get_source()); handle_op_reply((MOSDOpReply*)m); break; @@ -557,6 +648,7 @@ void OSD::handle_op_reply(MOSDOpReply *m) op_rep_pull_reply(m); break; + case OSD_OP_REP_WRNOOP: case OSD_OP_REP_WRITE: case OSD_OP_REP_TRUNCATE: case OSD_OP_REP_DELETE: @@ -564,6 +656,8 @@ void OSD::handle_op_reply(MOSDOpReply *m) case OSD_OP_REP_WRUNLOCK: case OSD_OP_REP_RDLOCK: case OSD_OP_REP_RDUNLOCK: + case OSD_OP_REP_UPLOCK: + case OSD_OP_REP_DNLOCK: { const pg_t pgid = m->get_pg(); if (pg_map.count(pgid)) { @@ -649,6 +743,7 @@ void OSD::handle_rep_op_ack(PG *pg, __uint64_t tid, int result, bool commit, +/* void OSD::handle_ping(MPing *m) { dout(7) << "got ping, replying" << endl; @@ -656,6 +751,7 @@ void OSD::handle_ping(MPing *m) m->get_source(), m->get_source_port(), 0); delete m; } +*/ @@ -807,7 +903,7 @@ void OSD::handle_osd_map(MOSDMap *m) // all the way? if (advanced && cur == superblock.newest_map) { // yay! - activate_map(); + activate_map(t); // process waiters take_waiters(waiting_for_osdmap); @@ -864,7 +960,7 @@ void OSD::advance_map(ObjectStore::Transaction& t) pg->info.last_epoch_started = pg->info.same_primary_since = pg->info.same_role_since = osdmap->get_epoch(); - pg->state_set(PG::STATE_ACTIVE); + pg->activate(t); dout(7) << "created " << *pg << endl; } @@ -881,8 +977,8 @@ void OSD::advance_map(ObjectStore::Transaction& t) pg->info.last_epoch_started = pg->info.same_primary_since = pg->info.same_role_since = osdmap->get_epoch(); - pg->state_set(PG::STATE_ACTIVE); - + pg->activate(t); + dout(7) << "created " << *pg << endl; } @@ -1040,7 +1136,7 @@ void OSD::advance_map(ObjectStore::Transaction& t) } } -void OSD::activate_map() +void OSD::activate_map(ObjectStore::Transaction& t) { dout(7) << "activate_map version " << osdmap->get_epoch() << endl; @@ -1061,7 +1157,7 @@ void OSD::activate_map() else if (pg->get_role() == 0 && !pg->is_active()) { // i am (inactive) primary pg->build_prior(); - pg->peer(query_map); + pg->peer(t, query_map); } else if (pg->is_stray()) { // i am residual|replica @@ -1141,29 +1237,20 @@ bool OSD::get_inc_map(epoch_t e, OSDMap::Incremental &inc) bool OSD::require_current_map(Message *m, epoch_t ep) { - int from = MSG_ADDR_NUM(m->get_source()); - // older map? if (ep < osdmap->get_epoch()) { - dout(7) << " from old map epoch " << ep << " < " << osdmap->get_epoch() << endl; + dout(7) << "require_current_map epoch " << ep << " < " << osdmap->get_epoch() << endl; delete m; // discard and ignore. return false; } // newer map? if (ep > osdmap->get_epoch()) { - dout(7) << " from newer map epoch " << ep << " > " << osdmap->get_epoch() << endl; + dout(7) << "require_current_map epoch " << ep << " > " << osdmap->get_epoch() << endl; wait_for_new_map(m); return false; } - // down? - if (osdmap->is_down(from)) { - dout(7) << " from down OSD osd" << from << ", dropping" << endl; - // FIXME - return false; - } - assert(ep == osdmap->get_epoch()); return true; } @@ -1175,6 +1262,8 @@ bool OSD::require_current_map(Message *m, epoch_t ep) */ bool OSD::require_same_or_newer_map(Message *m, epoch_t epoch) { + dout(10) << "require_same_or_newer_map " << epoch << " (i am " << osdmap->get_epoch() << ")" << endl; + // newer map? if (epoch > osdmap->get_epoch()) { dout(7) << " from newer map epoch " << epoch << " > " << osdmap->get_epoch() << endl; @@ -1188,21 +1277,6 @@ bool OSD::require_same_or_newer_map(Message *m, epoch_t epoch) return false; } - // down osd? - if (m->get_source().is_osd() && - osdmap->is_down(m->get_source().num())) { - if (epoch > osdmap->get_epoch()) { - dout(7) << "msg from down " << m->get_source() - << ", waiting for new map" << endl; - wait_for_new_map(m); - } else { - dout(7) << "msg from down " << m->get_source() - << ", dropping" << endl; - delete m; - } - return false; - } - return true; } @@ -1301,7 +1375,7 @@ void OSD::load_pgs() if (pg->acting[i] == whoami) role = i>0 ? 1:0; pg->set_role(role); - dout(10) << "load_pgs loaded " << *pg << endl; + dout(10) << "load_pgs loaded " << *pg << " " << pg->log << endl; } } @@ -1494,7 +1568,7 @@ void OSD::handle_pg_notify(MOSDPGNotify *m) pg->adjust_prior(); // peer - pg->peer(query_map); + pg->peer(t, query_map); } _unlock_pg(pgid); @@ -1550,12 +1624,10 @@ void OSD::handle_pg_log(MOSDPGLog *m) // merge into our own log pg->merge_log(m->log, m->missing, from); - - pg->clean_up_local(); // peer map< int, map > query_map; - pg->peer(query_map); + pg->peer(t, query_map); do_queries(query_map); } else { @@ -1566,22 +1638,14 @@ void OSD::handle_pg_log(MOSDPGLog *m) pg->merge_log(m->log, m->missing, from); assert(pg->missing.num_lost() == 0); - // clean up any stray objects - pg->clean_up_local(); - // ok active! - pg->info.last_epoch_started = osdmap->get_epoch(); pg->info.same_primary_since = m->info.same_primary_since; - pg->state_set(PG::STATE_ACTIVE); + pg->activate(t); // take any waiters take_waiters(pg->waiting_for_active); - - // initiate any recovery? - pg->start_recovery(); } - pg->write_log(t); unsigned tr = store->apply_transaction(t); assert(tr == 0); @@ -1732,7 +1796,7 @@ void OSD::handle_pg_remove(MOSDPGRemove *m) _remove_pg(pgid); - // unlock. there shouldn't be any waiters, since we're a stray, and pg is presumably clean. + // unlock. there shouldn't be any waiters, since we're a stray, and pg is presumably clean0. assert(pg_lock_waiters.count(pgid) == 0); _unlock_pg(pgid); } @@ -1760,10 +1824,9 @@ void OSD::pull(PG *pg, object_t oid, eversion_t v) assert(pg->missing.loc.count(oid)); int osd = pg->missing.loc[oid]; - dout(7) << "pull " << hex << oid << dec + dout(7) << *pg << " pull " << hex << oid << dec << " v " << v << " from osd" << osd - << " in " << *pg << endl; // send op @@ -1878,8 +1941,6 @@ void OSD::op_rep_pull_reply(MOSDOpReply *op) t.write(oid, 0, op->get_length(), op->get_data()); t.setattrs(oid, op->get_attrset()); t.collection_add(pgid, oid); - unsigned r = store->apply_transaction(t); - assert(r == 0); // close out pull op. pg->objects_pulling.erase(oid); @@ -1890,37 +1951,23 @@ void OSD::op_rep_pull_reply(MOSDOpReply *op) take_waiters(pg->waiting_for_missing_object[oid]); // raise last_complete? + assert(pg->log.complete_to != pg->log.log.end()); while (pg->log.complete_to != pg->log.log.end()) { if (pg->missing.missing.count(pg->log.complete_to->oid)) break; - pg->info.last_complete = pg->log.complete_to->version; - pg++; + if (pg->info.last_complete < pg->log.complete_to->version) + pg->info.last_complete = pg->log.complete_to->version; + pg->log.complete_to++; } dout(10) << *pg << " last_complete now " << pg->info.last_complete << endl; - if (pg->missing.num_missing() == 0) { - assert(pg->info.last_complete == pg->info.last_update); - - if (pg->is_primary()) { - // i am primary - pg->clean_set.insert(whoami); - if (pg->is_all_clean()) { - pg->state_set(PG::STATE_CLEAN); - pg->clean_replicas(); - } - } else { - // tell primary - dout(7) << *pg << " recovery complete, telling primary" << endl; - list ls; - ls.push_back(pg->info); - messenger->send_message(new MOSDPGNotify(osdmap->get_epoch(), - ls), - MSG_ADDR_OSD(pg->get_primary())); - } - } else { - // continue - pg->do_recovery(); - } - + // continue + pg->do_recovery(); + + // apply to disk! + t.collection_setattr(pgid, "info", &pg->info, sizeof(pg->info)); + unsigned r = store->apply_transaction(t); + assert(r == 0); + _unlock_pg(pgid); delete op; @@ -2063,6 +2110,8 @@ void OSD::op_rep_modify(MOSDOp *op, PG *pg) MOSDOpReply *ack = new MOSDOpReply(op, 0, osdmap->get_epoch(), false); messenger->send_message(ack, op->get_asker()); oncommit->ack(); + + pg->last_heartbeat = g_clock.now(); } } @@ -2079,27 +2128,7 @@ void OSD::handle_op(MOSDOp *op) // require same or newer map if (!require_same_or_newer_map(op, op->get_map_epoch())) return; - // does client have old map? - if (op->get_source().is_client()) { - if (op->get_map_epoch() < osdmap->get_epoch()) { - dout(10) << "handle_op client has old map " << op->get_map_epoch() << " < " << osdmap->get_epoch() << endl; - send_incremental_map(op->get_map_epoch(), op->get_source(), false); - } - } - - // does peer have old map? - if (op->get_source().is_osd()) { - // remember - if (peer_map_epoch[op->get_source()] < op->get_map_epoch()) - peer_map_epoch[op->get_source()] = op->get_map_epoch(); - - // older? - if (peer_map_epoch[op->get_source()] < osdmap->get_epoch()) { - dout(10) << "handle_op osd has old map " << op->get_map_epoch() << " < " << osdmap->get_epoch() << endl; - send_incremental_map(op->get_map_epoch(), op->get_source(), true); - peer_map_epoch[op->get_source()] = osdmap->get_epoch(); // so we don't send it again. - } - } + share_map(op->get_source(), op->get_map_epoch()); // crashed? if (acting_primary < 0) { @@ -2293,6 +2322,7 @@ void OSD::do_op(MOSDOp *op, PG *pg) break; // replicated ops + case OSD_OP_REP_WRNOOP: case OSD_OP_REP_WRITE: case OSD_OP_REP_TRUNCATE: case OSD_OP_REP_DELETE: @@ -2300,6 +2330,8 @@ void OSD::do_op(MOSDOp *op, PG *pg) case OSD_OP_REP_WRUNLOCK: case OSD_OP_REP_RDLOCK: case OSD_OP_REP_RDUNLOCK: + case OSD_OP_REP_UPLOCK: + case OSD_OP_REP_DNLOCK: op_rep_modify(op, pg); break; @@ -2315,6 +2347,7 @@ void OSD::do_op(MOSDOp *op, PG *pg) case OSD_OP_STAT: op_stat(op, pg); break; + case OSD_OP_WRNOOP: case OSD_OP_WRITE: case OSD_OP_ZERO: case OSD_OP_DELETE: @@ -2323,6 +2356,8 @@ void OSD::do_op(MOSDOp *op, PG *pg) case OSD_OP_WRUNLOCK: case OSD_OP_RDLOCK: case OSD_OP_RDUNLOCK: + case OSD_OP_UPLOCK: + case OSD_OP_DNLOCK: op_modify(op, pg); break; default: @@ -2570,20 +2605,18 @@ void OSD::put_repop(OSDReplicaOp *repop) if (repop->can_delete()) { // adjust peers_complete_thru if (!repop->pg_complete_thru.empty()) { - map::iterator p = repop->pg_complete_thru.begin(); - eversion_t min = p->second; - p++; - while (p != repop->pg_complete_thru.end()) { - if (p->second < min) min = p->second; - p++; - } - pg_t pgid = repop->op->get_pg(); osd_lock.Lock(); PG *pg = get_pg(pgid); if (pg) { + eversion_t min = pg->info.last_complete; // hrm.... + for (unsigned i=1; iacting.size(); i++) { + if (repop->pg_complete_thru[i] < min) // note: if we haven't heard, it'll be zero, which is what we want. + min = repop->pg_complete_thru[i]; + } + if (min > pg->peers_complete_thru) { - //dout(10) << *pg << "put_repop peers_complete_thru " << pg->peers_complete_thru << " -> " << min << endl; + dout(10) << *pg << "put_repop peers_complete_thru " << pg->peers_complete_thru << " -> " << min << endl; pg->peers_complete_thru = min; } //_unlock_pg(pgid); diff --git a/ceph/osd/OSD.h b/ceph/osd/OSD.h index 5796bb200e6d2..dde3a6be60f51 100644 --- a/ceph/osd/OSD.h +++ b/ceph/osd/OSD.h @@ -105,8 +105,17 @@ public: char dev_path[100]; class ObjectStore *store; - // failure monitoring - class HostMonitor *monitor; + // heartbeat + void heartbeat(); + + class C_Heartbeat : public Context { + OSD *osd; + public: + C_Heartbeat(OSD *o) : osd(o) {} + void finish(int r) { + osd->heartbeat(); + } + } *next_heartbeat; // global lock Mutex osd_lock; @@ -161,15 +170,15 @@ public: // -- osd map -- class OSDMap *osdmap; list waiting_for_osdmap; - //map osdmaps; hash_map peer_map_epoch; - + void share_map(msg_addr_t who, epoch_t epoch); + void wait_for_new_map(Message *m); void handle_osd_map(class MOSDMap *m); void advance_map(ObjectStore::Transaction& t); - void activate_map(); + void activate_map(ObjectStore::Transaction& t); bool get_map_bl(epoch_t e, bufferlist& bl); bool get_map(epoch_t e, OSDMap &m); @@ -238,7 +247,7 @@ public: // messages virtual void dispatch(Message *m); - void handle_ping(class MPing *m); + //void handle_ping(class MPing *m); void handle_op(class MOSDOp *m); void op_read(class MOSDOp *m, PG *pg); diff --git a/ceph/osd/PG.cc b/ceph/osd/PG.cc index 83b00a43ac0aa..ac431ff830661 100644 --- a/ceph/osd/PG.cc +++ b/ceph/osd/PG.cc @@ -18,6 +18,7 @@ #include "OSD.h" +#include "messages/MOSDPGNotify.h" #include "messages/MOSDPGLog.h" #include "messages/MOSDPGRemove.h" @@ -38,8 +39,8 @@ void PG::Log::copy_after(const Log &other, eversion_t v) i++) { if (i->version <= v) break; log.push_front(*i); - bottom = i->version; } + bottom = v; } @@ -51,8 +52,8 @@ void PG::IndexedLog::trim(ObjectStore::Transaction& t, eversion_t s) while (!log.empty()) { Entry &e = *log.begin(); - assert(requested_to != log.begin()); assert(complete_to != log.begin()); + assert(requested_to != log.begin()); // remove from index, unindex(e); @@ -72,6 +73,11 @@ void PG::merge_log(Log &olog, Missing &omissing, int fromosd) dout(10) << "merge_log " << olog << " from osd" << fromosd << " into " << log << endl; + cout << "log" << endl; + log.print(cout); + cout << "olog" << endl; + olog.print(cout); + if (log.empty() || (olog.bottom > log.top && olog.backlog)) { // e.g. log=(0,20] olog=(40,50]+backlog) // i'm missing everything after old log.top. @@ -98,13 +104,16 @@ void PG::merge_log(Log &olog, Missing &omissing, int fromosd) // take the whole thing. log.log.swap(olog.log); - log.top = olog.top; - log.bottom = olog.bottom; - log.backlog = olog.backlog; + + info.last_update = log.top = olog.top; + info.log_bottom = log.bottom = olog.bottom; + info.log_backlog = log.backlog = olog.backlog; log.index(); dout(10) << "merge_log result " << log << " " << missing << endl; + log.print(cout); + dout(10) << "missing " << hex << missing.missing << dec << endl; return; } @@ -162,8 +171,9 @@ void PG::merge_log(Log &olog, Missing &omissing, int fromosd) list::iterator to = olog.log.end(); list::iterator from = olog.log.end(); while (1) { + if (from == olog.log.begin()) break; from--; - if (from->version > log.top) { + if (from->version < log.top) { from++; break; } @@ -185,12 +195,14 @@ void PG::merge_log(Log &olog, Missing &omissing, int fromosd) // splice log.log.splice(log.log.end(), - log.log, from, to); + olog.log, from, to); - log.top = olog.top; + info.last_update = log.top = olog.top; } dout(10) << "merge_log result " << log << " " << missing << endl; + log.print(cout); + dout(10) << "missing " << hex << missing.missing << dec << endl; } @@ -205,6 +217,7 @@ void PG::generate_backlog() osd->store->collection_list(info.pgid, olist); int local = 0; + map add; for (list::iterator it = olist.begin(); it != olist.end(); it++) { @@ -219,28 +232,34 @@ void PG::generate_backlog() osd->store->getattr(*it, "version", &e.version, sizeof(e.version)); - log.log.push_front(e); + add[e.version] = e; + } - // index - log.index( *log.log.begin() ); + for (map::iterator i = add.begin(); + i != add.end(); + i++) { + log.log.push_front(i->second); + log.index( *log.log.begin() ); // index } dout(10) << local << " local objects, " + << add.size() << " objects added to backlog, " << log.objects.size() << " in pg" << endl; } void PG::drop_backlog() { - dout(10) << "dropping backlog for " << log << endl; + dout(10) << "drop_backlog for " << log << endl; + log.print(cout); assert(log.backlog); log.backlog = false; while (!log.log.empty()) { Log::Entry &e = *log.log.begin(); - if (e.version == log.bottom) break; - assert(e.version < log.bottom); + if (e.version > log.bottom) break; + dout(10) << "drop_backlog trimming " << e.version << endl; log.unindex(e); log.log.pop_front(); } @@ -322,7 +341,8 @@ void PG::adjust_prior() } -void PG::peer(map< int, map >& query_map) +void PG::peer(ObjectStore::Transaction& t, + map< int, map >& query_map) { dout(10) << "peer. acting is " << acting << ", prior_set is " << prior_set << endl; @@ -357,7 +377,7 @@ void PG::peer(map< int, map >& query_map) eversion_t newest_update = info.last_update; int newest_update_osd = osd->whoami; eversion_t oldest_update_needed = info.last_update; // only of acting (current) osd set - eversion_t peers_complete_thru = info.last_complete; + peers_complete_thru = info.last_complete; for (map::iterator it = peer_info.begin(); it != peer_info.end(); @@ -488,9 +508,7 @@ void PG::peer(map< int, map >& query_map) // -- ok, activate! - info.last_epoch_started = osd->osdmap->get_epoch(); - state_set(PG::STATE_ACTIVE); // i am active! - dout(10) << "marking active" << endl; + activate(t); clean_set.clear(); if (info.is_clean()) @@ -543,6 +561,8 @@ void PG::peer(map< int, map >& query_map) dout(10) << "sending " << m->log << " " << m->missing << " to osd" << peer << endl; + m->log.print(cout); + osd->messenger->send_message(m, MSG_ADDR_OSD(peer)); } @@ -553,10 +573,122 @@ void PG::peer(map< int, map >& query_map) clean_replicas(); } + // waiters osd->take_waiters(waiting_for_active); } +void PG::activate(ObjectStore::Transaction& t) +{ + // twiddle pg state + state_set(STATE_ACTIVE); + state_clear(PG::STATE_STRAY); + info.last_epoch_started = osd->osdmap->get_epoch(); + + if (role == 0) { // primary state + peers_complete_thru = 0; // we don't know (yet)! + } + + assert(info.last_complete >= log.bottom || log.backlog); + + // write pg info + t.collection_setattr(info.pgid, "info", (char*)&info, sizeof(info)); + + // write log + write_log(t); + + // clean up stray objects + clean_up_local(t); + + // init complete pointer + if (info.last_complete == info.last_update) { + dout(10) << "activate - complete" << endl; + log.complete_to == log.log.end(); + log.requested_to = log.log.end(); + } else { + dout(10) << "activate - not complete, starting recovery" << endl; + + // init complete_to + log.complete_to = log.log.begin(); + while (log.complete_to->version < info.last_complete) { + log.complete_to++; + assert(log.complete_to != log.log.end()); + } + + // start recovery + log.requested_to = log.log.begin(); + do_recovery(); + } +} + +/** clean_up_local + * remove any objects that we're storing but shouldn't. + * as determined by log. + */ +void PG::clean_up_local(ObjectStore::Transaction& t) +{ + dout(10) << "clean_up_local" << endl; + + assert(info.last_update >= log.bottom); // otherwise we need some help! + + if (log.backlog) { + // be thorough. + list ls; + osd->store->collection_list(info.pgid, ls); + set s; + + for (list::iterator i = ls.begin(); + i != ls.end(); + i++) + s.insert(*i); + + set did; + for (list::reverse_iterator p = log.log.rbegin(); + p != log.log.rend(); + p++) { + if (did.count(p->oid)) continue; + did.insert(p->oid); + + if (p->is_delete()) { + if (s.count(p->oid)) { + dout(10) << " deleting " << hex << p->oid << dec + << " when " << p->version << endl; + t.remove(p->oid); + } + s.erase(p->oid); + } else { + // just leave old objects.. they're missing or whatever + s.erase(p->oid); + } + } + + for (set::iterator i = s.begin(); + i != s.end(); + i++) { + dout(10) << " deleting stray " << hex << *i << dec << endl; + t.remove(*i); + } + + } else { + // just scan the log. + set did; + for (list::reverse_iterator p = log.log.rbegin(); + p != log.log.rend(); + p++) { + if (did.count(p->oid)) continue; + did.insert(p->oid); + + if (p->is_delete()) { + dout(10) << " deleting " << hex << p->oid << dec + << " when " << p->version << endl; + t.remove(p->oid); + } else { + // keep old(+missing) objects, just for kicks. + } + } + } +} + /** * do one recovery op. @@ -569,59 +701,52 @@ bool PG::do_recovery() // look at log! Log::Entry *latest = 0; - while (1) { - if (log.requested_to == log.log.end()) { - dout(10) << "already requested everything in log" << endl; - return false; - } + while (log.requested_to != log.log.end()) { + dout(10) << "do_recovery " + << log.requested_to->version + << (log.requested_to->is_update() ? " update":" delete") + << " " << hex << log.requested_to->oid << dec + << endl; + assert(log.objects.count(log.requested_to->oid)); latest = log.objects[log.requested_to->oid]; + if (latest->is_update() && !objects_pulling.count(latest->oid) && - missing.is_missing(latest->oid)) - break; + missing.is_missing(latest->oid)) { + osd->pull(this, latest->oid, latest->version); + return true; + } log.requested_to++; } - osd->pull(this, latest->oid, latest->version); + // done! + assert(missing.num_missing() == 0); + assert(info.last_complete == info.last_update); - return true; -} + if (is_primary()) { + // i am primary + clean_set.insert(osd->whoami); + if (is_all_clean()) { + state_set(PG::STATE_CLEAN); + clean_replicas(); + } + } else { + // tell primary + dout(7) << "do_recovery complete, telling primary" << endl; + list ls; + ls.push_back(info); + osd->messenger->send_message(new MOSDPGNotify(osd->osdmap->get_epoch(), + ls), + MSG_ADDR_OSD(get_primary())); + } + return false; +} -/** clean_up_local - * remove any objects that we're storing but shouldn't. - * as determined by log. - */ -void PG::clean_up_local() -{ - dout(10) << "clean_up_local" << endl; - assert(info.last_update >= log.bottom); // otherwise we need some help! - set did; - for (list::reverse_iterator p = log.log.rbegin(); - p != log.log.rend(); - p++) { - if (!p->is_delete()) continue; - if (did.count(p->oid)) continue; - did.insert(p->oid); - - if (osd->store->exists(p->oid)) { - eversion_t ov = 0; - osd->store->getattr(p->oid, "version", &ov, sizeof(ov)); - if (ov < p->version) { - dout(10) << " removing " << hex << p->oid << dec - << " v " << ov << " < " << p->version << endl; - osd->store->remove(p->oid); - } else { - dout(10) << " keeping " << hex << p->oid << dec - << " v " << ov << " >= " << p->version << endl; - } - } - } -} void PG::clean_replicas() { @@ -704,7 +829,7 @@ void PG::trim_ondisklog_to(ObjectStore::Transaction& t, eversion_t v) void PG::append_log(ObjectStore::Transaction& t, PG::Log::Entry& logentry, eversion_t trim_to) { - // write entry + // write entry on disk bufferlist bl; bl.append( (char*)&logentry, sizeof(logentry) ); t.write( info.pgid, ondisklog.top, bl.length(), bl ); @@ -756,5 +881,23 @@ void PG::read_log(ObjectStore *store) } } log.top = info.last_update; + log.index(); + + // build missing + set did; + for (list::reverse_iterator i = log.log.rbegin(); + i != log.log.rend(); + i++) { + if (i->version <= info.last_complete) break; + if (did.count(i->oid)) continue; + did.insert(i->oid); + + if (i->is_delete()) continue; + + eversion_t v; + int r = osd->store->getattr(i->oid, "version", &v, sizeof(v)); + if (r < 0 || v < i->version) + missing.add(i->oid, i->version); + } } diff --git a/ceph/osd/PG.h b/ceph/osd/PG.h index 10b4dfc62e445..f3a8aefc5503d 100644 --- a/ceph/osd/PG.h +++ b/ceph/osd/PG.h @@ -268,12 +268,11 @@ public: hash_set caller_ops; // recovery pointers - bool recovery_valid; list::iterator requested_to; // not inclusive of referenced item list::iterator complete_to; // not inclusive of referenced item /****/ - IndexedLog() : recovery_valid(false) {} + IndexedLog() {} bool logged_object(object_t oid) { return objects.count(oid); @@ -284,6 +283,7 @@ public: void index() { objects.clear(); + caller_ops.clear(); for (list::iterator i = log.begin(); i != log.end(); i++) { @@ -303,6 +303,7 @@ public: assert(objects.count(e.oid)); if (objects[e.oid]->version == e.version) objects.erase(e.oid); + caller_ops.erase(e.reqid); } // accessors @@ -367,7 +368,7 @@ public: IndexedLog log; OndiskLog ondisklog; Missing missing; - + utime_t last_heartbeat; // protected: int role; // 0 = primary, 1 = replica, -1=none. @@ -379,9 +380,9 @@ protected: epoch_t last_epoch_started_any; eversion_t last_complete_commit; - protected: // [primary only] content recovery state eversion_t peers_complete_thru; + protected: set prior_set; // current+prior OSDs, as defined by last_epoch_started_any. set stray_set; // non-acting osds that have PG data. set clean_set; // current OSDs that are clean @@ -403,7 +404,6 @@ protected: list > waiting_for_missing_object; // recovery - //eversion_t requested_thru; map objects_pulling; // which objects are currently being pulled public: @@ -418,7 +418,6 @@ public: peer_info_requested.clear(); peer_log_requested.clear(); clear_primary_recovery_state(); - peers_complete_thru = 0; } public: @@ -451,18 +450,12 @@ public: void generate_backlog(); void drop_backlog(); - void peer(map< int, map >& query_map); + void peer(ObjectStore::Transaction& t, map< int, map >& query_map); - bool do_recovery(); - void start_recovery() { - log.recovery_valid = true; - log.requested_to = log.log.begin(); - log.complete_to = log.log.begin(); + void activate(ObjectStore::Transaction& t); - do_recovery(); - } + bool do_recovery(); - void clean_up_local(); void clean_replicas(); off_t get_log_write_pos() { @@ -513,6 +506,9 @@ public: } + // pg on-disk content + void clean_up_local(ObjectStore::Transaction& t); + // pg on-disk state void write_log(ObjectStore::Transaction& t); void append_log(ObjectStore::Transaction& t, @@ -553,6 +549,7 @@ inline ostream& operator<<(ostream& out, const PG& pg) { out << "pg[" << pg.info << " r=" << pg.get_role(); + if (pg.get_role() == 0) out << " pct " << pg.peers_complete_thru; if (pg.is_active()) out << " active"; if (pg.is_clean()) out << " clean"; if (pg.is_stray()) out << " stray"; diff --git a/ceph/osdc/Filer.h b/ceph/osdc/Filer.h index 9db25df73db36..b59e60712953a 100644 --- a/ceph/osdc/Filer.h +++ b/ceph/osdc/Filer.h @@ -68,7 +68,7 @@ class Filer { Objecter::OSDRead *rd = new Objecter::OSDRead(bl); file_to_extents(inode, offset, len, rd->extents); - return objecter->readx(rd, onfinish); + return objecter->readx(rd, onfinish) > 0 ? 0:-1; } int write(inode_t& inode, @@ -81,7 +81,7 @@ class Filer { Objecter::OSDWrite *wr = new Objecter::OSDWrite(bl); file_to_extents(inode, offset, len, wr->extents); - return objecter->modifyx(wr, onack, oncommit); + return objecter->modifyx(wr, onack, oncommit) > 0 ? 0:-1; } int zero(inode_t& inode, @@ -92,7 +92,7 @@ class Filer { Objecter::OSDModify *z = new Objecter::OSDModify(OSD_OP_ZERO); file_to_extents(inode, offset, len, z->extents); - return objecter->modifyx(z, onack, oncommit); + return objecter->modifyx(z, onack, oncommit) > 0 ? 0:-1; } diff --git a/ceph/osdc/ObjectCacher.cc b/ceph/osdc/ObjectCacher.cc index 9d498804a07be..be67513002058 100644 --- a/ceph/osdc/ObjectCacher.cc +++ b/ceph/osdc/ObjectCacher.cc @@ -794,9 +794,9 @@ int ObjectCacher::atomic_sync_writex(Objecter::OSDWrite *wr, inodeno_t ino, Mute // single object. // make sure we aren't already locking/locked... - object_t oid = wr->extents.front()->oid; + object_t oid = wr->extents.front().oid; Object *o = 0; - if (objects.count(oid)) o = get_object(iod, ino); + if (objects.count(oid)) o = get_object(oid, ino); if (!o || (o->lock_state != Object::LOCK_WRLOCK && o->lock_state != Object::LOCK_WRLOCKING && @@ -1195,3 +1195,50 @@ off_t ObjectCacher::release_set(inodeno_t ino) return unclean; } + + +void ObjectCacher::kick_sync_writers(inodeno_t ino) +{ + if (objects_by_ino.count(ino) == 0) { + dout(10) << "kick_sync_writers on " << hex << ino << dec << " dne" << endl; + return; + } + + dout(10) << "kick_sync_writers on " << hex << ino << dec << endl; + + list ls; + + set& s = objects_by_ino[ino]; + for (set::iterator i = s.begin(); + i != s.end(); + i++) { + Object *ob = *i; + + ls.splice(ls.begin(), ob->waitfor_wr); + } + + finish_contexts(ls); +} + +void ObjectCacher::kick_sync_readers(inodeno_t ino) +{ + if (objects_by_ino.count(ino) == 0) { + dout(10) << "kick_sync_readers on " << hex << ino << dec << " dne" << endl; + return; + } + + dout(10) << "kick_sync_readers on " << hex << ino << dec << endl; + + list ls; + + set& s = objects_by_ino[ino]; + for (set::iterator i = s.begin(); + i != s.end(); + i++) { + Object *ob = *i; + + ls.splice(ls.begin(), ob->waitfor_rd); + } + + finish_contexts(ls); +} diff --git a/ceph/osdc/ObjectCacher.h b/ceph/osdc/ObjectCacher.h index fd93b6f2342cd..20614c44faa07 100644 --- a/ceph/osdc/ObjectCacher.h +++ b/ceph/osdc/ObjectCacher.h @@ -406,6 +406,9 @@ class ObjectCacher { off_t release_set(inodeno_t ino); // returns # of bytes not released (ie non-clean) + void kick_sync_writers(inodeno_t ino); + void kick_sync_readers(inodeno_t ino); + // file functions diff --git a/ceph/osdc/Objecter.cc b/ceph/osdc/Objecter.cc index b6f45dbf38c8e..45cdca4290aec 100644 --- a/ceph/osdc/Objecter.cc +++ b/ceph/osdc/Objecter.cc @@ -100,6 +100,8 @@ void Objecter::handle_osd_map(MOSDMap *m) void Objecter::scan_pgs(set& changed_pgs, set& down_pgs) { + dout(10) << "scan_pgs" << endl; + for (hash_map::iterator i = pg_map.begin(); i != pg_map.end(); i++) { @@ -107,12 +109,22 @@ void Objecter::scan_pgs(set& changed_pgs, set& down_pgs) PG& pg = i->second; int old = pg.primary; - if (pg.calc_primary(pgid, osdmap)) { + pg.calc_primary(pgid, osdmap); + + if (old != pg.primary) { if (osdmap->is_down(old)) { - dout(10) << "scan_pgs pg " << hex << pgid << dec << " went down" << endl; + dout(10) << "scan_pgs pg " << hex << pgid << dec + << " (" << pg.active_tids << ")" + << " primary " << old << " -> " << pg.primary + << " (went down)" + << endl; down_pgs.insert(pgid); } else { - dout(10) << "scan_pgs pg " << hex << pgid << dec << " changed" << endl; + dout(10) << "scan_pgs pg " << hex << pgid << dec + << " (" << pg.active_tids << ")" + << " primary " << old << " -> " << pg.primary + << " (changed)" + << endl; } changed_pgs.insert(pgid); } @@ -121,6 +133,10 @@ void Objecter::scan_pgs(set& changed_pgs, set& down_pgs) void Objecter::kick_requests(set& changed_pgs, set& down_pgs) { + dout(10) << "kick_requests changed " << hex << changed_pgs + << ", down " << down_pgs << dec + << endl; + for (set::iterator i = changed_pgs.begin(); i != changed_pgs.end(); i++) { @@ -143,34 +159,20 @@ void Objecter::kick_requests(set& changed_pgs, set& down_pgs) // WRITE if (wr->waitfor_ack.count(tid)) { - // resubmit if (down_pgs.count(pgid) == 0) { - dout(0) << "kick_requests resub WRNOOP " << tid << endl; + dout(0) << "kick_requests missing ack, resub WRNOOP " << tid << endl; modifyx_submit(wr, wr->waitfor_ack[tid], true); } else { - dout(0) << "kick_requests resub write " << tid << endl; + dout(0) << "kick_requests missing ack, resub write " << tid << endl; modifyx_submit(wr, wr->waitfor_ack[tid]); } wr->waitfor_ack.erase(tid); wr->waitfor_commit.erase(tid); } else if (wr->waitfor_commit.count(tid)) { - // fake it. FIXME. - dout(0) << "kick_requests faking commit on write " << tid << endl; - wr->waitfor_ack.erase(tid); - if (wr->waitfor_ack.empty() && wr->onack) { - wr->onack->finish(0); - delete wr->onack; - wr->onack = 0; - } + dout(0) << "kick_requests missing commit, resub WRNOOP " << tid << endl; + modifyx_submit(wr, wr->waitfor_commit[tid], true); wr->waitfor_commit.erase(tid); - if (wr->waitfor_commit.empty()) { - if (wr->oncommit) { - wr->oncommit->finish(0); - delete wr->oncommit; - } - delete wr; - } } } @@ -201,10 +203,15 @@ void Objecter::handle_osd_op_reply(MOSDOpReply *m) handle_osd_read_reply(m); break; + case OSD_OP_WRNOOP: case OSD_OP_WRITE: case OSD_OP_ZERO: case OSD_OP_WRUNLOCK: case OSD_OP_WRLOCK: + case OSD_OP_RDLOCK: + case OSD_OP_RDUNLOCK: + case OSD_OP_UPLOCK: + case OSD_OP_DNLOCK: handle_osd_modify_reply(m); break; @@ -218,7 +225,7 @@ void Objecter::handle_osd_op_reply(MOSDOpReply *m) tid_t Objecter::read(object_t oid, off_t off, size_t len, bufferlist *bl, - Context *onfinish) + Context *onfinish) { OSDRead *rd = new OSDRead(bl); rd->extents.push_back(ObjectExtent(oid, off, len)); @@ -252,9 +259,12 @@ void Objecter::readx_submit(OSDRead *rd, ObjectExtent &ex) OSD_OP_READ); m->set_length(ex.length); m->set_offset(ex.start); - dout(10) << "readx_submit tid " << last_tid << " to osd" << pg.primary + dout(10) << "readx_submit " << rd << " tid " << last_tid << " oid " << hex << ex.oid << dec << " " << ex.start << "~" << ex.length - << " (" << ex.buffer_extents.size() << " buffer fragments)" << endl; + << " (" << ex.buffer_extents.size() << " buffer fragments)" + << " pg " << hex << ex.pgid << dec + << " osd" << pg.primary + << endl; if (pg.primary >= 0) messenger->send_message(m, MSG_ADDR_OSD(pg.primary), 0); @@ -271,9 +281,14 @@ void Objecter::handle_osd_read_reply(MOSDOpReply *m) { // get pio tid_t tid = m->get_tid(); + + if (op_read.count(tid) == 0) { + dout(7) << "handle_osd_read_reply " << tid << " ... stray" << endl; + delete m; + return; + } + dout(7) << "handle_osd_read_reply " << tid << endl; - - assert(op_read.count(tid)); OSDRead *rd = op_read[ tid ]; op_read.erase( tid ); @@ -303,7 +318,6 @@ void Objecter::handle_osd_read_reply(MOSDOpReply *m) if (rd->ops.empty()) { // all done size_t bytes_read = 0; - rd->bl->clear(); if (rd->read_data.size()) { dout(15) << " assembling frags" << endl; @@ -409,7 +423,7 @@ void Objecter::handle_osd_read_reply(MOSDOpReply *m) Context *onfinish = rd->onfinish; dout(7) << " " << bytes_read << " bytes " - //<< rd->bl->length() + << rd->bl->length() << endl; // done @@ -536,9 +550,12 @@ void Objecter::modifyx_submit(OSDModify *wr, ObjectExtent &ex, bool wrnoop) pg.active_tids.insert(last_tid); // send - dout(10) << "modifyx_submit op " << wr->op << " tid " << last_tid - << " osd" << pg.primary << " oid " << hex << ex.oid << dec - << " " << ex.start << "~" << ex.length << endl; + dout(10) << "modifyx_submit " << MOSDOp::get_opname(wr->op) << " tid " << last_tid + << " oid " << hex << ex.oid << dec + << " " << ex.start << "~" << ex.length + << " pg " << hex << ex.pgid << dec + << " osd" << pg.primary + << endl; if (pg.primary >= 0) messenger->send_message(m, MSG_ADDR_OSD(pg.primary), 0); } @@ -549,8 +566,14 @@ void Objecter::handle_osd_modify_reply(MOSDOpReply *m) { // get pio tid_t tid = m->get_tid(); + + if (op_modify.count(tid) == 0) { + dout(7) << "handle_osd_modify_reply " << tid << " commit " << m->get_commit() << " ... stray" << endl; + delete m; + return; + } + dout(7) << "handle_osd_modify_reply " << tid << " commit " << m->get_commit() << endl; - assert(op_modify.count(tid)); OSDModify *wr = op_modify[ tid ]; Context *onack = 0; diff --git a/ceph/osdc/Objecter.h b/ceph/osdc/Objecter.h index 72cc1cb90cce8..abcd46f95d29d 100644 --- a/ceph/osdc/Objecter.h +++ b/ceph/osdc/Objecter.h @@ -40,7 +40,9 @@ class Objecter { map ops; map read_data; // bits of data as they come back - OSDRead(bufferlist *b) : bl(b), onfinish(0) {} + OSDRead(bufferlist *b) : bl(b), onfinish(0) { + bl->clear(); + } }; // generic modify @@ -89,12 +91,8 @@ class Objecter { PG() : primary(-1) {} - bool calc_primary(pg_t pgid, OSDMap *osdmap) { // return true if change - int n = osdmap->get_pg_acting_primary(pgid); - if (n == primary) - return false; - primary = n; - return true; + void calc_primary(pg_t pgid, OSDMap *osdmap) { // return true if change + primary = osdmap->get_pg_acting_primary(pgid); } }; -- 2.39.5