From f4a84a26bf39aecddd3430361d6596b01924ef62 Mon Sep 17 00:00:00 2001
From: Russell Dempsey <1173416+SgtPooki@users.noreply.github.com>
Date: Mon, 11 Mar 2024 00:40:36 -0700
Subject: [PATCH 1/3] feat: add tiered-limit datastore

---
 packages/datastore-core/package.json          |   4 +
 packages/datastore-core/src/index.ts          |   1 +
 packages/datastore-core/src/tiered-limit.ts   | 172 ++++++++++++++++++
 .../datastore-core/test/tiered-limit.spec.ts  | 129 +++++++++++++
 4 files changed, 306 insertions(+)
 create mode 100644 packages/datastore-core/src/tiered-limit.ts
 create mode 100644 packages/datastore-core/test/tiered-limit.spec.ts

diff --git a/packages/datastore-core/package.json b/packages/datastore-core/package.json
index fb04ded2..02eef81f 100644
--- a/packages/datastore-core/package.json
+++ b/packages/datastore-core/package.json
@@ -90,6 +90,10 @@
     "./tiered": {
       "types": "./dist/src/tiered.d.ts",
       "import": "./dist/src/tiered.js"
+    },
+    "./tiered-limit": {
+      "types": "./dist/src/tiered-limit.d.ts",
+      "import": "./dist/src/tiered-limit.js"
     }
   },
   "eslintConfig": {
diff --git a/packages/datastore-core/src/index.ts b/packages/datastore-core/src/index.ts
index 5ec96a75..be08c65c 100644
--- a/packages/datastore-core/src/index.ts
+++ b/packages/datastore-core/src/index.ts
@@ -72,6 +72,7 @@ export { ShardingDatastore } from './sharding.js'
 export { MountDatastore } from './mount.js'
 export { TieredDatastore } from './tiered.js'
 export { NamespaceDatastore } from './namespace.js'
+export { TieredLimitDatastore } from './tiered-limit.js'
 export { Errors }
 export { shard }
 
diff --git a/packages/datastore-core/src/tiered-limit.ts b/packages/datastore-core/src/tiered-limit.ts
new file mode 100644
index 00000000..81f11ec4
--- /dev/null
+++ b/packages/datastore-core/src/tiered-limit.ts
@@ -0,0 +1,172 @@
+import { logger } from '@libp2p/logger'
+import { type Key, type KeyQuery, type Pair, type Query } from 'interface-datastore'
+import { BaseDatastore } from './base.js'
+import type { AbortOptions, AwaitIterable } from 'interface-store'
+
+/**
+ * @example for a memory store limited to 1MB, where evicted data is dropped
+ *
+ * ```typescript
+ * import { MemoryDatastore } from 'datastore-core'
+ * import { TieredLimitDatastore } from 'datastore-core'
+ * import { BlackHoleDatastore } from 'datastore-core'
+ *
+ * const tieredLimitDatastore = new TieredLimitDatastore({
+ *   maxBytes: 1024 * 1024, // 1MB limit
+ *   store: new MemoryDatastore()
+ * }, new BlackHoleDatastore())
+ * ```
+ */
+
+const log = logger('datastore:core:tiered-limit')
+
+export class TieredLimitDatastore<T extends BaseDatastore = BaseDatastore, T2 extends BaseDatastore = BaseDatastore> extends BaseDatastore {
+  private readonly primaryStore: T
+  private readonly backingStore: T2
+  private readonly maxBytes: number
+  private currentBytes: number = 0
+  /**
+   * Tracks the size in bytes of each item held in the primary store
+   *
+   * Note: the memory used by this map itself does not count towards the maxBytes limit
+   */
+  private readonly sizeMap = new Map<Key, number>()
+  /**
+   * Tracks insertion order for eviction:
+   * keys are added to the end of the array when added or updated and
+   * removed from the start of the array when evicted.
+   * Note: the size of keys is not tracked, so if you use large keys you should
+   * increase the maxBytes limit accordingly
+   */
+  private readonly evictionOrder: Key[] = []
+
+  constructor ({ maxBytes, store }: { maxBytes: number, store: T }, backingStore: T2) {
+    super()
+    this.primaryStore = store
+    this.backingStore = backingStore
+    this.maxBytes = maxBytes
+  }
+
+  private updateSize (key: Key, sizeDelta: number): void {
+    this.currentBytes += sizeDelta
+    if (sizeDelta > 0) {
+      // If adding or updating size, push key to eviction order
+      this.evictionOrder.push(key)
+    } else {
+      // If reducing size, find and remove the key from eviction order
+      const index = this.evictionOrder.indexOf(key)
+      if (index !== -1) this.evictionOrder.splice(index, 1)
+    }
+  }
+
+  /**
+   * Evict items from primary store to backing store until required space is available
+   */
+  private async evictSpace (requiredSpace: number): Promise<void> {
+    if (requiredSpace <= 0) {
+      return // Nothing to evict for a zero or negative delta
+    }
+    if (this.currentBytes + requiredSpace > this.maxBytes && this.evictionOrder.length > 0) {
+      log.trace('Evicting %d bytes from primary store to backing store', requiredSpace)
+      while (this.currentBytes + requiredSpace > this.maxBytes && this.evictionOrder.length > 0) {
+        const keyToEvict = this.evictionOrder.shift() // Get the oldest key
+        if (keyToEvict == null) {
+          // this shouldn't happen, but if it does:
+          // TODO: do we want to just add to the backingStore if we can't evict?
+          throw new Error('Need to evict but nothing else to evict. Is the item you are trying to add too large?')
+        }
+        const size = this.sizeMap.get(keyToEvict)
+        if (size == null) {
+          throw new Error('Key to evict not found in size map. This should not happen.')
+        }
+        log.trace('Evicting %d bytes for key "%s"', size, keyToEvict.toString())
+        const value = await this.primaryStore.get(keyToEvict) // Get value to evict
+        await this.backingStore.put(keyToEvict, value) // Ensure it's saved in the backing store
+        await this.primaryStore.delete(keyToEvict) // Delete from primary store
+        this.sizeMap.delete(keyToEvict) // Remove size tracking for this key
+        this.currentBytes -= size // Update current used bytes
+      }
+      log.trace('Eviction complete')
+    }
+  }
+
+  async handleSizeForPut (key: Key, value: Uint8Array): Promise<void> {
+    const size = value.byteLength
+    if (size > this.maxBytes) {
+      throw new Error(`Item size ${size} exceeds maxBytes limit of ${this.maxBytes}`)
+    }
+    const existingSize = this.sizeMap.get(key) ?? 0 // existing size is 0 if not found
+    const sizeDelta = size - existingSize // if the key is already in the primary store, only the difference counts
+
+    await this.evictSpace(sizeDelta) // Evict if needed before adding new item
+
+    this.sizeMap.set(key, size) // Update size tracking
+    this.updateSize(key, sizeDelta)
+  }
+
+  async put (key: Key, value: Uint8Array, options?: AbortOptions): Promise<Key> {
+    log.trace('Putting %s', key.toString())
+    try {
+      await this.handleSizeForPut(key, value)
+    } catch (err: any) {
+      log.error('Error putting %s to primary store: %s', key.toString(), err)
+      log.trace('Putting %s to backing store', key.toString())
+      await this.backingStore.put(key, value, options)
+      return key
+    }
+    log.trace('Putting %s to primary store', key.toString())
+    await this.primaryStore.put(key, value, options)
+
+    // The write to the backing store happens upon eviction
+    return key
+  }
+
+  async get (key: Key, options?: AbortOptions): Promise<Uint8Array> {
+    if (await this.primaryStore.has(key)) {
+      log.trace('Getting %s from primary store', key.toString())
+      return this.primaryStore.get(key, options)
+    }
+
+    log.trace('Getting %s from backing store', key.toString())
+    const value = await this.backingStore.get(key, options)
+    // TODO: Do we always want to put the value back into the primary store? It could be a config option.
+    await this.put(key, value, options)
+    return value
+  }
+
+  async has (key: Key, options?: AbortOptions): Promise<boolean> {
+    if (await this.primaryStore.has(key, options)) {
+      return true
+    }
+    return this.backingStore.has(key, options)
+  }
+
+  private async deleteFromPrimaryStore (key: Key, options?: AbortOptions): Promise<void> {
+    if (await this.primaryStore.has(key, options)) {
+      const size = this.sizeMap.get(key)
+      if (size != null) {
+        this.updateSize(key, -size) // Update size tracking
+        this.sizeMap.delete(key) // Remove size tracking
+      }
+      await this.primaryStore.delete(key, options)
+    }
+  }
+
+  async delete (key: Key, options?: AbortOptions): Promise<void> {
+    log.trace('Deleting %s', key.toString())
+    await this.deleteFromPrimaryStore(key, options)
+    await this.backingStore.delete(key, options)
+  }
+
+  async * _allKeys (q: KeyQuery, options?: AbortOptions): AwaitIterable<Key> {
+    // TODO: How to handle stores that don't implement _allKeys? Do we want to?
+    yield * this.primaryStore._allKeys(q, options)
+    yield * this.backingStore._allKeys(q, options)
+  }
+
+  async * _all (q: Query, options?: AbortOptions): AwaitIterable<Pair> {
+    // TODO: How to handle stores that don't implement _all? Do we want to?
+    yield * this.primaryStore._all(q, options)
+    yield * this.backingStore._all(q, options)
+  }
+}
diff --git a/packages/datastore-core/test/tiered-limit.spec.ts b/packages/datastore-core/test/tiered-limit.spec.ts
new file mode 100644
index 00000000..c36846cf
--- /dev/null
+++ b/packages/datastore-core/test/tiered-limit.spec.ts
@@ -0,0 +1,129 @@
+/* eslint-env mocha */
+
+import { expect } from 'aegir/chai'
+import { Key } from 'interface-datastore/key'
+import { interfaceDatastoreTests } from 'interface-datastore-tests'
+import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string'
+import { MemoryDatastore } from '../src/memory.js'
+import { TieredLimitDatastore } from '../src/tiered-limit.js'
+import type { BaseDatastore } from '../src/base.js'
+
+/**
+ * @typedef {import('interface-datastore').Datastore} Datastore
+ */
+
+describe('TieredLimit', () => {
+  describe('all stores', () => {
+    const ms: BaseDatastore[] = []
+    let store: TieredLimitDatastore
+    beforeEach(() => {
+      ms.push(new MemoryDatastore())
+      ms.push(new MemoryDatastore())
+      store = new TieredLimitDatastore({
+        maxBytes: 50, // 50 byte limit for testing purposes
+        store: ms[0]
+      }, ms[1])
+    })
+
+    it('put', async () => {
+      const k = new Key('hello')
+      const v = uint8ArrayFromString('world')
+      await store.put(k, v)
+      await expect(store.has(k)).to.eventually.be.true()
+      expect(ms[0].get(k)).to.be.eql(v)
+      expect(() => ms[1].get(k) as Uint8Array).to.throw('Not Found')
+    })
+
+    it('put - first item over limit', async () => {
+      const k = new Key('hello-too-big')
+      const v = uint8ArrayFromString('abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyz') // 52 bytes out of 50
+      expect(v.byteLength).to.be.eql(52)
+      await store.put(k, v)
+      expect(ms[0].has(k)).to.be.false()
+      expect(ms[1].has(k)).to.be.true()
+      await expect(store.has(k)).to.eventually.be.true()
+      expect(() => ms[0].get(k) as Uint8Array).to.throw('Not Found')
+      expect(ms[1].get(k)).to.be.eql(v)
+    })
+
+    it('put - second item over limit', async () => {
+      const k = new Key('hello')
+      const v = uint8ArrayFromString('abcdefghijklmnopqrstuvwxyz') // 26 bytes out of 50
+      const k2 = new Key('hello2')
+      await store.put(k, v)
+      await store.put(k2, v)
+      expect(ms[0].has(k)).to.be.false() // evicted to backing store
+      expect(ms[1].has(k)).to.be.true()
+      expect(ms[0].has(k2)).to.be.true()
+      expect(ms[1].has(k2)).to.be.false()
+      await expect(store.has(k)).to.eventually.be.true()
+      await expect(store.has(k2)).to.eventually.be.true()
+      expect(ms[0].get(k2)).to.be.eql(v)
+      expect(ms[1].get(k)).to.be.eql(v)
+    })
+
+    it('get - over limit', async () => {
+      const k = new Key('hello-get-over-limit')
+      const v = uint8ArrayFromString('abcdefghijklmnopqrstuvwxyz') // 26 bytes out of 50
+      const k2 = new Key('hello-get-over-limit-2')
+      await store.put(k, v)
+      await store.put(k2, v)
+      expect(ms[0].has(k)).to.be.false() // evicted to backing store
+      expect(ms[0].has(k2)).to.be.true()
+      expect(ms[1].has(k)).to.be.true()
+      expect(ms[1].has(k2)).to.be.false()
+      await expect(store.has(k)).to.eventually.be.true()
+      await expect(store.has(k2)).to.eventually.be.true()
+      expect(ms[0].get(k2)).to.be.eql(v)
+      expect(ms[1].get(k)).to.be.eql(v)
+      const gotVal = await store.get(k) // should move from backing store to primary store, and evict k2
+      expect(gotVal).to.be.eql(v)
+      expect(ms[0].has(k)).to.be.true()
+      expect(ms[0].has(k2)).to.be.false() // no longer in primary store
+      // backing store has both now
+      expect(ms[1].has(k)).to.be.true()
+      expect(ms[1].has(k2)).to.be.true()
+
+      expect(ms[0].get(k)).to.be.eql(v)
+      expect(ms[1].get(k)).to.be.eql(v)
+      expect(ms[1].get(k2)).to.be.eql(v)
+    })
+
+    it('get and has, where available', async () => {
+      const k = new Key('hello')
+      const v = uint8ArrayFromString('world')
+      await ms[1].put(k, v)
+      const val = await store.get(k)
+      expect(val).to.be.eql(v)
+      const exists = await store.has(k)
+      expect(exists).to.be.eql(true)
+    })
+
+    it('has - key not found', async () => {
+      expect(await store.has(new Key('hello1'))).to.be.eql(false)
+    })
+
+    it('has and delete', async () => {
+      const k = new Key('hello')
+      const v = uint8ArrayFromString('world')
+      await store.put(k, v)
+      let res = await Promise.all([ms[0].has(k), ms[1].has(k)])
+      expect(res).to.be.eql([true, true])
+      await store.delete(k)
+      res = await Promise.all([ms[0].has(k), ms[1].has(k)])
+      expect(res).to.be.eql([false, false])
+    })
+  })
+
+  describe('interface-datastore-tiered-limit', () => {
+    interfaceDatastoreTests({
+      setup () {
+        return new TieredLimitDatastore({
+          maxBytes: 50, // 50 bytes limit
+          store: new MemoryDatastore()
+        }, new MemoryDatastore())
+      },
+      teardown () { }
+    })
+  })
+})
From 23e45a25217c1a1d5eafc26f88c06c691dfd0c74 Mon Sep 17 00:00:00 2001
From: Russell Dempsey <1173416+SgtPooki@users.noreply.github.com>
Date: Mon, 11 Mar 2024 01:21:12 -0700
Subject: [PATCH 2/3] fix: prevent duplicates in _all and _allKeys

---
 packages/datastore-core/src/tiered-limit.ts | 30 ++++++++++++++++++++++++++----
 1 file changed, 26 insertions(+), 4 deletions(-)

diff --git a/packages/datastore-core/src/tiered-limit.ts b/packages/datastore-core/src/tiered-limit.ts
index 81f11ec4..f771a507 100644
--- a/packages/datastore-core/src/tiered-limit.ts
+++ b/packages/datastore-core/src/tiered-limit.ts
@@ -160,13 +160,35 @@ export class TieredLimitDatastore<T extends BaseDatastore = BaseDatastore, T2 extends BaseDatastore = BaseDatastore> extends BaseDatastore {
 
   async * _allKeys (q: KeyQuery, options?: AbortOptions): AwaitIterable<Key> {
     // TODO: How to handle stores that don't implement _allKeys? Do we want to?
-    yield * this.primaryStore._allKeys(q, options)
-    yield * this.backingStore._allKeys(q, options)
+    const seenKeys = new Set<string>()
+
+    for await (const key of this.primaryStore._allKeys(q, options)) {
+      seenKeys.add(key.toString())
+      yield key
+    }
+
+    // yield keys from the backing store, excluding duplicates
+    for await (const key of this.backingStore._allKeys(q, options)) {
+      if (!seenKeys.has(key.toString())) {
+        yield key
+      }
+    }
   }
 
   async * _all (q: Query, options?: AbortOptions): AwaitIterable<Pair> {
     // TODO: How to handle stores that don't implement _all? Do we want to?
-    yield * this.primaryStore._all(q, options)
-    yield * this.backingStore._all(q, options)
+    const seenKeys = new Set<string>()
+
+    for await (const pair of this.primaryStore._all(q, options)) {
+      seenKeys.add(pair.key.toString())
+      yield pair
+    }
+
+    // yield pairs from the backing store, excluding duplicates
+    for await (const pair of this.backingStore._all(q, options)) {
+      if (!seenKeys.has(pair.key.toString())) {
+        yield pair
+      }
+    }
   }
 }
From a716ee00eac1ca3b3227ea725dd7118287f46fdb Mon Sep 17 00:00:00 2001
From: Russell Dempsey <1173416+SgtPooki@users.noreply.github.com>
Date: Mon, 11 Mar 2024 01:24:56 -0700
Subject: [PATCH 3/3] fix: get refreshes keys in primary store

---
 packages/datastore-core/src/tiered-limit.ts | 12 ++++++++++++
 1 file changed, 12 insertions(+)

diff --git a/packages/datastore-core/src/tiered-limit.ts b/packages/datastore-core/src/tiered-limit.ts
index f771a507..662045ee 100644
--- a/packages/datastore-core/src/tiered-limit.ts
+++ b/packages/datastore-core/src/tiered-limit.ts
@@ -121,9 +121,21 @@ export class TieredLimitDatastore<T extends BaseDatastore = BaseDatastore, T2 extends BaseDatastore = BaseDatastore> extends BaseDatastore {
   async get (key: Key, options?: AbortOptions): Promise<Uint8Array> {
     if (await this.primaryStore.has(key)) {
       log.trace('Getting %s from primary store', key.toString())
+      this.refreshKeyEvictionOrder(key)
       return this.primaryStore.get(key, options)
     }
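
For reviewers: a minimal usage sketch of the behaviour this series adds, assuming all three patches are applied and `TieredLimitDatastore` is exported from `datastore-core` as declared in patch 1. The key names and byte sizes below are illustrative only.

```typescript
// Usage sketch only: assumes this patch series is applied and the
// TieredLimitDatastore export added in patch 1 is available from datastore-core.
import { MemoryDatastore, TieredLimitDatastore } from 'datastore-core'
import { Key } from 'interface-datastore/key'
import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string'

const primary = new MemoryDatastore()
const backing = new MemoryDatastore()

// Keep at most 50 bytes of values in the primary store and spill the
// oldest entries into the backing store when the budget is exceeded
const store = new TieredLimitDatastore({ maxBytes: 50, store: primary }, backing)

const a = new Key('/a')
const b = new Key('/b')
const value = uint8ArrayFromString('abcdefghijklmnopqrstuvwxyz') // 26 bytes

await store.put(a, value) // fits within the 50 byte budget
await store.put(b, value) // 52 bytes total, so '/a' is evicted to the backing store

console.log(await primary.has(a), await backing.has(a)) // false true
console.log(await primary.has(b), await backing.has(b)) // true false

// Reading '/a' promotes it back into the primary store, evicting '/b' in turn;
// patch 3 additionally refreshes the eviction order for keys already in the primary store
console.log(await store.get(a)) // Uint8Array(26)
console.log(await primary.has(a), await primary.has(b)) // true false
```

This mirrors what the `put - second item over limit` and `get - over limit` tests in patch 1 assert: writes go to the primary store until `maxBytes` would be exceeded, evicted entries are persisted to the backing store, and reads that fall through to the backing store re-populate the primary store.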