001/* 002 * Copyright 2022-2026 Revetware LLC. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017package com.soklet.otel; 018 019import com.soklet.LifecycleObserver; 020import com.soklet.MarshaledResponse; 021import com.soklet.McpEndpoint; 022import com.soklet.McpJsonRpcError; 023import com.soklet.McpJsonRpcRequestId; 024import com.soklet.McpRequestOutcome; 025import com.soklet.McpSseStream; 026import com.soklet.Request; 027import com.soklet.ResourceMethod; 028import com.soklet.ServerType; 029import com.soklet.SseComment; 030import com.soklet.SseConnection; 031import com.soklet.SseEvent; 032import com.soklet.StreamTermination; 033import com.soklet.StreamTerminationReason; 034import com.soklet.StreamingResponseHandle; 035import com.soklet.TraceContext; 036import com.soklet.TraceStateEntry; 037import io.opentelemetry.api.GlobalOpenTelemetry; 038import io.opentelemetry.api.OpenTelemetry; 039import io.opentelemetry.api.common.AttributeKey; 040import io.opentelemetry.api.common.Attributes; 041import io.opentelemetry.api.trace.Span; 042import io.opentelemetry.api.trace.SpanBuilder; 043import io.opentelemetry.api.trace.SpanContext; 044import io.opentelemetry.api.trace.SpanKind; 045import io.opentelemetry.api.trace.StatusCode; 046import io.opentelemetry.api.trace.TraceFlags; 047import io.opentelemetry.api.trace.TraceState; 048import io.opentelemetry.api.trace.TraceStateBuilder; 049import io.opentelemetry.api.trace.Tracer; 050import io.opentelemetry.api.trace.TracerBuilder; 051import io.opentelemetry.context.Context; 052import org.jspecify.annotations.NonNull; 053import org.jspecify.annotations.Nullable; 054 055import javax.annotation.concurrent.NotThreadSafe; 056import javax.annotation.concurrent.ThreadSafe; 057import java.net.InetSocketAddress; 058import java.time.Duration; 059import java.time.Instant; 060import java.util.List; 061import java.util.Locale; 062import java.util.Map; 063import java.util.concurrent.ConcurrentHashMap; 064import java.util.concurrent.ConcurrentMap; 065import java.util.concurrent.atomic.AtomicBoolean; 066 067import static java.util.Objects.requireNonNull; 068 069/** 070 * OpenTelemetry-backed {@link LifecycleObserver} that emits server spans for Soklet lifecycle events. 071 * <p> 072 * This type complements {@link OpenTelemetryMetricsCollector}: metrics remain low-cardinality aggregate telemetry, 073 * while this observer emits per-request and per-stream spans. 074 * 075 * @author <a href="https://www.revetkn.com">Mark Allen</a> 076 */ 077@ThreadSafe 078public final class OpenTelemetryLifecycleObserver implements LifecycleObserver, AutoCloseable { 079 @NonNull 080 private static final String DEFAULT_INSTRUMENTATION_NAME; 081 @NonNull 082 private static final String URL_SCHEME_HTTP; 083 @NonNull 084 private static final String SERVER_TYPE_HTTP; 085 @NonNull 086 private static final String SERVER_TYPE_SSE; 087 @NonNull 088 private static final String SERVER_TYPE_MCP; 089 090 @NonNull 091 private static final AttributeKey<String> SERVER_TYPE_ATTRIBUTE_KEY; 092 @NonNull 093 private static final AttributeKey<String> HTTP_METHOD_ATTRIBUTE_KEY; 094 @NonNull 095 private static final AttributeKey<String> HTTP_ROUTE_ATTRIBUTE_KEY; 096 @NonNull 097 private static final AttributeKey<String> URL_SCHEME_ATTRIBUTE_KEY; 098 @NonNull 099 private static final AttributeKey<Long> HTTP_STATUS_CODE_ATTRIBUTE_KEY; 100 @NonNull 101 private static final AttributeKey<String> ERROR_TYPE_ATTRIBUTE_KEY; 102 @NonNull 103 private static final AttributeKey<String> CLIENT_ADDRESS_ATTRIBUTE_KEY; 104 @NonNull 105 private static final AttributeKey<String> REQUEST_ID_ATTRIBUTE_KEY; 106 @NonNull 107 private static final AttributeKey<String> STREAM_TERMINATION_REASON_ATTRIBUTE_KEY; 108 @NonNull 109 private static final AttributeKey<String> RPC_SYSTEM_ATTRIBUTE_KEY; 110 @NonNull 111 private static final AttributeKey<String> RPC_METHOD_ATTRIBUTE_KEY; 112 @NonNull 113 private static final AttributeKey<String> MCP_ENDPOINT_CLASS_ATTRIBUTE_KEY; 114 @NonNull 115 private static final AttributeKey<Boolean> MCP_SESSION_ID_PRESENT_ATTRIBUTE_KEY; 116 @NonNull 117 private static final AttributeKey<Boolean> MCP_REQUEST_ID_PRESENT_ATTRIBUTE_KEY; 118 @NonNull 119 private static final AttributeKey<String> MCP_REQUEST_OUTCOME_ATTRIBUTE_KEY; 120 @NonNull 121 private static final AttributeKey<Long> MCP_JSON_RPC_ERROR_CODE_ATTRIBUTE_KEY; 122 @NonNull 123 private static final AttributeKey<Boolean> SSE_CLIENT_CONTEXT_PRESENT_ATTRIBUTE_KEY; 124 @NonNull 125 private static final AttributeKey<String> SSE_PAYLOAD_TYPE_ATTRIBUTE_KEY; 126 @NonNull 127 private static final AttributeKey<String> SSE_EVENT_TYPE_ATTRIBUTE_KEY; 128 129 static { 130 DEFAULT_INSTRUMENTATION_NAME = "com.soklet.otel"; 131 URL_SCHEME_HTTP = "http"; 132 SERVER_TYPE_HTTP = "http"; 133 SERVER_TYPE_SSE = "server_sent_event"; 134 SERVER_TYPE_MCP = "mcp"; 135 136 SERVER_TYPE_ATTRIBUTE_KEY = AttributeKey.stringKey("soklet.server.type"); 137 HTTP_METHOD_ATTRIBUTE_KEY = AttributeKey.stringKey("http.request.method"); 138 HTTP_ROUTE_ATTRIBUTE_KEY = AttributeKey.stringKey("http.route"); 139 URL_SCHEME_ATTRIBUTE_KEY = AttributeKey.stringKey("url.scheme"); 140 HTTP_STATUS_CODE_ATTRIBUTE_KEY = AttributeKey.longKey("http.response.status_code"); 141 ERROR_TYPE_ATTRIBUTE_KEY = AttributeKey.stringKey("error.type"); 142 CLIENT_ADDRESS_ATTRIBUTE_KEY = AttributeKey.stringKey("client.address"); 143 REQUEST_ID_ATTRIBUTE_KEY = AttributeKey.stringKey("soklet.request.id"); 144 STREAM_TERMINATION_REASON_ATTRIBUTE_KEY = AttributeKey.stringKey("soklet.stream.termination.reason"); 145 RPC_SYSTEM_ATTRIBUTE_KEY = AttributeKey.stringKey("rpc.system"); 146 RPC_METHOD_ATTRIBUTE_KEY = AttributeKey.stringKey("rpc.method"); 147 MCP_ENDPOINT_CLASS_ATTRIBUTE_KEY = AttributeKey.stringKey("soklet.mcp.endpoint.class"); 148 MCP_SESSION_ID_PRESENT_ATTRIBUTE_KEY = AttributeKey.booleanKey("soklet.mcp.session.id.present"); 149 MCP_REQUEST_ID_PRESENT_ATTRIBUTE_KEY = AttributeKey.booleanKey("soklet.mcp.request.id.present"); 150 MCP_REQUEST_OUTCOME_ATTRIBUTE_KEY = AttributeKey.stringKey("soklet.mcp.request.outcome"); 151 MCP_JSON_RPC_ERROR_CODE_ATTRIBUTE_KEY = AttributeKey.longKey("rpc.jsonrpc.error_code"); 152 SSE_CLIENT_CONTEXT_PRESENT_ATTRIBUTE_KEY = AttributeKey.booleanKey("soklet.sse.client_context.present"); 153 SSE_PAYLOAD_TYPE_ATTRIBUTE_KEY = AttributeKey.stringKey("soklet.sse.payload.type"); 154 SSE_EVENT_TYPE_ATTRIBUTE_KEY = AttributeKey.stringKey("soklet.sse.event.type"); 155 } 156 157 @NonNull 158 private final Tracer tracer; 159 @NonNull 160 private final SpanNamingStrategy spanNamingStrategy; 161 @NonNull 162 private final SpanPolicy spanPolicy; 163 @NonNull 164 private final ConcurrentMap<IdentityKey<Request>, SpanState> httpRequestSpans; 165 @NonNull 166 private final ConcurrentMap<IdentityKey<Request>, SpanState> mcpRequestSpans; 167 @NonNull 168 private final ConcurrentMap<IdentityKey<SseConnection>, SpanState> sseConnectionSpans; 169 @NonNull 170 private final ConcurrentMap<IdentityKey<McpSseStream>, SpanState> mcpSseStreamSpans; 171 @NonNull 172 private final AtomicBoolean closed; 173 174 @NonNull 175 public static Builder builder() { 176 return new Builder(); 177 } 178 179 @NonNull 180 public static Builder withOpenTelemetry(@NonNull OpenTelemetry openTelemetry) { 181 requireNonNull(openTelemetry); 182 return builder().openTelemetry(openTelemetry); 183 } 184 185 @NonNull 186 public static Builder withTracer(@NonNull Tracer tracer) { 187 requireNonNull(tracer); 188 return builder().tracer(tracer); 189 } 190 191 @NonNull 192 public static OpenTelemetryLifecycleObserver fromOpenTelemetry(@NonNull OpenTelemetry openTelemetry) { 193 return withOpenTelemetry(openTelemetry).build(); 194 } 195 196 @NonNull 197 public static OpenTelemetryLifecycleObserver fromTracer(@NonNull Tracer tracer) { 198 return withTracer(tracer).build(); 199 } 200 201 private OpenTelemetryLifecycleObserver(@NonNull Builder builder) { 202 requireNonNull(builder); 203 204 this.tracer = requireNonNull(builder.resolveTracer()); 205 this.spanNamingStrategy = requireNonNull(builder.spanNamingStrategy); 206 this.spanPolicy = requireNonNull(builder.spanPolicy); 207 this.httpRequestSpans = new ConcurrentHashMap<>(); 208 this.mcpRequestSpans = new ConcurrentHashMap<>(); 209 this.sseConnectionSpans = new ConcurrentHashMap<>(); 210 this.mcpSseStreamSpans = new ConcurrentHashMap<>(); 211 this.closed = new AtomicBoolean(false); 212 } 213 214 @NonNull 215 public Integer getActiveSpanCount() { 216 return this.httpRequestSpans.size() 217 + this.mcpRequestSpans.size() 218 + this.sseConnectionSpans.size() 219 + this.mcpSseStreamSpans.size(); 220 } 221 222 @Override 223 public void close() { 224 if (!this.closed.compareAndSet(false, true)) 225 return; 226 227 drain(this.httpRequestSpans); 228 drain(this.mcpRequestSpans); 229 drain(this.sseConnectionSpans); 230 drain(this.mcpSseStreamSpans); 231 } 232 233 @Override 234 public void didStartRequestHandling(@NonNull ServerType serverType, 235 @NonNull Request request, 236 @Nullable ResourceMethod resourceMethod) { 237 requireNonNull(serverType); 238 requireNonNull(request); 239 240 if (this.closed.get() || !this.spanPolicy.recordHttpRequestSpans()) 241 return; 242 243 safelyRun(() -> { 244 Instant startedAt = Instant.now(); 245 Span span = this.tracer.spanBuilder(this.spanNamingStrategy.httpRequestSpanName(request, resourceMethod)) 246 .setSpanKind(SpanKind.SERVER) 247 .setParent(parentContextFor(request)) 248 .setStartTimestamp(startedAt) 249 .setAttribute(SERVER_TYPE_ATTRIBUTE_KEY, serverTypeValue(serverType)) 250 .setAttribute(HTTP_METHOD_ATTRIBUTE_KEY, request.getHttpMethod().name()) 251 .setAttribute(URL_SCHEME_ATTRIBUTE_KEY, URL_SCHEME_HTTP) 252 .startSpan(); 253 254 try { 255 setRouteAttribute(span, resourceMethod); 256 setOptionalRequestAttributes(span, request); 257 258 if (this.closed.get()) { 259 endSpanSafely(span); 260 return; 261 } 262 263 SpanState spanState = new SpanState(span, request, resourceMethod, startedAt); 264 storeReplacing(this.httpRequestSpans, new IdentityKey<>(request), spanState); 265 } catch (RuntimeException e) { 266 endSpanSafely(span); 267 throw e; 268 } 269 }); 270 } 271 272 @Override 273 public void didFinishRequestHandling(@NonNull ServerType serverType, 274 @NonNull Request request, 275 @Nullable ResourceMethod resourceMethod, 276 @NonNull MarshaledResponse marshaledResponse, 277 @NonNull Duration duration, 278 @NonNull List<@NonNull Throwable> throwables) { 279 requireNonNull(serverType); 280 requireNonNull(request); 281 requireNonNull(marshaledResponse); 282 requireNonNull(duration); 283 requireNonNull(throwables); 284 285 if (this.closed.get() || !this.spanPolicy.recordHttpRequestSpans()) 286 return; 287 288 IdentityKey<Request> key = new IdentityKey<>(request); 289 SpanState spanState = this.httpRequestSpans.get(key); 290 291 if (spanState == null) 292 return; 293 294 safelyRun(() -> { 295 boolean keepOpen = marshaledResponse.isStreaming() && this.spanPolicy.recordStreamingResponseSpans(); 296 297 try { 298 applyHttpFinish(spanState.span(), resourceMethod, marshaledResponse, throwables); 299 } finally { 300 if (!keepOpen && this.httpRequestSpans.remove(key, spanState)) 301 endSpanSafely(spanState.span(), spanState.startedAt().plus(duration)); 302 } 303 }); 304 } 305 306 @Override 307 public void didTerminateResponseStream(@NonNull StreamingResponseHandle streamingResponse, 308 @NonNull StreamTermination termination) { 309 requireNonNull(streamingResponse); 310 requireNonNull(termination); 311 312 if (this.closed.get() || !this.spanPolicy.recordHttpRequestSpans() || !this.spanPolicy.recordStreamingResponseSpans()) 313 return; 314 315 safelyRun(() -> { 316 IdentityKey<Request> key = new IdentityKey<>(streamingResponse.getRequest()); 317 SpanState spanState = this.httpRequestSpans.remove(key); 318 319 if (spanState == null) 320 spanState = backfilledStreamingSpan(streamingResponse); 321 322 try { 323 applyStreamTermination(spanState.span(), termination); 324 } finally { 325 endSpanSafely(spanState.span(), streamingResponse.getEstablishedAt().plus(termination.getDuration())); 326 } 327 }); 328 } 329 330 @Override 331 public void didEstablishSseConnection(@NonNull SseConnection sseConnection) { 332 requireNonNull(sseConnection); 333 334 if (this.closed.get() || !this.spanPolicy.recordSseConnectionSpans()) 335 return; 336 337 safelyRun(() -> { 338 Span span = this.tracer.spanBuilder(this.spanNamingStrategy.sseConnectionSpanName(sseConnection)) 339 .setSpanKind(SpanKind.SERVER) 340 .setParent(parentContextFor(sseConnection.getRequest())) 341 .setStartTimestamp(sseConnection.getEstablishedAt()) 342 .setAttribute(SERVER_TYPE_ATTRIBUTE_KEY, SERVER_TYPE_SSE) 343 .setAttribute(HTTP_METHOD_ATTRIBUTE_KEY, sseConnection.getRequest().getHttpMethod().name()) 344 .setAttribute(URL_SCHEME_ATTRIBUTE_KEY, URL_SCHEME_HTTP) 345 .setAttribute(HTTP_ROUTE_ATTRIBUTE_KEY, sseConnection.getResourceMethod().getResourcePathDeclaration().getPath()) 346 .setAttribute(SSE_CLIENT_CONTEXT_PRESENT_ATTRIBUTE_KEY, sseConnection.getClientContext().isPresent()) 347 .startSpan(); 348 349 try { 350 setOptionalRequestAttributes(span, sseConnection.getRequest()); 351 352 if (this.closed.get()) { 353 endSpanSafely(span); 354 return; 355 } 356 357 storeReplacing(this.sseConnectionSpans, new IdentityKey<>(sseConnection), 358 new SpanState(span, sseConnection.getRequest(), sseConnection.getResourceMethod(), sseConnection.getEstablishedAt())); 359 } catch (RuntimeException e) { 360 endSpanSafely(span); 361 throw e; 362 } 363 }); 364 } 365 366 @Override 367 public void didWriteSseEvent(@NonNull SseConnection sseConnection, 368 @NonNull SseEvent sseEvent, 369 @NonNull Duration writeDuration) { 370 requireNonNull(sseConnection); 371 requireNonNull(sseEvent); 372 requireNonNull(writeDuration); 373 374 if (this.closed.get() || !this.spanPolicy.recordSseWriteEvents()) 375 return; 376 377 safelyRun(() -> { 378 SpanState spanState = this.sseConnectionSpans.get(new IdentityKey<>(sseConnection)); 379 380 if (spanState != null) 381 spanState.span().addEvent("sse.event.written", Attributes.of( 382 SSE_PAYLOAD_TYPE_ATTRIBUTE_KEY, "event", 383 SSE_EVENT_TYPE_ATTRIBUTE_KEY, sseEvent.getEvent().orElse("message"))); 384 }); 385 } 386 387 @Override 388 public void didFailToWriteSseEvent(@NonNull SseConnection sseConnection, 389 @NonNull SseEvent sseEvent, 390 @NonNull Duration writeDuration, 391 @NonNull Throwable throwable) { 392 requireNonNull(sseConnection); 393 requireNonNull(sseEvent); 394 requireNonNull(writeDuration); 395 requireNonNull(throwable); 396 397 if (this.closed.get()) 398 return; 399 400 safelyRun(() -> { 401 SpanState spanState = this.sseConnectionSpans.get(new IdentityKey<>(sseConnection)); 402 403 if (spanState != null) 404 recordException(spanState.span(), throwable); 405 }); 406 } 407 408 @Override 409 public void didWriteSseComment(@NonNull SseConnection sseConnection, 410 @NonNull SseComment sseComment, 411 @NonNull Duration writeDuration) { 412 requireNonNull(sseConnection); 413 requireNonNull(sseComment); 414 requireNonNull(writeDuration); 415 416 if (this.closed.get() || !this.spanPolicy.recordSseWriteEvents()) 417 return; 418 419 safelyRun(() -> { 420 SpanState spanState = this.sseConnectionSpans.get(new IdentityKey<>(sseConnection)); 421 422 if (spanState != null) 423 spanState.span().addEvent("sse.comment.written", Attributes.of( 424 SSE_PAYLOAD_TYPE_ATTRIBUTE_KEY, "comment", 425 SSE_EVENT_TYPE_ATTRIBUTE_KEY, enumValue(sseComment.getCommentType()))); 426 }); 427 } 428 429 @Override 430 public void didFailToWriteSseComment(@NonNull SseConnection sseConnection, 431 @NonNull SseComment sseComment, 432 @NonNull Duration writeDuration, 433 @NonNull Throwable throwable) { 434 requireNonNull(sseConnection); 435 requireNonNull(sseComment); 436 requireNonNull(writeDuration); 437 requireNonNull(throwable); 438 439 if (this.closed.get()) 440 return; 441 442 safelyRun(() -> { 443 SpanState spanState = this.sseConnectionSpans.get(new IdentityKey<>(sseConnection)); 444 445 if (spanState != null) 446 recordException(spanState.span(), throwable); 447 }); 448 } 449 450 @Override 451 public void didTerminateSseConnection(@NonNull SseConnection sseConnection, 452 @NonNull StreamTermination termination) { 453 requireNonNull(sseConnection); 454 requireNonNull(termination); 455 456 if (this.closed.get() || !this.spanPolicy.recordSseConnectionSpans()) 457 return; 458 459 safelyRun(() -> { 460 SpanState spanState = this.sseConnectionSpans.remove(new IdentityKey<>(sseConnection)); 461 462 if (spanState == null) 463 return; 464 465 try { 466 applyStreamTermination(spanState.span(), termination); 467 } finally { 468 endSpanSafely(spanState.span(), sseConnection.getEstablishedAt().plus(termination.getDuration())); 469 } 470 }); 471 } 472 473 @Override 474 public void didStartMcpRequestHandling(@NonNull Request request, 475 @NonNull Class<? extends McpEndpoint> endpointClass, 476 @Nullable String sessionId, 477 @NonNull String jsonRpcMethod, 478 @Nullable McpJsonRpcRequestId jsonRpcRequestId) { 479 requireNonNull(request); 480 requireNonNull(endpointClass); 481 requireNonNull(jsonRpcMethod); 482 483 if (this.closed.get() || !this.spanPolicy.recordMcpRequestSpans()) 484 return; 485 486 safelyRun(() -> { 487 Instant startedAt = Instant.now(); 488 Span span = this.tracer.spanBuilder(this.spanNamingStrategy.mcpRequestSpanName(request, endpointClass, jsonRpcMethod)) 489 .setSpanKind(SpanKind.SERVER) 490 .setParent(parentContextFor(request)) 491 .setStartTimestamp(startedAt) 492 .setAttribute(SERVER_TYPE_ATTRIBUTE_KEY, SERVER_TYPE_MCP) 493 .setAttribute(RPC_SYSTEM_ATTRIBUTE_KEY, "jsonrpc") 494 .setAttribute(RPC_METHOD_ATTRIBUTE_KEY, jsonRpcMethod) 495 .setAttribute(MCP_ENDPOINT_CLASS_ATTRIBUTE_KEY, endpointClass.getName()) 496 .setAttribute(MCP_SESSION_ID_PRESENT_ATTRIBUTE_KEY, sessionId != null) 497 .setAttribute(MCP_REQUEST_ID_PRESENT_ATTRIBUTE_KEY, jsonRpcRequestId != null) 498 .startSpan(); 499 500 try { 501 setOptionalRequestAttributes(span, request); 502 503 if (this.closed.get()) { 504 endSpanSafely(span); 505 return; 506 } 507 508 storeReplacing(this.mcpRequestSpans, new IdentityKey<>(request), new SpanState(span, request, null, startedAt)); 509 } catch (RuntimeException e) { 510 endSpanSafely(span); 511 throw e; 512 } 513 }); 514 } 515 516 @Override 517 public void didFinishMcpRequestHandling(@NonNull Request request, 518 @NonNull Class<? extends McpEndpoint> endpointClass, 519 @Nullable String sessionId, 520 @NonNull String jsonRpcMethod, 521 @Nullable McpJsonRpcRequestId jsonRpcRequestId, 522 @NonNull McpRequestOutcome requestOutcome, 523 @Nullable McpJsonRpcError jsonRpcError, 524 @NonNull Duration duration, 525 @NonNull List<@NonNull Throwable> throwables) { 526 requireNonNull(request); 527 requireNonNull(endpointClass); 528 requireNonNull(jsonRpcMethod); 529 requireNonNull(requestOutcome); 530 requireNonNull(duration); 531 requireNonNull(throwables); 532 533 if (this.closed.get() || !this.spanPolicy.recordMcpRequestSpans()) 534 return; 535 536 safelyRun(() -> { 537 SpanState spanState = this.mcpRequestSpans.remove(new IdentityKey<>(request)); 538 539 if (spanState == null) 540 return; 541 542 try { 543 spanState.span().setAttribute(MCP_REQUEST_OUTCOME_ATTRIBUTE_KEY, enumValue(requestOutcome)); 544 545 if (jsonRpcError != null) { 546 spanState.span().setAttribute(MCP_JSON_RPC_ERROR_CODE_ATTRIBUTE_KEY, jsonRpcError.code().longValue()); 547 spanState.span().setAttribute(ERROR_TYPE_ATTRIBUTE_KEY, "json_rpc_error"); 548 } 549 550 for (Throwable throwable : throwables) 551 recordException(spanState.span(), throwable); 552 553 if (jsonRpcError != null || !throwables.isEmpty() || requestOutcome == McpRequestOutcome.JSON_RPC_ERROR) 554 spanState.span().setStatus(StatusCode.ERROR); 555 } finally { 556 endSpanSafely(spanState.span(), spanState.startedAt().plus(duration)); 557 } 558 }); 559 } 560 561 @Override 562 public void didCreateMcpSession(@NonNull Request request, 563 @NonNull Class<? extends McpEndpoint> endpointClass, 564 @NonNull String sessionId) { 565 requireNonNull(request); 566 requireNonNull(endpointClass); 567 requireNonNull(sessionId); 568 569 if (this.closed.get() || !this.spanPolicy.recordMcpSessionEvents()) 570 return; 571 572 safelyRun(() -> { 573 SpanState spanState = this.mcpRequestSpans.get(new IdentityKey<>(request)); 574 575 if (spanState != null) 576 spanState.span().addEvent("mcp.session.created", Attributes.of( 577 MCP_ENDPOINT_CLASS_ATTRIBUTE_KEY, endpointClass.getName(), 578 MCP_SESSION_ID_PRESENT_ATTRIBUTE_KEY, true)); 579 }); 580 } 581 582 @Override 583 public void didEstablishMcpSseStream(@NonNull McpSseStream stream) { 584 requireNonNull(stream); 585 586 if (this.closed.get() || !this.spanPolicy.recordMcpSseStreamSpans()) 587 return; 588 589 safelyRun(() -> { 590 Span span = this.tracer.spanBuilder(this.spanNamingStrategy.mcpSseStreamSpanName(stream)) 591 .setSpanKind(SpanKind.SERVER) 592 .setParent(parentContextFor(stream.getRequest())) 593 .setStartTimestamp(stream.getEstablishedAt()) 594 .setAttribute(SERVER_TYPE_ATTRIBUTE_KEY, SERVER_TYPE_MCP) 595 .setAttribute(MCP_ENDPOINT_CLASS_ATTRIBUTE_KEY, stream.getEndpointClass().getName()) 596 .setAttribute(MCP_SESSION_ID_PRESENT_ATTRIBUTE_KEY, true) 597 .startSpan(); 598 599 try { 600 setOptionalRequestAttributes(span, stream.getRequest()); 601 602 if (this.closed.get()) { 603 endSpanSafely(span); 604 return; 605 } 606 607 storeReplacing(this.mcpSseStreamSpans, new IdentityKey<>(stream), new SpanState(span, stream.getRequest(), null, stream.getEstablishedAt())); 608 } catch (RuntimeException e) { 609 endSpanSafely(span); 610 throw e; 611 } 612 }); 613 } 614 615 @Override 616 public void didTerminateMcpSseStream(@NonNull McpSseStream stream, 617 @NonNull StreamTermination termination) { 618 requireNonNull(stream); 619 requireNonNull(termination); 620 621 if (this.closed.get() || !this.spanPolicy.recordMcpSseStreamSpans()) 622 return; 623 624 safelyRun(() -> { 625 SpanState spanState = this.mcpSseStreamSpans.remove(new IdentityKey<>(stream)); 626 627 if (spanState == null) 628 return; 629 630 try { 631 applyStreamTermination(spanState.span(), termination); 632 } finally { 633 endSpanSafely(spanState.span(), stream.getEstablishedAt().plus(termination.getDuration())); 634 } 635 }); 636 } 637 638 private void applyHttpFinish(@NonNull Span span, 639 @Nullable ResourceMethod resourceMethod, 640 @NonNull MarshaledResponse marshaledResponse, 641 @NonNull List<@NonNull Throwable> throwables) { 642 requireNonNull(span); 643 requireNonNull(marshaledResponse); 644 requireNonNull(throwables); 645 646 setRouteAttribute(span, resourceMethod); 647 span.setAttribute(HTTP_STATUS_CODE_ATTRIBUTE_KEY, marshaledResponse.getStatusCode().longValue()); 648 649 for (Throwable throwable : throwables) 650 recordException(span, throwable); 651 652 if (!throwables.isEmpty()) { 653 span.setStatus(StatusCode.ERROR); 654 } else if (marshaledResponse.getStatusCode() >= 500) { 655 span.setAttribute(ERROR_TYPE_ATTRIBUTE_KEY, "http.status_code"); 656 span.setStatus(StatusCode.ERROR); 657 } 658 } 659 660 @NonNull 661 private SpanState backfilledStreamingSpan(@NonNull StreamingResponseHandle stream) { 662 requireNonNull(stream); 663 664 Span span = this.tracer.spanBuilder(this.spanNamingStrategy.streamingResponseSpanName(stream)) 665 .setSpanKind(SpanKind.SERVER) 666 .setParent(parentContextFor(stream.getRequest())) 667 .setStartTimestamp(stream.getEstablishedAt()) 668 .setAttribute(SERVER_TYPE_ATTRIBUTE_KEY, serverTypeValue(stream.getServerType())) 669 .setAttribute(HTTP_METHOD_ATTRIBUTE_KEY, stream.getRequest().getHttpMethod().name()) 670 .setAttribute(URL_SCHEME_ATTRIBUTE_KEY, URL_SCHEME_HTTP) 671 .startSpan(); 672 673 try { 674 setRouteAttribute(span, stream.getResourceMethod().orElse(null)); 675 setOptionalRequestAttributes(span, stream.getRequest()); 676 return new SpanState(span, stream.getRequest(), stream.getResourceMethod().orElse(null), stream.getEstablishedAt()); 677 } catch (RuntimeException e) { 678 endSpanSafely(span); 679 throw e; 680 } 681 } 682 683 private void applyStreamTermination(@NonNull Span span, 684 @NonNull StreamTermination termination) { 685 requireNonNull(span); 686 requireNonNull(termination); 687 688 span.setAttribute(STREAM_TERMINATION_REASON_ATTRIBUTE_KEY, enumValue(termination.getReason())); 689 termination.getCause().ifPresent(throwable -> recordException(span, throwable)); 690 691 if (isError(termination)) 692 span.setStatus(StatusCode.ERROR); 693 } 694 695 private boolean isError(@NonNull StreamTermination termination) { 696 requireNonNull(termination); 697 698 if (termination.getCause().isPresent()) 699 return true; 700 701 return switch (termination.getReason()) { 702 case COMPLETED, CLIENT_DISCONNECTED, SERVER_STOPPING, APPLICATION_CANCELED, SESSION_TERMINATED -> false; 703 case PROTOCOL_UNSUPPORTED, RESPONSE_TIMEOUT, RESPONSE_IDLE_TIMEOUT, BACKPRESSURE, WRITE_FAILED, 704 PRODUCER_FAILED, INTERNAL_ERROR, SIMULATOR_LIMIT_EXCEEDED, UNKNOWN -> true; 705 }; 706 } 707 708 private void recordException(@NonNull Span span, 709 @NonNull Throwable throwable) { 710 requireNonNull(span); 711 requireNonNull(throwable); 712 713 span.recordException(throwable); 714 span.setAttribute(ERROR_TYPE_ATTRIBUTE_KEY, throwable.getClass().getName()); 715 span.setStatus(StatusCode.ERROR); 716 } 717 718 private void safelyRun(@NonNull Runnable runnable) { 719 requireNonNull(runnable); 720 721 try { 722 runnable.run(); 723 } catch (RuntimeException e) { 724 // Telemetry failures must never affect application request handling. 725 } 726 } 727 728 private void endSpanSafely(@NonNull Span span) { 729 requireNonNull(span); 730 731 try { 732 span.end(); 733 } catch (RuntimeException e) { 734 // Telemetry failures must never affect application request handling. 735 } 736 } 737 738 private void endSpanSafely(@NonNull Span span, 739 @NonNull Instant timestamp) { 740 requireNonNull(span); 741 requireNonNull(timestamp); 742 743 try { 744 span.end(timestamp); 745 } catch (RuntimeException e) { 746 // Telemetry failures must never affect application request handling. 747 } 748 } 749 750 private void setRouteAttribute(@NonNull Span span, 751 @Nullable ResourceMethod resourceMethod) { 752 requireNonNull(span); 753 754 if (resourceMethod != null) 755 span.setAttribute(HTTP_ROUTE_ATTRIBUTE_KEY, resourceMethod.getResourcePathDeclaration().getPath()); 756 } 757 758 private void setOptionalRequestAttributes(@NonNull Span span, 759 @NonNull Request request) { 760 requireNonNull(span); 761 requireNonNull(request); 762 763 if (this.spanPolicy.recordClientAddress()) { 764 InetSocketAddress remoteAddress = request.getRemoteAddress().orElse(null); 765 766 if (remoteAddress != null && remoteAddress.getAddress() != null) 767 span.setAttribute(CLIENT_ADDRESS_ATTRIBUTE_KEY, remoteAddress.getAddress().getHostAddress()); 768 } 769 770 if (this.spanPolicy.recordRequestId()) 771 span.setAttribute(REQUEST_ID_ATTRIBUTE_KEY, String.valueOf(request.getId())); 772 } 773 774 @NonNull 775 private Context parentContextFor(@NonNull Request request) { 776 requireNonNull(request); 777 778 TraceContext traceContext = request.getTraceContext().orElse(null); 779 780 if (traceContext == null) 781 return Context.root(); 782 783 try { 784 TraceStateBuilder traceStateBuilder = TraceState.builder(); 785 786 for (TraceStateEntry traceStateEntry : traceContext.getTraceStateEntries()) 787 traceStateBuilder.put(traceStateEntry.getKey(), traceStateEntry.getValue()); 788 789 SpanContext spanContext = SpanContext.createFromRemoteParent( 790 traceContext.getTraceId(), 791 traceContext.getParentId(), 792 TraceFlags.fromByte((byte) (traceContext.getTraceFlags() & 0xFF)), 793 traceStateBuilder.build()); 794 795 if (!spanContext.isValid()) 796 return Context.root(); 797 798 return Span.wrap(spanContext).storeInContext(Context.root()); 799 } catch (RuntimeException e) { 800 return Context.root(); 801 } 802 } 803 804 private <T> void storeReplacing(@NonNull ConcurrentMap<IdentityKey<T>, SpanState> spanStates, 805 @NonNull IdentityKey<T> identityKey, 806 @NonNull SpanState spanState) { 807 requireNonNull(spanStates); 808 requireNonNull(identityKey); 809 requireNonNull(spanState); 810 811 spanStates.compute(identityKey, (key, existingSpanState) -> { 812 if (existingSpanState != null) 813 endServerStopping(existingSpanState); 814 815 if (this.closed.get()) { 816 endSpanSafely(spanState.span()); 817 return null; 818 } 819 820 return spanState; 821 }); 822 } 823 824 private void drain(@NonNull ConcurrentMap<?, SpanState> spanStates) { 825 requireNonNull(spanStates); 826 827 for (Map.Entry<?, SpanState> entry : spanStates.entrySet()) { 828 try { 829 SpanState spanState = entry.getValue(); 830 831 if (spanStates.remove(entry.getKey(), spanState)) 832 endServerStopping(spanState); 833 } catch (RuntimeException e) { 834 // Drain must best-effort every active span even if one entry fails. 835 } 836 } 837 } 838 839 private void endServerStopping(@NonNull SpanState spanState) { 840 requireNonNull(spanState); 841 842 safelyRun(() -> { 843 // SERVER_STOPPING is operational shutdown, not a span error. 844 spanState.span().setAttribute(STREAM_TERMINATION_REASON_ATTRIBUTE_KEY, enumValue(StreamTerminationReason.SERVER_STOPPING)); 845 spanState.span().end(); 846 }); 847 } 848 849 @NonNull 850 private static String serverTypeValue(@NonNull ServerType serverType) { 851 requireNonNull(serverType); 852 853 return switch (serverType) { 854 case STANDARD_HTTP -> SERVER_TYPE_HTTP; 855 case SSE -> SERVER_TYPE_SSE; 856 case MCP -> SERVER_TYPE_MCP; 857 }; 858 } 859 860 @NonNull 861 private static String enumValue(@NonNull Enum<?> value) { 862 requireNonNull(value); 863 return value.name().toLowerCase(Locale.ROOT); 864 } 865 866 @NotThreadSafe 867 public static final class Builder { 868 @Nullable 869 private OpenTelemetry openTelemetry; 870 @Nullable 871 private Tracer tracer; 872 @NonNull 873 private String instrumentationName; 874 @Nullable 875 private String instrumentationVersion; 876 @NonNull 877 private SpanNamingStrategy spanNamingStrategy; 878 @NonNull 879 private SpanPolicy spanPolicy; 880 881 private Builder() { 882 this.openTelemetry = GlobalOpenTelemetry.get(); 883 this.instrumentationName = DEFAULT_INSTRUMENTATION_NAME; 884 this.instrumentationVersion = defaultInstrumentationVersion(); 885 this.spanNamingStrategy = SpanNamingStrategy.defaultInstance(); 886 this.spanPolicy = SpanPolicy.defaultInstance(); 887 } 888 889 @NonNull 890 public Builder openTelemetry(@NonNull OpenTelemetry openTelemetry) { 891 this.openTelemetry = requireNonNull(openTelemetry); 892 this.tracer = null; 893 return this; 894 } 895 896 @NonNull 897 public Builder tracer(@NonNull Tracer tracer) { 898 this.tracer = requireNonNull(tracer); 899 return this; 900 } 901 902 @NonNull 903 public Builder instrumentationName(@NonNull String instrumentationName) { 904 this.instrumentationName = requireNonNull(instrumentationName); 905 this.tracer = null; 906 return this; 907 } 908 909 @NonNull 910 public Builder instrumentationVersion(@Nullable String instrumentationVersion) { 911 this.instrumentationVersion = instrumentationVersion; 912 this.tracer = null; 913 return this; 914 } 915 916 @NonNull 917 public Builder spanNamingStrategy(@NonNull SpanNamingStrategy spanNamingStrategy) { 918 this.spanNamingStrategy = requireNonNull(spanNamingStrategy); 919 return this; 920 } 921 922 @NonNull 923 public Builder spanPolicy(@NonNull SpanPolicy spanPolicy) { 924 this.spanPolicy = requireNonNull(spanPolicy); 925 return this; 926 } 927 928 @NonNull 929 public OpenTelemetryLifecycleObserver build() { 930 return new OpenTelemetryLifecycleObserver(this); 931 } 932 933 @NonNull 934 private Tracer resolveTracer() { 935 if (this.tracer != null) 936 return this.tracer; 937 938 TracerBuilder tracerBuilder = requireNonNull(this.openTelemetry).tracerBuilder(this.instrumentationName); 939 940 if (this.instrumentationVersion != null) 941 tracerBuilder.setInstrumentationVersion(this.instrumentationVersion); 942 943 return tracerBuilder.build(); 944 } 945 } 946 947 @Nullable 948 private static String defaultInstrumentationVersion() { 949 return OpenTelemetryLifecycleObserver.class.getPackage().getImplementationVersion(); 950 } 951 952 private record SpanState( 953 @NonNull Span span, 954 @NonNull Request request, 955 @Nullable ResourceMethod resourceMethod, 956 @NonNull Instant startedAt 957 ) { 958 private SpanState { 959 requireNonNull(span); 960 requireNonNull(request); 961 requireNonNull(startedAt); 962 } 963 } 964 965 @ThreadSafe 966 private static final class IdentityKey<T> { 967 @NonNull 968 private final T value; 969 private final Integer hashCode; 970 971 private IdentityKey(@NonNull T value) { 972 this.value = requireNonNull(value); 973 this.hashCode = System.identityHashCode(value); 974 } 975 976 @Override 977 public boolean equals(@Nullable Object object) { 978 if (this == object) 979 return true; 980 981 if (!(object instanceof IdentityKey<?> identityKey)) 982 return false; 983 984 return this.value == identityKey.value; 985 } 986 987 @Override 988 public int hashCode() { 989 return this.hashCode; 990 } 991 992 @Override 993 @NonNull 994 public String toString() { 995 return "%s{value=%s}".formatted(getClass().getSimpleName(), this.value); 996 } 997 } 998}