All files client.js

100% Statements 93/93
100% Branches 34/34
100% Functions 19/19
100% Lines 93/93

Press n or j to go to the next uncovered block, b, p or k for the previous block.

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207      1x 1x       676x 676x             254x   254x       254x 254x       254x         598x     598x 598x   42991x     598x 598x 598x 598x       676x 676x 676x       598x       598x 36x   562x 562x 562x 254x 254x             235x 217x   18x 19x 254x         55558x 44347x   11211x 11211x   11211x 55778x 55741x 507x 55234x     507x 507x   54727x       37x 37x   11211x         42393x         42393x       42393x         82309x 82309x 82309x 41058x 41058x   41251x     41251x 36x   41215x     36x   82309x       924x 888x 888x             885x 586x   299x   587x     335x       543x 308x   235x 235x 235x   18x   217x       598x 598x 562x   36x   598x 254x   344x 344x   598x 598x         598x 598x 598x   36x 36x   562x    
/*eslint-env node, browser */
import stream from 'stream';
import buffer from 'buffer';
const { Duplex } = stream;
const { Buffer } = buffer;
 
/**
 * Error describing a fetch response that was not accepted.
 *
 * The message is the response's status text; when that is empty (or otherwise
 * falsy) the numeric status code, converted to a string, is used instead.
 * The response itself is kept on the `response` property.
 */
export class ResponseError extends Error {
    /**
     * @param {Response} response - The response that triggered the error.
     */
    constructor(response) {
        let message = response.statusText;
        if (!message) {
            message = String(response.status);
        }
        super(message);
        this.response = response;
    }

    /**
     * Asynchronous initialisation hook, awaited after construction by the
     * code that creates the error. Does nothing here; subclasses can override
     * it (for example to read the response body).
     *
     * @returns {Promise<void>}
     */
    async init() {}
}
 
/**
 * Probe whether this runtime can send a `ReadableStream` as a request body.
 *
 * A throwaway POST `Request` is built with a stream body and a `duplex`
 * getter. Streaming is reported as supported only when the constructor read
 * the `duplex` option AND did not attach a `Content-Type` header to the
 * request (a Content-Type indicates the stream was not accepted as a
 * streaming body).
 *
 * Any failure while probing is treated as "not supported" rather than being
 * propagated: `Request` or `ReadableStream` may be undefined, and `''` is not
 * a resolvable URL in runtimes without a document base. Callers then fall
 * back to sending each write as its own request.
 *
 * @returns {boolean} `true` if streaming request bodies are supported.
 */
function request_streaming_supported() {
    let duplex_accessed = false;
    let has_content_type;

    try {
        has_content_type = new Request('', {
            body: new ReadableStream(),
            method: 'POST',
            get duplex() {
                duplex_accessed = true;
                return 'half';
            }
        }).headers.has('Content-Type');
    } catch (ex) {
        // The probe itself could not run, so streaming cannot be relied upon.
        return false;
    }

    return duplex_accessed && !has_content_type;
}
 
/**
 * Duplex stream carried over fetch().
 *
 * The readable side is fed from the body of the response to an initial
 * request made in init(). The writable side is sent to the same URL with
 * POST requests: either one long-lived request with a streaming body (when
 * supported and not disabled) or one request per written chunk.
 *
 * The same `options` object is used for three purposes: it is passed to the
 * Duplex constructor, it is merged into the init passed to every fetch call,
 * and it may carry the following keys read by this class:
 *   - disable_request_streaming: never use a streaming request body
 *   - fetch: function used instead of the global fetch
 *   - ResponseError: class used to build errors for non-ok responses
 */
class FetchDuplex extends Duplex {
    /**
     * @param {string} url - URL used for every request made by this stream.
     * @param {object} [options] - Stream options, fetch options and the keys
     *     described on the class.
     */
    constructor(url, options) {
        // autoDestroy defaults to true but can be overridden by the caller.
        super(Object.assign({
            autoDestroy: true
        }, options));
        this.url = url;
        this.options = Object.assign({
            disable_request_streaming: false,
            // Wrapper so the global fetch is looked up at call time.
            fetch: (...args) => fetch(...args),
            ResponseError
        }, options);
        // True until the first chunk of the response body has been handled.
        this.first = true;
        // True while a _read() loop is running; prevents overlapping loops.
        this.reading = false;
        // abort_reader's signal is given to the initial request in init();
        // abort_writer's signal is the default for requests built by
        // _write_options().
        this.abort_reader = new AbortController();
        this.abort_writer = new AbortController();
    }
 
