001/* 002 * Copyright 2022-2026 Revetware LLC. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017package com.soklet.otel; 018 019import com.soklet.ConnectionRejectionReason; 020import com.soklet.MarshaledResponse; 021import com.soklet.MetricsCollector; 022import com.soklet.Request; 023import com.soklet.RequestReadFailureReason; 024import com.soklet.RequestRejectionReason; 025import com.soklet.ResourceMethod; 026import com.soklet.ResourcePathDeclaration; 027import com.soklet.ServerSentEvent; 028import com.soklet.ServerSentEventComment; 029import com.soklet.ServerSentEventConnection; 030import com.soklet.ServerType; 031import io.opentelemetry.api.GlobalOpenTelemetry; 032import io.opentelemetry.api.OpenTelemetry; 033import io.opentelemetry.api.common.AttributeKey; 034import io.opentelemetry.api.common.Attributes; 035import io.opentelemetry.api.metrics.DoubleHistogram; 036import io.opentelemetry.api.metrics.LongCounter; 037import io.opentelemetry.api.metrics.LongHistogram; 038import io.opentelemetry.api.metrics.LongUpDownCounter; 039import io.opentelemetry.api.metrics.Meter; 040import io.opentelemetry.api.metrics.MeterBuilder; 041import org.jspecify.annotations.NonNull; 042import org.jspecify.annotations.Nullable; 043 044import javax.annotation.concurrent.NotThreadSafe; 045import javax.annotation.concurrent.ThreadSafe; 046import java.net.InetSocketAddress; 047import java.time.Duration; 048import java.util.List; 049import java.util.Locale; 050 051import static java.util.Objects.requireNonNull; 052 053/** 054 * OpenTelemetry-backed {@link MetricsCollector} for Soklet HTTP and SSE telemetry. 055 * <p> 056 * This implementation records counters/histograms via OpenTelemetry's metrics API and is designed to be 057 * lightweight, thread-safe, and non-blocking in request hot paths. 058 * <p> 059 * By default, standard HTTP metrics use OpenTelemetry Semantic Convention names. Soklet-specific concepts 060 * (for example SSE queue/drop/broadcast details) are emitted with {@code soklet.*} names. 061 * <p> 062 * See <a href="https://soklet.com/docs/metrics-collection">https://soklet.com/docs/metrics-collection</a> for Soklet's metrics/telemetry documentation. 063 * 064 * @author <a href="https://www.revetkn.com">Mark Allen</a> 065 */ 066@ThreadSafe 067public final class OpenTelemetryMetricsCollector implements MetricsCollector { 068 @NonNull 069 private static final String UNMATCHED_ROUTE; 070 @NonNull 071 private static final String UNKNOWN_COMMENT_TYPE; 072 @NonNull 073 private static final String BROADCAST_PAYLOAD_EVENT; 074 @NonNull 075 private static final String BROADCAST_PAYLOAD_COMMENT; 076 @NonNull 077 private static final String DEFAULT_INSTRUMENTATION_NAME; 078 @NonNull 079 private static final String URL_SCHEME_HTTP; 080 081 @NonNull 082 private static final AttributeKey<String> SERVER_TYPE_ATTRIBUTE_KEY; 083 @NonNull 084 private static final AttributeKey<String> FAILURE_REASON_ATTRIBUTE_KEY; 085 @NonNull 086 private static final AttributeKey<String> ERROR_TYPE_ATTRIBUTE_KEY; 087 @NonNull 088 private static final AttributeKey<String> HTTP_METHOD_ATTRIBUTE_KEY; 089 @NonNull 090 private static final AttributeKey<String> HTTP_ROUTE_ATTRIBUTE_KEY; 091 @NonNull 092 private static final AttributeKey<String> URL_SCHEME_ATTRIBUTE_KEY; 093 @NonNull 094 private static final AttributeKey<Long> HTTP_STATUS_CODE_ATTRIBUTE_KEY; 095 @NonNull 096 private static final AttributeKey<String> SSE_TERMINATION_REASON_ATTRIBUTE_KEY; 097 @NonNull 098 private static final AttributeKey<String> SSE_DROP_REASON_ATTRIBUTE_KEY; 099 @NonNull 100 private static final AttributeKey<String> SSE_COMMENT_TYPE_ATTRIBUTE_KEY; 101 @NonNull 102 private static final AttributeKey<String> SSE_BROADCAST_PAYLOAD_TYPE_ATTRIBUTE_KEY; 103 104 static { 105 UNMATCHED_ROUTE = "_unmatched"; 106 UNKNOWN_COMMENT_TYPE = "unknown"; 107 BROADCAST_PAYLOAD_EVENT = "event"; 108 BROADCAST_PAYLOAD_COMMENT = "comment"; 109 DEFAULT_INSTRUMENTATION_NAME = "com.soklet.otel"; 110 URL_SCHEME_HTTP = "http"; 111 112 SERVER_TYPE_ATTRIBUTE_KEY = AttributeKey.stringKey("soklet.server.type"); 113 FAILURE_REASON_ATTRIBUTE_KEY = AttributeKey.stringKey("soklet.failure.reason"); 114 ERROR_TYPE_ATTRIBUTE_KEY = AttributeKey.stringKey("error.type"); 115 HTTP_METHOD_ATTRIBUTE_KEY = AttributeKey.stringKey("http.request.method"); 116 HTTP_ROUTE_ATTRIBUTE_KEY = AttributeKey.stringKey("http.route"); 117 URL_SCHEME_ATTRIBUTE_KEY = AttributeKey.stringKey("url.scheme"); 118 HTTP_STATUS_CODE_ATTRIBUTE_KEY = AttributeKey.longKey("http.response.status_code"); 119 SSE_TERMINATION_REASON_ATTRIBUTE_KEY = AttributeKey.stringKey("soklet.sse.termination.reason"); 120 SSE_DROP_REASON_ATTRIBUTE_KEY = AttributeKey.stringKey("soklet.sse.drop.reason"); 121 SSE_COMMENT_TYPE_ATTRIBUTE_KEY = AttributeKey.stringKey("soklet.sse.comment.type"); 122 SSE_BROADCAST_PAYLOAD_TYPE_ATTRIBUTE_KEY = AttributeKey.stringKey("soklet.sse.broadcast.payload.type"); 123 } 124 125 @NonNull 126 private final LongCounter connectionsAcceptedCounter; 127 @NonNull 128 private final LongCounter connectionsRejectedCounter; 129 @NonNull 130 private final LongCounter requestsAcceptedCounter; 131 @NonNull 132 private final LongCounter requestsRejectedCounter; 133 @NonNull 134 private final LongCounter requestReadFailureCounter; 135 @NonNull 136 private final LongUpDownCounter activeRequestsCounter; 137 @NonNull 138 private final DoubleHistogram requestDurationHistogram; 139 @NonNull 140 private final DoubleHistogram responseWriteDurationHistogram; 141 @NonNull 142 private final LongCounter responseWriteFailureCounter; 143 @NonNull 144 private final LongCounter requestThrowableCounter; 145 @NonNull 146 private final LongHistogram requestBodySizeHistogram; 147 @NonNull 148 private final LongHistogram responseBodySizeHistogram; 149 150 @NonNull 151 private final LongUpDownCounter activeServerSentEventConnectionsCounter; 152 @NonNull 153 private final LongCounter serverSentEventConnectionsEstablishedCounter; 154 @NonNull 155 private final LongCounter serverSentEventHandshakeFailureCounter; 156 @NonNull 157 private final LongCounter serverSentEventConnectionsTerminatedCounter; 158 @NonNull 159 private final DoubleHistogram serverSentEventConnectionDurationHistogram; 160 @NonNull 161 private final LongCounter serverSentEventWrittenCounter; 162 @NonNull 163 private final LongCounter serverSentEventWriteFailureCounter; 164 @NonNull 165 private final DoubleHistogram serverSentEventWriteDurationHistogram; 166 @NonNull 167 private final DoubleHistogram serverSentEventDeliveryLagHistogram; 168 @NonNull 169 private final LongHistogram serverSentEventPayloadSizeHistogram; 170 @NonNull 171 private final LongHistogram serverSentEventQueueDepthHistogram; 172 @NonNull 173 private final LongCounter serverSentEventDropCounter; 174 175 @NonNull 176 private final LongCounter serverSentEventCommentWrittenCounter; 177 @NonNull 178 private final LongCounter serverSentEventCommentWriteFailureCounter; 179 @NonNull 180 private final DoubleHistogram serverSentEventCommentWriteDurationHistogram; 181 @NonNull 182 private final DoubleHistogram serverSentEventCommentDeliveryLagHistogram; 183 @NonNull 184 private final LongHistogram serverSentEventCommentPayloadSizeHistogram; 185 @NonNull 186 private final LongHistogram serverSentEventCommentQueueDepthHistogram; 187 @NonNull 188 private final LongCounter serverSentEventCommentDropCounter; 189 190 @NonNull 191 private final LongCounter serverSentEventBroadcastAttemptCounter; 192 @NonNull 193 private final LongCounter serverSentEventBroadcastEnqueuedCounter; 194 @NonNull 195 private final LongCounter serverSentEventBroadcastDroppedCounter; 196 @NonNull 197 private final MetricNamingStrategy metricNamingStrategy; 198 199 /** 200 * Acquires a builder for {@link OpenTelemetryMetricsCollector} instances, using {@link GlobalOpenTelemetry} 201 * by default. 202 * 203 * @return the builder 204 */ 205 @NonNull 206 public static Builder builder() { 207 return new Builder(); 208 } 209 210 /** 211 * Acquires a builder seeded with a required {@link Meter}. 212 * 213 * @param meter the meter used to build instruments 214 * @return the builder 215 */ 216 @NonNull 217 public static Builder withMeter(@NonNull Meter meter) { 218 requireNonNull(meter); 219 return builder().meter(meter); 220 } 221 222 /** 223 * Acquires a builder seeded with a required {@link OpenTelemetry} instance. 224 * 225 * @param openTelemetry the OpenTelemetry instance used to build a meter 226 * @return the builder 227 */ 228 @NonNull 229 public static Builder withOpenTelemetry(@NonNull OpenTelemetry openTelemetry) { 230 requireNonNull(openTelemetry); 231 return builder().openTelemetry(openTelemetry); 232 } 233 234 /** 235 * Creates an instance from a required {@link Meter} without additional customization. 236 * 237 * @param meter the meter used to build instruments 238 * @return an {@link OpenTelemetryMetricsCollector} instance 239 */ 240 @NonNull 241 public static OpenTelemetryMetricsCollector fromMeter(@NonNull Meter meter) { 242 return withMeter(meter).build(); 243 } 244 245 /** 246 * Creates an instance from a required {@link OpenTelemetry} without additional customization. 247 * 248 * @param openTelemetry the OpenTelemetry instance used to build a meter 249 * @return an {@link OpenTelemetryMetricsCollector} instance 250 */ 251 @NonNull 252 public static OpenTelemetryMetricsCollector fromOpenTelemetry(@NonNull OpenTelemetry openTelemetry) { 253 return withOpenTelemetry(openTelemetry).build(); 254 } 255 256 private OpenTelemetryMetricsCollector(@NonNull Builder builder) { 257 requireNonNull(builder); 258 Meter meter = requireNonNull(builder.resolveMeter()); 259 this.metricNamingStrategy = requireNonNull(builder.metricNamingStrategy); 260 261 String activeRequestsMetricName = activeRequestsMetricNameFor(this.metricNamingStrategy); 262 String requestDurationMetricName = requestDurationMetricNameFor(this.metricNamingStrategy); 263 String requestBodySizeMetricName = requestBodySizeMetricNameFor(this.metricNamingStrategy); 264 String responseBodySizeMetricName = responseBodySizeMetricNameFor(this.metricNamingStrategy); 265 266 this.connectionsAcceptedCounter = meter.counterBuilder("soklet.server.connections.accepted") 267 .setDescription("Total number of accepted inbound TCP connections.") 268 .setUnit("{connection}") 269 .build(); 270 this.connectionsRejectedCounter = meter.counterBuilder("soklet.server.connections.rejected") 271 .setDescription("Total number of rejected inbound TCP connections.") 272 .setUnit("{connection}") 273 .build(); 274 this.requestsAcceptedCounter = meter.counterBuilder("soklet.server.requests.accepted") 275 .setDescription("Total number of accepted requests before app-level handling.") 276 .setUnit("{request}") 277 .build(); 278 this.requestsRejectedCounter = meter.counterBuilder("soklet.server.requests.rejected") 279 .setDescription("Total number of rejected requests before app-level handling.") 280 .setUnit("{request}") 281 .build(); 282 this.requestReadFailureCounter = meter.counterBuilder("soklet.server.request.read.failures") 283 .setDescription("Total number of request read/parse failures.") 284 .setUnit("{request}") 285 .build(); 286 this.activeRequestsCounter = meter.upDownCounterBuilder(activeRequestsMetricName) 287 .setDescription("Number of in-flight requests currently being handled.") 288 .setUnit("{request}") 289 .build(); 290 this.requestDurationHistogram = meter.histogramBuilder(requestDurationMetricName) 291 .setDescription("Total request handling duration.") 292 .setUnit("s") 293 .build(); 294 this.responseWriteDurationHistogram = meter.histogramBuilder("soklet.server.response.write.duration") 295 .setDescription("Duration spent writing response bytes.") 296 .setUnit("s") 297 .build(); 298 this.responseWriteFailureCounter = meter.counterBuilder("soklet.server.response.write.failures") 299 .setDescription("Total number of response write failures.") 300 .setUnit("{response}") 301 .build(); 302 this.requestThrowableCounter = meter.counterBuilder("soklet.server.request.throwables") 303 .setDescription("Total number of throwables observed during request handling.") 304 .setUnit("{throwable}") 305 .build(); 306 this.requestBodySizeHistogram = meter.histogramBuilder(requestBodySizeMetricName) 307 .ofLongs() 308 .setDescription("Request body size in bytes.") 309 .setUnit("By") 310 .build(); 311 this.responseBodySizeHistogram = meter.histogramBuilder(responseBodySizeMetricName) 312 .ofLongs() 313 .setDescription("Response body size in bytes.") 314 .setUnit("By") 315 .build(); 316 317 this.activeServerSentEventConnectionsCounter = meter.upDownCounterBuilder("soklet.sse.connections.active") 318 .setDescription("Number of active SSE connections.") 319 .setUnit("{connection}") 320 .build(); 321 this.serverSentEventConnectionsEstablishedCounter = meter.counterBuilder("soklet.sse.connections.established") 322 .setDescription("Total number of SSE connections established.") 323 .setUnit("{connection}") 324 .build(); 325 this.serverSentEventHandshakeFailureCounter = meter.counterBuilder("soklet.sse.connections.handshake.failures") 326 .setDescription("Total number of SSE handshake failures.") 327 .setUnit("{connection}") 328 .build(); 329 this.serverSentEventConnectionsTerminatedCounter = meter.counterBuilder("soklet.sse.connections.terminated") 330 .setDescription("Total number of terminated SSE connections.") 331 .setUnit("{connection}") 332 .build(); 333 this.serverSentEventConnectionDurationHistogram = meter.histogramBuilder("soklet.sse.connection.duration") 334 .setDescription("SSE connection duration.") 335 .setUnit("s") 336 .build(); 337 this.serverSentEventWrittenCounter = meter.counterBuilder("soklet.sse.events.written") 338 .setDescription("Total number of SSE events successfully written.") 339 .setUnit("{event}") 340 .build(); 341 this.serverSentEventWriteFailureCounter = meter.counterBuilder("soklet.sse.events.write.failures") 342 .setDescription("Total number of SSE events that failed to write.") 343 .setUnit("{event}") 344 .build(); 345 this.serverSentEventWriteDurationHistogram = meter.histogramBuilder("soklet.sse.events.write.duration") 346 .setDescription("SSE event write duration.") 347 .setUnit("s") 348 .build(); 349 this.serverSentEventDeliveryLagHistogram = meter.histogramBuilder("soklet.sse.events.delivery.lag") 350 .setDescription("Time spent waiting in the SSE queue before write.") 351 .setUnit("s") 352 .build(); 353 this.serverSentEventPayloadSizeHistogram = meter.histogramBuilder("soklet.sse.events.payload.size") 354 .ofLongs() 355 .setDescription("Serialized SSE event payload size in bytes.") 356 .setUnit("By") 357 .build(); 358 this.serverSentEventQueueDepthHistogram = meter.histogramBuilder("soklet.sse.events.queue.depth") 359 .ofLongs() 360 .setDescription("Queued element depth when SSE event write/drop outcome is observed.") 361 .setUnit("{item}") 362 .build(); 363 this.serverSentEventDropCounter = meter.counterBuilder("soklet.sse.events.dropped") 364 .setDescription("Total number of SSE events dropped before enqueue.") 365 .setUnit("{event}") 366 .build(); 367 368 this.serverSentEventCommentWrittenCounter = meter.counterBuilder("soklet.sse.comments.written") 369 .setDescription("Total number of SSE comments successfully written.") 370 .setUnit("{comment}") 371 .build(); 372 this.serverSentEventCommentWriteFailureCounter = meter.counterBuilder("soklet.sse.comments.write.failures") 373 .setDescription("Total number of SSE comments that failed to write.") 374 .setUnit("{comment}") 375 .build(); 376 this.serverSentEventCommentWriteDurationHistogram = meter.histogramBuilder("soklet.sse.comments.write.duration") 377 .setDescription("SSE comment write duration.") 378 .setUnit("s") 379 .build(); 380 this.serverSentEventCommentDeliveryLagHistogram = meter.histogramBuilder("soklet.sse.comments.delivery.lag") 381 .setDescription("Time spent waiting in the SSE queue before comment write.") 382 .setUnit("s") 383 .build(); 384 this.serverSentEventCommentPayloadSizeHistogram = meter.histogramBuilder("soklet.sse.comments.payload.size") 385 .ofLongs() 386 .setDescription("Serialized SSE comment payload size in bytes.") 387 .setUnit("By") 388 .build(); 389 this.serverSentEventCommentQueueDepthHistogram = meter.histogramBuilder("soklet.sse.comments.queue.depth") 390 .ofLongs() 391 .setDescription("Queued element depth when SSE comment write/drop outcome is observed.") 392 .setUnit("{item}") 393 .build(); 394 this.serverSentEventCommentDropCounter = meter.counterBuilder("soklet.sse.comments.dropped") 395 .setDescription("Total number of SSE comments dropped before enqueue.") 396 .setUnit("{comment}") 397 .build(); 398 399 this.serverSentEventBroadcastAttemptCounter = meter.counterBuilder("soklet.sse.broadcast.attempted") 400 .setDescription("Total number of attempted SSE broadcast deliveries.") 401 .setUnit("{delivery}") 402 .build(); 403 this.serverSentEventBroadcastEnqueuedCounter = meter.counterBuilder("soklet.sse.broadcast.enqueued") 404 .setDescription("Total number of SSE broadcast deliveries successfully enqueued.") 405 .setUnit("{delivery}") 406 .build(); 407 this.serverSentEventBroadcastDroppedCounter = meter.counterBuilder("soklet.sse.broadcast.dropped") 408 .setDescription("Total number of SSE broadcast deliveries dropped before enqueue.") 409 .setUnit("{delivery}") 410 .build(); 411 } 412 413 @Override 414 public void didAcceptConnection(@NonNull ServerType serverType, 415 @Nullable InetSocketAddress remoteAddress) { 416 requireNonNull(serverType); 417 this.connectionsAcceptedCounter.add(1, serverTypeAttributes(serverType)); 418 } 419 420 @Override 421 public void didFailToAcceptConnection(@NonNull ServerType serverType, 422 @Nullable InetSocketAddress remoteAddress, 423 @NonNull ConnectionRejectionReason reason, 424 @Nullable Throwable throwable) { 425 requireNonNull(serverType); 426 requireNonNull(reason); 427 this.connectionsRejectedCounter.add(1, serverTypeAndReasonAttributes(serverType, reason)); 428 } 429 430 @Override 431 public void didAcceptRequest(@NonNull ServerType serverType, 432 @Nullable InetSocketAddress remoteAddress, 433 @Nullable String requestTarget) { 434 requireNonNull(serverType); 435 this.requestsAcceptedCounter.add(1, serverTypeAttributes(serverType)); 436 } 437 438 @Override 439 public void didFailToAcceptRequest(@NonNull ServerType serverType, 440 @Nullable InetSocketAddress remoteAddress, 441 @Nullable String requestTarget, 442 @NonNull RequestRejectionReason reason, 443 @Nullable Throwable throwable) { 444 requireNonNull(serverType); 445 requireNonNull(reason); 446 this.requestsRejectedCounter.add(1, serverTypeAndReasonAttributes(serverType, reason)); 447 } 448 449 @Override 450 public void didFailToReadRequest(@NonNull ServerType serverType, 451 @Nullable InetSocketAddress remoteAddress, 452 @Nullable String requestTarget, 453 @NonNull RequestReadFailureReason reason, 454 @Nullable Throwable throwable) { 455 requireNonNull(serverType); 456 requireNonNull(reason); 457 this.requestReadFailureCounter.add(1, serverTypeAndReasonAttributes(serverType, reason)); 458 } 459 460 @Override 461 public void didStartRequestHandling(@NonNull ServerType serverType, 462 @NonNull Request request, 463 @Nullable ResourceMethod resourceMethod) { 464 requireNonNull(serverType); 465 requireNonNull(request); 466 467 this.activeRequestsCounter.add(1, activeRequestAttributes(serverType, request)); 468 } 469 470 @Override 471 public void didFinishRequestHandling(@NonNull ServerType serverType, 472 @NonNull Request request, 473 @Nullable ResourceMethod resourceMethod, 474 @NonNull MarshaledResponse marshaledResponse, 475 @NonNull Duration duration, 476 @NonNull List<@NonNull Throwable> throwables) { 477 requireNonNull(serverType); 478 requireNonNull(request); 479 requireNonNull(marshaledResponse); 480 requireNonNull(duration); 481 requireNonNull(throwables); 482 483 Throwable throwable = throwables.isEmpty() ? null : throwables.get(0); 484 Attributes attributes = requestAttributes(serverType, request, resourceMethod, marshaledResponse.getStatusCode(), throwable); 485 486 this.activeRequestsCounter.add(-1, activeRequestAttributes(serverType, request)); 487 this.requestDurationHistogram.record(seconds(duration), attributes); 488 this.requestBodySizeHistogram.record(request.getBody().map(body -> (long) body.length).orElse(0L), attributes); 489 this.responseBodySizeHistogram.record(marshaledResponse.getBody().map(body -> (long) body.length).orElse(0L), attributes); 490 491 if (!throwables.isEmpty()) 492 this.requestThrowableCounter.add(throwables.size(), attributes); 493 } 494 495 @Override 496 public void didWriteResponse(@NonNull ServerType serverType, 497 @NonNull Request request, 498 @Nullable ResourceMethod resourceMethod, 499 @NonNull MarshaledResponse marshaledResponse, 500 @NonNull Duration responseWriteDuration) { 501 requireNonNull(serverType); 502 requireNonNull(request); 503 requireNonNull(marshaledResponse); 504 requireNonNull(responseWriteDuration); 505 506 this.responseWriteDurationHistogram.record( 507 seconds(responseWriteDuration), 508 requestAttributes(serverType, request, resourceMethod, marshaledResponse.getStatusCode(), null) 509 ); 510 } 511 512 @Override 513 public void didFailToWriteResponse(@NonNull ServerType serverType, 514 @NonNull Request request, 515 @Nullable ResourceMethod resourceMethod, 516 @NonNull MarshaledResponse marshaledResponse, 517 @NonNull Duration responseWriteDuration, 518 @NonNull Throwable throwable) { 519 requireNonNull(serverType); 520 requireNonNull(request); 521 requireNonNull(marshaledResponse); 522 requireNonNull(responseWriteDuration); 523 requireNonNull(throwable); 524 525 Attributes attributes = requestAttributes(serverType, request, resourceMethod, marshaledResponse.getStatusCode(), throwable); 526 this.responseWriteFailureCounter.add(1, attributes); 527 this.responseWriteDurationHistogram.record(seconds(responseWriteDuration), attributes); 528 } 529 530 @Override 531 public void didEstablishServerSentEventConnection(@NonNull ServerSentEventConnection serverSentEventConnection) { 532 requireNonNull(serverSentEventConnection); 533 534 Attributes attributes = serverSentEventAttributes(serverSentEventConnection); 535 this.activeServerSentEventConnectionsCounter.add(1, attributes); 536 this.serverSentEventConnectionsEstablishedCounter.add(1, attributes); 537 } 538 539 @Override 540 public void didFailToEstablishServerSentEventConnection(@NonNull Request request, 541 @Nullable ResourceMethod resourceMethod, 542 ServerSentEventConnection.@NonNull HandshakeFailureReason reason, 543 @Nullable Throwable throwable) { 544 requireNonNull(request); 545 requireNonNull(reason); 546 547 this.serverSentEventHandshakeFailureCounter.add(1, 548 Attributes.builder() 549 .put(HTTP_METHOD_ATTRIBUTE_KEY, request.getHttpMethod().name()) 550 .put(HTTP_ROUTE_ATTRIBUTE_KEY, routeFor(resourceMethod)) 551 .put(FAILURE_REASON_ATTRIBUTE_KEY, enumValue(reason)) 552 .build() 553 ); 554 } 555 556 @Override 557 public void didTerminateServerSentEventConnection(@NonNull ServerSentEventConnection serverSentEventConnection, 558 @NonNull Duration connectionDuration, 559 ServerSentEventConnection.@NonNull TerminationReason terminationReason, 560 @Nullable Throwable throwable) { 561 requireNonNull(serverSentEventConnection); 562 requireNonNull(connectionDuration); 563 requireNonNull(terminationReason); 564 565 Attributes routeAttributes = serverSentEventAttributes(serverSentEventConnection); 566 Attributes durationAttributes = Attributes.builder() 567 .putAll(routeAttributes) 568 .put(SSE_TERMINATION_REASON_ATTRIBUTE_KEY, enumValue(terminationReason)) 569 .build(); 570 571 this.activeServerSentEventConnectionsCounter.add(-1, routeAttributes); 572 this.serverSentEventConnectionsTerminatedCounter.add(1, durationAttributes); 573 this.serverSentEventConnectionDurationHistogram.record(seconds(connectionDuration), durationAttributes); 574 } 575 576 @Override 577 public void didWriteServerSentEvent(@NonNull ServerSentEventConnection serverSentEventConnection, 578 @NonNull ServerSentEvent serverSentEvent, 579 @NonNull Duration writeDuration, 580 @Nullable Duration deliveryLag, 581 @Nullable Integer payloadBytes, 582 @Nullable Integer queueDepth) { 583 requireNonNull(serverSentEventConnection); 584 requireNonNull(serverSentEvent); 585 requireNonNull(writeDuration); 586 587 Attributes attributes = serverSentEventAttributes(serverSentEventConnection); 588 this.serverSentEventWrittenCounter.add(1, attributes); 589 this.serverSentEventWriteDurationHistogram.record(seconds(writeDuration), attributes); 590 591 if (deliveryLag != null) 592 this.serverSentEventDeliveryLagHistogram.record(seconds(deliveryLag), attributes); 593 if (payloadBytes != null) 594 this.serverSentEventPayloadSizeHistogram.record(payloadBytes, attributes); 595 if (queueDepth != null) 596 this.serverSentEventQueueDepthHistogram.record(queueDepth, attributes); 597 } 598 599 @Override 600 public void didFailToWriteServerSentEvent(@NonNull ServerSentEventConnection serverSentEventConnection, 601 @NonNull ServerSentEvent serverSentEvent, 602 @NonNull Duration writeDuration, 603 @NonNull Throwable throwable, 604 @Nullable Duration deliveryLag, 605 @Nullable Integer payloadBytes, 606 @Nullable Integer queueDepth) { 607 requireNonNull(serverSentEventConnection); 608 requireNonNull(serverSentEvent); 609 requireNonNull(writeDuration); 610 requireNonNull(throwable); 611 612 Attributes attributes = serverSentEventAttributes(serverSentEventConnection); 613 this.serverSentEventWriteFailureCounter.add(1, attributes); 614 this.serverSentEventWriteDurationHistogram.record(seconds(writeDuration), attributes); 615 616 if (deliveryLag != null) 617 this.serverSentEventDeliveryLagHistogram.record(seconds(deliveryLag), attributes); 618 if (payloadBytes != null) 619 this.serverSentEventPayloadSizeHistogram.record(payloadBytes, attributes); 620 if (queueDepth != null) 621 this.serverSentEventQueueDepthHistogram.record(queueDepth, attributes); 622 } 623 624 @Override 625 public void didDropServerSentEvent(@NonNull ServerSentEventConnection serverSentEventConnection, 626 @NonNull ServerSentEvent serverSentEvent, 627 @NonNull ServerSentEventDropReason reason, 628 @Nullable Integer payloadBytes, 629 @Nullable Integer queueDepth) { 630 requireNonNull(serverSentEventConnection); 631 requireNonNull(serverSentEvent); 632 requireNonNull(reason); 633 634 Attributes attributes = Attributes.builder() 635 .putAll(serverSentEventAttributes(serverSentEventConnection)) 636 .put(SSE_DROP_REASON_ATTRIBUTE_KEY, enumValue(reason)) 637 .build(); 638 this.serverSentEventDropCounter.add(1, attributes); 639 640 if (payloadBytes != null) 641 this.serverSentEventPayloadSizeHistogram.record(payloadBytes, attributes); 642 if (queueDepth != null) 643 this.serverSentEventQueueDepthHistogram.record(queueDepth, attributes); 644 } 645 646 @Override 647 public void didWriteServerSentEventComment(@NonNull ServerSentEventConnection serverSentEventConnection, 648 @NonNull ServerSentEventComment serverSentEventComment, 649 @NonNull Duration writeDuration, 650 @Nullable Duration deliveryLag, 651 @Nullable Integer payloadBytes, 652 @Nullable Integer queueDepth) { 653 requireNonNull(serverSentEventConnection); 654 requireNonNull(serverSentEventComment); 655 requireNonNull(writeDuration); 656 657 Attributes attributes = serverSentEventCommentAttributes(serverSentEventConnection, serverSentEventComment.getCommentType()); 658 this.serverSentEventCommentWrittenCounter.add(1, attributes); 659 this.serverSentEventCommentWriteDurationHistogram.record(seconds(writeDuration), attributes); 660 661 if (deliveryLag != null) 662 this.serverSentEventCommentDeliveryLagHistogram.record(seconds(deliveryLag), attributes); 663 if (payloadBytes != null) 664 this.serverSentEventCommentPayloadSizeHistogram.record(payloadBytes, attributes); 665 if (queueDepth != null) 666 this.serverSentEventCommentQueueDepthHistogram.record(queueDepth, attributes); 667 } 668 669 @Override 670 public void didFailToWriteServerSentEventComment(@NonNull ServerSentEventConnection serverSentEventConnection, 671 @NonNull ServerSentEventComment serverSentEventComment, 672 @NonNull Duration writeDuration, 673 @NonNull Throwable throwable, 674 @Nullable Duration deliveryLag, 675 @Nullable Integer payloadBytes, 676 @Nullable Integer queueDepth) { 677 requireNonNull(serverSentEventConnection); 678 requireNonNull(serverSentEventComment); 679 requireNonNull(writeDuration); 680 requireNonNull(throwable); 681 682 Attributes attributes = serverSentEventCommentAttributes(serverSentEventConnection, serverSentEventComment.getCommentType()); 683 this.serverSentEventCommentWriteFailureCounter.add(1, attributes); 684 this.serverSentEventCommentWriteDurationHistogram.record(seconds(writeDuration), attributes); 685 686 if (deliveryLag != null) 687 this.serverSentEventCommentDeliveryLagHistogram.record(seconds(deliveryLag), attributes); 688 if (payloadBytes != null) 689 this.serverSentEventCommentPayloadSizeHistogram.record(payloadBytes, attributes); 690 if (queueDepth != null) 691 this.serverSentEventCommentQueueDepthHistogram.record(queueDepth, attributes); 692 } 693 694 @Override 695 public void didDropServerSentEventComment(@NonNull ServerSentEventConnection serverSentEventConnection, 696 @NonNull ServerSentEventComment serverSentEventComment, 697 @NonNull ServerSentEventDropReason reason, 698 @Nullable Integer payloadBytes, 699 @Nullable Integer queueDepth) { 700 requireNonNull(serverSentEventConnection); 701 requireNonNull(serverSentEventComment); 702 requireNonNull(reason); 703 704 Attributes attributes = Attributes.builder() 705 .putAll(serverSentEventCommentAttributes(serverSentEventConnection, serverSentEventComment.getCommentType())) 706 .put(SSE_DROP_REASON_ATTRIBUTE_KEY, enumValue(reason)) 707 .build(); 708 this.serverSentEventCommentDropCounter.add(1, attributes); 709 710 if (payloadBytes != null) 711 this.serverSentEventCommentPayloadSizeHistogram.record(payloadBytes, attributes); 712 if (queueDepth != null) 713 this.serverSentEventCommentQueueDepthHistogram.record(queueDepth, attributes); 714 } 715 716 @Override 717 public void didBroadcastServerSentEvent(@NonNull ResourcePathDeclaration route, 718 int attempted, 719 int enqueued, 720 int dropped) { 721 requireNonNull(route); 722 recordBroadcastTotals(route, BROADCAST_PAYLOAD_EVENT, UNKNOWN_COMMENT_TYPE, attempted, enqueued, dropped); 723 } 724 725 @Override 726 public void didBroadcastServerSentEventComment(@NonNull ResourcePathDeclaration route, 727 ServerSentEventComment.@NonNull CommentType commentType, 728 int attempted, 729 int enqueued, 730 int dropped) { 731 requireNonNull(route); 732 requireNonNull(commentType); 733 recordBroadcastTotals(route, BROADCAST_PAYLOAD_COMMENT, enumValue(commentType), attempted, enqueued, dropped); 734 } 735 736 @NonNull 737 private Attributes serverTypeAttributes(@NonNull ServerType serverType) { 738 requireNonNull(serverType); 739 return Attributes.of(SERVER_TYPE_ATTRIBUTE_KEY, enumValue(serverType)); 740 } 741 742 @NonNull 743 private Attributes serverTypeAndReasonAttributes(@NonNull ServerType serverType, 744 @NonNull Enum<?> reason) { 745 requireNonNull(serverType); 746 requireNonNull(reason); 747 return Attributes.builder() 748 .put(SERVER_TYPE_ATTRIBUTE_KEY, enumValue(serverType)) 749 .put(FAILURE_REASON_ATTRIBUTE_KEY, enumValue(reason)) 750 .build(); 751 } 752 753 @NonNull 754 private Attributes activeRequestAttributes(@NonNull ServerType serverType, 755 @NonNull Request request) { 756 requireNonNull(serverType); 757 requireNonNull(request); 758 759 var builder = Attributes.builder() 760 .put(HTTP_METHOD_ATTRIBUTE_KEY, request.getHttpMethod().name()); 761 762 if (this.metricNamingStrategy == MetricNamingStrategy.SEMCONV) { 763 builder.put(URL_SCHEME_ATTRIBUTE_KEY, URL_SCHEME_HTTP); 764 } else { 765 builder.put(SERVER_TYPE_ATTRIBUTE_KEY, enumValue(serverType)); 766 } 767 768 return builder.build(); 769 } 770 771 @NonNull 772 private Attributes requestAttributes(@NonNull ServerType serverType, 773 @NonNull Request request, 774 @Nullable ResourceMethod resourceMethod, 775 @Nullable Integer statusCode, 776 @Nullable Throwable throwable) { 777 requireNonNull(serverType); 778 requireNonNull(request); 779 780 var builder = Attributes.builder() 781 .put(HTTP_METHOD_ATTRIBUTE_KEY, request.getHttpMethod().name()); 782 783 if (this.metricNamingStrategy == MetricNamingStrategy.SEMCONV) { 784 builder.put(URL_SCHEME_ATTRIBUTE_KEY, URL_SCHEME_HTTP); 785 786 if (resourceMethod != null) 787 builder.put(HTTP_ROUTE_ATTRIBUTE_KEY, routeFor(resourceMethod)); 788 } else { 789 builder.put(SERVER_TYPE_ATTRIBUTE_KEY, enumValue(serverType)) 790 .put(HTTP_ROUTE_ATTRIBUTE_KEY, routeFor(resourceMethod)); 791 } 792 793 if (statusCode != null) 794 builder.put(HTTP_STATUS_CODE_ATTRIBUTE_KEY, statusCode.longValue()); 795 796 String errorType = errorTypeFor(statusCode, throwable); 797 798 if (errorType != null) 799 builder.put(ERROR_TYPE_ATTRIBUTE_KEY, errorType); 800 801 return builder.build(); 802 } 803 804 @NonNull 805 private Attributes serverSentEventAttributes(@NonNull ServerSentEventConnection serverSentEventConnection) { 806 requireNonNull(serverSentEventConnection); 807 return Attributes.of( 808 HTTP_ROUTE_ATTRIBUTE_KEY, 809 routeFor(serverSentEventConnection.getResourceMethod()) 810 ); 811 } 812 813 @NonNull 814 private Attributes serverSentEventCommentAttributes(@NonNull ServerSentEventConnection serverSentEventConnection, 815 ServerSentEventComment.@NonNull CommentType commentType) { 816 requireNonNull(serverSentEventConnection); 817 requireNonNull(commentType); 818 return Attributes.builder() 819 .putAll(serverSentEventAttributes(serverSentEventConnection)) 820 .put(SSE_COMMENT_TYPE_ATTRIBUTE_KEY, enumValue(commentType)) 821 .build(); 822 } 823 824 private void recordBroadcastTotals(@NonNull ResourcePathDeclaration route, 825 @NonNull String payloadType, 826 @NonNull String commentType, 827 int attempted, 828 int enqueued, 829 int dropped) { 830 requireNonNull(route); 831 requireNonNull(payloadType); 832 requireNonNull(commentType); 833 834 Attributes attributes = Attributes.builder() 835 .put(HTTP_ROUTE_ATTRIBUTE_KEY, route.getPath()) 836 .put(SSE_BROADCAST_PAYLOAD_TYPE_ATTRIBUTE_KEY, payloadType) 837 .put(SSE_COMMENT_TYPE_ATTRIBUTE_KEY, commentType) 838 .build(); 839 840 if (attempted > 0) 841 this.serverSentEventBroadcastAttemptCounter.add(attempted, attributes); 842 if (enqueued > 0) 843 this.serverSentEventBroadcastEnqueuedCounter.add(enqueued, attributes); 844 if (dropped > 0) 845 this.serverSentEventBroadcastDroppedCounter.add(dropped, attributes); 846 } 847 848 @NonNull 849 private static String activeRequestsMetricNameFor(@NonNull MetricNamingStrategy metricNamingStrategy) { 850 requireNonNull(metricNamingStrategy); 851 if (metricNamingStrategy == MetricNamingStrategy.SEMCONV) 852 return "http.server.active_requests"; 853 return "soklet.server.requests.active"; 854 } 855 856 @NonNull 857 private static String requestDurationMetricNameFor(@NonNull MetricNamingStrategy metricNamingStrategy) { 858 requireNonNull(metricNamingStrategy); 859 if (metricNamingStrategy == MetricNamingStrategy.SEMCONV) 860 return "http.server.request.duration"; 861 return "soklet.server.request.duration"; 862 } 863 864 @NonNull 865 private static String requestBodySizeMetricNameFor(@NonNull MetricNamingStrategy metricNamingStrategy) { 866 requireNonNull(metricNamingStrategy); 867 if (metricNamingStrategy == MetricNamingStrategy.SEMCONV) 868 return "http.server.request.body.size"; 869 return "soklet.server.request.body.size"; 870 } 871 872 @NonNull 873 private static String responseBodySizeMetricNameFor(@NonNull MetricNamingStrategy metricNamingStrategy) { 874 requireNonNull(metricNamingStrategy); 875 if (metricNamingStrategy == MetricNamingStrategy.SEMCONV) 876 return "http.server.response.body.size"; 877 return "soklet.server.response.body.size"; 878 } 879 880 @NonNull 881 private static String routeFor(@Nullable ResourceMethod resourceMethod) { 882 if (resourceMethod == null) 883 return UNMATCHED_ROUTE; 884 return resourceMethod.getResourcePathDeclaration().getPath(); 885 } 886 887 @NonNull 888 private static String enumValue(@NonNull Enum<?> value) { 889 requireNonNull(value); 890 return value.name().toLowerCase(Locale.ROOT); 891 } 892 893 @Nullable 894 private String errorTypeFor(@Nullable Integer statusCode, 895 @Nullable Throwable throwable) { 896 if (throwable != null) 897 return throwable.getClass().getName(); 898 899 if (this.metricNamingStrategy == MetricNamingStrategy.SEMCONV 900 && statusCode != null 901 && statusCode >= 500) 902 return String.valueOf(statusCode); 903 904 return null; 905 } 906 907 private static double seconds(@NonNull Duration duration) { 908 requireNonNull(duration); 909 return duration.toNanos() / 1_000_000_000D; 910 } 911 912 /** 913 * Naming strategy for HTTP metric instrument names. 914 * <p> 915 * SSE- and Soklet-specific concepts remain under the {@code soklet.*} namespace in all strategies. 916 */ 917 public enum MetricNamingStrategy { 918 /** 919 * Use OpenTelemetry Semantic Convention names for standard HTTP server metrics. 920 */ 921 SEMCONV, 922 /** 923 * Use {@code soklet.*} names for all metrics. 924 */ 925 SOKLET 926 } 927 928 /** 929 * Builder used to construct instances of {@link OpenTelemetryMetricsCollector}. 930 */ 931 @NotThreadSafe 932 public static final class Builder { 933 @Nullable 934 private Meter meter; 935 @Nullable 936 private OpenTelemetry openTelemetry; 937 @NonNull 938 private MetricNamingStrategy metricNamingStrategy; 939 @NonNull 940 private String instrumentationName; 941 @Nullable 942 private String instrumentationVersion; 943 944 private Builder() { 945 this.openTelemetry = GlobalOpenTelemetry.get(); 946 this.metricNamingStrategy = MetricNamingStrategy.SEMCONV; 947 this.instrumentationName = DEFAULT_INSTRUMENTATION_NAME; 948 this.instrumentationVersion = null; 949 } 950 951 /** 952 * Sets a specific meter to use for metric instruments. 953 * 954 * @param meter the meter to use 955 * @return this builder 956 */ 957 @NonNull 958 public Builder meter(@NonNull Meter meter) { 959 this.meter = requireNonNull(meter); 960 return this; 961 } 962 963 /** 964 * Sets the OpenTelemetry API object used to construct a meter if {@link #meter(Meter)} is not set. 965 * 966 * @param openTelemetry the OpenTelemetry instance 967 * @return this builder 968 */ 969 @NonNull 970 public Builder openTelemetry(@NonNull OpenTelemetry openTelemetry) { 971 this.openTelemetry = requireNonNull(openTelemetry); 972 return this; 973 } 974 975 /** 976 * Sets the naming strategy for HTTP metrics. 977 * 978 * @param metricNamingStrategy the naming strategy 979 * @return this builder 980 */ 981 @NonNull 982 public Builder metricNamingStrategy(@NonNull MetricNamingStrategy metricNamingStrategy) { 983 this.metricNamingStrategy = requireNonNull(metricNamingStrategy); 984 return this; 985 } 986 987 /** 988 * Sets the instrumentation scope name to use when constructing a meter. 989 * 990 * @param instrumentationName the instrumentation scope name 991 * @return this builder 992 */ 993 @NonNull 994 public Builder instrumentationName(@NonNull String instrumentationName) { 995 this.instrumentationName = requireNonNull(instrumentationName); 996 return this; 997 } 998 999 /** 1000 * Sets an optional instrumentation scope version to use when constructing a meter. 1001 * 1002 * @param instrumentationVersion the instrumentation scope version, or {@code null} 1003 * @return this builder 1004 */ 1005 @NonNull 1006 public Builder instrumentationVersion(@Nullable String instrumentationVersion) { 1007 this.instrumentationVersion = instrumentationVersion; 1008 return this; 1009 } 1010 1011 @NonNull 1012 private Meter resolveMeter() { 1013 if (this.meter != null) 1014 return this.meter; 1015 1016 MeterBuilder meterBuilder = requireNonNull(this.openTelemetry).meterBuilder(this.instrumentationName); 1017 1018 if (this.instrumentationVersion != null) 1019 meterBuilder = meterBuilder.setInstrumentationVersion(this.instrumentationVersion); 1020 1021 return meterBuilder.build(); 1022 } 1023 1024 /** 1025 * Builds the collector. 1026 * 1027 * @return the collector instance 1028 */ 1029 @NonNull 1030 public OpenTelemetryMetricsCollector build() { 1031 return new OpenTelemetryMetricsCollector(this); 1032 } 1033 } 1034}