lib/parsers/autoWARCParser.js
const fs = require('fs-extra')
const zlib = require('zlib')
const EventEmitter = require('eventemitter3')
const untildify = require('untildify')
const WARCStreamTransform = require('./warcStreamTransform')
const GzipDetector = require('./gzipDetector')
const canUseRecordIterator = require('./_canUseRecordIterator')
/**
* @desc Parses a WARC file automatically detecting if it is gzipped.
* @extends {EventEmitter}
* @example
* const parser = new AutoWARCParser('<path-to-warcfile>')
* parser.on('record', record => { console.log(record) })
* parser.on('error', error => { console.error(error) })
* parser.start()
* @example
* const parser = new AutoWARCParser()
* parser.on('record', record => { console.log(record) })
* parser.on('error', error => { console.error(error) })
* parser.parseWARC('<path-to-warcfile>')
* @example
* // requires node >= 10
* for await (const record of new AutoWARCParser('<path-to-warcfile>')) {
* console.log(record)
* }
*/
class AutoWARCParser extends EventEmitter {
/**
* @desc Create a new AutoWARCParser
* @param {?string} [wp] - path to the warc file to be parsed
*/
constructor (wp) {
super()
/**
* @type {?string} the path to the WARC file to be parsed
* @private
*/
this._wp = wp
/**
* @type {boolean} is the parser currently parsing the WARC
* @private
*/
this._parsing = false
this._onRecord = this._onRecord.bind(this)
this._onEnd = this._onEnd.bind(this)
this._onError = this._onError.bind(this)
if (canUseRecordIterator) {
const recordIterator = require('./recordterator')
/**
* @returns {AsyncIterator<WARCRecord>}
*/
this[Symbol.asyncIterator] = () => {
return recordIterator(this._getStream())
}
}
}
/**
* @desc Begin parsing the WARC file. Once the start method has been called the parser will begin emitting
* @emits {record} emitted when the parser has parsed a full record, the argument supplied to the listener will be the parsed record
* @emits {done} emitted when the WARC file has been completely parsed
* @emits {error} emitted if an exception occurs, the argument supplied to the listener will be the error that occurred.
* @return {boolean} if the parser has begun or is currently parsing a WARC file
* - true: indicates the parser has begun parsing the WARC file true
* - false: indicated the parser is currently parsing a WARC file
* @throws {Error} if the path to the WARC file is null or undefined or another error occurred
*/
start () {
let startedParsing = false
if (!this._parsing) {
if (this._wp == null) {
throw new Error('The supplied path to the WARC file is null/undefined')
}
this._parsing = true
this._getStream()
.pipe(new WARCStreamTransform())
.on('data', this._onRecord)
.on('error', this._onError)
.on('end', this._onEnd)
startedParsing = true
}
return startedParsing
}
/**
* @desc Alias for {@link start} except that you can supply the path to the WARC file to be parsed
* if one was not supplied via the constructor or to parse another WARC file. If the path to WARC file
* to be parsed was supplied via the constructor and you supply a different path to this method.
* It will override the one supplied via the constructor
* @param {?string} [wp] - path to the WARC file to be parsed
* @return {boolean} indication if the parser has begun or is currently parsing a WARC file
* @throws {Error} if the path to the WARC file is null or undefined or another error occurred
*/
parseWARC (wp) {
if (!this._parsing) {
this._wp = wp || this._wp
}
return this.start()
}
/**
* @desc Listener for a parsers record event
* @param {WARCRecord} record
* @private
*/
_onRecord (record) {
this.emit('record', record)
}
/**
* @desc Listener for a parsers done event
* @private
*/
_onEnd () {
this._parsing = false
this.emit('done')
}
/**
* @desc Listener for a parsers error event
* @param {Error} error
* @private
*/
_onError (error) {
this.emit('error', error)
}
/**
* @desc Returns a ReadStream for the WARC to be parsed.
* If the WARC file is gziped the returned value will the
* results of ``ReadStream.pipe(zlib.createGunzip())``
* @returns {ReadStream|Gunzip}
* @private
*/
_getStream () {
const isGz = GzipDetector.isGzippedSync(this._wp)
const stream = fs.createReadStream(untildify(this._wp))
if (isGz) return stream.pipe(zlib.createGunzip())
return stream
}
}
/**
* @type {AutoWARCParser}
*/
module.exports = AutoWARCParser