An implementation of the WHATWG Streams Standard.
import {\n ReadableStream,\n WritableStream,\n TransformStream,\n} from 'node:stream/web';\n
const {\n ReadableStream,\n WritableStream,\n TransformStream,\n} = require('node:stream/web');\n
The WHATWG Streams Standard (or \"web streams\") defines an API for handling\nstreaming data. It is similar to the Node.js Streams API but emerged later\nand has become the \"standard\" API for streaming data across many JavaScript\nenvironments.
There are three primary types of objects:
ReadableStream
WritableStream
TransformStream
This example creates a simple ReadableStream that pushes the current\nperformance.now() timestamp once every second forever. An async iterable\nis used to read the data from the stream.
import {\n ReadableStream\n} from 'node:stream/web';\n\nimport {\n setInterval as every\n} from 'node:timers/promises';\n\nimport {\n performance\n} from 'node:perf_hooks';\n\nconst SECOND = 1000;\n\nconst stream = new ReadableStream({\n async start(controller) {\n for await (const _ of every(SECOND))\n controller.enqueue(performance.now());\n }\n});\n\nfor await (const value of stream)\n console.log(value);\n
const {\n ReadableStream\n} = require('node:stream/web');\n\nconst {\n setInterval: every\n} = require('node:timers/promises');\n\nconst {\n performance\n} = require('node:perf_hooks');\n\nconst SECOND = 1000;\n\nconst stream = new ReadableStream({\n async start(controller) {\n for await (const _ of every(SECOND))\n controller.enqueue(performance.now());\n }\n});\n\n(async () => {\n for await (const value of stream)\n console.log(value);\n})();\n
The readableStream.locked property is false by default, and is\nswitched to true while there is an active reader consuming the\nstream's data.
import { ReadableStream } from 'node:stream/web';\n\nconst stream = new ReadableStream({\n  start(controller) {\n    controller.enqueue('some data');\n  },\n});\n\nconsole.log(stream.locked);  // Prints: false\n\nconst reader = stream.getReader();\n\nconsole.log(stream.locked);  // Prints: true\nconsole.log(await reader.read());\n
const { ReadableStream } = require('node:stream/web');\n\nconst stream = new ReadableStream({\n  start(controller) {\n    controller.enqueue('some data');\n  },\n});\n\nconsole.log(stream.locked);  // Prints: false\n\nconst reader = stream.getReader();\n\nconsole.log(stream.locked);  // Prints: true\nreader.read().then(console.log);\n
Causes readableStream.locked to become true.
Connects this <ReadableStream> to the pair of <ReadableStream> and\n<WritableStream> provided in the transform argument such that the\ndata from this <ReadableStream> is written into transform.writable,\npossibly transformed, then pushed to transform.readable. Once the\npipeline is configured, transform.readable is returned.
Causes readableStream.locked to be true while the pipe operation\nis active.
import {\n ReadableStream,\n TransformStream,\n} from 'node:stream/web';\n\nconst stream = new ReadableStream({\n start(controller) {\n controller.enqueue('a');\n },\n});\n\nconst transform = new TransformStream({\n transform(chunk, controller) {\n controller.enqueue(chunk.toUpperCase());\n }\n});\n\nconst transformedStream = stream.pipeThrough(transform);\n\nfor await (const chunk of transformedStream)\n console.log(chunk);\n
const {\n ReadableStream,\n TransformStream,\n} = require('node:stream/web');\n\nconst stream = new ReadableStream({\n start(controller) {\n controller.enqueue('a');\n },\n});\n\nconst transform = new TransformStream({\n transform(chunk, controller) {\n controller.enqueue(chunk.toUpperCase());\n }\n});\n\nconst transformedStream = stream.pipeThrough(transform);\n\n(async () => {\n for await (const chunk of transformedStream)\n console.log(chunk);\n})();\n
Returns a pair of new <ReadableStream> instances to which this\nReadableStream's data will be forwarded. Each will receive the\nsame data.
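For example, a minimal sketch (the chunks here are arbitrary) showing that\nboth branches returned by readableStream.tee() observe the same data:
import { ReadableStream } from 'node:stream/web';\n\nconst stream = new ReadableStream({\n  start(controller) {\n    controller.enqueue('a');\n    controller.enqueue('b');\n    controller.close();\n  },\n});\n\nconst [branch1, branch2] = stream.tee();\n\n// Each branch receives its own copy of every chunk.\nfor await (const chunk of branch1)\n  console.log('branch1:', chunk);\nfor await (const chunk of branch2)\n  console.log('branch2:', chunk);\n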
Creates and returns an async iterator usable for consuming this\nReadableStream's data.
Causes readableStream.locked to be true while the async iterator\nis active.
import { Buffer } from 'node:buffer';\n\nconst stream = new ReadableStream(getSomeSource());\n\nfor await (const chunk of stream.values({ preventCancel: true }))\n console.log(Buffer.from(chunk).toString());\n
The <ReadableStream> object supports the async iterator protocol using\nfor await syntax.
import { Buffer } from 'node:buffer';\n\nconst stream = new ReadableStream(getSomeSource());\n\nfor await (const chunk of stream)\n console.log(Buffer.from(chunk).toString());\n
The async iterator will consume the <ReadableStream> until it terminates.
By default, if the async iterator exits early (via either a break,\nreturn, or a throw), the <ReadableStream> will be closed. To prevent\nautomatic closing of the <ReadableStream>, use the readableStream.values()\nmethod to acquire the async iterator and set the preventCancel option to\ntrue.
The <ReadableStream> must not be locked (that is, it must not have an existing\nactive reader). During the async iteration, the <ReadableStream> will be locked.
A <ReadableStream> instance can be transferred using a <MessagePort>.
const stream = new ReadableStream(getReadableSourceSomehow());\n\nconst { port1, port2 } = new MessageChannel();\n\nport1.onmessage = ({ data }) => {\n data.getReader().read().then((chunk) => {\n console.log(chunk);\n });\n};\n\nport2.postMessage(stream, [stream]);\n
The new ReadableStream([underlyingSource[, strategy]]) constructor accepts\nthe following options:
underlyingSource {Object}
start {Function} A user-defined function that is invoked immediately when\nthe ReadableStream is created. Receives the stream's controller and may\nreturn undefined or a promise fulfilled with undefined.
pull {Function} A user-defined function that is called repeatedly when the\nReadableStream's internal queue is not full. If the function returns a\npromise, it is not called again until that promise is fulfilled.
cancel {Function} A user-defined function that is called when the\nReadableStream is canceled. Receives the reason for the cancellation.
type {string} Must be 'bytes' or undefined.
autoAllocateChunkSize {number} Used only when type is 'bytes'. When set to\na non-zero value, a view buffer of this size is automatically allocated for\neach read request.
strategy {Object}
highWaterMark {number} The maximum internal queue size before backpressure\nis applied.
size {Function} A user-defined function used to identify the size of each\nchunk of data. Receives the chunk and returns a number.
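As an illustrative sketch (the pull-based source and the unit sizing are\nhypothetical), a stream that produces values on demand with a custom\nqueuing strategy:
import { ReadableStream } from 'node:stream/web';\n\nlet n = 0;\n\nconst stream = new ReadableStream({\n  // Called whenever the internal queue drops below the highWaterMark.\n  pull(controller) {\n    controller.enqueue(n++);\n    if (n > 2) controller.close();\n  },\n  cancel(reason) {\n    console.log('canceled:', reason);\n  },\n}, {\n  highWaterMark: 1,\n  size() { return 1; },  // Count each chunk as one unit.\n});\n\nfor await (const value of stream)\n  console.log(value);  // Prints: 0, 1, 2\n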
By default, calling readableStream.getReader() with no arguments\nwill return an instance of ReadableStreamDefaultReader. The default\nreader treats the chunks of data passed through the stream as opaque\nvalues, which allows the <ReadableStream> to work with generally any\nJavaScript value.
Cancels the <ReadableStream> and returns a promise that is fulfilled\nwhen the underlying stream has been canceled.
Requests the next chunk of data from the underlying <ReadableStream>\nand returns a promise that is fulfilled with the data once it is\navailable.
Releases this reader's lock on the underlying <ReadableStream>.
Creates a new <ReadableStreamDefaultReader> that is locked to the\ngiven <ReadableStream>.
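A minimal sketch of the default reader in use (the source below is a\nstand-in that enqueues two chunks and closes):
import { ReadableStream } from 'node:stream/web';\n\nconst stream = new ReadableStream({\n  start(controller) {\n    controller.enqueue('hello');\n    controller.enqueue('world');\n    controller.close();\n  },\n});\n\nconst reader = stream.getReader();\n\n// read() resolves with { value, done } pairs until the stream closes.\nlet result = await reader.read();\nwhile (!result.done) {\n  console.log(result.value);\n  result = await reader.read();\n}\n\n// Release the lock so the stream can be consumed elsewhere.\nreader.releaseLock();\n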
The ReadableStreamBYOBReader is an alternative consumer for\nbyte-oriented <ReadableStream>s (those created with\nunderlyingSource.type set to 'bytes').
BYOB is short for \"bring your own buffer\". This is a\npattern that allows more efficient reading of byte-oriented\ndata by avoiding extraneous copying.
import {\n open\n} from 'node:fs/promises';\n\nimport {\n ReadableStream\n} from 'node:stream/web';\n\nimport { Buffer } from 'node:buffer';\n\nclass Source {\n type = 'bytes';\n autoAllocateChunkSize = 1024;\n\n async start(controller) {\n this.file = await open(new URL(import.meta.url));\n this.controller = controller;\n }\n\n async pull(controller) {\n const view = controller.byobRequest?.view;\n const {\n bytesRead,\n } = await this.file.read({\n buffer: view,\n offset: view.byteOffset,\n length: view.byteLength\n });\n\n if (bytesRead === 0) {\n await this.file.close();\n this.controller.close();\n }\n controller.byobRequest.respond(bytesRead);\n }\n}\n\nconst stream = new ReadableStream(new Source());\n\nasync function read(stream) {\n const reader = stream.getReader({ mode: 'byob' });\n\n const chunks = [];\n let result;\n do {\n result = await reader.read(Buffer.alloc(100));\n if (result.value !== undefined)\n chunks.push(Buffer.from(result.value));\n } while (!result.done);\n\n return Buffer.concat(chunks);\n}\n\nconst data = await read(stream);\nconsole.log(Buffer.from(data).toString());\n
Do not pass a pooled <Buffer> object instance into this method.\nPooled Buffer objects are created using Buffer.allocUnsafe(),\nor Buffer.from(), or are often returned by various node:fs module\ncallbacks. These types of Buffers use a shared underlying\n<ArrayBuffer> object that contains all of the data from all of\nthe pooled Buffer instances. When a Buffer, <TypedArray>,\nor <DataView> is passed into readableStreamBYOBReader.read(),\nthe view's underlying ArrayBuffer is detached, invalidating\nall existing views that may exist on that ArrayBuffer. This\ncan have disastrous consequences for your application.
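For example, a sketch of the safe pattern (assuming stream is a\nbyte-oriented <ReadableStream> created elsewhere):
import { Buffer } from 'node:buffer';\n\n// Assumes `stream` is a byte-oriented ReadableStream created elsewhere.\nconst reader = stream.getReader({ mode: 'byob' });\n\n// Safe: Buffer.alloc() returns a Buffer backed by its own ArrayBuffer,\n// so detaching it cannot invalidate any other views.\nconst { value, done } = await reader.read(Buffer.alloc(1024));\n\n// Unsafe: Buffer.from('...') may return a view onto the shared internal\n// pool; passing it to read() would detach the pool's ArrayBuffer.\n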
Creates a new ReadableStreamBYOBReader that is locked to the\ngiven <ReadableStream>.
Every <ReadableStream> has a controller that is responsible for\nthe internal state and management of the stream's queue. The\nReadableStreamDefaultController is the default controller\nimplementation for ReadableStreams that are not byte-oriented.
Closes the <ReadableStream> to which this controller is associated.
Appends a new chunk of data to the <ReadableStream>'s queue.
Signals an error that causes the <ReadableStream> to error and close.
Returns the amount of data remaining to fill the <ReadableStream>'s\nqueue.
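A sketch of an underlying source using the controller (the data and the\nerror path are illustrative only):
import { ReadableStream } from 'node:stream/web';\n\nconst stream = new ReadableStream({\n  start(controller) {\n    // desiredSize reports how much room remains in the queue.\n    console.log(controller.desiredSize);  // Prints: 1\n    controller.enqueue('some data');\n    controller.close();\n    // On failure, controller.error(new Error('boom')) would error\n    // the stream instead.\n  },\n});\n\nfor await (const chunk of stream)\n  console.log(chunk);  // Prints: some data\n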
Every <ReadableStream> has a controller that is responsible for\nthe internal state and management of the stream's queue. The\nReadableByteStreamController is for byte-oriented ReadableStreams.
When using ReadableByteStreamController in byte-oriented\nstreams, and when using the ReadableStreamBYOBReader,\nthe readableByteStreamController.byobRequest property\nprovides access to a ReadableStreamBYOBRequest instance\nthat represents the current read request. The object\nis used to gain access to the ArrayBuffer/TypedArray\nthat has been provided for the read request to fill,\nand provides methods for signaling that the data has\nbeen provided.
Signals that a bytesWritten number of bytes have been written\nto readableStreamBYOBRequest.view.
Signals that the request has been fulfilled with bytes written\nto a new Buffer, TypedArray, or DataView.
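A sketch of a byte source fulfilling a BYOB request (the two-byte payload\nis arbitrary):
import { ReadableStream } from 'node:stream/web';\n\nconst stream = new ReadableStream({\n  type: 'bytes',\n  pull(controller) {\n    const req = controller.byobRequest;\n    // Fill the reader-supplied view, then report how many bytes were\n    // written. respondWithNewView() could instead hand back a new view\n    // over the same underlying buffer.\n    req.view.set([72, 105]);  // Writes 'Hi'\n    req.respond(2);\n    controller.close();\n  },\n});\n\nconst reader = stream.getReader({ mode: 'byob' });\nconst { value } = await reader.read(new Uint8Array(16));\nconsole.log(new TextDecoder().decode(value));  // Prints: Hi\n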
The WritableStream is a destination to which stream data is sent.
import {\n WritableStream\n} from 'node:stream/web';\n\nconst stream = new WritableStream({\n write(chunk) {\n console.log(chunk);\n }\n});\n\nawait stream.getWriter().write('Hello World');\n
Abruptly terminates the WritableStream. All queued writes will be\ncanceled with their associated promises rejected.
Closes the WritableStream when no additional writes are expected.
Creates and returns a new writer instance that can be used to write\ndata into the WritableStream.
The writableStream.locked property is false by default, and is\nswitched to true while there is an active writer attached to this\nWritableStream.
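A sketch tying these together (the sink just logs each chunk):
import { WritableStream } from 'node:stream/web';\n\nconst stream = new WritableStream({\n  write(chunk) {\n    console.log(chunk);\n  },\n});\n\nconsole.log(stream.locked);  // Prints: false\n\nconst writer = stream.getWriter();\nconsole.log(stream.locked);  // Prints: true\n\nawait writer.write('Hello');\n// writer.abort() would discard queued writes and reject their promises;\n// here the stream is closed normally instead.\nawait writer.close();\n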
A <WritableStream> instance can be transferred using a <MessagePort>.
const stream = new WritableStream(getWritableSinkSomehow());\n\nconst { port1, port2 } = new MessageChannel();\n\nport1.onmessage = ({ data }) => {\n data.getWriter().write('hello');\n};\n\nport2.postMessage(stream, [stream]);\n
Releases this writer's lock on the underlying <WritableStream>.
Appends a new chunk of data to the <WritableStream>'s queue.
The amount of data required to fill the <WritableStream>'s queue.
Creates a new WritableStreamDefaultWriter that is locked to the given\nWritableStream.
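For instance, a sketch of the writer API (the highWaterMark of 2 is\narbitrary):
import { WritableStream } from 'node:stream/web';\n\nconst stream = new WritableStream({\n  write(chunk) {\n    console.log(chunk);\n  },\n}, { highWaterMark: 2 });\n\nconst writer = stream.getWriter();\n\n// desiredSize reports how much room remains before backpressure.\nconsole.log(writer.desiredSize);  // Prints: 2\n\nawait writer.write('a');\nwriter.releaseLock();  // Another writer may now lock the stream.\n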
The WritableStreamDefaultController manages the <WritableStream>'s\ninternal state.
Called by user code to signal that an error has occurred while processing\nthe WritableStream data. When called, the <WritableStream> will be aborted,\nwith currently pending writes canceled.
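For example, a sketch of a sink that rejects non-string chunks (the\nvalidation rule is hypothetical):
import { WritableStream } from 'node:stream/web';\n\nconst stream = new WritableStream({\n  write(chunk, controller) {\n    if (typeof chunk !== 'string') {\n      // Move the stream into an errored state.\n      controller.error(new TypeError('only strings are accepted'));\n      return;\n    }\n    console.log(chunk);\n  },\n});\n\nconst writer = stream.getWriter();\nawait writer.write('ok');    // Prints: ok\nawait writer.write(42);      // The sink errors the stream here.\nawait writer.write('later')  // Rejects with the stored error.\n  .catch((err) => console.error(err.message));\n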
A TransformStream consists of a <ReadableStream> and a <WritableStream> that\nare connected such that the data written to the WritableStream is received,\nand potentially transformed, before being pushed into the ReadableStream's\nqueue.
import {\n TransformStream\n} from 'node:stream/web';\n\nconst transform = new TransformStream({\n transform(chunk, controller) {\n controller.enqueue(chunk.toUpperCase());\n }\n});\n\nawait Promise.all([\n transform.writable.getWriter().write('A'),\n transform.readable.getReader().read(),\n]);\n
A <TransformStream> instance can be transferred using a <MessagePort>.
const stream = new TransformStream();\n\nconst { port1, port2 } = new MessageChannel();\n\nport1.onmessage = ({ data }) => {\n const { writable, readable } = data;\n // ...\n};\n\nport2.postMessage(stream, [stream]);\n
The TransformStreamDefaultController manages the internal state\nof the TransformStream.
The amount of data required to fill the readable side's queue.
Appends a chunk of data to the readable side's queue.
Signals to both the readable and writable side that an error has occurred\nwhile processing the transform data, causing both sides to be abruptly\nclosed.
Closes the readable side of the transform and causes the writable side\nto be abruptly closed with an error.
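A sketch of a transformer using the controller (the 'stop' sentinel is\nhypothetical):
import { TransformStream } from 'node:stream/web';\n\nconst transform = new TransformStream({\n  transform(chunk, controller) {\n    controller.enqueue(chunk.toUpperCase());\n    // terminate() closes the readable side and errors the writable side.\n    if (chunk === 'stop')\n      controller.terminate();\n  },\n});\n\nconst writer = transform.writable.getWriter();\nwriter.write('hello').catch(() => {});\nwriter.write('stop').catch(() => {});\n\nfor await (const chunk of transform.readable)\n  console.log(chunk);  // Prints: HELLO, then STOP\n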
The encoding supported by the TextEncoderStream instance.
Creates a new TextEncoderStream instance.
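A sketch piping strings through a TextEncoderStream to get UTF-8 bytes:
import {\n  ReadableStream,\n  TextEncoderStream,\n} from 'node:stream/web';\n\nconst source = new ReadableStream({\n  start(controller) {\n    controller.enqueue('hello');\n    controller.close();\n  },\n});\n\nconst encoder = new TextEncoderStream();\nconsole.log(encoder.encoding);  // Prints: utf-8\n\nfor await (const chunk of source.pipeThrough(encoder))\n  console.log(chunk);  // A Uint8Array of UTF-8 bytes\n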
The encoding supported by the TextDecoderStream instance.
The value will be true if decoding errors result in a TypeError being\nthrown.
The value will be true if the decoding result will include the byte order\nmark.
Creates a new TextDecoderStream instance.
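And a sketch decoding bytes back into strings (the byte payload is\narbitrary):
import {\n  ReadableStream,\n  TextDecoderStream,\n} from 'node:stream/web';\n\nconst source = new ReadableStream({\n  start(controller) {\n    controller.enqueue(new Uint8Array([104, 105]));  // 'hi'\n    controller.close();\n  },\n});\n\n// With fatal set, malformed byte sequences cause a TypeError.\nconst decoder = new TextDecoderStream('utf-8', { fatal: true });\n\nfor await (const chunk of source.pipeThrough(decoder))\n  console.log(chunk);  // Prints: hi\n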
The utility consumer functions provide common patterns for consuming\nstreams.
They are accessed using:
import {\n arrayBuffer,\n blob,\n buffer,\n json,\n text,\n} from 'node:stream/consumers';\n
const {\n arrayBuffer,\n blob,\n buffer,\n json,\n text,\n} = require('node:stream/consumers');\n
import { arrayBuffer } from 'node:stream/consumers';\nimport { Readable } from 'node:stream';\nimport { TextEncoder } from 'node:util';\n\nconst encoder = new TextEncoder();\nconst dataArray = encoder.encode('hello world from consumers!');\n\nconst readable = Readable.from([dataArray]);\nconst data = await arrayBuffer(readable);\nconsole.log(`from readable: ${data.byteLength}`);\n
const { arrayBuffer } = require('node:stream/consumers');\nconst { Readable } = require('node:stream');\nconst { TextEncoder } = require('node:util');\n\nconst encoder = new TextEncoder();\nconst dataArray = encoder.encode('hello world from consumers!');\nconst readable = Readable.from([dataArray]);\narrayBuffer(readable).then((data) => {\n console.log(`from readable: ${data.byteLength}`);\n});\n
import { blob } from 'node:stream/consumers';\n\nconst dataBlob = new Blob(['hello world from consumers!']);\n\nconst readable = dataBlob.stream();\nconst data = await blob(readable);\nconsole.log(`from readable: ${data.size}`);\n
const { blob } = require('node:stream/consumers');\n\nconst dataBlob = new Blob(['hello world from consumers!']);\n\nconst readable = dataBlob.stream();\nblob(readable).then((data) => {\n console.log(`from readable: ${data.size}`);\n});\n
import { buffer } from 'node:stream/consumers';\nimport { Readable } from 'node:stream';\nimport { Buffer } from 'node:buffer';\n\nconst dataBuffer = Buffer.from('hello world from consumers!');\n\nconst readable = Readable.from(dataBuffer);\nconst data = await buffer(readable);\nconsole.log(`from readable: ${data.length}`);\n
const { buffer } = require('node:stream/consumers');\nconst { Readable } = require('node:stream');\nconst { Buffer } = require('node:buffer');\n\nconst dataBuffer = Buffer.from('hello world from consumers!');\n\nconst readable = Readable.from(dataBuffer);\nbuffer(readable).then((data) => {\n console.log(`from readable: ${data.length}`);\n});\n
import { json } from 'node:stream/consumers';\nimport { Readable } from 'node:stream';\n\nconst items = Array.from(\n {\n length: 100\n },\n () => ({\n message: 'hello world from consumers!'\n })\n);\n\nconst readable = Readable.from(JSON.stringify(items));\nconst data = await json(readable);\nconsole.log(`from readable: ${data.length}`);\n
const { json } = require('node:stream/consumers');\nconst { Readable } = require('node:stream');\n\nconst items = Array.from(\n {\n length: 100\n },\n () => ({\n message: 'hello world from consumers!'\n })\n);\n\nconst readable = Readable.from(JSON.stringify(items));\njson(readable).then((data) => {\n console.log(`from readable: ${data.length}`);\n});\n
import { text } from 'node:stream/consumers';\nimport { Readable } from 'node:stream';\n\nconst readable = Readable.from('Hello world from consumers!');\nconst data = await text(readable);\nconsole.log(`from readable: ${data.length}`);\n
const { text } = require('node:stream/consumers');\nconst { Readable } = require('node:stream');\n\nconst readable = Readable.from('Hello world from consumers!');\ntext(readable).then((data) => {\n console.log(`from readable: ${data.length}`);\n});\n