    /**
     * Build an error for a non-ok response using the configured
     * ResponseError class and wait for its init() hook to finish.
     *
     * @param {Response} response - The non-ok response.
     * @returns {Promise<Error>} The initialised error (not thrown here).
     */
    async _response_error(response) {
        const err = new this.options.ResponseError(response);
        await err.init();
        return err;
    }
 
    /**
     * Open the connection: issue the initial request, keep a reader on its
     * body and remember the id the server assigned. When request streaming
     * is enabled and supported, also start the single streaming POST that
     * will carry all written data.
     *
     * @returns {Promise<void>}
     * @throws {Error} Whatever fetch rejects with, or the configured
     *     ResponseError when the initial response is not ok.
     */
    async init() {
        // this.options is applied last, so caller-supplied values override
        // the cache and signal defaults given here.
        const response = await this.options.fetch(this.url, Object.assign({
            cache: 'no-store',
            signal: this.abort_reader.signal
        }, this.options));
        if (!response.ok) {
            throw await this._response_error(response);
        }
        this.reader = response.body.getReader();
        // Sent back as a header on every subsequent request (see
        // _write_options).
        this.id = response.headers.get('http2-duplex-id');
        if (!this.options.disable_request_streaming && request_streaming_supported()) {
            // Data written to `writable` comes out of `readable`, which is
            // the body of the streaming POST.
            const { readable, writable } = new TransformStream();
            // Not awaited: the promise settles only once the server
            // responds to the streaming request.
            this.options.fetch(this.url, this._write_options({
                headers: {
                    'http2-duplex-single': 'true'
                },
                body: readable,
                duplex: 'half'
            })).then(async response => {
                if (response.ok) {
                    // Server has answered the upload: end the writable side.
                    return this.end();
                }
                this.destroy(await this._response_error(response));
            }).catch(err => this.destroy(err));
            // Presence of this.writer selects streaming mode in _write and
            // _final.
            this.writer = writable.getWriter();
        }
    }
 
    /**
     * Readable-side implementation: pump chunks from the response body
     * reader into the stream until the body ends or push() returns false.
     * Calls made while a pump is already running return immediately.
     */
    async _read() {
        if (this.reading) {
            return;
        }
        this.reading = true;
        try {
            let value, done;
            do {
                ({ value, done } = await this.reader.read());
                if (done) {
                    // Response body finished: signal end of the readable side.
                    this.push(null);
                } else if (this.first) {
                    // Sometimes fetch waits for first byte before resolving
                    // so server-side sends initial dummy byte
                    this.first = false;
                    // Drop that byte; stop pumping if push() reports the
                    // buffer is full.
                    done = !this.push(Buffer.from(value.subarray(1)));
                } else {
                    done = !this.push(Buffer.from(value));
                }
            } while (!done);
        } catch (ex) {
            // A failed read ends the readable side and is reported as an
            // 'error' event.
            this.push(null);
            this.emit('error', ex);
        } finally {
            this.reading = false;
        }
    }
 
    /**
     * Build the fetch init for a request on the writable side.
     *
     * Precedence, lowest to highest: the defaults below, `extra_options`,
     * then this.options. Headers are merged separately: the id and
     * Content-Type defaults, then extra_options.headers, then the headers
     * that won the first merge.
     *
     * @param {object} extra_options - Per-request additions (body, headers,
     *     ...).
     * @returns {object} Init object to pass to fetch.
     */
    _write_options(extra_options) {
        const options = Object.assign({
            method: 'POST',
            cache: 'no-store',
            signal: this.abort_writer.signal
        }, extra_options, this.options);
        // options.headers here is this.options.headers when the caller set
        // one, otherwise extra_options.headers (possibly undefined).
        options.headers = Object.assign({
            'http2-duplex-id': this.id,
            'Content-Type': 'application/octet-stream'
        }, extra_options.headers, options.headers);
        return options;
    }
 
