Skip to content

Commit 663d4cd

Browse files
committed
Use standard ReadableStreams in the src/display/node_stream.js code
Thanks to newer Node.js functionality (`stream.Readable.toWeb()`, see https://nodejs.org/api/stream.html#streamreadabletowebstreamreadable-options), we can use standard `ReadableStream`s, which significantly shortens and simplifies the code. For older Node.js versions we fall back to the `node-readable-to-web-readable-stream` package (see https://www.npmjs.com/package/node-readable-to-web-readable-stream) to get the same functionality.
1 parent 45294d3 commit 663d4cd

4 files changed

Lines changed: 95 additions & 135 deletions

File tree

gulpfile.mjs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2366,6 +2366,7 @@ function packageJson() {
23662366
license: DIST_LICENSE,
23672367
optionalDependencies: {
23682368
"@napi-rs/canvas": "^0.1.88",
2369+
"node-readable-to-web-readable-stream": "^0.4.2",
23692370
},
23702371
browser: {
23712372
canvas: false,

package-lock.json

Lines changed: 8 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
"jstransformer-nunjucks": "^1.2.0",
4040
"metalsmith": "^2.6.3",
4141
"metalsmith-html-relative": "^2.0.9",
42+
"node-readable-to-web-readable-stream": "^0.4.2",
4243
"ordered-read-streams": "^2.0.0",
4344
"pngjs": "^7.0.0",
4445
"postcss": "^8.5.6",

src/display/node_stream.js

Lines changed: 85 additions & 135 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
*/
1515
/* globals process */
1616

17-
import { AbortException, assert } from "../shared/util.js";
17+
import { AbortException, assert, warn } from "../shared/util.js";
1818
import { createResponseError } from "./network_utils.js";
1919

2020
if (typeof PDFJSDev !== "undefined" && PDFJSDev.test("MOZCENTRAL")) {
@@ -33,6 +33,33 @@ function parseUrlOrPath(sourceUrl) {
3333
return new URL(url.pathToFileURL(sourceUrl));
3434
}
3535

36+
function getReadableStream(readStream) {
37+
const { Readable } = process.getBuiltinModule("stream");
38+
39+
if (typeof Readable.toWeb === "function") {
40+
// See https://nodejs.org/api/stream.html#streamreadabletowebstreamreadable-options
41+
return Readable.toWeb(readStream);
42+
}
43+
// Fallback to support Node.js versions older than `24.0.0` and `22.17.0`.
44+
const require = process
45+
.getBuiltinModule("module")
46+
.createRequire(import.meta.url);
47+
48+
const polyfill = require("node-readable-to-web-readable-stream");
49+
return polyfill.makeDefaultReadableStreamFromNodeReadable(readStream);
50+
}
51+
52+
function getArrayBuffer(val) {
53+
if (val instanceof Uint8Array) {
54+
return val.buffer;
55+
}
56+
if (val instanceof ArrayBuffer) {
57+
return val;
58+
}
59+
warn(`getArrayBuffer - unexpected data format: ${val}`);
60+
return new Uint8Array(val).buffer;
61+
}
62+
3663
class PDFNodeStream {
3764
constructor(source) {
3865
this.source = source;
@@ -59,11 +86,11 @@ class PDFNodeStream {
5986
return this._fullRequestReader;
6087
}
6188

62-
getRangeReader(start, end) {
89+
getRangeReader(begin, end) {
6390
if (end <= this._progressiveDataLength) {
6491
return null;
6592
}
66-
const rangeReader = new PDFNodeStreamFsRangeReader(this, start, end);
93+
const rangeReader = new PDFNodeStreamFsRangeReader(this, begin, end);
6794
this._rangeRequestReaders.push(rangeReader);
6895
return rangeReader;
6996
}
@@ -78,10 +105,11 @@ class PDFNodeStream {
78105
}
79106

80107
class PDFNodeStreamFsFullReader {
108+
_headersCapability = Promise.withResolvers();
109+
110+
_reader = null;
111+
81112
constructor(stream) {
82-
this._url = stream.url;
83-
this._done = false;
84-
this._storedError = null;
85113
this.onProgress = null;
86114
const source = stream.source;
87115
this._contentLength = source.length; // optional
@@ -97,13 +125,16 @@ class PDFNodeStreamFsFullReader {
97125
this._isStreamingSupported = !source.disableStream;
98126
this._isRangeSupported = !source.disableRange;
99127

100-
this._readableStream = null;
101-
this._readCapability = Promise.withResolvers();
102-
this._headersCapability = Promise.withResolvers();
103-
128+
const url = stream.url;
104129
const fs = process.getBuiltinModule("fs");
105-
fs.promises.lstat(this._url).then(
106-
stat => {
130+
fs.promises
131+
.lstat(url)
132+
.then(stat => {
133+
const readStream = fs.createReadStream(url);
134+
const readableStream = getReadableStream(readStream);
135+
136+
this._reader = readableStream.getReader();
137+
107138
const { size } = stat;
108139
if (size <= 2 * this._rangeChunkSize) {
109140
// The file size is smaller than the size of two chunks, so it doesn't
@@ -113,17 +144,20 @@ class PDFNodeStreamFsFullReader {
113144
// Setting right content length.
114145
this._contentLength = size;
115146

116-
this._setReadableStream(fs.createReadStream(this._url));
147+
// We need to stop reading when range is supported and streaming is
148+
// disabled.
149+
if (!this._isStreamingSupported && this._isRangeSupported) {
150+
this.cancel(new AbortException("Streaming is disabled."));
151+
}
152+
117153
this._headersCapability.resolve();
118-
},
119-
error => {
154+
})
155+
.catch(error => {
120156
if (error.code === "ENOENT") {
121-
error = createResponseError(/* status = */ 0, this._url.href);
157+
error = createResponseError(/* status = */ 0, url.href);
122158
}
123-
this._storedError = error;
124159
this._headersCapability.reject(error);
125-
}
126-
);
160+
});
127161
}
128162

129163
get headersReady() {
@@ -147,91 +181,51 @@ class PDFNodeStreamFsFullReader {
147181
}
148182

149183
async read() {
150-
await this._readCapability.promise;
151-
if (this._done) {
152-
return { value: undefined, done: true };
153-
}
154-
if (this._storedError) {
155-
throw this._storedError;
156-
}
157-
158-
const chunk = this._readableStream.read();
159-
if (chunk === null) {
160-
this._readCapability = Promise.withResolvers();
161-
return this.read();
184+
await this._headersCapability.promise;
185+
const { value, done } = await this._reader.read();
186+
if (done) {
187+
return { value, done };
162188
}
163-
this._loaded += chunk.length;
189+
this._loaded += value.length;
164190
this.onProgress?.({
165191
loaded: this._loaded,
166192
total: this._contentLength,
167193
});
168194

169-
// Ensure that `read()` method returns ArrayBuffer.
170-
const buffer = new Uint8Array(chunk).buffer;
171-
return { value: buffer, done: false };
195+
return { value: getArrayBuffer(value), done: false };
172196
}
173197

174198
cancel(reason) {
175-
// Call `this._error()` method when cancel is called
176-
// before _readableStream is set.
177-
if (!this._readableStream) {
178-
this._error(reason);
179-
return;
180-
}
181-
this._readableStream.destroy(reason);
182-
}
183-
184-
_error(reason) {
185-
this._storedError = reason;
186-
this._readCapability.resolve();
187-
}
188-
189-
_setReadableStream(readableStream) {
190-
this._readableStream = readableStream;
191-
readableStream.on("readable", () => {
192-
this._readCapability.resolve();
193-
});
194-
195-
readableStream.on("end", () => {
196-
// Destroy readable to minimize resource usage.
197-
readableStream.destroy();
198-
this._done = true;
199-
this._readCapability.resolve();
200-
});
201-
202-
readableStream.on("error", reason => {
203-
this._error(reason);
204-
});
205-
206-
// We need to stop reading when range is supported and streaming is
207-
// disabled.
208-
if (!this._isStreamingSupported && this._isRangeSupported) {
209-
this._error(new AbortException("streaming is disabled"));
210-
}
211-
212-
// Destroy ReadableStream if already in errored state.
213-
if (this._storedError) {
214-
this._readableStream.destroy(this._storedError);
215-
}
199+
this._reader?.cancel(reason);
216200
}
217201
}
218202

219203
class PDFNodeStreamFsRangeReader {
220-
constructor(stream, start, end) {
221-
this._url = stream.url;
222-
this._done = false;
223-
this._storedError = null;
204+
_readCapability = Promise.withResolvers();
205+
206+
_reader = null;
207+
208+
constructor(stream, begin, end) {
224209
this.onProgress = null;
225210
this._loaded = 0;
226-
this._readableStream = null;
227-
this._readCapability = Promise.withResolvers();
228211
const source = stream.source;
229212
this._isStreamingSupported = !source.disableStream;
230213

214+
const url = stream.url;
231215
const fs = process.getBuiltinModule("fs");
232-
this._setReadableStream(
233-
fs.createReadStream(this._url, { start, end: end - 1 })
234-
);
216+
try {
217+
const readStream = fs.createReadStream(url, {
218+
start: begin,
219+
end: end - 1,
220+
});
221+
const readableStream = getReadableStream(readStream);
222+
223+
this._reader = readableStream.getReader();
224+
225+
this._readCapability.resolve();
226+
} catch (error) {
227+
this._readCapability.reject(error);
228+
}
235229
}
236230

237231
get isStreamingSupported() {
@@ -240,62 +234,18 @@ class PDFNodeStreamFsRangeReader {
240234

241235
async read() {
242236
await this._readCapability.promise;
243-
if (this._done) {
244-
return { value: undefined, done: true };
245-
}
246-
if (this._storedError) {
247-
throw this._storedError;
248-
}
249-
250-
const chunk = this._readableStream.read();
251-
if (chunk === null) {
252-
this._readCapability = Promise.withResolvers();
253-
return this.read();
237+
const { value, done } = await this._reader.read();
238+
if (done) {
239+
return { value, done };
254240
}
255-
this._loaded += chunk.length;
241+
this._loaded += value.length;
256242
this.onProgress?.({ loaded: this._loaded });
257243

258-
// Ensure that `read()` method returns ArrayBuffer.
259-
const buffer = new Uint8Array(chunk).buffer;
260-
return { value: buffer, done: false };
244+
return { value: getArrayBuffer(value), done: false };
261245
}
262246

263247
cancel(reason) {
264-
// Call `this._error()` method when cancel is called
265-
// before _readableStream is set.
266-
if (!this._readableStream) {
267-
this._error(reason);
268-
return;
269-
}
270-
this._readableStream.destroy(reason);
271-
}
272-
273-
_error(reason) {
274-
this._storedError = reason;
275-
this._readCapability.resolve();
276-
}
277-
278-
_setReadableStream(readableStream) {
279-
this._readableStream = readableStream;
280-
readableStream.on("readable", () => {
281-
this._readCapability.resolve();
282-
});
283-
284-
readableStream.on("end", () => {
285-
// Destroy readableStream to minimize resource usage.
286-
readableStream.destroy();
287-
this._done = true;
288-
this._readCapability.resolve();
289-
});
290-
291-
readableStream.on("error", reason => {
292-
this._error(reason);
293-
});
294-
295-
// Destroy readableStream if already in errored state.
296-
if (this._storedError) {
297-
this._readableStream.destroy(this._storedError);
298-
}
248+
this._reader?.cancel(reason);
299249
}
300250
}
301251

0 commit comments

Comments
 (0)