From 164548523ecbaf906a543458d247b39c8f309878 Mon Sep 17 00:00:00 2001 From: Eric Ning Date: Tue, 23 Jun 2026 12:37:26 -0700 Subject: [PATCH 1/2] Collapse remote installed plugin requests --- .../request_processors/account_processor.rs | 3 + .../src/request_processors/plugins.rs | 57 +++-- .../tests/suite/v2/plugin_install.rs | 32 ++- .../app-server/tests/suite/v2/plugin_list.rs | 179 +++++-------- .../app-server/tests/suite/v2/skills_list.rs | 26 +- .../core-plugins/src/discoverable_tests.rs | 29 +-- codex-rs/core-plugins/src/manager.rs | 108 ++++++-- codex-rs/core-plugins/src/manager_tests.rs | 63 +++-- codex-rs/core-plugins/src/remote.rs | 114 ++++++--- .../src/remote/installed_snapshot.rs | 235 ++++++++++++++++++ .../src/remote/installed_snapshot_tests.rs | 221 ++++++++++++++++ .../remote/remote_installed_plugin_sync.rs | 89 +++++-- .../core/src/plugins/discoverable_tests.rs | 29 +-- .../tests/suite/request_plugin_install.rs | 18 +- 14 files changed, 899 insertions(+), 304 deletions(-) create mode 100644 codex-rs/core-plugins/src/remote/installed_snapshot.rs create mode 100644 codex-rs/core-plugins/src/remote/installed_snapshot_tests.rs diff --git a/codex-rs/app-server/src/request_processors/account_processor.rs b/codex-rs/app-server/src/request_processors/account_processor.rs index 4691e222044c..b3998143dc37 100644 --- a/codex-rs/app-server/src/request_processors/account_processor.rs +++ b/codex-rs/app-server/src/request_processors/account_processor.rs @@ -196,6 +196,9 @@ impl AccountRequestProcessor { thread_manager .plugins_manager() .set_auth_mode(auth.as_ref().map(CodexAuth::api_auth_mode)); + thread_manager + .plugins_manager() + .clear_remote_installed_plugins_cache(); thread_manager .plugins_manager() .clear_recommended_plugins_cache(); diff --git a/codex-rs/app-server/src/request_processors/plugins.rs b/codex-rs/app-server/src/request_processors/plugins.rs index 020fdc7cbd2c..9c233899c5b0 100644 --- a/codex-rs/app-server/src/request_processors/plugins.rs +++ b/codex-rs/app-server/src/request_processors/plugins.rs @@ -502,6 +502,17 @@ impl PluginRequestProcessor { self.thread_manager.skills_service().clear_cache(); } + fn clear_plugin_share_related_caches(&self, config: &Config, auth: Option) { + self.clear_plugin_related_caches(); + let plugins_manager = self.thread_manager.plugins_manager(); + plugins_manager.invalidate_remote_installed_plugin_snapshot(); + plugins_manager.maybe_start_remote_installed_plugins_cache_refresh_after_mutation( + &config.plugins_config_input(), + auth, + Some(self.effective_plugins_changed_callback()), + ); + } + async fn load_latest_config( &self, fallback_cwd: Option, @@ -658,11 +669,12 @@ impl PluginRequestProcessor { // TODO(remote plugins): Remove this once remote plugins are ready and vertical plugins are // served directly from the normal remote catalog. if include_vertical && !config.features.enabled(Feature::RemotePlugin) { - match codex_core_plugins::remote::fetch_openai_curated_remote_collection_marketplace( - &remote_plugin_service_config, - auth.as_ref(), - ) - .await + match plugins_manager + .fetch_openai_curated_remote_collection_marketplace_for_config( + &plugins_input, + auth.as_ref(), + ) + .await { Ok(Some(remote_marketplace)) => { data.push(remote_marketplace_to_info(remote_marketplace)); @@ -699,13 +711,14 @@ impl PluginRequestProcessor { remote_sources.push(RemoteMarketplaceSource::SharedWithMe); } if !remote_sources.is_empty() { - match codex_core_plugins::remote::fetch_remote_marketplaces( - &remote_plugin_service_config, - auth.as_ref(), - &remote_sources, - /*global_catalog_cache_path*/ Some(config.codex_home.as_path()), - ) - .await + match plugins_manager + .fetch_remote_marketplaces_for_config( + &plugins_input, + auth.as_ref(), + &remote_sources, + /*global_catalog_cache_path*/ Some(config.codex_home.as_path()), + ) + .await { Ok(remote_marketplaces) => { for remote_marketplace in remote_marketplaces @@ -1262,7 +1275,7 @@ impl PluginRequestProcessor { .await .map_err(|err| remote_plugin_catalog_error_to_jsonrpc(err, "save remote plugin share"))?; let remote_plugin_id = result.remote_plugin_id; - self.clear_plugin_related_caches(); + self.clear_plugin_share_related_caches(&config, auth); Ok(PluginShareSaveResponse { remote_plugin_id, share_url: result.share_url.unwrap_or_default(), @@ -1301,7 +1314,7 @@ impl PluginRequestProcessor { .map_err(|err| { remote_plugin_catalog_error_to_jsonrpc(err, "update remote plugin share targets") })?; - self.clear_plugin_related_caches(); + self.clear_plugin_share_related_caches(&config, auth); Ok(PluginShareUpdateTargetsResponse { principals: result .principals @@ -1400,7 +1413,7 @@ impl PluginRequestProcessor { ) .await .map_err(|err| remote_plugin_catalog_error_to_jsonrpc(err, "delete remote plugin share"))?; - self.clear_plugin_related_caches(); + self.clear_plugin_share_related_caches(&config, auth); Ok(PluginShareDeleteResponse {}) } @@ -1636,13 +1649,13 @@ impl PluginRequestProcessor { remote_plugin_catalog_error_to_jsonrpc(err, "install remote plugin") })?; - self.thread_manager - .plugins_manager() - .maybe_start_remote_installed_plugins_cache_refresh_after_mutation( - &config.plugins_config_input(), - auth.clone(), - Some(self.effective_plugins_changed_callback()), - ); + let plugins_manager = self.thread_manager.plugins_manager(); + plugins_manager.clear_remote_installed_plugins_cache(); + plugins_manager.maybe_start_remote_installed_plugins_cache_refresh_after_mutation( + &config.plugins_config_input(), + auth.clone(), + Some(self.effective_plugins_changed_callback()), + ); let plugin_metadata = self .thread_manager diff --git a/codex-rs/app-server/tests/suite/v2/plugin_install.rs b/codex-rs/app-server/tests/suite/v2/plugin_install.rs index 975338e11d59..1d555f71eee9 100644 --- a/codex-rs/app-server/tests/suite/v2/plugin_install.rs +++ b/codex-rs/app-server/tests/suite/v2/plugin_install.rs @@ -61,6 +61,7 @@ use wiremock::matchers::header; use wiremock::matchers::method; use wiremock::matchers::path; use wiremock::matchers::query_param; +use wiremock::matchers::query_param_is_missing; // Plugin install tests wait on connector discovery after the install response path // starts, which is noticeably slower on Windows CI. @@ -2231,22 +2232,31 @@ async fn mount_remote_plugin_detail_with_options( } async fn mount_empty_remote_installed_plugins(server: &MockServer) { - Mock::given(method("GET")) - .and(path("/backend-api/ps/plugins/installed")) - .and(query_param("scope", "GLOBAL")) - .and(header("authorization", "Bearer chatgpt-token")) - .and(header("chatgpt-account-id", "account-123")) - .respond_with(ResponseTemplate::new(200).set_body_string( - r#"{ + let body = r#"{ "plugins": [], "pagination": { "limit": 50, "next_page_token": null } -}"#, - )) - .mount(server) - .await; +}"#; + for scope in [Some("GLOBAL"), None] { + let request = Mock::given(method("GET")) + .and(path("/backend-api/ps/plugins/installed")) + .and(header("authorization", "Bearer chatgpt-token")) + .and(header("chatgpt-account-id", "account-123")) + .and(header("oai-product-sku", "codex")); + let request = match scope { + Some(scope) => request.and(query_param("scope", scope)), + None => request + .and(query_param_is_missing("scope")) + .and(query_param("includeDownloadUrls", "true")) + .and(query_param("limit", "1000")), + }; + request + .respond_with(ResponseTemplate::new(200).set_body_string(body)) + .mount(server) + .await; + } } async fn mount_remote_plugin_install(server: &MockServer, remote_plugin_id: &str) { diff --git a/codex-rs/app-server/tests/suite/v2/plugin_list.rs b/codex-rs/app-server/tests/suite/v2/plugin_list.rs index 974bb8a2c5b8..17df458d9005 100644 --- a/codex-rs/app-server/tests/suite/v2/plugin_list.rs +++ b/codex-rs/app-server/tests/suite/v2/plugin_list.rs @@ -245,10 +245,7 @@ enabled = true .expect("installed plugins should be an array") .push(remote_only); let global_installed_body = serde_json::to_string(&global_installed_body)?; - mount_remote_installed_plugins(&server, "GLOBAL", &global_installed_body).await; - mount_remote_installed_plugins(&server, "WORKSPACE", empty_remote_installed_plugins_body()) - .await; - mount_empty_user_installed_plugins(&server).await; + mount_unscoped_remote_installed_plugins(&server, &[global_installed_body.as_str()]).await; let mut app_server = TestAppServer::new(codex_home.path()).await?; timeout(DEFAULT_TIMEOUT, app_server.initialize()).await??; @@ -1517,10 +1514,7 @@ async fn app_server_startup_sync_downloads_remote_installed_plugin_bundles() -> /*enabled*/ true, remote_app_manifest.clone(), ); - mount_remote_installed_plugins(&server, "GLOBAL", &global_installed_body).await; - mount_remote_installed_plugins(&server, "WORKSPACE", empty_remote_installed_plugins_body()) - .await; - mount_empty_user_installed_plugins(&server).await; + mount_unscoped_remote_installed_plugins(&server, &[global_installed_body.as_str()]).await; let installed_path = codex_home .path() @@ -1589,10 +1583,7 @@ async fn plugin_list_sync_upgrades_and_removes_remote_installed_plugin_bundles() ); mount_remote_plugin_list(&server, "GLOBAL", &global_installed_body).await; mount_remote_plugin_list(&server, "WORKSPACE", empty_remote_installed_plugins_body()).await; - mount_remote_installed_plugins(&server, "GLOBAL", &global_installed_body).await; - mount_remote_installed_plugins(&server, "WORKSPACE", empty_remote_installed_plugins_body()) - .await; - mount_empty_user_installed_plugins(&server).await; + mount_unscoped_remote_installed_plugins(&server, &[global_installed_body.as_str()]).await; let old_path = codex_home .path() @@ -1763,24 +1754,7 @@ async fn plugin_list_includes_remote_marketplaces_when_remote_plugin_enabled() - .respond_with(ResponseTemplate::new(200).set_body_string(empty_page_body)) .mount(&server) .await; - Mock::given(method("GET")) - .and(path("/backend-api/ps/plugins/installed")) - .and(query_param("scope", "GLOBAL")) - .and(header("authorization", "Bearer chatgpt-token")) - .and(header("chatgpt-account-id", "account-123")) - .and(header("oai-product-sku", "codex")) - .respond_with(ResponseTemplate::new(200).set_body_string(global_installed_body)) - .mount(&server) - .await; - Mock::given(method("GET")) - .and(path("/backend-api/ps/plugins/installed")) - .and(query_param("scope", "WORKSPACE")) - .and(header("authorization", "Bearer chatgpt-token")) - .and(header("chatgpt-account-id", "account-123")) - .and(header("oai-product-sku", "codex")) - .respond_with(ResponseTemplate::new(200).set_body_string(empty_page_body)) - .mount(&server) - .await; + mount_unscoped_remote_installed_plugins(&server, &[global_installed_body]).await; Mock::given(method("GET")) .and(path("/backend-api/plugins/featured")) .and(query_param("platform", "codex")) @@ -1932,10 +1906,7 @@ async fn plugin_list_uses_cached_global_remote_catalog_and_refreshes_it() -> Res "Capture notes", ); mount_remote_plugin_list(&server, "GLOBAL", &cached_body).await; - mount_remote_installed_plugins(&server, "GLOBAL", empty_remote_installed_plugins_body()).await; - mount_remote_installed_plugins(&server, "WORKSPACE", empty_remote_installed_plugins_body()) - .await; - mount_empty_user_installed_plugins(&server).await; + mount_unscoped_remote_installed_plugins(&server, &[]).await; let mut mcp = TestAppServer::new(codex_home.path()).await?; timeout(DEFAULT_TIMEOUT, mcp.initialize()).await??; @@ -1968,10 +1939,7 @@ async fn plugin_list_uses_cached_global_remote_catalog_and_refreshes_it() -> Res server.reset().await; mount_remote_plugin_list(&server, "GLOBAL", &refreshed_body).await; - mount_remote_installed_plugins(&server, "GLOBAL", empty_remote_installed_plugins_body()).await; - mount_remote_installed_plugins(&server, "WORKSPACE", empty_remote_installed_plugins_body()) - .await; - mount_empty_user_installed_plugins(&server).await; + mount_unscoped_remote_installed_plugins(&server, &[]).await; let request_id = mcp .send_plugin_list_request(PluginListParams { @@ -2047,10 +2015,7 @@ async fn plugin_list_includes_openai_curated_remote_collection_when_remote_plugi } }"#; mount_openai_curated_remote_collection_plugin_list(&server, collection_body).await; - mount_remote_installed_plugins(&server, "GLOBAL", empty_remote_installed_plugins_body()).await; - mount_remote_installed_plugins(&server, "WORKSPACE", empty_remote_installed_plugins_body()) - .await; - mount_empty_user_installed_plugins(&server).await; + mount_unscoped_remote_installed_plugins(&server, &[]).await; let mut mcp = TestAppServer::new(codex_home.path()).await?; timeout(DEFAULT_TIMEOUT, mcp.initialize()).await??; @@ -2136,10 +2101,7 @@ async fn plugin_list_propagates_openai_curated_remote_collection_errors_when_rem .respond_with(ResponseTemplate::new(500).set_body_string("temporary failure")) .mount(&server) .await; - mount_remote_installed_plugins(&server, "GLOBAL", empty_remote_installed_plugins_body()).await; - mount_remote_installed_plugins(&server, "WORKSPACE", empty_remote_installed_plugins_body()) - .await; - mount_empty_user_installed_plugins(&server).await; + mount_unscoped_remote_installed_plugins(&server, &[]).await; let mut mcp = TestAppServer::new(codex_home.path()).await?; timeout(DEFAULT_TIMEOUT, mcp.initialize()).await??; @@ -2457,9 +2419,14 @@ plugin_sharing = true .push(unlisted_installed_body["plugins"][0].clone()); let workspace_installed_body = serde_json::to_string(&workspace_installed_body)?; let global_installed_body = remote_installed_plugin_body("", "1.2.3", /*enabled*/ true); - mount_remote_installed_plugins(&server, "GLOBAL", &global_installed_body).await; - mount_remote_installed_plugins(&server, "WORKSPACE", &workspace_installed_body).await; - mount_empty_user_installed_plugins(&server).await; + mount_unscoped_remote_installed_plugins( + &server, + &[ + global_installed_body.as_str(), + workspace_installed_body.as_str(), + ], + ) + .await; let mut mcp = TestAppServer::new(codex_home.path()).await?; timeout(DEFAULT_TIMEOUT, mcp.initialize()).await??; @@ -2507,8 +2474,7 @@ plugin_sharing = true ) ] ); - wait_for_remote_installed_scope_request(&server, "WORKSPACE").await?; - wait_for_remote_installed_scope_request(&server, "GLOBAL").await?; + wait_for_unscoped_remote_installed_request(&server).await?; Ok(()) } @@ -2559,9 +2525,7 @@ plugin_sharing = false .expect("installed plugins should be an array") .push(shared_installed_body["plugins"][0].clone()); let workspace_installed_body = serde_json::to_string(&workspace_installed_body)?; - mount_remote_installed_plugins(&server, "GLOBAL", empty_remote_installed_plugins_body()).await; - mount_remote_installed_plugins(&server, "WORKSPACE", &workspace_installed_body).await; - mount_empty_user_installed_plugins(&server).await; + mount_unscoped_remote_installed_plugins(&server, &[workspace_installed_body.as_str()]).await; let mut mcp = TestAppServer::new(codex_home.path()).await?; timeout(DEFAULT_TIMEOUT, mcp.initialize()).await??; @@ -2595,8 +2559,7 @@ plugin_sharing = false true )] ); - wait_for_remote_installed_scope_request(&server, "WORKSPACE").await?; - wait_for_remote_installed_scope_request(&server, "GLOBAL").await?; + wait_for_unscoped_remote_installed_request(&server).await?; Ok(()) } @@ -2624,9 +2587,6 @@ plugin_sharing = false .chatgpt_account_id("account-123"), AuthCredentialsStoreMode::File, )?; - mount_remote_installed_plugins(&server, "GLOBAL", empty_remote_installed_plugins_body()).await; - mount_remote_installed_plugins(&server, "WORKSPACE", empty_remote_installed_plugins_body()) - .await; let bundle_url = mount_remote_plugin_bundle( &server, "private-linear", @@ -2643,12 +2603,8 @@ plugin_sharing = false ))?; user_installed_body["plugins"][0]["release"]["bundle_download_url"] = serde_json::json!(bundle_url); - mount_remote_installed_plugins( - &server, - "USER", - &serde_json::to_string(&user_installed_body)?, - ) - .await; + let user_installed_body = serde_json::to_string(&user_installed_body)?; + mount_unscoped_remote_installed_plugins(&server, &[user_installed_body.as_str()]).await; let mut mcp = TestAppServer::new_with_env( codex_home.path(), @@ -2686,7 +2642,7 @@ plugin_sharing = false ), ) .await?; - wait_for_remote_installed_scope_request(&server, "USER").await?; + wait_for_unscoped_remote_installed_request(&server).await?; Ok(()) } @@ -2723,10 +2679,7 @@ plugin_sharing = false .await; let global_installed_body = remote_installed_plugin_body(&bundle_url, "1.2.3", /*enabled*/ true); - mount_remote_installed_plugins(&server, "GLOBAL", &global_installed_body).await; - mount_remote_installed_plugins(&server, "WORKSPACE", empty_remote_installed_plugins_body()) - .await; - mount_empty_user_installed_plugins(&server).await; + mount_unscoped_remote_installed_plugins(&server, &[global_installed_body.as_str()]).await; let mut mcp = TestAppServer::new_with_env( codex_home.path(), @@ -2763,8 +2716,7 @@ plugin_sharing = false .path() .join("plugins/cache/openai-curated-remote/linear/1.2.3/.codex-plugin/plugin.json"); wait_for_path_exists(&installed_path).await?; - wait_for_remote_installed_scope_request(&server, "GLOBAL").await?; - wait_for_remote_installed_scope_request(&server, "WORKSPACE").await?; + wait_for_unscoped_remote_installed_request(&server).await?; Ok(()) } @@ -2800,8 +2752,7 @@ async fn plugin_list_fetches_workspace_directory_kind_when_remote_plugin_disable /*enabled*/ Some(false), ); mount_remote_plugin_list(&server, "WORKSPACE", &workspace_plugin_body).await; - mount_remote_installed_plugins(&server, "WORKSPACE", &workspace_installed_body).await; - mount_empty_user_installed_plugins(&server).await; + mount_unscoped_remote_installed_plugins(&server, &[workspace_installed_body.as_str()]).await; let mut mcp = TestAppServer::new(codex_home.path()).await?; timeout(DEFAULT_TIMEOUT, mcp.initialize()).await??; @@ -2913,21 +2864,14 @@ plugin_sharing = false ) .mount(&server) .await; - mount_remote_installed_plugins( - &server, - "USER", - &user_remote_plugin_page_body( - "plugins~Plugin_55555555555555555555555555555555", - "private-linear", - "Private Linear", - "PRIVATE", - /*enabled*/ Some(true), - ), - ) - .await; - mount_remote_installed_plugins(&server, "GLOBAL", empty_remote_installed_plugins_body()).await; - mount_remote_installed_plugins(&server, "WORKSPACE", empty_remote_installed_plugins_body()) - .await; + let user_installed_body = user_remote_plugin_page_body( + "plugins~Plugin_55555555555555555555555555555555", + "private-linear", + "Private Linear", + "PRIVATE", + /*enabled*/ Some(true), + ); + mount_unscoped_remote_installed_plugins(&server, &[user_installed_body.as_str()]).await; let mut mcp = TestAppServer::new(codex_home.path()).await?; timeout(DEFAULT_TIMEOUT, mcp.initialize()).await??; @@ -3052,9 +2996,7 @@ async fn plugin_list_fetches_shared_with_me_kind() -> Result<()> { .push(unlisted_installed_body["plugins"][0].clone()); let workspace_installed_body = serde_json::to_string(&workspace_installed_body)?; mount_shared_workspace_plugins(&server, &shared_plugin_body).await; - mount_remote_installed_plugins(&server, "GLOBAL", empty_remote_installed_plugins_body()).await; - mount_remote_installed_plugins(&server, "WORKSPACE", &workspace_installed_body).await; - mount_empty_user_installed_plugins(&server).await; + mount_unscoped_remote_installed_plugins(&server, &[workspace_installed_body.as_str()]).await; let mut mcp = TestAppServer::new(codex_home.path()).await?; timeout(DEFAULT_TIMEOUT, mcp.initialize()).await??; @@ -3182,8 +3124,7 @@ async fn plugin_list_fetches_shared_with_me_kind() -> Result<()> { share_context.discoverability, Some(PluginShareDiscoverability::Unlisted) ); - wait_for_remote_installed_scope_request(&server, "WORKSPACE").await?; - wait_for_remote_installed_scope_request(&server, "GLOBAL").await?; + wait_for_unscoped_remote_installed_request(&server).await?; wait_for_remote_plugin_request_count(&server, "/ps/plugins/list", /*expected_count*/ 0).await?; Ok(()) } @@ -3389,19 +3330,7 @@ async fn plugin_list_marks_remote_plugin_disabled_by_admin() -> Result<()> { .mount(&server) .await; } - for (scope, body) in [ - ("GLOBAL", global_installed_body), - ("WORKSPACE", empty_page_body), - ] { - Mock::given(method("GET")) - .and(path("/backend-api/ps/plugins/installed")) - .and(query_param("scope", scope)) - .and(header("authorization", "Bearer chatgpt-token")) - .and(header("chatgpt-account-id", "account-123")) - .respond_with(ResponseTemplate::new(200).set_body_string(body)) - .mount(&server) - .await; - } + mount_unscoped_remote_installed_plugins(&server, &[global_installed_body]).await; let mut mcp = TestAppServer::new(codex_home.path()).await?; timeout(DEFAULT_TIMEOUT, mcp.initialize()).await??; @@ -3609,7 +3538,7 @@ async fn wait_for_remote_plugin_request_count( Ok(()) } -async fn wait_for_remote_installed_scope_request(server: &MockServer, scope: &str) -> Result<()> { +async fn wait_for_unscoped_remote_installed_request(server: &MockServer) -> Result<()> { timeout(DEFAULT_TIMEOUT, async { loop { let Some(requests) = server.received_requests().await else { @@ -3618,10 +3547,7 @@ async fn wait_for_remote_installed_scope_request(server: &MockServer, scope: &st if requests.iter().any(|request| { request.method == "GET" && request.url.path().ends_with("/ps/plugins/installed") - && request - .url - .query_pairs() - .any(|(name, value)| name == "scope" && value == scope) + && request.url.query_pairs().all(|(name, _)| name != "scope") }) { return Ok::<(), anyhow::Error>(()); } @@ -3777,21 +3703,36 @@ async fn mount_shared_workspace_plugins(server: &MockServer, body: &str) { .await; } -async fn mount_remote_installed_plugins(server: &MockServer, scope: &str, body: &str) { +async fn mount_unscoped_remote_installed_plugins(server: &MockServer, bodies: &[&str]) { + let plugins = bodies + .iter() + .flat_map(|body| { + serde_json::from_str::(body) + .expect("installed plugin response should be valid JSON")["plugins"] + .as_array() + .expect("installed plugin response should contain a plugins array") + .clone() + }) + .collect::>(); Mock::given(method("GET")) .and(path("/backend-api/ps/plugins/installed")) - .and(query_param("scope", scope)) + .and(query_param_is_missing("scope")) + .and(query_param("includeDownloadUrls", "true")) + .and(query_param("limit", "1000")) .and(header("authorization", "Bearer chatgpt-token")) .and(header("chatgpt-account-id", "account-123")) - .respond_with(ResponseTemplate::new(200).set_body_string(body)) + .and(header("oai-product-sku", "codex")) + .respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!({ + "plugins": plugins, + "pagination": { + "limit": 1000, + "next_page_token": null, + } + }))) .mount(server) .await; } -async fn mount_empty_user_installed_plugins(server: &MockServer) { - mount_remote_installed_plugins(server, "USER", empty_remote_installed_plugins_body()).await; -} - fn empty_remote_installed_plugins_body() -> &'static str { r#"{ "plugins": [], diff --git a/codex-rs/app-server/tests/suite/v2/skills_list.rs b/codex-rs/app-server/tests/suite/v2/skills_list.rs index 5993bf6da227..f9e2a7891ffa 100644 --- a/codex-rs/app-server/tests/suite/v2/skills_list.rs +++ b/codex-rs/app-server/tests/suite/v2/skills_list.rs @@ -34,6 +34,7 @@ use wiremock::matchers::header; use wiremock::matchers::method; use wiremock::matchers::path; use wiremock::matchers::query_param; +use wiremock::matchers::query_param_is_missing; const DEFAULT_TIMEOUT: Duration = Duration::from_secs(30); const WATCHER_TIMEOUT: Duration = Duration::from_secs(20); @@ -366,20 +367,17 @@ async fn skills_list_loads_remote_installed_plugin_skills_from_cache() -> Result "remote installed plugin cache has not been refreshed yet" ); - for (scope, body) in [ - ("GLOBAL", global_installed_body), - ("USER", empty_page_body), - ("WORKSPACE", empty_page_body), - ] { - Mock::given(method("GET")) - .and(path("/backend-api/ps/plugins/installed")) - .and(query_param("scope", scope)) - .and(header("authorization", "Bearer chatgpt-token")) - .and(header("chatgpt-account-id", "account-123")) - .respond_with(ResponseTemplate::new(200).set_body_string(body)) - .mount(&server) - .await; - } + Mock::given(method("GET")) + .and(path("/backend-api/ps/plugins/installed")) + .and(query_param_is_missing("scope")) + .and(query_param("includeDownloadUrls", "true")) + .and(query_param("limit", "1000")) + .and(header("authorization", "Bearer chatgpt-token")) + .and(header("chatgpt-account-id", "account-123")) + .and(header("oai-product-sku", "codex")) + .respond_with(ResponseTemplate::new(200).set_body_string(global_installed_body)) + .mount(&server) + .await; let plugin_list_request_id = mcp .send_plugin_list_request(PluginListParams { diff --git a/codex-rs/core-plugins/src/discoverable_tests.rs b/codex-rs/core-plugins/src/discoverable_tests.rs index 48482ed430ef..66e29d1d81b1 100644 --- a/codex-rs/core-plugins/src/discoverable_tests.rs +++ b/codex-rs/core-plugins/src/discoverable_tests.rs @@ -33,6 +33,7 @@ use wiremock::ResponseTemplate; use wiremock::matchers::method; use wiremock::matchers::path; use wiremock::matchers::query_param; +use wiremock::matchers::query_param_is_missing; #[tokio::test] async fn returns_fallback_plugins_when_remote_disabled_for_codex_auth() { @@ -859,20 +860,20 @@ plugins = true .await .expect("remote plugin catalog cache should write"); - for scope in ["GLOBAL", "USER", "WORKSPACE"] { - Mock::given(method("GET")) - .and(path("/backend-api/ps/plugins/installed")) - .and(query_param("scope", scope)) - .respond_with(ResponseTemplate::new(200).set_body_json(json!({ - "plugins": [], - "pagination": { - "next_page_token": null - } - }))) - .expect(1) - .mount(&server) - .await; - } + Mock::given(method("GET")) + .and(path("/backend-api/ps/plugins/installed")) + .and(query_param_is_missing("scope")) + .and(query_param("includeDownloadUrls", "true")) + .and(query_param("limit", "1000")) + .respond_with(ResponseTemplate::new(200).set_body_json(json!({ + "plugins": [], + "pagination": { + "next_page_token": null + } + }))) + .expect(1) + .mount(&server) + .await; plugins_manager .build_and_cache_remote_installed_plugin_marketplaces( &plugins, diff --git a/codex-rs/core-plugins/src/manager.rs b/codex-rs/core-plugins/src/manager.rs index 4d215b61d455..097c883f101f 100644 --- a/codex-rs/core-plugins/src/manager.rs +++ b/codex-rs/core-plugins/src/manager.rs @@ -44,6 +44,7 @@ use crate::marketplace_upgrade::upgrade_configured_git_marketplaces; use crate::remote::REMOTE_GLOBAL_MARKETPLACE_NAME; use crate::remote::RecommendedPluginsMode; use crate::remote::RemoteInstalledPlugin; +use crate::remote::RemoteInstalledPluginSnapshotCache; use crate::remote::RemotePluginCatalogError; use crate::remote::RemotePluginServiceConfig; use crate::remote_legacy::RemotePluginFetchError; @@ -365,6 +366,7 @@ pub struct PluginsManager { loaded_plugins_load_semaphore: Semaphore, tool_suggest_metadata_cache: ToolSuggestMetadataCache, remote_installed_plugins_cache: RwLock>>, + remote_installed_plugin_snapshot_cache: Arc, remote_installed_plugins_cache_refresh_state: RwLock, global_remote_catalog_cache_refresh_state: RwLock, restriction_product: Option, @@ -440,6 +442,9 @@ impl PluginsManager { loaded_plugins_load_semaphore: Semaphore::new(/*permits*/ 1), tool_suggest_metadata_cache: ToolSuggestMetadataCache::new(), remote_installed_plugins_cache: RwLock::new(None), + remote_installed_plugin_snapshot_cache: Arc::new( + RemoteInstalledPluginSnapshotCache::default(), + ), remote_installed_plugins_cache_refresh_state: RwLock::new( RemoteInstalledPluginsCacheRefreshState::default(), ), @@ -873,43 +878,85 @@ impl PluginsManager { visible_marketplaces: &[&str], on_effective_plugins_changed: Option>, ) -> Result, RemotePluginCatalogError> { + let snapshot_generation = self.remote_installed_plugin_snapshot_cache.generation(); let plugins = crate::remote::fetch_remote_installed_plugins( &remote_plugin_service_config(config), auth, + self.remote_installed_plugin_snapshot_cache.as_ref(), ) .await?; let marketplaces = crate::remote::group_remote_installed_plugins_by_marketplaces( &plugins, visible_marketplaces, ); - let changed = self.write_remote_installed_plugins_cache(plugins); - if changed && let Some(on_effective_plugins_changed) = on_effective_plugins_changed { + let changed = self.write_remote_installed_plugins_cache(snapshot_generation, plugins); + if changed == Some(true) + && let Some(on_effective_plugins_changed) = on_effective_plugins_changed + { on_effective_plugins_changed(); } Ok(marketplaces) } - fn write_remote_installed_plugins_cache(&self, plugins: Vec) -> bool { + pub async fn fetch_remote_marketplaces_for_config( + &self, + config: &PluginsConfigInput, + auth: Option<&CodexAuth>, + sources: &[crate::remote::RemoteMarketplaceSource], + global_catalog_cache_path: Option<&std::path::Path>, + ) -> Result, RemotePluginCatalogError> { + crate::remote::fetch_remote_marketplaces_with_snapshot( + &remote_plugin_service_config(config), + auth, + sources, + global_catalog_cache_path, + self.remote_installed_plugin_snapshot_cache.as_ref(), + ) + .await + } + + pub async fn fetch_openai_curated_remote_collection_marketplace_for_config( + &self, + config: &PluginsConfigInput, + auth: Option<&CodexAuth>, + ) -> Result, RemotePluginCatalogError> { + crate::remote::fetch_openai_curated_remote_collection_marketplace_with_snapshot( + &remote_plugin_service_config(config), + auth, + self.remote_installed_plugin_snapshot_cache.as_ref(), + ) + .await + } + + fn write_remote_installed_plugins_cache( + &self, + snapshot_generation: u64, + plugins: Vec, + ) -> Option { let mut cache = match self.remote_installed_plugins_cache.write() { Ok(cache) => cache, Err(err) => err.into_inner(), }; + if snapshot_generation != self.remote_installed_plugin_snapshot_cache.generation() { + return None; + } if cache.as_ref().is_some_and(|cache| cache.eq(&plugins)) { - return false; + return Some(false); } *cache = Some(plugins); drop(cache); self.clear_loaded_plugins_cache(); - true + Some(true) } pub fn clear_remote_installed_plugins_cache(&self) -> bool { + let cleared_snapshot = self.remote_installed_plugin_snapshot_cache.invalidate(); let mut cache = match self.remote_installed_plugins_cache.write() { Ok(cache) => cache, Err(err) => err.into_inner(), }; if cache.is_none() { - return false; + return cleared_snapshot; } *cache = None; drop(cache); @@ -917,6 +964,10 @@ impl PluginsManager { true } + pub fn invalidate_remote_installed_plugin_snapshot(&self) { + self.remote_installed_plugin_snapshot_cache.invalidate(); + } + pub fn maybe_start_remote_plugin_caches_refresh( self: &Arc, config: &PluginsConfigInput, @@ -999,6 +1050,7 @@ impl PluginsManager { self.codex_home.clone(), remote_plugin_service_config(config), auth, + Arc::clone(&self.remote_installed_plugin_snapshot_cache), Some(on_local_cache_changed), ); } @@ -1349,6 +1401,7 @@ impl PluginsManager { ); return Err(err); } + self.clear_remote_installed_plugins_cache(); let plugin_id = resolved.plugin_id.clone(); match self.install_resolved_plugin(resolved).await { Ok(outcome) => Ok(outcome), @@ -1518,6 +1571,7 @@ impl PluginsManager { ) .await .map_err(PluginUninstallError::from)?; + self.clear_remote_installed_plugins_cache(); self.uninstall_plugin_id(plugin_id).await } @@ -2340,38 +2394,48 @@ impl PluginsManager { } }; + let snapshot_generation = self.remote_installed_plugin_snapshot_cache.generation(); let installed_plugins = crate::remote::fetch_remote_installed_plugins( &request.service_config, request.auth.as_ref(), + self.remote_installed_plugin_snapshot_cache.as_ref(), ) .await; match installed_plugins { Ok(installed_plugins) => { // TODO(remote plugins): reconcile missing or stale local bundles before // publishing remote installed state as effective local plugin config. - let changed = self.write_remote_installed_plugins_cache(installed_plugins); - let should_notify = changed - || matches!( - request.notify, - RemoteInstalledPluginsCacheRefreshNotify::AfterSuccessfulRefresh - ); - if should_notify - && let Some(on_effective_plugins_changed) = - request.on_effective_plugins_changed - { - on_effective_plugins_changed(); + if let Some(changed) = self.write_remote_installed_plugins_cache( + snapshot_generation, + installed_plugins, + ) { + let should_notify = changed + || matches!( + request.notify, + RemoteInstalledPluginsCacheRefreshNotify::AfterSuccessfulRefresh + ); + if should_notify + && let Some(on_effective_plugins_changed) = + request.on_effective_plugins_changed + { + on_effective_plugins_changed(); + } } } Err( RemotePluginCatalogError::AuthRequired | RemotePluginCatalogError::UnsupportedAuthMode, ) => { - let changed = self.clear_remote_installed_plugins_cache(); - if changed - && let Some(on_effective_plugins_changed) = - request.on_effective_plugins_changed + if snapshot_generation + == self.remote_installed_plugin_snapshot_cache.generation() { - on_effective_plugins_changed(); + let changed = self.clear_remote_installed_plugins_cache(); + if changed + && let Some(on_effective_plugins_changed) = + request.on_effective_plugins_changed + { + on_effective_plugins_changed(); + } } } Err(err) => { diff --git a/codex-rs/core-plugins/src/manager_tests.rs b/codex-rs/core-plugins/src/manager_tests.rs index da7bc0b5a5f6..f111643b8a13 100644 --- a/codex-rs/core-plugins/src/manager_tests.rs +++ b/codex-rs/core-plugins/src/manager_tests.rs @@ -787,6 +787,18 @@ fn remote_installed_plugin_in_marketplace( } } +fn seed_remote_installed_plugins_cache( + manager: &PluginsManager, + plugins: Vec, +) { + let snapshot_generation = manager.remote_installed_plugin_snapshot_cache.generation(); + assert!( + manager + .write_remote_installed_plugins_cache(snapshot_generation, plugins) + .is_some() + ); +} + fn write_cached_plugin(codex_home: &Path, marketplace_name: &str, plugin_name: &str) { write_plugin_with_version( &codex_home @@ -1067,7 +1079,7 @@ plugins = true let config = load_config(codex_home.path(), codex_home.path()).await; let manager = PluginsManager::new(codex_home.path().to_path_buf()); - manager.write_remote_installed_plugins_cache(vec![remote_installed_linear_plugin()]); + seed_remote_installed_plugins_cache(&manager, vec![remote_installed_linear_plugin()]); let outcome = manager.plugins_for_config(&config).await; assert_eq!(outcome, PluginLoadOutcome::default()); @@ -1268,10 +1280,13 @@ enabled = true let config = load_config(codex_home.path(), codex_home.path()).await; let manager = PluginsManager::new(codex_home.path().to_path_buf()); - manager.write_remote_installed_plugins_cache(vec![ - remote_installed_plugin("linear"), - remote_installed_plugin("remote-only"), - ]); + seed_remote_installed_plugins_cache( + &manager, + vec![ + remote_installed_plugin("linear"), + remote_installed_plugin("remote-only"), + ], + ); let outcome = manager.plugins_for_config(&config).await; assert_eq!( @@ -1318,10 +1333,13 @@ enabled = true Some(Product::Codex), Some(AuthMode::Chatgpt), ); - manager.write_remote_installed_plugins_cache(vec![ - remote_installed_plugin("linear"), - remote_installed_plugin("remote-only"), - ]); + seed_remote_installed_plugins_cache( + &manager, + vec![ + remote_installed_plugin("linear"), + remote_installed_plugin("remote-only"), + ], + ); let outcome = manager.plugins_for_config(&config).await; assert_eq!( @@ -1407,7 +1425,7 @@ async fn build_remote_installed_plugin_marketplaces_from_cache_uses_remote_metad screenshot_urls: Vec::new(), }); plugin.keywords = vec!["issues".to_string()]; - manager.write_remote_installed_plugins_cache(vec![plugin]); + seed_remote_installed_plugins_cache(&manager, vec![plugin]); let marketplaces = manager .build_remote_installed_plugin_marketplaces_from_cache(&[REMOTE_GLOBAL_MARKETPLACE_NAME]) @@ -1459,16 +1477,19 @@ async fn build_remote_installed_plugin_marketplaces_from_cache_uses_remote_metad async fn build_remote_installed_plugin_marketplaces_from_cache_filters_by_marketplace_name() { let codex_home = TempDir::new().unwrap(); let manager = PluginsManager::new(codex_home.path().to_path_buf()); - manager.write_remote_installed_plugins_cache(vec![ - remote_installed_plugin_in_marketplace( - "workspace-linear", - REMOTE_WORKSPACE_MARKETPLACE_NAME, - ), - remote_installed_plugin_in_marketplace( - "shared-linear", - REMOTE_WORKSPACE_SHARED_WITH_ME_MARKETPLACE_NAME, - ), - ]); + seed_remote_installed_plugins_cache( + &manager, + vec![ + remote_installed_plugin_in_marketplace( + "workspace-linear", + REMOTE_WORKSPACE_MARKETPLACE_NAME, + ), + remote_installed_plugin_in_marketplace( + "shared-linear", + REMOTE_WORKSPACE_SHARED_WITH_ME_MARKETPLACE_NAME, + ), + ], + ); let marketplaces = manager .build_remote_installed_plugin_marketplaces_from_cache(&[REMOTE_WORKSPACE_MARKETPLACE_NAME]) @@ -5046,7 +5067,7 @@ plugins = true let manager = PluginsManager::new(tmp.path().to_path_buf()); let mut installed_linear = remote_installed_plugin("linear"); installed_linear.id = "plugin_linear".to_string(); - manager.write_remote_installed_plugins_cache(vec![installed_linear]); + seed_remote_installed_plugins_cache(&manager, vec![installed_linear]); let auth = CodexAuth::create_dummy_chatgpt_auth_for_testing(); let disabled_tools = [ToolSuggestDisabledTool::plugin( "github@openai-curated-remote", diff --git a/codex-rs/core-plugins/src/remote.rs b/codex-rs/core-plugins/src/remote.rs index 515bbb71c00a..1bcbc17cd225 100644 --- a/codex-rs/core-plugins/src/remote.rs +++ b/codex-rs/core-plugins/src/remote.rs @@ -33,6 +33,7 @@ use tracing::instrument; use url::Url; mod catalog_cache; +mod installed_snapshot; mod remote_installed_plugin_sync; mod share; @@ -40,6 +41,7 @@ mod share; #[path = "remote_tests.rs"] mod tests; +pub(crate) use installed_snapshot::RemoteInstalledPluginSnapshotCache; pub use remote_installed_plugin_sync::RemoteInstalledPluginBundleSyncError; pub use remote_installed_plugin_sync::RemoteInstalledPluginBundleSyncOutcome; pub use remote_installed_plugin_sync::RemotePluginCacheMutationGuard; @@ -647,6 +649,24 @@ pub async fn fetch_remote_marketplaces( auth: Option<&CodexAuth>, sources: &[RemoteMarketplaceSource], global_catalog_cache_path: Option<&Path>, +) -> Result, RemotePluginCatalogError> { + let installed_snapshot_cache = RemoteInstalledPluginSnapshotCache::default(); + fetch_remote_marketplaces_with_snapshot( + config, + auth, + sources, + global_catalog_cache_path, + &installed_snapshot_cache, + ) + .await +} + +pub(crate) async fn fetch_remote_marketplaces_with_snapshot( + config: &RemotePluginServiceConfig, + auth: Option<&CodexAuth>, + sources: &[RemoteMarketplaceSource], + global_catalog_cache_path: Option<&Path>, + installed_snapshot_cache: &installed_snapshot::RemoteInstalledPluginSnapshotCache, ) -> Result, RemotePluginCatalogError> { let auth = ensure_chatgpt_auth(auth)?; let mut marketplaces = Vec::new(); @@ -657,7 +677,15 @@ pub async fn fetch_remote_marketplaces( ) }); let workspace_installed_plugins = if needs_workspace_installed { - Some(fetch_installed_plugins_for_scope(config, auth, RemotePluginScope::Workspace).await?) + Some( + installed_snapshot::installed_plugins_for_scope( + installed_snapshot_cache, + config, + auth, + RemotePluginScope::Workspace, + ) + .await?, + ) } else { None }; @@ -672,8 +700,13 @@ pub async fn fetch_remote_marketplaces( codex_home, config, auth, ) { - let installed_plugins = - fetch_installed_plugins_for_scope(config, auth, scope).await?; + let installed_plugins = installed_snapshot::installed_plugins_for_scope( + installed_snapshot_cache, + config, + auth, + scope, + ) + .await?; if let Some(marketplace) = build_remote_marketplace( scope.marketplace_name(), scope.marketplace_display_name(), @@ -687,7 +720,12 @@ pub async fn fetch_remote_marketplaces( } let (directory_plugins, installed_plugins) = tokio::try_join!( fetch_directory_plugins_for_scope(config, auth, scope), - fetch_installed_plugins_for_scope(config, auth, scope), + installed_snapshot::installed_plugins_for_scope( + installed_snapshot_cache, + config, + auth, + scope, + ), )?; let directory_plugins_for_cache = global_catalog_cache_path.map(|_| directory_plugins.clone()); @@ -715,7 +753,12 @@ pub async fn fetch_remote_marketplaces( let scope = RemotePluginScope::User; let (directory_plugins, installed_plugins) = tokio::try_join!( fetch_directory_plugins_for_scope(config, auth, scope), - fetch_installed_plugins_for_scope(config, auth, scope), + installed_snapshot::installed_plugins_for_scope( + installed_snapshot_cache, + config, + auth, + scope, + ), )?; if let Some(marketplace) = build_remote_marketplace( scope.marketplace_name(), @@ -930,6 +973,20 @@ pub fn cached_global_remote_discoverable_plugins( pub async fn fetch_openai_curated_remote_collection_marketplace( config: &RemotePluginServiceConfig, auth: Option<&CodexAuth>, +) -> Result, RemotePluginCatalogError> { + let installed_snapshot_cache = RemoteInstalledPluginSnapshotCache::default(); + fetch_openai_curated_remote_collection_marketplace_with_snapshot( + config, + auth, + &installed_snapshot_cache, + ) + .await +} + +pub(crate) async fn fetch_openai_curated_remote_collection_marketplace_with_snapshot( + config: &RemotePluginServiceConfig, + auth: Option<&CodexAuth>, + installed_snapshot_cache: &installed_snapshot::RemoteInstalledPluginSnapshotCache, ) -> Result, RemotePluginCatalogError> { let auth = ensure_chatgpt_auth(auth)?; let scope = RemotePluginScope::Global; @@ -940,7 +997,12 @@ pub async fn fetch_openai_curated_remote_collection_marketplace( scope, OPENAI_CURATED_REMOTE_COLLECTION_KEY, ), - fetch_installed_plugins_for_scope(config, auth, scope), + installed_snapshot::installed_plugins_for_scope( + installed_snapshot_cache, + config, + auth, + scope, + ), )?; build_remote_marketplace( @@ -992,29 +1054,14 @@ fn build_remote_marketplace( pub(crate) async fn fetch_remote_installed_plugins( config: &RemotePluginServiceConfig, auth: Option<&CodexAuth>, + installed_snapshot_cache: &installed_snapshot::RemoteInstalledPluginSnapshotCache, ) -> Result, RemotePluginCatalogError> { let auth = ensure_chatgpt_auth(auth)?; - let global = async { - let scope = RemotePluginScope::Global; - let installed_plugins = fetch_installed_plugins_for_scope(config, auth, scope).await?; - Ok::<_, RemotePluginCatalogError>((scope, installed_plugins)) - }; - let workspace = async { - let scope = RemotePluginScope::Workspace; - let installed_plugins = fetch_installed_plugins_for_scope(config, auth, scope).await?; - Ok::<_, RemotePluginCatalogError>((scope, installed_plugins)) - }; - let user = async { - let scope = RemotePluginScope::User; - let installed_plugins = fetch_installed_plugins_for_scope(config, auth, scope).await?; - Ok::<_, RemotePluginCatalogError>((scope, installed_plugins)) - }; - - let (global, workspace, user) = tokio::try_join!(global, workspace, user)?; - let mut installed_plugins = [global, workspace, user] - .into_iter() - .flat_map(|(_scope, plugins)| plugins) - .map(|plugin| remote_installed_plugin_to_cache_entry(&plugin)) + let mut installed_plugins = installed_snapshot_cache + .get_or_fetch(config, auth) + .await? + .iter() + .map(remote_installed_plugin_to_cache_entry) .collect::, _>>()?; installed_plugins.sort_by(|left, right| { left.marketplace_name @@ -1769,9 +1816,10 @@ async fn fetch_installed_plugins_for_scope_with_download_url( let response = get_remote_plugin_installed_page( config, auth, - scope, + Some(scope), page_token.as_deref(), include_download_urls, + /*limit*/ None, ) .await?; plugins.extend(response.plugins); @@ -1824,18 +1872,24 @@ async fn get_remote_shared_workspace_plugins_page( async fn get_remote_plugin_installed_page( config: &RemotePluginServiceConfig, auth: &CodexAuth, - scope: RemotePluginScope, + scope: Option, page_token: Option<&str>, include_download_urls: bool, + limit: Option, ) -> Result { let base_url = config.chatgpt_base_url.trim_end_matches('/'); let url = format!("{base_url}/ps/plugins/installed"); let client = build_reqwest_client(); let mut request = authenticated_request(client.get(&url), auth)?; - request = request.query(&[("scope", scope.api_value())]); + if let Some(scope) = scope { + request = request.query(&[("scope", scope.api_value())]); + } if include_download_urls { request = request.query(&[("includeDownloadUrls", true)]); } + if let Some(limit) = limit { + request = request.query(&[("limit", limit)]); + } if let Some(page_token) = page_token { request = request.query(&[("pageToken", page_token)]); } diff --git a/codex-rs/core-plugins/src/remote/installed_snapshot.rs b/codex-rs/core-plugins/src/remote/installed_snapshot.rs new file mode 100644 index 000000000000..499e1646c9fb --- /dev/null +++ b/codex-rs/core-plugins/src/remote/installed_snapshot.rs @@ -0,0 +1,235 @@ +use super::CODEX_PRODUCT_SKU; +use super::RemotePluginCatalogError; +use super::RemotePluginInstalledItem; +use super::RemotePluginScope; +use super::RemotePluginServiceConfig; +use super::get_remote_plugin_installed_page; +use codex_app_server_protocol::AuthMode; +use codex_login::CodexAuth; +use codex_protocol::account::PlanType; +use std::collections::BTreeSet; +use std::sync::Arc; +use std::sync::RwLock; +use std::sync::atomic::AtomicU64; +use std::sync::atomic::Ordering; +use std::time::Duration; +use std::time::Instant; +use tokio::sync::Semaphore; + +const REMOTE_INSTALLED_PLUGIN_PAGE_LIMIT: u32 = 1000; +const REMOTE_INSTALLED_PLUGIN_SNAPSHOT_TTL: Duration = Duration::from_secs(30); +const REMOTE_INSTALLED_PLUGIN_FAILURE_TTL: Duration = Duration::from_secs(1); +const REMOTE_INSTALLED_PLUGIN_SNAPSHOT_INVALIDATED: &str = + "remote installed plugin snapshot was invalidated while loading"; + +#[derive(Clone, Debug, PartialEq, Eq)] +struct InstalledSnapshotCacheKey { + chatgpt_base_url: String, + account_id: Option, + chatgpt_user_id: Option, + plan_type: Option, + auth_mode: AuthMode, + product_sku: &'static str, +} + +impl InstalledSnapshotCacheKey { + fn new(config: &RemotePluginServiceConfig, auth: &CodexAuth) -> Self { + Self { + chatgpt_base_url: config.chatgpt_base_url.trim_end_matches('/').to_string(), + // ChatGPT-Account-ID is the active workspace/account selected for this request. + account_id: auth.get_account_id(), + chatgpt_user_id: auth.get_chatgpt_user_id(), + plan_type: auth.account_plan_type(), + auth_mode: auth.api_auth_mode(), + product_sku: CODEX_PRODUCT_SKU, + } + } +} + +struct CachedInstalledSnapshot { + key: InstalledSnapshotCacheKey, + value: CachedInstalledSnapshotValue, + expires_at: Instant, +} + +enum CachedInstalledSnapshotValue { + Plugins(Arc>), + Error(String), +} + +pub(super) fn snapshot_invalidated_error() -> RemotePluginCatalogError { + RemotePluginCatalogError::UnexpectedResponse( + REMOTE_INSTALLED_PLUGIN_SNAPSHOT_INVALIDATED.to_string(), + ) +} + +/// Identity-keyed installed-state cache shared by remote marketplace queries. +/// +/// The cached response is raw so bundle sync and marketplace projection consume the same +/// download URLs, delivery overrides, policies, and scope metadata. +pub(crate) struct RemoteInstalledPluginSnapshotCache { + cached: RwLock>, + fetch_semaphore: Semaphore, + generation: AtomicU64, +} + +impl Default for RemoteInstalledPluginSnapshotCache { + fn default() -> Self { + Self { + cached: RwLock::new(None), + fetch_semaphore: Semaphore::new(/*permits*/ 1), + generation: AtomicU64::new(0), + } + } +} + +impl RemoteInstalledPluginSnapshotCache { + pub(super) async fn get_or_fetch( + &self, + config: &RemotePluginServiceConfig, + auth: &CodexAuth, + ) -> Result>, RemotePluginCatalogError> { + let key = InstalledSnapshotCacheKey::new(config, auth); + let generation = self.generation(); + if let Some(result) = self.cached_result_for_key(&key) { + if generation == self.generation() { + return result; + } + return Err(snapshot_invalidated_error()); + } + + let _fetch_permit = self.fetch_semaphore.acquire().await.map_err(|_| { + RemotePluginCatalogError::UnexpectedResponse( + "remote installed plugin snapshot fetch gate was closed".to_string(), + ) + })?; + if generation != self.generation() { + return Err(snapshot_invalidated_error()); + } + if let Some(result) = self.cached_result_for_key(&key) { + if generation == self.generation() { + return result; + } + return Err(snapshot_invalidated_error()); + } + + let result = fetch_unscoped_installed_plugins(config, auth) + .await + .map(Arc::new); + let mut cached = match self.cached.write() { + Ok(cached) => cached, + Err(err) => err.into_inner(), + }; + if generation != self.generation() { + return Err(snapshot_invalidated_error()); + } + let (value, ttl) = match &result { + Ok(plugins) => ( + CachedInstalledSnapshotValue::Plugins(Arc::clone(plugins)), + REMOTE_INSTALLED_PLUGIN_SNAPSHOT_TTL, + ), + Err(err) => ( + CachedInstalledSnapshotValue::Error(err.to_string()), + REMOTE_INSTALLED_PLUGIN_FAILURE_TTL, + ), + }; + *cached = Some(CachedInstalledSnapshot { + key, + value, + expires_at: Instant::now() + ttl, + }); + result + } + + pub(crate) fn generation(&self) -> u64 { + self.generation.load(Ordering::SeqCst) + } + + pub(crate) fn invalidate(&self) -> bool { + self.generation.fetch_add(1, Ordering::SeqCst); + let mut cached = match self.cached.write() { + Ok(cached) => cached, + Err(err) => err.into_inner(), + }; + cached.take().is_some() + } + + fn cached_result_for_key( + &self, + key: &InstalledSnapshotCacheKey, + ) -> Option>, RemotePluginCatalogError>> { + let cached = match self.cached.read() { + Ok(cached) => cached, + Err(err) => err.into_inner(), + }; + cached + .as_ref() + .filter(|cached| cached.key == *key && Instant::now() < cached.expires_at) + .map(|cached| match &cached.value { + CachedInstalledSnapshotValue::Plugins(plugins) => Ok(Arc::clone(plugins)), + CachedInstalledSnapshotValue::Error(message) => Err( + RemotePluginCatalogError::UnexpectedResponse(message.clone()), + ), + }) + } +} + +pub(super) async fn installed_plugins_for_scope( + cache: &RemoteInstalledPluginSnapshotCache, + config: &RemotePluginServiceConfig, + auth: &CodexAuth, + scope: RemotePluginScope, +) -> Result, RemotePluginCatalogError> { + Ok(cache + .get_or_fetch(config, auth) + .await? + .iter() + .filter(|plugin| plugin.plugin.scope == scope) + .cloned() + .collect()) +} + +async fn fetch_unscoped_installed_plugins( + config: &RemotePluginServiceConfig, + auth: &CodexAuth, +) -> Result, RemotePluginCatalogError> { + let mut plugins = Vec::new(); + let mut page_token = None; + let mut seen_page_tokens = BTreeSet::new(); + let mut page_count = 0_u64; + + loop { + let response = get_remote_plugin_installed_page( + config, + auth, + /*scope*/ None, + page_token.as_deref(), + /*include_download_urls*/ true, + Some(REMOTE_INSTALLED_PLUGIN_PAGE_LIMIT), + ) + .await?; + page_count += 1; + plugins.extend(response.plugins); + let Some(next_page_token) = response.pagination.next_page_token else { + break; + }; + if next_page_token.is_empty() || !seen_page_tokens.insert(next_page_token.clone()) { + return Err(RemotePluginCatalogError::UnexpectedResponse( + "remote installed plugin pagination returned an empty or repeated next page token" + .to_string(), + )); + } + page_token = Some(next_page_token); + } + + tracing::debug!( + page_count, + plugin_count = plugins.len(), + "fetched unscoped remote installed plugin snapshot" + ); + Ok(plugins) +} + +#[cfg(test)] +#[path = "installed_snapshot_tests.rs"] +mod tests; diff --git a/codex-rs/core-plugins/src/remote/installed_snapshot_tests.rs b/codex-rs/core-plugins/src/remote/installed_snapshot_tests.rs new file mode 100644 index 000000000000..1ee551f3e56e --- /dev/null +++ b/codex-rs/core-plugins/src/remote/installed_snapshot_tests.rs @@ -0,0 +1,221 @@ +use super::*; +use pretty_assertions::assert_eq; +use serde_json::Value; +use serde_json::json; +use std::time::Duration; +use wiremock::Mock; +use wiremock::MockServer; +use wiremock::ResponseTemplate; +use wiremock::matchers::header; +use wiremock::matchers::method; +use wiremock::matchers::path; +use wiremock::matchers::query_param; +use wiremock::matchers::query_param_is_missing; + +fn service_config(server: &MockServer) -> RemotePluginServiceConfig { + RemotePluginServiceConfig { + chatgpt_base_url: server.uri(), + } +} + +fn installed_plugin(index: usize, scope: RemotePluginScope) -> Value { + let scope = match scope { + RemotePluginScope::Global => "GLOBAL", + RemotePluginScope::User => "USER", + RemotePluginScope::Workspace => "WORKSPACE", + }; + let mut plugin = json!({ + "id": format!("plugin-{index:03}"), + "name": format!("plugin-{index:03}"), + "scope": scope, + "installation_policy": "AVAILABLE", + "authentication_policy": "ON_USE", + "status": "ENABLED", + "release": { + "version": "1.2.3", + "display_name": format!("Plugin {index:03}"), + "description": "Remote plugin", + "bundle_download_url": format!("https://example.com/plugin-{index:03}.tar.gz"), + "app_manifest": {"apps": {"app": {"id": format!("app-{index:03}")}}}, + "interface": {}, + "skills": [] + }, + "enabled": index.is_multiple_of(2), + "disabled_skill_names": [] + }); + if scope == "WORKSPACE" { + plugin["discoverability"] = json!("LISTED"); + } + plugin +} + +fn installed_page(plugins: Vec, next_page_token: Option<&str>) -> Value { + json!({ + "plugins": plugins, + "pagination": { + "limit": REMOTE_INSTALLED_PLUGIN_PAGE_LIMIT, + "next_page_token": next_page_token, + } + }) +} + +fn unscoped_installed_request() -> wiremock::MockBuilder { + Mock::given(method("GET")) + .and(path("/ps/plugins/installed")) + .and(query_param_is_missing("scope")) + .and(query_param("includeDownloadUrls", "true")) + .and(query_param("limit", "1000")) + .and(header("authorization", "Bearer Access Token")) + .and(header("chatgpt-account-id", "account_id")) + .and(header("OAI-Product-Sku", "codex")) +} + +#[tokio::test] +async fn unscoped_snapshot_follows_pagination_and_preserves_raw_metadata() { + let server = MockServer::start().await; + let first_page_plugins = (0..50) + .map(|index| installed_plugin(index, RemotePluginScope::Global)) + .collect(); + unscoped_installed_request() + .and(query_param_is_missing("pageToken")) + .respond_with( + ResponseTemplate::new(200) + .set_body_json(installed_page(first_page_plugins, Some("page-2"))), + ) + .expect(1) + .mount(&server) + .await; + unscoped_installed_request() + .and(query_param("pageToken", "page-2")) + .respond_with(ResponseTemplate::new(200).set_body_json(installed_page( + vec![ + installed_plugin(50, RemotePluginScope::User), + installed_plugin(51, RemotePluginScope::Workspace), + ], + None, + ))) + .expect(1) + .mount(&server) + .await; + + let cache = RemoteInstalledPluginSnapshotCache::default(); + let auth = CodexAuth::create_dummy_chatgpt_auth_for_testing(); + let snapshot = cache + .get_or_fetch(&service_config(&server), &auth) + .await + .expect("unscoped snapshot should load"); + + assert_eq!(snapshot.len(), 52); + assert_eq!(snapshot[50].plugin.scope, RemotePluginScope::User); + assert_eq!(snapshot[51].plugin.scope, RemotePluginScope::Workspace); + assert_eq!( + snapshot[51].plugin.release.bundle_download_url.as_deref(), + Some("https://example.com/plugin-051.tar.gz") + ); + assert_eq!( + snapshot[51].plugin.release.app_manifest, + Some(json!({"apps": {"app": {"id": "app-051"}}})) + ); + assert_eq!( + snapshot[51].plugin.installation_policy, + codex_app_server_protocol::PluginInstallPolicy::Available + ); + assert_eq!( + snapshot[51].plugin.authentication_policy, + codex_app_server_protocol::PluginAuthPolicy::OnUse + ); +} + +#[tokio::test] +async fn concurrent_scope_partitions_share_one_upstream_request() { + let server = MockServer::start().await; + unscoped_installed_request() + .respond_with( + ResponseTemplate::new(200) + .set_delay(Duration::from_millis(100)) + .set_body_json(installed_page( + vec![ + installed_plugin(0, RemotePluginScope::Global), + installed_plugin(1, RemotePluginScope::User), + installed_plugin(2, RemotePluginScope::Workspace), + ], + None, + )), + ) + .expect(1) + .mount(&server) + .await; + + let cache = RemoteInstalledPluginSnapshotCache::default(); + let config = service_config(&server); + let auth = CodexAuth::create_dummy_chatgpt_auth_for_testing(); + let (global, user, workspace) = tokio::join!( + installed_plugins_for_scope(&cache, &config, &auth, RemotePluginScope::Global), + installed_plugins_for_scope(&cache, &config, &auth, RemotePluginScope::User), + installed_plugins_for_scope(&cache, &config, &auth, RemotePluginScope::Workspace), + ); + + for (result, expected_name) in [ + (global, "plugin-000"), + (user, "plugin-001"), + (workspace, "plugin-002"), + ] { + let plugins = result.expect("scope partition should load"); + assert_eq!(plugins.len(), 1); + assert_eq!(plugins[0].plugin.name, expected_name); + } +} + +#[tokio::test] +async fn concurrent_failures_share_one_upstream_request() { + let server = MockServer::start().await; + unscoped_installed_request() + .respond_with(ResponseTemplate::new(503).set_delay(Duration::from_millis(100))) + .expect(1) + .mount(&server) + .await; + + let cache = RemoteInstalledPluginSnapshotCache::default(); + let config = service_config(&server); + let auth = CodexAuth::create_dummy_chatgpt_auth_for_testing(); + let (global, user, workspace) = tokio::join!( + installed_plugins_for_scope(&cache, &config, &auth, RemotePluginScope::Global), + installed_plugins_for_scope(&cache, &config, &auth, RemotePluginScope::User), + installed_plugins_for_scope(&cache, &config, &auth, RemotePluginScope::Workspace), + ); + + for result in [global, user, workspace] { + assert!(result.is_err()); + } +} + +#[tokio::test] +async fn invalidation_aborts_an_in_flight_snapshot() { + let server = MockServer::start().await; + unscoped_installed_request() + .respond_with( + ResponseTemplate::new(200) + .set_delay(Duration::from_millis(100)) + .set_body_json(installed_page( + vec![installed_plugin(0, RemotePluginScope::Global)], + None, + )), + ) + .expect(1) + .mount(&server) + .await; + + let cache = RemoteInstalledPluginSnapshotCache::default(); + let config = service_config(&server); + let auth = CodexAuth::create_dummy_chatgpt_auth_for_testing(); + let (result, ()) = tokio::join!(cache.get_or_fetch(&config, &auth), async { + tokio::time::sleep(Duration::from_millis(20)).await; + cache.invalidate(); + }); + + assert!(matches!( + result, + Err(RemotePluginCatalogError::UnexpectedResponse(message)) + if message == REMOTE_INSTALLED_PLUGIN_SNAPSHOT_INVALIDATED + )); +} diff --git a/codex-rs/core-plugins/src/remote/remote_installed_plugin_sync.rs b/codex-rs/core-plugins/src/remote/remote_installed_plugin_sync.rs index 2a23b066edaa..f65c656388c4 100644 --- a/codex-rs/core-plugins/src/remote/remote_installed_plugin_sync.rs +++ b/codex-rs/core-plugins/src/remote/remote_installed_plugin_sync.rs @@ -4,11 +4,13 @@ use super::REMOTE_WORKSPACE_MARKETPLACE_NAME; use super::REMOTE_WORKSPACE_SHARED_WITH_ME_MARKETPLACE_NAME; use super::REMOTE_WORKSPACE_SHARED_WITH_ME_PRIVATE_MARKETPLACE_NAME; use super::REMOTE_WORKSPACE_SHARED_WITH_ME_UNLISTED_MARKETPLACE_NAME; +use super::RemoteInstalledPluginSnapshotCache; use super::RemotePluginCatalogError; use super::RemotePluginScope; use super::RemotePluginServiceConfig; use super::ensure_chatgpt_auth; use super::fetch_installed_plugins_for_scope_with_download_url; +use super::installed_snapshot::snapshot_invalidated_error; use super::remote_plugin_canonical_marketplace_name; use crate::store::PLUGINS_CACHE_DIR; use crate::store::PluginStore; @@ -83,6 +85,7 @@ pub(crate) fn maybe_start_remote_installed_plugin_bundle_sync( codex_home: PathBuf, config: RemotePluginServiceConfig, auth: Option, + installed_snapshot_cache: Arc, on_local_cache_changed: Option>, ) { let Some(auth) = auth else { @@ -96,8 +99,13 @@ pub(crate) fn maybe_start_remote_installed_plugin_bundle_sync( } tokio::spawn(async move { - let result = - sync_remote_installed_plugin_bundles_once(codex_home, &config, Some(&auth)).await; + let result = sync_remote_installed_plugin_bundles_once_with_snapshot( + codex_home, + &config, + Some(&auth), + Some(installed_snapshot_cache.as_ref()), + ) + .await; match result { Ok(outcome) => { if outcome.changed_local_cache() @@ -127,34 +135,48 @@ pub async fn sync_remote_installed_plugin_bundles_once( codex_home: PathBuf, config: &RemotePluginServiceConfig, auth: Option<&CodexAuth>, +) -> Result { + sync_remote_installed_plugin_bundles_once_with_snapshot( + codex_home, config, auth, /*installed_snapshot_cache*/ None, + ) + .await +} + +async fn sync_remote_installed_plugin_bundles_once_with_snapshot( + codex_home: PathBuf, + config: &RemotePluginServiceConfig, + auth: Option<&CodexAuth>, + installed_snapshot_cache: Option<&RemoteInstalledPluginSnapshotCache>, ) -> Result { let auth = ensure_chatgpt_auth(auth)?; - let global = async { - let scope = RemotePluginScope::Global; - let installed_plugins = fetch_installed_plugins_for_scope_with_download_url( - config, auth, scope, /*include_download_urls*/ true, - ) - .await?; - Ok::<_, RemotePluginCatalogError>((scope, installed_plugins)) - }; - let workspace = async { - let scope = RemotePluginScope::Workspace; - let installed_plugins = fetch_installed_plugins_for_scope_with_download_url( - config, auth, scope, /*include_download_urls*/ true, - ) - .await?; - Ok::<_, RemotePluginCatalogError>((scope, installed_plugins)) - }; - let user = async { - let scope = RemotePluginScope::User; - let installed_plugins = fetch_installed_plugins_for_scope_with_download_url( - config, auth, scope, /*include_download_urls*/ true, + let installed_snapshot = installed_snapshot_cache.map(|installed_snapshot_cache| { + ( + installed_snapshot_cache, + installed_snapshot_cache.generation(), ) - .await?; - Ok::<_, RemotePluginCatalogError>((scope, installed_plugins)) + }); + let installed_plugin_groups = if let Some((installed_snapshot_cache, _)) = installed_snapshot { + vec![ + installed_snapshot_cache + .get_or_fetch(config, auth) + .await? + .as_ref() + .to_vec(), + ] + } else { + let fetch_scope = |scope| { + fetch_installed_plugins_for_scope_with_download_url( + config, auth, scope, /*include_download_urls*/ true, + ) + }; + let (global, workspace, user) = tokio::try_join!( + fetch_scope(RemotePluginScope::Global), + fetch_scope(RemotePluginScope::Workspace), + fetch_scope(RemotePluginScope::User), + )?; + vec![global, workspace, user] }; - - let (global, workspace, user) = tokio::try_join!(global, workspace, user)?; + ensure_installed_snapshot_current(installed_snapshot)?; let store = PluginStore::try_new(codex_home.clone())?; let mut installed_plugin_names_by_marketplace = BTreeMap::>::from_iter([ @@ -183,8 +205,9 @@ pub async fn sync_remote_installed_plugin_bundles_once( let mut installed_plugin_ids = BTreeSet::new(); let mut failed_remote_plugin_ids = BTreeSet::new(); - for (_scope, installed_plugins) in [global, workspace, user] { + for installed_plugins in installed_plugin_groups { for installed_plugin in installed_plugins { + ensure_installed_snapshot_current(installed_snapshot)?; let plugin = installed_plugin.plugin; let marketplace_name = remote_plugin_canonical_marketplace_name(&plugin)?.to_string(); installed_plugin_names_by_marketplace @@ -270,6 +293,7 @@ pub async fn sync_remote_installed_plugin_bundles_once( } } + ensure_installed_snapshot_current(installed_snapshot)?; let removed_cache_plugin_ids = tokio::task::spawn_blocking(move || { remove_stale_remote_plugin_caches( codex_home.as_path(), @@ -286,6 +310,17 @@ pub async fn sync_remote_installed_plugin_bundles_once( }) } +fn ensure_installed_snapshot_current( + installed_snapshot: Option<(&RemoteInstalledPluginSnapshotCache, u64)>, +) -> Result<(), RemoteInstalledPluginBundleSyncError> { + if let Some((installed_snapshot_cache, generation)) = installed_snapshot + && generation != installed_snapshot_cache.generation() + { + return Err(snapshot_invalidated_error().into()); + } + Ok(()) +} + pub fn mark_remote_plugin_cache_mutation_in_flight( codex_home: &Path, marketplace_name: &str, diff --git a/codex-rs/core/src/plugins/discoverable_tests.rs b/codex-rs/core/src/plugins/discoverable_tests.rs index 53d010d190ac..a61af3569ec7 100644 --- a/codex-rs/core/src/plugins/discoverable_tests.rs +++ b/codex-rs/core/src/plugins/discoverable_tests.rs @@ -63,6 +63,7 @@ async fn list_tool_suggest_discoverable_plugins_includes_cached_remote_global_pl use wiremock::matchers::method; use wiremock::matchers::path; use wiremock::matchers::query_param; + use wiremock::matchers::query_param_is_missing; let codex_home = tempdir().expect("tempdir should succeed"); write_file( @@ -243,20 +244,20 @@ plugins = true .all(|plugin| plugin.id != "github@openai-curated-remote") ); - for scope in ["GLOBAL", "USER", "WORKSPACE"] { - Mock::given(method("GET")) - .and(path("/backend-api/ps/plugins/installed")) - .and(query_param("scope", scope)) - .respond_with(ResponseTemplate::new(200).set_body_json(json!({ - "plugins": [], - "pagination": { - "next_page_token": null - } - }))) - .expect(1) - .mount(&server) - .await; - } + Mock::given(method("GET")) + .and(path("/backend-api/ps/plugins/installed")) + .and(query_param_is_missing("scope")) + .and(query_param("includeDownloadUrls", "true")) + .and(query_param("limit", "1000")) + .respond_with(ResponseTemplate::new(200).set_body_json(json!({ + "plugins": [], + "pagination": { + "next_page_token": null + } + }))) + .expect(1) + .mount(&server) + .await; plugins_manager .build_and_cache_remote_installed_plugin_marketplaces( &config.plugins_config_input(), diff --git a/codex-rs/core/tests/suite/request_plugin_install.rs b/codex-rs/core/tests/suite/request_plugin_install.rs index da10cd1149a2..c3260ab0e987 100644 --- a/codex-rs/core/tests/suite/request_plugin_install.rs +++ b/codex-rs/core/tests/suite/request_plugin_install.rs @@ -46,6 +46,7 @@ use wiremock::ResponseTemplate; use wiremock::matchers::method; use wiremock::matchers::path; use wiremock::matchers::query_param; +use wiremock::matchers::query_param_is_missing; const TOOL_SEARCH_TOOL_NAME: &str = "tool_search"; const LIST_AVAILABLE_PLUGINS_TO_INSTALL_TOOL_NAME: &str = "list_available_plugins_to_install"; @@ -233,7 +234,11 @@ fn remote_installed_plugins_response(plugins: Vec) -> ResponseTemplate { async fn mount_empty_remote_installed_plugins(server: &wiremock::MockServer) -> MockGuard { Mock::given(method("GET")) .and(path("/ps/plugins/installed")) + .and(query_param_is_missing("scope")) + .and(query_param("includeDownloadUrls", "true")) + .and(query_param("limit", "1000")) .respond_with(remote_installed_plugins_response(Vec::new())) + .expect(1) .mount_as_scoped(server) .await } @@ -241,7 +246,9 @@ async fn mount_empty_remote_installed_plugins(server: &wiremock::MockServer) -> async fn mount_remote_calendar_installed_plugins(server: &wiremock::MockServer) { Mock::given(method("GET")) .and(path("/ps/plugins/installed")) - .and(query_param("scope", "GLOBAL")) + .and(query_param_is_missing("scope")) + .and(query_param("includeDownloadUrls", "true")) + .and(query_param("limit", "1000")) .respond_with(remote_installed_plugins_response(vec![json!({ "id": REMOTE_CALENDAR_PLUGIN_ID, "name": "calendar", @@ -259,15 +266,6 @@ async fn mount_remote_calendar_installed_plugins(server: &wiremock::MockServer) .with_priority(1) .mount(server) .await; - for scope in ["WORKSPACE", "USER"] { - Mock::given(method("GET")) - .and(path("/ps/plugins/installed")) - .and(query_param("scope", scope)) - .respond_with(remote_installed_plugins_response(Vec::new())) - .with_priority(1) - .mount(server) - .await; - } } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] From 1809b2142313b1a9e7071b59a5d7cf80f8d4057c Mon Sep 17 00:00:00 2001 From: Eric Ning Date: Thu, 2 Jul 2026 09:11:32 -0600 Subject: [PATCH 2/2] Fix installed plugin auth mode type --- codex-rs/core-plugins/src/remote/installed_snapshot.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/codex-rs/core-plugins/src/remote/installed_snapshot.rs b/codex-rs/core-plugins/src/remote/installed_snapshot.rs index 499e1646c9fb..63506362b1b2 100644 --- a/codex-rs/core-plugins/src/remote/installed_snapshot.rs +++ b/codex-rs/core-plugins/src/remote/installed_snapshot.rs @@ -4,9 +4,9 @@ use super::RemotePluginInstalledItem; use super::RemotePluginScope; use super::RemotePluginServiceConfig; use super::get_remote_plugin_installed_page; -use codex_app_server_protocol::AuthMode; use codex_login::CodexAuth; use codex_protocol::account::PlanType; +use codex_protocol::auth::AuthMode; use std::collections::BTreeSet; use std::sync::Arc; use std::sync::RwLock;