Skip to content

Cardinality #316

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 14 commits into from
Jul 25, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,26 @@ public static class Constants
// EventHubs
public static string OutputEventHubQueueName = "test-eventhuboutput-java";
public static string InputEventHubName = "test-input-java";

public static string OutputJsonEventHubQueueName = "test-eventhuboutputjson-java";
public static string InputJsonEventHubName = "test-inputjson-java";

public static string OutputOneEventHubQueueName = "test-eventhuboutputone-java";
public static string InputCardinalityOneEventHubName = "test-inputOne-java";

public static string OutputBinaryOneQueueName = "test-binary-output-cardinality-one-java";
public static string InputBinaryOneEventHubQueueName = "test-binary-input-cardinality-one-java";

public static string OutputBinaryManyQueueName = "test-binary-output-cardinality-many-list-java";
public static string InputBinaryManyEventHubQueueName = "test-binary-input-cardinality-many-list-java";

public static string OutputBinaryArrayManyQueueName = "test-binary-output-cardinality-many-array-java";
public static string InputBinaryManyArrayEventHubQueueName = "test-binary-input-cardinality-many-array-java";

// Settings
public static string EventHubsConnectionStringSenderSetting = Environment.GetEnvironmentVariable("AzureWebJobsEventHubSender");
public static string EventHubsConnectionStringSenderSetting2 = Environment.GetEnvironmentVariable("AzureWebJobsEventHubSender_2");

public static string EventHubsConnectionStringSetting = Environment.GetEnvironmentVariable("AzureWebJobsEventHubSender");

// Xunit Fixtures and Collections
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection.Metadata;
using System.Threading.Tasks;
using Xunit;

Expand All @@ -29,7 +30,7 @@ public async Task EventHubTriggerAndOutputJSON_Succeeds()
await SetupQueue(Constants.OutputJsonEventHubQueueName);

// Need to setup EventHubs: test-inputjson-java and test-outputjson-java
await EventHubsHelpers.SendJSONMessagesAsync(expectedEventId);
await EventHubsHelpers.SendJSONMessagesAsync(expectedEventId, Constants.EventHubsConnectionStringSenderSetting);

//Verify
var queueMessage = await StorageHelpers.ReadFromQueue(Constants.OutputJsonEventHubQueueName);
Expand All @@ -52,7 +53,7 @@ public async Task EventHubTriggerAndOutputString_Succeeds()
await SetupQueue(Constants.OutputEventHubQueueName);

// Need to setup EventHubs: test-input-java and test-output-java
await EventHubsHelpers.SendMessagesAsync(expectedEventId, Constants.InputEventHubName);
await EventHubsHelpers.SendMessagesAsync(expectedEventId, Constants.InputEventHubName, Constants.EventHubsConnectionStringSenderSetting);

//Verify
var queueMessage = await StorageHelpers.ReadFromQueue(Constants.OutputEventHubQueueName);
Expand All @@ -74,7 +75,7 @@ public async Task EventHubTriggerCardinalityOne_Succeeds()
await SetupQueue(Constants.OutputOneEventHubQueueName);

// Need to setup EventHubs: test-inputOne-java and test-outputone-java
await EventHubsHelpers.SendMessagesAsync(expectedEventId, Constants.InputCardinalityOneEventHubName);
await EventHubsHelpers.SendMessagesAsync(expectedEventId, Constants.InputCardinalityOneEventHubName, Constants.EventHubsConnectionStringSenderSetting);

//Verify
IEnumerable<string> queueMessages = await StorageHelpers.ReadMessagesFromQueue(Constants.OutputOneEventHubQueueName);
Expand All @@ -87,6 +88,72 @@ public async Task EventHubTriggerCardinalityOne_Succeeds()
}
}

/*
[Fact]
public async Task EventHubTriggerAndOutputBinaryListMany_Succeeds()
{
string expectedEventId = Guid.NewGuid().ToString();
try
{
await SetupQueue(Constants.OutputBinaryManyQueueName);

await EventHubsHelpers.SendMessagesAsync(expectedEventId, Constants.InputBinaryManyEventHubQueueName, Constants.EventHubsConnectionStringSenderSetting2);

//Verify
var queueMessage = await StorageHelpers.ReadFromQueue(Constants.OutputBinaryManyQueueName);
Assert.Contains(expectedEventId, queueMessage);
}
finally
{
//Clear queue
await StorageHelpers.ClearQueue(Constants.OutputEventHubQueueName);
}
}
*/
[Fact]
public async Task EventHubTriggerAndOutputBinaryOne_Succeeds()
{
string expectedEventId = Guid.NewGuid().ToString();
try
{
await SetupQueue(Constants.OutputBinaryOneQueueName);

await EventHubsHelpers.SendMessagesAsync(expectedEventId, Constants.InputBinaryOneEventHubQueueName, Constants.EventHubsConnectionStringSenderSetting2);

//Verify
var queueMessage = await StorageHelpers.ReadFromQueue(Constants.OutputBinaryOneQueueName);
Assert.Contains(expectedEventId, queueMessage);
}
finally
{
//Clear queue
await StorageHelpers.ClearQueue(Constants.OutputEventHubQueueName);
}
}
/*
[Fact]
public async Task EventHubTriggerAndOutputBinaryArrayMany_Succeeds()
{
string expectedEventId = Guid.NewGuid().ToString();
try
{
await SetupQueue(Constants.OutputBinaryArrayManyQueueName);

await EventHubsHelpers.SendMessagesAsync(expectedEventId, Constants.InputBinaryManyArrayEventHubQueueName, Constants.EventHubsConnectionStringSenderSetting2);

//Verify
var queueMessage = await StorageHelpers.ReadFromQueue(Constants.OutputBinaryArrayManyQueueName);
Assert.Contains(expectedEventId, queueMessage);
}
finally
{
//Clear queue
await StorageHelpers.ClearQueue(Constants.OutputEventHubQueueName);
}
}
*/


