{
}
-tid_t Objecter::resend_linger(LingerOpInfo& info, Context *onack, Context *onfinish, eversion_t *objver)
+tid_t Objecter::resend_linger(LingerOp *info, Context *onack, Context *onfinish, eversion_t *objver)
{
- vector<OSDOp> ops = info.ops; // need to pass a copy to ops
- Op *o = new Op(info.oid, info.oloc, ops, info.flags | CEPH_OSD_FLAG_READ, onack, onfinish, objver, true);
- o->snapid = info.snap;
- return op_submit(o, info.linger_id);
+ vector<OSDOp> ops = info->ops; // need to pass a copy to ops
+ Op *o = new Op(info->oid, info->oloc, ops, info->flags | CEPH_OSD_FLAG_READ, onack, onfinish, objver, true);
+ o->snapid = info->snap;
+ return op_submit(o, info);
}
-tid_t Objecter::resend_linger(uint64_t linger_id, Context *onack, Context *onfinish, eversion_t *objver)
+uint64_t Objecter::register_linger(LingerOp *info)
{
Mutex::Locker locker(linger_info_mutex);
- map<uint64_t, LingerOpInfo>::iterator iter = op_linger_info.find(linger_id);
- if (iter != op_linger_info.end()) {
- return resend_linger(iter->second, onack, onfinish, objver);
- } else {
- dout(0) << "WARNING: resend_linger(): could not find linger_id" << linger_id << dendl; // should that happen?
- }
- return -1;
-}
-
-uint64_t Objecter::register_linger(LingerOpInfo& info, uint64_t linger_id)
-{
- Mutex::Locker locker(linger_info_mutex);
- if (!linger_id)
- linger_id = ++max_linger_id;
-
- info.linger_id = linger_id;
-
- op_linger_info[linger_id] = info;
-
- return linger_id;
+ info->linger_id = ++max_linger_id;
+ op_linger_info[info->linger_id] = info;
+ return info->linger_id;
}
void Objecter::unregister_linger(uint64_t linger_id)
{
Mutex::Locker locker(linger_info_mutex);
- map<uint64_t, LingerOpInfo>::iterator iter = op_linger_info.find(linger_id);
+ map<uint64_t, LingerOp*>::iterator iter = op_linger_info.find(linger_id);
if (iter != op_linger_info.end()) {
- LingerOpInfo& info = iter->second;
- pg_t pgid = osdmap->object_locator_to_pg(info.oid, info.oloc);
-
- // find
- PG &pg = get_pg(pgid);
- map<uint64_t, bool>::iterator pg_iter = pg.linger_ops.find(linger_id);
- if (pg_iter != pg.linger_ops.end())
- pg.linger_ops.erase(pg_iter);
-
+ LingerOp *info = iter->second;
+ info->pg_item.remove_myself();
op_linger_info.erase(iter);
+ delete info;
}
}
tid_t Objecter::linger(const object_t& oid, const object_locator_t& oloc,
ObjectOperation& op,
- snapid_t snap, bufferlist *pbl, int flags,
+ snapid_t snap, bufferlist& inbl, bufferlist *poutbl, int flags,
Context *onack, Context *onfinish,
- eversion_t *objver,
- uint64_t *linger_id)
+ eversion_t *objver)
{
- uint64_t lid;
- LingerOpInfo info;
- info.oid = oid;
- info.oloc = oloc;
- info.snap = snap;
- info.flags = flags;
- info.ops = op.ops;
- lid = register_linger(info, 0);
- if (linger_id)
- *linger_id = lid;
- return resend_linger(info, onack, onfinish, objver);
+ LingerOp *info = new LingerOp;
+ info->oid = oid;
+ info->oloc = oloc;
+ info->snap = snap;
+ info->flags = flags;
+ info->ops = op.ops;
+ info->inbl = inbl;
+ info->poutbl = poutbl;
+ uint64_t lid = register_linger(info);
+ resend_linger(info, onack, onfinish, objver);
+ return lid;
}
void Objecter::dispatch(Message *m)
// resubmit ops!
set<tid_t> tids;
tids.swap( pg.active_tids );
- map<tid_t, bool>::iterator liter;
- for (liter = pg.linger_ops.begin(); liter != pg.linger_ops.end(); ++liter) {
- resend_linger(liter->first, NULL, NULL, NULL);
- }
- dout(0) << "pg.linger_ops.empty()=" << pg.linger_ops.empty() << dendl;
+
+ // resend lingers
+ for (xlist<LingerOp*>::iterator j = pg.linger_ops.begin(); !j.end(); ++j)
+ resend_linger(*j, NULL, NULL, NULL);
if (pg.linger_ops.empty())
close_pg( pgid ); // will pbly reopen, unless it's just commits we're missing
// read | write ---------------------------
-tid_t Objecter::op_submit(Op *op, uint64_t linger_id)
+tid_t Objecter::op_submit(Op *op, LingerOp *linger_op)
{
// throttle. before we look at any state, because
// take_op_budget() may drop our lock while it blocks.
// find
PG &pg = get_pg(op->pgid);
- if (linger_id)
- pg.linger_ops[linger_id] = true;
+ if (linger_op)
+ pg.linger_ops.push_back(&linger_op->pg_item);
// pick tid
op->tid = ++last_tid;
* track pending ops by pg
* ...so we can cope with failures, map changes
*/
+ class LingerOp;
+
class PG {
public:
vector<int> acting;
set<tid_t> active_tids; // active ops
utime_t last;
- map<uint64_t, bool> linger_ops;
+ xlist<LingerOp*> linger_ops;
PG() {}
hash_map<pg_t,PG> pg_map;
- struct LingerOpInfo {
+ struct LingerOp {
uint64_t linger_id;
object_t oid;
object_locator_t oloc;
snapid_t snap;
int flags;
vector<OSDOp> ops;
+ bufferlist inbl;
+ bufferlist *poutbl;
PG *pg;
- LingerOpInfo() : linger_id(0), off(0), len(0), flags(0), pg(NULL) {}
+ xlist<LingerOp*>::item pg_item;
+
+ LingerOp() : linger_id(0), off(0), len(0), flags(0), poutbl(NULL), pg(NULL), pg_item(this) {}
+
+ // no copy!
+ const LingerOp &operator=(const LingerOp& r);
+ LingerOp(const LingerOp& o);
};
- map<uint64_t, LingerOpInfo> op_linger_info;
+ map<uint64_t, LingerOp*> op_linger_info;
Mutex linger_info_mutex;
private:
// low-level
- tid_t op_submit(Op *op, uint64_t linger_id = 0);
+ tid_t op_submit(Op *op, LingerOp *linger_op = NULL);
+
+ tid_t resend_linger(LingerOp *info, Context *onack, Context *onfinish, eversion_t *objver);
+ uint64_t register_linger(LingerOp *info);
+public: // FIXME
+ void unregister_linger(uint64_t linger_id);
// public interface
public:
o->outbl = pbl;
return op_submit(o);
}
-
- tid_t resend_linger(LingerOpInfo& info, Context *onack, Context *onfinish, eversion_t *objver);
- tid_t resend_linger(uint64_t linger_id, Context *onack, Context *onfinish, eversion_t *objver);
- uint64_t register_linger(LingerOpInfo& info, uint64_t linger_id = 0);
- void unregister_linger(uint64_t linger_id);
-
tid_t linger(const object_t& oid, const object_locator_t& oloc,
ObjectOperation& op,
- snapid_t snap, bufferlist *pbl, int flags,
+ snapid_t snap, bufferlist& inbl, bufferlist *poutbl, int flags,
Context *onack, Context *onfinish,
- eversion_t *objver, uint64_t *linger_id);
+ eversion_t *objver);
int init_ops(vector<OSDOp>& ops, int ops_count, ObjectOperation *extra_ops) {