001/*
002 * Copyright 2022-2026 Revetware LLC.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 * http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package com.soklet.otel;
018
019import com.soklet.LifecycleObserver;
020import com.soklet.MarshaledResponse;
021import com.soklet.McpEndpoint;
022import com.soklet.McpJsonRpcError;
023import com.soklet.McpJsonRpcRequestId;
024import com.soklet.McpRequestOutcome;
025import com.soklet.McpSseStream;
026import com.soklet.Request;
027import com.soklet.ResourceMethod;
028import com.soklet.ServerType;
029import com.soklet.SseComment;
030import com.soklet.SseConnection;
031import com.soklet.SseEvent;
032import com.soklet.StreamTermination;
033import com.soklet.StreamTerminationReason;
034import com.soklet.StreamingResponseHandle;
035import com.soklet.TraceContext;
036import com.soklet.TraceStateEntry;
037import io.opentelemetry.api.GlobalOpenTelemetry;
038import io.opentelemetry.api.OpenTelemetry;
039import io.opentelemetry.api.common.AttributeKey;
040import io.opentelemetry.api.common.Attributes;
041import io.opentelemetry.api.trace.Span;
042import io.opentelemetry.api.trace.SpanBuilder;
043import io.opentelemetry.api.trace.SpanContext;
044import io.opentelemetry.api.trace.SpanKind;
045import io.opentelemetry.api.trace.StatusCode;
046import io.opentelemetry.api.trace.TraceFlags;
047import io.opentelemetry.api.trace.TraceState;
048import io.opentelemetry.api.trace.TraceStateBuilder;
049import io.opentelemetry.api.trace.Tracer;
050import io.opentelemetry.api.trace.TracerBuilder;
051import io.opentelemetry.context.Context;
052import org.jspecify.annotations.NonNull;
053import org.jspecify.annotations.Nullable;
054
055import javax.annotation.concurrent.NotThreadSafe;
056import javax.annotation.concurrent.ThreadSafe;
057import java.net.InetSocketAddress;
058import java.time.Duration;
059import java.time.Instant;
060import java.util.List;
061import java.util.Locale;
062import java.util.Map;
063import java.util.concurrent.ConcurrentHashMap;
064import java.util.concurrent.ConcurrentMap;
065import java.util.concurrent.atomic.AtomicBoolean;
066
067import static java.util.Objects.requireNonNull;
068
069/**
070 * OpenTelemetry-backed {@link LifecycleObserver} that emits server spans for Soklet lifecycle events.
071 * <p>
072 * This type complements {@link OpenTelemetryMetricsCollector}: metrics remain low-cardinality aggregate telemetry,
073 * while this observer emits per-request and per-stream spans.
074 *
075 * @author <a href="https://www.revetkn.com">Mark Allen</a>
076 */
077@ThreadSafe
078public final class OpenTelemetryLifecycleObserver implements LifecycleObserver, AutoCloseable {
079        @NonNull
080        private static final String DEFAULT_INSTRUMENTATION_NAME;
081        @NonNull
082        private static final String URL_SCHEME_HTTP;
083        @NonNull
084        private static final String SERVER_TYPE_HTTP;
085        @NonNull
086        private static final String SERVER_TYPE_SSE;
087        @NonNull
088        private static final String SERVER_TYPE_MCP;
089
090        @NonNull
091        private static final AttributeKey<String> SERVER_TYPE_ATTRIBUTE_KEY;
092        @NonNull
093        private static final AttributeKey<String> HTTP_METHOD_ATTRIBUTE_KEY;
094        @NonNull
095        private static final AttributeKey<String> HTTP_ROUTE_ATTRIBUTE_KEY;
096        @NonNull
097        private static final AttributeKey<String> URL_SCHEME_ATTRIBUTE_KEY;
098        @NonNull
099        private static final AttributeKey<Long> HTTP_STATUS_CODE_ATTRIBUTE_KEY;
100        @NonNull
101        private static final AttributeKey<String> ERROR_TYPE_ATTRIBUTE_KEY;
102        @NonNull
103        private static final AttributeKey<String> CLIENT_ADDRESS_ATTRIBUTE_KEY;
104        @NonNull
105        private static final AttributeKey<String> REQUEST_ID_ATTRIBUTE_KEY;
106        @NonNull
107        private static final AttributeKey<String> STREAM_TERMINATION_REASON_ATTRIBUTE_KEY;
108        @NonNull
109        private static final AttributeKey<String> RPC_SYSTEM_ATTRIBUTE_KEY;
110        @NonNull
111        private static final AttributeKey<String> RPC_METHOD_ATTRIBUTE_KEY;
112        @NonNull
113        private static final AttributeKey<String> MCP_ENDPOINT_CLASS_ATTRIBUTE_KEY;
114        @NonNull
115        private static final AttributeKey<Boolean> MCP_SESSION_ID_PRESENT_ATTRIBUTE_KEY;
116        @NonNull
117        private static final AttributeKey<Boolean> MCP_REQUEST_ID_PRESENT_ATTRIBUTE_KEY;
118        @NonNull
119        private static final AttributeKey<String> MCP_REQUEST_OUTCOME_ATTRIBUTE_KEY;
120        @NonNull
121        private static final AttributeKey<Long> MCP_JSON_RPC_ERROR_CODE_ATTRIBUTE_KEY;
122        @NonNull
123        private static final AttributeKey<Boolean> SSE_CLIENT_CONTEXT_PRESENT_ATTRIBUTE_KEY;
124        @NonNull
125        private static final AttributeKey<String> SSE_PAYLOAD_TYPE_ATTRIBUTE_KEY;
126        @NonNull
127        private static final AttributeKey<String> SSE_EVENT_TYPE_ATTRIBUTE_KEY;
128
129        static {
130                DEFAULT_INSTRUMENTATION_NAME = "com.soklet.otel";
131                URL_SCHEME_HTTP = "http";
132                SERVER_TYPE_HTTP = "http";
133                SERVER_TYPE_SSE = "server_sent_event";
134                SERVER_TYPE_MCP = "mcp";
135
136                SERVER_TYPE_ATTRIBUTE_KEY = AttributeKey.stringKey("soklet.server.type");
137                HTTP_METHOD_ATTRIBUTE_KEY = AttributeKey.stringKey("http.request.method");
138                HTTP_ROUTE_ATTRIBUTE_KEY = AttributeKey.stringKey("http.route");
139                URL_SCHEME_ATTRIBUTE_KEY = AttributeKey.stringKey("url.scheme");
140                HTTP_STATUS_CODE_ATTRIBUTE_KEY = AttributeKey.longKey("http.response.status_code");
141                ERROR_TYPE_ATTRIBUTE_KEY = AttributeKey.stringKey("error.type");
142                CLIENT_ADDRESS_ATTRIBUTE_KEY = AttributeKey.stringKey("client.address");
143                REQUEST_ID_ATTRIBUTE_KEY = AttributeKey.stringKey("soklet.request.id");
144                STREAM_TERMINATION_REASON_ATTRIBUTE_KEY = AttributeKey.stringKey("soklet.stream.termination.reason");
145                RPC_SYSTEM_ATTRIBUTE_KEY = AttributeKey.stringKey("rpc.system");
146                RPC_METHOD_ATTRIBUTE_KEY = AttributeKey.stringKey("rpc.method");
147                MCP_ENDPOINT_CLASS_ATTRIBUTE_KEY = AttributeKey.stringKey("soklet.mcp.endpoint.class");
148                MCP_SESSION_ID_PRESENT_ATTRIBUTE_KEY = AttributeKey.booleanKey("soklet.mcp.session.id.present");
149                MCP_REQUEST_ID_PRESENT_ATTRIBUTE_KEY = AttributeKey.booleanKey("soklet.mcp.request.id.present");
150                MCP_REQUEST_OUTCOME_ATTRIBUTE_KEY = AttributeKey.stringKey("soklet.mcp.request.outcome");
151                MCP_JSON_RPC_ERROR_CODE_ATTRIBUTE_KEY = AttributeKey.longKey("rpc.jsonrpc.error_code");
152                SSE_CLIENT_CONTEXT_PRESENT_ATTRIBUTE_KEY = AttributeKey.booleanKey("soklet.sse.client_context.present");
153                SSE_PAYLOAD_TYPE_ATTRIBUTE_KEY = AttributeKey.stringKey("soklet.sse.payload.type");
154                SSE_EVENT_TYPE_ATTRIBUTE_KEY = AttributeKey.stringKey("soklet.sse.event.type");
155        }
156
157        @NonNull
158        private final Tracer tracer;
159        @NonNull
160        private final SpanNamingStrategy spanNamingStrategy;
161        @NonNull
162        private final SpanPolicy spanPolicy;
163        @NonNull
164        private final ConcurrentMap<IdentityKey<Request>, SpanState> httpRequestSpans;
165        @NonNull
166        private final ConcurrentMap<IdentityKey<Request>, SpanState> mcpRequestSpans;
167        @NonNull
168        private final ConcurrentMap<IdentityKey<SseConnection>, SpanState> sseConnectionSpans;
169        @NonNull
170        private final ConcurrentMap<IdentityKey<McpSseStream>, SpanState> mcpSseStreamSpans;
171        @NonNull
172        private final AtomicBoolean closed;
173
174        @NonNull
175        public static Builder builder() {
176                return new Builder();
177        }
178
179        @NonNull
180        public static Builder withOpenTelemetry(@NonNull OpenTelemetry openTelemetry) {
181                requireNonNull(openTelemetry);
182                return builder().openTelemetry(openTelemetry);
183        }
184
185        @NonNull
186        public static Builder withTracer(@NonNull Tracer tracer) {
187                requireNonNull(tracer);
188                return builder().tracer(tracer);
189        }
190
191        @NonNull
192        public static OpenTelemetryLifecycleObserver fromOpenTelemetry(@NonNull OpenTelemetry openTelemetry) {
193                return withOpenTelemetry(openTelemetry).build();
194        }
195
196        @NonNull
197        public static OpenTelemetryLifecycleObserver fromTracer(@NonNull Tracer tracer) {
198                return withTracer(tracer).build();
199        }
200
201        private OpenTelemetryLifecycleObserver(@NonNull Builder builder) {
202                requireNonNull(builder);
203
204                this.tracer = requireNonNull(builder.resolveTracer());
205                this.spanNamingStrategy = requireNonNull(builder.spanNamingStrategy);
206                this.spanPolicy = requireNonNull(builder.spanPolicy);
207                this.httpRequestSpans = new ConcurrentHashMap<>();
208                this.mcpRequestSpans = new ConcurrentHashMap<>();
209                this.sseConnectionSpans = new ConcurrentHashMap<>();
210                this.mcpSseStreamSpans = new ConcurrentHashMap<>();
211                this.closed = new AtomicBoolean(false);
212        }
213
214        @NonNull
215        public Integer getActiveSpanCount() {
216                return this.httpRequestSpans.size()
217                                + this.mcpRequestSpans.size()
218                                + this.sseConnectionSpans.size()
219                                + this.mcpSseStreamSpans.size();
220        }
221
222        @Override
223        public void close() {
224                if (!this.closed.compareAndSet(false, true))
225                        return;
226
227                drain(this.httpRequestSpans);
228                drain(this.mcpRequestSpans);
229                drain(this.sseConnectionSpans);
230                drain(this.mcpSseStreamSpans);
231        }
232
233        @Override
234        public void didStartRequestHandling(@NonNull ServerType serverType,
235                                                                                                                                                        @NonNull Request request,
236                                                                                                                                                        @Nullable ResourceMethod resourceMethod) {
237                requireNonNull(serverType);
238                requireNonNull(request);
239
240                if (this.closed.get() || !this.spanPolicy.recordHttpRequestSpans())
241                        return;
242
243                safelyRun(() -> {
244                        Instant startedAt = Instant.now();
245                        Span span = this.tracer.spanBuilder(this.spanNamingStrategy.httpRequestSpanName(request, resourceMethod))
246                                        .setSpanKind(SpanKind.SERVER)
247                                        .setParent(parentContextFor(request))
248                                        .setStartTimestamp(startedAt)
249                                        .setAttribute(SERVER_TYPE_ATTRIBUTE_KEY, serverTypeValue(serverType))
250                                        .setAttribute(HTTP_METHOD_ATTRIBUTE_KEY, request.getHttpMethod().name())
251                                        .setAttribute(URL_SCHEME_ATTRIBUTE_KEY, URL_SCHEME_HTTP)
252                                        .startSpan();
253
254                        try {
255                                setRouteAttribute(span, resourceMethod);
256                                setOptionalRequestAttributes(span, request);
257
258                                if (this.closed.get()) {
259                                        endSpanSafely(span);
260                                        return;
261                                }
262
263                                SpanState spanState = new SpanState(span, request, resourceMethod, startedAt);
264                                storeReplacing(this.httpRequestSpans, new IdentityKey<>(request), spanState);
265                        } catch (RuntimeException e) {
266                                endSpanSafely(span);
267                                throw e;
268                        }
269                });
270        }
271
272        @Override
273        public void didFinishRequestHandling(@NonNull ServerType serverType,
274                                                                                                                                                         @NonNull Request request,
275                                                                                                                                                         @Nullable ResourceMethod resourceMethod,
276                                                                                                                                                         @NonNull MarshaledResponse marshaledResponse,
277                                                                                                                                                         @NonNull Duration duration,
278                                                                                                                                                         @NonNull List<@NonNull Throwable> throwables) {
279                requireNonNull(serverType);
280                requireNonNull(request);
281                requireNonNull(marshaledResponse);
282                requireNonNull(duration);
283                requireNonNull(throwables);
284
285                if (this.closed.get() || !this.spanPolicy.recordHttpRequestSpans())
286                        return;
287
288                IdentityKey<Request> key = new IdentityKey<>(request);
289                SpanState spanState = this.httpRequestSpans.get(key);
290
291                if (spanState == null)
292                        return;
293
294                safelyRun(() -> {
295                        boolean keepOpen = marshaledResponse.isStreaming() && this.spanPolicy.recordStreamingResponseSpans();
296
297                        try {
298                                applyHttpFinish(spanState.span(), resourceMethod, marshaledResponse, throwables);
299                        } finally {
300                                if (!keepOpen && this.httpRequestSpans.remove(key, spanState))
301                                        endSpanSafely(spanState.span(), spanState.startedAt().plus(duration));
302                        }
303                });
304        }
305
306        @Override
307        public void didTerminateResponseStream(@NonNull StreamingResponseHandle streamingResponse,
308                                                                                                                                                                 @NonNull StreamTermination termination) {
309                requireNonNull(streamingResponse);
310                requireNonNull(termination);
311
312                if (this.closed.get() || !this.spanPolicy.recordHttpRequestSpans() || !this.spanPolicy.recordStreamingResponseSpans())
313                        return;
314
315                safelyRun(() -> {
316                        IdentityKey<Request> key = new IdentityKey<>(streamingResponse.getRequest());
317                        SpanState spanState = this.httpRequestSpans.remove(key);
318
319                        if (spanState == null)
320                                spanState = backfilledStreamingSpan(streamingResponse);
321
322                        try {
323                                applyStreamTermination(spanState.span(), termination);
324                        } finally {
325                                endSpanSafely(spanState.span(), streamingResponse.getEstablishedAt().plus(termination.getDuration()));
326                        }
327                });
328        }
329
330        @Override
331        public void didEstablishSseConnection(@NonNull SseConnection sseConnection) {
332                requireNonNull(sseConnection);
333
334                if (this.closed.get() || !this.spanPolicy.recordSseConnectionSpans())
335                        return;
336
337                safelyRun(() -> {
338                        Span span = this.tracer.spanBuilder(this.spanNamingStrategy.sseConnectionSpanName(sseConnection))
339                                        .setSpanKind(SpanKind.SERVER)
340                                        .setParent(parentContextFor(sseConnection.getRequest()))
341                                        .setStartTimestamp(sseConnection.getEstablishedAt())
342                                        .setAttribute(SERVER_TYPE_ATTRIBUTE_KEY, SERVER_TYPE_SSE)
343                                        .setAttribute(HTTP_METHOD_ATTRIBUTE_KEY, sseConnection.getRequest().getHttpMethod().name())
344                                        .setAttribute(URL_SCHEME_ATTRIBUTE_KEY, URL_SCHEME_HTTP)
345                                        .setAttribute(HTTP_ROUTE_ATTRIBUTE_KEY, sseConnection.getResourceMethod().getResourcePathDeclaration().getPath())
346                                        .setAttribute(SSE_CLIENT_CONTEXT_PRESENT_ATTRIBUTE_KEY, sseConnection.getClientContext().isPresent())
347                                        .startSpan();
348
349                        try {
350                                setOptionalRequestAttributes(span, sseConnection.getRequest());
351
352                                if (this.closed.get()) {
353                                        endSpanSafely(span);
354                                        return;
355                                }
356
357                                storeReplacing(this.sseConnectionSpans, new IdentityKey<>(sseConnection),
358                                                new SpanState(span, sseConnection.getRequest(), sseConnection.getResourceMethod(), sseConnection.getEstablishedAt()));
359                        } catch (RuntimeException e) {
360                                endSpanSafely(span);
361                                throw e;
362                        }
363                });
364        }
365
366        @Override
367        public void didWriteSseEvent(@NonNull SseConnection sseConnection,
368                                                                                                                         @NonNull SseEvent sseEvent,
369                                                                                                                         @NonNull Duration writeDuration) {
370                requireNonNull(sseConnection);
371                requireNonNull(sseEvent);
372                requireNonNull(writeDuration);
373
374                if (this.closed.get() || !this.spanPolicy.recordSseWriteEvents())
375                        return;
376
377                safelyRun(() -> {
378                        SpanState spanState = this.sseConnectionSpans.get(new IdentityKey<>(sseConnection));
379
380                        if (spanState != null)
381                                spanState.span().addEvent("sse.event.written", Attributes.of(
382                                                SSE_PAYLOAD_TYPE_ATTRIBUTE_KEY, "event",
383                                                SSE_EVENT_TYPE_ATTRIBUTE_KEY, sseEvent.getEvent().orElse("message")));
384                });
385        }
386
387        @Override
388        public void didFailToWriteSseEvent(@NonNull SseConnection sseConnection,
389                                                                                                                                                 @NonNull SseEvent sseEvent,
390                                                                                                                                                 @NonNull Duration writeDuration,
391                                                                                                                                                 @NonNull Throwable throwable) {
392                requireNonNull(sseConnection);
393                requireNonNull(sseEvent);
394                requireNonNull(writeDuration);
395                requireNonNull(throwable);
396
397                if (this.closed.get())
398                        return;
399
400                safelyRun(() -> {
401                        SpanState spanState = this.sseConnectionSpans.get(new IdentityKey<>(sseConnection));
402
403                        if (spanState != null)
404                                recordException(spanState.span(), throwable);
405                });
406        }
407
408        @Override
409        public void didWriteSseComment(@NonNull SseConnection sseConnection,
410                                                                                                                                 @NonNull SseComment sseComment,
411                                                                                                                                 @NonNull Duration writeDuration) {
412                requireNonNull(sseConnection);
413                requireNonNull(sseComment);
414                requireNonNull(writeDuration);
415
416                if (this.closed.get() || !this.spanPolicy.recordSseWriteEvents())
417                        return;
418
419                safelyRun(() -> {
420                        SpanState spanState = this.sseConnectionSpans.get(new IdentityKey<>(sseConnection));
421
422                        if (spanState != null)
423                                spanState.span().addEvent("sse.comment.written", Attributes.of(
424                                                SSE_PAYLOAD_TYPE_ATTRIBUTE_KEY, "comment",
425                                                SSE_EVENT_TYPE_ATTRIBUTE_KEY, enumValue(sseComment.getCommentType())));
426                });
427        }
428
429        @Override
430        public void didFailToWriteSseComment(@NonNull SseConnection sseConnection,
431                                                                                                                                                         @NonNull SseComment sseComment,
432                                                                                                                                                         @NonNull Duration writeDuration,
433                                                                                                                                                         @NonNull Throwable throwable) {
434                requireNonNull(sseConnection);
435                requireNonNull(sseComment);
436                requireNonNull(writeDuration);
437                requireNonNull(throwable);
438
439                if (this.closed.get())
440                        return;
441
442                safelyRun(() -> {
443                        SpanState spanState = this.sseConnectionSpans.get(new IdentityKey<>(sseConnection));
444
445                        if (spanState != null)
446                                recordException(spanState.span(), throwable);
447                });
448        }
449
450        @Override
451        public void didTerminateSseConnection(@NonNull SseConnection sseConnection,
452                                                                                                                                                                @NonNull StreamTermination termination) {
453                requireNonNull(sseConnection);
454                requireNonNull(termination);
455
456                if (this.closed.get() || !this.spanPolicy.recordSseConnectionSpans())
457                        return;
458
459                safelyRun(() -> {
460                        SpanState spanState = this.sseConnectionSpans.remove(new IdentityKey<>(sseConnection));
461
462                        if (spanState == null)
463                                return;
464
465                        try {
466                                applyStreamTermination(spanState.span(), termination);
467                        } finally {
468                                endSpanSafely(spanState.span(), sseConnection.getEstablishedAt().plus(termination.getDuration()));
469                        }
470                });
471        }
472
473        @Override
474        public void didStartMcpRequestHandling(@NonNull Request request,
475                                                                                                                                                                 @NonNull Class<? extends McpEndpoint> endpointClass,
476                                                                                                                                                                 @Nullable String sessionId,
477                                                                                                                                                                 @NonNull String jsonRpcMethod,
478                                                                                                                                                                 @Nullable McpJsonRpcRequestId jsonRpcRequestId) {
479                requireNonNull(request);
480                requireNonNull(endpointClass);
481                requireNonNull(jsonRpcMethod);
482
483                if (this.closed.get() || !this.spanPolicy.recordMcpRequestSpans())
484                        return;
485
486                safelyRun(() -> {
487                        Instant startedAt = Instant.now();
488                        Span span = this.tracer.spanBuilder(this.spanNamingStrategy.mcpRequestSpanName(request, endpointClass, jsonRpcMethod))
489                                        .setSpanKind(SpanKind.SERVER)
490                                        .setParent(parentContextFor(request))
491                                        .setStartTimestamp(startedAt)
492                                        .setAttribute(SERVER_TYPE_ATTRIBUTE_KEY, SERVER_TYPE_MCP)
493                                        .setAttribute(RPC_SYSTEM_ATTRIBUTE_KEY, "jsonrpc")
494                                        .setAttribute(RPC_METHOD_ATTRIBUTE_KEY, jsonRpcMethod)
495                                        .setAttribute(MCP_ENDPOINT_CLASS_ATTRIBUTE_KEY, endpointClass.getName())
496                                        .setAttribute(MCP_SESSION_ID_PRESENT_ATTRIBUTE_KEY, sessionId != null)
497                                        .setAttribute(MCP_REQUEST_ID_PRESENT_ATTRIBUTE_KEY, jsonRpcRequestId != null)
498                                        .startSpan();
499
500                        try {
501                                setOptionalRequestAttributes(span, request);
502
503                                if (this.closed.get()) {
504                                        endSpanSafely(span);
505                                        return;
506                                }
507
508                                storeReplacing(this.mcpRequestSpans, new IdentityKey<>(request), new SpanState(span, request, null, startedAt));
509                        } catch (RuntimeException e) {
510                                endSpanSafely(span);
511                                throw e;
512                        }
513                });
514        }
515
516        @Override
517        public void didFinishMcpRequestHandling(@NonNull Request request,
518                                                                                                                                                                        @NonNull Class<? extends McpEndpoint> endpointClass,
519                                                                                                                                                                        @Nullable String sessionId,
520                                                                                                                                                                        @NonNull String jsonRpcMethod,
521                                                                                                                                                                        @Nullable McpJsonRpcRequestId jsonRpcRequestId,
522                                                                                                                                                                        @NonNull McpRequestOutcome requestOutcome,
523                                                                                                                                                                        @Nullable McpJsonRpcError jsonRpcError,
524                                                                                                                                                                        @NonNull Duration duration,
525                                                                                                                                                                        @NonNull List<@NonNull Throwable> throwables) {
526                requireNonNull(request);
527                requireNonNull(endpointClass);
528                requireNonNull(jsonRpcMethod);
529                requireNonNull(requestOutcome);
530                requireNonNull(duration);
531                requireNonNull(throwables);
532
533                if (this.closed.get() || !this.spanPolicy.recordMcpRequestSpans())
534                        return;
535
536                safelyRun(() -> {
537                        SpanState spanState = this.mcpRequestSpans.remove(new IdentityKey<>(request));
538
539                        if (spanState == null)
540                                return;
541
542                        try {
543                                spanState.span().setAttribute(MCP_REQUEST_OUTCOME_ATTRIBUTE_KEY, enumValue(requestOutcome));
544
545                                if (jsonRpcError != null) {
546                                        spanState.span().setAttribute(MCP_JSON_RPC_ERROR_CODE_ATTRIBUTE_KEY, jsonRpcError.code().longValue());
547                                        spanState.span().setAttribute(ERROR_TYPE_ATTRIBUTE_KEY, "json_rpc_error");
548                                }
549
550                                for (Throwable throwable : throwables)
551                                        recordException(spanState.span(), throwable);
552
553                                if (jsonRpcError != null || !throwables.isEmpty() || requestOutcome == McpRequestOutcome.JSON_RPC_ERROR)
554                                        spanState.span().setStatus(StatusCode.ERROR);
555                        } finally {
556                                endSpanSafely(spanState.span(), spanState.startedAt().plus(duration));
557                        }
558                });
559        }
560
561        @Override
562        public void didCreateMcpSession(@NonNull Request request,
563                                                                                                                                        @NonNull Class<? extends McpEndpoint> endpointClass,
564                                                                                                                                        @NonNull String sessionId) {
565                requireNonNull(request);
566                requireNonNull(endpointClass);
567                requireNonNull(sessionId);
568
569                if (this.closed.get() || !this.spanPolicy.recordMcpSessionEvents())
570                        return;
571
572                safelyRun(() -> {
573                        SpanState spanState = this.mcpRequestSpans.get(new IdentityKey<>(request));
574
575                        if (spanState != null)
576                                spanState.span().addEvent("mcp.session.created", Attributes.of(
577                                                MCP_ENDPOINT_CLASS_ATTRIBUTE_KEY, endpointClass.getName(),
578                                                MCP_SESSION_ID_PRESENT_ATTRIBUTE_KEY, true));
579                });
580        }
581
582        @Override
583        public void didEstablishMcpSseStream(@NonNull McpSseStream stream) {
584                requireNonNull(stream);
585
586                if (this.closed.get() || !this.spanPolicy.recordMcpSseStreamSpans())
587                        return;
588
589                safelyRun(() -> {
590                        Span span = this.tracer.spanBuilder(this.spanNamingStrategy.mcpSseStreamSpanName(stream))
591                                        .setSpanKind(SpanKind.SERVER)
592                                        .setParent(parentContextFor(stream.getRequest()))
593                                        .setStartTimestamp(stream.getEstablishedAt())
594                                        .setAttribute(SERVER_TYPE_ATTRIBUTE_KEY, SERVER_TYPE_MCP)
595                                        .setAttribute(MCP_ENDPOINT_CLASS_ATTRIBUTE_KEY, stream.getEndpointClass().getName())
596                                        .setAttribute(MCP_SESSION_ID_PRESENT_ATTRIBUTE_KEY, true)
597                                        .startSpan();
598
599                        try {
600                                setOptionalRequestAttributes(span, stream.getRequest());
601
602                                if (this.closed.get()) {
603                                        endSpanSafely(span);
604                                        return;
605                                }
606
607                                storeReplacing(this.mcpSseStreamSpans, new IdentityKey<>(stream), new SpanState(span, stream.getRequest(), null, stream.getEstablishedAt()));
608                        } catch (RuntimeException e) {
609                                endSpanSafely(span);
610                                throw e;
611                        }
612                });
613        }
614
615        @Override
616        public void didTerminateMcpSseStream(@NonNull McpSseStream stream,
617                                                                                                                                                         @NonNull StreamTermination termination) {
618                requireNonNull(stream);
619                requireNonNull(termination);
620
621                if (this.closed.get() || !this.spanPolicy.recordMcpSseStreamSpans())
622                        return;
623
624                safelyRun(() -> {
625                        SpanState spanState = this.mcpSseStreamSpans.remove(new IdentityKey<>(stream));
626
627                        if (spanState == null)
628                                return;
629
630                        try {
631                                applyStreamTermination(spanState.span(), termination);
632                        } finally {
633                                endSpanSafely(spanState.span(), stream.getEstablishedAt().plus(termination.getDuration()));
634                        }
635                });
636        }
637
638        private void applyHttpFinish(@NonNull Span span,
639                                                                                                                         @Nullable ResourceMethod resourceMethod,
640                                                                                                                         @NonNull MarshaledResponse marshaledResponse,
641                                                                                                                         @NonNull List<@NonNull Throwable> throwables) {
642                requireNonNull(span);
643                requireNonNull(marshaledResponse);
644                requireNonNull(throwables);
645
646                setRouteAttribute(span, resourceMethod);
647                span.setAttribute(HTTP_STATUS_CODE_ATTRIBUTE_KEY, marshaledResponse.getStatusCode().longValue());
648
649                for (Throwable throwable : throwables)
650                        recordException(span, throwable);
651
652                if (!throwables.isEmpty()) {
653                        span.setStatus(StatusCode.ERROR);
654                } else if (marshaledResponse.getStatusCode() >= 500) {
655                        span.setAttribute(ERROR_TYPE_ATTRIBUTE_KEY, "http.status_code");
656                        span.setStatus(StatusCode.ERROR);
657                }
658        }
659
660        @NonNull
661        private SpanState backfilledStreamingSpan(@NonNull StreamingResponseHandle stream) {
662                requireNonNull(stream);
663
664                Span span = this.tracer.spanBuilder(this.spanNamingStrategy.streamingResponseSpanName(stream))
665                                .setSpanKind(SpanKind.SERVER)
666                                .setParent(parentContextFor(stream.getRequest()))
667                                .setStartTimestamp(stream.getEstablishedAt())
668                                .setAttribute(SERVER_TYPE_ATTRIBUTE_KEY, serverTypeValue(stream.getServerType()))
669                                .setAttribute(HTTP_METHOD_ATTRIBUTE_KEY, stream.getRequest().getHttpMethod().name())
670                                .setAttribute(URL_SCHEME_ATTRIBUTE_KEY, URL_SCHEME_HTTP)
671                                .startSpan();
672
673                try {
674                        setRouteAttribute(span, stream.getResourceMethod().orElse(null));
675                        setOptionalRequestAttributes(span, stream.getRequest());
676                        return new SpanState(span, stream.getRequest(), stream.getResourceMethod().orElse(null), stream.getEstablishedAt());
677                } catch (RuntimeException e) {
678                        endSpanSafely(span);
679                        throw e;
680                }
681        }
682
683        private void applyStreamTermination(@NonNull Span span,
684                                                                                                                                                        @NonNull StreamTermination termination) {
685                requireNonNull(span);
686                requireNonNull(termination);
687
688                span.setAttribute(STREAM_TERMINATION_REASON_ATTRIBUTE_KEY, enumValue(termination.getReason()));
689                termination.getCause().ifPresent(throwable -> recordException(span, throwable));
690
691                if (isError(termination))
692                        span.setStatus(StatusCode.ERROR);
693        }
694
695        private boolean isError(@NonNull StreamTermination termination) {
696                requireNonNull(termination);
697
698                if (termination.getCause().isPresent())
699                        return true;
700
701                return switch (termination.getReason()) {
702                        case COMPLETED, CLIENT_DISCONNECTED, SERVER_STOPPING, APPLICATION_CANCELED, SESSION_TERMINATED -> false;
703                        case PROTOCOL_UNSUPPORTED, RESPONSE_TIMEOUT, RESPONSE_IDLE_TIMEOUT, BACKPRESSURE, WRITE_FAILED,
704                                         PRODUCER_FAILED, INTERNAL_ERROR, SIMULATOR_LIMIT_EXCEEDED, UNKNOWN -> true;
705                };
706        }
707
708        private void recordException(@NonNull Span span,
709                                                                                                                         @NonNull Throwable throwable) {
710                requireNonNull(span);
711                requireNonNull(throwable);
712
713                span.recordException(throwable);
714                span.setAttribute(ERROR_TYPE_ATTRIBUTE_KEY, throwable.getClass().getName());
715                span.setStatus(StatusCode.ERROR);
716        }
717
718        private void safelyRun(@NonNull Runnable runnable) {
719                requireNonNull(runnable);
720
721                try {
722                        runnable.run();
723                } catch (RuntimeException e) {
724                        // Telemetry failures must never affect application request handling.
725                }
726        }
727
728        private void endSpanSafely(@NonNull Span span) {
729                requireNonNull(span);
730
731                try {
732                        span.end();
733                } catch (RuntimeException e) {
734                        // Telemetry failures must never affect application request handling.
735                }
736        }
737
738        private void endSpanSafely(@NonNull Span span,
739                                                                                                                 @NonNull Instant timestamp) {
740                requireNonNull(span);
741                requireNonNull(timestamp);
742
743                try {
744                        span.end(timestamp);
745                } catch (RuntimeException e) {
746                        // Telemetry failures must never affect application request handling.
747                }
748        }
749
750        private void setRouteAttribute(@NonNull Span span,
751                                                                                                                                 @Nullable ResourceMethod resourceMethod) {
752                requireNonNull(span);
753
754                if (resourceMethod != null)
755                        span.setAttribute(HTTP_ROUTE_ATTRIBUTE_KEY, resourceMethod.getResourcePathDeclaration().getPath());
756        }
757
758        private void setOptionalRequestAttributes(@NonNull Span span,
759                                                                                                                                                                                @NonNull Request request) {
760                requireNonNull(span);
761                requireNonNull(request);
762
763                if (this.spanPolicy.recordClientAddress()) {
764                        InetSocketAddress remoteAddress = request.getRemoteAddress().orElse(null);
765
766                        if (remoteAddress != null && remoteAddress.getAddress() != null)
767                                span.setAttribute(CLIENT_ADDRESS_ATTRIBUTE_KEY, remoteAddress.getAddress().getHostAddress());
768                }
769
770                if (this.spanPolicy.recordRequestId())
771                        span.setAttribute(REQUEST_ID_ATTRIBUTE_KEY, String.valueOf(request.getId()));
772        }
773
774        @NonNull
775        private Context parentContextFor(@NonNull Request request) {
776                requireNonNull(request);
777
778                TraceContext traceContext = request.getTraceContext().orElse(null);
779
780                if (traceContext == null)
781                        return Context.root();
782
783                try {
784                        TraceStateBuilder traceStateBuilder = TraceState.builder();
785
786                        for (TraceStateEntry traceStateEntry : traceContext.getTraceStateEntries())
787                                traceStateBuilder.put(traceStateEntry.getKey(), traceStateEntry.getValue());
788
789                        SpanContext spanContext = SpanContext.createFromRemoteParent(
790                                        traceContext.getTraceId(),
791                                        traceContext.getParentId(),
792                                        TraceFlags.fromByte((byte) (traceContext.getTraceFlags() & 0xFF)),
793                                        traceStateBuilder.build());
794
795                        if (!spanContext.isValid())
796                                return Context.root();
797
798                        return Span.wrap(spanContext).storeInContext(Context.root());
799                } catch (RuntimeException e) {
800                        return Context.root();
801                }
802        }
803
804        private <T> void storeReplacing(@NonNull ConcurrentMap<IdentityKey<T>, SpanState> spanStates,
805                                                                                                                                        @NonNull IdentityKey<T> identityKey,
806                                                                                                                                        @NonNull SpanState spanState) {
807                requireNonNull(spanStates);
808                requireNonNull(identityKey);
809                requireNonNull(spanState);
810
811                spanStates.compute(identityKey, (key, existingSpanState) -> {
812                        if (existingSpanState != null)
813                                endServerStopping(existingSpanState);
814
815                        if (this.closed.get()) {
816                                endSpanSafely(spanState.span());
817                                return null;
818                        }
819
820                        return spanState;
821                });
822        }
823
824        private void drain(@NonNull ConcurrentMap<?, SpanState> spanStates) {
825                requireNonNull(spanStates);
826
827                for (Map.Entry<?, SpanState> entry : spanStates.entrySet()) {
828                        try {
829                                SpanState spanState = entry.getValue();
830
831                                if (spanStates.remove(entry.getKey(), spanState))
832                                        endServerStopping(spanState);
833                        } catch (RuntimeException e) {
834                                // Drain must best-effort every active span even if one entry fails.
835                        }
836                }
837        }
838
839        private void endServerStopping(@NonNull SpanState spanState) {
840                requireNonNull(spanState);
841
842                safelyRun(() -> {
843                        // SERVER_STOPPING is operational shutdown, not a span error.
844                        spanState.span().setAttribute(STREAM_TERMINATION_REASON_ATTRIBUTE_KEY, enumValue(StreamTerminationReason.SERVER_STOPPING));
845                        spanState.span().end();
846                });
847        }
848
849        @NonNull
850        private static String serverTypeValue(@NonNull ServerType serverType) {
851                requireNonNull(serverType);
852
853                return switch (serverType) {
854                        case STANDARD_HTTP -> SERVER_TYPE_HTTP;
855                        case SSE -> SERVER_TYPE_SSE;
856                        case MCP -> SERVER_TYPE_MCP;
857                };
858        }
859
860        @NonNull
861        private static String enumValue(@NonNull Enum<?> value) {
862                requireNonNull(value);
863                return value.name().toLowerCase(Locale.ROOT);
864        }
865
866        @NotThreadSafe
867        public static final class Builder {
868                @Nullable
869                private OpenTelemetry openTelemetry;
870                @Nullable
871                private Tracer tracer;
872                @NonNull
873                private String instrumentationName;
874                @Nullable
875                private String instrumentationVersion;
876                @NonNull
877                private SpanNamingStrategy spanNamingStrategy;
878                @NonNull
879                private SpanPolicy spanPolicy;
880
881                private Builder() {
882                        this.openTelemetry = GlobalOpenTelemetry.get();
883                        this.instrumentationName = DEFAULT_INSTRUMENTATION_NAME;
884                        this.instrumentationVersion = defaultInstrumentationVersion();
885                        this.spanNamingStrategy = SpanNamingStrategy.defaultInstance();
886                        this.spanPolicy = SpanPolicy.defaultInstance();
887                }
888
889                @NonNull
890                public Builder openTelemetry(@NonNull OpenTelemetry openTelemetry) {
891                        this.openTelemetry = requireNonNull(openTelemetry);
892                        this.tracer = null;
893                        return this;
894                }
895
896                @NonNull
897                public Builder tracer(@NonNull Tracer tracer) {
898                        this.tracer = requireNonNull(tracer);
899                        return this;
900                }
901
902                @NonNull
903                public Builder instrumentationName(@NonNull String instrumentationName) {
904                        this.instrumentationName = requireNonNull(instrumentationName);
905                        this.tracer = null;
906                        return this;
907                }
908
909                @NonNull
910                public Builder instrumentationVersion(@Nullable String instrumentationVersion) {
911                        this.instrumentationVersion = instrumentationVersion;
912                        this.tracer = null;
913                        return this;
914                }
915
916                @NonNull
917                public Builder spanNamingStrategy(@NonNull SpanNamingStrategy spanNamingStrategy) {
918                        this.spanNamingStrategy = requireNonNull(spanNamingStrategy);
919                        return this;
920                }
921
922                @NonNull
923                public Builder spanPolicy(@NonNull SpanPolicy spanPolicy) {
924                        this.spanPolicy = requireNonNull(spanPolicy);
925                        return this;
926                }
927
928                @NonNull
929                public OpenTelemetryLifecycleObserver build() {
930                        return new OpenTelemetryLifecycleObserver(this);
931                }
932
933                @NonNull
934                private Tracer resolveTracer() {
935                        if (this.tracer != null)
936                                return this.tracer;
937
938                        TracerBuilder tracerBuilder = requireNonNull(this.openTelemetry).tracerBuilder(this.instrumentationName);
939
940                        if (this.instrumentationVersion != null)
941                                tracerBuilder.setInstrumentationVersion(this.instrumentationVersion);
942
943                        return tracerBuilder.build();
944                }
945        }
946
947        @Nullable
948        private static String defaultInstrumentationVersion() {
949                return OpenTelemetryLifecycleObserver.class.getPackage().getImplementationVersion();
950        }
951
952        private record SpanState(
953                        @NonNull Span span,
954                        @NonNull Request request,
955                        @Nullable ResourceMethod resourceMethod,
956                        @NonNull Instant startedAt
957        ) {
958                private SpanState {
959                        requireNonNull(span);
960                        requireNonNull(request);
961                        requireNonNull(startedAt);
962                }
963        }
964
965        @ThreadSafe
966        private static final class IdentityKey<T> {
967                @NonNull
968                private final T value;
969                private final Integer hashCode;
970
971                private IdentityKey(@NonNull T value) {
972                        this.value = requireNonNull(value);
973                        this.hashCode = System.identityHashCode(value);
974                }
975
976                @Override
977                public boolean equals(@Nullable Object object) {
978                        if (this == object)
979                                return true;
980
981                        if (!(object instanceof IdentityKey<?> identityKey))
982                                return false;
983
984                        return this.value == identityKey.value;
985                }
986
987                @Override
988                public int hashCode() {
989                        return this.hashCode;
990                }
991
992                @Override
993                @NonNull
994                public String toString() {
995                        return "%s{value=%s}".formatted(getClass().getSimpleName(), this.value);
996                }
997        }
998}