Skip to content

[POC] feat: implementing multi-stream-append #208

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion kurrentdb/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ version = "1.0.0-alpha.3"
# Uncomment if you want to update messages.rs code-gen.
# We disabled codegen.rs because it requires having `protoc` installed on your machine
# in order to build that library.
# build = "codegen.rs"
build = "codegen.rs"

description = "Official KurrentDB gRPC client"
keywords = ["database", "event-sourcing", "ddd", "cqrs", "kurrent"]
Expand Down
6 changes: 6 additions & 0 deletions kurrentdb/codegen.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ pub fn generate() -> Result<(), Box<dyn std::error::Error>> {
"protos/monitoring.proto",
"protos/operations.proto",
"protos/users.proto",
"protos/multi-append.proto",
];

fs::create_dir_all(out_dir)?;
Expand All @@ -52,6 +53,8 @@ pub fn generate() -> Result<(), Box<dyn std::error::Error>> {
"ReadEvent.RecordedEvent.custom_metadata",
"ReadEvent.RecordedEvent.data",
"StreamIdentifier.stream_name",
"AppendRecord.data",
"DynamicValue.bytes_value",
])
.out_dir(out_dir)
.extern_path(".event_store.client.Empty", "()")
Expand Down Expand Up @@ -91,6 +94,9 @@ pub fn generate() -> Result<(), Box<dyn std::error::Error>> {
} else if filename_string.as_str() == "google.rpc.rs" {
let new_file = file.path().parent().unwrap().join("google_rpc.rs");
fs::rename(file.path(), new_file)?;
} else if filename_string.as_str() == "kurrentdb.protocol.v2.rs" {
let new_file = file.path().parent().unwrap().join("new_streams.rs");
fs::rename(file.path(), new_file)?;
}
}

Expand Down
210 changes: 210 additions & 0 deletions kurrentdb/protos/multi-append.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,210 @@
syntax = "proto3";

package kurrentdb.protocol.v2;

option csharp_namespace = "KurrentDB.Protocol.Streams.V2";
option java_package = "io.kurrentdb.streams.v2";
option java_multiple_files = true;

import "google/protobuf/timestamp.proto";
import "google/protobuf/duration.proto";
import "google/protobuf/struct.proto";

service StreamsService {
// Executes an atomic operation to append records to multiple streams.
// This transactional method ensures that all appends either succeed
// completely, or are entirely rolled back, thereby maintaining strict data
// consistency across all involved streams.
rpc MultiStreamAppend(MultiStreamAppendRequest) returns (MultiStreamAppendResponse);

// Streaming version of MultiStreamAppend that allows clients to send multiple
// append requests over a single connection. When the stream completes, all
// records are appended transactionally (all succeed or fail together).
// Provides improved efficiency for high-throughput scenarios while
// maintaining the same transactional guarantees.
rpc MultiStreamAppendSession(stream AppendStreamRequest) returns (MultiStreamAppendResponse);
}

message ProtocolDataUnit {
string id = 1;
map<string, DynamicValue> properties = 2;
bytes data = 3;
google.protobuf.Timestamp timestamp = 4;
}

// Record to be appended to a stream.
message AppendRecord {
// Universally Unique identifier for the record.
// If not provided, the server will generate a new one.
optional string record_id = 1;
// A collection of properties providing additional system information about the
// record.
map<string, DynamicValue> properties = 2;
// The actual data payload of the record, stored as bytes.
bytes data = 3;
// // Optional timestamp indicating when the record was created.
// // If not provided, the server will use the current time.
// optional google.protobuf.Timestamp timestamp = 4;
}

// Constants that match the expected state of a stream during an
// append operation. It can be used to specify whether the stream should exist,
// not exist, or can be in any state.
enum ExpectedRevisionConstants {
// The stream should exist and the expected revision should match the current
EXPECTED_REVISION_CONSTANTS_SINGLE_EVENT = 0;
// It is not important whether the stream exists or not.
EXPECTED_REVISION_CONSTANTS_ANY = -2;
// The stream should not exist. If it does, the append will fail.
EXPECTED_REVISION_CONSTANTS_NO_STREAM = -1;
// The stream should exist
EXPECTED_REVISION_CONSTANTS_EXISTS = -4;
}