private static async Task SetupQueue(string queueName)
{
//Clear queue
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ namespace Azure.Functions.Java.Tests.E2E
{
public class EventHubsHelpers
{
public static async Task SendJSONMessagesAsync(string eventId)
public static async Task SendJSONMessagesAsync(string eventId, string connectionString)
{
// write 3 events
List<EventData> events = new List<EventData>();
Expand All @@ -29,13 +29,13 @@ public static async Task SendJSONMessagesAsync(string eventId)
events.Add(evt);
}

EventHubsConnectionStringBuilder builder = new EventHubsConnectionStringBuilder(Constants.EventHubsConnectionStringSetting);
EventHubsConnectionStringBuilder builder = new EventHubsConnectionStringBuilder(connectionString);
builder.EntityPath = Constants.InputJsonEventHubName;
EventHubClient eventHubClient = EventHubClient.CreateFromConnectionString(builder.ToString());
await eventHubClient.SendAsync(events);
}

public static async Task SendMessagesAsync(string eventId, string evenHubName)
public static async Task SendMessagesAsync(string eventId, string evenHubName, string connectionString)
{
// write 3 events
List<EventData> events = new List<EventData>();
Expand All @@ -48,7 +48,7 @@ public static async Task SendMessagesAsync(string eventId, string evenHubName)
events.Add(evt);
}

EventHubsConnectionStringBuilder builder = new EventHubsConnectionStringBuilder(Constants.EventHubsConnectionStringSetting);
EventHubsConnectionStringBuilder builder = new EventHubsConnectionStringBuilder(connectionString);
builder.EntityPath = evenHubName;
EventHubClient eventHubClient = EventHubClient.CreateFromConnectionString(builder.ToString());
await eventHubClient.SendAsync(events);
Expand Down
1 change: 1 addition & 0 deletions endtoendtests/local.settings.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
"AzureWebJobsServiceBus": "",
"AzureWebJobsEventHubReceiver":"",
"AzureWebJobsEventHubSender":"",
"AzureWebJobsEventHubSender_2":"",
"AzureWebJobsEventHubPath":"",
"CosmosDBDatabaseName":"",
"CosmosDBCollectionName":"",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public void EventHubTriggerCardinalityOne(
/**
* This function verifies the above functions
*/
@FunctionName("TestEventHubOutputJson")
@FunctionName("EventHubOutputJson")
public void TestEventHubOutputJson(
@EventHubTrigger(name = "message", eventHubName = "test-outputjson-java", connection = "AzureWebJobsEventHubSender") String message,
@QueueOutput(name = "output", queueName = "test-eventhuboutputjson-java", connection = "AzureWebJobsStorage") OutputBinding<String> output,
Expand All @@ -57,7 +57,7 @@ public void TestEventHubOutputJson(
output.setValue(message);
}

@FunctionName("TestEventHubOutput")
@FunctionName("EventHubOutput")
public void TestEventHubOutput(
@EventHubTrigger(name = "message", eventHubName = "test-output-java", connection = "AzureWebJobsEventHubSender", cardinality = Cardinality.ONE) String message,
@QueueOutput(name = "output", queueName = "test-eventhuboutput-java", connection = "AzureWebJobsStorage") OutputBinding<String> output,
Expand All @@ -67,7 +67,7 @@ public void TestEventHubOutput(
output.setValue(message);
}

@FunctionName("TestEventHubOutputInputOne")
@FunctionName("EventHubOutputInputOne")
public void TestEventHubOutputInputOne(
@EventHubTrigger(name = "message", eventHubName = "test-outputone-java", connection = "AzureWebJobsEventHubSender", cardinality = Cardinality.ONE) String message,
@QueueOutput(name = "output", queueName = "test-eventhuboutputone-java", connection = "AzureWebJobsStorage") OutputBinding<String> output,
Expand All @@ -77,11 +77,40 @@ public void TestEventHubOutputInputOne(
output.setValue(message);
}

@FunctionName("EventHubTriggerAndOutputBinaryCardinalityManyListBinary")
public void EventHubTriggerAndOutputBinaryCardinalityManyListBinary(
@EventHubTrigger(name = "messages", eventHubName = "test-binary-input-cardinality-many-list-java", connection = "AzureWebJobsEventHubSender_2", dataType = "binary", cardinality = Cardinality.MANY) List<byte[]> messages,
@QueueOutput(name = "output", queueName = "test-binary-output-cardinality-many-list-java", connection = "AzureWebJobsStorage") OutputBinding<byte[]> output,
final ExecutionContext context
) {
context.getLogger().info("Java Event Hub trigger received " + messages.size() +" messages");
output.setValue(messages.get(0));
}

@FunctionName("EventHubTriggerAndOutputBinaryCardinalityOne")
public void EventHubTriggerAndOutputBinaryCardinalityOne(
@EventHubTrigger(name = "message", eventHubName = "test-binary-input-cardinality-one-java", connection = "AzureWebJobsEventHubSender_2", dataType = "binary", cardinality = Cardinality.ONE) byte[] message,
@QueueOutput(name = "output", queueName = "test-binary-output-cardinality-one-java",connection = "AzureWebJobsStorage") OutputBinding<byte[]> output,
final ExecutionContext context
) {
context.getLogger().info("Java Event Hub trigger received message" + message);
output.setValue(message);
}

@FunctionName("EventHubTriggerAndOutputBinaryCardinalityManyArrayBinary")
public void EventHubTriggerAndOutputBinaryCardinalityManyArrayBinary(
@EventHubTrigger(name = "messages", eventHubName = "test-binary-input-cardinality-many-array-java", connection = "AzureWebJobsEventHubSender_2", dataType = "binary", cardinality = Cardinality.MANY) byte[][] messages,
@QueueOutput(name = "output", queueName = "test-binary-output-cardinality-many-array-java", connection = "AzureWebJobsStorage") OutputBinding<byte[]> output,
final ExecutionContext context
) {
context.getLogger().info("Java Event Hub trigger received " + messages.length +" messages");
output.setValue(messages[0]);
}

public static class SystemProperty {
public String SequenceNumber;
public String Offset;
public String PartitionKey;
public String EnqueuedTimeUtc;
}

}
25 changes: 18 additions & 7 deletions src/main/azure-functions-language-worker-protobuf/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,19 +26,30 @@ From within the Azure Functions language worker repo:
1. Define remote branch for cleaner git commands
- `git remote add proto-file https://github.com/azure/azure-functions-language-worker-protobuf.git`
- `git fetch proto-file`
2. Merge updates
- `git merge -s subtree proto-file/<version branch> --squash --allow-unrelated-histories`
- You can also merge with an explicit path to subtree: `git merge -X subtree=<path in language worker repo> --squash proto-file/<version branch> --allow-unrelated-histories`
3. Finalize with commit
- `git commit -m "Updated subtree from https://github.com/azure/azure-functions-language-worker-protobuf. Branch: <version branch>. Commit: <latest protobuf commit hash>"`
2. Pull a specific release tag
- `git fetch proto-file refs/tags/<tag-name>`
- Example: `git fetch proto-file refs/tags/v1.1.0-protofile`
3. Merge updates
- Merge with an explicit path to subtree: `git merge -X subtree=<path in language worker repo> --squash <tag-name> --allow-unrelated-histories --strategy-option theirs`
- Example: `git merge -X subtree=src/WebJobs.Script.Grpc/azure-functions-language-worker-protobuf --squash v1.1.0-protofile --allow-unrelated-histories --strategy-option theirs`
4. Finalize with commit
- `git commit -m "Updated subtree from https://github.com/azure/azure-functions-language-worker-protobuf. Tag: <tag-name>. Commit: <commit hash>"`
- `git push`


## Releasing a Language Worker Protobuf version

1. Draft a release in the GitHub UI
- Be sure to inculde details of the release
2. Create a release version, following semantic versioning guidelines ([semver.org](https://semver.org/))
3. Tag the version with the pattern: `v<M>.<m>.<p>-protofile` (example: `v1.1.0-protofile`)
3. Merge `dev` to `master`

## Consuming FunctionRPC.proto
*Note: Update versionNumber before running following commands*

## CSharp
```
set NUGET_PATH=%UserProfile%\.nuget\packages
set NUGET_PATH="%UserProfile%\.nuget\packages"
set GRPC_TOOLS_PATH=%NUGET_PATH%\grpc.tools\<versionNumber>\tools\windows_x86
set PROTO_PATH=.\azure-functions-language-worker-protobuf\src\proto
set PROTO=.\azure-functions-language-worker-protobuf\src\proto\FunctionRpc.proto
Expand Down
Loading