๐Ÿ“ฆ microsoft / playwright

๐Ÿ“„ streamDispatcher.ts ยท 70 lines
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70/**
 * Copyright (c) Microsoft Corporation.
 *
 * Licensed under the Apache License, Version 2.0 (the 'License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

import { Dispatcher } from './dispatcher';
import { ManualPromise } from '../../utils/isomorphic/manualPromise';
import { SdkObject } from '../instrumentation';

import type { ArtifactDispatcher } from './artifactDispatcher';
import type * as channels from '@protocol/channels';
import type * as stream from 'stream';
import type { Progress } from '@protocol/progress';

class StreamSdkObject extends SdkObject {
  readonly stream: stream.Readable;

  constructor(parent: SdkObject, stream: stream.Readable) {
    super(parent, 'stream');
    this.stream = stream;
  }
}

export class StreamDispatcher extends Dispatcher<StreamSdkObject, channels.StreamChannel, ArtifactDispatcher> implements channels.StreamChannel {
  _type_Stream = true;
  private _ended: boolean = false;

  constructor(scope: ArtifactDispatcher, stream: stream.Readable) {
    super(scope, new StreamSdkObject(scope._object, stream), 'Stream', {});
    // In Node v12.9.0+ we can use readableEnded.
    stream.once('end', () => this._ended =  true);
    stream.once('error', () => this._ended =  true);
  }

  async read(params: channels.StreamReadParams, progress: Progress): Promise<channels.StreamReadResult> {
    const stream = this._object.stream;
    if (this._ended)
      return { binary: Buffer.from('') };
    if (!stream.readableLength) {
      const readyPromise = new ManualPromise<void>();
      const done = () => readyPromise.resolve();
      stream.on('readable', done);
      stream.on('end', done);
      stream.on('error', done);
      await progress.race(readyPromise).finally(() => {
        stream.off('readable', done);
        stream.off('end', done);
        stream.off('error', done);
      });
    }
    const buffer = stream.read(Math.min(stream.readableLength, params.size || stream.readableLength));
    return { binary: buffer || Buffer.from('') };
  }

  async close(params: channels.StreamCloseParams, progress: Progress): Promise<void> {
    this._object.stream.destroy();
  }
}