    /**
     * Writable-side implementation: send one chunk, either through the
     * streaming request's writer or as the body of its own POST.
     *
     * @param {Buffer} chunk - Data to send.
     * @param {string} encoding - Unused.
     * @param {function} cb - Called with an error on failure, or with
     *     undefined once the chunk has been handed over.
     */
    async _write(chunk, encoding, cb) {
        let err;
        try {
            // Copy the chunk into a fresh Uint8Array before sending.
            const data = Uint8Array.from(chunk);
            if (this.writer) {
                // Streaming mode: wait for the writer to be ready, then
                // write.
                await this.writer.ready;
                await this.writer.write(data);
            } else {
                const response = await this.options.fetch(this.url, this._write_options({
                    body: data
                }));
                if (!response.ok) {
                    throw await this._response_error(response);
                }
                // Read the response body to completion before reporting
                // success.
                await response.arrayBuffer();
            }
        } catch (ex) {
            err = ex;
        }
        cb(err);
    }
 
    /**
     * Tell the server this side is finished by POSTing with the
     * 'http2-duplex-end' header. Skipped when no id was ever stored.
     *
     * @param {?Error} err - Earlier error to report in preference to any
     *     error from this request.
     * @param {function} cb - Called with `err`, or with this request's
     *     error when `err` is falsy and the request fails.
     */
    async _send_end(err, cb) {
        // this.id is only assigned in init() after an ok response.
        // NOTE(review): Headers.get() yields null (not undefined) for a
        // missing header, so this check presumably only detects "init never
        // got that far" - confirm that is the intent.
        if (this.id !== undefined) {
            try {
                const response = await this.options.fetch(this.url, this._write_options({
                    headers: {
                        'http2-duplex-end': 'true',
                        'http2-duplex-destroyed': this.destroyed
                    },
                    // Overrides the abort_writer signal default so aborting
                    // writes does not cancel this request (a signal in
                    // this.options would still win).
                    signal: undefined
                }));
                if (!response.ok) {
                    throw await this._response_error(response);
                }
                await response.arrayBuffer();
            } catch (ex) {
                return cb(err || ex);
            }
        }
        cb(err);
    }
 
    /**
     * Called when the writable side ends. Without a streaming writer an
     * explicit end request is sent. With one, closing the writer ends the
     * request body; an end request is only sent if that close fails.
     *
     * @param {function} cb - Completion callback, called with an error on
     *     failure.
     */
    async _final(cb) {
        if (!this.writer) {
            return await this._send_end(null, cb);
        }
        try {
            await this.writer.ready;
            await this.writer.close();
        } catch (ex) {
            return await this._send_end(ex, cb);
        }
        cb();
    }
 
    /**
     * Tear down both directions and notify the server on a best-effort
     * basis.
     *
     * @param {?Error} err - Error the stream is being destroyed with, if
     *     any.
     * @param {function} cb - Called synchronously with `err`; the network
     *     cleanup continues in the background.
     */
    _destroy(err, cb) {
        const ignore_error = () => {}; // ignore cancel/abort errors
        if (this.reader) {
            this.reader.cancel().catch(ignore_error);
        } else {
            // No reader yet: abort the initial request's signal instead.
            this.abort_reader.abort();
        }
        if (this.writer) {
            this.writer.abort().catch(ignore_error);
        } else {
            // Abort requests using the current writer signal, then install
            // a fresh controller for any request built afterwards.
            this.abort_writer.abort();
            this.abort_writer = new AbortController();
        }
        this._send_end(null, () => {}); // don't care if we can't tell other end
        cb(err);
    }
}
 
/**
 * Create a FetchDuplex for `url` and wait until it has been initialised.
 *
 * If initialisation fails, the stream is destroyed before the failure is
 * rethrown to the caller.
 *
 * @param {string} url - URL the duplex talks to.
 * @param {object} [options] - Options forwarded to the FetchDuplex
 *     constructor.
 * @returns {Promise<FetchDuplex>} The initialised stream.
 * @throws Whatever the stream's init() rejects with.
 */
export default async function (url, options) {
    const connection = new FetchDuplex(url, options);
    try {
        await connection.init();
        return connection;
    } catch (init_error) {
        // Release anything init() managed to set up before reporting failure.
        connection.destroy();
        throw init_error;
    }
}