📦 FlowiseAI / FlowiseSDK

📄 flowise-sdk.ts · 150 lines
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
/**
 * Request payload accepted by `FlowiseClient.createPrediction`.
 *
 * The whole object is JSON-serialised and sent as the POST body of the
 * prediction request; `chatflowId` is additionally interpolated into the
 * request URLs.
 */
export interface PredictionData {
    /** Identifier of the target chatflow; used as the last URL path segment. */
    chatflowId: string;
    /** Question text forwarded to the server in the request body. */
    question: string;
    /** Free-form configuration overrides; forwarded as-is, not inspected by the client. */
    overrideConfig?: Record<string, any>;
    /** Optional chat identifier; forwarded as-is (presumably a conversation/session id — confirm against the server API). */
    chatId?: string;
    /**
     * When truthy, the client asks the server whether the chatflow can stream
     * and, if so, returns an async generator of events instead of a JSON object.
     */
    streaming?: boolean;
    /** Optional list of earlier messages; forwarded as-is. */
    history?: IMessage[];
    /** Optional file attachments; forwarded as-is. */
    uploads?: IFileUpload[];
    /** Optional e-mail string; forwarded as-is (presumably for lead capture — confirm against the server API). */
    leadEmail?: string;
    /** Optional action descriptor; forwarded as-is. */
    action?: IAction;
}

/**
 * Shape of the optional `action` member of a prediction request.
 *
 * Every member is optional. The client only serialises this object; the
 * meaning of each field is defined by the server and is not visible here.
 */
interface IAction {
    /** Optional identifier of the action. */
    id?: string;

    /** Optional list of typed, labelled elements (presumably UI choices — confirm against the server API). */
    elements?: {
        type: string;
        label: string;
    }[];

    /** Optional mapping carrying `approve` / `reject` strings and a list of tool calls of unspecified shape. */
    mapping?: {
        approve: string;
        reject: string;
        toolCalls: any[];
    };
}

/**
 * One entry of the `uploads` array of a prediction request.
 *
 * The client forwards these objects unchanged in the JSON request body.
 */
interface IFileUpload {
    /** Optional payload string (presumably the file content or a reference to it — confirm against the server API). */
    data?: string;

    /** Required upload type tag; allowed values are defined server-side. */
    type: string;

    /** Required name of the upload. */
    name: string;

    /** Required MIME string of the upload. */
    mime: string;
}

/**
 * One entry of the `history` array of a prediction request.
 *
 * `message` and `type` are required; `role` and `content` are optional
 * (they look like an alternative spelling of the same information — confirm
 * against the server API).
 */
interface IMessage {
    /** Message text. */
    message: string;

    /** Which side produced the message. */
    type: MessageType;

    /** Optional role tag, drawn from the same set of values as `type`. */
    role?: MessageType;

    /** Optional content string. */
    content?: string;
}

/** Tag identifying the author side of a history message. */
type MessageType =
    | 'apiMessage'
    | 'userMessage';

/**
 * Declared shape of each item produced by a streaming prediction.
 *
 * Items are obtained by JSON-parsing `data:` lines of the response stream;
 * the client does not validate them against this shape at runtime.
 */
export interface StreamResponse {
    /** Event name. */
    event: string;

    /** Event payload as a string. */
    data: string;
}

/** Construction options for `FlowiseClient`; both members are optional. */
interface FlowiseClientOptions {
    /** Root URL of the server; the client falls back to `http://localhost:3000` when this is missing or empty. */
    baseUrl?: string;

    /** API key; when non-empty the client sends it as a `Bearer` token on the prediction request. */
    apiKey?: string;
}

/**
 * Result type of a prediction, selected from the request type `T`:
 *
 * - when `T['streaming']` is the literal type `true`, an async generator of
 *   `StreamResponse` items;
 * - in every other case (`false`, omitted, or plain `boolean`), a JSON object.
 */
type PredictionResponse<T extends PredictionData> =
    T['streaming'] extends true
        ? AsyncGenerator<StreamResponse, void, unknown>
        : Record<string, any>;

/**
 * Minimal HTTP client for the Flowise prediction endpoint.
 *
 * Uses the global `fetch`, so it needs a runtime that provides `fetch`,
 * `TextDecoder` and web streams.
 */
export default class FlowiseClient {
    private readonly baseUrl: string;
    private readonly apiKey: string;

