"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
exports.SocketRestClient = exports.SocketRestStreamResponse = void 0;
const FunctionBase_1 = require("../storyV2/function/FunctionBase");
const LangUtil_1 = require("./LangUtil");
/**
 * Stream response that stops processing messages once a completed message
 * has been forwarded: anything received after completion is ignored.
 */
class SocketRestStreamResponse extends FunctionBase_1.RunStreamResponse {
    constructor(...args) {
        super(...args);
        // Holds the `completion` argument of the most recently forwarded message.
        this.isCompletion = false;
    }
    /**
     * Forwards a message to the base implementation unless a previous message
     * already marked the stream as complete.
     *
     * @param newValue Payload passed through to the base class.
     * @param completion Truthy when this message finishes the stream.
     * @param error Optional error passed through to the base class.
     */
    receive(newValue, completion, error) {
        if (!this.isCompletion) {
            super.receive(newValue, completion, error);
            // Recorded only after forwarding, so the completing message itself is delivered.
            this.isCompletion = completion;
        }
    }
}
exports.SocketRestStreamResponse = SocketRestStreamResponse;
/**
 * Issues a request and relays its outcome through a single
 * SocketRestStreamResponse. The outcome may come from the request's own
 * response or from messages pushed to a websocket topic that is subscribed
 * before the request is sent.
 */
