From 1098a78d0dab4128e7160ca2f9ebbe6e539a8153 Mon Sep 17 00:00:00 2001 From: tarang-jain Date: Mon, 22 Jun 2026 05:46:58 -0700 Subject: [PATCH 1/4] checks --- cpp/src/cluster/detail/kmeans_common.cuh | 115 ++++++++++ .../detail/minClusterDistanceCompute.cu | 216 ++++++++++++++++++ cpp/tests/cluster/kmeans.cu | 2 +- 3 files changed, 332 insertions(+), 1 deletion(-) diff --git a/cpp/src/cluster/detail/kmeans_common.cuh b/cpp/src/cluster/detail/kmeans_common.cuh index ba98dadca6..43de067b5c 100644 --- a/cpp/src/cluster/detail/kmeans_common.cuh +++ b/cpp/src/cluster/detail/kmeans_common.cuh @@ -43,17 +43,23 @@ #include #include #include +#include #include #include #include #include +#include #include #include +#include #include #include #include +#include +#include +#include namespace cuvs::cluster::kmeans::detail { @@ -528,9 +534,76 @@ void compute_centroid_adjustments( { cudaStream_t stream = raft::resource::get_cuda_stream(handle); auto n_samples = X.extent(0); + auto n_features = X.extent(1); workspace.resize(n_samples, stream); + // ----- Nondeterminism diagnostic: fingerprint INPUTS (X, labels, weights) ----- + // Compare two test runs by diffing the resulting log. If INPUT hashes match + // across runs but OUTPUT hashes do not, the divergence is the non-associative + // atomic accumulation inside raft::linalg::reduce_rows_by_key. + static std::atomic s_call_id{0}; + const int call_id = s_call_id.fetch_add(1); + { + // Materialize the (possibly lazy) labels iterator into a device buffer so + // we can read it on host without disturbing the original sequence. + rmm::device_uvector mat_labels(static_cast(n_samples), stream); + thrust::copy_n(raft::resource::get_thrust_policy(handle), + cluster_labels, + n_samples, + mat_labels.data()); + + const std::size_t n_X_elems = + static_cast(n_samples) * static_cast(n_features); + std::vector h_labels(static_cast(n_samples)); + std::vector h_X(n_X_elems); + std::vector h_w(static_cast(n_samples)); + raft::copy(h_labels.data(), mat_labels.data(), n_samples, stream); + raft::copy(h_X.data(), X.data_handle(), n_X_elems, stream); + raft::copy(h_w.data(), sample_weights.data_handle(), n_samples, stream); + raft::resource::sync_stream(handle, stream); + + auto fnv = [](auto const& buf) { + std::uint64_t h = 1469598103934665603ULL; + for (auto v : buf) { + std::uint64_t bits = 0; + if constexpr (std::is_floating_point_v>) { + std::memcpy(&bits, &v, sizeof(v)); + } else { + bits = static_cast(v); + } + h ^= bits; + h *= 1099511628211ULL; + } + return h; + }; + + auto fmt_head = [&](auto const& buf) { + std::string s; + std::size_t hd = std::min(8, buf.size()); + for (std::size_t i = 0; i < hd; ++i) { + if (i) s += ','; + s += std::to_string(static_cast(buf[i])); + } + return s; + }; + + RAFT_LOG_INFO( + "compute_centroid_adjustments[call=%d]: INPUTS n_samples=%lld n_features=%lld " + "n_clusters=%lld reset_sums=%d X_hash=0x%016llx labels_hash=0x%016llx " + "weights_hash=0x%016llx labels_head=[%s]", + call_id, + static_cast(n_samples), + static_cast(n_features), + static_cast(n_clusters), + static_cast(reset_sums), + static_cast(fnv(h_X)), + static_cast(fnv(h_labels)), + static_cast(fnv(h_w)), + fmt_head(h_labels).c_str()); + } + // ----- end diagnostic for INPUTS ----- + raft::linalg::reduce_rows_by_key(X.data_handle(), X.extent(1), cluster_labels, @@ -551,6 +624,48 @@ void compute_centroid_adjustments( n_clusters, stream, reset_sums); + + // ----- Nondeterminism diagnostic: fingerprint OUTPUTS (centroid_sums, wpc) ----- + { + const std::size_t n_sums_elems = + static_cast(n_clusters) * static_cast(n_features); + std::vector h_sums(n_sums_elems); + std::vector h_wpc(static_cast(n_clusters)); + raft::copy(h_sums.data(), centroid_sums.data_handle(), n_sums_elems, stream); + raft::copy(h_wpc.data(), weight_per_cluster.data_handle(), n_clusters, stream); + raft::resource::sync_stream(handle, stream); + + auto fnv_fp = [](std::vector const& buf) { + std::uint64_t h = 1469598103934665603ULL; + for (auto v : buf) { + std::uint64_t bits = 0; + std::memcpy(&bits, &v, sizeof(v)); + h ^= bits; + h *= 1099511628211ULL; + } + return h; + }; + + auto fmt_head_fp = [](std::vector const& buf) { + std::string s; + std::size_t hd = std::min(8, buf.size()); + for (std::size_t i = 0; i < hd; ++i) { + if (i) s += ','; + s += std::to_string(static_cast(buf[i])); + } + return s; + }; + + RAFT_LOG_INFO( + "compute_centroid_adjustments[call=%d]: OUTPUTS sums_hash=0x%016llx wpc_hash=0x%016llx " + "sums_head=[%s] wpc_head=[%s]", + call_id, + static_cast(fnv_fp(h_sums)), + static_cast(fnv_fp(h_wpc)), + fmt_head_fp(h_sums).c_str(), + fmt_head_fp(h_wpc).c_str()); + } + // ----- end diagnostic for OUTPUTS ----- } /** * @brief Finalize centroids by dividing accumulated sums by counts. diff --git a/cpp/src/cluster/detail/minClusterDistanceCompute.cu b/cpp/src/cluster/detail/minClusterDistanceCompute.cu index b15119599e..ef3bacd090 100644 --- a/cpp/src/cluster/detail/minClusterDistanceCompute.cu +++ b/cpp/src/cluster/detail/minClusterDistanceCompute.cu @@ -11,6 +11,119 @@ namespace cuvs::cluster::kmeans::detail { +namespace { + +// Determinism self-test for the fused L2 nearest-neighbor reduction used by +// minClusterAndDistanceCompute for L2Expanded / L2SqrtExpanded / CosineExpanded +// metrics. Runs `fusedDistanceNNMinReduce` K times on bit-identical hardcoded +// inputs and reports whether every output (both the argmin keys and the +// distance values) is bitwise identical across the K runs. +// +// This is intentionally placed alongside the function under test so the same +// code path is exercised. Triggered once per instantiation +// from the first call to minClusterAndDistanceCompute via a static atomic. +template +void runFusedNNDeterminismSelfTest(raft::resources const& handle) +{ + constexpr IndexT n_samples = 64; + constexpr IndexT n_features = 4; + constexpr IndexT n_clusters = 4; + constexpr int K = 4; + + cudaStream_t stream = raft::resource::get_cuda_stream(handle); + + auto X = raft::make_device_matrix(handle, n_samples, n_features); + auto centroids = raft::make_device_matrix(handle, n_clusters, n_features); + + raft::random::RngState rng_x{12345ULL}; + raft::random::RngState rng_c{67890ULL}; + raft::random::uniform(handle, + rng_x, + X.data_handle(), + static_cast(n_samples * n_features), + DataT{-1}, + DataT{1}); + raft::random::uniform(handle, + rng_c, + centroids.data_handle(), + static_cast(n_clusters * n_features), + DataT{-1}, + DataT{1}); + + auto L2NormX = raft::make_device_vector(handle, n_samples); + auto centroidsNorm = raft::make_device_vector(handle, n_clusters); + raft::linalg::norm( + handle, + raft::make_device_matrix_view(X.data_handle(), n_samples, n_features), + L2NormX.view()); + raft::linalg::norm( + handle, + raft::make_device_matrix_view( + centroids.data_handle(), n_clusters, n_features), + centroidsNorm.view()); + + rmm::device_uvector workspace(sizeof(int) * static_cast(n_samples), stream); + + std::vector>> host_outputs(K); + for (int k = 0; k < K; ++k) { + auto out = + raft::make_device_vector, IndexT>(handle, n_samples); + raft::KeyValuePair initial_value(0, std::numeric_limits::max()); + raft::matrix::fill(handle, out.view(), initial_value); + + cuvs::distance::fusedDistanceNNMinReduce, IndexT>( + out.data_handle(), + X.data_handle(), + centroids.data_handle(), + L2NormX.data_handle(), + centroidsNorm.data_handle(), + n_samples, + n_clusters, + n_features, + static_cast(workspace.data()), + /*sqrt=*/false, + /*initOutBuffer=*/false, + /*isRowMajor=*/true, + cuvs::distance::DistanceType::L2Expanded, + 0.0f, + stream); + + host_outputs[k].resize(static_cast(n_samples)); + raft::copy(host_outputs[k].data(), out.data_handle(), n_samples, stream); + raft::resource::sync_stream(handle, stream); + } + + // Compare every (k > 0) run against run 0 element-by-element. Both the + // argmin key (the would-be cluster label) and the distance value are + // checked with bitwise equality. + int keys_diffs = 0; + int values_diffs = 0; + for (int k = 1; k < K; ++k) { + for (IndexT i = 0; i < n_samples; ++i) { + if (host_outputs[k][i].key != host_outputs[0][i].key) { ++keys_diffs; } + if (std::memcmp(&host_outputs[k][i].value, &host_outputs[0][i].value, sizeof(DataT)) != 0) { + ++values_diffs; + } + } + } + + RAFT_LOG_INFO( + "fusedDistanceNNMinReduce determinism self-test (%d runs, %lld samples, %lld clusters, " + "%lld features): keys=%s (%d diffs of %d compared), values=%s (%d bit-diffs of %d compared)", + K, + static_cast(n_samples), + static_cast(n_clusters), + static_cast(n_features), + keys_diffs == 0 ? "ALL MATCH" : "MISMATCH", + keys_diffs, + static_cast((K - 1) * n_samples), + values_diffs == 0 ? "ALL MATCH" : "MISMATCH", + values_diffs, + static_cast((K - 1) * n_samples)); +} + +} // namespace + // Calculates a pair for every sample in input 'X' where key is an // index to an sample in 'centroids' (index of the nearest centroid) and 'value' // is the distance between the sample and the 'centroids[key]'. @@ -35,6 +148,66 @@ void minClusterAndDistanceCompute( metric == cuvs::distance::DistanceType::L2SqrtExpanded || metric == cuvs::distance::DistanceType::CosineExpanded; + // ----- One-shot determinism self-test for the fused L2 NN kernel ----- + // Runs once per instantiation on the very first call. + // Feeds the fused kernel bit-identical hardcoded inputs K times and checks + // whether outputs are bitwise identical across the K runs. + { + static std::atomic s_self_test_done{false}; + if (!s_self_test_done.exchange(true)) { + runFusedNNDeterminismSelfTest(handle); + } + } + + // ----- Nondeterminism diagnostic: fingerprint INPUTS (X, centroids, L2NormX) ----- + // Counterpart to the diagnostic in compute_centroid_adjustments. If calls + // with identical INPUT hashes here produce identical OUTPUT + // (keys_hash, values_hash), then label generation is deterministic and + // any downstream drift is from reduce_rows_by_key atomics, not from this + // kernel. + static std::atomic s_mcad_call_id{0}; + const int call_id = s_mcad_call_id.fetch_add(1); + { + const std::size_t n_X_elems = + static_cast(n_samples) * static_cast(n_features); + const std::size_t n_centroids_elems = + static_cast(n_clusters) * static_cast(n_features); + + std::vector h_X(n_X_elems); + std::vector h_c(n_centroids_elems); + std::vector h_norm(static_cast(n_samples)); + raft::copy(h_X.data(), X.data_handle(), n_X_elems, stream); + raft::copy(h_c.data(), centroids.data_handle(), n_centroids_elems, stream); + raft::copy(h_norm.data(), L2NormX.data_handle(), n_samples, stream); + raft::resource::sync_stream(handle, stream); + + auto fnv_fp = [](std::vector const& buf) { + std::uint64_t h = 1469598103934665603ULL; + for (auto v : buf) { + std::uint64_t bits = 0; + std::memcpy(&bits, &v, sizeof(v)); + h ^= bits; + h *= 1099511628211ULL; + } + return h; + }; + + RAFT_LOG_INFO( + "minClusterAndDistanceCompute[call=%d]: INPUTS n_samples=%lld n_features=%lld " + "n_clusters=%lld metric=%d is_fused=%d X_hash=0x%016llx centroids_hash=0x%016llx " + "L2NormX_hash=0x%016llx", + call_id, + static_cast(n_samples), + static_cast(n_features), + static_cast(n_clusters), + static_cast(metric), + static_cast(is_fused), + static_cast(fnv_fp(h_X)), + static_cast(fnv_fp(h_c)), + static_cast(fnv_fp(h_norm))); + } + // ----- end diagnostic for INPUTS ----- + if (is_fused) { L2NormBuf_OR_DistBuf.resize(n_clusters, stream); auto centroidsNorm = @@ -168,6 +341,49 @@ void minClusterAndDistanceCompute( } } } + + // ----- Nondeterminism diagnostic: fingerprint OUTPUTS (keys, values) ----- + // Split the KeyValuePair vector into keys (cluster indices = future labels) + // and values (distances). The keys are the inputs to + // compute_centroid_adjustments downstream. + { + std::vector> h_kvp(static_cast(n_samples)); + raft::copy(h_kvp.data(), minClusterAndDistance.data_handle(), n_samples, stream); + raft::resource::sync_stream(handle, stream); + + std::uint64_t keys_hash = 1469598103934665603ULL; + std::uint64_t values_hash = 1469598103934665603ULL; + for (auto const& kv : h_kvp) { + keys_hash ^= static_cast(kv.key); + keys_hash *= 1099511628211ULL; + std::uint64_t vbits = 0; + std::memcpy(&vbits, &kv.value, sizeof(kv.value)); + values_hash ^= vbits; + values_hash *= 1099511628211ULL; + } + + std::string keys_head; + std::string values_head; + const std::size_t hd = std::min(8, h_kvp.size()); + for (std::size_t i = 0; i < hd; ++i) { + if (i) { + keys_head += ','; + values_head += ','; + } + keys_head += std::to_string(static_cast(h_kvp[i].key)); + values_head += std::to_string(static_cast(h_kvp[i].value)); + } + + RAFT_LOG_INFO( + "minClusterAndDistanceCompute[call=%d]: OUTPUTS keys_hash=0x%016llx values_hash=0x%016llx " + "keys_head=[%s] values_head=[%s]", + call_id, + static_cast(keys_hash), + static_cast(values_hash), + keys_head.c_str(), + values_head.c_str()); + } + // ----- end diagnostic for OUTPUTS ----- } #define INSTANTIATE_MIN_CLUSTER_AND_DISTANCE(DataT, IndexT) \ diff --git a/cpp/tests/cluster/kmeans.cu b/cpp/tests/cluster/kmeans.cu index 0b5d1b8fc9..c7333c5bb1 100644 --- a/cpp/tests/cluster/kmeans.cu +++ b/cpp/tests/cluster/kmeans.cu @@ -656,7 +656,7 @@ const std::vector> batched_inputsf2 = { {1000, 64, 5, 0.0001f, false, 500}, {1000, 100, 20, 0.0001f, true, 30}, {1000, 10, 20, 0.0001f, false, 30}, - {10000, 16, 10, 0.0001f, true, 1000}, + {10000, 16, 10, 0.001f, true, 1000}, {10000, 96, 10, 0.0001f, false, 10000}, }; From a84501dc782cec8d2f4b758b32f30211cfd4afc2 Mon Sep 17 00:00:00 2001 From: tarang-jain Date: Tue, 23 Jun 2026 12:18:39 +0000 Subject: [PATCH 2/4] limit iterations --- cpp/src/cluster/detail/kmeans_common.cuh | 6 ++---- cpp/src/cluster/detail/minClusterDistanceCompute.cu | 4 +--- cpp/tests/cluster/kmeans.cu | 5 ++++- 3 files changed, 7 insertions(+), 8 deletions(-) diff --git a/cpp/src/cluster/detail/kmeans_common.cuh b/cpp/src/cluster/detail/kmeans_common.cuh index 43de067b5c..69a45841ee 100644 --- a/cpp/src/cluster/detail/kmeans_common.cuh +++ b/cpp/src/cluster/detail/kmeans_common.cuh @@ -548,10 +548,8 @@ void compute_centroid_adjustments( // Materialize the (possibly lazy) labels iterator into a device buffer so // we can read it on host without disturbing the original sequence. rmm::device_uvector mat_labels(static_cast(n_samples), stream); - thrust::copy_n(raft::resource::get_thrust_policy(handle), - cluster_labels, - n_samples, - mat_labels.data()); + thrust::copy_n( + raft::resource::get_thrust_policy(handle), cluster_labels, n_samples, mat_labels.data()); const std::size_t n_X_elems = static_cast(n_samples) * static_cast(n_features); diff --git a/cpp/src/cluster/detail/minClusterDistanceCompute.cu b/cpp/src/cluster/detail/minClusterDistanceCompute.cu index ef3bacd090..13b57d10d4 100644 --- a/cpp/src/cluster/detail/minClusterDistanceCompute.cu +++ b/cpp/src/cluster/detail/minClusterDistanceCompute.cu @@ -154,9 +154,7 @@ void minClusterAndDistanceCompute( // whether outputs are bitwise identical across the K runs. { static std::atomic s_self_test_done{false}; - if (!s_self_test_done.exchange(true)) { - runFusedNNDeterminismSelfTest(handle); - } + if (!s_self_test_done.exchange(true)) { runFusedNNDeterminismSelfTest(handle); } } // ----- Nondeterminism diagnostic: fingerprint INPUTS (X, centroids, L2NormX) ----- diff --git a/cpp/tests/cluster/kmeans.cu b/cpp/tests/cluster/kmeans.cu index c7333c5bb1..e4673570aa 100644 --- a/cpp/tests/cluster/kmeans.cu +++ b/cpp/tests/cluster/kmeans.cu @@ -381,6 +381,9 @@ class KmeansFitBatchedTest : public ::testing::TestWithParam> batched_inputsf2 = { {1000, 64, 5, 0.0001f, false, 500}, {1000, 100, 20, 0.0001f, true, 30}, {1000, 10, 20, 0.0001f, false, 30}, - {10000, 16, 10, 0.001f, true, 1000}, + {10000, 16, 10, 0.0001f, true, 1000}, {10000, 96, 10, 0.0001f, false, 10000}, }; From 1a4225737d19cc3437e77d9da726f760a3aee5a8 Mon Sep 17 00:00:00 2001 From: tarang-jain Date: Tue, 23 Jun 2026 12:29:34 +0000 Subject: [PATCH 3/4] revert debug statements --- cpp/src/cluster/detail/kmeans_common.cuh | 113 --------- .../detail/minClusterDistanceCompute.cu | 214 ------------------ 2 files changed, 327 deletions(-) diff --git a/cpp/src/cluster/detail/kmeans_common.cuh b/cpp/src/cluster/detail/kmeans_common.cuh index 69a45841ee..ba98dadca6 100644 --- a/cpp/src/cluster/detail/kmeans_common.cuh +++ b/cpp/src/cluster/detail/kmeans_common.cuh @@ -43,23 +43,17 @@ #include #include #include -#include #include #include #include #include -#include #include #include -#include #include #include #include -#include -#include -#include namespace cuvs::cluster::kmeans::detail { @@ -534,74 +528,9 @@ void compute_centroid_adjustments( { cudaStream_t stream = raft::resource::get_cuda_stream(handle); auto n_samples = X.extent(0); - auto n_features = X.extent(1); workspace.resize(n_samples, stream); - // ----- Nondeterminism diagnostic: fingerprint INPUTS (X, labels, weights) ----- - // Compare two test runs by diffing the resulting log. If INPUT hashes match - // across runs but OUTPUT hashes do not, the divergence is the non-associative - // atomic accumulation inside raft::linalg::reduce_rows_by_key. - static std::atomic s_call_id{0}; - const int call_id = s_call_id.fetch_add(1); - { - // Materialize the (possibly lazy) labels iterator into a device buffer so - // we can read it on host without disturbing the original sequence. - rmm::device_uvector mat_labels(static_cast(n_samples), stream); - thrust::copy_n( - raft::resource::get_thrust_policy(handle), cluster_labels, n_samples, mat_labels.data()); - - const std::size_t n_X_elems = - static_cast(n_samples) * static_cast(n_features); - std::vector h_labels(static_cast(n_samples)); - std::vector h_X(n_X_elems); - std::vector h_w(static_cast(n_samples)); - raft::copy(h_labels.data(), mat_labels.data(), n_samples, stream); - raft::copy(h_X.data(), X.data_handle(), n_X_elems, stream); - raft::copy(h_w.data(), sample_weights.data_handle(), n_samples, stream); - raft::resource::sync_stream(handle, stream); - - auto fnv = [](auto const& buf) { - std::uint64_t h = 1469598103934665603ULL; - for (auto v : buf) { - std::uint64_t bits = 0; - if constexpr (std::is_floating_point_v>) { - std::memcpy(&bits, &v, sizeof(v)); - } else { - bits = static_cast(v); - } - h ^= bits; - h *= 1099511628211ULL; - } - return h; - }; - - auto fmt_head = [&](auto const& buf) { - std::string s; - std::size_t hd = std::min(8, buf.size()); - for (std::size_t i = 0; i < hd; ++i) { - if (i) s += ','; - s += std::to_string(static_cast(buf[i])); - } - return s; - }; - - RAFT_LOG_INFO( - "compute_centroid_adjustments[call=%d]: INPUTS n_samples=%lld n_features=%lld " - "n_clusters=%lld reset_sums=%d X_hash=0x%016llx labels_hash=0x%016llx " - "weights_hash=0x%016llx labels_head=[%s]", - call_id, - static_cast(n_samples), - static_cast(n_features), - static_cast(n_clusters), - static_cast(reset_sums), - static_cast(fnv(h_X)), - static_cast(fnv(h_labels)), - static_cast(fnv(h_w)), - fmt_head(h_labels).c_str()); - } - // ----- end diagnostic for INPUTS ----- - raft::linalg::reduce_rows_by_key(X.data_handle(), X.extent(1), cluster_labels, @@ -622,48 +551,6 @@ void compute_centroid_adjustments( n_clusters, stream, reset_sums); - - // ----- Nondeterminism diagnostic: fingerprint OUTPUTS (centroid_sums, wpc) ----- - { - const std::size_t n_sums_elems = - static_cast(n_clusters) * static_cast(n_features); - std::vector h_sums(n_sums_elems); - std::vector h_wpc(static_cast(n_clusters)); - raft::copy(h_sums.data(), centroid_sums.data_handle(), n_sums_elems, stream); - raft::copy(h_wpc.data(), weight_per_cluster.data_handle(), n_clusters, stream); - raft::resource::sync_stream(handle, stream); - - auto fnv_fp = [](std::vector const& buf) { - std::uint64_t h = 1469598103934665603ULL; - for (auto v : buf) { - std::uint64_t bits = 0; - std::memcpy(&bits, &v, sizeof(v)); - h ^= bits; - h *= 1099511628211ULL; - } - return h; - }; - - auto fmt_head_fp = [](std::vector const& buf) { - std::string s; - std::size_t hd = std::min(8, buf.size()); - for (std::size_t i = 0; i < hd; ++i) { - if (i) s += ','; - s += std::to_string(static_cast(buf[i])); - } - return s; - }; - - RAFT_LOG_INFO( - "compute_centroid_adjustments[call=%d]: OUTPUTS sums_hash=0x%016llx wpc_hash=0x%016llx " - "sums_head=[%s] wpc_head=[%s]", - call_id, - static_cast(fnv_fp(h_sums)), - static_cast(fnv_fp(h_wpc)), - fmt_head_fp(h_sums).c_str(), - fmt_head_fp(h_wpc).c_str()); - } - // ----- end diagnostic for OUTPUTS ----- } /** * @brief Finalize centroids by dividing accumulated sums by counts. diff --git a/cpp/src/cluster/detail/minClusterDistanceCompute.cu b/cpp/src/cluster/detail/minClusterDistanceCompute.cu index 13b57d10d4..b15119599e 100644 --- a/cpp/src/cluster/detail/minClusterDistanceCompute.cu +++ b/cpp/src/cluster/detail/minClusterDistanceCompute.cu @@ -11,119 +11,6 @@ namespace cuvs::cluster::kmeans::detail { -namespace { - -// Determinism self-test for the fused L2 nearest-neighbor reduction used by -// minClusterAndDistanceCompute for L2Expanded / L2SqrtExpanded / CosineExpanded -// metrics. Runs `fusedDistanceNNMinReduce` K times on bit-identical hardcoded -// inputs and reports whether every output (both the argmin keys and the -// distance values) is bitwise identical across the K runs. -// -// This is intentionally placed alongside the function under test so the same -// code path is exercised. Triggered once per instantiation -// from the first call to minClusterAndDistanceCompute via a static atomic. -template -void runFusedNNDeterminismSelfTest(raft::resources const& handle) -{ - constexpr IndexT n_samples = 64; - constexpr IndexT n_features = 4; - constexpr IndexT n_clusters = 4; - constexpr int K = 4; - - cudaStream_t stream = raft::resource::get_cuda_stream(handle); - - auto X = raft::make_device_matrix(handle, n_samples, n_features); - auto centroids = raft::make_device_matrix(handle, n_clusters, n_features); - - raft::random::RngState rng_x{12345ULL}; - raft::random::RngState rng_c{67890ULL}; - raft::random::uniform(handle, - rng_x, - X.data_handle(), - static_cast(n_samples * n_features), - DataT{-1}, - DataT{1}); - raft::random::uniform(handle, - rng_c, - centroids.data_handle(), - static_cast(n_clusters * n_features), - DataT{-1}, - DataT{1}); - - auto L2NormX = raft::make_device_vector(handle, n_samples); - auto centroidsNorm = raft::make_device_vector(handle, n_clusters); - raft::linalg::norm( - handle, - raft::make_device_matrix_view(X.data_handle(), n_samples, n_features), - L2NormX.view()); - raft::linalg::norm( - handle, - raft::make_device_matrix_view( - centroids.data_handle(), n_clusters, n_features), - centroidsNorm.view()); - - rmm::device_uvector workspace(sizeof(int) * static_cast(n_samples), stream); - - std::vector>> host_outputs(K); - for (int k = 0; k < K; ++k) { - auto out = - raft::make_device_vector, IndexT>(handle, n_samples); - raft::KeyValuePair initial_value(0, std::numeric_limits::max()); - raft::matrix::fill(handle, out.view(), initial_value); - - cuvs::distance::fusedDistanceNNMinReduce, IndexT>( - out.data_handle(), - X.data_handle(), - centroids.data_handle(), - L2NormX.data_handle(), - centroidsNorm.data_handle(), - n_samples, - n_clusters, - n_features, - static_cast(workspace.data()), - /*sqrt=*/false, - /*initOutBuffer=*/false, - /*isRowMajor=*/true, - cuvs::distance::DistanceType::L2Expanded, - 0.0f, - stream); - - host_outputs[k].resize(static_cast(n_samples)); - raft::copy(host_outputs[k].data(), out.data_handle(), n_samples, stream); - raft::resource::sync_stream(handle, stream); - } - - // Compare every (k > 0) run against run 0 element-by-element. Both the - // argmin key (the would-be cluster label) and the distance value are - // checked with bitwise equality. - int keys_diffs = 0; - int values_diffs = 0; - for (int k = 1; k < K; ++k) { - for (IndexT i = 0; i < n_samples; ++i) { - if (host_outputs[k][i].key != host_outputs[0][i].key) { ++keys_diffs; } - if (std::memcmp(&host_outputs[k][i].value, &host_outputs[0][i].value, sizeof(DataT)) != 0) { - ++values_diffs; - } - } - } - - RAFT_LOG_INFO( - "fusedDistanceNNMinReduce determinism self-test (%d runs, %lld samples, %lld clusters, " - "%lld features): keys=%s (%d diffs of %d compared), values=%s (%d bit-diffs of %d compared)", - K, - static_cast(n_samples), - static_cast(n_clusters), - static_cast(n_features), - keys_diffs == 0 ? "ALL MATCH" : "MISMATCH", - keys_diffs, - static_cast((K - 1) * n_samples), - values_diffs == 0 ? "ALL MATCH" : "MISMATCH", - values_diffs, - static_cast((K - 1) * n_samples)); -} - -} // namespace - // Calculates a pair for every sample in input 'X' where key is an // index to an sample in 'centroids' (index of the nearest centroid) and 'value' // is the distance between the sample and the 'centroids[key]'. @@ -148,64 +35,6 @@ void minClusterAndDistanceCompute( metric == cuvs::distance::DistanceType::L2SqrtExpanded || metric == cuvs::distance::DistanceType::CosineExpanded; - // ----- One-shot determinism self-test for the fused L2 NN kernel ----- - // Runs once per instantiation on the very first call. - // Feeds the fused kernel bit-identical hardcoded inputs K times and checks - // whether outputs are bitwise identical across the K runs. - { - static std::atomic s_self_test_done{false}; - if (!s_self_test_done.exchange(true)) { runFusedNNDeterminismSelfTest(handle); } - } - - // ----- Nondeterminism diagnostic: fingerprint INPUTS (X, centroids, L2NormX) ----- - // Counterpart to the diagnostic in compute_centroid_adjustments. If calls - // with identical INPUT hashes here produce identical OUTPUT - // (keys_hash, values_hash), then label generation is deterministic and - // any downstream drift is from reduce_rows_by_key atomics, not from this - // kernel. - static std::atomic s_mcad_call_id{0}; - const int call_id = s_mcad_call_id.fetch_add(1); - { - const std::size_t n_X_elems = - static_cast(n_samples) * static_cast(n_features); - const std::size_t n_centroids_elems = - static_cast(n_clusters) * static_cast(n_features); - - std::vector h_X(n_X_elems); - std::vector h_c(n_centroids_elems); - std::vector h_norm(static_cast(n_samples)); - raft::copy(h_X.data(), X.data_handle(), n_X_elems, stream); - raft::copy(h_c.data(), centroids.data_handle(), n_centroids_elems, stream); - raft::copy(h_norm.data(), L2NormX.data_handle(), n_samples, stream); - raft::resource::sync_stream(handle, stream); - - auto fnv_fp = [](std::vector const& buf) { - std::uint64_t h = 1469598103934665603ULL; - for (auto v : buf) { - std::uint64_t bits = 0; - std::memcpy(&bits, &v, sizeof(v)); - h ^= bits; - h *= 1099511628211ULL; - } - return h; - }; - - RAFT_LOG_INFO( - "minClusterAndDistanceCompute[call=%d]: INPUTS n_samples=%lld n_features=%lld " - "n_clusters=%lld metric=%d is_fused=%d X_hash=0x%016llx centroids_hash=0x%016llx " - "L2NormX_hash=0x%016llx", - call_id, - static_cast(n_samples), - static_cast(n_features), - static_cast(n_clusters), - static_cast(metric), - static_cast(is_fused), - static_cast(fnv_fp(h_X)), - static_cast(fnv_fp(h_c)), - static_cast(fnv_fp(h_norm))); - } - // ----- end diagnostic for INPUTS ----- - if (is_fused) { L2NormBuf_OR_DistBuf.resize(n_clusters, stream); auto centroidsNorm = @@ -339,49 +168,6 @@ void minClusterAndDistanceCompute( } } } - - // ----- Nondeterminism diagnostic: fingerprint OUTPUTS (keys, values) ----- - // Split the KeyValuePair vector into keys (cluster indices = future labels) - // and values (distances). The keys are the inputs to - // compute_centroid_adjustments downstream. - { - std::vector> h_kvp(static_cast(n_samples)); - raft::copy(h_kvp.data(), minClusterAndDistance.data_handle(), n_samples, stream); - raft::resource::sync_stream(handle, stream); - - std::uint64_t keys_hash = 1469598103934665603ULL; - std::uint64_t values_hash = 1469598103934665603ULL; - for (auto const& kv : h_kvp) { - keys_hash ^= static_cast(kv.key); - keys_hash *= 1099511628211ULL; - std::uint64_t vbits = 0; - std::memcpy(&vbits, &kv.value, sizeof(kv.value)); - values_hash ^= vbits; - values_hash *= 1099511628211ULL; - } - - std::string keys_head; - std::string values_head; - const std::size_t hd = std::min(8, h_kvp.size()); - for (std::size_t i = 0; i < hd; ++i) { - if (i) { - keys_head += ','; - values_head += ','; - } - keys_head += std::to_string(static_cast(h_kvp[i].key)); - values_head += std::to_string(static_cast(h_kvp[i].value)); - } - - RAFT_LOG_INFO( - "minClusterAndDistanceCompute[call=%d]: OUTPUTS keys_hash=0x%016llx values_hash=0x%016llx " - "keys_head=[%s] values_head=[%s]", - call_id, - static_cast(keys_hash), - static_cast(values_hash), - keys_head.c_str(), - values_head.c_str()); - } - // ----- end diagnostic for OUTPUTS ----- } #define INSTANTIATE_MIN_CLUSTER_AND_DISTANCE(DataT, IndexT) \ From 0d47620f642ee1a68a1d48c3a2ede5a5fe3eaabd Mon Sep 17 00:00:00 2001 From: tarang-jain Date: Tue, 23 Jun 2026 12:40:34 +0000 Subject: [PATCH 4/4] fix param reset --- cpp/tests/cluster/kmeans.cu | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/cpp/tests/cluster/kmeans.cu b/cpp/tests/cluster/kmeans.cu index e4673570aa..3ea0933222 100644 --- a/cpp/tests/cluster/kmeans.cu +++ b/cpp/tests/cluster/kmeans.cu @@ -405,8 +405,7 @@ class KmeansFitBatchedTest : public ::testing::TestWithParam