Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 92 additions & 2 deletions decoder.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,29 @@ var IncomingMessage = require('./lib/incoming-message')
var debug = require('./lib/debug')

var HEADER_MAX_BYTES = 1024 // TODO: Figure out the best max size of the header
var INTERLEAVED_HEADER_BYTES = 4
var CR = 0x0d
var NL = 0x0a
var NO_BODY_STATUS_CODES = [100, 304]

var INTERLEAVED_SIGN = 0x24

var Decoder = module.exports = function (opts) {
if (!(this instanceof Decoder)) return new Decoder(opts)

stream.Writable.call(this, opts)

this._inHead = false
this._inBody = false
this._headerOffset = 0
this._header = new Buffer(HEADER_MAX_BYTES)
this._bodyBytesWritten = 0

this._inInterleavedHead = false
this._inInterleavedPacket = false
this._interleavedHeaderOffset = 0
this._interleavedHeader = new Buffer(INTERLEAVED_HEADER_BYTES)
this._interleavedPacketBytesWritten = 0
}

util.inherits(Decoder, stream.Writable)
Expand All @@ -33,15 +43,94 @@ Decoder.prototype._writeOffset = function (chunk, offset, cb) {
if (this._inBody) {
offset = this._writeBody(chunk, offset, cb)
if (offset === 0) return // chunk not consumed - _writeOffset will be called again when ready
} else {
} else if (this._inHead) {
offset = this._writeHead(chunk, offset)
} else if (this._inInterleavedHead) {
offset = this._writeInterleavedHead(chunk, offset)
} else if (this._inInterleavedPacket) {
offset = this._writeInterleavedPacket(chunk, offset, cb)
} else {
if (chunk[offset] === INTERLEAVED_SIGN) {
offset = this._writeInterleavedHead(chunk, offset)
} else {
offset = this._writeHead(chunk, offset)
}
}
}
cb()
}
Decoder.prototype._writeInterleavedHead = function (chunk, offset) {
if (this._interleavedHeaderOffset === 0) {
debug('start of interleaved header')
this._inInterleavedHead = true
}
if (chunk.length - offset < 4) {

Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't the 4 here be INTERLEAVED_HEADER_BYTES if I understand the code correctly?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes you are right.

chunk.copy(this._interleavedHeader, this._interleavedHeaderOffset, offset)
this._interleavedHeaderOffset += chunk.length - offset
return chunk.length
}

chunk.copy(this._interleavedHeader, this._interleavedHeaderOffset, offset, offset + INTERLEAVED_HEADER_BYTES)

debug('end of interleaved header')
this._inInterleavedHead = false

this._packet = new stream.PassThrough()
this._packet.channel = this._interleavedHeader.readInt8(1)
this._packet.size = this._interleavedHeader.readInt16BE(2)

this._interleavedHeaderOffset = 0
this._interleavedHeader = new Buffer(INTERLEAVED_HEADER_BYTES)

Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are you creating a new _interleavedHeader buffer here? I'd normally just reuse the existing buffer so we don't have to initiate new memory.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably you are right.

I just tried to use the same approach you used for RTSP Header to make it easier to understand.

As it is a fixed size reader header, maybe we should just remove line 83 and resuse the same buffer.

@surfdude75 surfdude75 May 6, 2016

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am pretty new to node, github and open source projects.

I am getting used to it.

:)

Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No worries 😃 It's awesome to see new people getting into open source


this._inInterleavedPacket = true

// _writeInterleavedPacket logic to handle back-pressure
var self = this
this._packet.on('drain', function () {
if (!this._chunk) return
self._writeOffset(this._chunk, this._end, this._cb)
this._chunk = null
this._end = null
this._cb = null
})
this.emit('packet', this._packet)

return offset + (INTERLEAVED_HEADER_BYTES - this._interleavedHeaderOffset)
}

Decoder.prototype._writeInterleavedPacket = function (chunk, offset, cb) {
if (this._interleavedPacketBytesWritten === 0) debug('start of packet')

var bytesLeft = this._packet.size - this._interleavedPacketBytesWritten
var end = bytesLeft < chunk.length - offset ? offset + bytesLeft : chunk.length

var drained = this._packet.write(chunk.slice(offset, end))

this._interleavedPacketBytesWritten += end - offset

if (this._interleavedPacketBytesWritten >= this._packet.size) {
debug('end of packet')
this._inInterleavedPacket = false
this._interleavedPacketBytesWritten = 0
this._packet.end()
}

if (!drained) {
debug('back pressure detected in IncomingPacket')
this._packet._chunk = chunk
this._packet._end = end
this._packet._cb = cb
return 0 // indicate we didn't consume the chunk (yet)
}

return end
}

Decoder.prototype._writeHead = function (chunk, offset) {
if (this._headerOffset === 0) debug('start of header')
if (this._headerOffset === 0) {
debug('start of header')
this._inHead = true
}

chunk.copy(this._header, this._headerOffset, offset)
var origHeaderOffset = this._headerOffset
Expand All @@ -50,6 +139,7 @@ Decoder.prototype._writeHead = function (chunk, offset) {
if (!~bodyStart) return chunk.length // still reading the header

debug('end of header')
this._inHead = false

this._msg = new IncomingMessage(this._header)

Expand Down