Client::~Client()
{
- if (messenger) { delete messenger; messenger = 0; }
+ tear_down_cache();
+
+ if (objectcacher) {
+ delete objectcacher;
+ objectcacher = 0;
+ }
+
if (filer) { delete filer; filer = 0; }
- if (objectcacher) { delete objectcacher; objectcacher = 0; }
if (objecter) { delete objecter; objecter = 0; }
if (osdmap) { delete osdmap; osdmap = 0; }
+ if (mdsmap) { delete mdsmap; mdsmap = 0; }
- tear_down_cache();
+ if (messenger) { delete messenger; messenger = 0; }
}
if (cap_reap_queue[in->ino()].empty())
cap_reap_queue.erase(in->ino());
}
+ delete m;
return;
}
} else {
//dout(0) << "didn't put_inode" << endl;
}
-
+ delete m;
return;
}
}
}
in->fc.set_caps(new_caps, onimplement);
-
} else {
// caching off.
}
+void FileCache::tear_down()
+{
+ off_t unclean = release_clean();
+ if (unclean) {
+ dout(0) << "tear_down " << unclean << " unclean bytes, purging" << endl;
+ oc->purge_set(inode.ino);
+ }
+}
+
// caps
void FileCache::set_caps(int caps, Context *onimplement)
latest_caps(0),
num_reading(0), num_writing(0),// num_unsafe(0),
waitfor_release(false) {}
+ ~FileCache() {
+ tear_down();
+ }
// waiters/waiting
bool can_read() { return latest_caps & CAP_FILE_RD; }
bool is_cached();
bool is_dirty();
+ void tear_down();
+
int get_caps() { return latest_caps; }
void set_caps(int caps, Context *onimplement=0);
void check_caps();
// go fuse go
cout << "ok, calling fuse_main" << endl;
- cout << "cwd was " << get_current_dir_name() << endl;
int r = fuse_main(newargc, newargv, &ceph_oper);
- cout << "cwd now " << get_current_dir_name() << endl;
return r;
}
{
filename = "";
if (g_conf.use_abspaths) {
- filename = get_current_dir_name();
+ char *cwd = get_current_dir_name();
+ filename = cwd;
+ delete cwd;
filename += "/";
}
}
int join(void **prval = 0) {
- if (thread_id == 0) return -1; // never started.
+ assert(thread_id);
+ //if (thread_id == 0) return -1; // never started.
+
int status = pthread_join(thread_id, prval);
if (status == 0)
thread_id = 0;
scheduled.erase(tp);
lock.Unlock();
+
+ // delete the canceled event.
+ delete callback;
+
return true;
}
if (g_timer.cancel_event(scheduled[c])) {
// hosed wrapper. hose original event too.
- delete scheduled[c];
+ delete c;
} else {
// clean up later.
canceled[c] = scheduled[c];
BarrierQueue(BlockDevice *bd, const char *d) : bdev(bd), dev(d) {
barrier();
}
+ ~BarrierQueue() {
+ for (list<Queue*>::iterator p = qls.begin();
+ p != qls.end();
+ ++p)
+ delete *p;
+ qls.clear();
+ }
int size() {
// this isn't perfectly accurate.
if (!qls.empty())
assert(cursor.open[cursor.level].size() == 0);
assert(depth == 1);
root = -1;
- depth = 0;
- pool.release(cursor.open[0].node);
+ depth = 0;
+ if (cursor.open[0].node)
+ pool.release(cursor.open[0].node);
}
verify("remove 1");
return 0;
#include "common/Timer.h"
-#define NUMMDS g_conf.num_mds
-#define NUMOSD g_conf.num_osd
-#define NUMCLIENT g_conf.num_client
class C_Test : public Context {
public:
}
// create mds
- MDS *mds[NUMMDS];
- OSD *mdsosd[NUMMDS];
- for (int i=0; i<NUMMDS; i++) {
+ MDS *mds[g_conf.num_mds];
+ OSD *mdsosd[g_conf.num_mds];
+ for (int i=0; i<g_conf.num_mds; i++) {
//cerr << "mds" << i << " on rank " << myrank << " " << hostname << "." << pid << endl;
mds[i] = new MDS(-1, new FakeMessenger(MSG_ADDR_MDS_NEW), monmap);
if (g_conf.mds_local_osd)
}
// create osd
- OSD *osd[NUMOSD];
- for (int i=0; i<NUMOSD; i++) {
+ OSD *osd[g_conf.num_osd];
+ for (int i=0; i<g_conf.num_osd; i++) {
//cerr << "osd" << i << " on rank " << myrank << " " << hostname << "." << pid << endl;
osd[i] = new OSD(i, new FakeMessenger(MSG_ADDR_OSD(i)), monmap);
start++;
}
// create client
- Client *client[NUMCLIENT];
- SyntheticClient *syn[NUMCLIENT];
- for (int i=0; i<NUMCLIENT; i++) {
+ Client *client[g_conf.num_client];
+ SyntheticClient *syn[g_conf.num_client];
+ for (int i=0; i<g_conf.num_client; i++) {
//cerr << "client" << i << " on rank " << myrank << " " << hostname << "." << pid << endl;
client[i] = new Client(new FakeMessenger(MSG_ADDR_CLIENT(i)), monmap);
start++;
for (int i=0; i<g_conf.num_mon; i++) {
mon[i]->init();
}
- for (int i=0; i<NUMMDS; i++) {
+ for (int i=0; i<g_conf.num_mds; i++) {
mds[i]->init();
if (g_conf.mds_local_osd)
mdsosd[i]->init();
}
- for (int i=0; i<NUMOSD; i++) {
+ for (int i=0; i<g_conf.num_osd; i++) {
osd[i]->init();
}
// create client(s)
- for (int i=0; i<NUMCLIENT; i++) {
+ for (int i=0; i<g_conf.num_client; i++) {
client[i]->init();
// use my argc, argv (make sure you pass a mount point!)
}
- for (int i=0; i<NUMCLIENT; i++) {
+ for (int i=0; i<g_conf.num_client; i++) {
cout << "waiting for synthetic client " << i << " to finish" << endl;
syn[i]->join_thread();
fakemessenger_wait();
// cleanup
- for (int i=0; i<NUMMDS; i++) {
+ for (int i=0; i<g_conf.num_mon; i++) {
+ delete mon[i];
+ }
+ for (int i=0; i<g_conf.num_mds; i++) {
delete mds[i];
}
- for (int i=0; i<NUMOSD; i++) {
+ for (int i=0; i<g_conf.num_osd; i++) {
delete osd[i];
}
- for (int i=0; i<NUMCLIENT; i++) {
+ for (int i=0; i<g_conf.num_client; i++) {
delete client[i];
}
*/
class C_Gather : public Context {
public:
+ bool sub_finish(int r) {
+ //cout << "C_Gather sub_finish " << this << endl;
+ assert(waitfor.count(r));
+ waitfor.erase(r);
+ if (!waitfor.empty())
+ return false; // more subs left
+
+ // last one
+ onfinish->finish(0);
+ delete onfinish;
+ onfinish = 0;
+ return true;
+ }
+
class C_GatherSub : public Context {
C_Gather *gather;
int num;
public:
C_GatherSub(C_Gather *g, int n) : gather(g), num(n) {}
void finish(int r) {
- gather->finish(num);
+ if (gather->sub_finish(num))
+ delete gather; // last one!
}
};
+ Context *new_sub() {
+ num++;
+ waitfor.insert(num);
+ return new C_GatherSub(this, num);
+ }
+
private:
Context *onfinish;
std::set<int> waitfor;
int num;
public:
- C_Gather(Context *f) : onfinish(f), num(0) {}
-
+ C_Gather(Context *f) : onfinish(f), num(0) {
+ //cout << "C_Gather new " << this << endl;
+ }
+ ~C_Gather() {
+ //cout << "C_Gather delete " << this << endl;
+ assert(!onfinish);
+ }
void finish(int r) {
- assert(waitfor.count(r));
- waitfor.erase(r);
- if (waitfor.empty()) {
- onfinish->finish(0);
- delete onfinish;
- }
+ // nobody should ever call me.
+ assert(0);
}
- Context *new_sub() {
- num++;
- waitfor.insert(num);
- return new C_GatherSub(this, num);
- }
};
#endif
if (anchormgr) { delete anchormgr; anchormgr = NULL; }
if (anchorclient) { delete anchorclient; anchorclient = NULL; }
if (osdmap) { delete osdmap; osdmap = 0; }
+ if (mdsmap) { delete mdsmap; mdsmap = 0; }
+
+ if (server) { delete server; server = 0; }
+ if (locker) { delete locker; locker = 0; }
if (filer) { delete filer; filer = 0; }
if (objecter) { delete objecter; objecter = 0; }
if (g_conf.use_abspaths) {
// combine it with the cwd, in case fuse screws things up (i.e. fakefuse)
string old = dir;
- dir = get_current_dir_name();
+ char *cwd = get_current_dir_name();
+ dir = cwd;
+ delete cwd;
dir += "/";
dir += old;
}
<< (osdmap.osds.size() - osdmap.osd_inst.size())
<< " osds to boot" << endl;
}
+
+ delete m;
return;
}
dout(18) << "messenger " << mgr << " at " << mgr->get_myname() << " has " << mgr->num_incoming() << " queued" << endl;
-
if (!mgr->is_ready()) {
dout(18) << "messenger " << mgr << " at " << mgr->get_myname() << " has no dispatcher, skipping" << endl;
it++;
FakeMessenger::~FakeMessenger()
{
-
+ // hose any undelivered messages
+ for (list<Message*>::iterator p = incoming.begin();
+ p != incoming.end();
+ ++p)
+ delete *p;
}
assert(directory.count(_myinst.addr) == 1);
shutdown_set.insert(_myinst.addr);
- /*
- directory.erase(myaddr);
- if (directory.empty()) {
- dout(1) << "fakemessenger: last shutdown" << endl;
- ::fm_shutdown = true;
- cond.Signal(); // why not
- }
- */
-
/*
if (loggers[myaddr]) {
delete loggers[myaddr];
lock.Lock();
- // deliver
- try {
#ifdef LOG_MESSAGES
- // stats
- loggers[get_myaddr()]->inc("+send",1);
- loggers[dest]->inc("-recv",1);
-
- char s[20];
- sprintf(s,"+%s", m->get_type_name());
- loggers[get_myaddr()]->inc(s);
- sprintf(s,"-%s", m->get_type_name());
- loggers[dest]->inc(s);
+ // stats
+ loggers[get_myaddr()]->inc("+send",1);
+ loggers[dest]->inc("-recv",1);
+
+ char s[20];
+ sprintf(s,"+%s", m->get_type_name());
+ loggers[get_myaddr()]->inc(s);
+ sprintf(s,"-%s", m->get_type_name());
+ loggers[dest]->inc(s);
#endif
- // queue
- FakeMessenger *dm = directory[inst.addr];
- if (!dm) {
- dout(1) << "** destination " << inst << " dne" << endl;
- for (map<entity_addr_t, FakeMessenger*>::iterator p = directory.begin();
- p != directory.end();
- ++p) {
- dout(1) << "** have " << p->first << " to " << p->second << endl;
- }
- //assert(dm);
- }
- dm->queue_incoming(m);
-
+ // queue
+ if (directory.count(inst.addr)) {
dout(1) << "--> " << get_myname() << " -> " << inst.name << " " << *m << endl;
-
- }
- catch (...) {
- cout << "no destination " << dest << endl;
- assert(0);
+ directory[inst.addr]->queue_incoming(m);
+ } else {
+ dout(0) << "--> " << get_myname() << " -> " << inst.name << " " << *m
+ << " *** destination DNE ***" << endl;
+ for (map<entity_addr_t, FakeMessenger*>::iterator p = directory.begin();
+ p != directory.end();
+ ++p) {
+ dout(0) << "** have " << p->first << " to " << p->second << endl;
+ }
+ //assert(dm);
+ delete m;
}
-
// wake up loop?
if (!awake) {
dout(10) << "waking up fakemessenger thread" << endl;
if (dest.is_osd()) {
// failed osd. drop message, report to mon.
int mon = monmap->pick_mon();
- dout(0) << "ms_handle_failure " << dest << " inst " << inst
+ dout(0) << "ms_handle_failure " << inst
<< ", dropping and reporting to mon" << mon
+ << " " << *m
<< endl;
messenger->send_message(new MOSDFailure(inst, osdmap->get_epoch()),
monmap->get_inst(mon));
} else if (dest.is_mon()) {
// resend to a different monitor.
int mon = monmap->pick_mon(true);
- dout(0) << "ms_handle_failure " << dest << " inst " << inst
+ dout(0) << "ms_handle_failure " << inst
<< ", resending to mon" << mon
+ << " " << *m
<< endl;
messenger->send_message(m, monmap->get_inst(mon));
}
else {
// client?
- dout(0) << "ms_handle_failure " << dest << " inst " << inst
- << ", dropping" << endl;
+ dout(0) << "ms_handle_failure " << inst
+ << ", dropping " << *m << endl;
delete m;
}
}
dout(10) << "merge_left result " << *left << endl;
}
-/* buggy possibly, but more importnatly, unnecessary.
-void ObjectCacher::Object::merge_right(BufferHead *left, BufferHead *right)
-{
- assert(left->end() == right->start());
- assert(left->get_state() == right->get_state());
-
- dout(10) << "merge_right " << *left << " + " << *right << endl;
- oc->bh_remove(this, left);
- oc->bh_stat_sub(right);
- data.erase(right->start());
- right->set_start( left->start() );
- data[right->start()] = right;
- right->set_length( left->length() + right->length());
- oc->bh_stat_add(right);
-
- // data
- bufferlist nbl;
- nbl.claim(left->bl);
- nbl.claim_append(right->bl);
- right->bl.claim(nbl);
-
- // version
- // note: this is sorta busted, but should only be used for dirty buffers
- right->last_write_tid = MAX( left->last_write_tid, right->last_write_tid );
-
- // waiters
- map<off_t,list<Context*> > old;
- old.swap(right->waitfor_read);
- // take left's waiters
- right->waitfor_read.swap(left->waitfor_read);
-
- // shift old waiters
- for (map<off_t, list<Context*> >::iterator p = old.begin();
- p != old.end();
- p++)
- right->waitfor_read[p->first + left->length()].swap( p->second );
-
- // hose left
- delete left;
-
- dout(10) << "merge_right result " << *right << endl;
-}
-*/
/*
* map a range of bytes into buffer_heads.
#define dout(l) if (l<=g_conf.debug || l<=g_conf.debug_objectcacher) cout << g_clock.now() << " " << objecter->messenger->get_myname() << ".objectcacher "
+
/* private */
+void ObjectCacher::close_object(Object *ob)
+{
+ dout(10) << "close_object " << *ob << endl;
+ assert(ob->can_close());
+
+ // ok!
+ objects.erase(ob->get_oid());
+ objects_by_ino[ob->get_ino()].erase(ob);
+ if (objects_by_ino[ob->get_ino()].empty())
+ objects_by_ino.erase(ob->get_ino());
+ delete ob;
+}
+
+
+
+
void ObjectCacher::bh_read(BufferHead *bh)
{
dout(7) << "bh_read on " << *bh << endl;
}
dout(10) << "readx result is " << rd->bl->length() << endl;
+ // done with read.
+ delete rd;
+
trim();
return pos;
}
+// purge. non-blocking. violently removes dirty buffers from cache.
+void ObjectCacher::purge(Object *ob)
+{
+ dout(10) << "purge " << *ob << endl;
+
+ for (map<off_t,BufferHead*>::iterator p = ob->data.begin();
+ p != ob->data.end();
+ p++) {
+ BufferHead *bh = p->second;
+ dout(0) << "purge forcibly removing " << *bh << endl;
+ bh_remove(ob, bh);
+ delete bh;
+ }
+
+ if (ob->can_close()) {
+ dout(10) << "trim trimming " << *ob << endl;
+ close_object(ob);
+ }
+}
+
// flush. non-blocking. no callback.
// true if clean, already flushed.
// false if we wrote something.
return false;
}
+void ObjectCacher::purge_set(inodeno_t ino)
+{
+ if (objects_by_ino.count(ino) == 0) {
+ dout(10) << "purge_set on " << ino << " dne" << endl;
+ return;
+ }
+
+ dout(10) << "purge_set " << ino << endl;
+
+ set<Object*>& s = objects_by_ino[ino];
+ for (set<Object*>::iterator i = s.begin();
+ i != s.end();
+ i++) {
+ Object *ob = *i;
+ purge(ob);
+ }
+}
+
off_t ObjectCacher::release(Object *ob)
{
for (list<BufferHead*>::iterator p = clean.begin();
p != clean.end();
- p++)
+ p++) {
bh_remove(ob, *p);
+ delete *p;
+ }
+
+ if (ob->can_close()) {
+ dout(10) << "trim trimming " << *ob << endl;
+ close_object(ob);
+ }
return o_unclean;
}
dout(10) << "release_set " << ino << endl;
- set<Object*>& s = objects_by_ino[ino];
+ set<Object*> s = objects_by_ino[ino];
for (set<Object*>::iterator i = s.begin();
i != s.end();
i++) {
last_write_tid(0), last_ack_tid(0), last_commit_tid(0),
lock_state(LOCK_NONE), wrlock_ref(0), rdlock_ref(0)
{}
+ ~Object() {
+ assert(data.empty());
+ }
object_t get_oid() { return oid; }
inodeno_t get_ino() { return ino; }
objects_by_ino[ino].insert(o);
return o;
}
- void close_object(Object *ob) {
- assert(ob->can_close());
-
- // ok!
- objects.erase(ob->get_oid());
- objects_by_ino[ob->get_ino()].erase(ob);
- if (objects_by_ino[ob->get_ino()].empty())
- objects_by_ino.erase(ob->get_ino());
- delete ob;
- }
+ void close_object(Object *ob);
// bh stats
Cond stat_cond;
void bh_add(Object *ob, BufferHead *bh) {
ob->add_bh(bh);
- if (bh->is_dirty())
+ if (bh->is_dirty()) {
lru_dirty.lru_insert_top(bh);
- else
+ dirty_bh.insert(bh);
+ } else {
lru_rest.lru_insert_top(bh);
+ }
bh_stat_add(bh);
}
void bh_remove(Object *ob, BufferHead *bh) {
ob->remove_bh(bh);
- if (bh->is_dirty())
+ if (bh->is_dirty()) {
lru_dirty.lru_remove(bh);
- else
+ dirty_bh.erase(bh);
+ } else {
lru_rest.lru_remove(bh);
+ }
bh_stat_sub(bh);
}
bool flush(Object *o);
off_t release(Object *o);
+ void purge(Object *o);
void rdlock(Object *o);
void rdunlock(Object *o);
flusher_thread.create();
}
~ObjectCacher() {
- //lock.Lock(); // hmm.. watch out for deadlock!
+ // we should be empty.
+ assert(objects.empty());
+ assert(lru_rest.lru_get_size() == 0);
+ assert(lru_dirty.lru_get_size() == 0);
+ assert(dirty_bh.empty());
+
+ assert(flusher_thread.is_started());
+ lock.Lock(); // hmm.. watch out for deadlock!
flusher_stop = true;
flusher_cond.Signal();
- //lock.Unlock();
+ lock.Unlock();
flusher_thread.join();
}
bool commit_set(inodeno_t ino, Context *oncommit);
void commit_all(Context *oncommit=0);
+ void purge_set(inodeno_t ino);
+
off_t release_set(inodeno_t ino); // returns # of bytes not released (ie non-clean)
void kick_sync_writers(inodeno_t ino);
ObjectExtent &ex = st->extents.front();
PG &pg = get_pg( ex.pgid );
- // send
+ // pick tid
last_tid++;
assert(client_inc >= 0);
- MOSDOp *m = new MOSDOp(messenger->get_myinst(), client_inc, last_tid,
- ex.oid, ex.pgid, osdmap->get_epoch(),
- OSD_OP_STAT);
+
+ // add to gather set
+ st->tid = last_tid;
+ op_stat[last_tid] = st;
+
+ pg.active_tids.insert(last_tid);
+
+ // send?
dout(10) << "stat_submit " << st << " tid " << last_tid
<< " oid " << ex.oid
<< " pg " << ex.pgid
<< " osd" << pg.acker()
<< endl;
- if (pg.acker() >= 0)
+ if (pg.acker() >= 0) {
+ MOSDOp *m = new MOSDOp(messenger->get_myinst(), client_inc, last_tid,
+ ex.oid, ex.pgid, osdmap->get_epoch(),
+ OSD_OP_STAT);
+
messenger->send_message(m, osdmap->get_inst(pg.acker()));
+ }
- // add to gather set
- st->tid = last_tid;
- op_stat[last_tid] = st;
-
- pg.active_tids.insert(last_tid);
-
return last_tid;
}
// find OSD
PG &pg = get_pg( ex.pgid );
- // send
+ // pick tid
last_tid++;
assert(client_inc >= 0);
- MOSDOp *m = new MOSDOp(messenger->get_myinst(), client_inc, last_tid,
- ex.oid, ex.pgid, osdmap->get_epoch(),
- OSD_OP_READ);
- m->set_length(ex.length);
- m->set_offset(ex.start);
+
+ // add to gather set
+ rd->ops[last_tid] = ex;
+ op_read[last_tid] = rd;
+
+ pg.active_tids.insert(last_tid);
+
+ // send?
dout(10) << "readx_submit " << rd << " tid " << last_tid
<< " oid " << ex.oid << " " << ex.start << "~" << ex.length
<< " (" << ex.buffer_extents.size() << " buffer fragments)"
<< " osd" << pg.acker()
<< endl;
- if (pg.acker() >= 0)
+ if (pg.acker() >= 0) {
+ MOSDOp *m = new MOSDOp(messenger->get_myinst(), client_inc, last_tid,
+ ex.oid, ex.pgid, osdmap->get_epoch(),
+ OSD_OP_READ);
+ m->set_length(ex.length);
+ m->set_offset(ex.start);
+
messenger->send_message(m, osdmap->get_inst(pg.acker()));
+ }
- // add to gather set
- rd->ops[last_tid] = ex;
- op_read[last_tid] = rd;
-
- pg.active_tids.insert(last_tid);
-
return last_tid;
}
// find
PG &pg = get_pg( ex.pgid );
- // send
+ // pick tid
tid_t tid;
if (usetid > 0)
tid = usetid;
else
tid = ++last_tid;
- MOSDOp *m = new MOSDOp(messenger->get_myinst(), client_inc, tid,
- ex.oid, ex.pgid, osdmap->get_epoch(),
- wr->op);
- m->set_length(ex.length);
- m->set_offset(ex.start);
- m->set_rev(ex.rev);
-
- if (wr->tid_version.count(tid))
- m->set_version(wr->tid_version[tid]); // we're replaying this op!
-
- // what type of op?
- switch (wr->op) {
- case OSD_OP_WRITE:
- {
- // map buffer segments into this extent
- // (may be fragmented bc of striping)
- bufferlist cur;
- for (map<size_t,size_t>::iterator bit = ex.buffer_extents.begin();
- bit != ex.buffer_extents.end();
- bit++) {
- bufferlist thisbit;
- thisbit.substr_of(((OSDWrite*)wr)->bl, bit->first, bit->second);
- cur.claim_append(thisbit);
- }
- assert(cur.length() == ex.length);
- m->set_data(cur);//.claim(cur);
- }
- break;
- }
-
// add to gather set
wr->waitfor_ack[tid] = ex;
wr->waitfor_commit[tid] = ex;
++num_unacked;
++num_uncommitted;
- // send
+ // send?
dout(10) << "modifyx_submit " << MOSDOp::get_opname(wr->op) << " tid " << tid
<< " oid " << ex.oid
<< " " << ex.start << "~" << ex.length
<< " pg " << ex.pgid
<< " osd" << pg.primary()
<< endl;
- if (pg.primary() >= 0)
+ if (pg.primary() >= 0) {
+ MOSDOp *m = new MOSDOp(messenger->get_myinst(), client_inc, tid,
+ ex.oid, ex.pgid, osdmap->get_epoch(),
+ wr->op);
+ m->set_length(ex.length);
+ m->set_offset(ex.start);
+ m->set_rev(ex.rev);
+
+ if (wr->tid_version.count(tid))
+ m->set_version(wr->tid_version[tid]); // we're replaying this op!
+
+ // what type of op?
+ switch (wr->op) {
+ case OSD_OP_WRITE:
+ {
+ // map buffer segments into this extent
+ // (may be fragmented bc of striping)
+ bufferlist cur;
+ for (map<size_t,size_t>::iterator bit = ex.buffer_extents.begin();
+ bit != ex.buffer_extents.end();
+ bit++) {
+ bufferlist thisbit;
+ thisbit.substr_of(((OSDWrite*)wr)->bl, bit->first, bit->second);
+ cur.claim_append(thisbit);
+ }
+ assert(cur.length() == ex.length);
+ m->set_data(cur);//.claim(cur);
+ }
+ break;
+ }
+
messenger->send_message(m, osdmap->get_inst(pg.primary()));
+ }
dout(5) << num_unacked << " unacked, " << num_uncommitted << " uncommitted" << endl;