started_(false),
isValid_(false),
currentFileIndex_(0),
+ currentBatchSeq_(0),
+ currentBatchCount_(0),
lastFlushedSequence_(lastFlushedSequence) {
assert(startingSequenceNumber_ <= *lastFlushedSequence_);
assert(files_ != nullptr);
}
Status TransactionLogIteratorImpl::OpenLogFile(
- const LogFile* logFile,
- unique_ptr<SequentialFile>* file) {
+ const LogFile* logFile,
+ unique_ptr<SequentialFile>* file) {
Env* env = options_->env;
if (logFile->Type() == kArchivedLogFile) {
std::string fname = ArchivedLogFileName(dir_, logFile->LogNumber());
fname = ArchivedLogFileName(dir_, logFile->LogNumber());
status = env->NewSequentialFile(fname, file, soptions_);
if (!status.ok()) {
- return Status::IOError(" Requested file not present in the dir");
+ return Status::IOError("Requested file not present in the dir");
}
}
return status;
return started_ && isValid_;
}
-void TransactionLogIteratorImpl::SeekToStartSequence() {
- std::string scratch;
- Slice record;
- isValid_ = false;
- if (startingSequenceNumber_ > *lastFlushedSequence_) {
- currentStatus_ = Status::IOError("Looking for a sequence, "
- "which is not flushed yet.");
- return;
- }
- if (files_->size() == 0) {
- return;
- }
- Status s = OpenLogReader(files_->at(0).get());
- if (!s.ok()) {
- currentStatus_ = s;
- return;
+bool TransactionLogIteratorImpl::RestrictedRead(
+ Slice* record,
+ std::string* scratch) {
+ // Don't read if no more complete entries to read from logs
+ if (currentBatchSeq_ >= *lastFlushedSequence_) {
+ return false;
+ }
+ return currentLogReader_->ReadRecord(record, scratch);
+}
+
+void TransactionLogIteratorImpl::SeekToStartSequence(
+ uint64_t startFileIndex,
+ bool strict) {
+ std::string scratch;
+ Slice record;
+ started_ = false;
+ isValid_ = false;
+ if (startingSequenceNumber_ > *lastFlushedSequence_) {
+ currentStatus_ = Status::IOError("Looking for a sequence, "
+ "which is not flushed yet.");
+ return;
+ }
+ if (files_->size() <= startFileIndex) {
+ return;
+ }
+ Status s = OpenLogReader(files_->at(startFileIndex).get());
+ if (!s.ok()) {
+ currentStatus_ = s;
+ return;
+ }
+ while (RestrictedRead(&record, &scratch)) {
+ if (record.size() < 12) {
+ reporter_.Corruption(
+ record.size(), Status::Corruption("very small log record"));
+ continue;
}
- while (currentLogReader_->ReadRecord(&record, &scratch)) {
- if (record.size() < 12) {
- reporter_.Corruption(
- record.size(), Status::Corruption("log record too small"));
- continue;
- }
- UpdateCurrentWriteBatch(record);
- if (currentBatchSeq_ + currentBatchCount_ - 1 >=
- startingSequenceNumber_) {
- assert(currentBatchSeq_ <= *lastFlushedSequence_);
- isValid_ = true;
- started_ = true; // set started_ as we could seek till starting sequence
+ UpdateCurrentWriteBatch(record);
+ if (currentBatchSeq_ + currentBatchCount_ - 1 >=
+ startingSequenceNumber_) {
+ if (strict && currentBatchSeq_ != startingSequenceNumber_) {
+ currentStatus_ = Status::Corruption("Gap in sequence number. Could not "
+ "seek to required sequence number");
+ reporter_.Info(currentStatus_.ToString().c_str());
return;
- } else {
- isValid_ = false;
+ } else if (strict) {
+ reporter_.Info("Could seek required sequence number. Iterator will "
+ "continue.");
}
+ isValid_ = true;
+ started_ = true; // set started_ as we could seek till starting sequence
+ return;
+ } else {
+ isValid_ = false;
}
- // Could not find start sequence in first file. Normally this must be the
- // only file. Otherwise log the error and let the iterator return next entry
- if (files_->size() != 1) {
- currentStatus_ = Status::Corruption("Start sequence was not found, "
- "skipping to the next available");
- reporter_.Corruption(0, currentStatus_);
- started_ = true; // Let Next find next available entry
- Next();
- }
+ }
+ // Could not find start sequence in first file. Normally this must be the
+ // only file. Otherwise log the error and let the iterator return next entry
+ if (files_->size() != 1) {
+ currentStatus_ = Status::Corruption("Start sequence was not found, "
+ "skipping to the next available");
+ reporter_.Corruption(0, currentStatus_);
+ started_ = true; // Let Next find next available entry
+ Next();
+ }
}
void TransactionLogIteratorImpl::Next() {
- // TODO:Next() says that it requires Valid to be true but this is not true
- // assert(Valid());
std::string scratch;
Slice record;
isValid_ = false;
while (currentLogReader_->ReadRecord(&record, &scratch)) {
if (record.size() < 12) {
reporter_.Corruption(
- record.size(), Status::Corruption("log record too small"));
+ record.size(), Status::Corruption("very small log record"));
continue;
} else {
- UpdateCurrentWriteBatch(record);
- return;
+ return UpdateCurrentWriteBatch(record);
}
}
}
}
}
+bool TransactionLogIteratorImpl::IsBatchContinuous(
+ const WriteBatch* batch,
+ const SequenceNumber expectedSeq) {
+ assert(batch);
+ SequenceNumber batchSeq = WriteBatchInternal::Sequence(batch);
+ if (started_ && batchSeq != expectedSeq) {
+ char buf[200];
+ snprintf(buf, sizeof(buf),
+ "Discontinuity in log records. Got seq=%lu, Expected seq=%lu, "
+ "Last flushed seq=%lu. Log iterator will seek the correct batch.",
+ batchSeq, expectedSeq, *lastFlushedSequence_);
+ reporter_.Info(buf);
+ return false;
+ }
+ return true;
+}
+
void TransactionLogIteratorImpl::UpdateCurrentWriteBatch(const Slice& record) {
WriteBatch* batch = new WriteBatch();
WriteBatchInternal::SetContents(batch, record);
+
+ SequenceNumber expectedSeq = currentBatchSeq_ + currentBatchCount_;
+ if (!IsBatchContinuous(batch, expectedSeq)) {
+ // Seek to the batch having expected sequence number
+ if (expectedSeq < files_->at(currentFileIndex_)->StartSequence()) {
+ // Expected batch must lie in the previous log file
+ currentFileIndex_--;
+ currentFileIndex_ = (currentFileIndex_ >= 0) ? currentFileIndex_ : 0;
+ }
+ startingSequenceNumber_ = expectedSeq;
+ return SeekToStartSequence(currentFileIndex_, true);
+ }
+
currentBatchSeq_ = WriteBatchInternal::Sequence(batch);
currentBatchCount_ = WriteBatchInternal::Count(batch);
+ // currentBatchSeq_ can only change here
+ assert(currentBatchSeq_ <= *lastFlushedSequence_);
+
currentBatch_.reset(batch);
isValid_ = true;
currentStatus_ = Status::OK();
virtual void Corruption(size_t bytes, const Status& s) {
Log(info_log, "dropping %zu bytes; %s", bytes, s.ToString().c_str());
}
+ virtual void Info(const char* s) {
+ Log(info_log, "%s", s);
+ }
};
class LogFileImpl : public LogFile {
const std::string& dir_;
const Options* options_;
const EnvOptions& soptions_;
- const SequenceNumber startingSequenceNumber_;
+ SequenceNumber startingSequenceNumber_;
std::unique_ptr<VectorLogPtr> files_;
bool started_;
bool isValid_; // not valid when it starts of.
unique_ptr<log::Reader> currentLogReader_;
Status OpenLogFile(const LogFile* logFile, unique_ptr<SequentialFile>* file);
LogReporter reporter_;
- SequenceNumber const * const lastFlushedSequence_;
-
SequenceNumber currentBatchSeq_; // sequence number at start of current batch
uint64_t currentBatchCount_; // count in current batch
+ SequenceNumber const * const lastFlushedSequence_;
- void SeekToStartSequence();
+ // Reads from transaction log only if the writebatch record has been written
+ bool RestrictedRead(Slice* record, std::string* scratch);
+ // Seeks to startingSequenceNumber reading from startFileIndex in files_.
+ // If strict is set,then must get a batch starting with startingSequenceNumber
+ void SeekToStartSequence(uint64_t startFileIndex = 0, bool strict = false);
+ // Check if batch is continuous starting from expectedSeq, else return false
+ bool IsBatchContinuous(const WriteBatch* batch, SequenceNumber expectedSeq);
+ // Update current batch if a continuous batch is found, else return false
void UpdateCurrentWriteBatch(const Slice& record);
Status OpenLogReader(const LogFile* file);
};