Home Reference Source

lib/parsers/warcStreamTransform.js

'use strict'
const { Transform } = require('stream')
const RecordBuilder = require('../warcRecord/builder')
const { crlf } = require('../warcRecord/fieldIdentifiers')

/**
 * @desc Transforms a WARC file ReadStream into its individual {@link WARCRecord}s
 * @extends {Transform}
 * @example
 *  fs.createReadStream('someWARC.warc')
 *    .pipe(new WARCStreamTransform())
 *    .on('data', record => { console.log(record) })
 * @example
 *  fs.createReadStream('someWARC.warc.gz')
 *    .pipe(zlib.createGunzip())
 *    .pipe(new WARCStreamTransform())
 *    .on('data', record => { console.log(record) })
 */
class WARCStreamTransform extends Transform {
  /**
   * @desc Create a new WARCStreamTransform. The writable side accepts bytes,
   * the readable side operates in object mode
   */
  constructor () {
    super({
      readableObjectMode: true
    })
    /**
     * @desc Trailing bytes of the last processed chunk that were not terminated by a CRLF.
     * They are prepended to the next incoming chunk or consumed when the stream is flushed
     * @type {?Buffer}
     */
    this.buffered = undefined

    /**
     * @desc Receives the input one line at a time; whatever non-null value its
     * ``consumeLine`` returns is pushed downstream
     * @type {RecordBuilder}
     */
    this.builder = new RecordBuilder()

    /**
     * @desc Length of the CRLF line separator
     * @type {number}
     */
    this.sepLen = crlf.length
  }

  /**
   * @desc Process the supplied chunk: every CRLF terminated line (separator included) is handed
   * to the builder and any record it returns is pushed. The unterminated tail of the chunk is
   * kept in {@link buffered}. ``done`` is called exactly once, after the chunk has been processed
   * @param {Buffer} chunk  - The chunk to be processed
   * @param {function} done - Function used to indicate we are done processing the chunk
   * @param {boolean} [pushLast] - Boolean indicating if we attempt to build a record and push it once
   * we are done processing the chunk IFF a record was built. Is only true when called from {@link _flush}
   * @private
   */
  _consumeChunk (chunk, done, pushLast) {
    const chunkLen = chunk.length
    // index of the first byte that has not yet been handed to the builder
    let lineStart = 0
    let maybeRecord
    while (lineStart < chunkLen) {
      const idx = chunk.indexOf(crlf, lineStart)
      if (idx === -1) break
      const lineEnd = idx + this.sepLen
      maybeRecord = this.builder.consumeLine(chunk.slice(lineStart, lineEnd))
      if (maybeRecord != null) this.push(maybeRecord)
      lineStart = lineEnd
    }
    // whatever follows the last CRLF is an incomplete line, keep it for later
    this.buffered = chunk.slice(lineStart)
    if (pushLast) {
      // no more input will arrive: hand over the tail as is and
      // ask the builder for whatever it has accumulated
      maybeRecord = this.builder.consumeLine(this.buffered)
      if (maybeRecord) {
        this.push(maybeRecord)
      }
      maybeRecord = this.builder.buildRecord()
      if (maybeRecord) {
        this.push(maybeRecord)
      }
    }
    done()
  }

  /**
   * @desc Process a chunk, prepending any bytes left over from the previous chunk
   * so that lines (and CRLFs) split across chunk boundaries are seen whole
   * @param {Buffer} buf - The chunk to be processed
   * @param {string} enc - The encoding of the chunk
   * @param {function} done - Function used to indicate we are done processing the chunk
   * @private
   */
  _transform (buf, enc, done) {
    let chunk
    if (this.buffered) {
      chunk = Buffer.concat(
        [this.buffered, buf],
        this.buffered.length + buf.length
      )
      this.buffered = undefined
    } else {
      chunk = buf
    }
    this._consumeChunk(chunk, done)
  }

  /**
   * @desc Flushes any remaining data. ``done`` is invoked exactly once: by
   * {@link _consumeChunk} when there is buffered data, otherwise directly
   * @param {function} done - Function used to indicate we are done flushing
   * @private
   */
  _flush (done) {
    if (this.buffered) {
      // _consumeChunk calls done itself, so we must not call it again here
      this._consumeChunk(this.buffered, done, true)
      return
    }
    done()
  }
}

/**
 * @desc The WARCStreamTransform class is the sole export of this module;
 * consumers instantiate it with ``new`` and pipe a WARC byte stream into it
 * @type {WARCStreamTransform}
 */
module.exports = WARCStreamTransform