Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/SnapshotPayloadParseState.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ void SnapshotPayloadParseState::flushQueuedKeys() {
auto &insertsInFlightTmp = insertsInFlight; // C++ GRRRRRRRRRRRRRRRR, we don't want to capute "this" because that's dangerous
if (current_database < cserver.dbnum) {
g_pserver->asyncworkqueue->AddWorkFunction([idb, vecqueuedKeys = std::move(this->vecqueuedKeys), vecqueuedKeysCb = std::move(this->vecqueuedKeysCb), vecqueuedVals = std::move(this->vecqueuedVals), vecqueuedValsCb = std::move(this->vecqueuedValsCb), &insertsInFlightTmp, pallocator = m_spallocator.release()]() mutable {
g_pserver->db[idb]->bulkStorageInsert(vecqueuedKeys.data(), vecqueuedKeysCb.data(), vecqueuedVals.data(), vecqueuedValsCb.data(), vecqueuedKeys.size());
g_pserver->db[idb]->bulkDirectStorageInsert(vecqueuedKeys.data(), vecqueuedKeysCb.data(), vecqueuedVals.data(), vecqueuedValsCb.data(), vecqueuedKeys.size());
--insertsInFlightTmp;
delete pallocator;
});
Expand Down
10 changes: 9 additions & 1 deletion src/db.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3002,8 +3002,16 @@ void redisDbPersistentData::processChangesAsync(std::atomic<int> &pendingJobs)
});
}

void redisDbPersistentData::bulkStorageInsert(char **rgKeys, size_t *rgcbKeys, char **rgVals, size_t *rgcbVals, size_t celem)
/* This function is to bulk insert directly to storage provider bypassing in memory, assumes rgKeys and rgVals are not sds strings */
void redisDbPersistentData::bulkDirectStorageInsert(char **rgKeys, size_t *rgcbKeys, char **rgVals, size_t *rgcbVals, size_t celem)
{
if (g_pserver->cluster_enabled) {
aeAcquireLock();
for (size_t i = 0; i < celem; i++) {
slotToKeyUpdateKeyCore(rgKeys[i], rgcbKeys[i], 1);
}
aeReleaseLock();
}
m_spstorage->bulkInsert(rgKeys, rgcbKeys, rgVals, rgcbVals, celem);
}

Expand Down
4 changes: 2 additions & 2 deletions src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -1209,7 +1209,7 @@ class redisDbPersistentData
bool FSnapshot() const { return m_spdbSnapshotHOLDER != nullptr; }

std::unique_ptr<const StorageCache> CloneStorageCache() { return std::unique_ptr<const StorageCache>(m_spstorage->clone()); }
void bulkStorageInsert(char **rgKeys, size_t *rgcbKeys, char **rgVals, size_t *rgcbVals, size_t celem);
void bulkDirectStorageInsert(char **rgKeys, size_t *rgcbKeys, char **rgVals, size_t *rgcbVals, size_t celem);

dict_iter find_cached_threadsafe(const char *key) const;

Expand Down Expand Up @@ -1370,7 +1370,7 @@ struct redisDb : public redisDbPersistentDataSnapshot
using redisDbPersistentData::FRehashing;
using redisDbPersistentData::FTrackingChanges;
using redisDbPersistentData::CloneStorageCache;
using redisDbPersistentData::bulkStorageInsert;
using redisDbPersistentData::bulkDirectStorageInsert;

public:
const redisDbPersistentDataSnapshot *createSnapshot(uint64_t mvccCheckpoint, bool fOptional) {
Expand Down