Skip to content

feat: Add delete one message or many messages feature for published a… #1674

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 41 additions & 3 deletions src/DotNetCore.CAP.Dashboard/RouteActionProvider.cs
Original file line number Diff line number Diff line change
@@ -40,7 +40,8 @@ public RouteActionProvider(IEndpointRouteBuilder builder, DashboardOptions optio
_agent = _serviceProvider.GetService<GatewayProxyAgent>(); // may be null
}

private IMonitoringApi MonitoringApi => _serviceProvider.GetRequiredService<IDataStorage>().GetMonitoringApi();
private IDataStorage DataStorage => _serviceProvider.GetRequiredService<IDataStorage>();
private IMonitoringApi MonitoringApi => DataStorage.GetMonitoringApi();

public void MapDashboardRoutes()
{
@@ -54,7 +55,9 @@ public void MapDashboardRoutes()
_builder.MapGet(prefixMatch + "/published/message/{id:long}", PublishedMessageDetails).AllowAnonymousIf(_options.AllowAnonymousExplicit, _options.AuthorizationPolicy);
_builder.MapGet(prefixMatch + "/received/message/{id:long}", ReceivedMessageDetails).AllowAnonymousIf(_options.AllowAnonymousExplicit, _options.AuthorizationPolicy);
_builder.MapPost(prefixMatch + "/published/requeue", PublishedRequeue).AllowAnonymousIf(_options.AllowAnonymousExplicit, _options.AuthorizationPolicy);
_builder.MapPost(prefixMatch + "/published/delete", PublishedDelete).AllowAnonymousIf(_options.AllowAnonymousExplicit, _options.AuthorizationPolicy);
_builder.MapPost(prefixMatch + "/received/reexecute", ReceivedRequeue).AllowAnonymousIf(_options.AllowAnonymousExplicit, _options.AuthorizationPolicy);
_builder.MapPost(prefixMatch + "/received/delete", ReceivedDelete).AllowAnonymousIf(_options.AllowAnonymousExplicit, _options.AuthorizationPolicy);
_builder.MapGet(prefixMatch + "/published/{status}", PublishedList).AllowAnonymousIf(_options.AllowAnonymousExplicit, _options.AuthorizationPolicy);
_builder.MapGet(prefixMatch + "/received/{status}", ReceivedList).AllowAnonymousIf(_options.AllowAnonymousExplicit, _options.AuthorizationPolicy);
_builder.MapGet(prefixMatch + "/subscriber", Subscribers).AllowAnonymousIf(_options.AllowAnonymousExplicit, _options.AuthorizationPolicy);
@@ -79,7 +82,7 @@ public async Task MetaInfo(HttpContext httpContext)
var cap = _serviceProvider.GetService<CapMarkerService>();
var broker = _serviceProvider.GetService<CapMessageQueueMakerService>();
var storage = _serviceProvider.GetService<CapStorageMarkerService>();

await httpContext.Response.WriteAsJsonAsync(new
{
cap,
@@ -215,6 +218,23 @@ public async Task PublishedRequeue(HttpContext httpContext)
httpContext.Response.StatusCode = StatusCodes.Status204NoContent;
}

public async Task PublishedDelete(HttpContext httpContext)
{
if (_agent != null && await _agent.Invoke(httpContext)) return;

var messageIds = await httpContext.Request.ReadFromJsonAsync<long[]>();
if (messageIds == null || messageIds.Length == 0)
{
httpContext.Response.StatusCode = StatusCodes.Status422UnprocessableEntity;
return;
}

foreach (var messageId in messageIds)
_ = await DataStorage.DeletePublishedMessageAsync(messageId);

httpContext.Response.StatusCode = StatusCodes.Status204NoContent;
}

public async Task ReceivedRequeue(HttpContext httpContext)
{
if (_agent != null && await _agent.Invoke(httpContext)) return;
@@ -236,6 +256,24 @@ public async Task ReceivedRequeue(HttpContext httpContext)
httpContext.Response.StatusCode = StatusCodes.Status204NoContent;
}

public async Task ReceivedDelete(HttpContext httpContext)
{
if (_agent != null && await _agent.Invoke(httpContext)) return;

var messageIds = await httpContext.Request.ReadFromJsonAsync<long[]>();
if (messageIds == null || messageIds.Length == 0)
{
httpContext.Response.StatusCode = StatusCodes.Status422UnprocessableEntity;
return;
}

foreach (var messageId in messageIds)
_ = await DataStorage.DeleteReceivedMessageAsync(messageId);

httpContext.Response.StatusCode = StatusCodes.Status204NoContent;
}


public async Task PublishedList(HttpContext httpContext)
{
if (_agent != null && await _agent.Invoke(httpContext)) return;
@@ -293,7 +331,7 @@ public async Task ReceivedList(HttpContext httpContext)
public async Task Subscribers(HttpContext httpContext)
{
if (_agent != null && await _agent.Invoke(httpContext)) return;

var cache = _serviceProvider.GetRequiredService<MethodMatcherCache>();
var subscribers = cache.GetCandidatesMethodsOfGroupNameGrouped();

This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.

74 changes: 0 additions & 74 deletions src/DotNetCore.CAP.Dashboard/wwwroot/dist/assets/index.8eb375a0.js

This file was deleted.

This file was deleted.

2 changes: 1 addition & 1 deletion src/DotNetCore.CAP.Dashboard/wwwroot/dist/index.html
Original file line number Diff line number Diff line change
@@ -6,7 +6,7 @@
<meta name="viewport" content="width=device-width,initial-scale=1.0">
<link rel="icon" href="./favicon.ico">
<title>CAP Dashboard</title>
<script type="module" crossorigin src="./assets/index.8eb375a0.js"></script>
<script type="module" crossorigin src="./assets/index.7838251c.js"></script>
<link rel="stylesheet" href="./assets/index.856c0890.css">
</head>

Original file line number Diff line number Diff line change
@@ -58,6 +58,7 @@ export default {
Tags: "Tags",
Actions: "Actions",
ReexecuteSuccess: "😀 Reexecute Successful !",
DeleteSuccess: "😀 Delete Successful !",
RequeueSuccess: "😀 Requeue Successfull !",
SelectNamespaces: "-- Please select a kubernetes namespace --",
Latency: "Latency",
Original file line number Diff line number Diff line change
@@ -57,6 +57,7 @@ export default {
Tags: "标签",
Actions: "动作",
ReexecuteSuccess: "😀 重新执行成功!",
DeleteSuccess: "😀 删除成功!",
RequeueSuccess: "😀 重新发布成功!",
SwitchedNode: "切换的节点",
Storage: "存储",
29 changes: 27 additions & 2 deletions src/DotNetCore.CAP.Dashboard/wwwroot/src/pages/Published.vue
Original file line number Diff line number Diff line change
@@ -44,10 +44,14 @@
<b-row>
<b-col md="12">
<b-btn-toolbar class="mt-4">
<b-button size="sm" variant="dark" @click="requeue" :disabled="!selectedItems.length">
<b-button size="sm" variant="dark" @click="requeue" :disabled="!selectedItems.length" class="action-button">
<b-icon-arrow-repeat aria-hidden="true"></b-icon-arrow-repeat>
{{ requeueTitle }}
</b-button>
<b-button size="sm" variant="danger" @click="deletemsg" :disabled="!selectedItems.length" class="action-button">
<b-icon-trash aria-hidden="true"></b-icon-trash>
{{ $t("Delete") }}
</b-button>
<div class="pagination">
<span style="font-size: 14px">{{ $t("Page Size") }}:</span>
<b-button-group class="ml-2">
@@ -102,7 +106,7 @@ import JSONBIG from "json-bigint";
import {
BIconInfoCircleFill,
BIconArrowRepeat,
BIconSearch
BIconSearch, BIconTrash
} from 'bootstrap-vue';
const formDataTpl = {
@@ -113,6 +117,7 @@ const formDataTpl = {
};
export default {
components: {
BIconTrash,
BIconInfoCircleFill,
BIconArrowRepeat,
BIconSearch
@@ -276,6 +281,22 @@ export default {
_this.clear();
});
},
deletemsg: function () {
const _this = this;
axios.post('/published/delete', this.selectedItems.map((item) => item.id)).then(() => {
this.selectedItems.map((item) => {
_this.$bvToast.toast(this.$t("DeleteSuccess") + " " + item.id, {
title: "Tips",
variant: "secondary",
autoHideDelay: 1000,
appendToast: true,
solid: true
});
});
_this.fetchData();
_this.clear();
});
},
clear() {
this.items = this.items.map((item) => {
return {
@@ -317,4 +338,8 @@ export default {
.my-align-middle {
vertical-align: middle;
}
.action-button {
margin-right: 1rem;
}
</style>
32 changes: 29 additions & 3 deletions src/DotNetCore.CAP.Dashboard/wwwroot/src/pages/Received.vue
Original file line number Diff line number Diff line change
@@ -39,10 +39,14 @@
<b-row>
<b-col md="12">
<b-btn-toolbar class="mt-4">
<b-button size="sm" variant="dark" @click="reexecute" :disabled="!selectedItems.length">
<b-button size="sm" variant="dark" @click="reexecute" :disabled="!selectedItems.length" class="action-button">
<b-icon-arrow-repeat aria-hidden="true"></b-icon-arrow-repeat>
{{ $t("Re-execute") }}
</b-button>
<b-button size="sm" variant="danger" @click="deletemsg" :disabled="!selectedItems.length" class="action-button">
<b-icon-trash aria-hidden="true"></b-icon-trash>
{{ $t("Delete") }}
</b-button>
<div class="pagination">
<span style="font-size: 14px"> {{ $t("Page Size") }}:</span>
<b-button-group class="ml-2">
@@ -98,7 +102,8 @@ import axios from "axios";
import JSONBIG from "json-bigint";
import {
BIconArrowRepeat,
BIconSearch
BIconSearch,
BIconTrash,
} from 'bootstrap-vue';
const formDataTpl = {
@@ -111,7 +116,8 @@ const formDataTpl = {
export default {
components: {
BIconArrowRepeat,
BIconSearch
BIconSearch,
BIconTrash
},
props: {
status: {},
@@ -259,6 +265,22 @@ export default {
_this.clear();
});
},
deletemsg: function () {
const _this = this;
axios.post('/received/delete', this.selectedItems.map((item) => item.id)).then(() => {
this.selectedItems.map((item) => {
_this.$bvToast.toast(this.$t("DeleteSuccess") + " " + item.id, {
title: "Tips",
variant: "secondary",
autoHideDelay: 1000,
appendToast: true,
solid: true
});
});
_this.fetchData();
_this.clear();
});
},
clear() {
this.items = this.items.map((item) => {
return {
@@ -296,4 +318,8 @@ export default {
color: white;
background-color: black;
}
.action-button {
margin-right: 1rem;
}
</style>
15 changes: 14 additions & 1 deletion src/DotNetCore.CAP.InMemoryStorage/IDataStorage.InMemory.cs
Original file line number Diff line number Diff line change
@@ -4,6 +4,7 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Globalization;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
@@ -209,6 +210,18 @@ public Task<IEnumerable<MediumMessage>> GetReceivedMessagesOfNeedRetry(TimeSpan
return Task.FromResult(result);
}

public Task<int> DeleteReceivedMessageAsync(long id)
{
var deleteResult = ReceivedMessages.TryRemove(id.ToString(CultureInfo.InvariantCulture), out _);
return Task.FromResult(deleteResult ? 1 : 0);
}

public Task<int> DeletePublishedMessageAsync(long id)
{
var deleteResult = PublishedMessages.TryRemove(id.ToString(CultureInfo.InvariantCulture), out _);
return Task.FromResult(deleteResult ? 1 : 0);
}

public Task ScheduleMessagesOfDelayedAsync(Func<object, IEnumerable<MediumMessage>, Task> scheduleTask,
CancellationToken token = default)
{
@@ -224,4 +237,4 @@ public IMonitoringApi GetMonitoringApi()
{
return new InMemoryMonitoringApi();
}
}
}
16 changes: 15 additions & 1 deletion src/DotNetCore.CAP.MongoDB/IDataStorage.MongoDB.cs
Original file line number Diff line number Diff line change
@@ -290,6 +290,20 @@ public async Task<IEnumerable<MediumMessage>> GetReceivedMessagesOfNeedRetry(Tim
}).ToList();
}

public async Task<int> DeleteReceivedMessageAsync(long id)
{
var collection = _database.GetCollection<ReceivedMessage>(_options.Value.ReceivedCollection);
var deleteResult = await collection.DeleteOneAsync(x => x.Id == id).ConfigureAwait(false);
return (int)deleteResult.DeletedCount;
}

public async Task<int> DeletePublishedMessageAsync(long id)
{
var collection = _database.GetCollection<PublishedMessage>(_options.Value.PublishedCollection);
var deleteResult = await collection.DeleteOneAsync(x => x.Id == id).ConfigureAwait(false);
return (int)deleteResult.DeletedCount;
}

public async Task ScheduleMessagesOfDelayedAsync(Func<object, IEnumerable<MediumMessage>, Task> scheduleTask,
CancellationToken token = default)
{
@@ -360,4 +374,4 @@ public IMonitoringApi GetMonitoringApi()
{
return new MongoDBMonitoringApi(_client, _options, _serializer);
}
}
}
28 changes: 24 additions & 4 deletions src/DotNetCore.CAP.MySql/IDataStorage.MySql.cs
Original file line number Diff line number Diff line change
@@ -213,14 +213,14 @@ public async Task<int> DeleteExpiresAsync(string table, DateTime timeout, int ba
return await connection.ExecuteNonQueryAsync(
$@"DELETE P FROM `{table}` AS P
JOIN (
SELECT Id
SELECT Id
FROM `{table}`
WHERE ExpiresAt < @timeout
WHERE ExpiresAt < @timeout
AND StatusName IN ('{StatusName.Succeeded}', '{StatusName.Failed}')
LIMIT @batchCount
) AS T ON P.Id = T.Id;",
null,
new MySqlParameter("@timeout", timeout),
new MySqlParameter("@timeout", timeout),
new MySqlParameter("@batchCount", batchCount)).ConfigureAwait(false);
}

@@ -234,6 +234,26 @@ public Task<IEnumerable<MediumMessage>> GetReceivedMessagesOfNeedRetry(TimeSpan
return GetMessagesOfNeedRetryAsync(_recName, lookbackSeconds);
}

public async Task<int> DeleteReceivedMessageAsync(long id)
{
var sql = $"DELETE FROM `{_recName}` WHERE Id={id};";

var connection = new MySqlConnection(_options.Value.ConnectionString);
await using var _ = connection.ConfigureAwait(false);
var result = await connection.ExecuteNonQueryAsync(sql).ConfigureAwait(false);
return result;
}

public async Task<int> DeletePublishedMessageAsync(long id)
{
var sql = $"DELETE FROM `{_pubName}` WHERE Id={id};";

var connection = new MySqlConnection(_options.Value.ConnectionString);
await using var _ = connection.ConfigureAwait(false);
var result = await connection.ExecuteNonQueryAsync(sql).ConfigureAwait(false);
return result;
}

public async Task ScheduleMessagesOfDelayedAsync(Func<object, IEnumerable<MediumMessage>, Task> scheduleTask,
CancellationToken token = default)
{
@@ -356,4 +376,4 @@ private async Task<IEnumerable<MediumMessage>> GetMessagesOfNeedRetryAsync(strin

return result;
}
}
}
30 changes: 25 additions & 5 deletions src/DotNetCore.CAP.PostgreSql/IDataStorage.PostgreSql.cs
Original file line number Diff line number Diff line change
@@ -209,11 +209,11 @@ public async Task<int> DeleteExpiresAsync(string table, DateTime timeout, int ba
await using var _ = connection.ConfigureAwait(false);

return await connection.ExecuteNonQueryAsync(
$@"DELETE FROM {table}
$@"DELETE FROM {table}
WHERE ""Id"" IN (
SELECT ""Id""
FROM {table}
WHERE ""ExpiresAt"" < @timeout
SELECT ""Id""
FROM {table}
WHERE ""ExpiresAt"" < @timeout
AND ""StatusName"" IN ('{StatusName.Succeeded}','{StatusName.Failed}')
LIMIT @batchCount
)",
@@ -232,6 +232,26 @@ public async Task<IEnumerable<MediumMessage>> GetReceivedMessagesOfNeedRetry(Tim
return await GetMessagesOfNeedRetryAsync(_recName, lookbackSeconds).ConfigureAwait(false);
}

public async Task<int> DeleteReceivedMessageAsync(long id)
{
var sql = $@"DELETE FROM {_recName} WHERE ""Id""={id} FOR DELETE SKIP LOCKED";

var connection = _options.Value.CreateConnection();
await using var _ = connection.ConfigureAwait(false);
var result = await connection.ExecuteNonQueryAsync(sql);
return result;
}

public async Task<int> DeletePublishedMessageAsync(long id)
{
var sql = $@"DELETE FROM {_pubName} WHERE ""Id""={id} FOR DELETE SKIP LOCKED";

var connection = _options.Value.CreateConnection();
await using var _ = connection.ConfigureAwait(false);
var result = await connection.ExecuteNonQueryAsync(sql);
return result;
}

public async Task ScheduleMessagesOfDelayedAsync(Func<object, IEnumerable<MediumMessage>, Task> scheduleTask,
CancellationToken token = default)
{
@@ -356,4 +376,4 @@ private async Task<IEnumerable<MediumMessage>> GetMessagesOfNeedRetryAsync(strin

return result;
}
}
}
30 changes: 25 additions & 5 deletions src/DotNetCore.CAP.SqlServer/IDataStorage.SqlServer.cs
Original file line number Diff line number Diff line change
@@ -209,15 +209,15 @@ public async Task<int> DeleteExpiresAsync(string table, DateTime timeout, int ba
await using var _ = connection.ConfigureAwait(false);

return await connection.ExecuteNonQueryAsync(
$@"DELETE FROM {table}
$@"DELETE FROM {table}
WHERE Id IN (
SELECT TOP (@batchCount) Id
SELECT TOP (@batchCount) Id
FROM {table} WITH (READPAST)
WHERE ExpiresAt < @timeout
WHERE ExpiresAt < @timeout
AND StatusName IN('{StatusName.Succeeded}','{StatusName.Failed}')
);",
null,
new SqlParameter("@timeout", timeout),
new SqlParameter("@timeout", timeout),
new SqlParameter("@batchCount", batchCount)).ConfigureAwait(false);
}

@@ -231,6 +231,26 @@ public Task<IEnumerable<MediumMessage>> GetReceivedMessagesOfNeedRetry(TimeSpan
return GetMessagesOfNeedRetryAsync(_recName, lookbackSeconds);
}

public async Task<int> DeleteReceivedMessageAsync(long id)
{
var sql = $"DELETE FROM {_recName} WHERE Id={id}";

var connection = new SqlConnection(_options.Value.ConnectionString);
await using var _ = connection.ConfigureAwait(false);
var affectedRowCount = await connection.ExecuteNonQueryAsync(sql).ConfigureAwait(false);
return affectedRowCount;
}

public async Task<int> DeletePublishedMessageAsync(long id)
{
var sql = $"DELETE FROM {_pubName} WHERE Id={id}";

var connection = new SqlConnection(_options.Value.ConnectionString);
await using var _ = connection.ConfigureAwait(false);
var affectedRowCount = await connection.ExecuteNonQueryAsync(sql).ConfigureAwait(false);
return affectedRowCount;
}

public async Task ScheduleMessagesOfDelayedAsync(Func<object, IEnumerable<MediumMessage>, Task> scheduleTask,
CancellationToken token = default)
{
@@ -354,4 +374,4 @@ private async Task<IEnumerable<MediumMessage>> GetMessagesOfNeedRetryAsync(strin

return result;
}
}
}
6 changes: 5 additions & 1 deletion src/DotNetCore.CAP/Persistence/IDataStorage.cs
Original file line number Diff line number Diff line change
@@ -39,6 +39,10 @@ public interface IDataStorage

Task<IEnumerable<MediumMessage>> GetReceivedMessagesOfNeedRetry(TimeSpan lookbackSeconds);

Task<int> DeleteReceivedMessageAsync(long id);

Task<int> DeletePublishedMessageAsync(long id);

//dashboard api
IMonitoringApi GetMonitoringApi();
}
}