Skip to content

Commit c86e843

Browse files
authored
Cosmic replicaset Kubernetes API client large scale issue (#1590)
* add compression and stream parsing support
* handle custom metrics disablement correctly
* add check and always clean up inflater resources
* return response code from fallback
1 parent 6f4c31f commit c86e843

3 files changed

Lines changed: 186 additions & 12 deletions

File tree

source/plugins/ruby/CustomMetricsUtils.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ def check_custom_metrics_availability
2020
return true
2121
end
2222

23-
if enable_custom_metrics.nil? || enable_custom_metrics.to_s.downcase == 'false'
23+
if enable_custom_metrics.nil? || enable_custom_metrics.to_s.empty? || enable_custom_metrics.to_s.downcase == 'false'
2424
return false
2525
end
2626

source/plugins/ruby/KubernetesApiClient.rb

Lines changed: 182 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,9 @@ class KubernetesApiClient
1010
require "time"
1111
require "ipaddress"
1212
require "jwt"
13+
require "zlib"
14+
require "stringio"
15+
require 'yajl'
1316

1417
require_relative "oms_common"
1518
require_relative "constants"
@@ -864,18 +867,187 @@ def getResourcesAndContinuationTokenV2(uri, api_group: nil)
864867
resourceInventory = nil
865868
responseCode = nil
866869
begin
867-
@Log.info "KubernetesApiClient::getResourcesAndContinuationTokenV2 : Getting resources from Kube API using url: #{uri} @ #{Time.now.utc.iso8601}"
868-
responseCode, resourceInfo = getKubeResourceInfoV2(uri, api_group: api_group)
869-
@Log.info "KubernetesApiClient::getResourcesAndContinuationTokenV2 : Done getting resources from Kube API using url: #{uri} @ #{Time.now.utc.iso8601}"
870-
if !responseCode.nil? && responseCode == "200" && !resourceInfo.nil?
871-
@Log.info "KubernetesApiClient::getResourcesAndContinuationTokenV2:Start:Parsing data for #{uri} using JSON @ #{Time.now.utc.iso8601}"
872-
resourceInventory = JSON.parse(resourceInfo.body)
873-
@Log.info "KubernetesApiClient::getResourcesAndContinuationTokenV2:End:Parsing data for #{uri} using JSON @ #{Time.now.utc.iso8601}"
874-
resourceInfo = nil
870+
resource_path = getResourceUri(uri, api_group)
871+
if resource_path.nil?
872+
@Log.warn "KubernetesApiClient::getResourcesAndContinuationTokenV2: resource path nil for #{uri}"
873+
return continuationToken, resourceInventory, responseCode
875874
end
876-
if (!resourceInventory.nil? && !resourceInventory["metadata"].nil?)
877-
continuationToken = resourceInventory["metadata"]["continue"]
875+
parsed_items = []
876+
metadata_continue = nil
877+
resource_version = nil
878+
parse_mode = "stream"
879+
total_uncompressed_bytes = 0
880+
total_compressed_bytes = 0
881+
started_at = Time.now.utc
882+
883+
begin
884+
parsed_uri = URI.parse(resource_path)
885+
if !File.exist?(@@CaFile)
886+
raise "#{@@CaFile} doesnt exist"
887+
end
888+
889+
Net::HTTP.start(parsed_uri.host, parsed_uri.port, :use_ssl => true, :ca_file => @@CaFile, :verify_mode => OpenSSL::SSL::VERIFY_PEER, :open_timeout => 20, :read_timeout => 40) do |http|
890+
kubeApiRequest = Net::HTTP::Get.new(parsed_uri.request_uri)
891+
kubeApiRequest['Authorization'] = 'Bearer ' + getTokenStr
892+
kubeApiRequest['User-Agent'] = getUserAgent()
893+
kubeApiRequest['Accept-Encoding'] = 'gzip'
894+
kubeApiRequest['Accept'] = 'application/json'
895+
896+
@Log.info "KubernetesApiClient::getResourcesAndContinuationTokenV2(stream): Requesting #{uri} (api_group=#{api_group}) @ #{started_at.iso8601}"
897+
898+
http.request(kubeApiRequest) do |response|
899+
responseCode = response.code
900+
unless responseCode == '200'
901+
parse_mode = 'error'
902+
@Log.warn "KubernetesApiClient::getResourcesAndContinuationTokenV2: Non-success code #{responseCode} for #{uri}"
903+
# Send telemetry for non-success response codes
904+
@@K8sApiResponseTelemetryTimeTracker = ApplicationInsightsUtility.sendAPIResponseTelemetry(responseCode, uri, "K8sAPIStatus", @@K8sApiResponseCodeHash, @@K8sApiResponseTelemetryTimeTracker)
905+
break
906+
end
907+
908+
# Decide whether to stream or fallback to full parse based on Content-Length (if small, cheaper to full-parse)
909+
content_length = nil
910+
begin
911+
content_length = Integer(response['Content-Length']) if response['Content-Length']
912+
rescue; end
913+
small_threshold = 256 * 1024 # 256KB
914+
915+
if content_length && content_length <= small_threshold
916+
# Read whole (possibly compressed) body then use faster parser for small payloads
917+
body_buf = +"" # mutable string
918+
response.read_body { |c| body_buf << c }
919+
total_compressed_bytes = body_buf.bytesize
920+
if response['Content-Encoding'] == 'gzip'
921+
begin
922+
body_buf = Zlib::GzipReader.new(StringIO.new(body_buf)).read
923+
parse_mode = 'full_gzip'
924+
rescue => gzerr
925+
@Log.warn "KubernetesApiClient::getResourcesAndContinuationTokenV2: gzip decompress(small) failed: #{gzerr}; using compressed body (parse will likely fail)"
926+
end
927+
else
928+
parse_mode = 'full_plain'
929+
end
930+
total_uncompressed_bytes = body_buf.bytesize
931+
resourceInventory = JSON.parse(body_buf)
932+
else
933+
# Streaming path - CRITICAL: Create parser ONCE outside the read_body loop
934+
parse_mode = 'stream'
935+
is_gzip = (response['Content-Encoding'] == 'gzip')
936+
inflater = nil
937+
yajl_parser = nil
938+
begin
939+
if is_gzip
940+
# Use Inflate with gzip window bits for streaming
941+
inflater = Zlib::Inflate.new(Zlib::MAX_WBITS + 32)
942+
parse_mode = 'stream_gzip'
943+
end
944+
945+
# Create Yajl parser ONCE and reuse for all chunks
946+
yajl_parser = Yajl::Parser.new
947+
948+
# Set up the parser callback to extract items and continuation token
949+
yajl_parser.on_parse_complete = lambda do |obj|
950+
if obj.is_a?(Hash)
951+
if obj.key?('items') && obj['items'].is_a?(Array)
952+
# Force deep copy via JSON round-trip to avoid Yajl object reference issues
953+
begin
954+
serialized_items = JSON.generate(obj['items'])
955+
deep_copied_items = JSON.parse(serialized_items)
956+
parsed_items.concat(deep_copied_items)
957+
rescue => json_err
958+
@Log.warn "KubernetesApiClient::getResourcesAndContinuationTokenV2: JSON round-trip failed: #{json_err}, using shallow copy"
959+
parsed_items.concat(obj['items'])
960+
end
961+
end
962+
if obj.key?('metadata') && obj['metadata'].is_a?(Hash)
963+
metadata_continue = obj['metadata']['continue'] if obj['metadata'].key?('continue')
964+
resource_version = obj['metadata']['resourceVersion'] if obj['metadata'].key?('resourceVersion')
965+
end
966+
end
967+
end
968+
969+
# Stream and parse chunks
970+
chunk_count = 0
971+
response.read_body do |compressed_chunk|
972+
chunk_count += 1
973+
total_compressed_bytes += compressed_chunk.bytesize
974+
975+
decompressed = if is_gzip
976+
begin
977+
inflater.inflate(compressed_chunk)
978+
rescue Zlib::Error => zerr
979+
@Log.warn "KubernetesApiClient::getResourcesAndContinuationTokenV2: gzip inflate failed at chunk #{chunk_count}: #{zerr}"
980+
raise
981+
end
982+
else
983+
compressed_chunk
984+
end
985+
986+
total_uncompressed_bytes += decompressed.bytesize
987+
988+
# Feed decompressed chunk to the parser
989+
# Yajl can handle incomplete JSON and will buffer internally
990+
begin
991+
yajl_parser << decompressed
992+
rescue Yajl::ParseError => perr
993+
# Only log parse errors, don't break the stream - might be incomplete chunk
994+
@Log.warn "KubernetesApiClient::getResourcesAndContinuationTokenV2: Yajl parse error at chunk #{chunk_count}: #{perr}"
995+
end
996+
997+
# Yield control periodically to allow other threads to run (every 10 chunks)
998+
Thread.pass if chunk_count % 10 == 0
999+
end # read_body
1000+
1001+
# Finalize the parsing - this triggers on_parse_complete callback
1002+
yajl_parser.parse("") rescue nil
1003+
1004+
# Build minimal inventory structure
1005+
resourceInventory = {
1006+
'metadata' => { 'continue' => metadata_continue, 'resourceVersion' => resource_version },
1007+
'items' => parsed_items
1008+
}
1009+
1010+
@Log.info "KubernetesApiClient::getResourcesAndContinuationTokenV2: Successfully parsed #{parsed_items.length} items in #{chunk_count} chunks"
1011+
1012+
rescue => stream_err
1013+
@Log.warn "KubernetesApiClient::getResourcesAndContinuationTokenV2: Stream processing error: #{stream_err}"
1014+
raise
1015+
ensure
1016+
# Always clean up inflater resources, regardless of success or failure
1017+
if inflater
1018+
inflater.finish rescue nil
1019+
inflater.close rescue nil
1020+
end
1021+
end
1022+
end # streaming path
1023+
end # http.request
1024+
end # Net::HTTP.start
1025+
rescue => inner_err
1026+
@Log.warn "KubernetesApiClient::getResourcesAndContinuationTokenV2: streaming fetch/parse failed for #{uri}: #{inner_err}; falling back to legacy getKubeResourceInfoV2"
1027+
parse_mode = 'fallback'
1028+
begin
1029+
# Fallback to legacy path
1030+
fallbackResponseCode, resourceInfo = getKubeResourceInfoV2(uri, api_group: api_group)
1031+
responseCode = fallbackResponseCode
1032+
if fallbackResponseCode == '200' && resourceInfo && resourceInfo.body && !resourceInfo.body.empty?
1033+
resourceInventory = JSON.parse(resourceInfo.body)
1034+
# Set continuationToken from fallback response
1035+
if resourceInventory && resourceInventory['metadata']
1036+
continuationToken = resourceInventory['metadata']['continue']
1037+
end
1038+
end
1039+
rescue => legacy_err
1040+
@Log.warn "KubernetesApiClient::getResourcesAndContinuationTokenV2: legacy fallback also failed: #{legacy_err}"
1041+
ApplicationInsightsUtility.sendExceptionTelemetry(legacy_err)
1042+
end
1043+
end
1044+
1045+
# Derive continuation token if not already set
1046+
if continuationToken.nil? && resourceInventory && resourceInventory['metadata']
1047+
continuationToken = resourceInventory['metadata']['continue'] if resourceInventory['metadata'].key?('continue')
8781048
end
1049+
duration_ms = ((Time.now.utc - started_at) * 1000).round(1)
1050+
@Log.info "KubernetesApiClient::getResourcesAndContinuationTokenV2: mode=#{parse_mode} code=#{responseCode} items=#{resourceInventory && resourceInventory['items'] ? resourceInventory['items'].length : 'n/a'} cont=#{continuationToken.nil? ? 'nil' : continuationToken.empty? ? 'empty' : 'set'} compBytes=#{total_compressed_bytes} uncompBytes=#{total_uncompressed_bytes} ms=#{duration_ms} uri=#{uri}"
8791051
rescue => errorStr
8801052
@Log.warn "KubernetesApiClient::getResourcesAndContinuationTokenV2:Failed in get resources for #{uri} and continuation token: #{errorStr}"
8811053
ApplicationInsightsUtility.sendExceptionTelemetry(errorStr)

source/plugins/ruby/in_kube_podinventory.rb

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -689,7 +689,9 @@ def getPodInventoryRecords(item, serviceRecords, batchTime = Time.utc.iso8601)
689689
records.push(record)
690690
end #container status block end
691691

692-
@mdmPodRecordItems.push(mdmPodRecord.dup)
692+
if CustomMetricsUtils.check_custom_metrics_availability
693+
@mdmPodRecordItems.push(mdmPodRecord.dup)
694+
end
693695

694696
records.each do |record|
695697
if !record.nil?

0 commit comments

Comments
 (0)