Skip to content

Commit c0572c1

Browse files
Merge pull request #20594 from Snuffleupagus/Node-ReadableStream
[Node.js] Don't abort the full request for local PDF files smaller than two range requests, and use standard `ReadableStream`s
2 parents 2cef80d + 663d4cd commit c0572c1

5 files changed

Lines changed: 139 additions & 136 deletions

File tree

gulpfile.mjs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2366,6 +2366,7 @@ function packageJson() {
23662366
license: DIST_LICENSE,
23672367
optionalDependencies: {
23682368
"@napi-rs/canvas": "^0.1.88",
2369+
"node-readable-to-web-readable-stream": "^0.4.2",
23692370
},
23702371
browser: {
23712372
canvas: false,

package-lock.json

Lines changed: 8 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
"jstransformer-nunjucks": "^1.2.0",
4040
"metalsmith": "^2.6.3",
4141
"metalsmith-html-relative": "^2.0.9",
42+
"node-readable-to-web-readable-stream": "^0.4.2",
4243
"ordered-read-streams": "^2.0.0",
4344
"pngjs": "^7.0.0",
4445
"postcss": "^8.5.6",

src/display/node_stream.js

Lines changed: 92 additions & 136 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
*/
1515
/* globals process */
1616

17-
import { AbortException, assert } from "../shared/util.js";
17+
import { AbortException, assert, warn } from "../shared/util.js";
1818
import { createResponseError } from "./network_utils.js";
1919

2020
if (typeof PDFJSDev !== "undefined" && PDFJSDev.test("MOZCENTRAL")) {
@@ -33,6 +33,33 @@ function parseUrlOrPath(sourceUrl) {
3333
return new URL(url.pathToFileURL(sourceUrl));
3434
}
3535

36+
function getReadableStream(readStream) {
37+
const { Readable } = process.getBuiltinModule("stream");
38+
39+
if (typeof Readable.toWeb === "function") {
40+
// See https://nodejs.org/api/stream.html#streamreadabletowebstreamreadable-options
41+
return Readable.toWeb(readStream);
42+
}
43+
// Fallback to support Node.js versions older than `24.0.0` and `22.17.0`.
44+
const require = process
45+
.getBuiltinModule("module")
46+
.createRequire(import.meta.url);
47+
48+
const polyfill = require("node-readable-to-web-readable-stream");
49+
return polyfill.makeDefaultReadableStreamFromNodeReadable(readStream);
50+
}
51+
52+
function getArrayBuffer(val) {
53+
if (val instanceof Uint8Array) {
54+
return val.buffer;
55+
}
56+
if (val instanceof ArrayBuffer) {
57+
return val;
58+
}
59+
warn(`getArrayBuffer - unexpected data format: ${val}`);
60+
return new Uint8Array(val).buffer;
61+
}
62+
3663
class PDFNodeStream {
3764
constructor(source) {
3865
this.source = source;
@@ -59,11 +86,11 @@ class PDFNodeStream {
5986
return this._fullRequestReader;
6087
}
6188

62-
getRangeReader(start, end) {
89+
getRangeReader(begin, end) {
6390
if (end <= this._progressiveDataLength) {
6491
return null;
6592
}
66-
const rangeReader = new PDFNodeStreamFsRangeReader(this, start, end);
93+
const rangeReader = new PDFNodeStreamFsRangeReader(this, begin, end);
6794
this._rangeRequestReaders.push(rangeReader);
6895
return rangeReader;
6996
}
@@ -78,10 +105,11 @@ class PDFNodeStream {
78105
}
79106

80107
class PDFNodeStreamFsFullReader {
108+
_headersCapability = Promise.withResolvers();
109+
110+
_reader = null;
111+
81112
constructor(stream) {
82-
this._url = stream.url;
83-
this._done = false;
84-
this._storedError = null;
85113
this.onProgress = null;
86114
const source = stream.source;
87115
this._contentLength = source.length; // optional
@@ -97,27 +125,39 @@ class PDFNodeStreamFsFullReader {
97125
this._isStreamingSupported = !source.disableStream;
98126
this._isRangeSupported = !source.disableRange;
99127

100-
this._readableStream = null;
101-
this._readCapability = Promise.withResolvers();
102-
this._headersCapability = Promise.withResolvers();
103-
128+
const url = stream.url;
104129
const fs = process.getBuiltinModule("fs");
105-
fs.promises.lstat(this._url).then(
106-
stat => {
130+
fs.promises
131+
.lstat(url)
132+
.then(stat => {
133+
const readStream = fs.createReadStream(url);
134+
const readableStream = getReadableStream(readStream);
135+
136+
this._reader = readableStream.getReader();
137+
138+
const { size } = stat;
139+
if (size <= 2 * this._rangeChunkSize) {
140+
// The file size is smaller than the size of two chunks, so it doesn't
141+
// make any sense to abort the request and retry with a range request.
142+
this._isRangeSupported = false;
143+
}
107144
// Setting right content length.
108-
this._contentLength = stat.size;
145+
this._contentLength = size;
146+
147+
// We need to stop reading when range is supported and streaming is
148+
// disabled.
149+
if (!this._isStreamingSupported && this._isRangeSupported) {
150+
this.cancel(new AbortException("Streaming is disabled."));
151+
}
109152

110-
this._setReadableStream(fs.createReadStream(this._url));
111153
this._headersCapability.resolve();
112-
},
113-
error => {
154+
})
155+
.catch(error => {
114156
if (error.code === "ENOENT") {
115-
error = createResponseError(/* status = */ 0, this._url.href);
157+
error = createResponseError(/* status = */ 0, url.href);
116158
}
117-
this._storedError = error;
118159
this._headersCapability.reject(error);
119-
}
120-
);
160+
});
121161
}
122162

123163
get headersReady() {
@@ -141,91 +181,51 @@ class PDFNodeStreamFsFullReader {
141181
}
142182

143183
async read() {
144-
await this._readCapability.promise;
145-
if (this._done) {
146-
return { value: undefined, done: true };
147-
}
148-
if (this._storedError) {
149-
throw this._storedError;
150-
}
151-
152-
const chunk = this._readableStream.read();
153-
if (chunk === null) {
154-
this._readCapability = Promise.withResolvers();
155-
return this.read();
184+
await this._headersCapability.promise;
185+
const { value, done } = await this._reader.read();
186+
if (done) {
187+
return { value, done };
156188
}
157-
this._loaded += chunk.length;
189+
this._loaded += value.length;
158190
this.onProgress?.({
159191
loaded: this._loaded,
160192
total: this._contentLength,
161193
});
162194

163-
// Ensure that `read()` method returns ArrayBuffer.
164-
const buffer = new Uint8Array(chunk).buffer;
165-
return { value: buffer, done: false };
195+
return { value: getArrayBuffer(value), done: false };
166196
}
167197

168198
cancel(reason) {
169-
// Call `this._error()` method when cancel is called
170-
// before _readableStream is set.
171-
if (!this._readableStream) {
172-
this._error(reason);
173-
return;
174-
}
175-
this._readableStream.destroy(reason);
176-
}
177-
178-
_error(reason) {
179-
this._storedError = reason;
180-
this._readCapability.resolve();
181-
}
182-
183-
_setReadableStream(readableStream) {
184-
this._readableStream = readableStream;
185-
readableStream.on("readable", () => {
186-
this._readCapability.resolve();
187-
});
188-
189-
readableStream.on("end", () => {
190-
// Destroy readable to minimize resource usage.
191-
readableStream.destroy();
192-
this._done = true;
193-
this._readCapability.resolve();
194-
});
195-
196-
readableStream.on("error", reason => {
197-
this._error(reason);
198-
});
199-
200-
// We need to stop reading when range is supported and streaming is
201-
// disabled.
202-
if (!this._isStreamingSupported && this._isRangeSupported) {
203-
this._error(new AbortException("streaming is disabled"));
204-
}
205-
206-
// Destroy ReadableStream if already in errored state.
207-
if (this._storedError) {
208-
this._readableStream.destroy(this._storedError);
209-
}
199+
this._reader?.cancel(reason);
210200
}
211201
}
212202

213203
class PDFNodeStreamFsRangeReader {
214-
constructor(stream, start, end) {
215-
this._url = stream.url;
216-
this._done = false;
217-
this._storedError = null;
204+
_readCapability = Promise.withResolvers();
205+
206+
_reader = null;
207+
208+
constructor(stream, begin, end) {
218209
this.onProgress = null;
219210
this._loaded = 0;
220-
this._readableStream = null;
221-
this._readCapability = Promise.withResolvers();
222211
const source = stream.source;
223212
this._isStreamingSupported = !source.disableStream;
224213

214+
const url = stream.url;
225215
const fs = process.getBuiltinModule("fs");
226-
this._setReadableStream(
227-
fs.createReadStream(this._url, { start, end: end - 1 })
228-
);
216+
try {
217+
const readStream = fs.createReadStream(url, {
218+
start: begin,
219+
end: end - 1,
220+
});
221+
const readableStream = getReadableStream(readStream);
222+
223+
this._reader = readableStream.getReader();
224+
225+
this._readCapability.resolve();
226+
} catch (error) {
227+
this._readCapability.reject(error);
228+
}
229229
}
230230

231231
get isStreamingSupported() {
@@ -234,62 +234,18 @@ class PDFNodeStreamFsRangeReader {
234234

235235
async read() {
236236
await this._readCapability.promise;
237-
if (this._done) {
238-
return { value: undefined, done: true };
239-
}
240-
if (this._storedError) {
241-
throw this._storedError;
242-
}
243-
244-
const chunk = this._readableStream.read();
245-
if (chunk === null) {
246-
this._readCapability = Promise.withResolvers();
247-
return this.read();
237+
const { value, done } = await this._reader.read();
238+
if (done) {
239+
return { value, done };
248240
}
249-
this._loaded += chunk.length;
241+
this._loaded += value.length;
250242
this.onProgress?.({ loaded: this._loaded });
251243

252-
// Ensure that `read()` method returns ArrayBuffer.
253-
const buffer = new Uint8Array(chunk).buffer;
254-
return { value: buffer, done: false };
244+
return { value: getArrayBuffer(value), done: false };
255245
}
256246

257247
cancel(reason) {
258-
// Call `this._error()` method when cancel is called
259-
// before _readableStream is set.
260-
if (!this._readableStream) {
261-
this._error(reason);
262-
return;
263-
}
264-
this._readableStream.destroy(reason);
265-
}
266-
267-
_error(reason) {
268-
this._storedError = reason;
269-
this._readCapability.resolve();
270-
}
271-
272-
_setReadableStream(readableStream) {
273-
this._readableStream = readableStream;
274-
readableStream.on("readable", () => {
275-
this._readCapability.resolve();
276-
});
277-
278-
readableStream.on("end", () => {
279-
// Destroy readableStream to minimize resource usage.
280-
readableStream.destroy();
281-
this._done = true;
282-
this._readCapability.resolve();
283-
});
284-
285-
readableStream.on("error", reason => {
286-
this._error(reason);
287-
});
288-
289-
// Destroy readableStream if already in errored state.
290-
if (this._storedError) {
291-
this._readableStream.destroy(this._storedError);
292-
}
248+
this._reader?.cancel(reason);
293249
}
294250
}
295251

test/unit/node_stream_spec.js

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,4 +117,41 @@ describe("node_stream", function () {
117117
expect(isRangeSupported).toEqual(true);
118118
expect(fullReaderCancelled).toEqual(true);
119119
});
120+
121+
it("read filesystem pdf files (smaller than two range requests)", async function () {
122+
const smallPdf = new URL("./test/pdfs/empty.pdf", cwdURL).href;
123+
const smallLength = 4920;
124+
125+
const stream = new PDFNodeStream({
126+
url: smallPdf,
127+
rangeChunkSize: 65536,
128+
disableStream: true,
129+
disableRange: false,
130+
});
131+
132+
const fullReader = stream.getFullReader();
133+
134+
let isStreamingSupported, isRangeSupported;
135+
const promise = fullReader.headersReady.then(() => {
136+
isStreamingSupported = fullReader.isStreamingSupported;
137+
isRangeSupported = fullReader.isRangeSupported;
138+
});
139+
140+
let len = 0;
141+
const read = function () {
142+
return fullReader.read().then(function (result) {
143+
if (result.done) {
144+
return undefined;
145+
}
146+
len += result.value.byteLength;
147+
return read();
148+
});
149+
};
150+
151+
await Promise.all([read(), promise]);
152+
153+
expect(isStreamingSupported).toEqual(false);
154+
expect(isRangeSupported).toEqual(false);
155+
expect(len).toEqual(smallLength);
156+
});
120157
});

0 commit comments

Comments
 (0)