001/* 002 * Copyright 2022-2026 Revetware LLC. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017package com.soklet.otel; 018 019import com.soklet.ConnectionRejectionReason; 020import com.soklet.MarshaledResponse; 021import com.soklet.MetricsCollector; 022import com.soklet.Request; 023import com.soklet.RequestReadFailureReason; 024import com.soklet.RequestRejectionReason; 025import com.soklet.ResourceMethod; 026import com.soklet.ResourcePathDeclaration; 027import com.soklet.ServerType; 028import com.soklet.SseComment; 029import com.soklet.SseConnection; 030import com.soklet.SseEvent; 031import com.soklet.StreamTermination; 032import io.opentelemetry.api.GlobalOpenTelemetry; 033import io.opentelemetry.api.OpenTelemetry; 034import io.opentelemetry.api.common.AttributeKey; 035import io.opentelemetry.api.common.Attributes; 036import io.opentelemetry.api.metrics.DoubleHistogram; 037import io.opentelemetry.api.metrics.LongCounter; 038import io.opentelemetry.api.metrics.LongHistogram; 039import io.opentelemetry.api.metrics.LongUpDownCounter; 040import io.opentelemetry.api.metrics.Meter; 041import io.opentelemetry.api.metrics.MeterBuilder; 042import org.jspecify.annotations.NonNull; 043import org.jspecify.annotations.Nullable; 044 045import javax.annotation.concurrent.NotThreadSafe; 046import javax.annotation.concurrent.ThreadSafe; 047import java.net.InetSocketAddress; 048import java.time.Duration; 049import java.util.List; 
050import java.util.Locale; 051 052import static java.util.Objects.requireNonNull; 053 054/** 055 * OpenTelemetry-backed {@link MetricsCollector} for Soklet HTTP and SSE telemetry. 056 * <p> 057 * This implementation records counters/histograms via OpenTelemetry's metrics API and is designed to be 058 * lightweight, thread-safe, and non-blocking in request hot paths. 059 * <p> 060 * By default, standard HTTP metrics use OpenTelemetry Semantic Convention names. Soklet-specific concepts 061 * (for example SSE queue/drop/broadcast details) are emitted with {@code soklet.*} names. 062 * <p> 063 * If inbound requests include W3C trace context, Soklet exposes it via {@link Request#getTraceContext()} to 064 * custom metrics collectors and application code. This metrics-only implementation intentionally does not emit 065 * trace IDs, parent IDs, or {@code tracestate} values as metric attributes because those values are high-cardinality 066 * and belong in logs, spans, or exemplar-aware tracing integrations instead. 067 * <p> 068 * See <a href="https://soklet.com/docs/metrics-collection">https://soklet.com/docs/metrics-collection</a> for Soklet's metrics/telemetry documentation. 
*
 * @author <a href="https://www.revetkn.com">Mark Allen</a>
 */
@ThreadSafe
public final class OpenTelemetryMetricsCollector implements MetricsCollector {
    // ---- Fixed attribute values -------------------------------------------------------------

    // NOTE(review): not referenced in the visible part of this class; presumably the http.route
    // fallback produced by routeFor(...) when no resource method matched - confirm.
    @NonNull
    private static final String UNMATCHED_ROUTE;
    // soklet.sse.comment.type value recorded for event broadcasts, which carry no comment type
    // (passed by didBroadcastSseEvent).
    @NonNull
    private static final String UNKNOWN_COMMENT_TYPE;
    // soklet.sse.broadcast.payload.type value used by didBroadcastSseEvent.
    @NonNull
    private static final String BROADCAST_PAYLOAD_EVENT;
    // soklet.sse.broadcast.payload.type value used by didBroadcastSseComment.
    @NonNull
    private static final String BROADCAST_PAYLOAD_COMMENT;
    // NOTE(review): presumably the instrumentation scope name the Builder falls back to when it
    // has to obtain a Meter itself - its use is outside this section; confirm.
    @NonNull
    private static final String DEFAULT_INSTRUMENTATION_NAME;
    // url.scheme value attached to request attributes under MetricNamingStrategy.SEMCONV.
    @NonNull
    private static final String URL_SCHEME_HTTP;

    // ---- Attribute keys ---------------------------------------------------------------------
    // Keys prefixed with "soklet." are Soklet-specific; the remaining keys (error.type,
    // http.request.method, http.route, url.scheme, http.response.status_code) use the standard
    // OpenTelemetry attribute names.

    @NonNull
    private static final AttributeKey<String> SERVER_TYPE_ATTRIBUTE_KEY;
    @NonNull
    private static final AttributeKey<String> FAILURE_REASON_ATTRIBUTE_KEY;
    @NonNull
    private static final AttributeKey<String> ERROR_TYPE_ATTRIBUTE_KEY;
    @NonNull
    private static final AttributeKey<String> HTTP_METHOD_ATTRIBUTE_KEY;
    @NonNull
    private static final AttributeKey<String> HTTP_ROUTE_ATTRIBUTE_KEY;
    @NonNull
    private static final AttributeKey<String> URL_SCHEME_ATTRIBUTE_KEY;
    // The only non-string key: the status code is recorded as a long.
    @NonNull
    private static final AttributeKey<Long> HTTP_STATUS_CODE_ATTRIBUTE_KEY;
    @NonNull
    private static final AttributeKey<String> SSE_TERMINATION_REASON_ATTRIBUTE_KEY;
    @NonNull
    private static final AttributeKey<String> SSE_DROP_REASON_ATTRIBUTE_KEY;
    @NonNull
    private static final AttributeKey<String> SSE_COMMENT_TYPE_ATTRIBUTE_KEY;
    @NonNull
    private static final AttributeKey<String> SSE_BROADCAST_PAYLOAD_TYPE_ATTRIBUTE_KEY;

