// record_stream.js

// *****************************************************************************
// Copyright 2013-2023 Aerospike, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License")
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// *****************************************************************************

'use strict'

const stream = require('stream')
const inherits = require('util').inherits

/**
 * @class RecordStream
 * @classdesc Stream of database records (full or partial) returned by {@link Query} or {@link Scan} operations.
 *
 * *Note:* Record stream currently does not support Node.js'
 * <code>Stream#pause</code> and <code>Stream#resume</code> methods, i.e. it
 * always operates in flowing mode. That means data is read from the Aerospike
 * database and provided to your application as fast as possible. If no data
 * event handlers are attached, then data will be lost.
 *
 * #### Aborting a Query/Scan
 *
 * A query or scan operation can be aborted by calling the {@link
 * RecordStream#abort} method at any time. It is not possible to continue a
 * record stream once it has been aborted.
 *
 * @extends stream
 *
 * @example
 *
 * const Aerospike = require('aerospike')
 *
 * // INSERT HOSTNAME AND PORT NUMBER OF AEROSPIKE SERVER NODE HERE!
 * var config = {
 *   hosts: '192.168.33.10:3000',
 *   // Timeouts disabled, latency dependent on server location. Configure as needed.
 *   policies: {
 *     scan : new Aerospike.ScanPolicy({socketTimeout : 0, totalTimeout : 0}),
 *    }
 * }
 *
 * Aerospike.connect(config, (error, client) => {
 *   if (error) throw error
 *   var recordsSeen = 0
 *   var scan = client.scan('test', 'demo')
 *   var stream = scan.foreach()
 *
 *   stream.on('error', (error) => {
 *     console.error(error)
 *     throw error
 *   })
 *   stream.on('data', (record) => {
 *     recordsSeen++
 *     console.log(record)
 *     if (recordsSeen > 1000) {
 *       stream.abort() // We've seen enough!
 *     }
 *   })
 *   stream.on('end', () => {
 *     console.info(stream.aborted ? 'scan aborted' : 'scan completed')
 *     client.close()
 *   })
 * })
 */
function RecordStream (client) {
  // Run the parent constructor so that the per-instance EventEmitter state is
  // initialised up front, as prescribed for classes wired up via
  // `util.inherits`, rather than relying on lazy initialisation when the first
  // listener is attached.
  stream.call(this)

  /**
   * <code>true</code> if the query/scan has been aborted by the user; <code>false</code> otherwise.
   * @member {boolean} RecordStream#aborted
   * @see {@link RecordStream#abort}
   */
  this.aborted = false

  // Keep a reference to the client instance even though it's not actually
  // needed to process the stream. This is to prevent situations where the
  // client object goes out of scope while the stream is still being processed
  // and the memory for the C++ client instance and dependent objects gets
  // free'd.
  this.client = client
}

// Put the `stream` prototype (and, through it, the event emitter API) on the
// prototype chain of every RecordStream instance.
inherits(RecordStream, stream)

// Shared defaults on the prototype: the stream is flagged as readable and not
// writable.
RecordStream.prototype.writable = false
RecordStream.prototype.readable = true

// Intentionally a no-op: nothing in this file pulls data on demand. As the
// class documentation notes, the stream always operates in flowing mode.
RecordStream.prototype._read = function () {}

/**
 * @function RecordStream#abort
 *
 * @summary Aborts the query/scan operation.
 *
 * @description Flags the stream as aborted and schedules an <code>end</code>
 * event to be emitted on the next tick. Calling this method on a stream that
 * has already been aborted does nothing. An aborted stream cannot be resumed.
 *
 * @since v2.0
 */
RecordStream.prototype.abort = function () {
  if (!this.aborted) {
    this.aborted = true

    // Emit 'end' asynchronously so that the caller's current call stack
    // unwinds before any 'end' listeners run.
    const emitEnd = this.emit.bind(this, 'end')
    process.nextTick(emitEnd)
  }
}

/**
 * @event RecordStream#data
 * @param {Record} record - Aerospike record incl. bins, key and meta data.
 * Depending on the operation, all, some or no bin values will be returned.
 */

/**
 * @event RecordStream#error
 * @type {AerospikeError}
 */

/**
 * @event RecordStream#end
 */

// The module's sole export is the RecordStream constructor itself.
module.exports = RecordStream