#include "msg/Messenger.h"
#include "msg/Message.h"
-//#include "msg/HostMonitor.h"
-
#include "messages/MGenericMessage.h"
#include "messages/MPing.h"
#include "messages/MPingAck.h"
assert(whoami == superblock.whoami);
}
- // monitor
- /*
- char s[80];
- sprintf(s, "osd%d", whoami);
- string st = s;
- monitor = new HostMonitor(messenger, st);
- monitor->init();
-
- // <hack> for testing monitoring
- int i = whoami;
- if (++i == g_conf.num_osd) i = 0;
- monitor->get_hosts().insert(MSG_ADDR_OSD(i));
- if (++i == g_conf.num_osd) i = 0;
- monitor->get_hosts().insert(MSG_ADDR_OSD(i));
- if (++i == g_conf.num_osd) i = 0;
- monitor->get_hosts().insert(MSG_ADDR_OSD(i));
-
- monitor->get_notify().insert(MSG_ADDR_MON(0));
- // </hack>
- */
// log
char name[80];
for (map<tid_t,PG::RepOpGather*>::iterator p = pg->repop_gather.begin();
p != pg->repop_gather.end();
p++) {
+ dout(-1) << "chekcing repop tid " << p->first << endl;
if (p->second->waitfor_ack.count(osd) ||
p->second->waitfor_commit.count(osd))
ls.push_back(p->second);
}
-/*
-void OSD::get_pg_list(list<pg_t>& ls)
-{
- // just list collections; assume they're all pg's (for now)
- store->list_collections(ls);
-}
-
-PG *OSD::get_pg(pg_t pgid)
-{
- // already open?
- if (pg_map.count(pgid))
- return pg_map[pgid];
-
- // exists?
- if (!pg_exists(pgid))
- return 0;
-
- // open, stat collection
- PG *pg = new PG(this, pgid);
- pg_map[pgid] = pg;
-
- // read pg info
- store->collection_getattr(pgid, "info", &pg->info, sizeof(pg->info));
-
- // read pg log
- pg->read_log(store);
- return pg;
-}
-*/
PG *OSD::get_pg(pg_t pgid)
{
<< " from " << op->get_source()
<< endl;
- // am i missing it?
- if (waitfor_missing_object(op, pg)) {
- // ok, i better the primary... (or else there's a map mismatch, or the primary is wrong about my objects, or?)
- assert(pg->is_primary());
- return;
- }
-
// is a replica asking? are they missing it?
if (pg->is_primary() &&
(pg->peer_missing.count(from) == 0 ||
_share_map_incoming(op->get_source(), op->get_map_epoch());
// what kind of op?
+ bool read = op->get_op() < 10; // read, stat. but not pull.
+
if (!op->get_source().is_osd()) {
// REGULAR OP (non-replication)
return;
}
- bool read = op->get_op() < 10;
-
if (read) {
// read. am i the (same) acker?
if (pg->get_acker() != whoami ||
return;
}
+ // missing object?
+ if (op->get_op() != OSD_OP_PUSH &&
+ waitfor_missing_object(op, pg)) return;
+
dout(7) << "handle_op " << op << " in " << *pg << endl;
} else {
}
if (g_conf.osd_maxthreads < 1) {
+ _lock_pg(pgid);
do_op(op, pg); // do it now
+ _unlock_pg(pgid);
} else {
- enqueue_op(pgid, op); // queue for worker threads
+ // queue for worker threads
+ if (read)
+ enqueue_op(0, op); // no locking needed for reads
+ else
+ enqueue_op(pgid, op);
}
}
}
if (g_conf.osd_maxthreads < 1) {
+ _lock_pg(pgid);
do_op(op, pg); // do it now
+ _unlock_pg(pgid);
} else {
enqueue_op(pgid, op); // queue for worker threads
}
*/
void OSD::dequeue_op(pg_t pgid)
{
- Message *op;
- PG *pg;
+ Message *op = 0;
+ PG *pg = 0;
osd_lock.Lock();
{
- // lock pg
- pg = _lock_pg(pgid);
+ if (pgid) {
+ // lock pg
+ pg = _lock_pg(pgid);
+ }
// get pending op
list<Message*> &ls = op_queue[pgid];
assert(!ls.empty());
op = ls.front();
ls.pop_front();
-
- dout(10) << "dequeue_op pg " << hex << pgid << dec << " op " << op << ", "
- << ls.size() << " / " << (pending_ops-1) << " more pending" << endl;
+
+ if (pgid) {
+ dout(10) << "dequeue_op write pg " << hex << pgid << dec << " op " << op << ", "
+ << ls.size() << " / " << (pending_ops-1) << " more pending" << endl;
+ } else {
+ dout(10) << "dequeue_op read op " << op << ", "
+ << ls.size() << " / " << (pending_ops-1) << " more pending" << endl;
+ }
if (ls.empty())
op_queue.erase(pgid);
}
osd_lock.Unlock();
-
+
// do it
do_op(op, pg);
- // unlock pg
- unlock_pg(pgid);
-
// finish
osd_lock.Lock();
{
+ if (pgid) {
+ // unlock pg
+ _unlock_pg(pgid);
+ }
+
dout(10) << "dequeue_op finish op " << op << endl;
assert(pending_ops > 0);
-
+
if (pending_ops > g_conf.osd_max_opq)
op_queue_cond.Signal();
switch (op->get_op()) {
+ // reads
+ case OSD_OP_READ:
+ op_read(op);//, pg);
+ break;
+ case OSD_OP_STAT:
+ op_stat(op);//, pg);
+ break;
+
// rep stuff
case OSD_OP_PULL:
op_pull(op, pg);
op_push(op, pg);
break;
- // reads
- case OSD_OP_READ:
- op_read(op, pg);
- break;
- case OSD_OP_STAT:
- op_stat(op, pg);
- break;
-
// writes
case OSD_OP_WRNOOP:
case OSD_OP_WRITE:
* client read op
* NOTE: called from opqueue.
*/
-void OSD::op_read(MOSDOp *op, PG *pg)
+void OSD::op_read(MOSDOp *op)//, PG *pg)
{
const object_t oid = op->get_oid();
- if (waitfor_missing_object(op, pg)) return;
-
// if the target object is locked for writing by another client, put 'op' to the waiting queue
// for _any_ op type -- eg only the locker can unlock!
if (block_if_wrlocked(op)) return; // op will be handled later, after the object unlocks
dout(10) << "op_read " << hex << oid << dec
<< " " << op->get_offset() << "~" << op->get_length()
- << " in " << *pg
+ //<< " in " << *pg
<< endl;
// read into a buffer
* client stat
* NOTE: called from opqueue
*/
-void OSD::op_stat(MOSDOp *op, PG *pg)
+void OSD::op_stat(MOSDOp *op)//, PG *pg)
{
object_t oid = op->get_oid();
- if (waitfor_missing_object(op, pg)) return;
-
// if the target object is locked for writing by another client, put 'op' to the waiting queue
if (block_if_wrlocked(op)) return; //read will be handled later, after the object unlocks
dout(3) << "op_stat on " << hex << oid << dec
<< " r = " << r
<< " size = " << st.st_size
- << " in " << *pg
+ //<< " in " << *pg
<< endl;
MOSDOpReply *reply = new MOSDOpReply(op, r, osdmap->get_epoch(), true);
repop->op->wants_commit()) {
// send commit.
MOSDOpReply *reply = new MOSDOpReply(repop->op, 0, osdmap->get_epoch(), true);
- dout(10) << "put_repop sending commit on " << *repop << " " << reply << endl;
+ dout(10) << "put_repop sending commit on " << *repop << " " << reply << endl;
messenger->send_message(reply, repop->op->get_client());
repop->sent_commit = true;
}
else if (repop->can_send_ack() &&
repop->op->wants_ack()) {
// apply
- dout(10) << "put_repop applying update on " << *repop << endl;
+ dout(10) << "put_repop applying update on " << *repop << endl;
ObjectStore::Transaction t;
prepare_op_transaction(t, repop->op, repop->new_version, pg);
Context *oncommit = new C_OSD_WriteCommit(this, pg->info.pgid, repop->rep_tid, repop->pg_local_last_complete);
unsigned r = store->apply_transaction(t, oncommit);
if (r)
- dout(-10) << "put_repop apply transaction return " << r << " on " << *repop << endl;
+ dout(-10) << "put_repop apply transaction return " << r << " on " << *repop << endl;
// send ack
MOSDOpReply *reply = new MOSDOpReply(repop->op, 0, osdmap->get_epoch(), false);
- dout(10) << "put_repop sending ack on " << *repop << " " << reply << endl;
+ dout(10) << "put_repop sending ack on " << *repop << " " << reply << endl;
messenger->send_message(reply, repop->op->get_client());
repop->sent_ack = true;
}
if (min > pg->peers_complete_thru) {
- dout(10) << *pg << "put_repop peers_complete_thru " << pg->peers_complete_thru << " -> " << min << endl;
+ dout(10) << *pg << "put_repop peers_complete_thru " << pg->peers_complete_thru << " -> " << min << endl;
pg->peers_complete_thru = min;
}
}
- dout(10) << "put_repop deleting " << *repop << endl;
+ dout(10) << "put_repop deleting " << *repop << endl;
//repop->lock.Unlock();
+
+ assert(pg->repop_gather.count(repop->rep_tid));
+ //pg->repop_gather.erase(repop->rep_tid);
+
delete repop->op;
delete repop;
+
} else {
//repop->lock.Unlock();
}
assert(repop->waitfor_commit.count(fromosd));
repop->waitfor_commit.erase(fromosd);
repop->waitfor_ack.erase(fromosd);
-
repop->pg_complete_thru[fromosd] = pg_complete_thru;
-
- if (repop->waitfor_commit.empty()) {
- pg->repop_gather.erase(repop->rep_tid);
- }
} else {
// ack
repop->waitfor_ack.erase(fromosd);
repop->pg_complete_thru[whoami] = pg_complete_thru;
}
put_repop_gather(pg, repop);
+ dout(10) << "op_modify_commit done on " << repop << endl;
} else {
dout(10) << "op_modify_commit pg " << hex << pgid << dec << " rep_tid " << rep_tid << " dne" << endl;
}
const char *opname = MOSDOp::get_opname(op->get_op());
- // missing?
- if (waitfor_missing_object(op, pg)) return;
-
// are any peers missing this?
for (unsigned i=1; i<pg->acting.size(); i++) {
int peer = pg->acting[i];