

[#9] Node.js Core Streams

With Guest Tim Oxley (@timoxley)

Summary

Tim Oxley (@timoxley) joins us to take us through Node.js core streams. Streams have gone through a lot of changes in Node core and remain one of the more debated parts of the codebase. Tim demonstrates what happens when we use streams in Node, how to debug Node core, and much more.

Details

Code Repositories

References

Notes

Tim resides in Singapore and is employed at NodeSource. He works with Node every day and is very knowledgeable about Node core and the JavaScript community. Tim spoke a bit about streams on his podcast, NodeUp, which is what led us to recruit him.


Part I: Why Streams?

Buffering vs Streaming

Buffering

const fs = require('fs')

const filename = process.argv[2]
// Read the entire file into memory, transform it, then write it back out.
fs.readFile(filename, 'utf8', (err, data) => {
  if (err) throw err
  data = data.toUpperCase()
  fs.writeFile(`transformed_${filename}`, data, err => {
    if (err) throw err
  })
})

Read File with Buffering Demo

> cat data-0.txt
> node readfile-buffering.js data-0.txt
> cat transformed_data-0.txt
> node readfile-buffering.js data-300.txt

buffer.js:369
    throw new Error('toString failed');
    ^

Error: toString failed
    at Buffer.toString (buffer.js:369:11)

# Ran out of memory (and exposed a Node bug).

https://github.com/nodejs/node/issues/2767

> node readfile-buffering.js data-100.txt
# Success: 100MB still fits in memory.

Streaming

const fs = require('fs')
const stream = require('stream')

const upperCaser = new stream.Transform({
  transform(chunk, encoding, done) {
    this.push(String(chunk).toUpperCase())
    done()
  }
})

const filename = process.argv[2]
fs.createReadStream(filename)
.pipe(upperCaser)
.pipe(fs.createWriteStream(`transformed_${filename}`))

Read File with Streaming Demo

> node readfile-streaming.js data-0.txt
> cat transformed_data-0.txt
# Success
> node readfile-streaming.js data-100.txt
# Success!
> node readfile-streaming.js data-300.txt
# Success. Buffering couldn't handle this file.

Memory Usage

Streams vs Buffers

(chart: memory usage over time for the four runs listed below)

  1. Streaming reading 800MB
  2. Streaming reading 300MB
  3. Streaming reading 100MB
  4. Buffered reading 100MB

Memory Usage vs Throughput

const fs = require('fs')
const stream = require('stream')

const upperCaser = new stream.Transform({
  transform(chunk, encoding, done) {
    this.push(String(chunk).toUpperCase())
    done()
  }
})

const filename = process.argv[2]
fs.createReadStream(filename, {
  highWaterMark: 1024 * 16 // adjust this to balance memory usage vs throughput
})
.pipe(upperCaser)
.pipe(fs.createWriteStream(`transformed_${filename}`))

Memory Usage vs Throughput

(chart: memory usage over time for the seven runs listed below)

  1. Streaming reading 100MB with high water mark set to 1 byte
  2. Streaming reading 100MB with high water mark set to 32kb
  3. Streaming reading 100MB with high water mark set to 32kb & a console.log on each chunk
  4. Streaming reading 800MB
  5. Streaming reading 300MB
  6. Streaming reading 100MB
  7. Buffered reading 100MB
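
If you want to reproduce numbers like these yourself, one simple approach (a sketch, not how the charts above were produced) is to sample process.memoryUsage() while one of the scripts runs:

// Log resident set size once per second while the stream job runs.
const timer = setInterval(() => {
  const rss = process.memoryUsage().rss
  console.error(`rss: ${(rss / 1024 / 1024).toFixed(1)} MB`)
}, 1000)
timer.unref() // don't keep the process alive just for the sampler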

Accidental Streams

Packages often implement a stream-like interface that isn't actually a stream. The classic example is an event emitter that emits 'data' events but implements none of the rest of the stream contract.

const EventEmitter = require('events')

class MyAPI extends EventEmitter {
  read (data) {
    this.parse(data).forEach(chunk => {
      this.emit('data', chunk)
    })
  }
  ...
}

Problems

  • Inconsistent API.
  • No composability via pipe.
  • No backpressure.
  • No buffering.

Just extend Readable

const Readable = require('readable-stream').Readable

class MyAPI extends Readable {
  _read (data) {
    this.parse(data).forEach(chunk => {
      this.push(chunk)
    })
  }
  ...
}
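
For a concrete picture, here's a minimal runnable sketch of a custom Readable; the Countdown class is made up for illustration, but a real MyAPI would push whatever parse produces in the same way:

const {Readable} = require('readable-stream') // or require('stream')

// Hypothetical Readable that emits the numbers 3, 2, 1 and then ends.
class Countdown extends Readable {
  constructor (options) {
    super(options)
    this.count = 3
  }
  _read () {
    if (this.count > 0) {
      this.push(String(this.count--)) // push data into the internal buffer
    } else {
      this.push(null) // signal end-of-stream
    }
  }
}

new Countdown().pipe(process.stdout) // prints "321"

Consumers now get pipe, buffering and backpressure for free.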

Part II: Streams in Node

https://github.com/nodejs/node/tree/master/lib

Implemented in nodejs/readable-stream

https://github.com/nodejs/readable-stream

Originally written by Isaac Schlueter (@isaacs) for streams2. Depending on readable-stream directly protects your codebase from future changes to streams in core.

require('stream')

Simplest implementation of a 'Stream'.

EventEmitter which emits 'data' events.

stream.emit('data', chunk)

Implements naive .pipe method

Backpressure

If can't write, pause.

https://github.com/nodejs/node/blob/master/lib/stream.js#L26-L37

Resume on 'drain' event.

https://github.com/nodejs/node/blob/master/lib/stream.js#L39-L45
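
Roughly what those two linked snippets amount to, as a sketch (not the actual core source; the file names just reuse the demo data from earlier):

const fs = require('fs')

const source = fs.createReadStream('data-100.txt')
const dest = fs.createWriteStream('copy-100.txt')

// If the destination can't keep up, pause the source...
source.on('data', chunk => {
  if (dest.write(chunk) === false) source.pause()
})

// ...and resume it once the destination has drained its buffer.
dest.on('drain', () => source.resume())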

Auto-End

https://github.com/nodejs/node/blob/master/lib/stream.js#L47-L60

Don't actually use this.

This is the simplest implementation of a stream but is limited in usefulness as it:

  • Is a push stream
  • Does not support buffering

Stream Error Handling

YOLO by default

'error' events that aren't handled are thrown.

https://github.com/nodejs/node/blob/master/lib/stream.js#L70-L76

YOLO

a
.pipe(b)
.pipe(c)
.pipe(d)
.pipe(e)

Safe, but gross

a
.pipe(b).on('error', onError)
.pipe(c).on('error', onError)
.pipe(d).on('error', onError)
.pipe(e).on('error', onError)

This is kinda crap IMO because it means that most people simply don't attach error handlers when piping.

Chaining streams together with .pipe() is one of the nicest parts of streams, but it's kinda ruined when you need to attach error handlers all over the place.

Solution – Mississippi/Pumpify
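
For example, pump (which mississippi re-exports as mississippi.pipe) takes a whole chain of streams plus a single error callback, and destroys every stream in the chain if any of them fails. A sketch, reusing the demo file names:

const fs = require('fs')
const zlib = require('zlib')
const pump = require('pump')

pump(
  fs.createReadStream('data-100.txt'),
  zlib.createGzip(),
  fs.createWriteStream('data-100.txt.gz'),
  err => {
    if (err) console.error('pipeline failed', err)
    else console.log('pipeline succeeded')
  }
)

pumpify does the same job but hands the combined chain back to you as a single duplex stream.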


'Real' Streams

  • Readable
  • Writable
  • Duplex
  • Transform
  • PassThrough

Buffering by Default

Isn't Buffering Bad?

  • Buffering is bad when the buffers are too big.
  • Memory vs Throughput

Buffers?

  • Allocates a slab of untyped memory.
  • Basically a malloc
  • Backed by TypedArrays
  • TypedArrays before we had TypedArrays
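
A quick illustration of those last two points; in current versions of Node, Buffer is literally a Uint8Array subclass sitting on top of an ArrayBuffer:

const buf = Buffer.from('streams', 'utf8')
console.log(buf)                               // <Buffer 73 74 72 65 61 6d 73>
console.log(buf instanceof Uint8Array)         // true
console.log(buf.buffer instanceof ArrayBuffer) // true: the backing slab of memory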

Object Mode

Non-Object Mode

  • Strings & Buffers Only

> process.stdout.write({object: 'data'})
TypeError: invalid data
    at WriteStream.Socket.write (net.js:617:11)
    ...
> process.stdout.write(JSON.stringify({object: 'data'}))
{"object":"data"}

Object Mode

const JSONStream = require('JSONStream')
const {Transform} = require('stream')

// (the source feeding the parser is elided)
const parser = JSONStream.parse(['rows', true, 'doc'])

parser
.pipe(new Transform({
  objectMode: true,
  transform (chunk, enc, done) {
    console.log(typeof chunk) // 'object'
    this.push(JSON.stringify(chunk))
    done()
  }
}))
.pipe(process.stdout)

Readable

Buffered "Source".

https://github.com/nodejs/readable-stream/blob/master/lib/_stream_readable.js

Writable

Buffered "Sink".

https://github.com/nodejs/readable-stream/blob/master/lib/_stream_writable.js
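
A minimal sketch of a Writable sink; the ByteCounter class is hypothetical, purely for illustration:

const {Writable} = require('stream')

// Hypothetical sink that counts how many bytes are written to it.
class ByteCounter extends Writable {
  constructor (options) {
    super(options)
    this.bytes = 0
  }
  _write (chunk, encoding, done) {
    this.bytes += chunk.length // consume the chunk
    done()                     // ready for the next one
  }
}

const counter = new ByteCounter()
process.stdin.pipe(counter)
counter.on('finish', () => console.log(`${counter.bytes} bytes`))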

Duplex

Stream that's both Readable and Writable.

https://github.com/nodejs/readable-stream/blob/master/lib/_stream_duplex.js

TCP socket as created by net is a duplex stream.

import net from 'net'
import {Transform} from 'stream'
net.createServer(socket => {
  socket.pipe(new Transform({
    transform(chunk, enc, done) {
      console.log('server got %s', chunk)
      this.push(String(chunk).toUpperCase())
      this.push(null)
      done()
    }
  })).pipe(socket)
}).listen(3000)

const client = net.connect(3000)
client.write('hello')
client.pipe(process.stdout)
client.on('end', () => process.exit(0))

Transform

Duplex Stream with a "map" function that maps input to output.

https://github.com/nodejs/readable-stream/blob/master/lib/_stream_transform.js

PassThrough

A Transform stream whose transform is the identity: data written to it comes out the readable side unchanged.

https://github.com/nodejs/readable-stream/blob/master/lib/_stream_passthrough.js
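
A common use is dropping one into a pipeline to observe the data without changing it. A sketch, reusing the demo files (copy-0.txt is just a made-up output name):

const fs = require('fs')
const {PassThrough} = require('stream')

const tap = new PassThrough()
tap.on('data', chunk => console.error('saw %d bytes', chunk.length)) // observe only

fs.createReadStream('data-0.txt')
.pipe(tap) // chunks pass through unchanged
.pipe(fs.createWriteStream('copy-0.txt'))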

Part III: Streams in the wild.

Common problem: you have a separate writable stream and readable stream (sometimes not even created yet) and need to hand them back as a single duplex stream. duplexify handles this:

https://github.com/mafintosh/duplexify
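
A sketch of its basic API, using a gzip/gunzip pair purely for demonstration; whatever is written to the duplex stream's writable half comes back out of its readable half:

const duplexify = require('duplexify')
const zlib = require('zlib')

const gzip = zlib.createGzip()
const gunzip = zlib.createGunzip()
gzip.pipe(gunzip) // round-trip: whatever goes in comes back out

// duplexify(writableHalf, readableHalf) -> one duplex stream
const dup = duplexify(gzip, gunzip)

dup.pipe(process.stdout)
dup.end('hello world\n') // prints "hello world"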

Consuming Streams

One of the things that surprises people when they first start working with HTTP in Node is that the body isn't just there by default on requests and responses; they are streams, and you have to consume the body yourself.

HTTP Request and Response are Readable & Writable respectively.

Common to see stuff like:

Concat Strings

Slow!

const http = require('http')

http.createServer((req, res) => {
  var body = ''
  req.on('data', data => {
    body += data // converts each Buffer chunk to a string
  })
  req.on('end', () => {
    JSON.parse(body) // or whatever
  })
})

Concat Buffers

const http = require('http')

http.createServer((req, res) => {
  let body = new Buffer(0)
  req.on('data', data => {
    body = Buffer.concat([body, data]) // re-allocates on every chunk
  })
  req.on('end', () => {
    JSON.parse(String(body)) // or whatever
  })
})

Delayed Concat Buffers

const http = require('http')

http.createServer((req, res) => {
  let pieces = []
  req.on('data', data => {
    pieces.push(data) // just collect the chunks
  })
  req.on('end', () => {
    let body = Buffer.concat(pieces) // single allocation at the end
    JSON.parse(String(body)) // or whatever
  })
})

Just use bl

import http from 'http'
import bl from 'bl'

http.createServer((req, res) => {
  req.pipe(bl((err, body) => {
    // TODO handle err
    JSON.parse(String(body)) // or whatever
  }))
})

Don't be afraid to use 3rd-party packages. This stuff isn't really designed to be used raw; that's the whole point of having a minimal core: rather than baking too many opinions into Node core, users are asked to lean heavily on userland.

Learn More