python使用httpx_sse调用sse流式接口对响应格式为application/json的错误信息的处理

news/2025/2/24 5:26:12

目录

  • 问题描述
  • 方案

问题描述

调用sse流式接口使用httpx_sse的方式

python">				import httpx
				from httpx_sse import connect_sse
				# 省略无关代码
				try:
                    with httpx.Client() as client:
                        with connect_sse(client, "GET", url, params=param) as event_source:
                            clear_textbox(response_textbox)
                            # 把 iter_sse() 迭代完, 就相当于处理完了一次流式调用
                            for sse in event_source.iter_sse():
                                # 流式响应中,每次响应体的处理逻辑
                                print(f"generated_answer的值是: '{sse.data}'")
                                response = sse.data
                                if response != '':
                                    # self.response = response
                                    append_text(response_textbox, response)

                except httpx.RequestError as e:
                    print(f"请求错误:{e}")
                except Exception as e:
                    print(f"发生了一个错误:{e}")

httpx_sse的connet_sse源码:

python">@contextmanager
def connect_sse(
    client: httpx.Client, method: str, url: str, **kwargs: Any
) -> Iterator[EventSource]:
    headers = kwargs.pop("headers", {})
    headers["Accept"] = "text/event-stream"
    headers["Cache-Control"] = "no-store"

    with client.stream(method, url, headers=headers, **kwargs) as response:
        yield EventSource(response)

可以看到connect_sse源码中的headers的"Accept"设置了只接受"text/event-stream"流式结果,正常这么调用是没错的。但是当后端的流式接口因为401权限问题等报错返回了"application/json"格式,如
{ “code”:401, “msg”:“登录过期,请重新登录”, “data”:null} 这样的json格式结果时,以上代码就会报错,因为他不是"text/event-stream"流式响应结果头。那么该怎么办呢?

方案

重新写一个自定义的connect_sse

python">import httpx
from httpx_sse import EventSource
from typing import Any, Iterator
from contextlib import contextmanager
import json
# 自定义调用sse接口
    @contextmanager
    def custom_connect_sse(
            self, client: httpx.Client, method: str, url: str, **kwargs: Any
    ) -> Iterator[EventSource]:
        headers = kwargs.pop("headers", {})
        # 只有当没有指定Accept时才添加默认值
        headers["Accept"] = "*/*"
        headers["Cache-Control"] = "no-store"

        with client.stream(method, url, headers=headers, **kwargs) as response:
            content_type = response.headers.get('content-type', '').lower()
            json_flag = False
            if 'text/event-stream' in content_type:
                # 处理SSE流
                yield json_flag, EventSource(response)
            elif 'application/json' in content_type:
                # yield response  # 在这里你可以决定如何进一步处理这个JSON响应
                # 读取并合并所有文本块
                text_data = ''.join([chunk for chunk in response.iter_text()])
                # 解析整个响应体为JSON
                json_data = json.loads(text_data)
                json_flag = True
                yield json_flag, json_data

调用代码

python"># 使用自定义的connect_sse函数
                try:
                    with httpx.Client() as client:
                        with self.custom_connect_sse(client, "GET", url, params=param, headers=headers) as (json_flag, event_source):
                            if json_flag:
                                code = event_source.get("code")
                                msg = event_source.get("msg")
                                print(f"Code: {code}, Message: {msg}")
                            else:
                                full_answer = ""
                                clear_textbox(response_textbox)
                                for sse in event_source.iter_sse():
                                    print(f"generated_answer的值是: '{sse.data}'")
                                    response = sse.data
                                    if response:
                                        append_text(response_textbox, response)
                                        full_answer += response
                                user_record += reply + full_answer + "\n"
                                print(f"user_record:{user_record}")

                except httpx.RequestError as e:
                    print(f"请求错误:{e}")
                except Exception as e:
                    print(f"发生了一个错误:{e}")

关键步骤:
1.设置headers[“Accept”] = “/”,所有响应头都可以接收
2.content_type = response.headers.get(‘content-type’, ‘’).lower() 判断响应头是流式还是json,并用json_flag记录是否json标识,返回不同的结果。如果是json,则循环合并处理chunk块,拼装完整json返回结果(实测第一次就返回完整json结构了,但是代码得这么写)。
3.使用自定义connect_sse方法时,根据json_flag来分别处理成功调用流式结果还是异常的json结果。


http://www.niftyadmin.cn/n/5863980.html

相关文章

Pytorch使用手册-音频数据增强(专题二十)

音频数据增强 torchaudio 提供了多种方式来增强音频数据。 在本教程中,我们将介绍一种应用效果、滤波器、RIR(房间脉冲响应)和编解码器的方法。 最后,我们将从干净的语音合成带噪声的电话语音。 import torch import torchaudio import torchaudio.functional as Fprin…

创建第一个 Maven 项目(二)

六、添加依赖 在 Maven 项目开发过程中,添加依赖是一项常见且关键的操作。通过添加依赖,我们可以引入项目所需的各种库和框架,极大地扩展项目的功能。接下来,我们将以 JUnit 依赖为例,详细介绍如何在 Maven 项目中添加…

保姆级! 本地部署DeepSeek-R1大模型 安装Ollama Api 后,Postman本地调用 deepseek

要在Postman中访问Ollama API并调用DeepSeek模型,你需要遵循以下步骤。首先,确保你有一个有效的Ollama服务器实例运行中,并且DeepSeek模型已经被加载。 可以参考我的这篇博客 保姆级!使用Ollama本地部署DeepSeek-R1大模型 并java…

【Java八股文】11-分布式及场景面试篇

【Java八股文】11-分布式及场景面试篇 消息队列你的项目为什么要用消息队列?你说说 Kafka 为什么是高性能的?kafka的使用场景,是否有消息丢失的情况RocketMQ、Kafka 和 RabbitMQRabbitMq怎么消息被消费项目中MQ是怎么用的?MQ消息可…

基础dp——动态规划

目录 一、什么是动态规划? 二、动态规划的使用步骤 1.状态表示 2.状态转移方程 3.初始化 4.填表顺序 5.返回值 三、试题讲解 1.最小花费爬楼梯 2.下降路径最小和 3.解码方法 一、什么是动态规划? 动态规划(Dynamic Programming&…

[通俗易懂C++]:指针和const

之前的文章有说过,使用指针我们可以改变指针指向的内容(通过给指针赋一个新的地址)或者改变被保存地址的值(通过给解引用指针赋一个新值): int main() {int x { 5 }; // 创建一个整数变量 x,初始值为 5int* ptr { &x }; // 创建一个指针 ptr,指向 …

如何手动设置u-boot的以太网的IP地址、子网掩码、网关信息、TFTP的服务器地址,并进行测试

设置IP地址 运行下面这条命令设置u-boot的以太网的IP地址: setenv ipaddr 192.168.5.9设置子网掩码 运行下面这条命令设置u-boot的以太网的子网掩码: setenv netmask 255.255.255.0设置网关信息 运行下面这条命令设置u-boot的网关信息: …

案例-14.文件上传-简介

一.简介 文件上传涉及到两个部分,一个是前端程序,另一个是服务端程序。 二.前端程序 1.前端上传文件必须有三要素: 1.form表单,并且在form表单中要定义一个表单项,类型为file。其效果是会弹出一个“选择文件”的按钮…