13
13
14
14
use Psr \Log \LoggerInterface ;
15
15
use Psr \Log \NullLogger ;
16
+ use Symfony \AI \McpSdk \Exception \TransportNotConnectedException ;
17
+ use Symfony \AI \McpSdk \Message \Error ;
18
+ use Symfony \AI \McpSdk \Message \Request ;
19
+ use Symfony \AI \McpSdk \Message \Response ;
16
20
use Symfony \AI \McpSdk \Server \JsonRpcHandler ;
21
+ use Symfony \AI \McpSdk \Server \KeepAliveSessionInterface ;
22
+ use Symfony \AI \McpSdk \Server \PendingResponse ;
23
+ use Symfony \AI \McpSdk \Server \PendingResponseBag ;
17
24
use Symfony \AI \McpSdk \Server \TransportInterface ;
25
+ use Symfony \Component \Clock \ClockInterface ;
26
+ use Symfony \Component \Uid \Uuid ;
18
27
19
- final readonly class Server
28
+ final class Server
20
29
{
30
+ private ?TransportInterface $ transport = null ;
31
+
32
+ private PendingResponseBag $ pendingResponses ;
33
+
21
34
public function __construct (
22
35
private JsonRpcHandler $ jsonRpcHandler ,
36
+ private ClockInterface $ clock ,
37
+ private ?KeepAliveSessionInterface $ keepAliveSession = null ,
23
38
private LoggerInterface $ logger = new NullLogger (),
24
39
) {
40
+ $ this ->pendingResponses = new PendingResponseBag ($ clock , new \DateInterval ('PT30S ' ));
25
41
}
26
42
27
43
public function connect (TransportInterface $ transport ): void
28
44
{
45
+ $ this ->transport = $ transport ;
46
+
29
47
$ transport ->initialize ();
30
48
$ this ->logger ->info ('Transport initialized ' );
31
49
50
+ $ this ->keepAliveSession ?->start();
51
+
32
52
while ($ transport ->isConnected ()) {
33
53
foreach ($ transport ->receive () as $ message ) {
34
54
if (null === $ message ) {
@@ -41,6 +61,15 @@ public function connect(TransportInterface $transport): void
41
61
continue ;
42
62
}
43
63
64
+ if ($ response instanceof Response || $ response instanceof Error) {
65
+ if ($ this ->pendingResponses ->resolve ($ response )) {
66
+ continue ;
67
+ }
68
+
69
+ $ this ->logger ->warning (\sprintf ('No handler found for response id "%s". ' , $ response ->id ), ['response ' => $ response ]);
70
+ continue ;
71
+ }
72
+
44
73
$ transport ->send ($ response );
45
74
}
46
75
} catch (\JsonException $ e ) {
@@ -52,10 +81,50 @@ public function connect(TransportInterface $transport): void
52
81
}
53
82
}
54
83
55
- usleep (1000 );
84
+ $ this ->pendingResponses ->gc (function (PendingResponse $ pendingResponse , Error $ error ): void {
85
+ $ this ->logger ->warning ('Pending response timed out ' , ['pendingResponse ' => $ pendingResponse , 'error ' => $ error ]);
86
+ });
87
+
88
+ $ this ->keepAliveSession ?->tick(function (): void {
89
+ $ id = (string ) Uuid::v4 ();
90
+
91
+ $ this ->sendRequest (new Request ($ id , 'ping ' , []), function (Response |Error $ response ): void {
92
+ // Per MCP spec, ping errors should terminate the connection, but some clients
93
+ // don't handle this correctly. We may want to consider adding a strict mode with
94
+ // strict error handling.
95
+ if ($ response instanceof Error) {
96
+ $ this ->logger ->warning ('KeepAlive ping returned error response ' , ['error ' => $ response ]);
97
+ }
98
+ });
99
+ });
100
+
101
+ $ this ->clock ->sleep (0.001 );
56
102
}
57
103
104
+ $ this ->keepAliveSession ?->stop();
58
105
$ transport ->close ();
59
106
$ this ->logger ->info ('Transport closed ' );
60
107
}
108
+
109
+ /**
110
+ * @throws \JsonException When JSON encoding fails
111
+ */
112
+ public function sendRequest (Request $ request , ?\Closure $ callback = null ): void
113
+ {
114
+ if (null === $ this ->transport ) {
115
+ throw new TransportNotConnectedException ();
116
+ }
117
+
118
+ $ this ->logger ->info ('Sending request ' , ['request ' => $ request ]);
119
+
120
+ if ([] === $ request ->params ) {
121
+ $ encodedRequest = json_encode ($ request , \JSON_THROW_ON_ERROR | \JSON_FORCE_OBJECT );
122
+ } else {
123
+ $ encodedRequest = json_encode ($ request , \JSON_THROW_ON_ERROR );
124
+ }
125
+
126
+ $ this ->transport ->send ($ encodedRequest );
127
+
128
+ $ this ->pendingResponses ->add (new PendingResponse ($ request ->id , $ this ->clock ->now (), $ callback ));
129
+ }
61
130
}
0 commit comments