001/*
002 * Copyright 2022-2026 Revetware LLC.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 * http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package com.soklet.otel;
018
019import com.soklet.ConnectionRejectionReason;
020import com.soklet.MarshaledResponse;
021import com.soklet.MetricsCollector;
022import com.soklet.Request;
023import com.soklet.RequestReadFailureReason;
024import com.soklet.RequestRejectionReason;
025import com.soklet.ResourceMethod;
026import com.soklet.ResourcePathDeclaration;
027import com.soklet.ServerType;
028import com.soklet.SseComment;
029import com.soklet.SseConnection;
030import com.soklet.SseEvent;
031import com.soklet.StreamTermination;
032import io.opentelemetry.api.GlobalOpenTelemetry;
033import io.opentelemetry.api.OpenTelemetry;
034import io.opentelemetry.api.common.AttributeKey;
035import io.opentelemetry.api.common.Attributes;
036import io.opentelemetry.api.metrics.DoubleHistogram;
037import io.opentelemetry.api.metrics.LongCounter;
038import io.opentelemetry.api.metrics.LongHistogram;
039import io.opentelemetry.api.metrics.LongUpDownCounter;
040import io.opentelemetry.api.metrics.Meter;
041import io.opentelemetry.api.metrics.MeterBuilder;
042import org.jspecify.annotations.NonNull;
043import org.jspecify.annotations.Nullable;
044
045import javax.annotation.concurrent.NotThreadSafe;
046import javax.annotation.concurrent.ThreadSafe;
047import java.net.InetSocketAddress;
048import java.time.Duration;
049import java.util.List;
050import java.util.Locale;
051
052import static java.util.Objects.requireNonNull;
053
054/**
055 * OpenTelemetry-backed {@link MetricsCollector} for Soklet HTTP and SSE telemetry.
056 * <p>
057 * This implementation records counters/histograms via OpenTelemetry's metrics API and is designed to be
058 * lightweight, thread-safe, and non-blocking in request hot paths.
059 * <p>
060 * By default, standard HTTP metrics use OpenTelemetry Semantic Convention names. Soklet-specific concepts
061 * (for example SSE queue/drop/broadcast details) are emitted with {@code soklet.*} names.
062 * <p>
063 * If inbound requests include W3C trace context, Soklet exposes it via {@link Request#getTraceContext()} to
064 * custom metrics collectors and application code. This metrics-only implementation intentionally does not emit
065 * trace IDs, parent IDs, or {@code tracestate} values as metric attributes because those values are high-cardinality
066 * and belong in logs, spans, or exemplar-aware tracing integrations instead.
067 * <p>
068 * See <a href="https://soklet.com/docs/metrics-collection">https://soklet.com/docs/metrics-collection</a> for Soklet's metrics/telemetry documentation.
069 *
070 * @author <a href="https://www.revetkn.com">Mark Allen</a>
071 */
072@ThreadSafe
073public final class OpenTelemetryMetricsCollector implements MetricsCollector {
074        @NonNull
075        private static final String UNMATCHED_ROUTE;
076        @NonNull
077        private static final String UNKNOWN_COMMENT_TYPE;
078        @NonNull
079        private static final String BROADCAST_PAYLOAD_EVENT;
080        @NonNull
081        private static final String BROADCAST_PAYLOAD_COMMENT;
082        @NonNull
083        private static final String DEFAULT_INSTRUMENTATION_NAME;
084        @NonNull
085        private static final String URL_SCHEME_HTTP;
086
087        @NonNull
088        private static final AttributeKey<String> SERVER_TYPE_ATTRIBUTE_KEY;
089        @NonNull
090        private static final AttributeKey<String> FAILURE_REASON_ATTRIBUTE_KEY;
091        @NonNull
092        private static final AttributeKey<String> ERROR_TYPE_ATTRIBUTE_KEY;
093        @NonNull
094        private static final AttributeKey<String> HTTP_METHOD_ATTRIBUTE_KEY;
095        @NonNull
096        private static final AttributeKey<String> HTTP_ROUTE_ATTRIBUTE_KEY;
097        @NonNull
098        private static final AttributeKey<String> URL_SCHEME_ATTRIBUTE_KEY;
099        @NonNull
100        private static final AttributeKey<Long> HTTP_STATUS_CODE_ATTRIBUTE_KEY;
101        @NonNull
102        private static final AttributeKey<String> SSE_TERMINATION_REASON_ATTRIBUTE_KEY;
103        @NonNull
104        private static final AttributeKey<String> SSE_DROP_REASON_ATTRIBUTE_KEY;
105        @NonNull
106        private static final AttributeKey<String> SSE_COMMENT_TYPE_ATTRIBUTE_KEY;
107        @NonNull
108        private static final AttributeKey<String> SSE_BROADCAST_PAYLOAD_TYPE_ATTRIBUTE_KEY;
109
110        static {
111                UNMATCHED_ROUTE = "_unmatched";
112                UNKNOWN_COMMENT_TYPE = "unknown";
113                BROADCAST_PAYLOAD_EVENT = "event";
114                BROADCAST_PAYLOAD_COMMENT = "comment";
115                DEFAULT_INSTRUMENTATION_NAME = "com.soklet.otel";
116                URL_SCHEME_HTTP = "http";
117
118                SERVER_TYPE_ATTRIBUTE_KEY = AttributeKey.stringKey("soklet.server.type");
119                FAILURE_REASON_ATTRIBUTE_KEY = AttributeKey.stringKey("soklet.failure.reason");
120                ERROR_TYPE_ATTRIBUTE_KEY = AttributeKey.stringKey("error.type");
121                HTTP_METHOD_ATTRIBUTE_KEY = AttributeKey.stringKey("http.request.method");
122                HTTP_ROUTE_ATTRIBUTE_KEY = AttributeKey.stringKey("http.route");
123                URL_SCHEME_ATTRIBUTE_KEY = AttributeKey.stringKey("url.scheme");
124                HTTP_STATUS_CODE_ATTRIBUTE_KEY = AttributeKey.longKey("http.response.status_code");
125                SSE_TERMINATION_REASON_ATTRIBUTE_KEY = AttributeKey.stringKey("soklet.sse.termination.reason");
126                SSE_DROP_REASON_ATTRIBUTE_KEY = AttributeKey.stringKey("soklet.sse.drop.reason");
127                SSE_COMMENT_TYPE_ATTRIBUTE_KEY = AttributeKey.stringKey("soklet.sse.comment.type");
128                SSE_BROADCAST_PAYLOAD_TYPE_ATTRIBUTE_KEY = AttributeKey.stringKey("soklet.sse.broadcast.payload.type");
129        }
130
131        @NonNull
132        private final LongCounter connectionsAcceptedCounter;
133        @NonNull
134        private final LongCounter connectionsRejectedCounter;
135        @NonNull
136        private final LongCounter requestsAcceptedCounter;
137        @NonNull
138        private final LongCounter requestsRejectedCounter;
139        @NonNull
140        private final LongCounter requestReadFailureCounter;
141        @NonNull
142        private final LongUpDownCounter activeRequestsCounter;
143        @NonNull
144        private final DoubleHistogram requestDurationHistogram;
145        @NonNull
146        private final DoubleHistogram responseWriteDurationHistogram;
147        @NonNull
148        private final LongCounter responseWriteFailureCounter;
149        @NonNull
150        private final LongCounter requestThrowableCounter;
151        @NonNull
152        private final LongHistogram requestBodySizeHistogram;
153        @NonNull
154        private final LongHistogram responseBodySizeHistogram;
155
156        @NonNull
157        private final LongUpDownCounter activeServerSentEventStreamsCounter;
158        @NonNull
159        private final LongCounter serverSentEventStreamsEstablishedCounter;
160        @NonNull
161        private final LongCounter serverSentEventHandshakeFailureCounter;
162        @NonNull
163        private final LongCounter serverSentEventStreamsTerminatedCounter;
164        @NonNull
165        private final DoubleHistogram serverSentEventStreamDurationHistogram;
166        @NonNull
167        private final LongCounter serverSentEventWrittenCounter;
168        @NonNull
169        private final LongCounter serverSentEventWriteFailureCounter;
170        @NonNull
171        private final DoubleHistogram serverSentEventWriteDurationHistogram;
172        @NonNull
173        private final DoubleHistogram serverSentEventDeliveryLagHistogram;
174        @NonNull
175        private final LongHistogram serverSentEventPayloadSizeHistogram;
176        @NonNull
177        private final LongHistogram serverSentEventQueueDepthHistogram;
178        @NonNull
179        private final LongCounter serverSentEventDropCounter;
180
181        @NonNull
182        private final LongCounter serverSentEventCommentWrittenCounter;
183        @NonNull
184        private final LongCounter serverSentEventCommentWriteFailureCounter;
185        @NonNull
186        private final DoubleHistogram serverSentEventCommentWriteDurationHistogram;
187        @NonNull
188        private final DoubleHistogram serverSentEventCommentDeliveryLagHistogram;
189        @NonNull
190        private final LongHistogram serverSentEventCommentPayloadSizeHistogram;
191        @NonNull
192        private final LongHistogram serverSentEventCommentQueueDepthHistogram;
193        @NonNull
194        private final LongCounter serverSentEventCommentDropCounter;
195
196        @NonNull
197        private final LongCounter serverSentEventBroadcastAttemptCounter;
198        @NonNull
199        private final LongCounter serverSentEventBroadcastEnqueuedCounter;
200        @NonNull
201        private final LongCounter serverSentEventBroadcastDroppedCounter;
202        @NonNull
203        private final MetricNamingStrategy metricNamingStrategy;
204
205        /**
206         * Acquires a builder for {@link OpenTelemetryMetricsCollector} instances, using {@link GlobalOpenTelemetry}
207         * by default.
208         *
209         * @return the builder
210         */
211        @NonNull
212        public static Builder builder() {
213                return new Builder();
214        }
215
216        /**
217         * Acquires a builder seeded with a required {@link Meter}.
218         *
219         * @param meter the meter used to build instruments
220         * @return the builder
221         */
222        @NonNull
223        public static Builder withMeter(@NonNull Meter meter) {
224                requireNonNull(meter);
225                return builder().meter(meter);
226        }
227
228        /**
229         * Acquires a builder seeded with a required {@link OpenTelemetry} instance.
230         *
231         * @param openTelemetry the OpenTelemetry instance used to build a meter
232         * @return the builder
233         */
234        @NonNull
235        public static Builder withOpenTelemetry(@NonNull OpenTelemetry openTelemetry) {
236                requireNonNull(openTelemetry);
237                return builder().openTelemetry(openTelemetry);
238        }
239
240        /**
241         * Creates an instance from a required {@link Meter} without additional customization.
242         *
243         * @param meter the meter used to build instruments
244         * @return an {@link OpenTelemetryMetricsCollector} instance
245         */
246        @NonNull
247        public static OpenTelemetryMetricsCollector fromMeter(@NonNull Meter meter) {
248                return withMeter(meter).build();
249        }
250
251        /**
252         * Creates an instance from a required {@link OpenTelemetry} without additional customization.
253         *
254         * @param openTelemetry the OpenTelemetry instance used to build a meter
255         * @return an {@link OpenTelemetryMetricsCollector} instance
256         */
257        @NonNull
258        public static OpenTelemetryMetricsCollector fromOpenTelemetry(@NonNull OpenTelemetry openTelemetry) {
259                return withOpenTelemetry(openTelemetry).build();
260        }
261
262        private OpenTelemetryMetricsCollector(@NonNull Builder builder) {
263                requireNonNull(builder);
264                Meter meter = requireNonNull(builder.resolveMeter());
265                this.metricNamingStrategy = requireNonNull(builder.metricNamingStrategy);
266
267                String activeRequestsMetricName = activeRequestsMetricNameFor(this.metricNamingStrategy);
268                String requestDurationMetricName = requestDurationMetricNameFor(this.metricNamingStrategy);
269                String requestBodySizeMetricName = requestBodySizeMetricNameFor(this.metricNamingStrategy);
270                String responseBodySizeMetricName = responseBodySizeMetricNameFor(this.metricNamingStrategy);
271
272                this.connectionsAcceptedCounter = meter.counterBuilder("soklet.server.connections.accepted")
273                                .setDescription("Total number of accepted inbound TCP connections.")
274                                .setUnit("{connection}")
275                                .build();
276                this.connectionsRejectedCounter = meter.counterBuilder("soklet.server.connections.rejected")
277                                .setDescription("Total number of rejected inbound TCP connections.")
278                                .setUnit("{connection}")
279                                .build();
280                this.requestsAcceptedCounter = meter.counterBuilder("soklet.server.requests.accepted")
281                                .setDescription("Total number of accepted requests before app-level handling.")
282                                .setUnit("{request}")
283                                .build();
284                this.requestsRejectedCounter = meter.counterBuilder("soklet.server.requests.rejected")
285                                .setDescription("Total number of rejected requests before app-level handling.")
286                                .setUnit("{request}")
287                                .build();
288                this.requestReadFailureCounter = meter.counterBuilder("soklet.server.request.read.failures")
289                                .setDescription("Total number of request read/parse failures.")
290                                .setUnit("{request}")
291                                .build();
292                this.activeRequestsCounter = meter.upDownCounterBuilder(activeRequestsMetricName)
293                                .setDescription("Number of in-flight requests currently being handled.")
294                                .setUnit("{request}")
295                                .build();
296                this.requestDurationHistogram = meter.histogramBuilder(requestDurationMetricName)
297                                .setDescription("Total request handling duration.")
298                                .setUnit("s")
299                                .build();
300                this.responseWriteDurationHistogram = meter.histogramBuilder("soklet.server.response.write.duration")
301                                .setDescription("Duration spent writing response bytes.")
302                                .setUnit("s")
303                                .build();
304                this.responseWriteFailureCounter = meter.counterBuilder("soklet.server.response.write.failures")
305                                .setDescription("Total number of response write failures.")
306                                .setUnit("{response}")
307                                .build();
308                this.requestThrowableCounter = meter.counterBuilder("soklet.server.request.throwables")
309                                .setDescription("Total number of throwables observed during request handling.")
310                                .setUnit("{throwable}")
311                                .build();
312                this.requestBodySizeHistogram = meter.histogramBuilder(requestBodySizeMetricName)
313                                .ofLongs()
314                                .setDescription("Request body size in bytes.")
315                                .setUnit("By")
316                                .build();
317                this.responseBodySizeHistogram = meter.histogramBuilder(responseBodySizeMetricName)
318                                .ofLongs()
319                                .setDescription("Response body size in bytes.")
320                                .setUnit("By")
321                                .build();
322
323                this.activeServerSentEventStreamsCounter = meter.upDownCounterBuilder("soklet.sse.streams.active")
324                                .setDescription("Number of active SSE streams.")
325                                .setUnit("{stream}")
326                                .build();
327                this.serverSentEventStreamsEstablishedCounter = meter.counterBuilder("soklet.sse.streams.established")
328                                .setDescription("Total number of SSE streams established.")
329                                .setUnit("{stream}")
330                                .build();
331                this.serverSentEventHandshakeFailureCounter = meter.counterBuilder("soklet.sse.handshakes.rejected")
332                                .setDescription("Total number of rejected SSE handshakes.")
333                                .setUnit("{handshake}")
334                                .build();
335                this.serverSentEventStreamsTerminatedCounter = meter.counterBuilder("soklet.sse.streams.terminated")
336                                .setDescription("Total number of terminated SSE streams.")
337                                .setUnit("{stream}")
338                                .build();
339                this.serverSentEventStreamDurationHistogram = meter.histogramBuilder("soklet.sse.stream.duration")
340                                .setDescription("SSE stream duration.")
341                                .setUnit("s")
342                                .build();
343                this.serverSentEventWrittenCounter = meter.counterBuilder("soklet.sse.events.written")
344                                .setDescription("Total number of SSE events successfully written.")
345                                .setUnit("{event}")
346                                .build();
347                this.serverSentEventWriteFailureCounter = meter.counterBuilder("soklet.sse.events.write.failures")
348                                .setDescription("Total number of SSE events that failed to write.")
349                                .setUnit("{event}")
350                                .build();
351                this.serverSentEventWriteDurationHistogram = meter.histogramBuilder("soklet.sse.events.write.duration")
352                                .setDescription("SSE event write duration.")
353                                .setUnit("s")
354                                .build();
355                this.serverSentEventDeliveryLagHistogram = meter.histogramBuilder("soklet.sse.events.delivery.lag")
356                                .setDescription("Time spent waiting in the SSE queue before write.")
357                                .setUnit("s")
358                                .build();
359                this.serverSentEventPayloadSizeHistogram = meter.histogramBuilder("soklet.sse.events.payload.size")
360                                .ofLongs()
361                                .setDescription("Serialized SSE event payload size in bytes.")
362                                .setUnit("By")
363                                .build();
364                this.serverSentEventQueueDepthHistogram = meter.histogramBuilder("soklet.sse.events.queue.depth")
365                                .ofLongs()
366                                .setDescription("Queued element depth when SSE event write/drop outcome is observed.")
367                                .setUnit("{item}")
368                                .build();
369                this.serverSentEventDropCounter = meter.counterBuilder("soklet.sse.events.dropped")
370                                .setDescription("Total number of SSE events dropped before enqueue.")
371                                .setUnit("{event}")
372                                .build();
373
374                this.serverSentEventCommentWrittenCounter = meter.counterBuilder("soklet.sse.comments.written")
375                                .setDescription("Total number of SSE comments successfully written.")
376                                .setUnit("{comment}")
377                                .build();
378                this.serverSentEventCommentWriteFailureCounter = meter.counterBuilder("soklet.sse.comments.write.failures")
379                                .setDescription("Total number of SSE comments that failed to write.")
380                                .setUnit("{comment}")
381                                .build();
382                this.serverSentEventCommentWriteDurationHistogram = meter.histogramBuilder("soklet.sse.comments.write.duration")
383                                .setDescription("SSE comment write duration.")
384                                .setUnit("s")
385                                .build();
386                this.serverSentEventCommentDeliveryLagHistogram = meter.histogramBuilder("soklet.sse.comments.delivery.lag")
387                                .setDescription("Time spent waiting in the SSE queue before comment write.")
388                                .setUnit("s")
389                                .build();
390                this.serverSentEventCommentPayloadSizeHistogram = meter.histogramBuilder("soklet.sse.comments.payload.size")
391                                .ofLongs()
392                                .setDescription("Serialized SSE comment payload size in bytes.")
393                                .setUnit("By")
394                                .build();
395                this.serverSentEventCommentQueueDepthHistogram = meter.histogramBuilder("soklet.sse.comments.queue.depth")
396                                .ofLongs()
397                                .setDescription("Queued element depth when SSE comment write/drop outcome is observed.")
398                                .setUnit("{item}")
399                                .build();
400                this.serverSentEventCommentDropCounter = meter.counterBuilder("soklet.sse.comments.dropped")
401                                .setDescription("Total number of SSE comments dropped before enqueue.")
402                                .setUnit("{comment}")
403                                .build();
404
405                this.serverSentEventBroadcastAttemptCounter = meter.counterBuilder("soklet.sse.broadcast.attempted")
406                                .setDescription("Total number of attempted SSE broadcast deliveries.")
407                                .setUnit("{delivery}")
408                                .build();
409                this.serverSentEventBroadcastEnqueuedCounter = meter.counterBuilder("soklet.sse.broadcast.enqueued")
410                                .setDescription("Total number of SSE broadcast deliveries successfully enqueued.")
411                                .setUnit("{delivery}")
412                                .build();
413                this.serverSentEventBroadcastDroppedCounter = meter.counterBuilder("soklet.sse.broadcast.dropped")
414                                .setDescription("Total number of SSE broadcast deliveries dropped before enqueue.")
415                                .setUnit("{delivery}")
416                                .build();
417        }
418
419        @Override
420        public void didAcceptConnection(@NonNull ServerType serverType,
421                                                                                                                                        @Nullable InetSocketAddress remoteAddress) {
422                requireNonNull(serverType);
423                this.connectionsAcceptedCounter.add(1, serverTypeAttributes(serverType));
424        }
425
426        @Override
427        public void didFailToAcceptConnection(@NonNull ServerType serverType,
428                                                                                                                                                                @Nullable InetSocketAddress remoteAddress,
429                                                                                                                                                                @NonNull ConnectionRejectionReason reason,
430                                                                                                                                                                @Nullable Throwable throwable) {
431                requireNonNull(serverType);
432                requireNonNull(reason);
433                this.connectionsRejectedCounter.add(1, serverTypeAndReasonAttributes(serverType, reason));
434        }
435
436        @Override
437        public void didAcceptRequest(@NonNull ServerType serverType,
438                                                                                                                         @Nullable InetSocketAddress remoteAddress,
439                                                                                                                         @Nullable String requestTarget) {
440                requireNonNull(serverType);
441                this.requestsAcceptedCounter.add(1, serverTypeAttributes(serverType));
442        }
443
444        @Override
445        public void didFailToAcceptRequest(@NonNull ServerType serverType,
446                                                                                                                                                 @Nullable InetSocketAddress remoteAddress,
447                                                                                                                                                 @Nullable String requestTarget,
448                                                                                                                                                 @NonNull RequestRejectionReason reason,
449                                                                                                                                                 @Nullable Throwable throwable) {
450                requireNonNull(serverType);
451                requireNonNull(reason);
452                this.requestsRejectedCounter.add(1, serverTypeAndReasonAttributes(serverType, reason));
453        }
454
455        @Override
456        public void didFailToReadRequest(@NonNull ServerType serverType,
457                                                                                                                                         @Nullable InetSocketAddress remoteAddress,
458                                                                                                                                         @Nullable String requestTarget,
459                                                                                                                                         @NonNull RequestReadFailureReason reason,
460                                                                                                                                         @Nullable Throwable throwable) {
461                requireNonNull(serverType);
462                requireNonNull(reason);
463                this.requestReadFailureCounter.add(1, serverTypeAndReasonAttributes(serverType, reason));
464        }
465
466        @Override
467        public void didStartRequestHandling(@NonNull ServerType serverType,
468                                                                                                                                                        @NonNull Request request,
469                                                                                                                                                        @Nullable ResourceMethod resourceMethod) {
470                requireNonNull(serverType);
471                requireNonNull(request);
472
473                this.activeRequestsCounter.add(1, activeRequestAttributes(serverType, request));
474        }
475
476        @Override
477        public void didFinishRequestHandling(@NonNull ServerType serverType,
478                                                                                                                                                         @NonNull Request request,
479                                                                                                                                                         @Nullable ResourceMethod resourceMethod,
480                                                                                                                                                         @NonNull MarshaledResponse marshaledResponse,
481                                                                                                                                                         @NonNull Duration duration,
482                                                                                                                                                         @NonNull List<@NonNull Throwable> throwables) {
483                requireNonNull(serverType);
484                requireNonNull(request);
485                requireNonNull(marshaledResponse);
486                requireNonNull(duration);
487                requireNonNull(throwables);
488
489                Throwable throwable = throwables.isEmpty() ? null : throwables.get(0);
490                Attributes attributes = requestAttributes(serverType, request, resourceMethod, marshaledResponse.getStatusCode(), throwable);
491
492                this.activeRequestsCounter.add(-1, activeRequestAttributes(serverType, request));
493                this.requestDurationHistogram.record(seconds(duration), attributes);
494                this.requestBodySizeHistogram.record(request.getBody().map(body -> (long) body.length).orElse(0L), attributes);
495                this.responseBodySizeHistogram.record(marshaledResponse.getBodyLength(), attributes);
496
497                if (!throwables.isEmpty())
498                        this.requestThrowableCounter.add(throwables.size(), attributes);
499        }
500
501        @Override
502        public void didWriteResponse(@NonNull ServerType serverType,
503                                                                                                                         @NonNull Request request,
504                                                                                                                         @Nullable ResourceMethod resourceMethod,
505                                                                                                                         @NonNull MarshaledResponse marshaledResponse,
506                                                                                                                         @NonNull Duration responseWriteDuration) {
507                requireNonNull(serverType);
508                requireNonNull(request);
509                requireNonNull(marshaledResponse);
510                requireNonNull(responseWriteDuration);
511
512                this.responseWriteDurationHistogram.record(
513                                seconds(responseWriteDuration),
514                                requestAttributes(serverType, request, resourceMethod, marshaledResponse.getStatusCode(), null)
515                );
516        }
517
518        @Override
519        public void didFailToWriteResponse(@NonNull ServerType serverType,
520                                                                                                                                                 @NonNull Request request,
521                                                                                                                                                 @Nullable ResourceMethod resourceMethod,
522                                                                                                                                                 @NonNull MarshaledResponse marshaledResponse,
523                                                                                                                                                 @NonNull Duration responseWriteDuration,
524                                                                                                                                                 @NonNull Throwable throwable) {
525                requireNonNull(serverType);
526                requireNonNull(request);
527                requireNonNull(marshaledResponse);
528                requireNonNull(responseWriteDuration);
529                requireNonNull(throwable);
530
531                Attributes attributes = requestAttributes(serverType, request, resourceMethod, marshaledResponse.getStatusCode(), throwable);
532                this.responseWriteFailureCounter.add(1, attributes);
533                this.responseWriteDurationHistogram.record(seconds(responseWriteDuration), attributes);
534        }
535
536        @Override
537        public void didEstablishSseConnection(@NonNull SseConnection sseConnection) {
538                requireNonNull(sseConnection);
539
540                Attributes attributes = serverSentEventAttributes(sseConnection);
541                this.activeServerSentEventStreamsCounter.add(1, attributes);
542                this.serverSentEventStreamsEstablishedCounter.add(1, attributes);
543        }
544
545        @Override
546        public void didFailToEstablishSseConnection(@NonNull Request request,
547                                                                                                                                                                                        @Nullable ResourceMethod resourceMethod,
548                                                                                                                                                                                        SseConnection.@NonNull HandshakeFailureReason reason,
549                                                                                                                                                                                        @Nullable Throwable throwable) {
550                requireNonNull(request);
551                requireNonNull(reason);
552
553                this.serverSentEventHandshakeFailureCounter.add(1,
554                                Attributes.builder()
555                                                .put(HTTP_METHOD_ATTRIBUTE_KEY, request.getHttpMethod().name())
556                                                .put(HTTP_ROUTE_ATTRIBUTE_KEY, routeFor(resourceMethod))
557                                                .put(FAILURE_REASON_ATTRIBUTE_KEY, enumValue(reason))
558                                                .build()
559                );
560        }
561
562        @Override
563        public void didTerminateSseConnection(@NonNull SseConnection sseConnection,
564                                                                                                                                                                @NonNull StreamTermination termination) {
565                requireNonNull(sseConnection);
566                requireNonNull(termination);
567
568                Attributes routeAttributes = serverSentEventAttributes(sseConnection);
569                Attributes durationAttributes = Attributes.builder()
570                                .putAll(routeAttributes)
571                                .put(SSE_TERMINATION_REASON_ATTRIBUTE_KEY, enumValue(termination.getReason()))
572                                .build();
573
574                this.activeServerSentEventStreamsCounter.add(-1, routeAttributes);
575                this.serverSentEventStreamsTerminatedCounter.add(1, durationAttributes);
576                this.serverSentEventStreamDurationHistogram.record(seconds(termination.getDuration()), durationAttributes);
577        }
578
579        @Override
580        public void didWriteSseEvent(@NonNull SseConnection sseConnection,
581                                                                                                                         @NonNull SseEvent sseEvent,
582                                                                                                                         @NonNull Duration writeDuration,
583                                                                                                                         @Nullable Duration deliveryLag,
584                                                                                                                         @Nullable Integer payloadBytes,
585                                                                                                                         @Nullable Integer queueDepth) {
586                requireNonNull(sseConnection);
587                requireNonNull(sseEvent);
588                requireNonNull(writeDuration);
589
590                Attributes attributes = serverSentEventAttributes(sseConnection);
591                this.serverSentEventWrittenCounter.add(1, attributes);
592                this.serverSentEventWriteDurationHistogram.record(seconds(writeDuration), attributes);
593
594                if (deliveryLag != null)
595                        this.serverSentEventDeliveryLagHistogram.record(seconds(deliveryLag), attributes);
596                if (payloadBytes != null)
597                        this.serverSentEventPayloadSizeHistogram.record(payloadBytes, attributes);
598                if (queueDepth != null)
599                        this.serverSentEventQueueDepthHistogram.record(queueDepth, attributes);
600        }
601
602        @Override
603        public void didFailToWriteSseEvent(@NonNull SseConnection sseConnection,
604                                                                                                                                                 @NonNull SseEvent sseEvent,
605                                                                                                                                                 @NonNull Duration writeDuration,
606                                                                                                                                                 @NonNull Throwable throwable,
607                                                                                                                                                 @Nullable Duration deliveryLag,
608                                                                                                                                                 @Nullable Integer payloadBytes,
609                                                                                                                                                 @Nullable Integer queueDepth) {
610                requireNonNull(sseConnection);
611                requireNonNull(sseEvent);
612                requireNonNull(writeDuration);
613                requireNonNull(throwable);
614
615                Attributes attributes = serverSentEventAttributes(sseConnection);
616                this.serverSentEventWriteFailureCounter.add(1, attributes);
617                this.serverSentEventWriteDurationHistogram.record(seconds(writeDuration), attributes);
618
619                if (deliveryLag != null)
620                        this.serverSentEventDeliveryLagHistogram.record(seconds(deliveryLag), attributes);
621                if (payloadBytes != null)
622                        this.serverSentEventPayloadSizeHistogram.record(payloadBytes, attributes);
623                if (queueDepth != null)
624                        this.serverSentEventQueueDepthHistogram.record(queueDepth, attributes);
625        }
626
627        @Override
628        public void didDropSseEvent(@NonNull SseConnection sseConnection,
629                                                                                                                        @NonNull SseEvent sseEvent,
630                                                                                                                        @NonNull SseEventDropReason reason,
631                                                                                                                        @Nullable Integer payloadBytes,
632                                                                                                                        @Nullable Integer queueDepth) {
633                requireNonNull(sseConnection);
634                requireNonNull(sseEvent);
635                requireNonNull(reason);
636
637                Attributes attributes = Attributes.builder()
638                                .putAll(serverSentEventAttributes(sseConnection))
639                                .put(SSE_DROP_REASON_ATTRIBUTE_KEY, enumValue(reason))
640                                .build();
641                this.serverSentEventDropCounter.add(1, attributes);
642
643                if (payloadBytes != null)
644                        this.serverSentEventPayloadSizeHistogram.record(payloadBytes, attributes);
645                if (queueDepth != null)
646                        this.serverSentEventQueueDepthHistogram.record(queueDepth, attributes);
647        }
648
649        @Override
650        public void didWriteSseComment(@NonNull SseConnection sseConnection,
651                                                                                                                                 @NonNull SseComment sseComment,
652                                                                                                                                 @NonNull Duration writeDuration,
653                                                                                                                                 @Nullable Duration deliveryLag,
654                                                                                                                                 @Nullable Integer payloadBytes,
655                                                                                                                                 @Nullable Integer queueDepth) {
656                requireNonNull(sseConnection);
657                requireNonNull(sseComment);
658                requireNonNull(writeDuration);
659
660                Attributes attributes = serverSentEventCommentAttributes(sseConnection, sseComment.getCommentType());
661                this.serverSentEventCommentWrittenCounter.add(1, attributes);
662                this.serverSentEventCommentWriteDurationHistogram.record(seconds(writeDuration), attributes);
663
664                if (deliveryLag != null)
665                        this.serverSentEventCommentDeliveryLagHistogram.record(seconds(deliveryLag), attributes);
666                if (payloadBytes != null)
667                        this.serverSentEventCommentPayloadSizeHistogram.record(payloadBytes, attributes);
668                if (queueDepth != null)
669                        this.serverSentEventCommentQueueDepthHistogram.record(queueDepth, attributes);
670        }
671
672        @Override
673        public void didFailToWriteSseComment(@NonNull SseConnection sseConnection,
674                                                                                                                                                         @NonNull SseComment sseComment,
675                                                                                                                                                         @NonNull Duration writeDuration,
676                                                                                                                                                         @NonNull Throwable throwable,
677                                                                                                                                                         @Nullable Duration deliveryLag,
678                                                                                                                                                         @Nullable Integer payloadBytes,
679                                                                                                                                                         @Nullable Integer queueDepth) {
680                requireNonNull(sseConnection);
681                requireNonNull(sseComment);
682                requireNonNull(writeDuration);
683                requireNonNull(throwable);
684
685                Attributes attributes = serverSentEventCommentAttributes(sseConnection, sseComment.getCommentType());
686                this.serverSentEventCommentWriteFailureCounter.add(1, attributes);
687                this.serverSentEventCommentWriteDurationHistogram.record(seconds(writeDuration), attributes);
688
689                if (deliveryLag != null)
690                        this.serverSentEventCommentDeliveryLagHistogram.record(seconds(deliveryLag), attributes);
691                if (payloadBytes != null)
692                        this.serverSentEventCommentPayloadSizeHistogram.record(payloadBytes, attributes);
693                if (queueDepth != null)
694                        this.serverSentEventCommentQueueDepthHistogram.record(queueDepth, attributes);
695        }
696
697        @Override
698        public void didDropSseComment(@NonNull SseConnection sseConnection,
699                                                                                                                                @NonNull SseComment sseComment,
700                                                                                                                                @NonNull SseEventDropReason reason,
701                                                                                                                                @Nullable Integer payloadBytes,
702                                                                                                                                @Nullable Integer queueDepth) {
703                requireNonNull(sseConnection);
704                requireNonNull(sseComment);
705                requireNonNull(reason);
706
707                Attributes attributes = Attributes.builder()
708                                .putAll(serverSentEventCommentAttributes(sseConnection, sseComment.getCommentType()))
709                                .put(SSE_DROP_REASON_ATTRIBUTE_KEY, enumValue(reason))
710                                .build();
711                this.serverSentEventCommentDropCounter.add(1, attributes);
712
713                if (payloadBytes != null)
714                        this.serverSentEventCommentPayloadSizeHistogram.record(payloadBytes, attributes);
715                if (queueDepth != null)
716                        this.serverSentEventCommentQueueDepthHistogram.record(queueDepth, attributes);
717        }
718
719        @Override
720        public void didBroadcastSseEvent(@NonNull ResourcePathDeclaration route,
721                                                                                                                                         int attempted,
722                                                                                                                                         int enqueued,
723                                                                                                                                         int dropped) {
724                requireNonNull(route);
725                recordBroadcastTotals(route, BROADCAST_PAYLOAD_EVENT, UNKNOWN_COMMENT_TYPE, attempted, enqueued, dropped);
726        }
727
728        @Override
729        public void didBroadcastSseComment(@NonNull ResourcePathDeclaration route,
730                                                                                                                                                 SseComment.@NonNull CommentType commentType,
731                                                                                                                                                 int attempted,
732                                                                                                                                                 int enqueued,
733                                                                                                                                                 int dropped) {
734                requireNonNull(route);
735                requireNonNull(commentType);
736                recordBroadcastTotals(route, BROADCAST_PAYLOAD_COMMENT, enumValue(commentType), attempted, enqueued, dropped);
737        }
738
739        @NonNull
740        private Attributes serverTypeAttributes(@NonNull ServerType serverType) {
741                requireNonNull(serverType);
742                return Attributes.of(SERVER_TYPE_ATTRIBUTE_KEY, enumValue(serverType));
743        }
744
745        @NonNull
746        private Attributes serverTypeAndReasonAttributes(@NonNull ServerType serverType,
747                                                                                                                                                                                                         @NonNull Enum<?> reason) {
748                requireNonNull(serverType);
749                requireNonNull(reason);
750                return Attributes.builder()
751                                .put(SERVER_TYPE_ATTRIBUTE_KEY, enumValue(serverType))
752                                .put(FAILURE_REASON_ATTRIBUTE_KEY, enumValue(reason))
753                                .build();
754        }
755
756        @NonNull
757        private Attributes activeRequestAttributes(@NonNull ServerType serverType,
758                                                                                                                                                                                 @NonNull Request request) {
759                requireNonNull(serverType);
760                requireNonNull(request);
761
762                var builder = Attributes.builder()
763                                .put(HTTP_METHOD_ATTRIBUTE_KEY, request.getHttpMethod().name());
764
765                if (this.metricNamingStrategy == MetricNamingStrategy.SEMCONV) {
766                        builder.put(URL_SCHEME_ATTRIBUTE_KEY, URL_SCHEME_HTTP);
767                } else {
768                        builder.put(SERVER_TYPE_ATTRIBUTE_KEY, enumValue(serverType));
769                }
770
771                return builder.build();
772        }
773
774        @NonNull
775        private Attributes requestAttributes(@NonNull ServerType serverType,
776                                                                                                                                                         @NonNull Request request,
777                                                                                                                                                         @Nullable ResourceMethod resourceMethod,
778                                                                                                                                                         @Nullable Integer statusCode,
779                                                                                                                                                         @Nullable Throwable throwable) {
780                requireNonNull(serverType);
781                requireNonNull(request);
782
783                var builder = Attributes.builder()
784                                .put(HTTP_METHOD_ATTRIBUTE_KEY, request.getHttpMethod().name());
785
786                if (this.metricNamingStrategy == MetricNamingStrategy.SEMCONV) {
787                        builder.put(URL_SCHEME_ATTRIBUTE_KEY, URL_SCHEME_HTTP);
788
789                        if (resourceMethod != null)
790                                builder.put(HTTP_ROUTE_ATTRIBUTE_KEY, routeFor(resourceMethod));
791                } else {
792                        builder.put(SERVER_TYPE_ATTRIBUTE_KEY, enumValue(serverType))
793                                        .put(HTTP_ROUTE_ATTRIBUTE_KEY, routeFor(resourceMethod));
794                }
795
796                if (statusCode != null)
797                        builder.put(HTTP_STATUS_CODE_ATTRIBUTE_KEY, statusCode.longValue());
798
799                String errorType = errorTypeFor(statusCode, throwable);
800
801                if (errorType != null)
802                        builder.put(ERROR_TYPE_ATTRIBUTE_KEY, errorType);
803
804                return builder.build();
805        }
806
807        @NonNull
808        private Attributes serverSentEventAttributes(@NonNull SseConnection sseConnection) {
809                requireNonNull(sseConnection);
810                return Attributes.of(
811                                HTTP_ROUTE_ATTRIBUTE_KEY,
812                                routeFor(sseConnection.getResourceMethod())
813                );
814        }
815
816        @NonNull
817        private Attributes serverSentEventCommentAttributes(@NonNull SseConnection sseConnection,
818                                                                                                                                                                                                                        SseComment.@NonNull CommentType commentType) {
819                requireNonNull(sseConnection);
820                requireNonNull(commentType);
821                return Attributes.builder()
822                                .putAll(serverSentEventAttributes(sseConnection))
823                                .put(SSE_COMMENT_TYPE_ATTRIBUTE_KEY, enumValue(commentType))
824                                .build();
825        }
826
827        private void recordBroadcastTotals(@NonNull ResourcePathDeclaration route,
828                                                                                                                                                 @NonNull String payloadType,
829                                                                                                                                                 @NonNull String commentType,
830                                                                                                                                                 int attempted,
831                                                                                                                                                 int enqueued,
832                                                                                                                                                 int dropped) {
833                requireNonNull(route);
834                requireNonNull(payloadType);
835                requireNonNull(commentType);
836
837                Attributes attributes = Attributes.builder()
838                                .put(HTTP_ROUTE_ATTRIBUTE_KEY, route.getPath())
839                                .put(SSE_BROADCAST_PAYLOAD_TYPE_ATTRIBUTE_KEY, payloadType)
840                                .put(SSE_COMMENT_TYPE_ATTRIBUTE_KEY, commentType)
841                                .build();
842
843                if (attempted > 0)
844                        this.serverSentEventBroadcastAttemptCounter.add(attempted, attributes);
845                if (enqueued > 0)
846                        this.serverSentEventBroadcastEnqueuedCounter.add(enqueued, attributes);
847                if (dropped > 0)
848                        this.serverSentEventBroadcastDroppedCounter.add(dropped, attributes);
849        }
850
851        @NonNull
852        private static String activeRequestsMetricNameFor(@NonNull MetricNamingStrategy metricNamingStrategy) {
853                requireNonNull(metricNamingStrategy);
854                if (metricNamingStrategy == MetricNamingStrategy.SEMCONV)
855                        return "http.server.active_requests";
856                return "soklet.server.requests.active";
857        }
858
859        @NonNull
860        private static String requestDurationMetricNameFor(@NonNull MetricNamingStrategy metricNamingStrategy) {
861                requireNonNull(metricNamingStrategy);
862                if (metricNamingStrategy == MetricNamingStrategy.SEMCONV)
863                        return "http.server.request.duration";
864                return "soklet.server.request.duration";
865        }
866
867        @NonNull
868        private static String requestBodySizeMetricNameFor(@NonNull MetricNamingStrategy metricNamingStrategy) {
869                requireNonNull(metricNamingStrategy);
870                if (metricNamingStrategy == MetricNamingStrategy.SEMCONV)
871                        return "http.server.request.body.size";
872                return "soklet.server.request.body.size";
873        }
874
875        @NonNull
876        private static String responseBodySizeMetricNameFor(@NonNull MetricNamingStrategy metricNamingStrategy) {
877                requireNonNull(metricNamingStrategy);
878                if (metricNamingStrategy == MetricNamingStrategy.SEMCONV)
879                        return "http.server.response.body.size";
880                return "soklet.server.response.body.size";
881        }
882
883        @NonNull
884        private static String routeFor(@Nullable ResourceMethod resourceMethod) {
885                if (resourceMethod == null)
886                        return UNMATCHED_ROUTE;
887                return resourceMethod.getResourcePathDeclaration().getPath();
888        }
889
890        @NonNull
891        private static String enumValue(@NonNull Enum<?> value) {
892                requireNonNull(value);
893                return value.name().toLowerCase(Locale.ROOT);
894        }
895
896        @Nullable
897        private String errorTypeFor(@Nullable Integer statusCode,
898                                                                                                                        @Nullable Throwable throwable) {
899                if (throwable != null)
900                        return throwable.getClass().getName();
901
902                if (this.metricNamingStrategy == MetricNamingStrategy.SEMCONV
903                                && statusCode != null
904                                && statusCode >= 500)
905                        return String.valueOf(statusCode);
906
907                return null;
908        }
909
910        private static double seconds(@NonNull Duration duration) {
911                requireNonNull(duration);
912                return duration.toNanos() / 1_000_000_000D;
913        }
914
915        /**
916         * Naming strategy for HTTP metric instrument names.
917         * <p>
918         * SSE- and Soklet-specific concepts remain under the {@code soklet.*} namespace in all strategies.
919         */
920        public enum MetricNamingStrategy {
921                /**
922                 * Use OpenTelemetry Semantic Convention names for standard HTTP server metrics.
923                 */
924                SEMCONV,
925                /**
926                 * Use {@code soklet.*} names for all metrics.
927                 */
928                SOKLET
929        }
930
931        /**
932         * Builder used to construct instances of {@link OpenTelemetryMetricsCollector}.
933         */
934        @NotThreadSafe
935        public static final class Builder {
936                @Nullable
937                private Meter meter;
938                @Nullable
939                private OpenTelemetry openTelemetry;
940                @NonNull
941                private MetricNamingStrategy metricNamingStrategy;
942                @NonNull
943                private String instrumentationName;
944                @Nullable
945                private String instrumentationVersion;
946
947                private Builder() {
948                        this.openTelemetry = GlobalOpenTelemetry.get();
949                        this.metricNamingStrategy = MetricNamingStrategy.SEMCONV;
950                        this.instrumentationName = DEFAULT_INSTRUMENTATION_NAME;
951                        this.instrumentationVersion = null;
952                }
953
954                /**
955                 * Sets a specific meter to use for metric instruments.
956                 *
957                 * @param meter the meter to use
958                 * @return this builder
959                 */
960                @NonNull
961                public Builder meter(@NonNull Meter meter) {
962                        this.meter = requireNonNull(meter);
963                        return this;
964                }
965
966                /**
967                 * Sets the OpenTelemetry API object used to construct a meter if {@link #meter(Meter)} is not set.
968                 *
969                 * @param openTelemetry the OpenTelemetry instance
970                 * @return this builder
971                 */
972                @NonNull
973                public Builder openTelemetry(@NonNull OpenTelemetry openTelemetry) {
974                        this.openTelemetry = requireNonNull(openTelemetry);
975                        return this;
976                }
977
978                /**
979                 * Sets the naming strategy for HTTP metrics.
980                 *
981                 * @param metricNamingStrategy the naming strategy
982                 * @return this builder
983                 */
984                @NonNull
985                public Builder metricNamingStrategy(@NonNull MetricNamingStrategy metricNamingStrategy) {
986                        this.metricNamingStrategy = requireNonNull(metricNamingStrategy);
987                        return this;
988                }
989
990                /**
991                 * Sets the instrumentation scope name to use when constructing a meter.
992                 *
993                 * @param instrumentationName the instrumentation scope name
994                 * @return this builder
995                 */
996                @NonNull
997                public Builder instrumentationName(@NonNull String instrumentationName) {
998                        this.instrumentationName = requireNonNull(instrumentationName);
999                        return this;
1000                }
1001
1002                /**
1003                 * Sets an optional instrumentation scope version to use when constructing a meter.
1004                 *
1005                 * @param instrumentationVersion the instrumentation scope version, or {@code null}
1006                 * @return this builder
1007                 */
1008                @NonNull
1009                public Builder instrumentationVersion(@Nullable String instrumentationVersion) {
1010                        this.instrumentationVersion = instrumentationVersion;
1011                        return this;
1012                }
1013
1014                @NonNull
1015                private Meter resolveMeter() {
1016                        if (this.meter != null)
1017                                return this.meter;
1018
1019                        MeterBuilder meterBuilder = requireNonNull(this.openTelemetry).meterBuilder(this.instrumentationName);
1020
1021                        if (this.instrumentationVersion != null)
1022                                meterBuilder = meterBuilder.setInstrumentationVersion(this.instrumentationVersion);
1023
1024                        return meterBuilder.build();
1025                }
1026
1027                /**
1028                 * Builds the collector.
1029                 *
1030                 * @return the collector instance
1031                 */
1032                @NonNull
1033                public OpenTelemetryMetricsCollector build() {
1034                        return new OpenTelemetryMetricsCollector(this);
1035                }
1036        }
1037}