diff --git a/src/config.cpp b/src/config.cpp index a8217e7bc..d3fc8644c 100644 --- a/src/config.cpp +++ b/src/config.cpp @@ -361,7 +361,7 @@ bool initializeStorageProvider(const char **err) // Create The Storage Factory (if necessary) serverLog(LL_NOTICE, "Initializing FLASH storage provider (this may take a long time)"); adjustOpenFilesLimit(); - g_pserver->m_pstorageFactory = CreateRocksDBStorageFactory(g_sdsArgs, cserver.dbnum, cserver.storage_conf, cserver.storage_conf ? strlen(cserver.storage_conf) : 0, &g_pserver->asyncworkqueue); + g_pserver->m_pstorageFactory = CreateRocksDBStorageFactory(g_sdsArgs, cserver.dbnum, cserver.storage_conf, cserver.storage_conf ? strlen(cserver.storage_conf) : 0, &g_pserver->asyncworkqueue, &g_pserver->asyncwriteworkqueue); #else serverLog(LL_WARNING, "To use the flash storage provider please compile KeyDB with ENABLE_FLASH=yes"); serverLog(LL_WARNING, "Exiting due to the use of an unsupported storage provider"); diff --git a/src/db.cpp b/src/db.cpp index 9a0f35684..0f473abb9 100644 --- a/src/db.cpp +++ b/src/db.cpp @@ -415,6 +415,13 @@ void genericSetKey(client *c, redisDb *db, robj *key, robj *val, int keepttl, in } incrRefCount(val); if (signal) signalModifiedKey(c,db,key); + + if(g_pserver->m_pstorageFactory != nullptr) { + if (!(c->flags & CLIENT_BLOCKED)) { + blockClient(c, BLOCKED_STORAGE); + } + serverTL->setclientsCommit.insert(c); + } } /* Common case for genericSetKey() where the TTL is not retained. */ @@ -3091,9 +3098,18 @@ void redisDbPersistentData::commitChanges(const redisDbPersistentDataSnapshot ** auto *tok = m_spstorage->begin_endWriteBatch(serverTL->el, storageLoadCallback); if (tok != nullptr) { + for (client *c : serverTL->setclientsCommit) + { + /* Remove from the list of pending writes if needed. */ + if (c->flags & CLIENT_PENDING_WRITE) { + c->flags &= ~CLIENT_PENDING_WRITE; + } + } + tok->setc = std::move(serverTL->setclientsCommit); tok->db = this; tok->type = StorageToken::TokenType::BatchWrite; } + serverTL->setclientsCommit.clear(); } } @@ -3416,6 +3432,7 @@ void redisDbPersistentData::prefetchKeysFlash(std::unordered_set &setc) blockClient(c, BLOCKED_STORAGE); } tok->setc = std::move(setcBlocked); + tok->type = StorageToken::TokenType::SingleRead; tok->db = this; } return; diff --git a/src/server.cpp b/src/server.cpp index 4792d0fd9..32f41068c 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -4160,6 +4160,9 @@ void InitServerLast() { g_pserver->asyncworkqueue = new (MALLOC_LOCAL) AsyncWorkQueue(cserver.cthreads*10); + //Process one write/commit at a time to ensure consistency + g_pserver->asyncwriteworkqueue = new (MALLOC_LOCAL) AsyncWorkQueue(1); + // Allocate the repl backlog } diff --git a/src/server.h b/src/server.h index ef21fd930..b70bb9a70 100644 --- a/src/server.h +++ b/src/server.h @@ -2218,6 +2218,7 @@ struct redisServerThreadVars { int client_pause_in_transaction = 0; /* Was a client pause executed during this Exec? */ std::unordered_set setclientsProcess; std::unordered_set setclientsPrefetch; + std::unordered_set setclientsCommit; std::unordered_set setStorageTokensProcess; dictAsyncRehashCtl *rehashCtl = nullptr; @@ -2705,6 +2706,7 @@ struct redisServer { uint64_t mvcc_tstamp; AsyncWorkQueue *asyncworkqueue; + AsyncWorkQueue *asyncwriteworkqueue; /* System hardware info */ size_t system_memory_size; /* Total memory in system as reported by OS */ diff --git a/src/storage/rocksdb.cpp b/src/storage/rocksdb.cpp index 56f00821f..6943e316b 100644 --- a/src/storage/rocksdb.cpp +++ b/src/storage/rocksdb.cpp @@ -266,7 +266,7 @@ StorageToken* RocksDBStorageProvider::begin_endWriteBatch(struct aeEventLoop *el tok->tspdb = m_spdb; m_spbatch = nullptr; m_lock.unlock(); - (*m_pfactory->m_wqueue)->AddWorkFunction([this, el,callback,tok]{ + (*m_pfactory->m_wworkqueue)->AddWorkFunction([this, el,callback,tok]{ tok->tspdb->Write(WriteOptions(),tok->tspbatch.get()->GetWriteBatch()); aePostFunction(el,callback,tok); }); diff --git a/src/storage/rocksdbfactor_internal.h b/src/storage/rocksdbfactor_internal.h index addff77ce..6fb0d3956 100644 --- a/src/storage/rocksdbfactor_internal.h +++ b/src/storage/rocksdbfactor_internal.h @@ -12,8 +12,9 @@ class RocksDBStorageFactory : public IStorageFactory public: AsyncWorkQueue **m_wqueue; + AsyncWorkQueue **m_wworkqueue; - RocksDBStorageFactory(const char *dbfile, int dbnum, const char *rgchConfig, size_t cchConfig, AsyncWorkQueue **wqueue); + RocksDBStorageFactory(const char *dbfile, int dbnum, const char *rgchConfig, size_t cchConfig, AsyncWorkQueue **wqueue, AsyncWorkQueue **wworkqueue); ~RocksDBStorageFactory(); virtual IStorage *create(int db, key_load_iterator iter, void *privdata) override; diff --git a/src/storage/rocksdbfactory.cpp b/src/storage/rocksdbfactory.cpp index 0ebfb93e1..de32755a2 100644 --- a/src/storage/rocksdbfactory.cpp +++ b/src/storage/rocksdbfactory.cpp @@ -35,9 +35,8 @@ rocksdb::Options DefaultRocksDBOptions() { return options; } -IStorageFactory *CreateRocksDBStorageFactory(const char *path, int dbnum, const char *rgchConfig, size_t cchConfig, AsyncWorkQueue **wqueue) -{ - return new RocksDBStorageFactory(path, dbnum, rgchConfig, cchConfig, wqueue); +IStorageFactory *CreateRocksDBStorageFactory(const char *path, int dbnum, const char *rgchConfig, size_t cchConfig, AsyncWorkQueue **wqueue, AsyncWorkQueue **wworkqueue) + return new RocksDBStorageFactory(path, dbnum, rgchConfig, cchConfig, wqueue, wworkqueue); } rocksdb::Options RocksDBStorageFactory::RocksDbOptions() @@ -52,8 +51,8 @@ rocksdb::Options RocksDBStorageFactory::RocksDbOptions() return options; } -RocksDBStorageFactory::RocksDBStorageFactory(const char *dbfile, int dbnum, const char *rgchConfig, size_t cchConfig, AsyncWorkQueue **wqueue) - : m_path(dbfile), m_wqueue(wqueue) +RocksDBStorageFactory::RocksDBStorageFactory(const char *dbfile, int dbnum, const char *rgchConfig, size_t cchConfig, AsyncWorkQueue **wqueue, AsyncWorkQueue **wworkqueue) + : m_path(dbfile), m_wqueue(wqueue), m_wworkqueue(wworkqueue) { dbnum++; // create an extra db for metadata // Get the count of column families in the actual database diff --git a/src/storage/rocksdbfactory.h b/src/storage/rocksdbfactory.h index c137a79e4..8efe27513 100644 --- a/src/storage/rocksdbfactory.h +++ b/src/storage/rocksdbfactory.h @@ -1,3 +1,3 @@ #pragma once -class IStorageFactory *CreateRocksDBStorageFactory(const char *path, int dbnum, const char *rgchConfig, size_t cchConfig, AsyncWorkQueue **wqueue); \ No newline at end of file +class IStorageFactory *CreateRocksDBStorageFactory(const char *path, int dbnum, const char *rgchConfig, size_t cchConfig, AsyncWorkQueue **wqueue, AsyncWorkQueue **wworkqueue); \ No newline at end of file