// append
- size_t wrote = journal_stream.write(bl, write_buf, write_pos);
+ size_t wrote = journal_stream.write(bl, &write_buf, write_pos);
ldout(cct, 10) << "append_entry len " << s << " to " << write_pos << "~" << wrote << dendl;
write_pos += wrote;
// Check if the retrieve bytestream has enough for an entry
uint64_t need;
- if (journal_stream.readable(read_buf, need)) {
+ if (journal_stream.readable(read_buf, &need)) {
return true;
}
}
uint64_t start_ptr;
- size_t consumed = journal_stream.read(read_buf, bl, start_ptr);
+ size_t consumed = journal_stream.read(read_buf, &bl, &start_ptr);
if (stream_format >= JOURNAL_FORMAT_RESILIENT) {
assert(start_ptr == read_pos);
}
* to get the next header if header was unavailable, or enough to get the whole
* next entry if the header was available but the body wasn't).
*/
-bool JournalStream::readable(bufferlist &read_buf, uint64_t &need)
+bool JournalStream::readable(bufferlist &read_buf, uint64_t *need)
{
+ assert(need != NULL);
+
uint32_t entry_size = 0;
uint64_t start_ptr = 0;
uint64_t entry_sentinel = 0;
// Do we have enough data to decode an entry prefix?
if (format >= JOURNAL_FORMAT_RESILIENT) {
- need = sizeof(entry_size) + sizeof(entry_sentinel);
+ *need = sizeof(entry_size) + sizeof(entry_sentinel);
} else {
- need = sizeof(entry_size);
+ *need = sizeof(entry_size);
}
- if (read_buf.length() >= need) {
+ if (read_buf.length() >= *need) {
if (format >= JOURNAL_FORMAT_RESILIENT) {
::decode(entry_sentinel, p);
if (entry_sentinel != sentinel) {
// Do we have enough data to decode an entry prefix, payload and suffix?
if (format >= JOURNAL_FORMAT_RESILIENT) {
- need = sizeof(entry_size) + sizeof(entry_sentinel) + entry_size + sizeof(start_ptr);
+ *need = sizeof(entry_size) + sizeof(entry_sentinel) + entry_size + sizeof(start_ptr);
} else {
- need = sizeof(entry_size) + entry_size;
+ *need = sizeof(entry_size) + entry_size;
}
- if (read_buf.length() >= need) {
+ if (read_buf.length() >= *need) {
return true; // No more bytes needed
}
* 'entry' must be initially empty. 'from' must contain sufficient
* valid data (i.e. readable is true).
*
- * 'offset' will be set to the entry's start pointer, if the collection
+ * 'start_ptr' will be set to the entry's start pointer, if the collection
* format provides it.
*
- * Note that the number of bytes consumed is *not* equal to the
- * length of the blob returned: the former includes envelope data
- * while the latter is just the inner LogEvent serialized.
+ * @returns The number of bytes consumed from the `from` byte stream. Note
+ * that this is not equal to the length of `entry`, which contains
+ * the inner serialized LogEvent and not the envelope.
*/
-size_t JournalStream::read(bufferlist &from, bufferlist &entry, uint64_t &start_ptr)
+size_t JournalStream::read(bufferlist &from, bufferlist *entry, uint64_t *start_ptr)
{
- assert(entry.length() == 0);
+ assert(start_ptr != NULL);
+ assert(entry != NULL);
+ assert(entry->length() == 0);
uint64_t entry_sentinel = 0;
uint32_t entry_size;
::decode(entry_size, p);
p.advance(entry_size);
if (format >= JOURNAL_FORMAT_RESILIENT) {
- ::decode(start_ptr, p);
+ ::decode(*start_ptr, p);
} else {
- start_ptr = 0;
+ *start_ptr = 0;
}
}
size_t raw_length;
if (format >= JOURNAL_FORMAT_RESILIENT) {
- raw_length = sizeof(entry_size) + sizeof(entry_sentinel) + entry_size + sizeof(start_ptr);
+ raw_length = sizeof(entry_size) + sizeof(entry_sentinel) + entry_size + sizeof(*start_ptr);
assert(entry_sentinel == sentinel);
} else {
raw_length = sizeof(entry_size) + entry_size;
from.splice(0, sizeof(entry_sentinel));
}
from.splice(0, sizeof(entry_size));
- from.splice(0, entry_size, &entry);
+ from.splice(0, entry_size, entry);
if (format >= JOURNAL_FORMAT_RESILIENT) {
- from.splice(0, sizeof(start_ptr));
+ from.splice(0, sizeof(*start_ptr));
}
return raw_length;
/**
* Append one entry
*/
-size_t JournalStream::write(bufferlist &entry, bufferlist &to, uint64_t const &start_ptr)
+size_t JournalStream::write(bufferlist &entry, bufferlist *to, uint64_t const &start_ptr)
{
+ assert(to != NULL);
+
uint32_t const entry_size = entry.length();
if (format >= JOURNAL_FORMAT_RESILIENT) {
- ::encode(sentinel, to);
+ ::encode(sentinel, *to);
}
- ::encode(entry_size, to);
- to.claim_append(entry);
+ ::encode(entry_size, *to);
+ to->claim_append(entry);
if (format >= JOURNAL_FORMAT_RESILIENT) {
- ::encode(start_ptr, to);
+ ::encode(start_ptr, *to);
}
if (format >= JOURNAL_FORMAT_RESILIENT) {