return;
}
- flush(o); // flush first
+ flush(o, 0, 0); // flush first
int op = 0;
if (o->rdlock_ref > 0) {
// flush. non-blocking. no callback.
// true if clean, already flushed.
// false if we wrote something.
-bool ObjectCacher::flush(Object *ob)
+// be sloppy about the ranges and flush any buffer it touches
+bool ObjectCacher::flush(Object *ob, loff_t offset, loff_t length)
{
bool clean = true;
- for (map<loff_t,BufferHead*>::iterator p = ob->data.begin();
- p != ob->data.end();
- p++) {
+ ldout(cct, 10) << "flush " << *ob << " " << offset << "~" << length << dendl;
+ map<loff_t,BufferHead*>::iterator p = ob->data.lower_bound(offset);
+ if (p != ob->data.begin() &&
+ (p == ob->data.end() || p->first > offset)) {
+ p--; // might overlap!
+ if (p->first + p->second->length() <= offset)
+ p++; // doesn't overlap.
+ }
+ for ( ; p != ob->data.end(); p++) {
BufferHead *bh = p->second;
+ ldout(cct, 20) << "flush " << *bh << dendl;
+ if (length && bh->start() > offset+length) {
+ break;
+ }
if (bh->is_tx()) {
clean = false;
continue;
}
- if (!bh->is_dirty()) continue;
-
+ if (!bh->is_dirty()) {
+ continue;
+ }
bh_write(bh);
clean = false;
}
!i.end(); ++i) {
Object *ob = *i;
- if (!flush(ob)) {
+ if (!flush(ob, 0, 0)) {
// we'll need to gather...
safe = false;
return false;
}
+// flush. non-blocking, takes callback.
+// returns true if already flushed
+bool ObjectCacher::flush_set(ObjectSet *oset, vector<ObjectExtent>& exv, Context *onfinish)
+{
+ if (oset->objects.empty()) {
+ ldout(cct, 10) << "flush_set on " << oset << " dne" << dendl;
+ return true;
+ }
+
+ ldout(cct, 10) << "flush_set " << oset << " on " << exv.size() << " ObjectExtents" << dendl;
+
+ // we'll need to wait for all objects to flush!
+ C_GatherBuilder gather(cct, onfinish);
+
+ bool safe = true;
+ for (vector<ObjectExtent>::iterator p = exv.begin();
+ p != exv.end();
+ ++p) {
+ ObjectExtent &ex = *p;
+ sobject_t soid(ex.oid, CEPH_NOSNAP);
+ if (objects[oset->poolid].count(soid) == 0)
+ continue;
+ Object *ob = objects[oset->poolid][soid];
+
+ ldout(cct, 20) << "flush_set " << oset << " ex " << ex << " ob " << soid << " " << ob << dendl;
+
+ if (!flush(ob, ex.offset, ex.length)) {
+ // we'll need to gather...
+ safe = false;
+
+ ldout(cct, 10) << "flush_set " << oset << " will wait for ack tid "
+ << ob->last_write_tid << " on " << *ob << dendl;
+ if (onfinish != NULL)
+ ob->waitfor_commit[ob->last_write_tid].push_back(gather.new_sub());
+ }
+ }
+ if (onfinish != NULL)
+ gather.activate();
+
+ if (safe) {
+ ldout(cct, 10) << "flush_set " << oset << " has no dirty|tx bhs" << dendl;
+ return true;
+ }
+ return false;
+}
+
// commit. non-blocking, takes callback.
// return true if already flushed.
void trim(loff_t max=-1);
void flush(loff_t amount=0);
- bool flush(Object *o);
+ /**
+ * flush a range of buffers
+ *
+ * Flush any buffers that intersect the specified extent. If len==0,
+ * flush *all* buffers for the object.
+ *
+ * @param o object
+ * @param off start offset
+ * @param len extent length, or 0 for entire object
+ * @return true if object was already clean/flushed.
+ */
+ bool flush(Object *o, loff_t off, loff_t len);
loff_t release(Object *o);
void purge(Object *o);
bool set_is_dirty_or_committing(ObjectSet *oset);
bool flush_set(ObjectSet *oset, Context *onfinish=0);
+ bool flush_set(ObjectSet *oset, vector<ObjectExtent>& ex, Context *onfinish=0);
void flush_all(Context *onfinish=0);
bool commit_set(ObjectSet *oset, Context *oncommit);