class SocketRestClient {
    /**
     * @param topicService Object whose `generateTopic(identityInfo, topicType)` resolves to the topic name.
     * @param webSocketService Object whose `on(topic, identityInfo, listener)` registers a `(res, error)` listener.
     */
    constructor(topicService, webSocketService) {
        this.topicService = topicService;
        this.webSocketService = webSocketService;
    }
    /**
     * Runs a request whose resolved value is a streaming response.
     *
     * The resolved value must expose `response.status` and `onReceived(cb)`;
     * every chunk handed to `cb` is read for `done`, `fullText` and `abort()`.
     *
     * @param request Function called with the topic; returns a promise of the streaming response.
     * @param topicType Passed to `topicService.generateTopic`.
     * @param funcDataSourceType First constructor argument of each emitted FuncDataSource.
     * @param identityInfo Passed to the topic service and to the websocket subscription.
     * @returns Promise of the stream response; it resolves once the subscription is set up,
     *          without waiting for the request itself to settle.
     */
    async requestStream(request, topicType, funcDataSourceType, identityInfo) {
        const streamResponse = new SocketRestStreamResponse();
        const pubsub_topic = await this.topicService.generateTopic(identityInfo, topicType);
        await this.webSocketService.on(pubsub_topic, identityInfo, (res, error) => {
            streamResponse.receive(res, res?.isCompleted ?? false, error);
        });
        // Deliberately not awaited: the caller gets the stream right away and
        // results or failures are delivered through it.
        request(pubsub_topic)
            .then(res => {
                if (this.isAwaitSocketResponse(res.response.status)) {
                    // The result is left to the websocket listener registered above.
                    return;
                }
                res.onReceived(result => {
                    if (result) {
                        if (result.done) {
                            result.abort();
                        }
                        streamResponse.receive(new FunctionBase_1.FuncDataSource(funcDataSourceType, result.fullText.trim(), result.done), result.done);
                    }
                });
            })
            .catch(error => {
                streamResponse.receive(undefined, true, error);
            });
        return streamResponse;
    }
    /**
     * Runs a request whose resolved value is a REST response and emits the
     * value produced by `toResult` when that response is successful.
     *
     * @param request Function called with the topic; returns a promise of a response
     *                exposing `status`, `isSuccess` and `validate()`.
     * @param toResult Maps a successful response to the value emitted on the stream.
     * @param topicType Passed to `topicService.generateTopic`.
     * @param identityInfo Passed to the topic service and to the websocket subscription.
     * @returns Promise of the stream response; it resolves once the subscription is set up.
     */
    async requestRest(request, toResult, topicType, identityInfo) {
        const streamResponse = new SocketRestStreamResponse();
        const pubsub_topic = await this.topicService.generateTopic(identityInfo, topicType);
        await this.webSocketService.on(pubsub_topic, identityInfo, (res, error) => {
            streamResponse.receive(res, res?.isCompleted ?? false, error);
        });
        request(pubsub_topic)
            .then(res => {
                if (this.isAwaitSocketResponse(res.status)) {
                    return;
                }
                if (res.isSuccess) {
                    const data = toResult(res);
                    streamResponse.receive(data, true);
                }
                // NOTE(review): presumably validate() throws for unsuccessful
                // responses, which the catch below turns into a completed error — confirm.
                return res.validate();
            })
            .catch(error => {
                streamResponse.receive(undefined, true, error);
            });
        return streamResponse;
    }
    /**
     * Runs a request whose resolved value is a GlobalResponse and emits that
     * response itself when it is successful.
     *
     * When the request resolves or rejects with a status accepted by
     * `isAwaitSocketResponse`, the method waits 40 seconds and completes the
     * stream with `Error("Timeout")` if no websocket message at all has
     * arrived by then.
     *
     * @param request Function called with the topic; returns a promise of a response
     *                exposing `status`, `isSuccess` and `validate()`.
     * @param topicType Passed to `topicService.generateTopic`.
     * @param identityInfo Passed to the topic service and to the websocket subscription.
     * @returns Promise of the stream response; it resolves once the subscription is set up.
     */
    async requestRestGlobal(request, topicType, identityInfo) {
        const SOCKET_RESPONSE_TIMEOUT_MS = 40 * 1000;
        const streamResponse = new SocketRestStreamResponse();
        const pubsub_topic = await this.topicService.generateTopic(identityInfo, topicType);
        let receivedSocketMsg = false;
        await this.webSocketService.on(pubsub_topic, identityInfo, (res, error) => {
            receivedSocketMsg = true;
            streamResponse.receive(res, res?.isCompleted ?? false, error);
        });
        // Shared by the resolve and reject paths: wait, then fail the stream
        // only if the socket never delivered anything.
        const failIfSocketSilent = async () => {
            await (0, LangUtil_1.sleep)(SOCKET_RESPONSE_TIMEOUT_MS);
            if (!receivedSocketMsg) {
                streamResponse.receive(undefined, true, new Error("Timeout"));
            }
        };
        request(pubsub_topic)
            .then(async (res) => {
                if (this.isAwaitSocketResponse(res.status)) {
                    await failIfSocketSilent();
                    return;
                }
                if (res.isSuccess) {
                    streamResponse.receive(res, true);
                }
                return res.validate();
            })
            .catch(async (error) => {
                if (this.isAwaitSocketResponse(error?.status)) {
                    await failIfSocketSilent();
                    return;
                }
                streamResponse.receive(undefined, true, error);
            });
        return streamResponse;
    }
    /**
     * Runs a request and emits the successful response itself, without the
     * timeout handling of `requestRestGlobal`.
     *
     * The subscription is awaited before the request is sent, like in the other
     * request methods, so the listener is registered before any pushed message
     * can arrive and a failed subscription rejects this method instead of
     * becoming an unhandled rejection.
     *
     * @param request Function called with the topic; returns a promise of a response
     *                exposing `status`, `isSuccess` and `validate()`.
     * @param topicType Passed to `topicService.generateTopic`.
     * @param identityInfo Passed to the topic service and to the websocket subscription.
     * @returns Promise of the stream response; it resolves once the subscription is set up.
     */
    async requestRestDefault(request, topicType, identityInfo) {
        const streamResponse = new SocketRestStreamResponse();
        const pubsub_topic = await this.topicService.generateTopic(identityInfo, topicType);
        await this.webSocketService.on(pubsub_topic, identityInfo, (res, error) => {
            streamResponse.receive(res, res?.isCompleted ?? false, error);
        });
        request(pubsub_topic)
            .then(res => {
                if (this.isAwaitSocketResponse(res.status)) {
                    return;
                }
                if (res.isSuccess) {
                    streamResponse.receive(res, true);
                }
                return res.validate();
            })
            .catch(error => {
                streamResponse.receive(undefined, true, error);
            });
        return streamResponse;
    }
    /**
     * Tells whether a status code means the result should be awaited on the
     * websocket rather than taken from the direct response.
     *
     * @param code Status code to test (presumably an HTTP status — confirm with callers).
     * @returns true for 202, 502 and 504; false for anything else.
     */
    isAwaitSocketResponse(code) {
        return code === 202 || code === 504 || code === 502;
    }
    /**
     * TODO: kept for compatibility with functions that have not been migrated yet.
     *
     * Generates the topic for `key`, registers `listener` on it and returns
     * the topic once the subscription call has settled.
     *
     * @param key Topic type passed to `topicService.generateTopic`.
     * @param identityInfo Passed to the topic service and to the websocket subscription.
     * @param listener Listener registered on the generated topic.
     * @returns Promise of the generated topic name.
     */
    async runWebsocket(key, identityInfo, listener) {
        const pubsub_topic = await this.topicService.generateTopic(identityInfo, key);
        await this.webSocketService.on(pubsub_topic, identityInfo, listener);
        return pubsub_topic;
    }
}
exports.SocketRestClient = SocketRestClient;
