Source: types/store.js

'use strict';

// Dependencies
const crypto = require('crypto');
const level = require('level');
const pointer = require('json-pointer');

// Fabric Types
const Actor = require('./actor');
const Collection = require('./collection');
const Entity = require('./entity');
const Stack = require('./stack');
const State = require('./state');

/**
 * Long-term storage.
 *
 * Values written with {@link Store#set} are kept in the in-memory `_state`
 * maps; collection data written with {@link Store#_POST} goes to the `level`
 * database opened by {@link Store#open}.
 * @property {Mixed} settings Current configuration.
 */
class Store extends Actor {
  /**
   * Create an instance of a {@link Store} to manage long-term storage, which is
   * particularly useful when building a user-facing {@link Product}.
   * @param  {Object} [settings={}] configuration object.
   * @return {Store}              Instance of the Store, ready to start.
   */
  constructor (settings = {}) {
    super(settings);

    this.settings = Object.assign({
      name: '@fabric/store',
      path: './stores/store',
      type: 'leveldb',
      persistent: true,
      verbosity: 2, // 0 none, 1 error, 2 warning, 3 notice, 4 debug (5+ enables trace output)
    }, settings);

    this['@entity'] = {
      '@type': 'Store',
      '@data': {}
    };

    // Bookkeeping for collection addresses created by `_POST`.
    this.keys = {};
    this.commits = new Collection({
      type: 'State'
    });

    // In-memory state; `content` holds the values, the other maps index them.
    this._state = {
      actors: {},
      collections: {},
      content: {},
      documents: {},
      metadata: {},
      indices: {},
      routes: {},
      status: 'PAUSED',
      tips: {}
    };

    // Keep internal properties out of enumeration.
    Object.defineProperty(this, '@allocation', { enumerable: false });
    Object.defineProperty(this, '@buffer', { enumerable: false });
    Object.defineProperty(this, '@encoding', { enumerable: false });
    Object.defineProperty(this, '@parent', { enumerable: false });
    Object.defineProperty(this, '@preimage', { enumerable: false });
    Object.defineProperty(this, 'frame', { enumerable: false });
    Object.defineProperty(this, 'services', { enumerable: false });

    return this;
  }

  /**
   * Compute the index for a key: the SHA256 of its JSON-pointer-escaped form.
   * @param  {String} key Key to convert.
   * @return {String}     Result of `this.sha256` over the escaped key.
   */
  _getPathForKey (key) {
    const path = pointer.escape(key);
    return this.sha256(path);
  }

  /**
   * Default error handler; reports the error on the console.
   * @param {Error} err Error to report.
   */
  async _errorHandler (err) {
    console.error('[FABRIC:STORE]', 'Error condition:', err);
  }

  /**
   * Store a value under `/secrets/<sha256(path)>`.
   * Encryption is not implemented yet: the value is stored as provided.
   * @param  {String} path            Name of the secret.
   * @param  {Mixed}  value           Value to store.
   * @param  {String} [passphrase=''] Currently unused.
   * @return {Promise}                Resolves with the result of {@link Store#set}.
   */
  async _setEncrypted (path, value, passphrase = '') {
    const secret = value; // TODO: encrypt value
    const name = crypto.createHash('sha256').update(path).digest('hex');
    return this.set(`/secrets/${name}`, secret);
  }

  /**
   * Read a value previously written with {@link Store#_setEncrypted}.
   * Decryption is not implemented yet: the stored value is returned as-is.
   * @param  {String} path            Name of the secret.
   * @param  {String} [passphrase=''] Currently unused.
   * @return {Promise}                Resolves with the stored value, or `null` if not found.
   */
  async _getEncrypted (path, passphrase = '') {
    const name = crypto.createHash('sha256').update(path).digest('hex');
    const secret = await this.get(`/secrets/${name}`);
    const decrypted = secret; // TODO: decrypt value
    return decrypted;
  }

