Home Reference Source

lib/writers/warcWriterBase.js

const fs = require('fs-extra')
const zlib = require('zlib')
const Path = require('path')
const uuid = require('uuid/v1')
const EventEmitter = require('eventemitter3')
const {
  warcInfoHeader,
  warcInfoContent,
  warcRequestHeader,
  warcResponseHeader,
  warcMetadataHeader,
  recordSeparator,
  CRLF,
  CRLF2x
} = require('./warcFields')
const ensureWARCFileName = require('../utils/ensureWARCFilename')

/**
 * @desc UTF-8 bytes of the WARC record separator appended after every record
 * @type {Buffer}
 */
const recordSeparatorBuffer = Buffer.from(recordSeparator, 'utf8')

/**
 * @desc UTF-8 bytes of a single CRLF, used between a WARC header and its content block
 * @type {Buffer}
 */
const CRLFBuffer = Buffer.from(CRLF, 'utf8')

/**
 * @desc Combined byte length of the CRLF and the record separator that surround every content block
 * @type {number}
 */
const WARCSepTSize = CRLFBuffer.length + recordSeparatorBuffer.length

/**
 * @desc Build the content block for a request/response record: the HTTP header
 * string, followed by the body when one is supplied. When a body is present the
 * header section is guaranteed to end with CRLF 2x (a blank line), so the body is
 * separated from the last header line.
 * @param {string} string - The stringified HTTP headers
 * @param {?Buffer|?string} maybeBuffer - The optional body; strings are UTF-8 encoded
 * @return {Buffer}
 */
function makeContentBuffer (string, maybeBuffer) {
  if (maybeBuffer == null) {
    // no body: the header string is the entire content block
    return Buffer.from(string, 'utf8')
  }
  // We have a body, so the header section must end with CRLF 2x. Header strings
  // usually end with one CRLF (append one more), but if they carry none at all
  // appending a single CRLF would glue the body onto the last header line.
  let headerSection = string
  if (!headerSection.endsWith(CRLF2x)) {
    headerSection += headerSection.endsWith(CRLF) ? CRLF : CRLF2x
  }
  const strBuff = Buffer.from(headerSection, 'utf8')
  const cntBuffer = Buffer.isBuffer(maybeBuffer)
    ? maybeBuffer
    : Buffer.from(maybeBuffer, 'utf8')
  return Buffer.concat(
    [strBuff, cntBuffer],
    strBuff.length + cntBuffer.length
  )
}

/**
 * @desc Normalize WARC writing options, filling in defaults: `appending` falls back
 * to `false`; `gzip` falls back to whether the `NODEWARC_WRITE_GZIPPED` environment
 * variable is set (a falsy `gzip` is replaced by that check).
 * @param {WARCFileOpts} [opts] - Options to normalize; `null`/`undefined` means "all defaults"
 * @return {WARCFileOpts}
 */
function ensureOptions (opts) {
  const supplied = opts == null ? {} : opts
  const gzipFromEnv = process.env.NODEWARC_WRITE_GZIPPED != null
  return {
    appending: supplied.appending || false,
    gzip: supplied.gzip || gzipFromEnv
  }
}

/**
 * @extends {EventEmitter}
 * @desc Base class used for writing records to a WARC file.
 * Call `initWARC` before writing any records and `end` when done; the `finished`
 * event is emitted once the underlying file stream has finished.
 */
class WARCWriterBase extends EventEmitter {
  /**
   * @desc Create a new WARCWriter
   * @param {?WARCFileOpts} [defaultOpts] - Optional default WARC file options
   */
  constructor (defaultOpts) {
    super()
    /**
     * @type {?WriteStream}
     * @private
     */
    this._warcOutStream = null
    /**
     * @desc Last error reported by the output stream, handed to `finished` listeners
     * @type {?Error}
     * @private
     */
    this._lastError = null
    /**
     * @desc WARC-Date used for the records of the current WARC (UTC, second precision)
     * @type {?string}
     * @private
     */
    this._now = null
    /**
     * @desc Base name of the WARC file currently being written
     * @type {?string}
     * @private
     */
    this._fileName = null
    /**
     * @desc Record id of the first warcinfo record written to the current WARC
     * @type {?string}
     * @private
     */
    this._warcInfoId = null

    /**
     * @desc Options in effect for the current WARC (set by `initWARC`)
     * @type {?WARCFileOpts}
     */
    this.opts = null

    /**
     * @type {WARCFileOpts}
     */
    this.defaultOpts = ensureOptions(defaultOpts)

    /**
     * @type {string}
     * @private
     */
    this._version = require('../../package.json').version
    // bound once so the same handler references are attached to every new stream
    this._onFinish = this._onFinish.bind(this)
    this._onError = this._onError.bind(this)
  }

