@@ -90,6 +90,10 @@ class FeedLoadManager(private val context: Context) {
9090 else -> feedDatabaseManager.outdatedSubscriptionsForGroup(groupId, outdatedThreshold)
9191 }
9292
93+ // like `currentProgress`, but counts the number of extractions that have begun, so they
94+ // can be properly throttled every once in a while (see doOnNext below)
95+ val extractionCount = AtomicInteger ()
96+
9397 return outdatedSubscriptions
9498 .take(1 )
9599 .doOnNext {
@@ -105,6 +109,13 @@ class FeedLoadManager(private val context: Context) {
105109 .observeOn(Schedulers .io())
106110 .flatMap { Flowable .fromIterable(it) }
107111 .takeWhile { ! cancelSignal.get() }
112+ .doOnNext {
113+ // throttle extractions once every BATCH_SIZE to avoid being throttled
114+ val previousCount = extractionCount.getAndIncrement()
115+ if (previousCount != 0 && previousCount % BATCH_SIZE == 0 ) {
116+ Thread .sleep(DELAY_BETWEEN_BATCHES_MILLIS .random())
117+ }
118+ }
108119 .parallel(PARALLEL_EXTRACTIONS , PARALLEL_EXTRACTIONS * 2 )
109120 .runOn(Schedulers .io(), PARALLEL_EXTRACTIONS * 2 )
110121 .filter { ! cancelSignal.get() }
@@ -328,7 +339,19 @@ class FeedLoadManager(private val context: Context) {
328339 /* *
329340 * How many extractions will be running in parallel.
330341 */
331- private const val PARALLEL_EXTRACTIONS = 6
342+ private const val PARALLEL_EXTRACTIONS = 3
343+
344+ /* *
345+ * How many extractions to perform before waiting [DELAY_BETWEEN_BATCHES_MILLIS] to avoid
346+ * being rate limited
347+ */
348+ private const val BATCH_SIZE = 50
349+
350+ /* *
351+ * Wait a random delay in this range once every [BATCH_SIZE] extractions to avoid being
352+ * rate limited
353+ */
354+ private val DELAY_BETWEEN_BATCHES_MILLIS = (6000L .. 12000L )
332355
333356 /* *
334357 * Number of items to buffer to mass-insert in the database.
0 commit comments