11package org.schabi.newpipe.local.feed.notifications
22
33import android.content.Context
4+ import android.util.Log
45import androidx.core.app.NotificationCompat
56import androidx.work.BackoffPolicy
67import androidx.work.Constraints
@@ -12,7 +13,7 @@ import androidx.work.PeriodicWorkRequest
1213import androidx.work.WorkManager
1314import androidx.work.WorkerParameters
1415import androidx.work.rxjava3.RxWorker
15- import io.reactivex.rxjava3.core.Observable
16+ import io.reactivex.rxjava3.android.schedulers.AndroidSchedulers
1617import io.reactivex.rxjava3.core.Single
1718import org.schabi.newpipe.App
1819import org.schabi.newpipe.R
@@ -34,30 +35,39 @@ class NotificationWorker(
3435 }
3536 private val feedLoadManager = FeedLoadManager (appContext)
3637
37- override fun createWork (): Single <Result > = if (isEnabled (applicationContext)) {
38+ override fun createWork (): Single <Result > = if (areNotificationsEnabled (applicationContext)) {
3839 feedLoadManager.startLoading(
3940 ignoreOutdatedThreshold = true ,
4041 groupId = FeedLoadManager .GROUP_NOTIFICATION_ENABLED
4142 )
43+ .doOnSubscribe { showLoadingFeedForegroundNotification() }
4244 .map { feed ->
43- feed.mapNotNull { x ->
44- x.value?.takeIf {
45- it.newStreams.isNotEmpty()
45+ // filter out feedUpdateInfo items (i.e. channels) with nothing new
46+ feed.mapNotNull {
47+ it.value?.takeIf { feedUpdateInfo ->
48+ feedUpdateInfo.newStreams.isNotEmpty()
4649 }
4750 }
4851 }
49- .doOnSubscribe { setForegroundAsync(createForegroundInfo()) }
50- .flatMapObservable { Observable .fromIterable(it) }
51- .flatMapCompletable { x -> notificationHelper.displayNewStreamsNotification(x) }
52- .toSingleDefault(Result .success())
52+ .observeOn(AndroidSchedulers .mainThread()) // Picasso requires calls from main thread
53+ .map { feedUpdateInfoList ->
54+ // display notifications for each feedUpdateInfo (i.e. channel)
55+ feedUpdateInfoList.forEach { feedUpdateInfo ->
56+ notificationHelper.displayNewStreamsNotification(feedUpdateInfo)
57+ }
58+ return @map Result .success()
59+ }
60+ .doOnError { throwable ->
61+ Log .e(TAG , " Error while displaying streams notifications" , throwable)
62+ // TODO show error notification
63+ }
5364 .onErrorReturnItem(Result .failure())
5465 } else {
55- // Can be the case when the user disables notifications for NewPipe
56- // in the device's app settings.
66+ // the user can disable streams notifications in the device's app settings
5767 Single .just(Result .success())
5868 }
5969
60- private fun createForegroundInfo (): ForegroundInfo {
70+ private fun showLoadingFeedForegroundNotification () {
6171 val notification = NotificationCompat .Builder (
6272 applicationContext,
6373 applicationContext.getString(R .string.notification_channel_id)
@@ -68,14 +78,15 @@ class NotificationWorker(
6878 .setPriority(NotificationCompat .PRIORITY_LOW )
6979 .setContentTitle(applicationContext.getString(R .string.feed_notification_loading))
7080 .build()
71- return ForegroundInfo (FeedLoadService .NOTIFICATION_ID , notification)
81+ setForegroundAsync( ForegroundInfo (FeedLoadService .NOTIFICATION_ID , notification) )
7282 }
7383
7484 companion object {
7585
76- private const val TAG = App .PACKAGE_NAME + " _streams_notifications"
86+ private val TAG = NotificationWorker ::class .java.simpleName
87+ private const val WORK_TAG = App .PACKAGE_NAME + " _streams_notifications"
7788
78- private fun isEnabled (context : Context ) =
89+ private fun areNotificationsEnabled (context : Context ) =
7990 NotificationHelper .areNewStreamsNotificationsEnabled(context) &&
8091 NotificationHelper .areNotificationsEnabledOnDevice(context)
8192
@@ -86,7 +97,7 @@ class NotificationWorker(
8697 */
8798 @JvmStatic
8899 fun initialize (context : Context ) {
89- if (isEnabled (context)) {
100+ if (areNotificationsEnabled (context)) {
90101 schedule(context)
91102 } else {
92103 cancel(context)
@@ -114,13 +125,13 @@ class NotificationWorker(
114125 options.interval,
115126 TimeUnit .MILLISECONDS
116127 ).setConstraints(constraints)
117- .addTag(TAG )
128+ .addTag(WORK_TAG )
118129 .setBackoffCriteria(BackoffPolicy .LINEAR , 30 , TimeUnit .MINUTES )
119130 .build()
120131
121132 WorkManager .getInstance(context)
122133 .enqueueUniquePeriodicWork(
123- TAG ,
134+ WORK_TAG ,
124135 if (force) {
125136 ExistingPeriodicWorkPolicy .REPLACE
126137 } else {
@@ -139,7 +150,7 @@ class NotificationWorker(
139150 @JvmStatic
140151 fun runNow (context : Context ) {
141152 val request = OneTimeWorkRequestBuilder <NotificationWorker >()
142- .addTag(TAG )
153+ .addTag(WORK_TAG )
143154 .build()
144155 WorkManager .getInstance(context).enqueue(request)
145156 }
@@ -149,7 +160,7 @@ class NotificationWorker(
149160 */
150161 @JvmStatic
151162 fun cancel (context : Context ) {
152- WorkManager .getInstance(context).cancelAllWorkByTag(TAG )
163+ WorkManager .getInstance(context).cancelAllWorkByTag(WORK_TAG )
153164 }
154165 }
155166}
0 commit comments