Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import io.reactivex.rxjava3.core.Flowable
import io.reactivex.rxjava3.core.Maybe
import java.time.OffsetDateTime
import org.schabi.newpipe.database.BasicDAO
import org.schabi.newpipe.database.stream.StreamWithState
import org.schabi.newpipe.database.stream.model.StreamEntity
import org.schabi.newpipe.database.stream.model.StreamEntity.Companion.STREAM_ID
import org.schabi.newpipe.extractor.stream.StreamType
Expand All @@ -30,6 +31,17 @@ abstract class StreamDAO : BasicDAO<StreamEntity> {
@Query("SELECT * FROM streams WHERE url = :url AND service_id = :serviceId")
abstract fun getStream(serviceId: Long, url: String): Maybe<StreamEntity>

@Query(
"""
SELECT s.*, sst.progress_time
FROM streams s
LEFT JOIN stream_state sst ON s.uid = sst.stream_id
WHERE s.url = :url AND s.service_id = :serviceId
LIMIT 1
"""
)
abstract fun getStreamWithState(serviceId: Int, url: String): Maybe<StreamWithState>

@Query("UPDATE streams SET uploader_url = :uploaderUrl WHERE url = :url AND service_id = :serviceId")
abstract fun setUploaderUrl(serviceId: Long, url: String, uploaderUrl: String): Completable

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import androidx.core.net.toUri
import androidx.core.os.postDelayed
import androidx.core.view.isGone
import androidx.core.view.isVisible
import androidx.lifecycle.ViewModelProvider
import androidx.preference.PreferenceManager
import coil3.util.CoilUtils
import com.evernote.android.state.State
Expand Down Expand Up @@ -90,6 +91,7 @@ import org.schabi.newpipe.ktx.AnimationType
import org.schabi.newpipe.ktx.animate
import org.schabi.newpipe.ktx.animateRotation
import org.schabi.newpipe.local.dialog.PlaylistDialog
import org.schabi.newpipe.local.feed.StreamUpdateViewModel
import org.schabi.newpipe.local.history.HistoryRecordManager
import org.schabi.newpipe.local.playlist.LocalPlaylistFragment
import org.schabi.newpipe.player.Player
Expand Down Expand Up @@ -203,6 +205,7 @@ class VideoDetailFragment :
private var currentWorker: Disposable? = null
private val disposables = CompositeDisposable()
private var positionSubscriber: Disposable? = null
private var streamUpdateViewModel: StreamUpdateViewModel? = null

/*//////////////////////////////////////////////////////////////////////////
// Service management
Expand Down Expand Up @@ -581,6 +584,8 @@ class VideoDetailFragment :
override fun initViews(rootView: View?, savedInstanceState: Bundle?) {
super.initViews(rootView, savedInstanceState)

streamUpdateViewModel = ViewModelProvider(requireActivity())[StreamUpdateViewModel::class]

pageAdapter = TabAdapter(getChildFragmentManager())
binding.viewPager.setAdapter(pageAdapter)
binding.tabLayout.setupWithViewPager(binding.viewPager)
Expand Down Expand Up @@ -1531,6 +1536,9 @@ class VideoDetailFragment :
binding.detailThumbnailPlayButton.setImageResource(
if (hasVideoStreams) R.drawable.ic_play_arrow_shadow else R.drawable.ic_headset_shadow
)

// Notify FeedFragment that this stream's data (including view count) has been updated
streamUpdateViewModel?.notifyStreamInfoUpdated(info.serviceId, info.url)
}

private fun displayUploaderAsSubChannel(info: StreamInfo) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ class FeedDatabaseManager(context: Context) {
)
}

fun getStreamWithState(serviceId: Int, url: String): Maybe<StreamWithState> = streamTable.getStreamWithState(serviceId, url)

fun outdatedSubscriptions(outdatedThreshold: OffsetDateTime) = feedTable.getAllOutdated(outdatedThreshold)

fun outdatedSubscriptionsWithNotificationMode(
Expand Down
41 changes: 41 additions & 0 deletions app/src/main/java/org/schabi/newpipe/local/feed/FeedFragment.kt
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,8 @@ class FeedFragment : BaseStateFragment<FeedState>() {

private var lastNewItemsCount = 0

private lateinit var streamUpdateViewModel: StreamUpdateViewModel

init {
setHasOptionsMenu(true)
}
Expand Down Expand Up @@ -143,6 +145,20 @@ class FeedFragment : BaseStateFragment<FeedState>() {
viewModel = ViewModelProvider(this, factory)[FeedViewModel::class.java]
viewModel.stateLiveData.observe(viewLifecycleOwner) { it?.let(::handleResult) }

// Activity-scoped ViewModel shared with VideoDetailFragment
streamUpdateViewModel = ViewModelProvider(requireActivity())[StreamUpdateViewModel::class.java]

streamUpdateViewModel.updatedStream.observe(viewLifecycleOwner) { (serviceId, url) ->
refreshFeedItem(serviceId, url)
}

disposables.add(
StreamUpdateViewModel.globalProgressBus
.onBackpressureLatest()
.observeOn(AndroidSchedulers.mainThread())
.subscribe({ (serviceId, url) -> refreshFeedItem(serviceId, url) }, { })
)

groupAdapter = GroupieAdapter().apply {
setOnItemClickListener(listenerStreamItem)
setOnItemLongClickListener(listenerStreamItem)
Expand Down Expand Up @@ -184,6 +200,31 @@ class FeedFragment : BaseStateFragment<FeedState>() {
}
}

/**
* Re-queries the DB for a single stream identified by [serviceId] + [url] and updates
* only that item in the adapter (view count + watch progress), without triggering a full
* list reload.
*/
private fun refreshFeedItem(serviceId: Int, url: String) {
disposables.add(
viewModel.refreshStreamWithState(serviceId, url)
.observeOn(AndroidSchedulers.mainThread())
.subscribe({ updatedStreamWithState ->
for (i in 0 until groupAdapter.itemCount) {
val item = groupAdapter.getItem(i)
if (item is StreamItem &&
item.streamWithState.stream.url == url &&
item.streamWithState.stream.serviceId == serviceId
) {
item.streamWithState = updatedStreamWithState
groupAdapter.notifyItemChanged(i, StreamItem.UPDATE_STREAM_DATA)
break
}
}
}, { /* ignore — feed refreshes on next full load */ })
)
}

private fun setupListViewMode() {
// does everything needed to setup the layouts for grid or list modes
groupAdapter.spanCount = if (shouldUseGridLayout(context)) getGridSpanCountStreams(context) else 1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import androidx.lifecycle.viewmodel.viewModelFactory
import androidx.preference.PreferenceManager
import io.reactivex.rxjava3.android.schedulers.AndroidSchedulers
import io.reactivex.rxjava3.core.Flowable
import io.reactivex.rxjava3.core.Maybe
import io.reactivex.rxjava3.functions.Function6
import io.reactivex.rxjava3.processors.BehaviorProcessor
import io.reactivex.rxjava3.schedulers.Schedulers
Expand Down Expand Up @@ -153,6 +154,14 @@ class FeedViewModel(

fun getShowFutureItemsFromPreferences() = getShowFutureItemsFromPreferences(application)

/**
* Returns a fresh [StreamWithState] for a single stream identified by [serviceId] and [url],
* reading the latest view count and watch progress directly from the database.
* Executes on the IO scheduler.
*/
fun refreshStreamWithState(serviceId: Int, url: String): Maybe<StreamWithState> = feedDatabaseManager.getStreamWithState(serviceId, url)
.subscribeOn(Schedulers.io())

companion object {
private fun getShowPlayedItemsFromPreferences(context: Context) = PreferenceManager.getDefaultSharedPreferences(context)
.getBoolean(context.getString(R.string.feed_show_watched_items_key), true)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package org.schabi.newpipe.local.feed

import androidx.lifecycle.LiveData
import androidx.lifecycle.MutableLiveData
import androidx.lifecycle.ViewModel
import io.reactivex.rxjava3.processors.PublishProcessor

/**
* Activity-scoped ViewModel used as a message bus between
* [VideoDetailFragment][org.schabi.newpipe.fragments.detail.VideoDetailFragment]
* and [FeedFragment].
*
* Two trigger points post here:
* 1. `VideoDetailFragment.handleResult` — stream info (view count) written to DB.
* 2. `HistoryRecordManager.saveStreamState` — watch progress written to DB via [globalProgressBus].
*
* [FeedFragment] observes [updatedStream] and re-queries only the affected item from the DB.
*/
class StreamUpdateViewModel : ViewModel() {

private val _updatedStream = MutableLiveData<Pair<Int, String>>()

/** Emits (serviceId, url) whenever a stream's DB record (view count or progress) is updated. */
val updatedStream: LiveData<Pair<Int, String>> = _updatedStream

/** Called by VideoDetailFragment after the stream info (including view count) is stored. */
fun notifyStreamInfoUpdated(serviceId: Int, url: String) {
_updatedStream.postValue(Pair(serviceId, url))
}

companion object {
/**
* Process-wide bus used by [org.schabi.newpipe.local.history.HistoryRecordManager] (which
* has no Activity context) to publish progress-save events.
* [FeedFragment] subscribes to this bus directly in [FeedFragment.onViewCreated].
*/
@JvmStatic
val globalProgressBus: PublishProcessor<Pair<Int, String>> = PublishProcessor.create()

/**
* Called by [org.schabi.newpipe.local.history.HistoryRecordManager] every time it saves playback progress.
* Safe to call from any thread.
*/
@JvmStatic
fun postProgressUpdate(serviceId: Int, url: String) {
globalProgressBus.onNext(Pair(serviceId, url))
}
}
}
32 changes: 28 additions & 4 deletions app/src/main/java/org/schabi/newpipe/local/feed/item/StreamItem.kt
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,16 @@ import org.schabi.newpipe.util.StreamTypeUtil
import org.schabi.newpipe.util.image.CoilHelper

data class StreamItem(
val streamWithState: StreamWithState,
var streamWithState: StreamWithState,
var itemVersion: ItemVersion = ItemVersion.NORMAL
) : BindableItem<ListStreamItemBinding>() {
companion object {
const val UPDATE_RELATIVE_TIME = 1
const val UPDATE_STREAM_DATA = 2
}

private val stream: StreamEntity = streamWithState.stream
private val stateProgressTime: Long? = streamWithState.stateProgressMillis
private val stream: StreamEntity get() = streamWithState.stream
private val stateProgressTime: Long? get() = streamWithState.stateProgressMillis

/**
* Will be executed at the end of the [StreamItem.bind] (with (ListStreamItemBinding,Int)).
Expand Down Expand Up @@ -62,6 +63,27 @@ data class StreamItem(
return
}

if (payloads.contains(UPDATE_STREAM_DATA)) {
// Rebind only the fields that may have changed: view count and watch progress
if (itemVersion != ItemVersion.MINI) {
viewBinding.itemAdditionalDetails.text =
getStreamInfoDetailLine(viewBinding.itemAdditionalDetails.context)
}
if (stream.duration > 0) {
val progress = stateProgressTime
if (progress != null) {
viewBinding.itemProgressView.visibility = View.VISIBLE
viewBinding.itemProgressView.max = stream.duration.toInt()
viewBinding.itemProgressView.progress =
TimeUnit.MILLISECONDS.toSeconds(progress).toInt()
} else {
viewBinding.itemProgressView.visibility = View.GONE
}
}
execBindEnd?.accept(viewBinding)
return
}

super.bind(viewBinding, position, payloads)
}

Expand All @@ -82,7 +104,9 @@ data class StreamItem(
if (stateProgressTime != null) {
viewBinding.itemProgressView.visibility = View.VISIBLE
viewBinding.itemProgressView.max = stream.duration.toInt()
viewBinding.itemProgressView.progress = TimeUnit.MILLISECONDS.toSeconds(stateProgressTime).toInt()
viewBinding.itemProgressView.progress = TimeUnit.MILLISECONDS.toSeconds(
stateProgressTime!!
).toInt()
} else {
viewBinding.itemProgressView.visibility = View.GONE
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.schabi.newpipe.extractor.stream.StreamInfo;
import org.schabi.newpipe.extractor.stream.StreamInfoItem;
import org.schabi.newpipe.local.feed.FeedViewModel;
import org.schabi.newpipe.local.feed.StreamUpdateViewModel;
import org.schabi.newpipe.player.playqueue.PlayQueueItem;

import java.time.OffsetDateTime;
Expand Down Expand Up @@ -243,12 +244,15 @@ public Maybe<StreamStateEntity> loadStreamState(final StreamInfo info) {

public Completable saveStreamState(@NonNull final StreamInfo info, final long progressMillis) {
return Completable.fromAction(() -> database.runInTransaction(() -> {
final long streamId = streamTable.upsert(new StreamEntity(info));
final var state = new StreamStateEntity(streamId, progressMillis);
if (state.isValid(info.getDuration())) {
streamStateTable.upsert(state);
}
})).subscribeOn(Schedulers.io());
final long streamId = streamTable.upsert(new StreamEntity(info));
final var state = new StreamStateEntity(streamId, progressMillis);
if (state.isValid(info.getDuration())) {
streamStateTable.upsert(state);
}
})).subscribeOn(Schedulers.io())
.doOnComplete(() -> StreamUpdateViewModel.postProgressUpdate(
info.getServiceId(), info.getUrl()
));
}

public Maybe<StreamStateEntity> loadStreamState(final InfoItem info) {
Expand Down
Loading