/usr/lib/nodejs/block-stream.js is in node-block-stream 0.0.9-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
// Write data to it, and it'll emit data in fixed-size blocks (512 bytes by default).
// If you .end() or .flush(), it'll emit whatever it's got,
// padded with nulls up to a whole block (unless the `nopad` option is set).
// The module's sole export is the BlockStream constructor. It is referenced
// here before its definition, which works because function declarations
// are hoisted.
module.exports = BlockStream
// Dependencies, declared comma-first in one `var` statement.
// Stream: the base stream constructor that BlockStream inherits from.
var Stream = require("stream").Stream
// inherits: prototype-inheritance helper from the "inherits" package.
, inherits = require("inherits")
// assert: bound to assert.ok, i.e. throws when its argument is falsy.
, assert = require("assert").ok
// debug: console.error when the DEBUG environment variable is set, otherwise
// a no-op. Every debug() call visible in this file is commented out.
, debug = process.env.DEBUG ? console.error : function () {}
// BlockStream constructor.
//
// Collects written data and re-emits it as "data" events of exactly `size`
// bytes each.
//
//   size - block size in bytes; any falsy value selects the default of 512.
//   opt  - optional settings object. The only option read here is `nopad`:
//          when truthy, a final partial block is emitted as-is instead of
//          being padded with zero bytes.
//
// State initialised here:
//   _buffer       - queue of pending Buffers, oldest first
//   _bufferLength - number of unread bytes held in the queue
//   _offset       - read position inside the first queued Buffer
//   _zeroes       - one block of zero bytes used as padding source, or
//                   `false` when padding is disabled
function BlockStream (size, opt) {
  this.writable = this.readable = true
  this._opt = opt || {}
  this._chunkSize = size || 512
  this._offset = 0
  this._buffer = []
  this._bufferLength = 0

  if (this._opt.nopad) {
    this._zeroes = false
  } else if (typeof Buffer.alloc === "function") {
    // Buffer.alloc returns zero-filled memory, so no manual clearing is
    // needed, and it avoids the deprecated `new Buffer(size)` constructor.
    this._zeroes = Buffer.alloc(this._chunkSize)
  } else {
    // Legacy runtimes without Buffer.alloc: the constructor hands back
    // uninitialised memory, so it must be cleared explicitly.
    this._zeroes = new Buffer(this._chunkSize)
    this._zeroes.fill(0)
  }
}
// Make BlockStream inherit from Stream, so instances get the event-emitter
// methods (such as emit) that the prototype methods below rely on.
inherits(BlockStream, Stream)
// Queue a chunk of data for block-wise emission.
//
//   c - a Buffer, or any other value. Truthy non-Buffer values are
//       stringified and converted to a Buffer; falsy values (including
//       null and undefined) and empty chunks add nothing to the queue.
//
// Returns false when at least one full block is queued but the stream is
// paused, in which case a "drain" event is requested for later. Returns
// true otherwise.
// Throws an Error if called after end().
BlockStream.prototype.write = function (c) {
  if (this._ended) throw new Error("BlockStream: write after end")

  if (c && !Buffer.isBuffer(c)) {
    var text = c + ""
    // Prefer Buffer.from over the deprecated `new Buffer(string)`. The
    // presence of Buffer.alloc is used as the feature test because some old
    // runtimes expose an unrelated, inherited Buffer.from.
    c = typeof Buffer.alloc === "function" ? Buffer.from(text) : new Buffer(text)
  }

  // Guard against null/undefined: reading `.length` of those would throw.
  if (c && c.length) {
    this._buffer.push(c)
    this._bufferLength += c.length
  }

  if (this._bufferLength >= this._chunkSize) {
    if (this._paused) {
      // Cannot emit right now; ask the writer to wait for "drain".
      this._needDrain = true
      return false
    }
    this._emitChunk()
  }
  return true
}
// Stop emitting blocks. Data written while paused stays queued until
// resume() is called.
BlockStream.prototype.pause = function () {
  this._paused = true
}
// Clear the paused flag and immediately try to emit whatever is queued.
// Returns whatever _emitChunk() returns.
BlockStream.prototype.resume = function () {
  this._paused = false
  var emitted = this._emitChunk()
  return emitted
}
// Finish the stream: optionally write one last chunk, mark the stream as
// ended (further write() calls throw), and flush what is still queued.
//
//   chunk - optional final chunk. A function argument is accepted for
//           call-signature compatibility but is NOT invoked; listen for
//           the "end" event instead.
BlockStream.prototype.end = function (chunk) {
  // A function here is a callback, not data. It used to be stored in an
  // undeclared variable, which leaked a global in sloppy mode and threw a
  // ReferenceError in strict mode; the stored value was never read.
  if (typeof chunk === "function") chunk = null
  if (chunk) this.write(chunk)
  this._ended = true
  this.flush()
}
// Push out everything that is queued, including a final partial block.
// Delegates to _emitChunk in flush mode and returns nothing.
BlockStream.prototype.flush = function () {
  var flushing = true
  this._emitChunk(flushing)
}
// Allocate a scratch buffer that the caller overwrites completely before
// use. Uses Buffer.allocUnsafe where available instead of the deprecated
// `new Buffer(size)` constructor.
function allocScratch (size) {
  return typeof Buffer.allocUnsafe === "function"
    ? Buffer.allocUnsafe(size)
    : new Buffer(size)
}