  /**
   * @desc Set the default WARC creation options
   * @param {WARCFileOpts} defaultOpts - The new default options
   */
  setDefaultOpts (defaultOpts) {
    this.defaultOpts = ensureOptions(defaultOpts)
  }

  /**
   * @desc Initialize the writer. The options object is optional and defaults to `appending = false` and `gzip = process.env.NODEWARC_WRITE_GZIPPED != null`.
   * Writing gzipped records is also controllable by setting `NODEWARC_WRITE_GZIPPED` environment variable.
   * Options supplied to this method override the default options.
   * @param {string} warcPath - the path for the WARC file to be written
   * @param {?WARCFileOpts} [options] - write options controlling how the WARC should be written
   */
  initWARC (warcPath, options) {
    this.opts = Object.assign({}, this.defaultOpts, options || {})
    const wfp = ensureWARCFileName(warcPath, this.opts.gzip)
    const streamOpts = this.opts.appending
      ? { flags: 'a', encoding: 'utf8' }
      : { encoding: 'utf8' }
    this._warcOutStream = fs.createWriteStream(wfp, streamOpts)
    this._warcOutStream.on('finish', this._onFinish)
    this._warcOutStream.on('error', this._onError)
    const now = new Date().toISOString()
    // drop the milliseconds: 2018-01-01T00:00:00.000Z -> 2018-01-01T00:00:00Z
    this._now = `${now.slice(0, now.indexOf('.'))}Z`
    this._fileName = Path.basename(wfp)
  }

  /**
   * @desc Write a request record followed by its response record. The two records
   * share one generated id: it is the response's record id and the request's
   * concurrent-to value.
   * @param {string} targetURI - The target URI for the request response record pairs
   * @param {{headers: string, data?: Buffer|string}} reqData - The request data
   * @param {{headers: string, data?: Buffer|string}} resData - The response data
   * @return {Promise<void>}
   */
  async writeRequestResponseRecords (targetURI, reqData, resData) {
    const resRecId = uuid()
    await this._writeRequestRecord(
      targetURI,
      resRecId,
      reqData.headers,
      reqData.data
    )
    return this._writeResponseRecord(
      targetURI,
      resRecId,
      resData.headers,
      resData.data
    )
  }

  /**
   * @desc Write an arbitrary number of items to the WARC, sequentially and in order
   * @param {...Buffer} recordParts - The buffers to be written
   * @return {Promise<void>}
   */
  async writeRecordChunks (...recordParts) {
    for (const chunk of recordParts) {
      await this.writeRecordBlock(chunk)
    }
  }

  /**
   * @desc Write out the WARC-Type: info records.
   * If the contents for the info record is an object then the objects properties (property, property value pairs)
   * are written otherwise (when Buffer or string) the content is written as is.
   * When the object has no `software` property, `node-warc/<version>` is used;
   * the caller's object is not modified.
   * @param {Object|Buffer|string} winfo - The contents for the WARC info record
   * @return {Promise<void>}
   */
  writeWarcInfoRecord (winfo) {
    if (Buffer.isBuffer(winfo) || typeof winfo === 'string') {
      return this.writeWarcRawInfoRecord(winfo)
    }
    // shallow copy so filling in the default software field does not mutate the caller's object
    const info = Object.assign({}, winfo)
    if (info.software == null) {
      info.software = `node-warc/${this._version}`
    }
    return this.writeWarcRawInfoRecord(
      Buffer.from(warcInfoContent(info), 'utf8')
    )
  }

  /**
   * @desc Write warc-info record. The first warcinfo record written to a WARC
   * provides the id that later records reference.
   * @param {string|Buffer} content - The contents of the warc-info record
   * @return {Promise<void>}
   */
  writeWarcRawInfoRecord (content) {
    let recID
    if (this._warcInfoId) {
      recID = uuid()
    } else {
      this._warcInfoId = recID = uuid()
    }
    const whct = Buffer.isBuffer(content)
      ? content
      : Buffer.from(content, 'utf8')
    const wh = Buffer.from(
      warcInfoHeader({
        date: this._now,
        fileName: this._fileName,
        len: whct.length,
        rid: recID
      }),
      'utf8'
    )
    const totalLength = wh.length + whct.length + WARCSepTSize
    return this.writeRecordBlock(
      Buffer.concat([wh, CRLFBuffer, whct, recordSeparatorBuffer], totalLength)
    )
  }