    // All static constants are assigned here rather than at their declarations.
    static {
        UNMATCHED_ROUTE = "_unmatched";
        UNKNOWN_COMMENT_TYPE = "unknown";
        BROADCAST_PAYLOAD_EVENT = "event";
        BROADCAST_PAYLOAD_COMMENT = "comment";
        DEFAULT_INSTRUMENTATION_NAME = "com.soklet.otel";
        URL_SCHEME_HTTP = "http";

        SERVER_TYPE_ATTRIBUTE_KEY = AttributeKey.stringKey("soklet.server.type");
        FAILURE_REASON_ATTRIBUTE_KEY = AttributeKey.stringKey("soklet.failure.reason");
        ERROR_TYPE_ATTRIBUTE_KEY = AttributeKey.stringKey("error.type");
        HTTP_METHOD_ATTRIBUTE_KEY = AttributeKey.stringKey("http.request.method");
        HTTP_ROUTE_ATTRIBUTE_KEY = AttributeKey.stringKey("http.route");
        URL_SCHEME_ATTRIBUTE_KEY = AttributeKey.stringKey("url.scheme");
        HTTP_STATUS_CODE_ATTRIBUTE_KEY = AttributeKey.longKey("http.response.status_code");
        SSE_TERMINATION_REASON_ATTRIBUTE_KEY = AttributeKey.stringKey("soklet.sse.termination.reason");
        SSE_DROP_REASON_ATTRIBUTE_KEY = AttributeKey.stringKey("soklet.sse.drop.reason");
        SSE_COMMENT_TYPE_ATTRIBUTE_KEY = AttributeKey.stringKey("soklet.sse.comment.type");
        SSE_BROADCAST_PAYLOAD_TYPE_ATTRIBUTE_KEY = AttributeKey.stringKey("soklet.sse.broadcast.payload.type");
    }

    // ---- Connection / request / response instruments (all built in the private constructor) --

    @NonNull
    private final LongCounter connectionsAcceptedCounter;
    @NonNull
    private final LongCounter connectionsRejectedCounter;
    @NonNull
    private final LongCounter requestsAcceptedCounter;
    @NonNull
    private final LongCounter requestsRejectedCounter;
    @NonNull
    private final LongCounter requestReadFailureCounter;
    // Incremented in didStartRequestHandling and decremented in didFinishRequestHandling.
    @NonNull
    private final LongUpDownCounter activeRequestsCounter;
    @NonNull
    private final DoubleHistogram requestDurationHistogram;
    @NonNull
    private final DoubleHistogram responseWriteDurationHistogram;
    @NonNull
    private final LongCounter responseWriteFailureCounter;
    @NonNull
    private final LongCounter requestThrowableCounter;
    @NonNull
    private final LongHistogram requestBodySizeHistogram;
    @NonNull
    private final LongHistogram responseBodySizeHistogram;

    // ---- SSE stream and event instruments ---------------------------------------------------

    // Incremented in didEstablishSseConnection and decremented in didTerminateSseConnection.
    @NonNull
    private final LongUpDownCounter activeServerSentEventStreamsCounter;
    @NonNull
    private final LongCounter serverSentEventStreamsEstablishedCounter;
    @NonNull
    private final LongCounter serverSentEventHandshakeFailureCounter;
    @NonNull
    private final LongCounter serverSentEventStreamsTerminatedCounter;
    @NonNull
    private final DoubleHistogram serverSentEventStreamDurationHistogram;
    @NonNull
    private final LongCounter serverSentEventWrittenCounter;
    @NonNull
    private final LongCounter serverSentEventWriteFailureCounter;
    @NonNull
    private final DoubleHistogram serverSentEventWriteDurationHistogram;
    @NonNull
    private final DoubleHistogram serverSentEventDeliveryLagHistogram;
    @NonNull
    private final LongHistogram serverSentEventPayloadSizeHistogram;
    @NonNull
    private final LongHistogram serverSentEventQueueDepthHistogram;
    @NonNull
    private final LongCounter serverSentEventDropCounter;

    // ---- SSE comment instruments (same shape as the event instruments above) ----------------

    @NonNull
    private final LongCounter serverSentEventCommentWrittenCounter;
    @NonNull
    private final LongCounter serverSentEventCommentWriteFailureCounter;
    @NonNull
    private final DoubleHistogram serverSentEventCommentWriteDurationHistogram;
    @NonNull
    private final DoubleHistogram serverSentEventCommentDeliveryLagHistogram;
    @NonNull
    private final LongHistogram serverSentEventCommentPayloadSizeHistogram;
    @NonNull
    private final LongHistogram serverSentEventCommentQueueDepthHistogram;
    @NonNull
    private final LongCounter serverSentEventCommentDropCounter;

    // ---- SSE broadcast instruments (fed by recordBroadcastTotals) ---------------------------

    @NonNull
    private final LongCounter serverSentEventBroadcastAttemptCounter;
    @NonNull
    private final LongCounter serverSentEventBroadcastEnqueuedCounter;
    @NonNull
    private final LongCounter serverSentEventBroadcastDroppedCounter;
    // Taken from the builder. Under SEMCONV the active-requests, request-duration and body-size
    // metrics get "http.server.*" names (otherwise "soklet.server.*"), and request attributes
    // carry url.scheme instead of soklet.server.type.
    @NonNull
    private final MetricNamingStrategy metricNamingStrategy;

    /**
     * Acquires a builder for {@link OpenTelemetryMetricsCollector} instances, using {@link GlobalOpenTelemetry}
     * by default.
     *
     * @return the builder
     */
    @NonNull
    public static Builder builder() {
        return new Builder();
    }

