Skip to content

Commit c498fc8

Browse files
authored
rpc : use ggml logging facilities
Use RPC_DEBUG environment variable to enable debug messages. Add helper macro LOG_DBG() which does an early check of the env var before calling GGML_LOG_DEBUG(). Make sure we log a debug message for every server function.
1 parent e7a5130 commit c498fc8

File tree

1 file changed

+41
-35
lines changed

1 file changed

+41
-35
lines changed

ggml/src/ggml-rpc/ggml-rpc.cpp

Lines changed: 41 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,12 @@
3131
#include <filesystem>
3232
#include <algorithm>
3333

34+
static const char * RPC_DEBUG = std::getenv("GGML_RPC_DEBUG");
35+
36+
#define LOG_DBG(...) \
37+
do { if (RPC_DEBUG) GGML_LOG_DEBUG(__VA_ARGS__); } while (0)
38+
39+
3440
namespace fs = std::filesystem;
3541

3642
static constexpr size_t MAX_CHUNK_SIZE = 1024ull * 1024ull * 1024ull; // 1 GiB
@@ -47,7 +53,7 @@ struct socket_t {
4753
sockfd_t fd;
4854
socket_t(sockfd_t fd) : fd(fd) {}
4955
~socket_t() {
50-
GGML_PRINT_DEBUG("[%s] closing socket %d\n", __func__, this->fd);
56+
LOG_DBG("[%s] closing socket %d\n", __func__, this->fd);
5157
#ifdef _WIN32
5258
closesocket(this->fd);
5359
#else
@@ -265,14 +271,14 @@ static std::shared_ptr<socket_t> socket_connect(const char * host, int port) {
265271
return nullptr;
266272
}
267273
if (!set_no_delay(sockfd)) {
268-
fprintf(stderr, "Failed to set TCP_NODELAY\n");
274+
GGML_LOG_ERROR("Failed to set TCP_NODELAY\n");
269275
return nullptr;
270276
}
271277
addr.sin_family = AF_INET;
272278
addr.sin_port = htons(port);
273279
struct hostent * server = gethostbyname(host);
274280
if (server == NULL) {
275-
fprintf(stderr, "Cannot resolve host '%s'\n", host);
281+
GGML_LOG_ERROR("Cannot resolve host '%s'\n", host);
276282
return nullptr;
277283
}
278284
memcpy(&addr.sin_addr.s_addr, server->h_addr, server->h_length);
@@ -289,7 +295,7 @@ static std::shared_ptr<socket_t> socket_accept(sockfd_t srv_sockfd) {
289295
return nullptr;
290296
}
291297
if (!set_no_delay(client_socket_fd)) {
292-
fprintf(stderr, "Failed to set TCP_NODELAY\n");
298+
GGML_LOG_ERROR("Failed to set TCP_NODELAY\n");
293299
return nullptr;
294300
}
295301
return client_socket;
@@ -302,11 +308,11 @@ static std::shared_ptr<socket_t> create_server_socket(const char * host, int por
302308
return nullptr;
303309
}
304310
if (!set_reuse_addr(sockfd)) {
305-
fprintf(stderr, "Failed to set SO_REUSEADDR\n");
311+
GGML_LOG_ERROR("Failed to set SO_REUSEADDR\n");
306312
return nullptr;
307313
}
308314
if (inet_addr(host) == INADDR_NONE) {
309-
fprintf(stderr, "Invalid host address: %s\n", host);
315+
GGML_LOG_ERROR("Invalid host address: %s\n", host);
310316
return nullptr;
311317
}
312318
struct sockaddr_in serv_addr;
@@ -349,7 +355,7 @@ static bool recv_data(sockfd_t sockfd, void * data, size_t size) {
349355
return false;
350356
}
351357
if (n == 0) {
352-
GGML_LOG_ERROR("recv returned 0 (peer closed?)\n");
358+
LOG_DBG("recv returned 0 (peer closed?)\n");
353359
return false;
354360
}
355361
bytes_recv += (size_t)n;
@@ -383,7 +389,7 @@ static bool recv_msg(sockfd_t sockfd, std::vector<uint8_t> & input) {
383389
try {
384390
input.resize(size);
385391
} catch (const std::bad_alloc & e) {
386-
fprintf(stderr, "Failed to allocate input buffer of size %" PRIu64 "\n", size);
392+
GGML_LOG_ERROR("Failed to allocate input buffer of size %" PRIu64 "\n", size);
387393
return false;
388394
}
389395
return recv_data(sockfd, input.data(), size);
@@ -443,11 +449,11 @@ static bool check_server_version(const std::shared_ptr<socket_t> & sock) {
443449
bool status = send_rpc_cmd(sock, RPC_CMD_HELLO, nullptr, 0, &response, sizeof(response));
444450
RPC_STATUS_ASSERT(status);
445451
if (response.major != RPC_PROTO_MAJOR_VERSION || response.minor > RPC_PROTO_MINOR_VERSION) {
446-
fprintf(stderr, "RPC server version mismatch: %d.%d.%d\n", response.major, response.minor, response.patch);
452+
GGML_LOG_ERROR("RPC server version mismatch: %d.%d.%d\n", response.major, response.minor, response.patch);
447453
return false;
448454
}
449455
if (response.minor != RPC_PROTO_MINOR_VERSION || response.patch != RPC_PROTO_PATCH_VERSION) {
450-
fprintf(stderr, "WARNING: RPC server version mismatch: %d.%d.%d\n", response.major, response.minor, response.patch);
456+
GGML_LOG_INFO("WARNING: RPC server version mismatch: %d.%d.%d\n", response.major, response.minor, response.patch);
451457
}
452458
return true;
453459
}
@@ -488,7 +494,7 @@ static std::shared_ptr<socket_t> get_socket(const std::string & endpoint) {
488494
if (!check_server_version(sock)) {
489495
return nullptr;
490496
}
491-
GGML_PRINT_DEBUG("[%s] connected to %s, sockfd=%d\n", __func__, endpoint.c_str(), sock->fd);
497+
LOG_DBG("[%s] connected to %s, sockfd=%d\n", __func__, endpoint.c_str(), sock->fd);
492498
sockets[endpoint] = sock;
493499
return sock;
494500
}
@@ -809,7 +815,7 @@ ggml_backend_buffer_type_t ggml_backend_rpc_buffer_type(const char * endpoint) {
809815
}
810816
auto sock = get_socket(endpoint);
811817
if (sock == nullptr) {
812-
fprintf(stderr, "Failed to connect to %s\n", endpoint);
818+
GGML_LOG_ERROR("Failed to connect to %s\n", endpoint);
813819
return nullptr;
814820
}
815821
size_t alignment = get_alignment(sock);
@@ -909,7 +915,7 @@ void rpc_server::hello(rpc_msg_hello_rsp & response) {
909915
response.major = RPC_PROTO_MAJOR_VERSION;
910916
response.minor = RPC_PROTO_MINOR_VERSION;
911917
response.patch = RPC_PROTO_PATCH_VERSION;
912-
GGML_PRINT_DEBUG("[%s] version: %d.%d.%d\n", __func__, response.major, response.minor, response.patch);
918+
LOG_DBG("[%s] version: %d.%d.%d\n", __func__, response.major, response.minor, response.patch);
913919
}
914920

915921
bool rpc_server::get_alloc_size(const rpc_msg_get_alloc_size_req & request, rpc_msg_get_alloc_size_rsp & response) {
@@ -929,15 +935,15 @@ bool rpc_server::get_alloc_size(const rpc_msg_get_alloc_size_req & request, rpc_
929935
GGML_LOG_ERROR("Null tensor pointer passed to server get_alloc_size function.\n");
930936
return false;
931937
}
932-
938+
LOG_DBG("[%s] buffer: %p, data: %p\n", __func__, (void*)tensor->buffer, tensor->data);
933939
if (tensor->buffer == nullptr) {
934940
//No buffer allocated.
935941
buft = ggml_backend_get_default_buffer_type(backend);
936942
} else {
937943
buft = tensor->buffer->buft;
938944
}
939945

940-
response.alloc_size = ggml_backend_buft_get_alloc_size(buft,tensor);
946+
response.alloc_size = ggml_backend_buft_get_alloc_size(buft, tensor);
941947

942948
return true;
943949
}
@@ -950,29 +956,29 @@ void rpc_server::alloc_buffer(const rpc_msg_alloc_buffer_req & request, rpc_msg_
950956
if (buffer != nullptr) {
951957
response.remote_ptr = reinterpret_cast<uint64_t>(buffer);
952958
response.remote_size = buffer->size;
953-
GGML_PRINT_DEBUG("[%s] size: %" PRIu64 " -> remote_ptr: %" PRIx64 ", remote_size: %" PRIu64 "\n", __func__, request.size, response.remote_ptr, response.remote_size);
959+
LOG_DBG("[%s] size: %" PRIu64 " -> remote_ptr: %" PRIx64 ", remote_size: %" PRIu64 "\n", __func__, request.size, response.remote_ptr, response.remote_size);
954960
buffers.insert(buffer);
955961
} else {
956-
GGML_LOG_ERROR("[%s] size: %" PRIu64 " -> failed\n", __func__, request.size);
962+
LOG_DBG("[%s] size: %" PRIu64 " -> failed\n", __func__, request.size);
957963
}
958964
}
959965

960966
void rpc_server::get_alignment(rpc_msg_get_alignment_rsp & response) {
961967
ggml_backend_buffer_type_t buft = ggml_backend_get_default_buffer_type(backend);
962968
size_t alignment = ggml_backend_buft_get_alignment(buft);
963-
GGML_PRINT_DEBUG("[%s] alignment: %lu\n", __func__, alignment);
969+
LOG_DBG("[%s] alignment: %lu\n", __func__, alignment);
964970
response.alignment = alignment;
965971
}
966972

967973
void rpc_server::get_max_size(rpc_msg_get_max_size_rsp & response) {
968974
ggml_backend_buffer_type_t buft = ggml_backend_get_default_buffer_type(backend);
969975
size_t max_size = ggml_backend_buft_get_max_size(buft);
970-
GGML_PRINT_DEBUG("[%s] max_size: %lu\n", __func__, max_size);
976+
LOG_DBG("[%s] max_size: %lu\n", __func__, max_size);
971977
response.max_size = max_size;
972978
}
973979

974980
bool rpc_server::buffer_get_base(const rpc_msg_buffer_get_base_req & request, rpc_msg_buffer_get_base_rsp & response) {
975-
GGML_PRINT_DEBUG("[%s] remote_ptr: %" PRIx64 "\n", __func__, request.remote_ptr);
981+
LOG_DBG("[%s] remote_ptr: %" PRIx64 "\n", __func__, request.remote_ptr);
976982
ggml_backend_buffer_t buffer = reinterpret_cast<ggml_backend_buffer_t>(request.remote_ptr);
977983
if (buffers.find(buffer) == buffers.end()) {
978984
GGML_LOG_ERROR("[%s] buffer not found\n", __func__);
@@ -984,7 +990,7 @@ bool rpc_server::buffer_get_base(const rpc_msg_buffer_get_base_req & request, rp
984990
}
985991

986992
bool rpc_server::free_buffer(const rpc_msg_free_buffer_req & request) {
987-
GGML_PRINT_DEBUG("[%s] remote_ptr: %" PRIx64 "\n", __func__, request.remote_ptr);
993+
LOG_DBG("[%s] remote_ptr: %" PRIx64 "\n", __func__, request.remote_ptr);
988994
ggml_backend_buffer_t buffer = reinterpret_cast<ggml_backend_buffer_t>(request.remote_ptr);
989995
if (buffers.find(buffer) == buffers.end()) {
990996
GGML_LOG_ERROR("[%s] buffer not found\n", __func__);
@@ -996,7 +1002,7 @@ bool rpc_server::free_buffer(const rpc_msg_free_buffer_req & request) {
9961002
}
9971003

9981004
bool rpc_server::buffer_clear(const rpc_msg_buffer_clear_req & request) {
999-
GGML_PRINT_DEBUG("[%s] remote_ptr: %" PRIx64 ", value: %u\n", __func__, request.remote_ptr, request.value);
1005+
LOG_DBG("[%s] remote_ptr: %" PRIx64 ", value: %u\n", __func__, request.remote_ptr, request.value);
10001006
ggml_backend_buffer_t buffer = reinterpret_cast<ggml_backend_buffer_t>(request.remote_ptr);
10011007
if (buffers.find(buffer) == buffers.end()) {
10021008
GGML_LOG_ERROR("[%s] buffer not found\n", __func__);
@@ -1073,7 +1079,7 @@ bool rpc_server::set_tensor(const std::vector<uint8_t> & input) {
10731079
GGML_LOG_ERROR("[%s] error deserializing tensor\n", __func__);
10741080
return false;
10751081
}
1076-
GGML_PRINT_DEBUG("[%s] buffer: %p, data: %p, offset: %" PRIu64 ", size: %zu\n", __func__, (void*)tensor->buffer, tensor->data, offset, size);
1082+
LOG_DBG("[%s] buffer: %p, data: %p, offset: %" PRIu64 ", size: %zu\n", __func__, (void*)tensor->buffer, tensor->data, offset, size);
10771083

10781084
// sanitize tensor->data
10791085
{
@@ -1096,7 +1102,7 @@ bool rpc_server::set_tensor(const std::vector<uint8_t> & input) {
10961102
fs::path cache_file = fs::path(cache_dir) / hash_str;
10971103
std::ofstream ofs(cache_file, std::ios::binary);
10981104
ofs.write((const char *)data, size);
1099-
printf("[%s] saved to '%s'\n", __func__, cache_file.c_str());
1105+
GGML_LOG_INFO("[%s] saved to '%s'\n", __func__, cache_file.c_str());
11001106
}
11011107
ggml_backend_tensor_set(tensor, data, offset, size);
11021108
return true;
@@ -1142,8 +1148,8 @@ bool rpc_server::set_tensor_hash(const rpc_msg_set_tensor_hash_req & request, rp
11421148
GGML_LOG_ERROR("[%s] error deserializing tensor\n", __func__);
11431149
return false;
11441150
}
1145-
GGML_PRINT_DEBUG("[%s] buffer: %p, data: %p, offset: %" PRIu64 ", size: %zu, hash: %" PRIx64 "\n",
1146-
__func__, (void*)tensor->buffer, tensor->data, request.offset, size, request.hash);
1151+
LOG_DBG("[%s] buffer: %p, data: %p, offset: %" PRIu64 ", size: %zu, hash: %" PRIx64 "\n",
1152+
__func__, (void*)tensor->buffer, tensor->data, request.offset, size, request.hash);
11471153

11481154
// sanitize tensor->data
11491155
{
@@ -1177,7 +1183,7 @@ bool rpc_server::init_tensor(const rpc_msg_init_tensor_req & request) {
11771183
GGML_LOG_ERROR("Null tensor pointer passed to server init_tensor function.\n");
11781184
return false;
11791185
}
1180-
1186+
LOG_DBG("[%s] buffer: %p, data: %p\n", __func__, (void*)tensor->buffer, tensor->data);
11811187
// Call the backend's buffer_init_tensor function
11821188
ggml_backend_buffer_t buffer = tensor->buffer;
11831189
if (buffer && buffer->iface.init_tensor) {
@@ -1210,7 +1216,7 @@ bool rpc_server::get_tensor(const rpc_msg_get_tensor_req & request, std::vector<
12101216
GGML_LOG_ERROR("[%s] error deserializing tensor\n", __func__);
12111217
return false;
12121218
}
1213-
GGML_PRINT_DEBUG("[%s] buffer: %p, data: %p, offset: %" PRIu64 ", size: %" PRIu64 "\n", __func__, (void*)tensor->buffer, tensor->data, request.offset, request.size);
1219+
LOG_DBG("[%s] buffer: %p, data: %p, offset: %" PRIu64 ", size: %" PRIu64 "\n", __func__, (void*)tensor->buffer, tensor->data, request.offset, request.size);
12141220

12151221
// sanitize tensor->data
12161222
{
@@ -1254,7 +1260,7 @@ bool rpc_server::copy_tensor(const rpc_msg_copy_tensor_req & request, rpc_msg_co
12541260
uint64_t dst_buf_sz = (uint64_t) ggml_backend_buffer_get_size(dst->buffer);
12551261

12561262
if (dst_data + src_size > dst_base + dst_buf_sz) {
1257-
GGML_PRINT_DEBUG("[%s] out-of-bounds write in rpc_server::copy_tensor:\n"
1263+
GGML_LOG_ERROR("[%s] out-of-bounds write in rpc_server::copy_tensor:\n"
12581264
" write range : [0x%" PRIx64 ", 0x%" PRIx64 "]\n"
12591265
" buffer base: [0x%" PRIx64 ", 0x%" PRIx64 "]\n",
12601266
__func__,
@@ -1265,8 +1271,8 @@ bool rpc_server::copy_tensor(const rpc_msg_copy_tensor_req & request, rpc_msg_co
12651271
return false;
12661272
}
12671273

1268-
GGML_PRINT_DEBUG("[%s] src->buffer: %p, dst->buffer: %p\n",
1269-
__func__, (void*) src->buffer, (void*) dst->buffer);
1274+
LOG_DBG("[%s] src->buffer: %p, dst->buffer: %p\n",
1275+
__func__, (void*) src->buffer, (void*) dst->buffer);
12701276

12711277
response.result = ggml_backend_buffer_copy_tensor(src, dst);
12721278
return true;
@@ -1342,7 +1348,7 @@ bool rpc_server::graph_compute(const std::vector<uint8_t> & input, rpc_msg_graph
13421348
return false;
13431349
}
13441350
const rpc_tensor * tensors = (const rpc_tensor *)(input.data() + sizeof(n_nodes) + n_nodes*sizeof(uint64_t) + sizeof(n_tensors));
1345-
GGML_PRINT_DEBUG("[%s] n_nodes: %u, n_tensors: %u\n", __func__, n_nodes, n_tensors);
1351+
LOG_DBG("[%s] n_nodes: %u, n_tensors: %u\n", __func__, n_nodes, n_tensors);
13461352

13471353
size_t buf_size = ggml_tensor_overhead()*(n_nodes + n_tensors) + ggml_graph_overhead_custom(n_nodes, false);
13481354

@@ -1394,7 +1400,7 @@ static void rpc_serve_client(ggml_backend_t backend, const char * cache_dir,
13941400
}
13951401
// the first command sent by the client must be HELLO
13961402
if (cmd != RPC_CMD_HELLO) {
1397-
fprintf(stderr, "Expected HELLO command, update client\n");
1403+
GGML_LOG_ERROR("Expected HELLO command, update client\n");
13981404
return;
13991405
}
14001406
if (!recv_msg(sockfd, nullptr, 0)) {
@@ -1411,7 +1417,7 @@ static void rpc_serve_client(ggml_backend_t backend, const char * cache_dir,
14111417
}
14121418
if (cmd >= RPC_CMD_COUNT) {
14131419
// fail fast if the command is invalid
1414-
fprintf(stderr, "Unknown command: %d\n", cmd);
1420+
GGML_LOG_ERROR("Unknown command: %d\n", cmd);
14151421
break;
14161422
}
14171423
switch (cmd) {
@@ -1599,7 +1605,7 @@ static void rpc_serve_client(ggml_backend_t backend, const char * cache_dir,
15991605
break;
16001606
}
16011607
default: {
1602-
fprintf(stderr, "Unknown command: %d\n", cmd);
1608+
GGML_LOG_ERROR("Unknown command: %d\n", cmd);
16031609
return;
16041610
}
16051611
}

0 commit comments

Comments
 (0)