  /**
   * Registers an {@link Actor}.  Necessary to store in a collection.
   * Writes the object's type to `/types/<id>` and its data to `/entities/<id>`.
   * Failures of either write are reported through `this.error`, not thrown.
   * @param  {Object} obj Instance of the object to store.
   * @return {Promise}    Resolves with the result of the `/entities/<id>`
   *                      write, or `null` when that write failed.
   */
  async _REGISTER (obj) {
    const vector = new State(obj);
    let result = null;

    this.log('[STORE]', '_REGISTER', vector.id, vector['@type']);

    try {
      // Read-back only; the existing value (if any) is not used.
      await this._GET(`/entities/${vector.id}`);
    } catch (E) {
      this.warn('[STORE]', '_REGISTER', `Could not read from store:`, E);
    }

    try {
      await this._SET(`/types/${vector.id}`, vector['@type']);
    } catch (E) {
      this.error('Error creating object:', E, obj);
    }

    try {
      result = await this._SET(`/entities/${vector.id}`, vector['@data']);
    } catch (E) {
      this.error('Error creating object:', E, obj);
    }

    return result;
  }

  /**
   * Forgiving getter: any error raised by {@link Store#get} yields `null`.
   * @param  {String} key Path to read.
   * @return {Promise}    Resolves with the stored value or `null`.
   */
  async _GET (key) {
    let result = null;

    if (this.settings.verbosity >= 5) this.log('[STORE]', '_GET', key);

    try {
      result = await this.get(key);
    } catch (E) {
      if (this.settings.verbosity >= 5) this.warn('[STORE]', '[_GET]', '[FAILURE]', E);
    }

    return result;
  }

  /**
   * Alias of {@link Store#set}.
   * @param  {String} key   Path to write.
   * @param  {Mixed}  value Value to store.
   * @return {Promise}      Resolves with the stored value.
   */
  async _SET (key, value) {
    return this.set(key, value);
  }

  /**
   * Alias of {@link Store#set}.
   * @param  {String} key   Path to write.
   * @param  {Mixed}  value Value to store.
   * @return {Promise}      Resolves with the stored value.
   */
  async _PUT (key, value) {
    return this.set(key, value);
  }

  /**
   * Delete a key by overwriting its value with `null`.
   * @param  {String} key Path to clear.
   * @return {Promise}    Resolves with `null`.
   */
  async _DELETE (key) {
    await this._PUT(key, null);
    return null;
  }

  /**
   * Shallow-merge `patch` over the current value at `key` and store the result.
   * A failed write is reported on the console; the merged object is returned
   * either way.
   * @param  {String} key   Path to modify.
   * @param  {Object} patch Properties to assign over the current value.
   * @return {Promise}      Resolves with the merged object.
   */
  async _PATCH (key, patch) {
    this.log('[STORE]', '_PATCH', 'patch:', key, typeof patch, patch);

    const root = {};
    const current = await this._GET(key);

    if (this.settings.verbosity >= 3) console.warn('current value, no typecheck:', typeof current, current);
    const result = Object.assign(root, current || {}, patch);
    if (this.settings.verbosity >= 5) console.log('[STORE]', 'Patch result:', result);

    try {
      await this._PUT(key, result);
    } catch (E) {
      console.error('Could not modify:', E);
    }

    return result;
  }

  /**
   * Insert something into a collection.
   * @param  {String}  key   Path to add data to.
   * @param  {Mixed}  value Object to store.
   * @return {Promise}       Resolves on success with a String pointer
   *                         (`state.link`), or `false` when the write failed.
   */
  async _POST (key, value) {
    if (this.settings.verbosity >= 5) console.log('[STORE]', '_POST', key, typeof value, value);

    this['@method'] = '_POST';

    // preamble
    const path = pointer.escape(key);
    const router = this.sha256(path);
    const address = `/collections/${router}`;

    if (!this.keys[address]) {
      // TODO: store metadata
      this.keys[address] = {
        path: key,
        address: address
      };
    }

    // TODO: check for commit state
    // The address map is created on first use; `flush` walks it later.
    const data = this['@entity']['@data'];
    if (!data.addresses) data.addresses = {};
    data.addresses[router] = address;

    const state = new State(value);

    // Return value unused.
    // NOTE(review): call kept because `serialize()` may update internal state
    // on the State instance — confirm before removing.
    state.serialize();

    if (!this.db) {
      await this.open().catch(this._errorHandler.bind(this));
    }

    let entity = null;

    // TODO: use ._GET
    try {
      entity = await this.db.get(address);
    } catch (E) {
      // A failed read is treated as "collection does not exist yet".
      if (this.settings.verbosity >= 3) console.warn('Creating new collection:', E);
    }

    if (entity) {
      try {
        entity = JSON.parse(entity);
      } catch (E) {
        console.warn(`Couldn't parse: ${entity}`, E);
      }
    }

    try {
      let origin = null;

      if (entity) {
        const family = await this.populate(entity);
        if (this.settings.verbosity >= 5) console.warn('WARNING:', 'family exists, expecting restoration:', family);
        origin = new Collection(family);
      } else {
        origin = new Collection();
      }

      // Add Element to Collection
      origin.push(value);

      // Store the object at an entity locale
      await this._PUT(`/entities/${state.id}`, value);
      const serialized = await origin.serialize();

      // Write serialized Collection to disk
      await this.db.put(address, serialized.toString());
    } catch (E) {
      console.log('Could not POST:', key, value, E);
      return false;
    }

    return state.link;
  }