// Emit as many full blocks as the queue holds, then handle flushing,
// "drain" and "end".
//
//   flush - when truthy, also push out the trailing partial block:
//           zero-padded to a full block in padded mode, or as-is when the
//           stream was created with `nopad`.
//
// Does nothing beyond recording the flush request when the stream is paused
// or when a call further up the stack is already emitting (re-entrancy
// guard). Always returns undefined.
BlockStream.prototype._emitChunk = function (flush) {
  if (flush && this._zeroes) {
    // Padded mode: round the queue up to a whole number of blocks right
    // away, so the padding is already queued even if emission is deferred.
    var padBytes = this._bufferLength % this._chunkSize
    if (padBytes !== 0) padBytes = this._chunkSize - padBytes
    if (padBytes > 0) {
      this._buffer.push(this._zeroes.slice(0, padBytes))
      this._bufferLength += padBytes
    }
  } else if (flush) {
    // nopad mode: nothing can be queued to represent the flush, so remember
    // it. Without this, a flush requested while paused (or from inside a
    // "data" handler) was lost: the partial tail was never emitted and
    // "end" never fired.
    this._flushPending = true
  }

  if (this._emitting || this._paused) return
  this._emitting = true

  // Index of the first queued buffer that still has unread bytes.
  var bufferIndex = 0
  // A "data" handler may call pause(); honour that between blocks unless
  // this call is an explicit flush.
  while (this._bufferLength >= this._chunkSize &&
         (flush || !this._paused)) {
    var out = null
      , outOffset = 0
      , outHas = this._chunkSize // bytes still missing from this block

    // Nothing in this inner loop runs user code, so the paused state cannot
    // change until the block is emitted.
    while (outHas > 0) {
      var cur = this._buffer[bufferIndex]
        , curHas = cur.length - this._offset

      // If cur is not big enough to fill the block, several buffers have to
      // be copied into one. If it is big enough, slice out the wanted part
      // to avoid copying. Once copying has started it must continue, since
      // buffers cannot be joined cheaply.
      if (out || curHas < outHas) {
        out = out || allocScratch(this._chunkSize)
        cur.copy(out, outOffset,
                 this._offset, this._offset + Math.min(curHas, outHas))
      } else if (cur.length === outHas && this._offset === 0) {
        // Shortcut: cur is exactly one block, emit it untouched.
        out = cur
      } else {
        out = cur.slice(this._offset, this._offset + outHas)
      }

      if (curHas > outHas) {
        // cur has bytes left over; remember how far it has been read.
        this._offset += outHas
        outHas = 0
      } else {
        // cur is fully consumed; move on to the next queued buffer.
        outHas -= curHas
        outOffset += curHas
        bufferIndex ++
        this._offset = 0
      }
    }

    this._bufferLength -= this._chunkSize
    assert(out.length === this._chunkSize)
    this.emit("data", out)
  }

  // Drop the buffers that were consumed completely.
  this._buffer = this._buffer.slice(bufferIndex)

  if (this._paused) {
    // Paused by a "data" handler mid-emission: stop here and make sure a
    // "drain" is announced once emission resumes. (This used to set a
    // misspelled `_needsDrain` flag that nothing ever read.)
    this._needDrain = true
    this._emitting = false
    return
  }

  // A flush in nopad mode, whether requested by this call or deferred from
  // an earlier one, emits the remaining partial block as-is. It is known to
  // be shorter than one block, otherwise the loop above would have emitted
  // it, but the first buffer may have a read offset.
  if (this._flushPending) {
    this._flushPending = false
    var l = this._buffer.length
    if (l === 1) {
      if (this._offset) {
        this.emit("data", this._buffer[0].slice(this._offset))
      } else {
        this.emit("data", this._buffer[0])
      }
    } else if (l > 1) {
      var tail = allocScratch(this._bufferLength)
        , tailOffset = 0
      for (var i = 0; i < l; i ++) {
        var piece = this._buffer[i]
        piece.copy(tail, tailOffset, this._offset)
        tailOffset += piece.length - this._offset
        // Only the first buffer can have a non-zero read offset.
        this._offset = 0
      }
      this.emit("data", tail)
    }
    if (l) {
      // The queue is now empty.
      this._buffer.length = 0
      this._bufferLength = 0
      this._offset = 0
    }
  }

  // Everything that could be emitted has been; tell a waiting writer.
  if (this._needDrain) {
    this._needDrain = false
    this.emit("drain")
  }

  if ((this._bufferLength === 0) && this._ended && !this._endEmitted) {
    this._endEmitted = true
    this.emit("end")
  }

  this._emitting = false
}
|