
Commit 7cfe3c7

wal: update documentation
This commit updates obsolete parts of the code, adds a more extensive description of its work, and moves the documentation related to the communication bus (cbus) to a separate page, since it is widely used not only in the wal description but also in relay.

Co-authored-by: Sergey Petrenko <[email protected]>
1 parent ec22654 commit 7cfe3c7

6 files changed: +471 -225 lines changed

source/cbus.rst

Lines changed: 303 additions & 0 deletions
@@ -0,0 +1,303 @@
.. _cbus:

Cbus (Communication bus)
========================

Introduction
------------

Cbus serves as a way to communicate between cords. It is widely used in the
descriptions of the :ref:`replication` and :ref:`relay` internals, so here
is an overview of its basic API and architecture.

General overview
----------------

Basically, ``cbus`` is just a singleton shared between all cords with the
following structure:

.. code-block:: c

    /* lib/core/cbus.c */
    struct cbus {
        /** cbus statistics */
        struct rmean *stats;
        /** A mutex to protect bus join. */
        pthread_mutex_t mutex;
        /** Condition for synchronized start of the bus. */
        pthread_cond_t cond;
        /** Connected endpoints */
        struct rlist endpoints;
    };

It provides the user with message consumers (endpoints) and a way for
producers to connect to these endpoints (pipes). Messages themselves carry
routes, according to which they are delivered by ``cbus_loop`` or
``cbus_process``. Let's consider the aforementioned entities in detail.

Endpoints
---------

A cbus endpoint works as a message (event) consumer. It must have a unique
name, as all endpoints are stored in the ``rlist`` of the ``cbus`` singleton
and the name is used to identify the endpoint when establishing a route.
An endpoint has the following structure:

.. code-block:: c

    /* lib/core/cbus.h */
    struct cbus_endpoint {
        char name[FIBER_NAME_MAX];
        /** Member of cbus->endpoints */
        struct rlist in_cbus;
        /** The lock around the pipe. */
        pthread_mutex_t mutex;
        /** A queue with incoming messages. */
        struct stailq output;
        /** Consumer cord loop */
        ev_loop *consumer;
        /** Async to notify the consumer */
        ev_async async;
        /** Count of connected pipes */
        uint32_t n_pipes;
        /** Condition for endpoint destroy */
        struct fiber_cond cond;
    };

An endpoint can be created via ``cbus_endpoint_create``:

.. code-block:: c

    /* lib/core/cbus.c */
    int
    cbus_endpoint_create(struct cbus_endpoint *endpoint, const char *name,
                         void (*fetch_cb)(...), void *fetch_data)
    {
        ...
        snprintf(endpoint->name, sizeof(endpoint->name), "%s", name);
        endpoint->consumer = cord()->loop;
        ...
        ev_async_init(&endpoint->async, fetch_cb);
        endpoint->async.data = fetch_data;
        ev_async_start(endpoint->consumer, &endpoint->async);

        /* Register in singleton */
        rlist_add_tail(&cbus.endpoints, &endpoint->in_cbus);

        /* Alert producers */
        tt_pthread_cond_broadcast(&cbus.cond);
        tt_pthread_mutex_unlock(&cbus.mutex);
        return 0;
    }

The function expects ``fetch_cb``, which is a callback used to fetch new
messages. It is registered as an ``ev_async`` watcher (see ``man libev``).
As soon as all fields of the endpoint are initialized and it is added to
the ``cbus`` registry, ``cbus_endpoint_create`` wakes up all producers
(pipes) that are blocked waiting for this endpoint to become available.
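
For illustration, this is roughly how a consumer cord registers itself on
the bus. This is a sketch rather than a verbatim excerpt: the endpoint name
``worker`` is made up, and ``fiber_schedule_cb`` is assumed to be the stock
callback that simply wakes the fiber passed as ``fetch_data``.

.. code-block:: c

    /* Inside the consumer cord's fiber function. */
    struct cbus_endpoint endpoint;
    /*
     * Register the "worker" endpoint. The ev_async watcher will wake
     * the current fiber whenever producers flush messages to us; the
     * fiber is then expected to fetch and deliver them (see cbus_loop
     * below).
     */
    cbus_endpoint_create(&endpoint, "worker", fiber_schedule_cb, fiber());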

An endpoint can be destroyed only when no associated producers remain and
its queue of incoming messages is empty:

.. code-block:: c

    /* lib/core/cbus.c */
    int
    cbus_endpoint_destroy(struct cbus_endpoint *endpoint,
                          void (*process_cb)(struct cbus_endpoint *endpoint))
    {
        tt_pthread_mutex_lock(&cbus.mutex);
        rlist_del(&endpoint->in_cbus);
        tt_pthread_mutex_unlock(&cbus.mutex);

        while (true) {
            if (process_cb)
                process_cb(endpoint);
            if (endpoint->n_pipes == 0 && stailq_empty(&endpoint->output))
                break;
            fiber_cond_wait(&endpoint->cond);
        }
        ...
        ev_async_stop(endpoint->consumer, &endpoint->async);
        ...
        return 0;
    }

Cpipes (communication pipes)
----------------------------

The cpipe serves as a uni-directional FIFO queue from one cord to another.
It works as a message (event) producer and has the following structure:

.. code-block:: c

    /* lib/core/cbus.h */
    struct cpipe {
        /** Staging area for pushed messages */
        struct stailq input;
        /** Counters are useful for finer-grained scheduling. */
        int n_input;
        /**
         * When pushing messages, keep the staged input size under
         * this limit (speeds up message delivery and reduces
         * latency, while still keeping the bus mutex cold enough).
         */
        int max_input;
        struct ev_async flush_input;
        /** The event loop of the producer cord. */
        struct ev_loop *producer;
        /**
         * The cbus endpoint at the destination cord to handle
         * flushed messages.
         */
        struct cbus_endpoint *endpoint;
        /**
         * Triggers to call on flush event, if the input queue
         * is not empty.
         */
        struct rlist on_flush;
    };

It can be created via ``cpipe_create``:

.. code-block:: c

    /* lib/core/cbus.c */
    void
    cpipe_create(struct cpipe *pipe, const char *consumer)
    {
        stailq_create(&pipe->input);

        pipe->n_input = 0;
        pipe->max_input = INT_MAX;
        pipe->producer = cord()->loop;

        ev_async_init(&pipe->flush_input, cpipe_flush_cb);
        pipe->flush_input.data = pipe;
        rlist_create(&pipe->on_flush);

        tt_pthread_mutex_lock(&cbus.mutex);
        struct cbus_endpoint *endpoint =
            cbus_find_endpoint_locked(&cbus, consumer);
        while (endpoint == NULL) {
            tt_pthread_cond_wait(&cbus.cond, &cbus.mutex);
            endpoint = cbus_find_endpoint_locked(&cbus, consumer);
        }
        pipe->endpoint = endpoint;
        ++pipe->endpoint->n_pipes;
        tt_pthread_mutex_unlock(&cbus.mutex);
    }

As we can see, the function waits until the needed endpoint appears in the
``cbus`` registry. This is why we alerted all producers in
``cbus_endpoint_create``.

The ``cpipe_flush_cb`` watcher is also registered here. It flushes messages
from ``pipe->input`` to ``pipe->endpoint->output``. Note that it is invoked
not only once per loop iteration but also when ``max_input`` is reached:

.. code-block:: c

    /* lib/core/cbus.c */
    static inline void
    cpipe_push_input(struct cpipe *pipe, struct cmsg *msg)
    {
        assert(loop() == pipe->producer);

        stailq_add_tail_entry(&pipe->input, msg, fifo);
        pipe->n_input++;
        if (pipe->n_input >= pipe->max_input)
            ev_invoke(pipe->producer, &pipe->flush_input, EV_CUSTOM);
    }
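
For completeness, here is roughly what the producer side looks like. Again
a sketch: the ``worker`` endpoint name is hypothetical and matches the
consumer sketch above.

.. code-block:: c

    /*
     * Executed in the producer cord. cpipe_create() blocks the cord
     * until the consumer has registered the "worker" endpoint; after
     * that every message pushed into the pipe is flushed to that
     * endpoint's queue.
     */
    struct cpipe pipe_to_worker;
    cpipe_create(&pipe_to_worker, "worker");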

Event loop and messages
-----------------------

In order to enter the message processing loop, ``cbus_loop`` can be used:

.. code-block:: c

    /* lib/core/cbus.c */
    void
    cbus_loop(struct cbus_endpoint *endpoint)
    {
        while (true) {
            cbus_process(endpoint);
            // cbus_process() code
            struct stailq output;
            stailq_create(&output);
            cbus_endpoint_fetch(endpoint, &output);
            struct cmsg *msg, *msg_next;
            stailq_foreach_entry_safe(msg, msg_next, &output, fifo)
                cmsg_deliver(msg);

            if (fiber_is_cancelled())
                break;
            fiber_yield();
        }
    }

The ``cbus_process`` above fetches messages from the endpoint's queue and
processes them with ``cmsg_deliver``.
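
Putting the consumer pieces together, the whole lifetime of an endpoint
usually looks like the sketch below (same assumptions as in the endpoint
sketch above; ``worker_cord_f`` is a hypothetical cord function).

.. code-block:: c

    static int
    worker_cord_f(va_list ap)
    {
        (void)ap;
        struct cbus_endpoint endpoint;
        /* Announce ourselves to the bus and to waiting producers. */
        cbus_endpoint_create(&endpoint, "worker", fiber_schedule_cb, fiber());
        /* Fetch and deliver messages until the fiber is cancelled. */
        cbus_loop(&endpoint);
        /* Drain the queue, wait for all pipes to detach, leave the bus. */
        cbus_endpoint_destroy(&endpoint, cbus_process);
        return 0;
    }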

Every message traveling between cords has the following structure:

.. code-block:: c

    /* lib/core/cbus.h */
    struct cmsg {
        /**
         * A member of the linked list - fifo of the pipe the
         * message is stuck in currently, waiting to get
         * delivered.
         */
        struct stailq_entry fifo;
        /** The message routing path. */
        const struct cmsg_hop *route;
        /** The current hop the message is at. */
        const struct cmsg_hop *hop;
    };

    struct cmsg_hop {
        /** The message delivery function. */
        cmsg_f f;
        /**
         * The next destination to which the message
         * should be routed after its delivered locally.
         */
        struct cpipe *pipe;
    };

A message may need to be delivered to many destinations before it can
be dispensed with. For example, it may be necessary to return a message
to the sender just to destroy it.

The message travel route is an array of ``cmsg_hop`` entries. The first
entry contains a delivery function at the first destination and the next
destination. Subsequent entries are alike. The last entry has a delivery
function (most often a message destructor) and NULL for the next
destination.
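
As an illustration, here is a sketch of a two-hop route: a producer sends a
message to the ``worker`` cord, and the message is then routed back to the
producer's own endpoint (called ``tx`` here) just to be freed. All names
(``my_msg``, ``do_work``, ``free_msg``, ``pipe_to_tx``, ``send_to_worker``)
are hypothetical, and ``cmsg_init`` is assumed to be the usual helper that
points ``route`` and ``hop`` of a message at the given array.

.. code-block:: c

    /* Assumes the cbus headers plus <stdlib.h> and <stdio.h> are included. */

    /* A user message embeds struct cmsg as its first member. */
    struct my_msg {
        struct cmsg base;
        char payload[64];
    };

    /* Created by the producer cord, connected to the "worker" endpoint. */
    extern struct cpipe pipe_to_worker;
    /* Created by the worker cord, connected back to the "tx" endpoint. */
    extern struct cpipe pipe_to_tx;

    /* First hop: runs in the worker cord. */
    static void
    do_work(struct cmsg *m)
    {
        struct my_msg *msg = (struct my_msg *)m;
        /* ... do something useful with msg->payload ... */
        (void)msg;
    }

    /* Last hop: runs back in the producer cord. */
    static void
    free_msg(struct cmsg *m)
    {
        free(m);
    }

    /*
     * do_work() is executed at the first destination, then the message
     * is forwarded through pipe_to_tx; free_msg() is executed at the
     * second destination and NULL terminates the route.
     */
    static const struct cmsg_hop route[] = {
        { do_work, &pipe_to_tx },
        { free_msg, NULL },
    };

    /* Called in the producer cord. */
    static void
    send_to_worker(void)
    {
        struct my_msg *msg = malloc(sizeof(*msg));
        snprintf(msg->payload, sizeof(msg->payload), "hello");
        cmsg_init(&msg->base, route);
        cpipe_push(&pipe_to_worker, &msg->base);
    }

Note that each cpipe has to be created by the cord that pushes into it
(``pipe_to_worker`` by the producer, ``pipe_to_tx`` by the worker), since
``cpipe_create`` captures ``cord()->loop`` as the producer loop and
``cpipe_push_input`` asserts on it.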

Since in ``cbus_process`` we deal with messages that have already been
delivered to the endpoint, in ``cmsg_deliver`` we invoke the message
delivery function ``f`` of the current hop and dispatch the message to the
next hop.

.. code-block:: c

    /* lib/core/cbus.c */
    void
    cmsg_deliver(struct cmsg *msg)
    {
        struct cpipe *pipe = msg->hop->pipe;
        msg->hop->f(msg);
        cmsg_dispatch(pipe, msg);
        /* cmsg_dispatch() code */
        if (pipe) {
            msg->hop++;
            cpipe_push(pipe, msg);
        }
    }

source/index.rst

Lines changed: 1 addition & 0 deletions
@@ -11,3 +11,4 @@ Tarantool internals
 wal
 wal-fmt
 allocators
+cbus

source/relay.rst

Lines changed: 2 additions & 0 deletions
@@ -1,3 +1,5 @@
+.. _relay:
+
 Relay
 =====

source/toctree.rst

Lines changed: 1 addition & 0 deletions
@@ -12,3 +12,4 @@
 wal
 wal-fmt
 allocators
+cbus