  /**
   * Push `data` onto the stack stored at `/stacks/<escaped key>`, register it,
   * and store its data under `/blobs/<id>`.
   * @param  {String} key  Name of the stack.
   * @param  {Mixed}  data Data to push.
   * @return {Promise}     Resolves with the value read back from `/blobs/<id>`.
   */
  async _PUSH (key, data) {
    const id = pointer.escape(key);
    const path = `/stacks/${id}`;
    const list = (await this._GET(path)) || [];
    const vector = new State(data);
    const stack = new Stack(list);

    stack.push(vector.id);

    await this._REGISTER(data);
    await this._PUT(`/blobs/${vector.id}`, vector['@data']);
    await this._SET(path, stack['@data']);
    await this.commit();

    return this._GET(`/blobs/${vector.id}`);
  }

  /**
   * Encode a value as the hex form of its UTF-8 text.
   * `String` objects are used as-is; everything else is JSON-encoded first.
   * NOTE(review): primitive strings are not `instanceof String`, so they are
   * JSON-encoded (quoted) as well — confirm whether that is intended.
   * @param  {Mixed} value Value to encode.
   * @return {Promise}     Resolves with a hex String.
   * @throws {TypeError}   When the value has no JSON representation.
   */
  async encodeValue (value) {
    const text = (value instanceof String) ? value : JSON.stringify(value);

    // `JSON.stringify` yields `undefined` for `undefined`, functions, and symbols.
    if (text === undefined) {
      throw new TypeError('Value cannot be encoded: no JSON representation.');
    }

    return Buffer.from(text, 'utf8').toString('hex');
  }

  /**
   * Describe a value.  Strings are reported as `JSONString` with their length
   * and hash, `null` as `Null`, and anything else as `Unhandled`.
   * @param  {Mixed} value Value to describe.
   * @return {Promise}     Resolves with `{ hash, size, type }`.
   */
  async getDataInfo (value) {
    let type = null;
    let size = null;
    let hash = null;

    // `null`, `undefined`, and prototype-less objects have no usable constructor.
    let kind = 'Object';
    if (value === null || value === undefined) {
      kind = String(value);
    } else if (value.constructor) {
      kind = value.constructor.name;
    }

    switch (kind) {
      case 'String':
        type = 'JSONString';
        size = value.length;
        hash = this.sha256(value);
        break;
      case 'null':
        // Written by `_DELETE`.
        type = 'Null';
        break;
      default:
        console.error('unhandled type:', kind);
        type = 'Unhandled';
        break;
    }

    return {
      hash,
      size,
      type
    };
  }

  /**
   * Normalize a path (leading `/` enforced) and derive its lookup values.
   * @param  {String} path Path to describe.
   * @return {Promise}     Resolves with `{ path, pointer, index }`, where
   *                       `pointer` is the escaped path and `index` its SHA256.
   */
  async getRouteInfo (path) {
    if (path.substring(0, 1) !== '/') path = '/' + path;

    const id = pointer.escape(path);
    const router = this.sha256(id);

    return {
      path: path,
      pointer: id,
      index: router
    };
  }

  /**
   * Resolve a list of entity IDs to their stored values, in order.
   * @param  {Array} element List of entity IDs.
   * @return {Promise}       Resolves with an Array of values (`null` for misses).
   */
  async populate (element) {
    const map = [];

    for (let i = 0; i < element.length; i++) {
      map[i] = await this._GET(`/entities/${element[i]}`);
    }

    return map;
  }