// Represents the input for appending records to a specific stream.
message AppendStreamRequest {
// The name of the stream to append records to.
string stream = 1;
// The records to append to the stream.
repeated AppendRecord records = 2;
// The expected revision of the stream. If the stream's current revision does
// not match, the append will fail.
// The expected revision can also be one of the special values
// from ExpectedRevisionConstants.
// Missing value means no expectation, the same as EXPECTED_REVISION_CONSTANTS_ANY
optional sint64 expected_revision = 3;
}

// Success represents the successful outcome of an append operation.
message AppendStreamSuccess {
// The name of the stream to which records were appended.
string stream = 1;
// The position of the last appended record in the stream.
int64 position = 2;
// The expected revision of the stream after the append operation.
int64 stream_revision = 3;
}

// Failure represents the detailed error information when an append operation fails.
message AppendStreamFailure {
// The name of the stream to which records were appended.
string stream = 1;

// The error details
oneof error {
// Failed because the actual stream revision didn't match the expected revision.
ErrorDetails.WrongExpectedRevision wrong_expected_revision = 2;
// Failed because the client lacks sufficient permissions.
ErrorDetails.AccessDenied access_denied = 3;
// Failed because the target stream has been deleted.
ErrorDetails.StreamDeleted stream_deleted = 4;
}
}

// AppendStreamOutput represents the output of appending records to a specific
// stream.
message AppendStreamResponse {
// The result of the append operation.
oneof result {
// Success represents the successful outcome of an append operation.
AppendStreamSuccess success = 1;
// Failure represents the details of a failed append operation.
AppendStreamFailure failure = 2;
}
}

// MultiStreamAppendRequest represents a request to append records to multiple streams.
message MultiStreamAppendRequest {
// A list of AppendStreamInput messages, each representing a stream to which records should be appended.
repeated AppendStreamRequest input = 1;
}

// Response from the MultiStreamAppend operation.
message MultiStreamAppendResponse {
oneof result {
// Success represents the successful outcome of a multi-stream append operation.
Success success = 1;
// Failure represents the details of a failed multi-stream append operation.
Failure failure = 2;
}

message Success {
repeated AppendStreamSuccess output = 1;
}

message Failure {
repeated AppendStreamFailure output = 1;
}
}

// ErrorDetails provides detailed information about specific error conditions.
message ErrorDetails {
// When the user does not have sufficient permissions to perform the operation.
message AccessDenied {
// The simplified reason for access denial.
string reason = 1;
}

// When the stream has been deleted.
message StreamDeleted {
// The time when the stream was deleted.
google.protobuf.Timestamp deleted_at = 1;

// If the stream was hard deleted, you cannot reuse the stream name,
// it will raise an exception if you try to append to it again.
bool tombstoned = 2;
}

// When the expected revision of the stream does not match the actual revision.
message WrongExpectedRevision {
// The actual revision of the stream.
int64 stream_revision = 1;
}

// When the transaction exceeds the maximum size allowed
// (its bigger than the configured chunk size).
message TransactionMaxSizeExceeded {
// The maximum allowed size of the transaction.
uint32 max_size = 1;
}
}

//===================================================================
// Shared
//===================================================================

// Represents a list of dynamically typed values.
message ListDynamicValue {
// Repeated property of dynamically typed values.
repeated DynamicValue values = 1;
}

