33
33
#include < filesystem>
34
34
#include < algorithm>
35
35
36
+ static const char * RPC_DEBUG = std::getenv(" GGML_RPC_DEBUG" );
37
+
38
+ #define LOG_DBG (...) \
39
+ do { if (RPC_DEBUG) GGML_LOG_DEBUG (__VA_ARGS__); } while (0 )
40
+
41
+
36
42
namespace fs = std::filesystem;
37
43
38
44
static constexpr size_t MAX_CHUNK_SIZE = 1024ull * 1024ull * 1024ull ; // 1 GiB
@@ -49,7 +55,7 @@ struct socket_t {
49
55
sockfd_t fd;
50
56
socket_t (sockfd_t fd) : fd(fd) {}
51
57
~socket_t () {
52
- GGML_PRINT_DEBUG (" [%s] closing socket %d\n " , __func__, this ->fd );
58
+ LOG_DBG (" [%s] closing socket %d\n " , __func__, this ->fd );
53
59
#ifdef _WIN32
54
60
closesocket (this ->fd );
55
61
#else
@@ -277,14 +283,14 @@ static std::shared_ptr<socket_t> socket_connect(const char * host, int port) {
277
283
return nullptr ;
278
284
}
279
285
if (!set_no_delay (sockfd)) {
280
- fprintf (stderr, " Failed to set TCP_NODELAY\n " );
286
+ GGML_LOG_ERROR ( " Failed to set TCP_NODELAY\n " );
281
287
return nullptr ;
282
288
}
283
289
addr.sin_family = AF_INET;
284
290
addr.sin_port = htons (port);
285
291
struct hostent * server = gethostbyname (host);
286
292
if (server == NULL ) {
287
- fprintf (stderr, " Cannot resolve host '%s'\n " , host);
293
+ GGML_LOG_ERROR ( " Cannot resolve host '%s'\n " , host);
288
294
return nullptr ;
289
295
}
290
296
memcpy (&addr.sin_addr .s_addr , server->h_addr , server->h_length );
@@ -301,7 +307,7 @@ static std::shared_ptr<socket_t> socket_accept(sockfd_t srv_sockfd) {
301
307
return nullptr ;
302
308
}
303
309
if (!set_no_delay (client_socket_fd)) {
304
- fprintf (stderr, " Failed to set TCP_NODELAY\n " );
310
+ GGML_LOG_ERROR ( " Failed to set TCP_NODELAY\n " );
305
311
return nullptr ;
306
312
}
307
313
return client_socket;
@@ -314,11 +320,11 @@ static std::shared_ptr<socket_t> create_server_socket(const char * host, int por
314
320
return nullptr ;
315
321
}
316
322
if (!set_reuse_addr (sockfd)) {
317
- fprintf (stderr, " Failed to set SO_REUSEADDR\n " );
323
+ GGML_LOG_ERROR ( " Failed to set SO_REUSEADDR\n " );
318
324
return nullptr ;
319
325
}
320
326
if (inet_addr (host) == INADDR_NONE) {
321
- fprintf (stderr, " Invalid host address: %s\n " , host);
327
+ GGML_LOG_ERROR ( " Invalid host address: %s\n " , host);
322
328
return nullptr ;
323
329
}
324
330
struct sockaddr_in serv_addr;
@@ -361,7 +367,7 @@ static bool recv_data(sockfd_t sockfd, void * data, size_t size) {
361
367
return false ;
362
368
}
363
369
if (n == 0 ) {
364
- GGML_LOG_ERROR (" recv returned 0 (peer closed?)\n " );
370
+ LOG_DBG (" recv returned 0 (peer closed?)\n " );
365
371
return false ;
366
372
}
367
373
bytes_recv += (size_t )n;
@@ -395,7 +401,7 @@ static bool recv_msg(sockfd_t sockfd, std::vector<uint8_t> & input) {
395
401
try {
396
402
input.resize (size);
397
403
} catch (const std::bad_alloc & e) {
398
- fprintf (stderr, " Failed to allocate input buffer of size %" PRIu64 " \n " , size);
404
+ GGML_LOG_ERROR ( " Failed to allocate input buffer of size %" PRIu64 " \n " , size);
399
405
return false ;
400
406
}
401
407
return recv_data (sockfd, input.data (), size);
@@ -479,11 +485,11 @@ static bool check_server_version(const std::shared_ptr<socket_t> & sock) {
479
485
status = send_rpc_cmd (sock, RPC_CMD_HELLO, nullptr , 0 , &response, sizeof (response));
480
486
RPC_STATUS_ASSERT (status);
481
487
if (response.major != RPC_PROTO_MAJOR_VERSION || response.minor > RPC_PROTO_MINOR_VERSION) {
482
- fprintf (stderr, " RPC server version mismatch: %d.%d.%d\n " , response.major , response.minor , response.patch );
488
+ GGML_LOG_ERROR ( " RPC server version mismatch: %d.%d.%d\n " , response.major , response.minor , response.patch );
483
489
return false ;
484
490
}
485
491
if (response.minor != RPC_PROTO_MINOR_VERSION || response.patch != RPC_PROTO_PATCH_VERSION) {
486
- fprintf (stderr, " WARNING: RPC server version mismatch: %d.%d.%d\n " , response.major , response.minor , response.patch );
492
+ GGML_LOG_INFO ( " WARNING: RPC server version mismatch: %d.%d.%d\n " , response.major , response.minor , response.patch );
487
493
}
488
494
return true ;
489
495
}
@@ -524,7 +530,7 @@ static std::shared_ptr<socket_t> get_socket(const std::string & endpoint) {
524
530
if (!check_server_version (sock)) {
525
531
return nullptr ;
526
532
}
527
- GGML_PRINT_DEBUG (" [%s] connected to %s, sockfd=%d\n " , __func__, endpoint.c_str (), sock->fd );
533
+ LOG_DBG (" [%s] connected to %s, sockfd=%d\n " , __func__, endpoint.c_str (), sock->fd );
528
534
sockets[endpoint] = sock;
529
535
return sock;
530
536
}
@@ -845,7 +851,7 @@ ggml_backend_buffer_type_t ggml_backend_rpc_buffer_type(const char * endpoint) {
845
851
}
846
852
auto sock = get_socket (endpoint);
847
853
if (sock == nullptr ) {
848
- fprintf (stderr, " Failed to connect to %s\n " , endpoint);
854
+ GGML_LOG_ERROR ( " Failed to connect to %s\n " , endpoint);
849
855
return nullptr ;
850
856
}
851
857
size_t alignment = get_alignment (sock);
@@ -954,7 +960,7 @@ void rpc_server::hello(rpc_msg_hello_rsp & response) {
954
960
response.major = RPC_PROTO_MAJOR_VERSION;
955
961
response.minor = RPC_PROTO_MINOR_VERSION;
956
962
response.patch = RPC_PROTO_PATCH_VERSION;
957
- GGML_PRINT_DEBUG (" [%s] version: %d.%d.%d\n " , __func__, response.major , response.minor , response.patch );
963
+ LOG_DBG (" [%s] version: %d.%d.%d\n " , __func__, response.major , response.minor , response.patch );
958
964
}
959
965
960
966
bool rpc_server::get_alloc_size (const rpc_msg_get_alloc_size_req & request, rpc_msg_get_alloc_size_rsp & response) {
@@ -974,15 +980,15 @@ bool rpc_server::get_alloc_size(const rpc_msg_get_alloc_size_req & request, rpc_
974
980
GGML_LOG_ERROR (" Null tensor pointer passed to server get_alloc_size function.\n " );
975
981
return false ;
976
982
}
977
-
983
+ LOG_DBG ( " [%s] buffer: %p, data: %p \n " , __func__, ( void *)tensor-> buffer , tensor-> data );
978
984
if (tensor->buffer == nullptr ) {
979
985
// No buffer allocated.
980
986
buft = ggml_backend_get_default_buffer_type (backend);
981
987
} else {
982
988
buft = tensor->buffer ->buft ;
983
989
}
984
990
985
- response.alloc_size = ggml_backend_buft_get_alloc_size (buft,tensor);
991
+ response.alloc_size = ggml_backend_buft_get_alloc_size (buft, tensor);
986
992
987
993
return true ;
988
994
}
@@ -996,30 +1002,30 @@ void rpc_server::alloc_buffer(const rpc_msg_alloc_buffer_req & request, rpc_msg_
996
1002
uint64_t rpk = random_id ();
997
1003
response.remote_ptr = rpk;
998
1004
response.remote_size = buffer->size ;
999
- GGML_PRINT_DEBUG (" [%s] size: %" PRIu64 " -> handle: %" PRIu64 " , remote_size: %" PRIu64 " \n " ,
1005
+ LOG_DBG (" [%s] size: %" PRIu64 " -> handle: %" PRIu64 " , remote_size: %" PRIu64 " \n " ,
1000
1006
__func__, request.size , rpk, response.remote_size );
1001
1007
buffers[rpk] = buffer;
1002
1008
} else {
1003
- GGML_LOG_ERROR (" [%s] size: %" PRIu64 " -> failed\n " , __func__, request.size );
1009
+ LOG_DBG (" [%s] size: %" PRIu64 " -> failed\n " , __func__, request.size );
1004
1010
}
1005
1011
}
1006
1012
1007
1013
void rpc_server::get_alignment (rpc_msg_get_alignment_rsp & response) {
1008
1014
ggml_backend_buffer_type_t buft = ggml_backend_get_default_buffer_type (backend);
1009
1015
size_t alignment = ggml_backend_buft_get_alignment (buft);
1010
- GGML_PRINT_DEBUG (" [%s] alignment: %lu\n " , __func__, alignment);
1016
+ LOG_DBG (" [%s] alignment: %lu\n " , __func__, alignment);
1011
1017
response.alignment = alignment;
1012
1018
}
1013
1019
1014
1020
void rpc_server::get_max_size (rpc_msg_get_max_size_rsp & response) {
1015
1021
ggml_backend_buffer_type_t buft = ggml_backend_get_default_buffer_type (backend);
1016
1022
size_t max_size = ggml_backend_buft_get_max_size (buft);
1017
- GGML_PRINT_DEBUG (" [%s] max_size: %lu\n " , __func__, max_size);
1023
+ LOG_DBG (" [%s] max_size: %lu\n " , __func__, max_size);
1018
1024
response.max_size = max_size;
1019
1025
}
1020
1026
1021
1027
bool rpc_server::buffer_get_base (const rpc_msg_buffer_get_base_req & request, rpc_msg_buffer_get_base_rsp & response) {
1022
- GGML_PRINT_DEBUG (" [%s] remote_ptr: %" PRIx64 " \n " , __func__, request.remote_ptr );
1028
+ LOG_DBG (" [%s] remote_ptr: %" PRIx64 " \n " , __func__, request.remote_ptr );
1023
1029
auto it = buffers.find (request.remote_ptr );
1024
1030
if (it == buffers.end ()) {
1025
1031
GGML_LOG_ERROR (" [%s] buffer handle not found: %" PRIu64 " \n " , __func__, request.remote_ptr );
@@ -1032,7 +1038,7 @@ bool rpc_server::buffer_get_base(const rpc_msg_buffer_get_base_req & request, rp
1032
1038
}
1033
1039
1034
1040
bool rpc_server::free_buffer (const rpc_msg_free_buffer_req & request) {
1035
- GGML_PRINT_DEBUG (" [%s] remote_ptr: %" PRIx64 " \n " , __func__, request.remote_ptr );
1041
+ LOG_DBG (" [%s] remote_ptr: %" PRIx64 " \n " , __func__, request.remote_ptr );
1036
1042
auto it = buffers.find (request.remote_ptr );
1037
1043
if (it == buffers.end ()) {
1038
1044
GGML_LOG_ERROR (" [%s] buffer handle not found: %" PRIu64 " \n " , __func__, request.remote_ptr );
@@ -1045,7 +1051,7 @@ bool rpc_server::free_buffer(const rpc_msg_free_buffer_req & request) {
1045
1051
}
1046
1052
1047
1053
bool rpc_server::buffer_clear (const rpc_msg_buffer_clear_req & request) {
1048
- GGML_PRINT_DEBUG (" [%s] remote_ptr: %" PRIx64 " , value: %u\n " , __func__, request.remote_ptr , request.value );
1054
+ LOG_DBG (" [%s] remote_ptr: %" PRIx64 " , value: %u\n " , __func__, request.remote_ptr , request.value );
1049
1055
auto it = buffers.find (request.remote_ptr );
1050
1056
if (it == buffers.end ()) {
1051
1057
GGML_LOG_ERROR (" [%s] buffer handle not found: %" PRIu64 " \n " , __func__, request.remote_ptr );
@@ -1126,7 +1132,7 @@ bool rpc_server::set_tensor(const std::vector<uint8_t> & input) {
1126
1132
GGML_LOG_ERROR (" [%s] error deserializing tensor\n " , __func__);
1127
1133
return false ;
1128
1134
}
1129
- GGML_PRINT_DEBUG (" [%s] buffer: %p, data: %p, offset: %" PRIu64 " , size: %zu\n " , __func__, (void *)tensor->buffer , tensor->data , offset, size);
1135
+ LOG_DBG (" [%s] buffer: %p, data: %p, offset: %" PRIu64 " , size: %zu\n " , __func__, (void *)tensor->buffer , tensor->data , offset, size);
1130
1136
1131
1137
// sanitize tensor->data
1132
1138
{
@@ -1149,7 +1155,7 @@ bool rpc_server::set_tensor(const std::vector<uint8_t> & input) {
1149
1155
fs::path cache_file = fs::path (cache_dir) / hash_str;
1150
1156
std::ofstream ofs (cache_file, std::ios::binary);
1151
1157
ofs.write ((const char *)data, size);
1152
- printf (" [%s] saved to '%s'\n " , __func__, cache_file.c_str ());
1158
+ GGML_LOG_INFO (" [%s] saved to '%s'\n " , __func__, cache_file.c_str ());
1153
1159
}
1154
1160
ggml_backend_tensor_set (tensor, data, offset, size);
1155
1161
return true ;
@@ -1195,8 +1201,8 @@ bool rpc_server::set_tensor_hash(const rpc_msg_set_tensor_hash_req & request, rp
1195
1201
GGML_LOG_ERROR (" [%s] error deserializing tensor\n " , __func__);
1196
1202
return false ;
1197
1203
}
1198
- GGML_PRINT_DEBUG (" [%s] buffer: %p, data: %p, offset: %" PRIu64 " , size: %zu, hash: %" PRIx64 " \n " ,
1199
- __func__, (void *)tensor->buffer , tensor->data , request.offset , size, request.hash );
1204
+ LOG_DBG (" [%s] buffer: %p, data: %p, offset: %" PRIu64 " , size: %zu, hash: %" PRIx64 " \n " ,
1205
+ __func__, (void *)tensor->buffer , tensor->data , request.offset , size, request.hash );
1200
1206
1201
1207
// sanitize tensor->data
1202
1208
{
@@ -1230,7 +1236,7 @@ bool rpc_server::init_tensor(const rpc_msg_init_tensor_req & request) {
1230
1236
GGML_LOG_ERROR (" Null tensor pointer passed to server init_tensor function.\n " );
1231
1237
return false ;
1232
1238
}
1233
-
1239
+ LOG_DBG ( " [%s] buffer: %p, data: %p \n " , __func__, ( void *)tensor-> buffer , tensor-> data );
1234
1240
// Call the backend's buffer_init_tensor function
1235
1241
ggml_backend_buffer_t buffer = tensor->buffer ;
1236
1242
if (buffer && buffer->iface .init_tensor ) {
@@ -1263,7 +1269,7 @@ bool rpc_server::get_tensor(const rpc_msg_get_tensor_req & request, std::vector<
1263
1269
GGML_LOG_ERROR (" [%s] error deserializing tensor\n " , __func__);
1264
1270
return false ;
1265
1271
}
1266
- GGML_PRINT_DEBUG (" [%s] buffer: %p, data: %p, offset: %" PRIu64 " , size: %" PRIu64 " \n " , __func__, (void *)tensor->buffer , tensor->data , request.offset , request.size );
1272
+ LOG_DBG (" [%s] buffer: %p, data: %p, offset: %" PRIu64 " , size: %" PRIu64 " \n " , __func__, (void *)tensor->buffer , tensor->data , request.offset , request.size );
1267
1273
1268
1274
// sanitize tensor->data
1269
1275
{
@@ -1307,7 +1313,7 @@ bool rpc_server::copy_tensor(const rpc_msg_copy_tensor_req & request, rpc_msg_co
1307
1313
uint64_t dst_buf_sz = (uint64_t ) ggml_backend_buffer_get_size(dst->buffer);
1308
1314
1309
1315
if (dst_data + src_size > dst_base + dst_buf_sz) {
1310
- GGML_PRINT_DEBUG (" [%s] out-of-bounds write in rpc_server::copy_tensor:\n "
1316
+ GGML_LOG_ERROR (" [%s] out-of-bounds write in rpc_server::copy_tensor:\n "
1311
1317
" write range : [0x%" PRIx64 " , 0x%" PRIx64 " ]\n "
1312
1318
" buffer base: [0x%" PRIx64 " , 0x%" PRIx64 " ]\n " ,
1313
1319
__func__,
@@ -1318,8 +1324,8 @@ bool rpc_server::copy_tensor(const rpc_msg_copy_tensor_req & request, rpc_msg_co
1318
1324
return false ;
1319
1325
}
1320
1326
1321
- GGML_PRINT_DEBUG (" [%s] src->buffer: %p, dst->buffer: %p\n " ,
1322
- __func__, (void *) src->buffer, (void *) dst->buffer);
1327
+ LOG_DBG (" [%s] src->buffer: %p, dst->buffer: %p\n " ,
1328
+ __func__, (void *) src->buffer, (void *) dst->buffer);
1323
1329
1324
1330
response.result = ggml_backend_buffer_copy_tensor(src, dst);
1325
1331
return true ;
@@ -1395,7 +1401,7 @@ bool rpc_server::graph_compute(const std::vector<uint8_t> & input, rpc_msg_graph
1395
1401
return false ;
1396
1402
}
1397
1403
const rpc_tensor * tensors = (const rpc_tensor *)(input.data () + sizeof (n_nodes) + n_nodes*sizeof (uint64_t ) + sizeof (n_tensors));
1398
- GGML_PRINT_DEBUG (" [%s] n_nodes: %u, n_tensors: %u\n " , __func__, n_nodes, n_tensors);
1404
+ LOG_DBG (" [%s] n_nodes: %u, n_tensors: %u\n " , __func__, n_nodes, n_tensors);
1399
1405
1400
1406
size_t buf_size = ggml_tensor_overhead ()*(n_nodes + n_tensors) + ggml_graph_overhead_custom (n_nodes, false );
1401
1407
@@ -1518,7 +1524,7 @@ static void rpc_serve_client(ggml_backend_t backend, const char * cache_dir,
1518
1524
}
1519
1525
// the second command sent by the client must be HELLO
1520
1526
if (cmd != RPC_CMD_HELLO) {
1521
- fprintf (stderr, " Expected HELLO command, update client\n " );
1527
+ GGML_LOG_ERROR ( " Expected HELLO command, update client\n " );
1522
1528
return ;
1523
1529
}
1524
1530
if (!recv_msg (sockfd, nullptr , 0 )) {
@@ -1535,7 +1541,7 @@ static void rpc_serve_client(ggml_backend_t backend, const char * cache_dir,
1535
1541
}
1536
1542
if (cmd >= RPC_CMD_COUNT) {
1537
1543
// fail fast if the command is invalid
1538
- fprintf (stderr, " Unknown command: %d\n " , cmd);
1544
+ GGML_LOG_ERROR ( " Unknown command: %d\n " , cmd);
1539
1545
break ;
1540
1546
}
1541
1547
switch (cmd) {
@@ -1723,7 +1729,7 @@ static void rpc_serve_client(ggml_backend_t backend, const char * cache_dir,
1723
1729
break ;
1724
1730
}
1725
1731
default : {
1726
- fprintf (stderr, " Unknown command: %d\n " , cmd);
1732
+ GGML_LOG_ERROR ( " Unknown command: %d\n " , cmd);
1727
1733
return ;
1728
1734
}
1729
1735
}
0 commit comments