@@ -12,3 +12,73 @@ def put(self, message):
def outgoing(self):
    """Yield pending items from ``self.queue`` until it is empty.

    Each item is removed with ``self.queue.pop()`` immediately before it
    is yielded, so a fully consumed generator leaves the queue empty.
    """
    while True:
        # Re-check on every resumption: the queue may have changed
        # while the generator was suspended.
        if not self.queue:
            return
        yield self.queue.pop()
15+
16+
class TimeBucket(object):
    """Store the last N seconds of time-stamped data within
    interval-keyed buckets.

    Time is divided into consecutive slots of ``interval`` units
    (``slot = timestamp // interval``), and slot ``s`` is kept in
    ``buckets[s % num_buckets]``.  Records can therefore be stored and
    retrieved per interval, and a stale range is discarded a whole
    bucket at a time when a newer slot lands on the same bucket.

    E.g. assume four buckets and interval of 100::

        bucket[0] -> timestamp 50000-50099
        bucket[1] -> timestamp 50100-50199
        bucket[2] -> timestamp 50200-50299
        bucket[3] -> timestamp 50300-50399

    After a timestamp in 50400-50499 has been seen::

        bucket[0] -> timestamp 50400-50499
        bucket[1] -> timestamp 50100-50199
        bucket[2] -> timestamp 50200-50299
        bucket[3] -> timestamp 50300-50399

    After a timestamp in 50500-50599 has been seen::

        bucket[0] -> timestamp 50400-50499
        bucket[1] -> timestamp 50500-50599
        bucket[2] -> timestamp 50200-50299
        bucket[3] -> timestamp 50300-50399

    etc.

    A bucket is only recycled when it is actually accessed with a
    timestamp from a newer slot; untouched buckets keep their old data.
    Accessing a bucket with a timestamp from a slot *older* than the
    one it currently holds raises ``KeyError``.

    Note that every accessor (``put``, ``get`` and ``get_data``) goes
    through the same bucket lookup, so reads can also evict stale data.

    """
    __slots__ = 'num_buckets', 'buckets', 'interval'

    def __init__(self, num_buckets, interval):
        """Create ``num_buckets`` empty buckets, each spanning ``interval``
        timestamp units.
        """
        self.num_buckets = num_buckets
        # "slot" is the slot number currently held by the bucket (None
        # until first use); "data" maps key -> stored value.
        self.buckets = [
            {"slot": None, "data": {}} for _ in range(num_buckets)
        ]
        self.interval = interval

    def _get_bucket(self, timestamp):
        """Return the bucket dict for ``timestamp``, recycling it if stale.

        Raises ``KeyError`` if the bucket already holds a newer slot
        than the one ``timestamp`` falls in.
        """
        # Floor division of a float yields a float, which cannot index a
        # list; int() keeps integer inputs unchanged and makes float
        # timestamps usable.
        slot = int(timestamp // self.interval)
        bucket_num = slot % self.num_buckets
        bucket = self.buckets[bucket_num]
        bucket_slot = bucket["slot"]
        if bucket_slot is None:
            # First use of this bucket.
            bucket["slot"] = slot
        elif bucket_slot < slot:
            # The bucket still holds an older slot: drop its contents
            # and claim it for the new slot.
            bucket["data"].clear()
            bucket["slot"] = slot
        elif bucket_slot > slot:
            raise KeyError(
                "timestamp %r falls in slot %d, which is older than "
                "slot %d currently held by bucket %d"
                % (timestamp, slot, bucket_slot, bucket_num)
            )
        return bucket

    def put(self, timestamp, key, data):
        """Store ``data`` under ``key`` in the bucket for ``timestamp``.

        Raises ``KeyError`` if ``timestamp`` is older than the slot the
        target bucket currently holds.
        """
        self._get_bucket(timestamp)['data'][key] = data

    def get(self, current_time, key):
        """Return the value stored under ``key`` in the bucket for
        ``current_time``, or ``None`` if the key is absent.

        Raises ``KeyError`` if ``current_time`` is older than the slot
        the target bucket currently holds.
        """
        return self._get_bucket(current_time)['data'].get(key)

    def get_data(self, current_time):
        """Return the live key -> value dict of the bucket for
        ``current_time`` (not a copy).

        Raises ``KeyError`` if ``current_time`` is older than the slot
        the target bucket currently holds.
        """
        return self._get_bucket(current_time)['data']
0 commit comments