// Write-op handling: pull the operands out of iterator `i` and hand them to _write().
hobject_t oid = i.get_oid();
// NOTE(review): the offset is fetched with get_length(), the same accessor used for
// `len` on the next line — presumably two consecutive fields decoded the same way; confirm.
uint64_t off = i.get_length();
uint64_t len = i.get_length();
// Added by this patch: the replica flag carried by the iterator, forwarded to _write() below.
+ bool replica = i.get_replica();
bufferlist bl;
i.get_bl(bl);
// The write is applied only when _check_replay_guard() returns a positive value.
if (_check_replay_guard(cid, oid, spos) > 0)
- r = _write(cid, oid, off, len, bl);
+ r = _write(cid, oid, off, len, bl, replica);
}
break;
// FileStore::_write (flush/close portion shown in this patch).
//
// The patch adds a `replica` argument. When it is true and this function performed the
// flush itself (local_flush), the written range is passed to
// posix_fadvise(POSIX_FADV_DONTNEED). When the flusher thread accepted the fd
// (async_done), the fd is no longer closed here; `replica` is forwarded to
// queue_flusher() instead.
int FileStore::_write(coll_t cid, const hobject_t& oid,
uint64_t offset, size_t len,
- const bufferlist& bl)
+ const bufferlist& bl, bool replica)
{
dout(15) << "write " << cid << "/" << oid << " " << offset << "~" << len << dendl;
int r;
// flush?
{
// Only writes of at least m_filestore_flush_min bytes are considered for flushing.
bool should_flush = (ssize_t)len >= m_filestore_flush_min;
// local_flush: this function issued the flush inline.
+ bool local_flush = false;
// async_done: queue_flusher() returned true, i.e. the request was handed to the flusher.
+ bool async_done = false;
#ifdef HAVE_SYNC_FILE_RANGE
// Inline path is taken when no flush is wanted, the flusher is disabled, or
// queue_flusher() declined the request.
if (!should_flush ||
!m_filestore_flusher ||
- !queue_flusher(fd, offset, len)) {
- if (should_flush && m_filestore_sync_flush)
+ !(async_done = queue_flusher(fd, offset, len, replica))) {
+ if (should_flush && m_filestore_sync_flush) {
// NOTE(review): SYNC_FILE_RANGE_WRITE on its own only initiates writeback; pages that
// are still dirty when posix_fadvise(DONTNEED) runs below may not be dropped, so the
// cache eviction on this path looks best-effort — confirm this is acceptable.
::sync_file_range(fd, offset, len, SYNC_FILE_RANGE_WRITE);
- lfn_close(fd);
+ local_flush = true;
+ }
}
+ //Both lfn_close() and possible posix_fadvise() done by flusher
// Invalidate fd so the lfn_close() after the flush block is skipped.
+ if (async_done) fd = -1;
#else
// no sync_file_range; (maybe) flush inline and close.
- if (should_flush && m_filestore_sync_flush)
+ if (should_flush && m_filestore_sync_flush) {
::fdatasync(fd);
- lfn_close(fd);
+ local_flush = true;
+ }
#endif
// local_flush is only set on paths where async_done is false, so fd is still valid here.
+ if (local_flush && replica) {
// posix_fadvise() returns the error number directly (it does not set errno),
// hence fa_r is passed straight to cpp_strerror().
+ int fa_r = posix_fadvise(fd, offset, len, POSIX_FADV_DONTNEED);
+ if (fa_r) {
+ dout(0) << "posix_fadvise failed: " << cpp_strerror(fa_r) << dendl;
+ } else {
+ dout(10) << "posix_fadvise performed after local flush" << dendl;
+ }
+ }
}
// Close the fd here unless ownership moved to the flusher thread (fd == -1).
+ if (fd >= 0) lfn_close(fd);
out:
dout(10) << "write " << cid << "/" << oid << " " << offset << "~" << len << " = " << r << dendl;
}
// Hand a flush request to the flusher thread. The patch adds `replica`, which is queued
// alongside fd/off/len. The caller in _write() treats a true return value as
// "the flusher now owns the fd".
-bool FileStore::queue_flusher(int fd, uint64_t off, uint64_t len)
+bool FileStore::queue_flusher(int fd, uint64_t off, uint64_t len, bool replica)
{
bool queued;
lock.Lock();
// One request occupies consecutive queue slots in this order; the consumer pops
// len and then replica in the same order, so the push order here must match.
flusher_queue.push_back(fd);
flusher_queue.push_back(off);
flusher_queue.push_back(len);
+ flusher_queue.push_back(replica);
// Counted once per request, not once per queue slot.
flusher_queue_len++;
flusher_cond.Signal();
dout(10) << "queue_flusher ep " << sync_epoch << " fd " << fd << " " << off << "~" << len
// Flusher-thread side: finish popping one queued request, flush it if it is still
// current, then close the fd.
q.pop_front();
uint64_t len = q.front();
q.pop_front();
// Added by this patch: the replica flag queued after len by queue_flusher().
+ bool replica = q.front();
+ q.pop_front();
// Flush only while not stopping and while the request's epoch matches sync_epoch.
if (!stop && ep == sync_epoch) {
dout(10) << "flusher_entry flushing+closing " << fd << " ep " << ep << dendl;
// NOTE(review): SYNC_FILE_RANGE_WRITE on its own only initiates writeback; pages still
// dirty when posix_fadvise(DONTNEED) runs may not be dropped — confirm best-effort is intended.
::sync_file_range(fd, off, len, SYNC_FILE_RANGE_WRITE);
+ if (replica) {
// posix_fadvise() returns the error number directly, so fa_r goes straight to cpp_strerror().
+ int fa_r = posix_fadvise(fd, off, len, POSIX_FADV_DONTNEED);
+ if (fa_r) {
+ dout(0) << "posix_fadvise failed: " << cpp_strerror(fa_r) << dendl;
+ } else {
// Message distinguishes this path from the inline ("local flush") path in _write().
+ dout(10) << "posix_fadvise performed after async flush" << dendl;
+ }
+ }
} else
dout(10) << "flusher_entry JUST closing " << fd << " (stop=" << stop << ", ep=" << ep
<< ", sync_epoch=" << sync_epoch << ")" << dendl;
// The patch closes through lfn_close() instead of a raw ::close().
- TEMP_FAILURE_RETRY(::close(fd));
+ lfn_close(fd);
}
lock.Lock();
flusher_queue_len -= num; // they're definitely closed, forget
return 0;
}
} flusher_thread;
// Queues a flush request (fd, off, len, and with this patch the replica flag).
// _write() treats a true return as "the flusher took over the fd".
- bool queue_flusher(int fd, uint64_t off, uint64_t len);
+ bool queue_flusher(int fd, uint64_t off, uint64_t len, bool replica);
int open_journal();
int fiemap(coll_t cid, const hobject_t& oid, uint64_t offset, size_t len, bufferlist& bl);
int _touch(coll_t cid, const hobject_t& oid);
// `replica` defaults to false, so call sites that pass five arguments keep compiling unchanged.
- int _write(coll_t cid, const hobject_t& oid, uint64_t offset, size_t len, const bufferlist& bl);
+ int _write(coll_t cid, const hobject_t& oid, uint64_t offset, size_t len, const bufferlist& bl,
+ bool replica = false);
int _zero(coll_t cid, const hobject_t& oid, uint64_t offset, size_t len);
int _truncate(coll_t cid, const hobject_t& oid, uint64_t size);
int _clone(coll_t cid, const hobject_t& oldoid, const hobject_t& newoid,
bool sobject_encoding;
int64_t pool_override;
bool use_pool_override;
// Added by this patch: per-transaction replica flag; the constructors initialise it to false.
+ bool replica;
public:
void set_pool_override(int64_t pool) {
pool_override = pool;
}
// Marks the transaction as a replica transaction. There is no way to clear the flag again.
+ void set_replica() {
+ replica = true;
+ }
// Returns the current replica flag.
+ bool get_replica() { return replica; }
// NOTE(review): the part of swap() visible here does not exchange `replica`;
// verify that the remainder of swap() handles it.
void swap(Transaction& other) {
std::swap(ops, other.ops);
bool sobject_encoding;
int64_t pool_override;
bool use_pool_override;
// Added by this patch: copy of the owning Transaction's replica flag.
+ bool replica;
// Snapshots the transaction's encoding/pool settings and, with this patch, its replica
// flag at construction time; later changes to the Transaction are not reflected here.
iterator(Transaction *t)
: p(t->tbl.begin()),
sobject_encoding(t->sobject_encoding),
pool_override(t->pool_override),
- use_pool_override(t->use_pool_override) {}
+ use_pool_override(t->use_pool_override),
+ replica(t->replica) {}
friend class Transaction;
::decode(bits, p);
return bits;
}
// Returns the replica flag this iterator copied from its Transaction at construction.
+ bool get_replica() { return replica; }
};
iterator begin() {
// etc.
// All three constructors now initialise `replica` to false.
// NOTE(review): the decoding constructors call decode() afterwards; whether decode()
// touches `replica` is not visible here — presumably the flag is not part of the
// encoded form and must be set via set_replica(); confirm.
Transaction() :
ops(0), pad_unused_bytes(0), largest_data_len(0), largest_data_off(0), largest_data_off_in_tbl(0),
- sobject_encoding(false), pool_override(-1), use_pool_override(false) {}
+ sobject_encoding(false), pool_override(-1), use_pool_override(false), replica(false) {}
// Builds a transaction by decoding from an existing bufferlist iterator.
Transaction(bufferlist::iterator &dp) :
ops(0), pad_unused_bytes(0), largest_data_len(0), largest_data_off(0), largest_data_off_in_tbl(0),
- sobject_encoding(false), pool_override(-1), use_pool_override(false) {
+ sobject_encoding(false), pool_override(-1), use_pool_override(false), replica(false) {
decode(dp);
}
// Builds a transaction by decoding from the start of a bufferlist.
Transaction(bufferlist &nbl) :
ops(0), pad_unused_bytes(0), largest_data_len(0), largest_data_off(0), largest_data_off_in_tbl(0),
- sobject_encoding(false), pool_override(-1), use_pool_override(false) {
+ sobject_encoding(false), pool_override(-1), use_pool_override(false), replica(false) {
bufferlist::iterator dp = nbl.begin();
decode(dp);
}