  /**
   * @desc Writes a WARC Info record containing Webrecorder/Webrecorder Player bookmark (page list)
   * @param {string|Array<string>} pages - The URL of the page this WARC contains or an Array of URLs for the pages
   * this WARC contains
   * @return {Promise<void>}
   */
  writeWebrecorderBookmarksInfoRecord (pages) {
    const pageList = Array.isArray(pages) ? pages : [pages]
    const winfoContent = {
      software: `node-warc/${this._version}`,
      'json-metadata': JSON.stringify({
        desc: '',
        auto_title: true,
        type: 'recording',
        pages: pageList.map(page => ({ timestamp: this._now, url: page }))
      })
    }
    // identical framing/id handling to any other raw warc-info record
    return this.writeWarcRawInfoRecord(
      Buffer.from(warcInfoContent(winfoContent), 'utf8')
    )
  }

  /**
   * @desc Write WARC-Type: metadata for outlinks
   * @param {string} targetURI - The target URI for the metadata record
   * @param {string} outlinks - A string containing outlink metadata
   * @return {Promise<void>}
   */
  writeWarcMetadataOutlinks (targetURI, outlinks) {
    return this.writeWarcMetadata(targetURI, outlinks)
  }

  /**
   * @desc Write WARC-Type: metadata record, concurrent to the current warcinfo record
   * @param {string} targetURI - The URL of the page this metadata record is for
   * @param {string|Buffer} metaData - A string or buffer containing metadata information to be used as this records content
   * @return {Promise<void>}
   */
  writeWarcMetadata (targetURI, metaData) {
    const wmhc = Buffer.isBuffer(metaData)
      ? metaData
      : Buffer.from(metaData, 'utf8')
    const wmh = Buffer.from(
      warcMetadataHeader({
        targetURI,
        now: this._now,
        len: wmhc.length,
        concurrentTo: this._warcInfoId,
        rid: uuid()
      }),
      'utf8'
    )
    const totalLength = wmhc.length + wmh.length + WARCSepTSize
    return this.writeRecordBlock(
      Buffer.concat([wmh, CRLFBuffer, wmhc, recordSeparatorBuffer], totalLength)
    )
  }

  /**
   * @desc Write A Request Record (not tied to any response record)
   * @param {string} targetURI - The target URI of the request
   * @param {string} httpHeaderString - Stringified HTTP headers
   * @param {string|Buffer} [requestData] - Body of the request if any
   * @return {Promise<void>}
   */
  writeRequestRecord (targetURI, httpHeaderString, requestData) {
    return this._writeRequestRecord(
      targetURI,
      null,
      httpHeaderString,
      requestData
    )
  }

  /**
   * @desc Write A Response Record with a freshly generated record id
   * @param {string} targetURI - The URL of the response
   * @param {string} httpHeaderString - Stringified HTTP headers
   * @param {string|Buffer} [responseData] - The response body if it exists
   * @return {Promise<void>}
   */
  writeResponseRecord (targetURI, httpHeaderString, responseData) {
    return this._writeResponseRecord(
      targetURI,
      uuid(),
      httpHeaderString,
      responseData
    )
  }

  /**
   * @desc Write a record block to the WARC, gzipping it first when `opts.gzip` is set.
   * Resolves once the stream accepts the data (immediately, or after `drain` when the
   * stream signals back-pressure). Rejects if the stream emits `error` while waiting,
   * so the returned promise cannot hang.
   * @param {Buffer} recordBuffer
   * @return {Promise<void>}
   */
  writeRecordBlock (recordBuffer) {
    return new Promise((resolve, reject) => {
      // each block is compressed on its own, producing one gzip member per call
      const block = this.opts.gzip ? zlib.gzipSync(recordBuffer) : recordBuffer
      const stream = this._warcOutStream
      if (stream.write(block, 'utf8')) {
        resolve()
        return
      }
      // back-pressure: wait for 'drain', but also settle on 'error'; whichever
      // fires first removes the other listener so nothing leaks
      const onDrain = () => {
        stream.removeListener('error', onError)
        resolve()
      }
      const onError = err => {
        stream.removeListener('drain', onDrain)
        reject(err)
      }
      stream.once('drain', onDrain)
      stream.once('error', onError)
    })
  }

