OPTION(journaler_batch_max, OPT_U64, 0) // max bytes we'll delay flushing; disable, for now....
OPTION(mds_data, OPT_STR, "/var/lib/ceph/mds/$cluster-$id")
OPTION(mds_max_file_size, OPT_U64, 1ULL << 40) // Used when creating new CephFS. Change with 'ceph mds set max_file_size <size>' afterwards
+// max xattr kv pairs size for each dir/file
+OPTION(mds_max_xattr_pairs_size, OPT_U32, 64 << 10)
OPTION(mds_cache_size, OPT_INT, 100000)
OPTION(mds_cache_mid, OPT_FLOAT, .7)
OPTION(mds_max_file_recover, OPT_U32, 32)
max = dir->get_num_any(); // whatever, something big.
unsigned max_bytes = req->head.args.readdir.max_bytes;
if (!max_bytes)
- max_bytes = 512 << 10; // 512 KB?
+ // make sure at least one item can be encoded
+ max_bytes = (512 << 10) + g_conf->mds_max_xattr_pairs_size;
// start final blob
bufferlist dirbl;
return;
map<string, bufferptr> *pxattrs = cur->get_projected_xattrs();
+ size_t len = req->get_data().length();
+ size_t inc = len + name.length();
+
+ // check xattrs kv pairs size
+ size_t cur_xattrs_size = 0;
+ for (const auto& p : *pxattrs) {
+ if ((flags & CEPH_XATTR_REPLACE) && (name.compare(p.first) == 0)) {
+ continue;
+ }
+ cur_xattrs_size += p.first.length() + p.second.length();
+ }
+
+ if (((cur_xattrs_size + inc) > g_conf->mds_max_xattr_pairs_size)) {
+ dout(10) << "xattr kv pairs size too big. cur_xattrs_size "
+ << cur_xattrs_size << ", inc " << inc << dendl;
+ respond_to_request(mdr, -ENOSPC);
+ return;
+ }
+
if ((flags & CEPH_XATTR_CREATE) && pxattrs->count(name)) {
dout(10) << "setxattr '" << name << "' XATTR_CREATE and EEXIST on " << *cur << dendl;
respond_to_request(mdr, -EEXIST);
return;
}
- int len = req->get_data().length();
dout(10) << "setxattr '" << name << "' len " << len << " on " << *cur << dendl;
// project update
max_entries = infomap.size();
int max_bytes = req->head.args.readdir.max_bytes;
if (!max_bytes)
- max_bytes = 512 << 10;
+ // make sure at least one item can be encoded
+ max_bytes = (512 << 10) + g_conf->mds_max_xattr_pairs_size;
__u64 last_snapid = 0;
string offset_str = req->get_path2();
uint64_t Journaler::append_entry(bufferlist& bl)
{
- lock_guard l(lock);
+ unique_lock l(lock);
assert(!readonly);
uint32_t s = bl.length();
bufferptr bp(write_pos - owp);
bp.zero();
assert(bp.length() >= 4);
+ if (!write_buf_throttle.get_or_fail(bp.length())) {
+ l.unlock();
+ ldout(cct, 10) << "write_buf_throttle wait, bp len "
+ << bp.length() << dendl;
+ write_buf_throttle.get(bp.length());
+ l.lock();
+ }
+ ldout(cct, 20) << "write_buf_throttle get, bp len "
+ << bp.length() << dendl;
write_buf.push_back(bp);
// now flush.
// append
+ size_t delta = bl.length() + journal_stream.get_envelope_size();
+ // write_buf space is nearly full
+ if (!write_buf_throttle.get_or_fail(delta)) {
+ l.unlock();
+ ldout(cct, 10) << "write_buf_throttle wait, delta " << delta << dendl;
+ write_buf_throttle.get(delta);
+ l.lock();
+ }
+ ldout(cct, 20) << "write_buf_throttle get, delta " << delta << dendl;
size_t wrote = journal_stream.write(bl, &write_buf, write_pos);
ldout(cct, 10) << "append_entry len " << s << " to " << write_pos << "~"
<< wrote << dendl;
assert(!readonly);
// flush
- unsigned len = write_pos - flush_pos;
+ uint64_t len = write_pos - flush_pos;
assert(len == write_buf.length());
if (amount && amount < len)
len = amount;
flush_pos += len;
assert(write_buf.length() == write_pos - flush_pos);
-
+ write_buf_throttle.put(len);
+ ldout(cct, 20) << "write_buf_throttle put, len " << len << dendl;
+
ldout(cct, 10)
<< "_do_flush (prezeroing/prezero)/write/flush/safe pointers now at "
<< "(" << prezeroing_pos << "/" << prezero_pos << ")/" << write_pos
#include "Filer.h"
#include "common/Timer.h"
-
+#include "common/Throttle.h"
class CephContext;
class Context;
bool readable(bufferlist &bl, uint64_t *need) const;
size_t read(bufferlist &from, bufferlist *to, uint64_t *start_ptr);
size_t write(bufferlist &entry, bufferlist *to, uint64_t const &start_ptr);
+ size_t get_envelope_size() const {
+ if (format >= JOURNAL_FORMAT_RESILIENT) {
+ return JOURNAL_ENVELOPE_RESILIENT;
+ } else {
+ return JOURNAL_ENVELOPE_LEGACY;
+ }
+ }
// A magic number for the start of journal entries, so that we can
// identify them in damaged journals.
bufferlist write_buf; ///< write buffer. flush_pos +
/// write_buf.length() == write_pos.
+ // protect write_buf from bufferlist _len overflow
+ Throttle write_buf_throttle;
+
bool waiting_for_zero;
interval_set<uint64_t> pending_zero; // non-contig bits we've zeroed
std::set<uint64_t> pending_safe;
timer(tim), delay_flush_event(0),
state(STATE_UNDEF), error(0),
prezeroing_pos(0), prezero_pos(0), write_pos(0), flush_pos(0), safe_pos(0),
+ write_buf_throttle(cct, "write_buf_throttle", UINT_MAX - (UINT_MAX >> 3)),
waiting_for_zero(false),
read_pos(0), requested_pos(0), received_pos(0),
fetch_len(0), temp_fetch_len(0),