From 1983a29ace5b5e2702f9d234846267ffcd903373 Mon Sep 17 00:00:00 2001 From: JustAnotherArchivist Date: Thu, 8 Nov 2018 23:35:30 +0100 Subject: [PATCH 1/9] Add automatic journal flushing --- pysyncobj/config.py | 7 ++++ pysyncobj/journal.py | 23 ++++++++--- pysyncobj/syncobj.py | 2 +- test_syncobj.py | 97 ++++++++++++++++++++++++++++++++++++++++++-- 4 files changed, 119 insertions(+), 10 deletions(-) diff --git a/pysyncobj/config.py b/pysyncobj/config.py index 5729cf9..9c0db50 100644 --- a/pysyncobj/config.py +++ b/pysyncobj/config.py @@ -115,8 +115,14 @@ def __init__(self, **kwargs): self.fullDumpFile = kwargs.get('fullDumpFile', None) #: File to store operations journal. Save each record as soon as received. + #: If unspecified or None, an in-memory journal is used. self.journalFile = kwargs.get('journalFile', None) + #: Flush the journal whenever something is written to it. + #: Enabled by default if a journalFile is used. + #: Cannot be enabled if the journalFile is unspecified (i.e. when the journal is not preserved). + self.flushJournal = kwargs.get('flushJournal', None) + #: Will try to bind port every bindRetryTime seconds until success. 
self.bindRetryTime = kwargs.get('bindRetryTime', 1.0) @@ -179,6 +185,7 @@ def validate(self): assert self.logCompactionMinEntries >= 2 assert self.logCompactionMinTime > 0 assert self.logCompactionBatchSize > 0 + assert self.journalFile is not None or not self.flushJournal, 'flushJournal cannot be enabled without specifying a journal file' assert self.bindRetryTime > 0 assert (self.deserializer is None) == (self.serializer is None) if self.serializer is not None: diff --git a/pysyncobj/journal.py b/pysyncobj/journal.py index 2ae88a3..8a54eaf 100644 --- a/pysyncobj/journal.py +++ b/pysyncobj/journal.py @@ -125,7 +125,7 @@ def flush(self): class FileJournal(Journal): - def __init__(self, journalFile): + def __init__(self, journalFile, flushJournal): self.__journalFile = ResizableFile(journalFile, defaultContent=self.__getDefaultHeader()) self.__journal = [] currentOffset = FIRST_RECORD_OFFSET @@ -138,6 +138,7 @@ def __init__(self, journalFile): self.__journal.append((command, idx, term)) currentOffset += nextRecordSize + 8 self.__currentOffset = currentOffset + self.__flushJournal = flushJournal def __getDefaultHeader(self): appName = APP_NAME + b'\0' * (NAME_SIZE - len(APP_NAME)) @@ -150,8 +151,9 @@ def __getLastRecordOffset(self): def __setLastRecordOffset(self, offset): self.__journalFile.write(LAST_RECORD_OFFSET_OFFSET, struct.pack(' Date: Thu, 15 Nov 2018 17:21:41 +0100 Subject: [PATCH 2/9] Delete file one last time after testing --- test_syncobj.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/test_syncobj.py b/test_syncobj.py index b0d4384..b54e1d6 100755 --- a/test_syncobj.py +++ b/test_syncobj.py @@ -1173,6 +1173,8 @@ def run_test(flushJournal, useDestroy): finally: pysyncobj.journal.ResizableFile = origResizableFile + removeFiles(journalFiles) + def test_autoTick1(): random.seed(42) From 0776b3c60ad36db541d4df3d6ff19dee860ff325 Mon Sep 17 00:00:00 2001 From: JustAnotherArchivist Date: Thu, 15 Nov 2018 17:32:49 +0100 Subject: [PATCH 3/9] Add header 
fields for current term and the voted-for candidate node ID --- pysyncobj/journal.py | 66 ++++++++++++++++++++---- test_syncobj.py | 119 +++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 176 insertions(+), 9 deletions(-) diff --git a/pysyncobj/journal.py b/pysyncobj/journal.py index 8a54eaf..88577ba 100644 --- a/pysyncobj/journal.py +++ b/pysyncobj/journal.py @@ -1,7 +1,10 @@ +import hashlib import os import mmap +import pickle import struct +from .atomic_replace import atomicReplace from .version import VERSION from .pickle import to_bytes @@ -106,7 +109,7 @@ def flush(self): -JOURNAL_FORMAT_VERSION = 1 +JOURNAL_FORMAT_VERSION = 2 APP_NAME = b'PYSYNCOBJ' APP_VERSION = str.encode(VERSION) @@ -114,20 +117,62 @@ def flush(self): VERSION_SIZE = 8 assert len(APP_NAME) < NAME_SIZE assert len(APP_VERSION) < VERSION_SIZE -FIRST_RECORD_OFFSET = NAME_SIZE + VERSION_SIZE + 4 + 4 -LAST_RECORD_OFFSET_OFFSET = NAME_SIZE + VERSION_SIZE + 4 +CURRENT_TERM_SIZE = 8 +VOTED_FOR_SIZE = 16 +FIRST_RECORD_OFFSET = NAME_SIZE + VERSION_SIZE + 4 + CURRENT_TERM_SIZE + VOTED_FOR_SIZE + 4 +LAST_RECORD_OFFSET_OFFSET = NAME_SIZE + VERSION_SIZE + 4 + CURRENT_TERM_SIZE + VOTED_FOR_SIZE -# -# APP_NAME (24b) + APP_VERSION (8b) + FORMAT_VERSION (4b) + LAST_RECORD_OFFSET (4b) + -# record1size + record1 + record1size + record2size + record2 + record2size + ... -# (record1) | (record2) | ... -# +# Journal version 2: +# APP_NAME (24b) + APP_VERSION (8b) + FORMAT_VERSION (4b) + CURRENT_TERM (8b) + VOTED_FOR (16b) + LAST_RECORD_OFFSET (4b) + +# record1size + record1 + record1size + record2size + record2 + record2size + ... +# (record1) | (record2) | ... + +# VOTED_FOR is an MD5 hash of the pickled node ID. +# LAST_RECORD_OFFSET is the offset from the beginning of the journal file at which the last record ends. + +# Version 1 is identical except it has neither CURRENT_TERM nor VOTED_FOR. 
class FileJournal(Journal): def __init__(self, journalFile, flushJournal): self.__journalFile = ResizableFile(journalFile, defaultContent=self.__getDefaultHeader()) self.__journal = [] + + # Handle journal format version upgrades + version = struct.unpack('?@ABCDEFGHIJKLMNOPQRSTUVWXYZ[\\]^_`abcdefghijklmnopqrstuvwxyz{|}~\x7f\x80\x81\x82\x83\x84\x85\x86\x87\x88\x89\x8a\x8b\x8c\x8d\x8e\x8f\x90\x91\x92\x93\x94\x95\x96\x97\x98\x99\x9a\x9b\x9c\x9d\x9e\x9f\xa0\xa1\xa2\xa3\xa4\xa5\xa6\xa7\xa8\xa9\xaa\xab\xac\xad\xae\xaf\xb0\xb1\xb2\xb3\xb4\xb5\xb6\xb7\xb8\xb9\xba\xbb\xbc\xbd\xbe\xbf\xc0\xc1\xc2\xc3\xc4\xc5\xc6\xc7\xc8\xc9\xca\xcb\xcc\xcd\xce\xcf\xd0\xd1\xd2\xd3\xd4\xd5\xd6\xd7\xd8\xd9\xda\xdb\xdc\xdd\xde\xdf\xe0\xe1\xe2\xe3\xe4\xe5\xe6\xe7\xe8\xe9\xea\xeb\xec\xed\xee\xef\xf0\xf1\xf2\xf3\xf4\xf5\xf6\xf7\xf8\xf9\xfa\xfb\xfc\xfd\xfe\xff'), + [(bytes(bytearray.fromhex(''.join('{:02x}'.format(i) for i in range(256)))), 1, 1)] # bytes(range(256)) on Py3 + ), + + # Empty data should work as well + ( + 'empty data', + data_to_entry(1, 1, b''), + [(b'', 1, 1)] + ), + + # A few entries + ( + 'a few entries', + data_to_entry(1, 1, b'\x01') + data_to_entry(2, 2, b'\x01') + data_to_entry(3, 4, b'\x01'), + [(b'\x01', 1, 1), (b'\x01', 2, 2), (b'\x01', 3, 4)] + ), + ] + + # Generate larger journals + # 400 B: some arbitrary length which fits well within a single block + # 1000 B: should expand to exactly 1024 B in the new format, i.e. no resizing of the file necessary + # 1020/3/4 B: almost or entirely full journal which can't fit the new header fields anymore, so it needs to be expanded on migration + # 1025 B: a journal which already has been expanded + # pi MiB: some large journal + for length in (400, 1000, 1020, 1023, 1024, 1025, int(math.pi * 1048576)): # length including the header... 
+ for smallRecords in (True, False): + entries = [] + expectedEntries = [] + deltaLength = length - len(baseHeader) - 4 # 4: last record offset + + # Each record has an overhead of 8 bytes (length of the record at beginning and end) and also stores an index and a term with 8 bytes each. + if smallRecords: + # Small records hold only 1 B of data, i.e. are 25 B in total; so we create deltaLength // 25 - 1 such records and then fill up with one possibly slightly larger record (up to 25 bytes of payload) + nSmall = deltaLength // 25 - 1 + for i in range(nSmall): + entries.append(data_to_entry(i, i, b'\x01')) + expectedEntries.append((b'\x01', i, i)) + remainingData = b'\x00' * (deltaLength - nSmall * 25 - 24) + entries.append(data_to_entry(nSmall, nSmall, remainingData)) + expectedEntries.append((remainingData, nSmall, nSmall)) + else: + # Just one huge record to fit the length + data = b'\x00' * (deltaLength - 24) + entries.append(data_to_entry(1, 1, data)) + expectedEntries.append((data, 1, 1)) + + journals.append(('length={} small={}'.format(length, smallRecords), b''.join(entries), expectedEntries)) + + journalFile = getNextJournalFile() + + for name, entryBytes, expectedEntries in journals: + print(name) + removeFiles([journalFile]) + with open(journalFile, 'wb') as fp: + size = 0 + fp.write(baseHeader) + size += len(baseHeader) + fp.write(struct.pack(' Date: Thu, 15 Nov 2018 17:33:32 +0100 Subject: [PATCH 4/9] Fix "IndexError: mmap slice assignment is wrong size" in ResizableFile when writing large data ResizableFile only extended once even when the data to be written to it was larger than that. For example, if the file is currently 1024 B and we're trying to write 1.5 KiB to it, it would get resized to 2048 B and the write would fail (because 2.5 KiB would be required). 
--- pysyncobj/journal.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pysyncobj/journal.py b/pysyncobj/journal.py index 88577ba..8d60998 100644 --- a/pysyncobj/journal.py +++ b/pysyncobj/journal.py @@ -81,7 +81,7 @@ def __init__(self, fileName, initialSize = 1024, resizeFactor = 2.0, defaultCont def write(self, offset, values): size = len(values) currSize = self.__mm.size() - if offset + size > self.__mm.size(): + while offset + size > self.__mm.size(): try: self.__mm.resize(int(self.__mm.size() * self.__resizeFactor)) except SystemError: From 568b03f65251739faddd83f42e5a3185ebbd9ffc Mon Sep 17 00:00:00 2001 From: JustAnotherArchivist Date: Thu, 15 Nov 2018 19:08:17 +0100 Subject: [PATCH 5/9] Add currentTerm and votedForNodeId to the journal The FileJournal only stores the hash of the votedForNodeId because the size of the stored value has to be constant. Therefore, FileJournal.votedForNodeId does not return the actual node ID but a proxy object which can be compared against the node ID. 
--- pysyncobj/journal.py | 120 +++++++++++++++++++++++++++++++++++++++++-- test_syncobj.py | 51 +++++++++++++++++- 2 files changed, 166 insertions(+), 5 deletions(-) diff --git a/pysyncobj/journal.py b/pysyncobj/journal.py index 8d60998..3a25897 100644 --- a/pysyncobj/journal.py +++ b/pysyncobj/journal.py @@ -1,15 +1,40 @@ import hashlib import os import mmap -import pickle +import pysyncobj.pickle import struct from .atomic_replace import atomicReplace from .version import VERSION -from .pickle import to_bytes class Journal(object): + @property + def currentTerm(self): + raise NotImplementedError + + @currentTerm.setter + def currentTerm(self, term): + raise NotImplementedError + + @property + def votedForNodeId(self): + raise NotImplementedError + + @votedForNodeId.setter + def votedForNodeId(self, nodeId): + raise NotImplementedError + + def set_currentTerm_and_votedForNodeId(self, term, nodeId): + """ + Convenience method since the two are often modified at the same time. + + Subclasses may choose to implement a more efficient method than setting the two individually here. + """ + + self.currentTerm = term + self.votedForNodeId = nodeId + def add(self, command, idx, term): raise NotImplementedError @@ -37,6 +62,24 @@ class MemoryJournal(Journal): def __init__(self): self.__journal = [] self.__bytesSize = 0 + self.__currentTerm = 0 + self.__votedForNodeId = None + + @property + def currentTerm(self): + return self.__currentTerm + + @currentTerm.setter + def currentTerm(self, term): + self.__currentTerm = term + + @property + def votedForNodeId(self): + return self.__votedForNodeId + + @votedForNodeId.setter + def votedForNodeId(self, nodeId): + self.__votedForNodeId = nodeId def add(self, command, idx, term): self.__journal.append((command, idx, term)) @@ -60,6 +103,32 @@ def _destroy(self): pass +class VotedForNodeIdHashProxy(object): + """ + A proxy for the voted-for node ID storing only the hash. 
+ + This object can only be used for equality tests (equal if the MD5 hash of the other operand after pickling is identical) and identity checks against None ('is None'). + """ + + def __init__(self, nodeId = None, _hash = None): + # Accepts either a node ID or a hash, but the latter is not public API (optimisation because the FileJournal already needs to compute the hash) + if nodeId is None and _hash is None: + raise ValueError('Argument required') + if _hash is not None: + self.__hash = _hash + else: + self.__hash = hashlib.md5(pysyncobj.pickle.dumps(nodeId)).digest() + + def __eq__(self, other): + return self.__hash == hashlib.md5(pysyncobj.pickle.dumps(other)).digest() + + def __ne__(self, other): # Py2 compatibility + return not (self == other) + + def __repr__(self): + return '{}({!r})'.format(type(self).__name__, self.__hash) + + class ResizableFile(object): def __init__(self, fileName, initialSize = 1024, resizeFactor = 2.0, defaultContent = None): @@ -130,6 +199,8 @@ def flush(self): # VOTED_FOR is an MD5 hash of the pickled node ID. # LAST_RECORD_OFFSET is the offset from the beginning of the journal file at which the last record ends. +VOTED_FOR_NONE_HASH = hashlib.md5(pysyncobj.pickle.dumps(None)).digest() + # Version 1 is identical except it has neither CURRENT_TERM nor VOTED_FOR. 
class FileJournal(Journal): @@ -173,6 +244,10 @@ def __init__(self, journalFile, flushJournal): else: raise RuntimeError('Unknown journal file version encountered: {} (expected <= {})'.format(version, JOURNAL_FORMAT_VERSION)) + self.__currentTerm = struct.unpack(' Date: Fri, 16 Nov 2018 17:38:41 +0100 Subject: [PATCH 6/9] Move currentTerm and votedForNodeId from SyncObj to the journal --- pysyncobj/syncobj.py | 43 +++++++++++++++++++------------------------ 1 file changed, 19 insertions(+), 24 deletions(-) diff --git a/pysyncobj/syncobj.py b/pysyncobj/syncobj.py index a77d4eb..e142f6a 100644 --- a/pysyncobj/syncobj.py +++ b/pysyncobj/syncobj.py @@ -149,14 +149,12 @@ def __init__(self, selfNode, otherNodes, conf=None, consumers=None, nodeClass = self.__nodeClass = nodeClass self.__raftState = _RAFT_STATE.FOLLOWER - self.__raftCurrentTerm = 0 - self.__votedForNodeId = None self.__votesCount = 0 self.__raftLeader = None self.__raftElectionDeadline = time.time() + self.__generateRaftTimeout() self.__raftLog = createJournal(self.__conf.journalFile, self.__conf.flushJournal) if len(self.__raftLog) == 0: - self.__raftLog.add(_bchr(_COMMAND_TYPE.NO_OP), 1, self.__raftCurrentTerm) + self.__raftLog.add(_bchr(_COMMAND_TYPE.NO_OP), 1, self.__raftLog.currentTerm) self.__raftCommitIndex = 1 self.__raftLastApplied = 1 self.__raftNextIndex = {} @@ -428,7 +426,7 @@ def _checkCommandsToApply(self): requestNode, requestID = callback if self.__raftState == _RAFT_STATE.LEADER: - idx, term = self.__getCurrentLogIndex() + 1, self.__raftCurrentTerm + idx, term = self.__getCurrentLogIndex() + 1, self.__raftLog.currentTerm if self.__conf.dynamicMembershipChange: changeClusterRequest = self.__parseChangeClusterRequest(command) @@ -537,13 +535,12 @@ def _onTick(self, timeToWait=0.0): self.__raftElectionDeadline = time.time() + self.__generateRaftTimeout() self.__raftLeader = None self.__setState(_RAFT_STATE.CANDIDATE) - self.__raftCurrentTerm += 1 - self.__votedForNodeId = self.__selfNode.id 
+ self.__raftLog.set_currentTerm_and_votedForNodeId(self.__raftLog.currentTerm + 1, self.__selfNode.id) self.__votesCount = 1 for node in self.__otherNodes: self.__transport.send(node, { 'type': 'request_vote', - 'term': self.__raftCurrentTerm, + 'term': self.__raftLog.currentTerm, 'last_log_index': self.__getCurrentLogIndex(), 'last_log_term': self.__getCurrentLogTerm(), }) @@ -645,7 +642,7 @@ def getStatus(self): status['log_len'] = len(self.__raftLog) status['last_applied'] = self.__raftLastApplied status['commit_idx'] = self.__raftCommitIndex - status['raft_term'] = self.__raftCurrentTerm + status['raft_term'] = self.__raftLog.currentTerm status['next_node_idx_count'] = len(self.__raftNextIndex) for node, idx in iteritems(self.__raftNextIndex): status['next_node_idx_server_' + node.id] = idx @@ -712,25 +709,24 @@ def __onMessageReceived(self, node, message): if message['type'] == 'request_vote' and self.__selfNode is not None: - if message['term'] > self.__raftCurrentTerm: - self.__raftCurrentTerm = message['term'] - self.__votedForNodeId = None + if message['term'] > self.__raftLog.currentTerm: + self.__raftLog.set_currentTerm_and_votedForNodeId(message['term'], None) self.__setState(_RAFT_STATE.FOLLOWER) self.__raftLeader = None if self.__raftState in (_RAFT_STATE.FOLLOWER, _RAFT_STATE.CANDIDATE): lastLogTerm = message['last_log_term'] lastLogIdx = message['last_log_index'] - if message['term'] >= self.__raftCurrentTerm: + if message['term'] >= self.__raftLog.currentTerm: if lastLogTerm < self.__getCurrentLogTerm(): return if lastLogTerm == self.__getCurrentLogTerm() and \ lastLogIdx < self.__getCurrentLogIndex(): return - if self.__votedForNodeId is not None: + if self.__raftLog.votedForNodeId is not None: return - self.__votedForNodeId = node.id + self.__raftLog.votedForNodeId = node.id self.__raftElectionDeadline = time.time() + self.__generateRaftTimeout() self.__transport.send(node, { @@ -738,14 +734,13 @@ def __onMessageReceived(self, node, message): 
'term': message['term'], }) - if message['type'] == 'append_entries' and message['term'] >= self.__raftCurrentTerm: + if message['type'] == 'append_entries' and message['term'] >= self.__raftLog.currentTerm: self.__raftElectionDeadline = time.time() + self.__generateRaftTimeout() if self.__raftLeader != node: self.__onLeaderChanged() self.__raftLeader = node - if message['term'] > self.__raftCurrentTerm: - self.__raftCurrentTerm = message['term'] - self.__votedForNodeId = None + if message['term'] > self.__raftLog.currentTerm: + self.__raftLog.set_currentTerm_and_votedForNodeId(message['term'], None) self.__setState(_RAFT_STATE.FOLLOWER) newEntries = message.get('entries', []) serialized = message.get('serialized', None) @@ -832,7 +827,7 @@ def __onMessageReceived(self, node, message): self.__commandsWaitingCommit[idx].append((term, callback)) if self.__raftState == _RAFT_STATE.CANDIDATE: - if message['type'] == 'response_vote' and message['term'] == self.__raftCurrentTerm: + if message['type'] == 'response_vote' and message['term'] == self.__raftLog.currentTerm: self.__votesCount += 1 if self.__votesCount > (len(self.__otherNodes) + 1) / 2: @@ -962,7 +957,7 @@ def _isReady(self): return self.isReady() def _getTerm(self): - return self.__raftCurrentTerm + return self.__raftLog.currentTerm def _getRaftLogSize(self): return len(self.__raftLog) @@ -992,7 +987,7 @@ def __onBecomeLeader(self): self.__lastResponseTime[node] = time.time() # No-op command after leader election. 
- idx, term = self.__getCurrentLogIndex() + 1, self.__raftCurrentTerm + idx, term = self.__getCurrentLogIndex() + 1, self.__raftLog.currentTerm self.__raftLog.add(_bchr(_COMMAND_TYPE.NO_OP), idx, term) self.__noopIDx = idx if not self.__conf.appendEntriesUseBatch: @@ -1050,7 +1045,7 @@ def __sendAppendEntries(self): 'type': 'append_entries', 'transmission': transmission, 'data': currData, - 'term': self.__raftCurrentTerm, + 'term': self.__raftLog.currentTerm, 'commit_index': self.__raftCommitIndex, 'prevLogIdx': prevLogIdx, 'prevLogTerm': prevLogTerm, @@ -1059,7 +1054,7 @@ def __sendAppendEntries(self): else: message = { 'type': 'append_entries', - 'term': self.__raftCurrentTerm, + 'term': self.__raftLog.currentTerm, 'commit_index': self.__raftCommitIndex, 'entries': entries, 'prevLogIdx': prevLogIdx, @@ -1070,7 +1065,7 @@ def __sendAppendEntries(self): transmissionData = self.__serializer.getTransmissionData(node) message = { 'type': 'append_entries', - 'term': self.__raftCurrentTerm, + 'term': self.__raftLog.currentTerm, 'commit_index': self.__raftCommitIndex, 'serialized': transmissionData, } From 5af5f9a7fa8590a9d8b9a08c9520ee944b1b3eb8 Mon Sep 17 00:00:00 2001 From: JustAnotherArchivist Date: Fri, 16 Nov 2018 17:42:32 +0100 Subject: [PATCH 7/9] Block votes for 1.1 times the maximum timeout when flushing is disabled or no file journal is used --- pysyncobj/syncobj.py | 13 ++++++++ test_syncobj.py | 79 ++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 92 insertions(+) diff --git a/pysyncobj/syncobj.py b/pysyncobj/syncobj.py index e142f6a..edb8950 100644 --- a/pysyncobj/syncobj.py +++ b/pysyncobj/syncobj.py @@ -170,6 +170,15 @@ def __init__(self, selfNode, otherNodes, conf=None, consumers=None, nodeClass = self.__destroying = False self.__recvTransmission = '' + if not self.__conf.journalFile or not self.__conf.flushJournal: + # Journal is either not written to a file or not flushed. 
+ # We can't be sure whether this node voted in the current term yet, so don't vote until maxTimeout has elapsed. + # Factor 1.1 just to be safe. + # Note that an entire non-flushing cluster should always use the same maximum timeout to ensure that this method works! + self.__voteBlockTime = time.time() + 1.1 * self.__conf.raftMaxTimeout + else: + self.__voteBlockTime = None + self.__onTickCallbacks = [] self.__onTickCallbacksLock = threading.Lock() @@ -714,6 +723,10 @@ def __onMessageReceived(self, node, message): self.__setState(_RAFT_STATE.FOLLOWER) self.__raftLeader = None + if self.__voteBlockTime is not None and time.time() <= self.__voteBlockTime: + return + self.__voteBlockTime = None + if self.__raftState in (_RAFT_STATE.FOLLOWER, _RAFT_STATE.CANDIDATE): lastLogTerm = message['last_log_term'] lastLogIdx = message['last_log_index'] diff --git a/test_syncobj.py b/test_syncobj.py index 4f407a0..ff1a020 100755 --- a/test_syncobj.py +++ b/test_syncobj.py @@ -35,6 +35,8 @@ class TEST_TYPE: AUTO_TICK_1 = 5 WAIT_BIND = 6 LARGE_COMMAND = 7 + NOFLUSH_NOVOTE_1 = 8 + NOFLUSH_NOVOTE_2 = 9 class TestObj(SyncObj): @@ -113,6 +115,16 @@ def __init__(self, selfNodeAddr, otherNodeAddrs, cfg.maxBindRetries = 1 cfg.autoTick = True + if testType in (TEST_TYPE.NOFLUSH_NOVOTE_1, TEST_TYPE.NOFLUSH_NOVOTE_2): + cfg.logCompactionMinTime = 999999 + cfg.logCompactionMinEntries = 999999 + cfg.appendEntriesPeriod = 0.02 + cfg.raftMinTimeout = 0.1 if testType == TEST_TYPE.NOFLUSH_NOVOTE_1 else 0.48 + cfg.raftMaxTimeout = 0.1000001 if testType == TEST_TYPE.NOFLUSH_NOVOTE_1 else 0.4800001 + cfg.fullDumpFile = dumpFile + cfg.journalFile = journalFile + cfg.flushJournal = testType == TEST_TYPE.NOFLUSH_NOVOTE_1 + super(TestObj, self).__init__(selfNodeAddr, otherNodeAddrs, cfg, consumers) self.__counter = 0 self.__data = {} @@ -361,6 +373,73 @@ def test_syncThreeObjectsLeaderFail(): o2._destroy() o3._destroy() +def sync_noflush_novote(journalFile2Enabled): + # Test that if flushing is 
disabled, the node won't vote until the maximum timeout has been exceeded + # o1's timeout is set to 0.1 seconds, so it will call for an election almost immediately. + # o2's timeout is set to 0.48 seconds, so it will not call for an election before o1, and it will ignore o1's request_vote messages. + # Specifically, o2 is expected to ignore the messages until 1.1 * timeout, i.e. including the one sent by o1 after 0.5 seconds. + # This test doesn't actually verify that o2 gets o1's request_vote messages, but that should be covered by other tests. + # Note that o1 has flushing enabled but o2 doesn't! + + random.seed(42) + + a = [getNextAddr(), getNextAddr()] + if journalFile2Enabled: + journalFiles = [getNextJournalFile(), getNextJournalFile()] + else: + journalFiles = [getNextJournalFile()] + removeFiles(journalFiles) + + # Make sure that o1 already has a log; this means that it will never accept a vote request from o2 + o1 = TestObj(a[0], [], TEST_TYPE.NOFLUSH_NOVOTE_1, journalFile = journalFiles[0]) + doTicks([o1], 10, stopFunc=lambda: o1.isReady()) + o1.addValue(42) + doTicks([o1], 10, stopFunc=lambda: o1.getCounter() == 42) + o1._destroy() + + states = defaultdict(list) + + o1 = TestObj(a[0], [a[1]], TEST_TYPE.NOFLUSH_NOVOTE_1, journalFile = journalFiles[0], onStateChanged=lambda old, new: states[a[0]].append(new)) + o2 = TestObj(a[1], [a[0]], TEST_TYPE.NOFLUSH_NOVOTE_2, journalFile = journalFiles[1] if journalFile2Enabled else None, onStateChanged=lambda old, new: states[a[1]].append(new)) + objs = [o1, o2] + + assert not o1._isReady() + assert not o2._isReady() + + doTicks(objs, 0.45) #, stopFunc=lambda: o1._SyncObj__raftState == _RAFT_STATE.CANDIDATE) + + # Here, o1 has called several elections, but o2 never granted its vote. + + assert o1._SyncObj__raftState == _RAFT_STATE.CANDIDATE + assert o2._SyncObj__raftState == _RAFT_STATE.FOLLOWER + assert _RAFT_STATE.LEADER not in states[a[0]] + assert states[a[1]] == [] # Never had a state change, i.e. 
it's still the default follower + + doTicks(objs, 0.1) + + # We have now surpassed o2's timeout, but the last vote request from o1 was at 0.5, i.e. *before* o2's 1.1 * timeout has expired. + # o2 is expected to have called for an election, but o1 would never vote for o2 due to the missing log entry. + + assert o1._SyncObj__raftState == _RAFT_STATE.CANDIDATE + assert o2._SyncObj__raftState == _RAFT_STATE.CANDIDATE + assert _RAFT_STATE.LEADER not in states[a[0]] + assert _RAFT_STATE.LEADER not in states[a[1]] + + doTicks(objs, 0.1) + + # o1 called for another election at 0.6, i.e. after o2's vote block timeout, so it should now be elected. + assert o1._SyncObj__raftState == _RAFT_STATE.LEADER + assert o2._SyncObj__raftState == _RAFT_STATE.FOLLOWER + assert _RAFT_STATE.LEADER not in states[a[1]] + + o1._destroy() + o2._destroy() + removeFiles(journalFiles) + +def test_sync_noflush_novote(): + sync_noflush_novote(True) # Test with an unflushed journal file + sync_noflush_novote(False) # Test with a memory journal + def test_manyActionsLogCompaction(): random.seed(42) From ac263d057545bb1f96b13dd5c41b5ff0306bcef9 Mon Sep 17 00:00:00 2001 From: JustAnotherArchivist Date: Sat, 17 Nov 2018 11:59:18 +0100 Subject: [PATCH 8/9] Fix test failures in test_sync_noflush_novote - The test was very sensitive to small timing differences. This updated version is still sensitive, but it should be much better now. I was unable to trigger any failures even under high load. - o1 should be a follower after o2's first vote request due to the higher term in o2's message and its timeout not having been triggered again. - Python 2's math.ceil returns a float instead of an int for whatever reason. 
--- test_syncobj.py | 31 +++++++++++++++---------------- 1 file changed, 15 insertions(+), 16 deletions(-) diff --git a/test_syncobj.py b/test_syncobj.py index ff1a020..b408f55 100755 --- a/test_syncobj.py +++ b/test_syncobj.py @@ -118,9 +118,8 @@ def __init__(self, selfNodeAddr, otherNodeAddrs, if testType in (TEST_TYPE.NOFLUSH_NOVOTE_1, TEST_TYPE.NOFLUSH_NOVOTE_2): cfg.logCompactionMinTime = 999999 cfg.logCompactionMinEntries = 999999 - cfg.appendEntriesPeriod = 0.02 - cfg.raftMinTimeout = 0.1 if testType == TEST_TYPE.NOFLUSH_NOVOTE_1 else 0.48 - cfg.raftMaxTimeout = 0.1000001 if testType == TEST_TYPE.NOFLUSH_NOVOTE_1 else 0.4800001 + cfg.raftMinTimeout = 0.5 if testType == TEST_TYPE.NOFLUSH_NOVOTE_1 else 2.4 + cfg.raftMaxTimeout = cfg.raftMinTimeout * 1.000001 cfg.fullDumpFile = dumpFile cfg.journalFile = journalFile cfg.flushJournal = testType == TEST_TYPE.NOFLUSH_NOVOTE_1 @@ -375,10 +374,9 @@ def test_syncThreeObjectsLeaderFail(): def sync_noflush_novote(journalFile2Enabled): # Test that if flushing is disabled, the node won't vote until the maximum timeout has been exceeded - # o1's timeout is set to 0.1 seconds, so it will call for an election almost immediately. - # o2's timeout is set to 0.48 seconds, so it will not call for an election before o1, and it will ignore o1's request_vote messages. - # Specifically, o2 is expected to ignore the messages until 1.1 * timeout, i.e. including the one sent by o1 after 0.5 seconds. - # This test doesn't actually verify that o2 gets o1's request_vote messages, but that should be covered by other tests. + # o1's timeout is set to 0.5 seconds, so it will call for an election almost immediately. + # o2's timeout is set to 2.4 seconds, so it will not call for an election before o1, and it will ignore o1's request_vote messages. + # Specifically, o2 is expected to ignore the messages until 1.1 * timeout, i.e. including the one sent by o1 after 2.5 seconds, except for updating its term. 
# Note that o1 has flushing enabled but o2 doesn't! random.seed(42) @@ -406,7 +404,7 @@ def sync_noflush_novote(journalFile2Enabled): assert not o1._isReady() assert not o2._isReady() - doTicks(objs, 0.45) #, stopFunc=lambda: o1._SyncObj__raftState == _RAFT_STATE.CANDIDATE) + doTicks(objs, 2.25) # Here, o1 has called several elections, but o2 never granted its vote. @@ -415,19 +413,20 @@ def sync_noflush_novote(journalFile2Enabled): assert _RAFT_STATE.LEADER not in states[a[0]] assert states[a[1]] == [] # Never had a state change, i.e. it's still the default follower - doTicks(objs, 0.1) + doTicks(objs, 0.3) # 2.55 total - # We have now surpassed o2's timeout, but the last vote request from o1 was at 0.5, i.e. *before* o2's 1.1 * timeout has expired. - # o2 is expected to have called for an election, but o1 would never vote for o2 due to the missing log entry. + # We have now surpassed o2's timeout, but the last vote request from o1 was at 2.5, i.e. *before* o2's 1.1 * timeout (= 2.64) has expired. + # o2 is expected to have called for an election at 2.4, but o1 would never vote for o2 due to the missing log entry. + # Due to the bigger term in o2's vote request message, o1 should now be a follower. - assert o1._SyncObj__raftState == _RAFT_STATE.CANDIDATE + assert o1._SyncObj__raftState == _RAFT_STATE.FOLLOWER assert o2._SyncObj__raftState == _RAFT_STATE.CANDIDATE assert _RAFT_STATE.LEADER not in states[a[0]] assert _RAFT_STATE.LEADER not in states[a[1]] - doTicks(objs, 0.1) + doTicks(objs, 0.6) # 3.15 total - # o1 called for another election at 0.6, i.e. after o2's vote block timeout, so it should now be elected. + # o1 called for another election at 2.9 (it reset its timeout on o2's vote request at 2.4), i.e. after o2's vote block timeout, so it should now be elected. 
 assert o1._SyncObj__raftState == _RAFT_STATE.LEADER assert o2._SyncObj__raftState == _RAFT_STATE.FOLLOWER assert _RAFT_STATE.LEADER not in states[a[1]] @@ -1388,12 +1387,12 @@ def data_to_entry(index, term, data): size += len(entryBytes) # Fill up with zero bytes to a power of 2 - fp.write(b'\x00' * (2 ** max(math.ceil(math.log(size, 2)), 10) - size)) + fp.write(b'\x00' * (2 ** max(int(math.ceil(math.log(size, 2))), 10) - size)) journal = createJournal(journalFile, True) assert journal[:] == expectedEntries journal._destroy() - assert os.path.getsize(journalFile) == 2 ** max(math.ceil(math.log(size + 24, 2)), 10) + assert os.path.getsize(journalFile) == 2 ** max(int(math.ceil(math.log(size + 24, 2))), 10) with open(journalFile, 'rb') as fp: # Check app name, version, and journal format version assert fp.read(24) == b'PYSYNCOBJ' + b'\x00' * 15 From 1bc7246e1dc4a07a048236ed412392fc1edab00a Mon Sep 17 00:00:00 2001 From: JustAnotherArchivist Date: Sat, 17 Nov 2018 15:21:31 +0100 Subject: [PATCH 9/9] Fix test failures in test_sync_noflush_novote, take 2 The doTicks approach did not work well because it isn't very accurate and doesn't take into account the time spent outside of the actual ticking. So this replaces it with a direct loop which compares against the current time to ensure that exactly the right amount of time has elapsed since the creation of the SyncObj. This test may break again if the time spent inside SyncObj increases greatly; for example, if __init__ takes very long, the start time would not match the corresponding values inside the objects. Each individual tick must also not take too long. 
--- test_syncobj.py | 31 +++++++++++++++++++++++++------ 1 file changed, 25 insertions(+), 6 deletions(-) diff --git a/test_syncobj.py b/test_syncobj.py index b408f55..dce04f2 100755 --- a/test_syncobj.py +++ b/test_syncobj.py @@ -379,6 +379,14 @@ def sync_noflush_novote(journalFile2Enabled): # Specifically, o2 is expected to ignore the messages until 1.1 * timeout, i.e. including the one sent by o1 after 2.5 seconds, except for updating its term. # Note that o1 has flushing enabled but o2 doesn't! + def tick(o1, o2, startTime, totalTickTime, sleepTime): + # Tick o1 and o2 until totalTickTime has elapsed since startTime + assert time.time() < startTime + totalTickTime # Make sure that we tick at least once + while time.time() < start + totalTickTime: + o1.doTick() + o2.doTick() + time.sleep(sleepTime) + random.seed(42) a = [getNextAddr(), getNextAddr()] @@ -397,6 +405,7 @@ def sync_noflush_novote(journalFile2Enabled): states = defaultdict(list) + start = time.time() o1 = TestObj(a[0], [a[1]], TEST_TYPE.NOFLUSH_NOVOTE_1, journalFile = journalFiles[0], onStateChanged=lambda old, new: states[a[0]].append(new)) o2 = TestObj(a[1], [a[0]], TEST_TYPE.NOFLUSH_NOVOTE_2, journalFile = journalFiles[1] if journalFile2Enabled else None, onStateChanged=lambda old, new: states[a[1]].append(new)) objs = [o1, o2] @@ -404,7 +413,7 @@ def sync_noflush_novote(journalFile2Enabled): assert not o1._isReady() assert not o2._isReady() - doTicks(objs, 2.25) + tick(o1, o2, start, 2.25, 0.01) # Here, o1 has called several elections, but o2 never granted its vote. @@ -413,20 +422,30 @@ def sync_noflush_novote(journalFile2Enabled): assert _RAFT_STATE.LEADER not in states[a[0]] assert states[a[1]] == [] # Never had a state change, i.e. it's still the default follower - doTicks(objs, 0.3) # 2.55 total + tick(o1, o2, start, 2.45, 0.01) - # We have now surpassed o2's timeout, but the last vote request from o1 was at 2.5, i.e. *before* o2's 1.1 * timeout (= 2.64) has expired. 
+ # We have now surpassed o2's timeout, but the last vote request from o1 was at 2.0, i.e. *before* o2's 1.1 * timeout (= 2.64) has expired. # o2 is expected to have called for an election at 2.4, but o1 would never vote for o2 due to the missing log entry. - # Due to the bigger term in o2's vote request message, o1 should now be a follower. + # o1 converted to a follower due to o2's bigger term. assert o1._SyncObj__raftState == _RAFT_STATE.FOLLOWER assert o2._SyncObj__raftState == _RAFT_STATE.CANDIDATE assert _RAFT_STATE.LEADER not in states[a[0]] assert _RAFT_STATE.LEADER not in states[a[1]] - doTicks(objs, 0.6) # 3.15 total + tick(o1, o2, start, 2.55, 0.01) + + # While o1 converted to a follower at 2.4 due to o2's vote request, it still called for an election at 2.5 since the term change doesn't affect the election timeout. + # Therefore, o2 converted to a follower again at 2.5 due to the bigger term. However, o2 still didn't grant its vote to o1 since 1.1 * timeout (= 2.64) hasn't elapsed yet. + + assert o1._SyncObj__raftState == _RAFT_STATE.CANDIDATE + assert o2._SyncObj__raftState == _RAFT_STATE.FOLLOWER + assert _RAFT_STATE.LEADER not in states[a[0]] + assert _RAFT_STATE.LEADER not in states[a[1]] + + tick(o1, o2, start, 3.15, 0.01) - # o1 called for another election at 2.9 (it reset its timeout on o2's vote request at 2.4), i.e. after o2's vote block timeout, so it should now be elected. + # o1 called for another election at 3.0, i.e. after o2's vote block timeout, so it should now be elected. assert o1._SyncObj__raftState == _RAFT_STATE.LEADER assert o2._SyncObj__raftState == _RAFT_STATE.FOLLOWER assert _RAFT_STATE.LEADER not in states[a[1]]