Interactive transactions and streams in iproto/http2 #5860

Closed
mary3000 opened this issue Mar 1, 2021 · 22 comments
Labels: epic, feature

@mary3000
Contributor

mary3000 commented Mar 1, 2021

Interactive transactions and streams in iproto/http2

Problem

With the new transaction manager for memtx, a transaction can yield regardless of the engine used. This makes it possible to implement:

  • Interactive transactions. Such a transaction doesn't need to be sent in one request: begin, commit and the transaction's statements can be sent and executed in separate requests.
  • Streams. A stream multiplexes several transactions over one connection. It is the more general concept, and interactive transactions can be built on top of it.

Tarantool currently uses iproto as its main communication protocol, and HTTP/2 support is planned. The goal is therefore to implement streams for both protocols.

Solutions

Rejected in favor of streams

Interactive tx over IPROTO without streams

It was decided that a more general approach is needed: streams, available not only over iproto but also over HTTP/2.

issue, PR

Implementation

Introduce begin, commit and rollback commands for a connection object (IPROTO_BEGIN, IPROTO_COMMIT and IPROTO_ROLLBACK respectively in the iproto protocol).

Introduce a remote_txn structure that contains a txn and an rlist of pending messages (iproto_msg) to be processed. This structure is allocated from the memory pool, and only for new transactions over IPROTO.

Add new fields to iproto_msg: 1) txn, the current transaction (or null); 2) next_in_tx, a member of remote_txn->pending.

Embed a remote_txn object in iproto_connection.

Behaviour:

  • If call/eval implicitly begins a new txn and leaves it open, the txn is silently rolled back.
  • If there is no txn in the connection, conn:commit() / conn:rollback() are silently ignored.

Streams

Old discussion: the idea of streams in iproto was previously [discussed](https://tkn.me/tarantool/rfc-interactive-transactions-in-iproto/) ([archive](https://tkn.me/tarantool/rfc-interactive-transactions-in-iproto.tar.bz2)).

Short intro to an idea from this discussion:

** The problem **

If a CALL request leaves a transaction open upon return, the transaction
will be forcefully aborted. This makes sense for memtx, because it
doesn't support fiber yield (yet), however in case of vinyl, the user
may want to continue execution of the same transaction in the next CALL,
but currently it isn't possible.

Another use case for this is SQL EXECUTE request, which is used for
executing SQL statements. The problem is this request can only be used
for executing a single SQL statement so without transactions in IProto
it is impossible to implement SQL transactions on a remote client (e.g.
via JDBC).

See [1] for more details.

** The solution **

Introduce the concept of streams within an IProto connection, as
suggested by Georgy Kirichenko:

 1. Introduce new request header key IPROTO_STREAM_ID.
 2. The stream id is generated by the user and passed along with all
    requests that are supposed to be executed in the same stream.
 3. All requests within the same stream are executed sequentially.
 4. Requests from different streams may be executed in parallel.
 5. If a transaction is left open by a request in a stream, the next
    request will reuse it.
 6. If IPROTO_STREAM_ID is unset (0), everything works as before, i.e.
    no transaction preservation or request serialization will occur.

The net.box API will look like this:

  c = net_box.connect(...)
  s = c:make_stream(stream_id)
  s:call(...)

A net.box stream instance will be a wrapper around the connection it was
created for. It will have all the same methods as the connection itself,
but all requests sent on its behalf will have the stream id attached.

Questions extracted from a discussion:

  • How to map stream ids to stream objects?
Second, we will have to add a hash table mapping stream ids to stream
objects in the tx thread. A stream object would basically be a queue of
requests awaiting execution plus an open transaction if any. When a new
request is sent to iproto, it is submitted to the tx thread and then
either executed directly by a fiber of the fiber pool or queued to the
stream object if there's already a request from the same stream being
executed by another fiber. When a fiber finishes executing a request, it
checks if there are more requests in the stream queue and continues
execution if so.
  • Do we need to limit the number of streams?
  • Expose tx id to client or not?
  • What happens to opened transactions in a stream when a connection is closed? Rollback? What about on_rollback triggers?
  • When to destroy a stream? (when last pending request processed / on timeout?)
  • What happens when a stream is created with a reserved id?
  • How to balance stream processing? How to choose from which stream of many ones TX thread should pick up a request?
I think, round-robin is fair enough, but maybe I am wrong.
<...>
I'd rather leave all the balancing to the fiber scheduler. IProto would
simply queue responses to the appropriate stream to be processed by the
fiber that is currently executing a request that belongs to the same
stream, or if there's none, start a new fiber.
  • How do streams affect net_msg_max limit?
  • SQL support: do changes made in one stream affect another stream (see doc17)?
  • Maybe we need explicit stream open/close api? (see doc22)

Summary from Osipov:

- IPROTO_BEGIN/COMMIT/ROLLBACK only works if IPROTO_STREAM_ID is non-zero
- if stream id is zero, then dangling transactions are rolled back
  as they are now
- all requests inside a stream are strictly sequential
- a stream owns its own diagnostics, transaction, transaction
  isolation level, and possibly authenticated user (see below).
- better yet, IPROTO_SQL_EXECUTE is only available if stream id is
  non-zero 

The stream is a wrapper around connection object:

local stream = conn:stream([optional id])
stream:call('box.begin')
stream:call('box.insert', {...})
stream:call('box.commit')
  • There are no special commands to create and close a stream. There's no server-side method to generate a new STREAM_ID. Any connector can generate stream IDs for a given connection by any convenient strategy, for example an incremental sequence per connection.
  • A stream is a part of a connection. A request with the same STREAM_ID but on a different connection belongs to a different stream.
  • Requests with the same non-zero STREAM_ID are processed synchronously (the TX thread must process the next request of a stream strictly after the previous request of the same stream is completed).
  • Each stream can start its own transaction.
  • Once a transaction in a stream is started it's guaranteed that all requests of the stream are processed by one exclusive fiber until the transaction is ended.
  • Requests of the same stream outside a transaction may or may not be processed by an exclusive fiber.
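
As an illustration only, the identity rules above could look like this in C. The names `conn_sketch`, `next_stream_id` and `same_stream` are hypothetical and not part of Tarantool; the incremental counter is just the example strategy mentioned in the list.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical client-side connection state; not a Tarantool structure. */
struct conn_sketch {
    uint64_t last_stream_id; /* 0 means no stream id has been issued yet */
};

/*
 * Incremental sequence per connection. Id 0 is never returned because
 * STREAM_ID = 0 means legacy (non-stream) behavior.
 */
static uint64_t
next_stream_id(struct conn_sketch *conn)
{
    assert(conn != NULL);
    assert(conn->last_stream_id < UINT64_MAX);
    return ++conn->last_stream_id;
}

/*
 * Two requests belong to the same stream only if they share both the
 * connection and a non-zero stream id.
 */
static bool
same_stream(const struct conn_sketch *conn_a, uint64_t id_a,
            const struct conn_sketch *conn_b, uint64_t id_b)
{
    return conn_a == conn_b && id_a != 0 && id_a == id_b;
}
```

Under these assumptions, equal ids on two different connections never collide, so each connector can number its streams independently.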

Ids

Each stream is associated with an id. The id is generated on the client side but hidden from the end user, who operates on a stream object that is mapped internally to the corresponding id.

We introduce a STREAM_ID request field in the IPROTO protocol. An omitted STREAM_ID or STREAM_ID = 0 means legacy behavior.

Serialization

Requests in a stream are processed synchronously. The reason is that transactions can now be yielding, so we must wait for the previous request before sending the next one in order to guarantee serialization.

Fibers created from a transaction

Problem

Imagine if a transaction in a stream creates a new fiber (e.g. it can be done implicitly via some library call). Should this fiber see uncommitted data from the transaction? If the fiber also writes data, should those writes become part of the transaction?

Solution

One way to solve the problem is to forbid fibers that read or write data, but this is too restrictive. Instead, let's say that such fibers are fully independent of the transaction: they can't see any of its data, and they can start their own transaction if they want.

Where stream is executed

Each stream is executed in its own fiber. Tx thread should somehow decide what to process next - maybe we need some sophisticated prioritization?

Stream closing

We can return the corresponding fiber to the pool when:

  • Commit / rollback happens.
  • No transaction is executed in this stream and there are no pending requests left.

If the connection is closed, all its streams are closed and all uncommitted transactions in those streams are rolled back.

Limits

Stream limit

The number of streams is limited by the existing option net_msg_max.

Pending messages limit

Imagine an infinite yielding loop inside a transaction in a stream, so that infinitely many messages arrive for this stream. Again, as I understand it, this is already limited by net_msg_max.

HTTP/2.0

The concept of streams is already built into the protocol. The main idea is to multiplex several streams over one TCP connection.

A stream can be started by either the client or the server.

A stream id must be a positive integer: odd for streams started by the client, even for streams started by the server. Stream id zero is reserved.
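
The id rule can be expressed as a small C helper. This is only a sketch: `h2_stream_initiator` and the enum are hypothetical names, and the assertion assumes the 31-bit stream identifier that HTTP/2 defines.

```c
#include <assert.h>
#include <stdint.h>

enum h2_initiator {
    H2_CONNECTION, /* id 0: reserved, not a real stream */
    H2_CLIENT,     /* odd ids */
    H2_SERVER,     /* even non-zero ids */
};

/* Classify an HTTP/2 stream id by the side that is allowed to open it. */
static enum h2_initiator
h2_stream_initiator(uint32_t stream_id)
{
    /* Stream ids are 31-bit; the top bit is reserved. */
    assert((stream_id & 0x80000000u) == 0);
    if (stream_id == 0)
        return H2_CONNECTION;
    return (stream_id & 1) ? H2_CLIENT : H2_SERVER;
}
```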

Protocols

IPROTO

HTTP/2.0

@mary3000 added the feature and incoming labels Mar 1, 2021
@mary3000 self-assigned this Mar 1, 2021
@kyukhin added the teamC label Mar 1, 2021
@mary3000
Contributor Author

mary3000 commented Mar 4, 2021

Summary of a planned solution

The stream is a wrapper around connection object:

local stream = conn:stream([optional id])
stream:call('box.begin')
stream:call('box.insert', {...})
stream:call('box.commit')

Ids

Each stream is associated with an id. The id is generated on the client side but hidden from the end user, who operates on a stream object that is mapped internally to the corresponding id.

In iproto, stream id 0 indicates that no stream is needed and the old behaviour applies.

Serialization

Requests in a stream are sent synchronously. The reason is that transactions can now be yielding, so we must wait for the previous request before sending the next one in order to guarantee serialization.

Fibers created from a transaction

Problem

Imagine if a transaction in a stream creates a new fiber (e.g. it can be done implicitly via some library call). Should this fiber see uncommitted data from the transaction? If the fiber also writes data, should those writes become part of the transaction?

Solution

One way to solve the problem is to forbid fibers that read or write data, but this is too restrictive. Instead, let's say that such fibers are fully independent of the transaction: they can't see any of its data, and they can start their own transaction if they want.

Where stream is executed

Each stream is executed in its own fiber. Tx thread should somehow decide what to process next - maybe we need some sophisticated prioritization?

Stream closing

We can return the corresponding fiber to the pool when:

  • Commit / rollback happens.
  • No transaction is executed in this stream and there are no pending requests left.

If the connection is closed, all its streams are closed and all uncommitted transactions in those streams are rolled back.

Limits

Stream limit

The number of streams is limited by the existing option net_msg_max.

Pending messages limit

Imagine an infinite yielding loop inside a transaction in a stream, so that infinitely many messages arrive for this stream. Again, as I understand it, this is already limited by net_msg_max.

@Totktonada
Member

Serialization

Requests in a stream are sent synchronously. The reason is that transactions can now be yielding, so we must wait for the previous request before sending the next one in order to guarantee serialization.

Iproto is supposed to be used asynchronously. Let's look at your example:

local stream = conn:stream([optional id])
stream:call('box.begin')
stream:call('box.insert', {...})
stream:call('box.commit')

With synchronous processing we'll have three network round trips (+processing time). Support for pipelining within a stream (as in an HTTP/2 stream) would shrink the delay to nearly one round trip (+processing time).
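
A rough model of this estimate, assuming that transmission time is negligible, that every request costs the same processing time, and that the server still executes the requests of one stream sequentially. The function names are made up for the illustration.

```c
#include <assert.h>
#include <stdint.h>

/* Each request waits for the previous response: one round trip per request. */
static uint64_t
latency_sync_us(unsigned n_requests, uint64_t rtt_us, uint64_t proc_us)
{
    return (uint64_t)n_requests * (rtt_us + proc_us);
}

/*
 * All requests are sent back to back, so only one round trip is paid,
 * while the server still processes the requests one after another.
 */
static uint64_t
latency_pipelined_us(unsigned n_requests, uint64_t rtt_us, uint64_t proc_us)
{
    assert(n_requests > 0);
    return rtt_us + (uint64_t)n_requests * proc_us;
}
```

With a round trip of 1000 microseconds and 50 microseconds of processing per request, the three-request example takes 3150 microseconds when sent synchronously and 1150 microseconds when pipelined under this model.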


BTW, how is the 'optional id' in :stream() supposed to be used?


How can we make working with an asynchronous stream from Lua convenient?

Say, if we want to process all responses asynchronously:

local stream = conn:stream({is_async = true})
local futures = {}
table.insert(futures, stream:call('box.begin'))
table.insert(futures, stream:call('box.insert', {...}))
table.insert(futures, stream:call('box.commit'))
for _, future in ipairs(futures) do
    local result = future:wait_result()
    <...>
end

Or:

local stream = conn:stream({is_async = true})
stream:call('box.begin')
stream:call('box.insert', {...})
stream:call('box.commit')
for _, result in stream:pairs() do
    <...>
end

NB: How to distinguish box.session.push() from a usual return?

@alyapunov
Contributor

Requests in a stream are sent synchronously. The reason is that transactions can now be yielding, so we must wait for the previous request before sending the next one in order to guarantee serialization.

I would say requests are processed synchronously. The TX thread must process the next request of a stream strictly after the previous request of the same stream is completed. By sending I usually understand network IO, and it must be batched when possible.

Iproto is supposed to be used asynchronously. Let's look at your example:

Actually we are designing the IPROTO API, not the API of one particular connector. The idea is to create and support an IPROTO API that would allow all kinds of connectors: sync/async, one-by-one or batched, etc.

@alyapunov
Contributor

BTW, how is the 'optional id' in :stream() supposed to be used?

For each stream its ID is not optional. Requests with different stream_id are treated as different streams.

@alyapunov
Contributor

alyapunov commented Mar 4, 2021

I think we should add to RFC:

  1. We introduce STREAM_ID of request in IPROTO protocol.
  2. Omitted or STREAM_ID = 0 means legacy behavior.
  3. There are no special commands to create and close a stream. There's no server-side method to generate a new STREAM_ID. Any connector can generate stream IDs for a given connection by any convenient strategy, for example an incremental sequence per connection.
  4. A stream is a part of a connection. A request with the same STREAM_ID but on a different connection belongs to a different stream.
  5. Requests with the same non-zero STREAM_ID are processed synchronously (the TX thread must process the next request of a stream strictly after the previous request of the same stream is completed).
  6. Each stream can start its own transaction.
  7. Once a transaction in a stream is started it's guaranteed that all requests of the stream are processed by one exclusive fiber until the transaction is ended.
  8. Requests of the same stream outside a transaction may or may not be processed by an exclusive fiber.

@Gerold103
Collaborator

Once a transaction in a stream is started it's guaranteed that all requests of the stream are processed by one exclusive fiber until the transaction is ended.

Why do you need to use the same fiber? By doing so you basically block a fiber until the transaction ends. The fiber can't process more requests from other streams or without streams, and there is -1 free fiber in the pool. Transaction data is allocated on txn's region. So what is the reason to reserve a fiber?

@Gerold103
Collaborator

Gerold103 commented Mar 4, 2021

Imagine if a transaction in a stream creates new fiber (e.g. it can be done implicitly via some library call).

Not possible. Transaction can't live in 2 fibers. This is not related to streams, just a general restriction. If you start a new fiber in a transaction, it would be totally new fiber without any context of the creator. You can try it even now - do fiber.new() after box.begin() (they don't even share the session object).

@alyapunov
Contributor

Not possible. Transaction can't live in 2 fibers. This is not related to streams, just a general restriction. If you start a new fiber in a transaction, it would be totally new fiber without any context of the creator. You can try it even now - do fiber.new() after box.begin()

You can create a fiber in a transaction. You are right that the new fiber currently doesn't inherit the transaction from its creator. In this RFC we pin down this behavior and state that it will remain the same.

@Totktonada
Member

BTW, how is the 'optional id' in :stream() supposed to be used?

For each stream its ID is not optional. Requests with different stream_id are treated as different streams.

The example was given for the net.box API, so I asked why it may be useful when creating a net.box stream. Sure, there will be an optional stream id in the protocol.

@alyapunov
Contributor

Why do you need to use the same fiber? By doing so you basically block a fiber until the transaction ends. The fiber can't process more requests from other streams or without streams, and there is -1 free fiber in the pool. Transaction data is allocated on txn's region. So what is the reason to reserve a fiber?

Actually the only reason I see is simplicity. As for resources, I think that a transaction itself is potentially a much more expensive object than a fiber. Given that, I'm not sure that saving a fiber is a big deal.

@Gerold103
Collaborator

A fiber is more expensive, at least because creating it makes system calls right in the TX thread and allocates tons of memory for its stack. And because it is a limited resource. By keeping a fiber from reuse, you reduce the number of fibers serving the other iproto calls (the net_msg_max setting is related to this). With a certain amount of concurrent transactions you might exhaust the entire fiber pool, so no requests would be served at all except these transactions, even though the TX thread might be greatly underloaded. Does not look like a "feature" really. Or even something not worthy of considering because it makes a great impact actually.

@alyapunov
Contributor

And because it is a limited resource.

And that is wrong. If we reuse fibers (as we do), we can ignore the creation cost since we pay it only once. If we don't reuse them, there is no fixed limit of the kind you describe. It's the first OR the second, not both.

With a certain amount of concurrent transactions you might exhaust..

Concurrent transactions are not expected to be cheap. Just one transaction in a read view can double the data size needed for processing, and that cannot be fixed by any optimization. A certain number of concurrent long-lasting transactions will kill a server in any case. All we can discuss is that number. How many would be a good result? I think dozens or hundreds. The number of fibers doesn't limit that number.
Do you really expect thousands of concurrent transactions? Or what?

Does not look like a "feature" really. Or even something not worthy of considering because it makes a great impact actually.

If a task can be divided into parts, it should be. Detaching a transaction from its fiber is a good follow-up optimization; it's an independent internal problem. Right now we want to introduce streams and fix their API and observable behavior.
Are you sure that nobody will accept and appreciate streams and interactive transactions without those optimizations?

@alyapunov
Contributor

alyapunov commented Mar 11, 2021

Each stream is executed in its own fiber. Tx thread should somehow decide what to process next - maybe we need some sophisticated prioritization?

  1. For now only transactions require the same fiber. Without interactive transactions we cannot afford to pin a fiber to each stream, if only because streams are not ended explicitly.
  2. The tx thread already decides what to process next. We should leave that as is, except for delaying requests of a stream that is already being processed. I see it as very simple: given a request A, if another request B of the same stream is currently being processed, link A to B. Once a request is completed, check its linked requests and execute them, if any. That's all.

@akudiyar

akudiyar commented Mar 29, 2021

Requests in a stream are sent synchronously. The reason is that transactions can now be yielding, so we must wait for the previous request before sending the next one in order to guarantee serialization.

Will requests that share the same stream but aren't made inside an open transaction be serialized too?

Will several requests from several connections be able to share the same stream (~ transaction), thus making a "SELECT FOR UPDATE" scenario possible?

@EvgenyMekhanik
Contributor

EvgenyMekhanik commented May 27, 2021

Some thoughts on the streams implementation:

  1. Each connection contains a hash table of its currently active streams, keyed by stream_id.
  2. A stream is a struct stream with the following fields:
struct stream {
    /** Currently active stream transaction or NULL */
    struct txn *txn;
    /** Pending requests for this stream, processed sequentially */
    struct rlist pending_requests;
    /** True while some request of this stream is being processed by the tx thread */
    bool is_request_processed;
    /** Maybe some other fields */
};

Currently the tx thread retrieves messages from the cbus queue between the iproto and tx threads and processes them in order. If a fiber yield occurs while a message is being processed, the tx thread moves on to the next message without waiting for the previous one.
This behavior will be changed as follows:

  1. The tx thread retrieves a message from the cbus queue. If msg->header.stream_id == 0, it works as before. If msg->header.stream_id != 0, the tx thread checks whether a stream with msg->header.stream_id exists.
  2. If no such stream exists, the tx thread creates a new stream and inserts it into the connection's stream hash table. The tx thread sets the is_request_processed flag for this stream and starts processing the request.
  3. If such a stream exists, the tx thread checks its is_request_processed flag. If the flag is false, the tx thread starts processing the request. Otherwise the tx thread puts the request into the pending_requests list of the stream.
  4. When a request with a non-zero stream_id finishes, we check pending_requests of the stream with this stream_id. If the list is empty and no transaction is open in the stream, we delete the stream. Otherwise the tx thread retrieves and processes the next request of the stream. (A potential problem is starvation of requests from other streams; maybe it's better to put this request back into the tx thread cbus queue!)
  5. If the user starts a transaction in a stream, we create a new fiber for this transaction. There can be only one transaction per stream. The transaction cannot yield, so all requests in it are processed in order. If the connection is closed, all transactions of all its streams are rolled back.
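
A minimal C model of steps 2-4, for illustration only. It uses a fixed-size ring buffer instead of an rlist and plain non-negative request ids instead of iproto messages; `stream_sketch` and the function names are hypothetical. Clearing `is_request_processed` when the queue runs empty is an assumption, since the list above does not say when the flag is reset.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

enum { STREAM_QUEUE_MAX = 8 };

/* Hypothetical simplified stream: a flag, a txn marker and a FIFO of ids. */
struct stream_sketch {
    bool is_request_processed;
    bool has_txn;
    int pending[STREAM_QUEUE_MAX];
    size_t head;
    size_t count;
};

/* Steps 2-3: returns true if the request may start now, false if queued. */
static bool
stream_submit(struct stream_sketch *s, int request_id)
{
    if (!s->is_request_processed) {
        s->is_request_processed = true;
        return true;
    }
    assert(s->count < STREAM_QUEUE_MAX);
    s->pending[(s->head + s->count) % STREAM_QUEUE_MAX] = request_id;
    s->count++;
    return false;
}

/*
 * Step 4: called when the current request finishes. Returns the id of the
 * next request to process, or -1 if the stream became idle.
 */
static int
stream_complete(struct stream_sketch *s)
{
    assert(s->is_request_processed);
    if (s->count == 0) {
        s->is_request_processed = false;
        return -1;
    }
    int next = s->pending[s->head];
    s->head = (s->head + 1) % STREAM_QUEUE_MAX;
    s->count--;
    return next;
}

/* Step 4: a stream may be deleted only when idle, empty and without a txn. */
static bool
stream_can_delete(const struct stream_sketch *s)
{
    return !s->is_request_processed && s->count == 0 && !s->has_txn;
}
```

In this model requests of one stream never overlap, while nothing prevents requests of other streams from starting in between.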

@alyapunov
Contributor

  1. The transaction cannot yield
    Why? Generally it can. And I see no problems with it.

@EvgenyMekhanik
Contributor

EvgenyMekhanik commented Jun 1, 2021

While writing the code, many questions arose:

  1. For the memtx engine a transaction can't yield unless memtx_use_mvcc_engine is set. For vinyl a transaction can yield.
    Should transaction behavior in iproto depend on the chosen engine or not? I must also note that a transaction has to be able to yield in iproto, because the user can send begin, then some requests, and only later send commit. So the transaction fiber should yield while there are no requests in the transaction. But would this work for memtx without memtx_use_mvcc_engine = true?
  2. If an error occurs in a request that belongs to a transaction, should we abort the whole transaction or not?
    With memtx_use_mvcc_engine the transaction is not aborted, for example:
     box.cfg { memtx_use_mvcc_engine = true }
     s = box.schema.space.create('tester')
     s:format({ {name = 'id', type = 'unsigned'} })
     s:create_index('primary', { parts = {'id'} })
     box.begin()
     s:insert({1})
     s:insert({2})
     s:insert({1})
     ---
     - error: Duplicate key exists in unique index "primary" in space "tester" with old
         tuple - [1] and new tuple - [1]
     ...
     box.commit()
     -- Transaction successfully finished!
     s:select()
     ---
     - - [1]
       - [2]
     ...
  3. What should I do with conflicting transactions in separate streams? There are several options. Ignore the problem?
    Should the behavior depend on the chosen engine and its options? As I understand, for memtx with memtx_use_mvcc_engine = false a transaction is aborted if it yields, and with memtx_use_mvcc_engine = true the memtx transaction manager resolves all conflicts between separate transactions. Should I implement the same transaction handling for iproto?
  4. What about replication? Are any problems possible there? (I am not an expert in this area.)
  5. I also have several questions about the stream object in the Lua world: a stream is a wrapper around a connection, so which
    connection methods should a stream provide? All of them plus begin, commit and rollback? Or should a stream skip some connection
    methods?
  6. What about the net.box Lua API? For example, the user types something like this (example Lua API):
local futures = {}
stream = con:stream()
table.insert(futures, stream:begin({is_async = true})) -- Can begin be async??
table.insert(futures, stream:call('box.insert', {...} , {is_async = true}))
table.insert(futures, stream:commit({is_async = true})) -- Can commit be async??
for _, future in ipairs(futures) do
    local result = future:wait_result()
    <...>
end

A brief description of what I am doing now:

  1. In iproto_enqueue_batch we check whether the new iproto message has a stream_id. If stream_id = 0, everything works as
    before. Otherwise we look up the stream with this stream_id in the connection's stream hash table and create it if it
    does not exist. Then we check whether the stream has an active transaction. If it does not, we put the message in
    stream->pending_list, and if the list was empty before the push, we push the message to the
    tx thread. In the net_send_msg function, if the stream has no active transaction and no messages in
    stream->pending_list, we destroy the stream. If there is no active transaction but the pending list is not empty,
    we remove the first pending message from the list and push it to the tx thread.
  2. If in net_send_msg we see that the processed message has msg->header.type == IPROTO_TRANSACTION_BEGIN,
    we iterate over the pending list and push messages to the tx thread until the list becomes empty or we find
    a message with header.type == IPROTO_TRANSACTION_COMMIT or IPROTO_TRANSACTION_ROLLBACK. We
    change msg->route of all these messages to a special transaction route.
    Then we stop pushing messages from this stream to the tx thread until the whole transaction has finished.
  3. In the tx thread, when we process a message with header.type == IPROTO_TRANSACTION_BEGIN, we create a special
    fiber to process all messages of the transaction. This fiber has its own message queue, which is accessible only
    from the tx thread. If the queue is empty, the fiber yields. Otherwise it processes all messages in the queue and pushes
    them to the net thread.
  4. In the special transaction route, the function called in the tx thread puts the incoming message into the stream's
    transaction message queue and wakes up the transaction fiber.
  5. This route has only one function called in the tx thread. We can't return a message to the net thread until it has been
    processed in the transaction fiber.

EvgenyMekhanik added a commit that referenced this issue Aug 11, 2021
Implement `begin`, `commit` and `rollback` methods for the stream object
in `net.box`, which begin, commit and roll back a transaction
accordingly.

Closes #5860

@TarantoolBot document
Title: add interactive transaction support in net.box
Implement `begin`, `commit` and `rollback` methods for the stream object
in `net.box`, which begin, commit and roll back a transaction
accordingly. There are now multiple ways to begin, commit and roll back a
transaction from `net.box`: using the corresponding stream methods, using the `call`
or `eval` methods, or using the `execute` method with SQL transaction syntax.
The user can mix these methods, for example start a transaction using
`stream:begin()` and commit it using `stream:call('box.commit')`
or `stream:execute('COMMIT')`.
A simple example of using interactive transactions via iproto from net.box:
```lua
stream = conn:new_stream()
space = stream.space.test
space_not_from_stream = conn.space.test

stream:begin()
space:replace({1})
-- returns the previously inserted tuple, because the request
-- belongs to the transaction.
space:select({})
-- empty select, because this select doesn't belong to the
-- transaction
space_not_from_stream:select({})
stream:call('box.commit')
-- now the transaction is committed, so all requests
-- return the tuple.
```
More examples of using streams can be found in
gh-5860-implement-streams-in-iproto.test.lua
EvgenyMekhanik added a commit that referenced this issue Aug 11, 2021
Add stream support to `net.box`. In `net.box`, a stream
is an object over a connection that has the same methods,
but all requests from it are sent with a non-zero stream ID.
Since there can be a lot of streams, we do not copy the
spaces from the connection to the stream immediately when
creating a stream, but only when a space is first accessed.
The spaces are also updated lazily when the schema changes:
each stream has its own schema_version; on any access to a
stream space we compare the stream schema_version with the
connection schema_version and, if they differ, clear the
stream space cache and wrap the space being accessed into
the stream cache.
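
The lazy invalidation described in this commit message can be modelled roughly as follows. This is a C sketch of the idea, not the net.box code (which is Lua); all names and the fixed-size cache are assumptions made for the example.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

enum { SPACE_CACHE_MAX = 16 };

/* Hypothetical connection state: only the schema version matters here. */
struct conn_schema_sketch {
    uint64_t schema_version;
};

/* Hypothetical per-stream cache of already wrapped spaces. */
struct stream_cache_sketch {
    uint64_t schema_version;
    uint32_t space_ids[SPACE_CACHE_MAX];
    size_t count;
};

/*
 * Access a space through the stream. Returns true if the space had to be
 * wrapped (cache miss) and false if it was already in the stream cache.
 */
static bool
stream_space_access(struct stream_cache_sketch *s,
                    const struct conn_schema_sketch *conn, uint32_t space_id)
{
    /* The schema changed since the last access: drop the whole cache. */
    if (s->schema_version != conn->schema_version) {
        s->count = 0;
        s->schema_version = conn->schema_version;
    }
    for (size_t i = 0; i < s->count; i++) {
        if (s->space_ids[i] == space_id)
            return false;
    }
    assert(s->count < SPACE_CACHE_MAX);
    s->space_ids[s->count++] = space_id;
    return true;
}
```

The point of the design is that creating a stream and changing the schema both stay cheap; the cost is paid only by streams that actually touch a space afterwards.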

Part of #5860

@TarantoolBot document
Title: stream support was added to net.box
In `net.box`, a stream is an object over a connection that
has the same methods, but all requests from it are sent
with a non-zero stream ID. The stream ID is generated on the
client automatically. A simple example of stream creation
using net.box:
```lua
stream = conn:new_stream()
-- all connection methods are valid, but requests are sent
-- with a non-zero stream_id.
```
EvgenyMekhanik added a commit that referenced this issue Aug 11, 2021
Adding interactive transactions over iproto streams requires
adding new request types to begin, commit and roll them back.
The type names of these new requests conflict with the existing
names of the 'raft' requests. Adding a RAFT prefix to all requests
related to 'raft' resolves this problem.

Part of #5860

@TarantoolBot document
Title: add RAFT prefix for all requests related to 'raft'.
Rename IPROTO_PROMOTE, IPROTO_DEMOTE, IPROTO_CONFIRM and
IPROTO_ROLLBACK to IPROTO_RAFT_PROMOTE, IPROTO_RAFT_DEMOTE,
IPROTO_RAFT_CONFIRM and IPROTO_RAFT_ROLLBACK accordingly.
EvgenyMekhanik added a commit that referenced this issue Aug 11, 2021
Implement interactive transactions over iproto streams. Each stream
can start its own transaction, so streams allow multiplexing several
transactions over one connection. If any request fails during the
transaction, it does not affect the other requests in the transaction.
If a disconnect occurs while a stream has an active transaction,
the transaction is rolled back unless it has been committed
before that moment.

Part of #5860

@TarantoolBot document
Title: interactive transactions were implemented over iproto streams.
The main purpose of streams is transactions via iproto. Each stream
can start its own transaction, so streams allow multiplexing several
transactions over one connection. There are multiple ways to begin,
commit and roll back a transaction: using IPROTO_CALL and IPROTO_EVAL
with the corresponding function (box.begin, box.commit and box.rollback),
using IPROTO_EXECUTE with the corresponding SQL statement
('START TRANSACTION', 'COMMIT', 'ROLLBACK'), or using IPROTO_BEGIN,
IPROTO_COMMIT and IPROTO_ROLLBACK respectively. If a disconnect occurs
while a stream has an active transaction, the transaction is rolled
back unless it has been committed by that moment. New command codes are
added to begin, commit and roll back transactions: `IPROTO_BEGIN 14`,
`IPROTO_COMMIT 15` and `IPROTO_ROLLBACK 16` respectively.
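The rollback-on-disconnect rule can be illustrated with a small pure-Lua model. This is only a sketch: the three codes are the ones quoted above, while `new_connection`, `handle` and `on_disconnect` are hypothetical names for illustration and not part of iproto or net.box.

```lua
-- Request type codes quoted in the commit message above.
local IPROTO_BEGIN, IPROTO_COMMIT, IPROTO_ROLLBACK = 14, 15, 16

-- Hypothetical per-connection state: active_txn[stream_id] is true
-- while a transaction is open in that stream.
local function new_connection()
    return {active_txn = {}, log = {}}
end

-- Apply a transaction-control request to one stream.
local function handle(conn, stream_id, request_type)
    if request_type == IPROTO_BEGIN then
        conn.active_txn[stream_id] = true
    elseif request_type == IPROTO_COMMIT then
        conn.active_txn[stream_id] = nil
        table.insert(conn.log, 'commit ' .. stream_id)
    elseif request_type == IPROTO_ROLLBACK then
        conn.active_txn[stream_id] = nil
        table.insert(conn.log, 'rollback ' .. stream_id)
    end
end

-- On disconnect, every transaction that is still open is rolled back.
local function on_disconnect(conn)
    local ids = {}
    for stream_id in pairs(conn.active_txn) do
        table.insert(ids, stream_id)
    end
    table.sort(ids)
    for _, stream_id in ipairs(ids) do
        handle(conn, stream_id, IPROTO_ROLLBACK)
    end
end

local conn = new_connection()
handle(conn, 1, IPROTO_BEGIN)
handle(conn, 2, IPROTO_BEGIN)
handle(conn, 1, IPROTO_COMMIT)   -- stream 1 commits in time
on_disconnect(conn)              -- stream 2 is still open
print(table.concat(conn.log, ', '))   --> commit 1, rollback 2
```

Each stream keeps its own transaction state, so committing in stream 1 does not touch the transaction in stream 2.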
EvgenyMekhanik added a commit that referenced this issue Aug 11, 2021
Implement the `begin`, `commit` and `rollback` methods for the stream
object in `net.box`, which begin, commit and roll back a transaction
respectively.

Closes #5860

@TarantoolBot document
Title: add interactive transaction support in net.box
Implement the `begin`, `commit` and `rollback` methods for the stream
object in `net.box`, which begin, commit and roll back a transaction
respectively. Now there are multiple ways to begin, commit and roll
back a transaction from `net.box`: using the corresponding stream
methods, using the `call` or `eval` methods, or using the `execute`
method with SQL transaction syntax. These ways can be mixed: for
example, start a transaction using `stream:begin()` and commit it
using `stream:call('box.commit')` or `stream:execute('COMMIT')`.
A simple example of using interactive transactions via iproto from net.box:
```lua
-- `conn` is an established net.box connection.
stream = conn:new_stream()
space = stream.space.test
space_not_from_stream = conn.space.test

stream:begin()
space:replace({1})
-- Returns the tuple inserted above, because this request
-- belongs to the transaction.
space:select({})
-- Returns an empty result, because this select does not
-- belong to the transaction.
space_not_from_stream:select({})
stream:call('box.commit')
-- Now the transaction is committed, so both selects
-- return the tuple.
```
More examples of using streams can be found in
gh-5860-implement-streams-in-iproto.test.lua
EvgenyMekhanik pushed a commit that referenced this issue Aug 12, 2021
To apply a client request, we only need to know its type and body. All
the meta information, such as LSN, TSN, or replica id, must be set by
WAL. Currently, however, it isn't necessarily true: iproto leaves a
request header received over iproto as is, and tx will reuse the header
instead of allocating a new one in this case, which is needed to process
replication requests, see txn_add_redo().

Unless a client actually sets one of those meta fields, this causes no
problems. However, if we added transaction support to the replication
protocol, reusing the header would result in broken xlog, because
currently, all requests received over iproto have the is_commit field
set in xrow_header for the lack of TSN, while is_commit must only be set
for the final statement in a transaction. One way to fix it would be
clearing is_commit explicitly in iproto, but ignoring the whole header
received over iproto looks more logical and error-proof.

Needed for #5860
EvgenyMekhanik added a commit that referenced this issue Aug 12, 2021
To implement streams, we need to distinguish requests that belong
to a stream from those that do not. For this purpose, the stream ID
field is added to the iproto binary protocol. For requests that do
not belong to a stream, this field is omitted or equal to zero. For
requests that belong to a stream, this field determines which stream
the request belongs to.

Part of #5860

@TarantoolBot document
Title: new field in binary iproto protocol

Add a new field to the binary iproto protocol.
`IPROTO_STREAM_ID 0x0a` determines whether a request
belongs to a stream or not. If this field is omitted
or equal to zero, the request doesn't belong to a stream.
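A minimal sketch of this rule in plain Lua follows. It assumes the request header has already been decoded into a table keyed by numeric header keys; `stream_id_of` and `belongs_to_stream` are hypothetical helper names, and only the key `0x0a` comes from the text above.

```lua
-- Header key quoted in the commit message above.
local IPROTO_STREAM_ID = 0x0a

-- `header` is a hypothetical decoded request header: a Lua table
-- keyed by numeric iproto header keys.
local function stream_id_of(header)
    -- An omitted field is treated the same as an explicit zero.
    return header[IPROTO_STREAM_ID] or 0
end

local function belongs_to_stream(header)
    return stream_id_of(header) ~= 0
end

print(belongs_to_stream({}))                         -- field omitted
print(belongs_to_stream({[IPROTO_STREAM_ID] = 0}))   -- explicit zero
print(belongs_to_stream({[IPROTO_STREAM_ID] = 7}))   -- stream 7
```

The first two calls print `false` and the third prints `true`.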
EvgenyMekhanik added a commit that referenced this issue Aug 12, 2021
Implement streams in iproto. There is a hash table of streams for
each connection. When a new request comes with a non-zero stream ID,
we look for the stream with this ID in the table and, if it does not
exist, create it. The request is placed in the queue of pending
requests, and if this queue was empty at the time of its receipt, it
is pushed to the tx thread for processing. When a request belonging to
a stream returns to the network thread after processing is completed,
we take the next request out of the queue of pending requests and send
it to the tx thread for processing. If there are no pending requests,
we remove the stream object from the hash table and destroy it.
Requests with zero stream ID are processed in the old way.

Part of #5860

@TarantoolBot document
Title: streams are implemented in iproto
A distinctive feature of streams is that all requests in them
are processed sequentially. The execution of the next request
in a stream does not start until the previous one is completed.
To distinguish requests that belong to a stream from those that
do not, we use the stream ID field in the binary iproto protocol:
requests with a non-zero stream ID belong to some stream. A stream
ID is unique within the connection and indicates which stream the
request belongs to. For streams from different connections, the
IDs may be the same.
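The queueing scheme described above can be modelled in a few lines of plain Lua. This is a sketch under assumptions, not the iproto implementation: the tx thread is replaced by a list of dispatched requests, the request being processed stays at the head of the pending queue until it completes, and all function names are hypothetical.

```lua
-- Hypothetical model of one connection: streams[stream_id] is a
-- stream object with its own queue of pending requests.
local function new_connection()
    return {streams = {}, sent_to_tx = {}}
end

-- Stand-in for pushing a request to the tx thread.
local function push_to_tx(conn, stream_id, request)
    table.insert(conn.sent_to_tx, stream_id .. ':' .. request)
end

-- The network thread receives a request with a non-zero stream ID.
local function on_request(conn, stream_id, request)
    local stream = conn.streams[stream_id]
    if stream == nil then
        stream = {pending = {}}
        conn.streams[stream_id] = stream
    end
    local was_empty = #stream.pending == 0
    table.insert(stream.pending, request)
    if was_empty then
        push_to_tx(conn, stream_id, request)
    end
end

-- A processed request returns from tx to the network thread.
local function on_complete(conn, stream_id)
    local stream = conn.streams[stream_id]
    table.remove(stream.pending, 1)       -- drop the finished request
    local next_request = stream.pending[1]
    if next_request ~= nil then
        push_to_tx(conn, stream_id, next_request)
    else
        conn.streams[stream_id] = nil     -- no pending requests left
    end
end

local conn = new_connection()
on_request(conn, 1, 'a')
on_request(conn, 1, 'b')   -- queued behind 'a'
on_request(conn, 2, 'c')   -- another stream, dispatched immediately
on_complete(conn, 1)       -- 'a' is done, 'b' is dispatched
on_complete(conn, 1)       -- 'b' is done, stream 1 is destroyed
print(table.concat(conn.sent_to_tx, ' '))   --> 1:a 2:c 1:b
```

Requests within stream 1 are dispatched strictly one after another, while stream 2 proceeds independently of it.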
EvgenyMekhanik added a commit that referenced this issue Aug 12, 2021
Add stream support to `net.box`. In "net.box", a stream
is an object over a connection that has the same methods,
but all requests from it are sent with a non-zero stream ID.
Since there can be a lot of streams, we do not copy the
spaces from the connection to the stream immediately when
creating a stream, but do it only when a space is first accessed.
Also, when the schema is updated, we update the spaces lazily:
each stream has its own schema_version. When a stream space is
accessed, we compare the stream schema_version with the
connection schema_version and, if they differ, clear the stream
space cache and wrap the space being accessed into the stream
cache.

Part of #5860

@TarantoolBot document
Title: stream support was added to net.box
In "net.box", a stream is an object over a connection that
has the same methods, but all requests from it are sent
with a non-zero stream ID. The stream ID is generated on the
client automatically. A simple example of stream creation
using net.box:
```lua
stream = conn:new_stream()
-- All connection methods are available on the stream, but
-- requests are sent with a non-zero stream_id.
```
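The lazy space cache described in this commit can be sketched with a metatable in plain Lua. This is an illustration under assumptions rather than the net.box code: `conn` here is a mock table, not a real connection, and `new_stream` and the `cache` field are hypothetical names.

```lua
-- Mock connection: holds the spaces and a schema_version that
-- changes whenever the schema is reloaded.
local conn = {
    schema_version = 1,
    space = {test = {name = 'test'}},
}

local function new_stream(conn, stream_id)
    local stream = {
        stream_id = stream_id,
        schema_version = conn.schema_version,
        cache = {},   -- filled only when a space is accessed
    }
    stream.space = setmetatable({}, {__index = function(_, name)
        if stream.schema_version ~= conn.schema_version then
            -- The schema changed since the last access: drop the cache.
            stream.cache = {}
            stream.schema_version = conn.schema_version
        end
        local wrapped = stream.cache[name]
        if wrapped == nil and conn.space[name] ~= nil then
            -- Wrap the connection space so it remembers the stream.
            wrapped = setmetatable({stream_id = stream_id},
                                   {__index = conn.space[name]})
            stream.cache[name] = wrapped
        end
        return wrapped
    end})
    return stream
end

local stream = new_stream(conn, 1)
local first = stream.space.test
assert(first == stream.space.test)   -- served from the stream cache
conn.schema_version = 2              -- simulate a schema update
assert(first ~= stream.space.test)   -- cache cleared, space re-wrapped
```

Creating a stream copies nothing, so a large number of streams stays cheap; the cost is one version comparison per space access.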
kyukhin pushed a commit that referenced this issue Dec 23, 2021
To apply a client request, we only need to know its type and body. All
the meta information, such as LSN, TSN, or replica id, must be set by
WAL. Currently, however, it isn't necessarily true: iproto leaves a
request header received over iproto as is, and tx will reuse the header
instead of allocating a new one in this case, which is needed to process
replication requests, see txn_add_redo().

Unless a client actually sets one of those meta fields, this causes no
problems. However, if we added transaction support to the replication
protocol, reusing the header would result in broken xlog, because
currently, all requests received over iproto have the is_commit field
set in xrow_header for the lack of TSN, while is_commit must only be set
for the final statement in a transaction. One way to fix it would be
clearing is_commit explicitly in iproto, but ignoring the whole header
received over iproto looks more logical and error-proof.

Needed for #5860

(cherry picked from commit 4fefb51)