    /**
     * Acquires a builder seeded with a required {@link Meter}.
218 * 219 * @param meter the meter used to build instruments 220 * @return the builder 221 */ 222 @NonNull 223 public static Builder withMeter(@NonNull Meter meter) { 224 requireNonNull(meter); 225 return builder().meter(meter); 226 } 227 228 /** 229 * Acquires a builder seeded with a required {@link OpenTelemetry} instance. 230 * 231 * @param openTelemetry the OpenTelemetry instance used to build a meter 232 * @return the builder 233 */ 234 @NonNull 235 public static Builder withOpenTelemetry(@NonNull OpenTelemetry openTelemetry) { 236 requireNonNull(openTelemetry); 237 return builder().openTelemetry(openTelemetry); 238 } 239 240 /** 241 * Creates an instance from a required {@link Meter} without additional customization. 242 * 243 * @param meter the meter used to build instruments 244 * @return an {@link OpenTelemetryMetricsCollector} instance 245 */ 246 @NonNull 247 public static OpenTelemetryMetricsCollector fromMeter(@NonNull Meter meter) { 248 return withMeter(meter).build(); 249 } 250 251 /** 252 * Creates an instance from a required {@link OpenTelemetry} without additional customization. 
253 * 254 * @param openTelemetry the OpenTelemetry instance used to build a meter 255 * @return an {@link OpenTelemetryMetricsCollector} instance 256 */ 257 @NonNull 258 public static OpenTelemetryMetricsCollector fromOpenTelemetry(@NonNull OpenTelemetry openTelemetry) { 259 return withOpenTelemetry(openTelemetry).build(); 260 } 261 262 private OpenTelemetryMetricsCollector(@NonNull Builder builder) { 263 requireNonNull(builder); 264 Meter meter = requireNonNull(builder.resolveMeter()); 265 this.metricNamingStrategy = requireNonNull(builder.metricNamingStrategy); 266 267 String activeRequestsMetricName = activeRequestsMetricNameFor(this.metricNamingStrategy); 268 String requestDurationMetricName = requestDurationMetricNameFor(this.metricNamingStrategy); 269 String requestBodySizeMetricName = requestBodySizeMetricNameFor(this.metricNamingStrategy); 270 String responseBodySizeMetricName = responseBodySizeMetricNameFor(this.metricNamingStrategy); 271 272 this.connectionsAcceptedCounter = meter.counterBuilder("soklet.server.connections.accepted") 273 .setDescription("Total number of accepted inbound TCP connections.") 274 .setUnit("{connection}") 275 .build(); 276 this.connectionsRejectedCounter = meter.counterBuilder("soklet.server.connections.rejected") 277 .setDescription("Total number of rejected inbound TCP connections.") 278 .setUnit("{connection}") 279 .build(); 280 this.requestsAcceptedCounter = meter.counterBuilder("soklet.server.requests.accepted") 281 .setDescription("Total number of accepted requests before app-level handling.") 282 .setUnit("{request}") 283 .build(); 284 this.requestsRejectedCounter = meter.counterBuilder("soklet.server.requests.rejected") 285 .setDescription("Total number of rejected requests before app-level handling.") 286 .setUnit("{request}") 287 .build(); 288 this.requestReadFailureCounter = meter.counterBuilder("soklet.server.request.read.failures") 289 .setDescription("Total number of request read/parse failures.") 290 
.setUnit("{request}") 291 .build(); 292 this.activeRequestsCounter = meter.upDownCounterBuilder(activeRequestsMetricName) 293 .setDescription("Number of in-flight requests currently being handled.") 294 .setUnit("{request}") 295 .build(); 296 this.requestDurationHistogram = meter.histogramBuilder(requestDurationMetricName) 297 .setDescription("Total request handling duration.") 298 .setUnit("s") 299 .build(); 300 this.responseWriteDurationHistogram = meter.histogramBuilder("soklet.server.response.write.duration") 301 .setDescription("Duration spent writing response bytes.") 302 .setUnit("s") 303 .build(); 304 this.responseWriteFailureCounter = meter.counterBuilder("soklet.server.response.write.failures") 305 .setDescription("Total number of response write failures.") 306 .setUnit("{response}") 307 .build(); 308 this.requestThrowableCounter = meter.counterBuilder("soklet.server.request.throwables") 309 .setDescription("Total number of throwables observed during request handling.") 310 .setUnit("{throwable}") 311 .build(); 312 this.requestBodySizeHistogram = meter.histogramBuilder(requestBodySizeMetricName) 313 .ofLongs() 314 .setDescription("Request body size in bytes.") 315 .setUnit("By") 316 .build(); 317 this.responseBodySizeHistogram = meter.histogramBuilder(responseBodySizeMetricName) 318 .ofLongs() 319 .setDescription("Response body size in bytes.") 320 .setUnit("By") 321 .build(); 322 323 this.activeServerSentEventStreamsCounter = meter.upDownCounterBuilder("soklet.sse.streams.active") 324 .setDescription("Number of active SSE streams.") 325 .setUnit("{stream}") 326 .build(); 327 this.serverSentEventStreamsEstablishedCounter = meter.counterBuilder("soklet.sse.streams.established") 328 .setDescription("Total number of SSE streams established.") 329 .setUnit("{stream}") 330 .build(); 331 this.serverSentEventHandshakeFailureCounter = meter.counterBuilder("soklet.sse.handshakes.rejected") 332 .setDescription("Total number of rejected SSE handshakes.") 333 
.setUnit("{handshake}") 334 .build(); 335 this.serverSentEventStreamsTerminatedCounter = meter.counterBuilder("soklet.sse.streams.terminated") 336 .setDescription("Total number of terminated SSE streams.") 337 .setUnit("{stream}") 338 .build(); 339 this.serverSentEventStreamDurationHistogram = meter.histogramBuilder("soklet.sse.stream.duration") 340 .setDescription("SSE stream duration.") 341 .setUnit("s") 342 .build(); 343 this.serverSentEventWrittenCounter = meter.counterBuilder("soklet.sse.events.written") 344 .setDescription("Total number of SSE events successfully written.") 345 .setUnit("{event}") 346 .build(); 347 this.serverSentEventWriteFailureCounter = meter.counterBuilder("soklet.sse.events.write.failures") 348 .setDescription("Total number of SSE events that failed to write.") 349 .setUnit("{event}") 350 .build(); 351 this.serverSentEventWriteDurationHistogram = meter.histogramBuilder("soklet.sse.events.write.duration") 352 .setDescription("SSE event write duration.") 353 .setUnit("s") 354 .build(); 355 this.serverSentEventDeliveryLagHistogram = meter.histogramBuilder("soklet.sse.events.delivery.lag") 356 .setDescription("Time spent waiting in the SSE queue before write.") 357 .setUnit("s") 358 .build(); 359 this.serverSentEventPayloadSizeHistogram = meter.histogramBuilder("soklet.sse.events.payload.size") 360 .ofLongs() 361 .setDescription("Serialized SSE event payload size in bytes.") 362 .setUnit("By") 363 .build(); 364 this.serverSentEventQueueDepthHistogram = meter.histogramBuilder("soklet.sse.events.queue.depth") 365 .ofLongs() 366 .setDescription("Queued element depth when SSE event write/drop outcome is observed.") 367 .setUnit("{item}") 368 .build(); 369 this.serverSentEventDropCounter = meter.counterBuilder("soklet.sse.events.dropped") 370 .setDescription("Total number of SSE events dropped before enqueue.") 371 .setUnit("{event}") 372 .build(); 373 374 this.serverSentEventCommentWrittenCounter = 
meter.counterBuilder("soklet.sse.comments.written") 375 .setDescription("Total number of SSE comments successfully written.") 376 .setUnit("{comment}") 377 .build(); 378 this.serverSentEventCommentWriteFailureCounter = meter.counterBuilder("soklet.sse.comments.write.failures") 379 .setDescription("Total number of SSE comments that failed to write.") 380 .setUnit("{comment}") 381 .build(); 382 this.serverSentEventCommentWriteDurationHistogram = meter.histogramBuilder("soklet.sse.comments.write.duration") 383 .setDescription("SSE comment write duration.") 384 .setUnit("s") 385 .build(); 386 this.serverSentEventCommentDeliveryLagHistogram = meter.histogramBuilder("soklet.sse.comments.delivery.lag") 387 .setDescription("Time spent waiting in the SSE queue before comment write.") 388 .setUnit("s") 389 .build(); 390 this.serverSentEventCommentPayloadSizeHistogram = meter.histogramBuilder("soklet.sse.comments.payload.size") 391 .ofLongs() 392 .setDescription("Serialized SSE comment payload size in bytes.") 393 .setUnit("By") 394 .build(); 395 this.serverSentEventCommentQueueDepthHistogram = meter.histogramBuilder("soklet.sse.comments.queue.depth") 396 .ofLongs() 397 .setDescription("Queued element depth when SSE comment write/drop outcome is observed.") 398 .setUnit("{item}") 399 .build(); 400 this.serverSentEventCommentDropCounter = meter.counterBuilder("soklet.sse.comments.dropped") 401 .setDescription("Total number of SSE comments dropped before enqueue.") 402 .setUnit("{comment}") 403 .build(); 404 405 this.serverSentEventBroadcastAttemptCounter = meter.counterBuilder("soklet.sse.broadcast.attempted") 406 .setDescription("Total number of attempted SSE broadcast deliveries.") 407 .setUnit("{delivery}") 408 .build(); 409 this.serverSentEventBroadcastEnqueuedCounter = meter.counterBuilder("soklet.sse.broadcast.enqueued") 410 .setDescription("Total number of SSE broadcast deliveries successfully enqueued.") 411 .setUnit("{delivery}") 412 .build(); 413 
this.serverSentEventBroadcastDroppedCounter = meter.counterBuilder("soklet.sse.broadcast.dropped") 414 .setDescription("Total number of SSE broadcast deliveries dropped before enqueue.") 415 .setUnit("{delivery}") 416 .build(); 417 } 418 419 @Override 420 public void didAcceptConnection(@NonNull ServerType serverType, 421 @Nullable InetSocketAddress remoteAddress) { 422 requireNonNull(serverType); 423 this.connectionsAcceptedCounter.add(1, serverTypeAttributes(serverType)); 424 } 425 426 @Override 427 public void didFailToAcceptConnection(@NonNull ServerType serverType, 428 @Nullable InetSocketAddress remoteAddress, 429 @NonNull ConnectionRejectionReason reason, 430 @Nullable Throwable throwable) { 431 requireNonNull(serverType); 432 requireNonNull(reason); 433 this.connectionsRejectedCounter.add(1, serverTypeAndReasonAttributes(serverType, reason)); 434 } 435 436 @Override 437 public void didAcceptRequest(@NonNull ServerType serverType, 438 @Nullable InetSocketAddress remoteAddress, 439 @Nullable String requestTarget) { 440 requireNonNull(serverType); 441 this.requestsAcceptedCounter.add(1, serverTypeAttributes(serverType)); 442 } 443 444 @Override 445 public void didFailToAcceptRequest(@NonNull ServerType serverType, 446 @Nullable InetSocketAddress remoteAddress, 447 @Nullable String requestTarget, 448 @NonNull RequestRejectionReason reason, 449 @Nullable Throwable throwable) { 450 requireNonNull(serverType); 451 requireNonNull(reason); 452 this.requestsRejectedCounter.add(1, serverTypeAndReasonAttributes(serverType, reason)); 453 } 454 455 @Override 456 public void didFailToReadRequest(@NonNull ServerType serverType, 457 @Nullable InetSocketAddress remoteAddress, 458 @Nullable String requestTarget, 459 @NonNull RequestReadFailureReason reason, 460 @Nullable Throwable throwable) { 461 requireNonNull(serverType); 462 requireNonNull(reason); 463 this.requestReadFailureCounter.add(1, serverTypeAndReasonAttributes(serverType, reason)); 464 } 465 466 @Override 467 
public void didStartRequestHandling(@NonNull ServerType serverType, 468 @NonNull Request request, 469 @Nullable ResourceMethod resourceMethod) { 470 requireNonNull(serverType); 471 requireNonNull(request); 472 473 this.activeRequestsCounter.add(1, activeRequestAttributes(serverType, request)); 474 } 475 476 @Override 477 public void didFinishRequestHandling(@NonNull ServerType serverType, 478 @NonNull Request request, 479 @Nullable ResourceMethod resourceMethod, 480 @NonNull MarshaledResponse marshaledResponse, 481 @NonNull Duration duration, 482 @NonNull List<@NonNull Throwable> throwables) { 483 requireNonNull(serverType); 484 requireNonNull(request); 485 requireNonNull(marshaledResponse); 486 requireNonNull(duration); 487 requireNonNull(throwables); 488 489 Throwable throwable = throwables.isEmpty() ? null : throwables.get(0); 490 Attributes attributes = requestAttributes(serverType, request, resourceMethod, marshaledResponse.getStatusCode(), throwable); 491 492 this.activeRequestsCounter.add(-1, activeRequestAttributes(serverType, request)); 493 this.requestDurationHistogram.record(seconds(duration), attributes); 494 this.requestBodySizeHistogram.record(request.getBody().map(body -> (long) body.length).orElse(0L), attributes); 495 this.responseBodySizeHistogram.record(marshaledResponse.getBodyLength(), attributes); 496 497 if (!throwables.isEmpty()) 498 this.requestThrowableCounter.add(throwables.size(), attributes); 499 } 500 501 @Override 502 public void didWriteResponse(@NonNull ServerType serverType, 503 @NonNull Request request, 504 @Nullable ResourceMethod resourceMethod, 505 @NonNull MarshaledResponse marshaledResponse, 506 @NonNull Duration responseWriteDuration) { 507 requireNonNull(serverType); 508 requireNonNull(request); 509 requireNonNull(marshaledResponse); 510 requireNonNull(responseWriteDuration); 511 512 this.responseWriteDurationHistogram.record( 513 seconds(responseWriteDuration), 514 requestAttributes(serverType, request, resourceMethod, 
marshaledResponse.getStatusCode(), null) 515 ); 516 } 517 518 @Override 519 public void didFailToWriteResponse(@NonNull ServerType serverType, 520 @NonNull Request request, 521 @Nullable ResourceMethod resourceMethod, 522 @NonNull MarshaledResponse marshaledResponse, 523 @NonNull Duration responseWriteDuration, 524 @NonNull Throwable throwable) { 525 requireNonNull(serverType); 526 requireNonNull(request); 527 requireNonNull(marshaledResponse); 528 requireNonNull(responseWriteDuration); 529 requireNonNull(throwable); 530 531 Attributes attributes = requestAttributes(serverType, request, resourceMethod, marshaledResponse.getStatusCode(), throwable); 532 this.responseWriteFailureCounter.add(1, attributes); 533 this.responseWriteDurationHistogram.record(seconds(responseWriteDuration), attributes); 534 } 535 536 @Override 537 public void didEstablishSseConnection(@NonNull SseConnection sseConnection) { 538 requireNonNull(sseConnection); 539 540 Attributes attributes = serverSentEventAttributes(sseConnection); 541 this.activeServerSentEventStreamsCounter.add(1, attributes); 542 this.serverSentEventStreamsEstablishedCounter.add(1, attributes); 543 } 544 545 @Override 546 public void didFailToEstablishSseConnection(@NonNull Request request, 547 @Nullable ResourceMethod resourceMethod, 548 SseConnection.@NonNull HandshakeFailureReason reason, 549 @Nullable Throwable throwable) { 550 requireNonNull(request); 551 requireNonNull(reason); 552 553 this.serverSentEventHandshakeFailureCounter.add(1, 554 Attributes.builder() 555 .put(HTTP_METHOD_ATTRIBUTE_KEY, request.getHttpMethod().name()) 556 .put(HTTP_ROUTE_ATTRIBUTE_KEY, routeFor(resourceMethod)) 557 .put(FAILURE_REASON_ATTRIBUTE_KEY, enumValue(reason)) 558 .build() 559 ); 560 } 561 562 @Override 563 public void didTerminateSseConnection(@NonNull SseConnection sseConnection, 564 @NonNull StreamTermination termination) { 565 requireNonNull(sseConnection); 566 requireNonNull(termination); 567 568 Attributes routeAttributes = 
serverSentEventAttributes(sseConnection); 569 Attributes durationAttributes = Attributes.builder() 570 .putAll(routeAttributes) 571 .put(SSE_TERMINATION_REASON_ATTRIBUTE_KEY, enumValue(termination.getReason())) 572 .build(); 573 574 this.activeServerSentEventStreamsCounter.add(-1, routeAttributes); 575 this.serverSentEventStreamsTerminatedCounter.add(1, durationAttributes); 576 this.serverSentEventStreamDurationHistogram.record(seconds(termination.getDuration()), durationAttributes); 577 } 578 579 @Override 580 public void didWriteSseEvent(@NonNull SseConnection sseConnection, 581 @NonNull SseEvent sseEvent, 582 @NonNull Duration writeDuration, 583 @Nullable Duration deliveryLag, 584 @Nullable Integer payloadBytes, 585 @Nullable Integer queueDepth) { 586 requireNonNull(sseConnection); 587 requireNonNull(sseEvent); 588 requireNonNull(writeDuration); 589 590 Attributes attributes = serverSentEventAttributes(sseConnection); 591 this.serverSentEventWrittenCounter.add(1, attributes); 592 this.serverSentEventWriteDurationHistogram.record(seconds(writeDuration), attributes); 593 594 if (deliveryLag != null) 595 this.serverSentEventDeliveryLagHistogram.record(seconds(deliveryLag), attributes); 596 if (payloadBytes != null) 597 this.serverSentEventPayloadSizeHistogram.record(payloadBytes, attributes); 598 if (queueDepth != null) 599 this.serverSentEventQueueDepthHistogram.record(queueDepth, attributes); 600 } 601 602 @Override 603 public void didFailToWriteSseEvent(@NonNull SseConnection sseConnection, 604 @NonNull SseEvent sseEvent, 605 @NonNull Duration writeDuration, 606 @NonNull Throwable throwable, 607 @Nullable Duration deliveryLag, 608 @Nullable Integer payloadBytes, 609 @Nullable Integer queueDepth) { 610 requireNonNull(sseConnection); 611 requireNonNull(sseEvent); 612 requireNonNull(writeDuration); 613 requireNonNull(throwable); 614 615 Attributes attributes = serverSentEventAttributes(sseConnection); 616 this.serverSentEventWriteFailureCounter.add(1, attributes); 
617 this.serverSentEventWriteDurationHistogram.record(seconds(writeDuration), attributes); 618 619 if (deliveryLag != null) 620 this.serverSentEventDeliveryLagHistogram.record(seconds(deliveryLag), attributes); 621 if (payloadBytes != null) 622 this.serverSentEventPayloadSizeHistogram.record(payloadBytes, attributes); 623 if (queueDepth != null) 624 this.serverSentEventQueueDepthHistogram.record(queueDepth, attributes); 625 } 626 627 @Override 628 public void didDropSseEvent(@NonNull SseConnection sseConnection, 629 @NonNull SseEvent sseEvent, 630 @NonNull SseEventDropReason reason, 631 @Nullable Integer payloadBytes, 632 @Nullable Integer queueDepth) { 633 requireNonNull(sseConnection); 634 requireNonNull(sseEvent); 635 requireNonNull(reason); 636 637 Attributes attributes = Attributes.builder() 638 .putAll(serverSentEventAttributes(sseConnection)) 639 .put(SSE_DROP_REASON_ATTRIBUTE_KEY, enumValue(reason)) 640 .build(); 641 this.serverSentEventDropCounter.add(1, attributes); 642 643 if (payloadBytes != null) 644 this.serverSentEventPayloadSizeHistogram.record(payloadBytes, attributes); 645 if (queueDepth != null) 646 this.serverSentEventQueueDepthHistogram.record(queueDepth, attributes); 647 } 648 649 @Override 650 public void didWriteSseComment(@NonNull SseConnection sseConnection, 651 @NonNull SseComment sseComment, 652 @NonNull Duration writeDuration, 653 @Nullable Duration deliveryLag, 654 @Nullable Integer payloadBytes, 655 @Nullable Integer queueDepth) { 656 requireNonNull(sseConnection); 657 requireNonNull(sseComment); 658 requireNonNull(writeDuration); 659 660 Attributes attributes = serverSentEventCommentAttributes(sseConnection, sseComment.getCommentType()); 661 this.serverSentEventCommentWrittenCounter.add(1, attributes); 662 this.serverSentEventCommentWriteDurationHistogram.record(seconds(writeDuration), attributes); 663 664 if (deliveryLag != null) 665 this.serverSentEventCommentDeliveryLagHistogram.record(seconds(deliveryLag), attributes); 666 if 
(payloadBytes != null) 667 this.serverSentEventCommentPayloadSizeHistogram.record(payloadBytes, attributes); 668 if (queueDepth != null) 669 this.serverSentEventCommentQueueDepthHistogram.record(queueDepth, attributes); 670 } 671 672 @Override 673 public void didFailToWriteSseComment(@NonNull SseConnection sseConnection, 674 @NonNull SseComment sseComment, 675 @NonNull Duration writeDuration, 676 @NonNull Throwable throwable, 677 @Nullable Duration deliveryLag, 678 @Nullable Integer payloadBytes, 679 @Nullable Integer queueDepth) { 680 requireNonNull(sseConnection); 681 requireNonNull(sseComment); 682 requireNonNull(writeDuration); 683 requireNonNull(throwable); 684 685 Attributes attributes = serverSentEventCommentAttributes(sseConnection, sseComment.getCommentType()); 686 this.serverSentEventCommentWriteFailureCounter.add(1, attributes); 687 this.serverSentEventCommentWriteDurationHistogram.record(seconds(writeDuration), attributes); 688 689 if (deliveryLag != null) 690 this.serverSentEventCommentDeliveryLagHistogram.record(seconds(deliveryLag), attributes); 691 if (payloadBytes != null) 692 this.serverSentEventCommentPayloadSizeHistogram.record(payloadBytes, attributes); 693 if (queueDepth != null) 694 this.serverSentEventCommentQueueDepthHistogram.record(queueDepth, attributes); 695 } 696 697 @Override 698 public void didDropSseComment(@NonNull SseConnection sseConnection, 699 @NonNull SseComment sseComment, 700 @NonNull SseEventDropReason reason, 701 @Nullable Integer payloadBytes, 702 @Nullable Integer queueDepth) { 703 requireNonNull(sseConnection); 704 requireNonNull(sseComment); 705 requireNonNull(reason); 706 707 Attributes attributes = Attributes.builder() 708 .putAll(serverSentEventCommentAttributes(sseConnection, sseComment.getCommentType())) 709 .put(SSE_DROP_REASON_ATTRIBUTE_KEY, enumValue(reason)) 710 .build(); 711 this.serverSentEventCommentDropCounter.add(1, attributes); 712 713 if (payloadBytes != null) 714 
this.serverSentEventCommentPayloadSizeHistogram.record(payloadBytes, attributes); 715 if (queueDepth != null) 716 this.serverSentEventCommentQueueDepthHistogram.record(queueDepth, attributes); 717 } 718 719 @Override 720 public void didBroadcastSseEvent(@NonNull ResourcePathDeclaration route, 721 int attempted, 722 int enqueued, 723 int dropped) { 724 requireNonNull(route); 725 recordBroadcastTotals(route, BROADCAST_PAYLOAD_EVENT, UNKNOWN_COMMENT_TYPE, attempted, enqueued, dropped); 726 } 727 728 @Override 729 public void didBroadcastSseComment(@NonNull ResourcePathDeclaration route, 730 SseComment.@NonNull CommentType commentType, 731 int attempted, 732 int enqueued, 733 int dropped) { 734 requireNonNull(route); 735 requireNonNull(commentType); 736 recordBroadcastTotals(route, BROADCAST_PAYLOAD_COMMENT, enumValue(commentType), attempted, enqueued, dropped); 737 } 738 739 @NonNull 740 private Attributes serverTypeAttributes(@NonNull ServerType serverType) { 741 requireNonNull(serverType); 742 return Attributes.of(SERVER_TYPE_ATTRIBUTE_KEY, enumValue(serverType)); 743 } 744 745 @NonNull 746 private Attributes serverTypeAndReasonAttributes(@NonNull ServerType serverType, 747 @NonNull Enum<?> reason) { 748 requireNonNull(serverType); 749 requireNonNull(reason); 750 return Attributes.builder() 751 .put(SERVER_TYPE_ATTRIBUTE_KEY, enumValue(serverType)) 752 .put(FAILURE_REASON_ATTRIBUTE_KEY, enumValue(reason)) 753 .build(); 754 } 755 756 @NonNull 757 private Attributes activeRequestAttributes(@NonNull ServerType serverType, 758 @NonNull Request request) { 759 requireNonNull(serverType); 760 requireNonNull(request); 761 762 var builder = Attributes.builder() 763 .put(HTTP_METHOD_ATTRIBUTE_KEY, request.getHttpMethod().name()); 764 765 if (this.metricNamingStrategy == MetricNamingStrategy.SEMCONV) { 766 builder.put(URL_SCHEME_ATTRIBUTE_KEY, URL_SCHEME_HTTP); 767 } else { 768 builder.put(SERVER_TYPE_ATTRIBUTE_KEY, enumValue(serverType)); 769 } 770 771 return builder.build(); 
772 } 773 774 @NonNull 775 private Attributes requestAttributes(@NonNull ServerType serverType, 776 @NonNull Request request, 777 @Nullable ResourceMethod resourceMethod, 778 @Nullable Integer statusCode, 779 @Nullable Throwable throwable) { 780 requireNonNull(serverType); 781 requireNonNull(request); 782 783 var builder = Attributes.builder() 784 .put(HTTP_METHOD_ATTRIBUTE_KEY, request.getHttpMethod().name()); 785 786 if (this.metricNamingStrategy == MetricNamingStrategy.SEMCONV) { 787 builder.put(URL_SCHEME_ATTRIBUTE_KEY, URL_SCHEME_HTTP); 788 789 if (resourceMethod != null) 790 builder.put(HTTP_ROUTE_ATTRIBUTE_KEY, routeFor(resourceMethod)); 791 } else { 792 builder.put(SERVER_TYPE_ATTRIBUTE_KEY, enumValue(serverType)) 793 .put(HTTP_ROUTE_ATTRIBUTE_KEY, routeFor(resourceMethod)); 794 } 795 796 if (statusCode != null) 797 builder.put(HTTP_STATUS_CODE_ATTRIBUTE_KEY, statusCode.longValue()); 798 799 String errorType = errorTypeFor(statusCode, throwable); 800 801 if (errorType != null) 802 builder.put(ERROR_TYPE_ATTRIBUTE_KEY, errorType); 803 804 return builder.build(); 805 } 806 807 @NonNull 808 private Attributes serverSentEventAttributes(@NonNull SseConnection sseConnection) { 809 requireNonNull(sseConnection); 810 return Attributes.of( 811 HTTP_ROUTE_ATTRIBUTE_KEY, 812 routeFor(sseConnection.getResourceMethod()) 813 ); 814 } 815 816 @NonNull 817 private Attributes serverSentEventCommentAttributes(@NonNull SseConnection sseConnection, 818 SseComment.@NonNull CommentType commentType) { 819 requireNonNull(sseConnection); 820 requireNonNull(commentType); 821 return Attributes.builder() 822 .putAll(serverSentEventAttributes(sseConnection)) 823 .put(SSE_COMMENT_TYPE_ATTRIBUTE_KEY, enumValue(commentType)) 824 .build(); 825 } 826 827 private void recordBroadcastTotals(@NonNull ResourcePathDeclaration route, 828 @NonNull String payloadType, 829 @NonNull String commentType, 830 int attempted, 831 int enqueued, 832 int dropped) { 833 requireNonNull(route); 834 
requireNonNull(payloadType); 835 requireNonNull(commentType); 836 837 Attributes attributes = Attributes.builder() 838 .put(HTTP_ROUTE_ATTRIBUTE_KEY, route.getPath()) 839 .put(SSE_BROADCAST_PAYLOAD_TYPE_ATTRIBUTE_KEY, payloadType) 840 .put(SSE_COMMENT_TYPE_ATTRIBUTE_KEY, commentType) 841 .build(); 842 843 if (attempted > 0) 844 this.serverSentEventBroadcastAttemptCounter.add(attempted, attributes); 845 if (enqueued > 0) 846 this.serverSentEventBroadcastEnqueuedCounter.add(enqueued, attributes); 847 if (dropped > 0) 848 this.serverSentEventBroadcastDroppedCounter.add(dropped, attributes); 849 } 850 851 @NonNull 852 private static String activeRequestsMetricNameFor(@NonNull MetricNamingStrategy metricNamingStrategy) { 853 requireNonNull(metricNamingStrategy); 854 if (metricNamingStrategy == MetricNamingStrategy.SEMCONV) 855 return "http.server.active_requests"; 856 return "soklet.server.requests.active"; 857 } 858 859 @NonNull 860 private static String requestDurationMetricNameFor(@NonNull MetricNamingStrategy metricNamingStrategy) { 861 requireNonNull(metricNamingStrategy); 862 if (metricNamingStrategy == MetricNamingStrategy.SEMCONV) 863 return "http.server.request.duration"; 864 return "soklet.server.request.duration"; 865 } 866 867 @NonNull 868 private static String requestBodySizeMetricNameFor(@NonNull MetricNamingStrategy metricNamingStrategy) { 869 requireNonNull(metricNamingStrategy); 870 if (metricNamingStrategy == MetricNamingStrategy.SEMCONV) 871 return "http.server.request.body.size"; 872 return "soklet.server.request.body.size"; 873 } 874 875 @NonNull 876 private static String responseBodySizeMetricNameFor(@NonNull MetricNamingStrategy metricNamingStrategy) { 877 requireNonNull(metricNamingStrategy); 878 if (metricNamingStrategy == MetricNamingStrategy.SEMCONV) 879 return "http.server.response.body.size"; 880 return "soklet.server.response.body.size"; 881 } 882 883 @NonNull 884 private static String routeFor(@Nullable ResourceMethod resourceMethod) { 885 
if (resourceMethod == null) 886 return UNMATCHED_ROUTE; 887 return resourceMethod.getResourcePathDeclaration().getPath(); 888 } 889 890 @NonNull 891 private static String enumValue(@NonNull Enum<?> value) { 892 requireNonNull(value); 893 return value.name().toLowerCase(Locale.ROOT); 894 } 895 896 @Nullable 897 private String errorTypeFor(@Nullable Integer statusCode, 898 @Nullable Throwable throwable) { 899 if (throwable != null) 900 return throwable.getClass().getName(); 901 902 if (this.metricNamingStrategy == MetricNamingStrategy.SEMCONV 903 && statusCode != null 904 && statusCode >= 500) 905 return String.valueOf(statusCode); 906 907 return null; 908 } 909 910 private static double seconds(@NonNull Duration duration) { 911 requireNonNull(duration); 912 return duration.toNanos() / 1_000_000_000D; 913 } 914 915 /** 916 * Naming strategy for HTTP metric instrument names. 917 * <p> 918 * SSE- and Soklet-specific concepts remain under the {@code soklet.*} namespace in all strategies. 919 */ 920 public enum MetricNamingStrategy { 921 /** 922 * Use OpenTelemetry Semantic Convention names for standard HTTP server metrics. 923 */ 924 SEMCONV, 925 /** 926 * Use {@code soklet.*} names for all metrics. 927 */ 928 SOKLET 929 } 930 931 /** 932 * Builder used to construct instances of {@link OpenTelemetryMetricsCollector}. 933 */ 934 @NotThreadSafe 935 public static final class Builder { 936 @Nullable 937 private Meter meter; 938 @Nullable 939 private OpenTelemetry openTelemetry; 940 @NonNull 941 private MetricNamingStrategy metricNamingStrategy; 942 @NonNull 943 private String instrumentationName; 944 @Nullable 945 private String instrumentationVersion; 946 947 private Builder() { 948 this.openTelemetry = GlobalOpenTelemetry.get(); 949 this.metricNamingStrategy = MetricNamingStrategy.SEMCONV; 950 this.instrumentationName = DEFAULT_INSTRUMENTATION_NAME; 951 this.instrumentationVersion = null; 952 } 953 954 /** 955 * Sets a specific meter to use for metric instruments. 
956 * 957 * @param meter the meter to use 958 * @return this builder 959 */ 960 @NonNull 961 public Builder meter(@NonNull Meter meter) { 962 this.meter = requireNonNull(meter); 963 return this; 964 } 965 966 /** 967 * Sets the OpenTelemetry API object used to construct a meter if {@link #meter(Meter)} is not set. 968 * 969 * @param openTelemetry the OpenTelemetry instance 970 * @return this builder 971 */ 972 @NonNull 973 public Builder openTelemetry(@NonNull OpenTelemetry openTelemetry) { 974 this.openTelemetry = requireNonNull(openTelemetry); 975 return this; 976 } 977 978 /** 979 * Sets the naming strategy for HTTP metrics. 980 * 981 * @param metricNamingStrategy the naming strategy 982 * @return this builder 983 */ 984 @NonNull 985 public Builder metricNamingStrategy(@NonNull MetricNamingStrategy metricNamingStrategy) { 986 this.metricNamingStrategy = requireNonNull(metricNamingStrategy); 987 return this; 988 } 989 990 /** 991 * Sets the instrumentation scope name to use when constructing a meter. 992 * 993 * @param instrumentationName the instrumentation scope name 994 * @return this builder 995 */ 996 @NonNull 997 public Builder instrumentationName(@NonNull String instrumentationName) { 998 this.instrumentationName = requireNonNull(instrumentationName); 999 return this; 1000 } 1001 1002 /** 1003 * Sets an optional instrumentation scope version to use when constructing a meter. 
1004 * 1005 * @param instrumentationVersion the instrumentation scope version, or {@code null} 1006 * @return this builder 1007 */ 1008 @NonNull 1009 public Builder instrumentationVersion(@Nullable String instrumentationVersion) { 1010 this.instrumentationVersion = instrumentationVersion; 1011 return this; 1012 } 1013 1014 @NonNull 1015 private Meter resolveMeter() { 1016 if (this.meter != null) 1017 return this.meter; 1018 1019 MeterBuilder meterBuilder = requireNonNull(this.openTelemetry).meterBuilder(this.instrumentationName); 1020 1021 if (this.instrumentationVersion != null) 1022 meterBuilder = meterBuilder.setInstrumentationVersion(this.instrumentationVersion); 1023 1024 return meterBuilder.build(); 1025 } 1026 1027 /** 1028 * Builds the collector. 1029 * 1030 * @return the collector instance 1031 */ 1032 @NonNull 1033 public OpenTelemetryMetricsCollector build() { 1034 return new OpenTelemetryMetricsCollector(this); 1035 } 1036 } 1037}