    /**
     * @param options Optional settings. A missing or empty `baseUrl` falls back
     *   to `http://localhost:3000`; a missing `apiKey` means no `Authorization`
     *   header is sent.
     */
    constructor(options: FlowiseClientOptions = {}) {
        // `||` (not `??`) is deliberate: an empty string also selects the default.
        this.baseUrl = options.baseUrl || 'http://localhost:3000';
        this.apiKey = options.apiKey || '';
    }

    /**
     * Sends a prediction request for `data.chatflowId`.
     *
     * When `data.streaming` is truthy the server is first asked whether the
     * chatflow supports streaming. If it does, the returned promise resolves to
     * an async generator that performs the POST lazily (on first iteration) and
     * yields one parsed event per `data:` line. The generator is single-use.
     * In every other case the POST is performed immediately and the parsed JSON
     * body is returned.
     *
     * @param data Request payload; serialised as the JSON body of the POST.
     * @returns The parsed JSON response, or an async generator of stream events.
     * @throws Error `Error creating prediction: …` when the non-streaming
     *   request or its JSON parsing fails.
     * @throws Error `HTTP error! status: …` (raised during iteration) when the
     *   streaming request returns a non-2xx status.
     */
    async createPrediction<T extends PredictionData>(
        data: T
    ): Promise<PredictionResponse<T>> {
        const { chatflowId, streaming } = data;
        const predictionUrl = `${this.baseUrl}/api/v1/prediction/${chatflowId}`;

        const headers: Record<string, string> = {
            'Content-Type': 'application/json',
        };
        if (this.apiKey) {
            headers['Authorization'] = `Bearer ${this.apiKey}`;
        }
        const options = {
            method: 'POST',
            headers,
            body: JSON.stringify(data),
        };

        // Only pay for the availability probe when the caller asked to stream.
        if (streaming && (await this.isChatflowStreamable(chatflowId))) {
            return this.streamPrediction(
                predictionUrl,
                options
            ) as PredictionResponse<T>;
        }

        try {
            const response = await fetch(predictionUrl, options);
            const result: unknown = await response.json();
            return result as PredictionResponse<T>;
        } catch (error) {
            // Keep the historical message prefix but do not discard the cause.
            const detail =
                error instanceof Error ? error.message : String(error);
            throw new Error(`Error creating prediction: ${detail}`);
        }
    }

    /** Asks the server whether the given chatflow is able to stream its answer. */
    private async isChatflowStreamable(chatflowId: string): Promise<boolean> {
        const probeUrl = `${this.baseUrl}/api/v1/chatflows-streaming/${chatflowId}`;
        const resp = await fetch(probeUrl, {
            method: 'GET',
            headers: {
                'Content-Type': 'application/json',
            },
        });
        const payload = (await resp.json()) as {
            isStreaming?: unknown;
        } | null;
        return Boolean(payload?.isStreaming);
    }

    /** Performs the prediction POST and yields one parsed event per `data:` line of the response stream. */
    private async *streamPrediction(
        url: string,
        init: { method: string; headers: Record<string, string>; body: string }
    ): AsyncGenerator<StreamResponse, void, unknown> {
        const response = await fetch(url, init);

        if (!response.ok) {
            throw new Error(`HTTP error! status: ${response.status}`);
        }
        if (!response.body) {
            throw new Error('Response has no body to stream');
        }

        const reader = response.body.getReader();
        const decoder = new TextDecoder();
        let buffer = '';
        let finished = false;

        try {
            while (true) {
                const { done, value } = await reader.read();
                if (done) break;

                // A chunk may end mid-line: keep the unterminated tail for the next round.
                buffer += decoder.decode(value, { stream: true });
                const lines = buffer.split('\n');
                buffer = lines.pop() ?? '';

                for (const line of lines) {
                    const event = FlowiseClient.parseDataLine(line);
                    if (event !== undefined) yield event;
                }
            }

            // Flush the decoder and emit a final line that had no trailing newline.
            buffer += decoder.decode();
            const lastEvent = FlowiseClient.parseDataLine(buffer);
            if (lastEvent !== undefined) yield lastEvent;
            finished = true;
        } finally {
            if (!finished) {
                // Early exit (consumer `break` or an error): stop the download
                // instead of leaving the connection open.
                try {
                    await reader.cancel();
                } catch {
                    // The stream may already be errored; nothing more to clean up.
                }
            }
            reader.releaseLock();
        }
    }

    /** Parses a single stream line; returns `undefined` for blank lines and lines that do not start with `data:`. */
    private static parseDataLine(line: string): StreamResponse | undefined {
        if (line.trim() === '' || !line.startsWith('data:')) {
            return undefined;
        }
        return JSON.parse(line.slice('data:'.length)) as StreamResponse;
    }
}