job.js

// *****************************************************************************
// Copyright 2013-2023 Aerospike, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License")
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// *****************************************************************************

'use strict'

const as = require('bindings')('aerospike.node')
const Commands = require('./commands')

const DEFAULT_POLL_INTERVALL = 1000

/**
 * @class Job
 * @classdesc Potentially long-running background job.
 *
 * @see {@link Scan#background}
 * @see {@link Query#background}
 */
function Job (client, jobID, module) {
  this.client = client
  this.jobID = jobID
  this.module = module
}

Job.safeRandomJobID = function () {
  return Math.floor(Math.random() * Number.MAX_SAFE_INTEGER)
}

/**
 * Repeatedly execute the given status function until it either indicates that
 * the job has completed or returns an error.
 *
 * @returns {Promise}
 *
 * @private
 */
Job.pollUntilDone = function (statusFunction, pollInterval) {
  pollInterval = pollInterval || DEFAULT_POLL_INTERVALL

  return new Promise((resolve, reject) => {
    let timer = null
    const poll = function () {
      statusFunction()
        .then(done => {
          if (done) {
            if (timer) clearInterval(timer)
            resolve()
          } else if (!timer) {
            timer = setInterval(poll, pollInterval)
          }
        })
        .catch(error => {
          if (timer) clearInterval(timer)
          reject(error)
        })
    }
    poll()
  })
}

/**
 * @private
 */
Job.prototype.hasCompleted = function (info) {
  return (info.status === as.jobStatus.COMPLETED)
}

/**
 * Fetch job info once to check if the job has completed.
 *
 * @returns {Promise}
 *
 * @private
 */
Job.prototype.checkStatus = function () {
  return this.info().then(info => this.hasCompleted(info))
}

/**
 * @function Job#info
 *
 * @summary Check the progress of a background job running on the database.
 *
 * @param {Client~InfoPolicy} [policy] - The Info Policy to use for this operation.
 * @param {JobinfoCallback} [callback] - The function to call with the job info response.
 *
 * @return {?Promise} If no callback function is passed, the function returns a
 * Promise that resolves to the job info.
 *
 * @example
 *
 * const Aerospike = require('aerospike')
 *
 * // INSERT HOSTNAME AND PORT NUMBER OF AEROSPIKE SERVER NODE HERE!
 * var config = {
 *   hosts: '192.168.33.10:3000',
 *   // Timeouts disabled, latency dependent on server location. Configure as needed.
 *   policies: {
 *     scan : new Aerospike.ScanPolicy({socketTimeout : 0, totalTimeout : 0}),
 *   }
 * }
 * Aerospike.connect(config, (error, client) => {
 *   if (error) throw error
 *
 *   var scan = client.scan('test', 'demo')
 *   scan.background('myUdfModule', 'myUdfFunction', (error, job) => {
 *     if (error) throw error
 *     var timer = setInterval(() => {
 *       job.info((error, info) => {
 *         if (error) throw error
 *         console.info('scan status: %d (%d%% complete, %d records scanned)', info.status, info.progressPct, info.recordsRead)
 *         if (info.status === Aerospike.jobStatus.COMPLETED) {
 *           console.info('scan completed!')
 *           clearInterval(timer)
 *           client.close()
 *         }
 *       })
 *     }, 1000)
 *   })
 * })
 */
Job.prototype.info = function (policy, callback) {
  if (typeof policy === 'function') {
    callback = policy
    policy = null
  }

  const cmd = new Commands.JobInfo(this.client, [this.jobID, this.module, policy], callback)
  return cmd.execute()
}

/**
 * @function Job#wait
 *
 * @summary Wait until the task has been completed.
 *
 * @param {number} [pollInterval=1000] - Interval in milliseconds to use when polling the cluster nodes.
 * @param {JobdoneCallback} [callback] - The function to call when the task has completed.
 *
 * @return {?Promise} If no callback function is passed, the function returns a
 * Promise that resolves once the job is completed.
 */
Job.prototype.wait = function (pollInterval, callback) {
  if (typeof pollInterval === 'function') {
    callback = pollInterval
    pollInterval = null
  }
  const checkStatus = this.checkStatus.bind(this)

  if (typeof callback === 'function') {
    Job.pollUntilDone(checkStatus, pollInterval)
      .then(result => callback(null, result))
      .catch(error => callback(error))
  } else {
    return Job.pollUntilDone(checkStatus, pollInterval)
  }
}

Job.prototype.waitUntilDone = Job.prototype.wait

module.exports = Job