  /**
   * @desc Close the underlying filestream to the WARC currently being written.
   * The `finished` event will not be emitted until this method has been called
   */
  end () {
    if (this._warcOutStream != null) {
      this._warcOutStream.end()
    }
  }

  /**
   * @desc Write A Request Record
   * @param {string} targetURI - The target URI of the request
   * @param {?string} resId - The id of the record this request record is concurrent to, typically its response
   * @param {string} httpHeaderString - Stringified HTTP headers
   * @param {string|Buffer} [requestData] - Body of the request if any
   * @return {Promise<void>}
   */
  _writeRequestRecord (targetURI, resId, httpHeaderString, requestData) {
    const reqHeadContentBuffer = makeContentBuffer(
      httpHeaderString,
      requestData
    )
    const reqWHeader = Buffer.from(
      warcRequestHeader({
        targetURI,
        concurrentTo: resId,
        now: this._now,
        rid: uuid(),
        wid: this._warcInfoId,
        len: reqHeadContentBuffer.length
      }),
      'utf8'
    )
    const totalLength =
      reqWHeader.length + reqHeadContentBuffer.length + WARCSepTSize
    return this.writeRecordBlock(
      Buffer.concat(
        [reqWHeader, CRLFBuffer, reqHeadContentBuffer, recordSeparatorBuffer],
        totalLength
      )
    )
  }

  /**
   * @desc Write A Response Record
   * @param {string} targetURI - The URL of the response
   * @param {string} resId - The id to be used for the response record
   * @param {string} httpHeaderString - Stringified HTTP headers
   * @param {string|Buffer} [responseData] - The response body if it exists
   * @return {Promise<void>}
   */
  _writeResponseRecord (targetURI, resId, httpHeaderString, responseData) {
    const resHeaderContentBuffer = makeContentBuffer(
      httpHeaderString,
      responseData
    )
    const respWHeader = Buffer.from(
      warcResponseHeader({
        targetURI,
        now: this._now,
        rid: resId,
        wid: this._warcInfoId,
        len: resHeaderContentBuffer.length
      }),
      'utf8'
    )
    const totalLength =
      respWHeader.length + resHeaderContentBuffer.length + WARCSepTSize
    return this.writeRecordBlock(
      Buffer.concat(
        [
          respWHeader,
          CRLFBuffer,
          resHeaderContentBuffer,
          recordSeparatorBuffer
        ],
        totalLength
      )
    )
  }

  /**
   * @desc Called when the WARC generation is finished. Tears down the stream, resets
   * the per-WARC state and emits `finished`, passing the last stream error if one occurred.
   * @emits {finished} emitted when WARC generation is complete
   * @private
   */
  _onFinish () {
    const le = this._lastError
    this._lastError = null
    this._warcOutStream.removeAllListeners()
    this._warcOutStream.destroy()
    this._warcOutStream = null
    this._now = null
    this._fileName = null
    this._warcInfoId = null
    if (le) {
      this.emit('finished', le)
    } else {
      this.emit('finished')
    }
  }

  /**
   * @desc Records and re-emits an error from the output stream
   * @param {Error} err
   * @emits {error} The error that occurred
   * @private
   */
  _onError (err) {
    this._lastError = err
    this.emit('error', err)
  }
}

/**
 * @desc The module exports the WARCWriterBase class itself (not an instance)
 * @type {WARCWriterBase}
 */
module.exports = WARCWriterBase

/**
 * @typedef {Object} WARCFileOpts
 * @property {boolean} [appending] - Should the WARC writer append to an existing WARC?
 * @property {boolean} [gzip] - Should the WARC writer generate gzipped records?
 */

/**
 * @typedef {Object} WARCInitOpts
 * @property {string} warcPath - Path the WARC file is to be written to
 * @property {boolean} [appending] - Should the WARC writer append to an existing WARC?
 * @property {boolean} [gzip] - Should the WARC writer generate gzipped records?
 */

/**
 * @typedef {Object} Metadata
 * @property {string} targetURI - The target URI for the metadata record
 * @property {?string|Buffer} [content] - The contents of the metadata record
 */

/**
 * @typedef {Object} WARCGenOpts
 * @property {string|Array<string>} pages - The URL of the page this WARC contains or an Array of
 * URLs for the pages this WARC contains. Used to write a WARC Info record containing
 * Webrecorder Player compatible bookmark list
 * @property {WARCInitOpts} warcOpts - Options for the writing of the WARC file
 * @property {Object|Buffer|string} [winfo] - Optional contents for the WARC info record
 * @property {Metadata} [metadata] - Optional metadata contents
 */