  /**
   * Barebones getter.
   * @param  {String}  key Name of data to retrieve.
   * @return {Promise}     Resolves on complete.  `null` if not found.
   */
  async get (key) {
    const route = await this.getRouteInfo(key);
    const info = this._state.metadata[route.index];

    // Only keys written through `set` have metadata; the content check covers
    // paths invalidated by a later write to one of their parents.
    if (!info || !pointer.has(this._state.content, route.path)) return null;

    const result = pointer.get(this._state.content, route.path);

    let output = null;

    // Placeholder for type-specific decoding; every type is returned as stored.
    switch (info.type) {
      default:
        output = result;
        break;
    }

    return output;
  }

  /**
   * Set a `key` to a specific `value`.
   * @param       {String} key   Address of the information.
   * @param       {Mixed} value Content to store at `key`.
   * @return      {Promise}      Resolves with the value read back from `key`.
   */
  async set (key, value) {
    const route = await this.getRouteInfo(key);
    const info = await this.getDataInfo(value);
    const data = await this.encodeValue(value);

    // Let's use the document's key as the identifying value.
    // This is what defines our key => value store.
    // All functions can be run as a map of an original input vector, allowing
    // binary scoping across trees of varying complexity.
    // `null` (written by `_DELETE`) is hashed by its JSON text; every other
    // value is passed to `sha256` unchanged.
    const hash = this.sha256(value === null ? 'null' : value);
    const actor = new Actor({
      type: 'FabricDocument',
      content: data,
      encoding: 'json',
      original: value
    });

    this._state.actors[actor.id] = actor;
    this._state.documents[hash] = value;
    this._state.indices[route.index] = route.pointer;
    this._state.metadata[route.index] = info;

    pointer.set(this._state.content, route.path, value);

    // Awaited so a failing commit rejects this call instead of floating.
    await this.commit();

    return this.get(key);
  }

  /**
   * Open the underlying `level` database at `settings.path` and begin
   * trusting its events.  Failures are logged and reflected in `status`;
   * they are not thrown.
   * @return {Promise} Resolves with the {@link Store}.
   */
  async open () {
    // await super.open();
    if (this.settings.verbosity >= 3) console.log('[FABRIC:STORE]', 'Opening:', this.settings.path);
    // if (this.db) return this;

    try {
      this.db = level(this.settings.path);
      this.trust(this.db);
      this.status = 'opened';
      await this.commit();
      if (this.settings.verbosity >= 3) console.log('[FABRIC:STORE]', 'Opened!');
    } catch (E) {
      console.error('[FABRIC:STORE]', E);
      this.status = 'error';
    }

    return this;
  }

  /**
   * Close the underlying database, if one was opened.  Errors are reported
   * through `this.error`, not thrown.
   * @return {Promise} Resolves with the {@link Store}.
   */
  async close () {
    if (this.settings.verbosity >= 3) console.log('[FABRIC:STORE]', 'Closing:', this.settings.path);
    if (this.db) {
      try {
        await this.db.close();
      } catch (E) {
        this.error('[STORE]', 'closing store:', this.settings.path, E);
      }
    }

    // await super.close();
    return this;
  }

  /**
   * Implicitly trust an {@link Event} source.
   * Every `put` event from the source is mirrored into this store's
   * `@entity` data and re-emitted as a `source/events` message.
   * @param  {EventEmitter} source Event-emitting source.
   * @return {Store}        Resulting instance of {@link Store} with new trust.
   */
  trust (source) {
    const store = this;
    const name = `/sources/${store.id}`;

    source.on('put', function (key, value) {
      // store.log('[TRUST:SOURCE]', source.constructor.name, 'emitted a put event', name, key, value.constructor.name, value);
      if (store.settings.verbosity >= 5) console.log('[TRUST:SOURCE]', source.constructor.name, 'emitted a put event', name, key, value.constructor.name, value);

      const id = pointer.escape(key);
      const router = store.sha256(id);
      const state = new State(value);

      pointer.set(store['@entity']['@data'], `${name}`, value);
      pointer.set(store['@entity']['@data'], `/states/${state.id}`, value);
      pointer.set(store['@entity']['@data'], `/blobs/${state.id}`, state.serialize());
      pointer.set(store['@entity']['@data'], `/types/${state.id}`, value.constructor.name);
      pointer.set(store['@entity']['@data'], `/tips/${router}`, state.id);
      pointer.set(store['@entity']['@data'], `/names/${router}`, id);

      store.emit('source/events', {
        '@type': 'Request',
        '@method': 'put',
        '@actor': '~level',
        '@object': state['@link'],
        '@target': key,
        '@data': value
      });
    });

    return this;
  }

