Skip to content

Commit 2d8f8d3

Browse files
committed
submit error after second strike
Signed-off-by: Hagar Meir <[email protected]>
1 parent 2d7f475 commit 2d8f8d3

File tree

2 files changed

+29
-4
lines changed

2 files changed

+29
-4
lines changed

node/batcher/batcher_test.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -267,7 +267,10 @@ func TestBatcherComplainAndReqFwd(t *testing.T) {
267267
}
268268

269269
// submit another request only to a secondary
270-
batchers[2].Submit(context.Background(), tx.CreateStructuredRequest([]byte{3}))
270+
require.Eventually(t, func() bool {
271+
resp, err := batchers[2].Submit(context.Background(), tx.CreateStructuredRequest([]byte{3}))
272+
return err == nil && resp.Error == ""
273+
}, 30*time.Second, 10*time.Millisecond)
271274

272275
// after a timeout the request is forwarded
273276
require.Eventually(t, func() bool {

request/pending.go

Lines changed: 25 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ type PendingStore struct {
3232
lastTick atomic.Value
3333
lastEpochChange time.Time
3434
lastProcessedGC time.Time
35+
lastSecondStrikeTime atomic.Value
3536
reqID2Bucket *sync.Map
3637
currentBucket atomic.Value
3738
buckets []*bucket
@@ -47,6 +48,7 @@ func (ps *PendingStore) Init() {
4748
ps.reqID2Bucket = new(sync.Map)
4849
ps.currentBucket.Store(newBucket(ps.reqID2Bucket, 1))
4950
ps.lastTick.Store(ps.StartTime)
51+
ps.lastSecondStrikeTime.Store(time.Time{})
5052
ps.closeChan = make(chan struct{})
5153
ps.closeOnce = sync.Once{}
5254
ps.resetChan = make(chan struct{})
@@ -220,19 +222,31 @@ func (ps *PendingStore) checkSecondStrike(now time.Time) bool {
220222
continue
221223
}
222224

223-
if now.Sub(bucket.getFirstStrikeTimestamp()) <= ps.SecondStrikeThreshold {
225+
secondStrike := ps.calcSecondStrike()
226+
227+
if now.Sub(bucket.getFirstStrikeTimestamp()) <= secondStrike {
224228
continue
225229
}
226230

227-
bucket.resetTimestamp(ps.now())
231+
now := ps.now()
232+
bucket.resetTimestamp(now)
228233
detectedCensorship = true
229-
ps.Logger.Infof("Second strike occurred for bucket id %d of size %d", bucket.id, bucket.getSize())
234+
ps.lastSecondStrikeTime.Store(now)
235+
ps.Logger.Infof("Second strike occurred for bucket id %d of size %d (threshold is currently %s)", bucket.id, bucket.getSize(), secondStrike.String())
230236
break
231237
}
232238

233239
return detectedCensorship
234240
}
235241

242+
func (ps *PendingStore) calcSecondStrike() time.Duration {
243+
// now := ps.now()
244+
// if now.Sub(ps.StartTime) <= 20 * ps.SecondStrikeThreshold {
245+
// return 5 * ps.SecondStrikeThreshold
246+
// }
247+
return ps.SecondStrikeThreshold
248+
}
249+
236250
func (ps *PendingStore) rotateBuckets(now time.Time) {
237251
currentBucket := ps.currentBucket.Load().(*bucket)
238252

@@ -315,6 +329,10 @@ func (ps *PendingStore) Submit(request []byte) error {
315329

316330
reqID := ps.Inspector.RequestID(request)
317331

332+
if ps.now().Sub(ps.lastSecondStrike()) <= 2*ps.SecondStrikeThreshold {
333+
return errors.Errorf("there was a second strike not long ago")
334+
}
335+
318336
// Insertion may fail if we have a concurrent sealing of the bucket.
319337
// In such a case, wait for a new un-sealed bucket to replace the current bucket.
320338
for {
@@ -330,6 +348,10 @@ func (ps *PendingStore) now() time.Time {
330348
return ps.lastTick.Load().(time.Time)
331349
}
332350

351+
func (ps *PendingStore) lastSecondStrike() time.Time {
352+
return ps.lastSecondStrikeTime.Load().(time.Time)
353+
}
354+
333355
// GetAllRequests returns all stored requests in the same order of their arrival, the oldest one will be the first
334356
func (ps *PendingStore) GetAllRequests(max uint64) [][]byte {
335357
if !ps.isStopped() {

0 commit comments

Comments
 (0)