// Represents a dynamic value
message DynamicValue {
oneof kind {
// Represents a null value.
google.protobuf.NullValue null_value = 1;
// Represents a 32-bit signed integer value.
sint32 int32_value = 2;
// Represents a 64-bit signed integer value.
sint64 int64_value = 3;
// Represents a byte array value.
bytes bytes_value = 4;
// Represents a 64-bit double-precision floating-point value.
double double_value = 5;
// Represents a 32-bit single-precision floating-point value
float float_value = 6;
// Represents a string value.
string string_value = 7;
// Represents a boolean value.
bool boolean_value = 8;
// Represents a timestamp value.
google.protobuf.Timestamp timestamp_value = 9;
// Represents a duration value.
google.protobuf.Duration duration_value = 10;
// // Represents a list of dynamic values.
// ListDynamicValue list_value = 11;
// // Represents a json struct
// google.protobuf.Struct struct_value = 12;
}
}
55 changes: 48 additions & 7 deletions kurrentdb/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,14 @@ use crate::options::read_stream::ReadStreamOptions;
use crate::options::subscribe_to_stream::SubscribeToStreamOptions;
use crate::server_features::ServerInfo;
use crate::{
DeletePersistentSubscriptionOptions, DeleteStreamOptions, GetPersistentSubscriptionInfoOptions,
ListPersistentSubscriptionsOptions, MetadataStreamName, PersistentSubscription,
PersistentSubscriptionInfo, PersistentSubscriptionToAllOptions, Position, ReadStream,
ReplayParkedMessagesOptions, RestartPersistentSubscriptionSubsystem, RevisionOrPosition,
StreamMetadata, StreamMetadataResult, StreamName, SubscribeToAllOptions,
AppendRequest, DeletePersistentSubscriptionOptions, DeleteStreamOptions,
GetPersistentSubscriptionInfoOptions, ListPersistentSubscriptionsOptions, MetadataStreamName,
MultiWriteResult, PersistentSubscription, PersistentSubscriptionInfo,
PersistentSubscriptionToAllOptions, Position, ReadStream, ReplayParkedMessagesOptions,
RestartPersistentSubscriptionSubsystem, RevisionOrPosition, StreamMetadata,
StreamMetadataResult, StreamName, StreamState, SubscribeToAllOptions,
SubscribeToPersistentSubscriptionOptions, Subscription, TombstoneStreamOptions,
VersionedMetadata, WriteResult, commands,
VersionedMetadata, WriteResult, commands, new_commands,
};
use crate::{
EventData,
Expand Down Expand Up @@ -78,7 +79,47 @@ impl Client {
where
Events: ToEvents,
{
commands::append_to_stream(&self.client, stream_name, options, events.into_events()).await
let req = AppendRequest {
stream: unsafe { String::from_utf8_unchecked(stream_name.into_stream_name().to_vec()) },
events: events.into_events().collect(),
state: match options.version {
crate::event_store::client::streams::append_req::options::ExpectedStreamRevision::Revision(r) => StreamState::StreamRevision(r),
crate::event_store::client::streams::append_req::options::ExpectedStreamRevision::NoStream(_) => StreamState::NoStream,
crate::event_store::client::streams::append_req::options::ExpectedStreamRevision::Any(_) => StreamState::Any,
crate::event_store::client::streams::append_req::options::ExpectedStreamRevision::StreamExists(_) => StreamState::StreamExists,
},
};

let result = self
.multi_append_stream(options, vec![req].into_iter())
.await?;

match result {
MultiWriteResult::Success(items) => Ok(WriteResult {
next_expected_version: items[0].next_expected_version,
position: items[0].position,
}),

MultiWriteResult::Failure(items) => match items[0].error {
crate::MultiAppendWriteError::AccessDenied { .. } => {
Err(crate::Error::AccessDenied)
}
crate::MultiAppendWriteError::StreamDeleted { .. } => {
Err(crate::Error::ResourceDeleted)
}
crate::MultiAppendWriteError::WrongExpectedRevision { current, expected } => {
Err(crate::Error::WrongExpectedVersion { expected, current })
}
},
}
}

pub async fn multi_append_stream(
&self,
options: &AppendToStreamOptions,
events: impl Iterator<Item = AppendRequest> + Send + 'static,
) -> crate::Result<MultiWriteResult> {
new_commands::multi_stream_append(&self.client, options, events).await
}

// Sets a stream metadata.
Expand Down
2 changes: 1 addition & 1 deletion kurrentdb/src/commands.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1870,7 +1870,7 @@ pub async fn restart_persistent_subscription_subsystem(
Ok(())
}

fn create_streams_client(handle: Handle) -> StreamsClient<HyperClient> {
pub(crate) fn create_streams_client(handle: Handle) -> StreamsClient<HyperClient> {
StreamsClient::with_origin(handle.client, handle.uri)
.max_decoding_message_size(client::MAX_RECEIVE_MESSAGE_SIZE)
}
Expand Down
Loading
Loading