  /**
   * Remove a {@link Value} by {@link Path}.
   * Operates on the underlying database, opening it first if necessary.
   * @param {Path} key Key to remove.
   * @return {Promise} Resolves with the result of the database's `del`.
   */
  async del (key) {
    if (!this.db) {
      await this.open();
    }

    const deleted = await this.db.del(key);
    return deleted;
  }

  /**
   * Apply a batch of operations to the underlying database, (re)opening it
   * first if necessary.  Failures are logged, not thrown.
   * @param  {Array} ops Operations, passed through to the database's `batch`.
   * @return {Promise}   Resolves with the batch result, or `null` on failure.
   */
  async batch (ops) {
    if (this.settings.verbosity >= 5) console.log('[FABRIC:STORE]', 'Batching:', ops);
    let result = null;

    if (!this.db || this.db._status === 'closed') {
      await this.open();
    }

    // Core function
    try {
      result = await this.db.batch(ops);
      if (this.settings.verbosity >= 3) console.log('[FABRIC:STORE]', 'Batched:', result);
    } catch (E) {
      console.error('[FABRIC:STORE]', 'Could not batch updates:', E);
    }

    return result;
  }

  /**
   * Snapshot the current state as an {@link Entity} and emit a `commit`
   * event carrying its `id` and `data`.
   * @return {Promise} Resolves with the {@link Entity}.
   */
  async commit () {
    if (this.settings.verbosity >= 5) console.log('[AUDIT]', '[FABRIC:STORE]', 'Committing:', this.state);
    // NOTE(review): `this.state.state` depends on the inherited `state`
    // accessor, which is not defined in this class — confirm the shape.
    const entity = new Entity(this.state.state);
    this.emit('commit', entity.id, entity.data);
    // TODO: document re-opening of store
    return entity;
  }

  /**
   * Create a read stream over the underlying database.
   * The database must already be open.
   * @return {Stream} Result of the database's `createReadStream`.
   */
  createReadStream () {
    return this.db.createReadStream();
  }

  /**
   * Wipes the storage.
   * Deletes every collection address recorded by `_POST`, then the
   * `/collections` key itself.
   * @return {Promise} Resolves with the {@link Store}.
   */
  async flush () {
    if (this.settings.verbosity >= 4) console.log('[FABRIC:STORE]', 'Flushing database...');

    for (let name in this['@entity']['@data'].addresses) {
      let address = this['@entity']['@data'].addresses[name];
      if (this.settings.verbosity >= 3) console.log('found address:', address);
      if (address) await this.del(address);
    }

    try {
      await this.del(`/collections`);
      await this.commit();
    } catch (E) {
      console.error('Could not wipe database:', E);
    }

    return this;
  }

  /**
   * Emit a `noop` event.
   * @return {Store} This instance.
   */
  noop () {
    this.emit('noop');
    return this;
  }

  /**
   * Placeholder; currently does nothing.
   * @return {Store} This instance.
   */
  rotate () {
    return this;
  }

  /**
   * Start running the process.
   * @return {Promise} Resolves on complete.
   */
  async start () {
    if (this.settings.verbosity >= 3) console.log('[FABRIC:STORE]', 'Starting:', this.settings.path);
    this.status = 'starting';

    try {
      await this.open();
      this.status = 'started';
      // await this.commit();
    } catch (E) {
      console.error('[FABRIC:STORE]', 'Could not open db:', E);
    }

    if (this.settings.verbosity >= 3) console.log('[FABRIC:STORE]', 'Started on path:', this.settings.path);
    return this;
  }

  /**
   * Stop the store.  Non-persistent stores are flushed before closing.
   * @return {Promise} Resolves with the {@link Store}.
   */
  async stop () {
    this.status = 'stopping';

    if (this.settings.persistent !== true) {
      await this.flush();
    }

    try {
      await this.close();
    } catch (E) {
      console.error('Could not stop store:', E);
    }

    this.status = 'stopped';

    return this;
  }
}

module.exports = Store;