跳到主要内容

什么是浏览器的 Mixed Content 问题

· 阅读需 4 分钟
素明诚
Full stack development

Mixed Content(混合内容)是指在浏览器中访问一个使用 HTTPS(超文本传输安全协议)加载的网页时,该网页包含了一些通过 HTTP(不安全的超文本传输协议)加载的资源。浏览器会将这种情况视为安全风险,因为 HTTP 的资源是不加密的,可能被第三方截获或篡改,从而破坏整个页面的安全性。

为什么 Mixed Content 是问题?

HTTPS 提供了三大关键安全属性

加密 防止数据在传输过程中被第三方监听。

数据完整性 防止数据在传输过程中被篡改。

身份验证 确保与服务器通信的客户端是合法的。

如果在 HTTPS 页面中加载 HTTP 资源

HTTP 资源的数据是未加密的,可能被中间人攻击(如篡改 JavaScript 脚本、图片、CSS)。

这使得原本安全的页面不再完全可信,破坏了 HTTPS 的初衷!。

Mixed Content 的分类

根据加载的 HTTP 资源类型,Mixed Content 被分为两类

Passive Mixed Content(被动混合内容)

这类资源不会直接与页面的其他内容互动。

例如 图片、视频、音频、样式文件(CSS)。

浏览器可能会允许被动混合内容的加载,但会在地址栏或开发者工具中显示警告。

Active Mixed Content(主动混合内容)

这类资源会与页面的其他部分互动,直接影响页面的安全性。

例如 JavaScript 脚本、内联框架(iframe)、AJAX 请求等。

由于这些资源可能会被劫持和篡改,浏览器会直接阻止主动混合内容的加载。

浏览器如何处理 Mixed Content?

不同浏览器的处理方式可能略有不同,但大多数现代浏览器(如 Chrome、Firefox、Edge 等)都会采取以下措施

被动混合内容

浏览器会允许加载这些资源,但会显示安全警告,提示用户当前页面加载了不安全的内容。

主动混合内容

浏览器会完全阻止加载这些资源,防止不安全的内容影响页面的安全性。

通常会在控制台(Console)中看到错误信息,并且不会加载这些不安全的资源。

Mixed Content 的解决方法

要解决 Mixed Content 问题,你需要确保页面上所有资源都通过 HTTPS 加载。以下是几种常见的解决方法

确保所有资源使用 HTTPS

将页面中所有引用的外部资源(如图片、脚本、样式表、iframe 等)都更改为 HTTPS。

如果资源服务器不支持 HTTPS,考虑将资源托管在支持 HTTPS 的服务器上,或联系资源提供方升级其服务。

相对协议使用

你可以使用相对协议来加载资源(即 // 开头的 URL),例如

<script src="//example.com/script.js"></script>

这种方式会自动使用当前页面的协议来加载资源。如果当前页面是 HTTPS,资源也将通过 HTTPS 加载。

将不支持 HTTPS 的资源代理到 HTTPS

如果外部资源不支持 HTTPS,可以考虑将请求代理到你的服务器,通过你的服务器以 HTTPS 加载该资源。

检测和修复

使用浏览器的开发者工具(F12 -> Console 或 Network 标签页)查看哪些资源导致了 Mixed Content 问题。

修复后重新加载页面,确认问题是否解决。

具体的案例分析

假设你有一个通过 HTTPS 加载的网页,但是其中的图片和 JavaScript 资源是通过 HTTP 加载的。可能出现以下情况

图片资源(被动混合内容)

<img src="http://example.com/image.jpg">

浏览器可能会允许图片加载,但会显示安全警告。

JavaScript 资源(主动混合内容)

<script src="http://example.com/script.js"></script>

浏览器会阻止 JavaScript 文件加载,并在控制台显示错误信息,因为 JavaScript 可能会影响页面的安全性。

浏览器显示的错误提示

Chrome

Mixed Content: The page at 'https://example.com' was loaded over HTTPS, but requested an insecure resource 'http://example.com/resource.js'. This request has been blocked; the content must be served over HTTPS.

Firefox

Blocked loading mixed active content "http://example.com/resource.js"
Loading mixed (insecure) display content "http://example.com/image.jpg" on a secure page

Linux MongoDB 数据的导出和导入

· 阅读需 2 分钟
素明诚
Full stack development

准备工作

检查 MongoDB 安装状态

如果你已经安装了 MongoDB 服务器,那么 mongodump 和 mongorestore 工具可能已经包含在内,你可以直接使用这些工具。如果未安装完整的服务器或使用 Docker 环境,需按以下步骤安装这些工具。

安装 mongodump 和 mongorestore

导入 MongoDB 的 GPG 公钥

执行以下命令导入公钥,确保软件来源的安全性。

wget -qO - https://www.mongodb.org/static/pgp/server-6.0.asc | sudo apt-key add -

添加 MongoDB 的软件源

我的是 Ubuntu,创建相应的源列表文件。以下示例适用于 Ubuntu 20.04 (Focal Fossa)。

echo "deb [ arch=amd64,arm64 ] https://repo.mongodb.org/apt/ubuntu focal/mongodb-org/6.0 multiverse" | sudo tee /etc/apt/sources.list.d/mongodb-org-6.0.list

更新软件包列表

更新 apt 包索引以包含最新的 MongoDB 软件源。

sudo apt-get update

安装 MongoDB 数据库工具

安装包含 mongodump 和 mongorestore 的数据库工具包。

sudo apt-get install -y mongodb-database-tools

验证工具安装

安装完成后,检查工具的版本以确认安装成功。

mongodump --version
mongorestore --version

使用 mongodump 和 mongorestore

数据导出

使用 mongodump 从源数据库导出数据。请确保正确替换 URI 中的用户名和密码,对密码进行 URL 编码(如有特殊字符),并指定足够权限的输出目录。

mongodump --uri "mongodb://myusername:mypassword@10.20.201.215:27017/fastgpt?authSource=admin&directConnection=true" --out /home/fastgpt-dump

数据导入

使用 mongorestore 将数据导入到目标数据库。添加 --noOverwrite 选项可以避免覆盖目标实例中已存在的数据。

mongorestore --uri "mongodb://name:你的密码@192.168.33.62:27017/fastgpt?authSource=admin" --noOverwrite /home/fastgpt-dump

Linux 使用 tar 命令进行迁移

· 阅读需 2 分钟
素明诚
Full stack development

以迁移 MongoDB 的数据为例。

停止当前机器上运行的 MongoDB 服务,以确保数据库文件处于一致状态。

打包

本次迁移的是 mongo/ 的卷文件到另一台机器上

使用 tar 命令将 mongo 目录打包成一个归档文件。tar 命令可以保留文件权限和所有者信息。

tar -czvf mongo.tar.gz mongo/

这会将 mongo 目录打包成一个名为 mongo.tar.gz 的归档文件。

校验

使用 md5sumsha256sum 命令计算归档文件的校验和。例如:

md5sum mongo.tar.gz > mongo.tar.gz.md5

这会计算 mongo.tar.gz 文件的 MD5 校验和,并将其保存到 mongo.tar.gz.md5 文件中。

使用安全的文件传输工具将归档文件 mongo.tar.gz 和校验和文件 mongo.tar.gz.md5 传输到目标机器。例如,你可以使用 scprsync 等工具进行传输。

scp mongo.tar.gz mongo.tar.gz.md5 user@target_machine:/path/to/destination/

user 替换为目标机器的用户名,target_machine 替换为目标机器的主机名或 IP,/path/to/destination/ 替换为目标机器上的目标目录。

完整性校验

在目标机器上,使用 md5sumsha256sum 命令验证接收到的归档文件的完整性。

md5sum -c mongo.tar.gz.md5

这会计算接收到的 mongo.tar.gz 文件的 MD5 校验和,并与 mongo.tar.gz.md5 文件中的校验和进行比较。如果校验和匹配,则说明文件在传输过程中没有被修改。

1404a69a7ed73f6e9d229366031c54ed## 解压

如果校验和匹配,使用 tar 命令将归档文件解压缩到目标目录。

tar -xzvf mongo.tar.gz

这会将 mongo.tar.gz 文件解压缩,并将 mongo 目录及其内容提取到当前目录。

通过以上步骤,你可以确保 mongo 目录在传输过程中的完整性,并将其准确地复制到目标机器上。

Docker 更换镜像源阿里科大网易腾讯中科大微软官方

· 阅读需 2 分钟
素明诚
Full stack development

编辑 Docker 配置文件: 打开或创建 /etc/docker/daemon.json 文件

{
"registry-mirrors": [
"https://registry.docker-cn.com",
"http://hub-mirror.c.163.com",
"https://docker.mirrors.ustc.edu.cn",
"https://dockerhub.azk8s.cn",
"https://mirror.ccs.tencentyun.com",
"https://registry.cn-hangzhou.aliyuncs.com",
"https://docker.mirrors.ustc.edu.cn"
]
}

重启 Docker 服务: 为使配置生效,请执行以下命令:

sudo systemctl daemon-reload
sudo systemctl restart docker

原地址

Docker 官方镜像加速器(中国区)

  • 地址: https://registry.docker-cn.com

网易云镜像加速器

  • 地址: http://hub-mirror.c.163.com

科大讯飞开源镜像加速器

  • 地址: https://docker.mirrors.ustc.edu.cn

Azure 中国镜像加速器

  • 地址: https://dockerhub.azk8s.cn

腾讯云公共镜像库

  • 地址: https://mirror.ccs.tencentyun.com

阿里云公共镜像加速器

  • 地址: https://registry.cn-hangzhou.aliyuncs.com

中国科学技术大学镜像加速器

  • 地址: https://docker.mirrors.ustc.edu.cn

2024 年 8 月 2 日 09:14:06 更新

如果上面的 G 了,可以试试下面这些

{
"registry-mirrors": [
"https://hub.uuuadc.top",
"https://docker.anyhub.us.kg",
"https://dockerhub.jobcher.com",
"https://dockerhub.icu",
"https://docker.ckyl.me",
"https://docker.awsl9527.cn"
]
}

2024 年 8 月 12 日 15:32:23 更新

镜像源失效的问题,通常有以下几种长期解决方案

使用阿里云+GitHub 构建镜像

这个网上有很多教程,就细说了

本机配置代理 (推荐)

可以封装一下 clash,容器启动,方便管理

搭建私有容器镜像仓库(如 Harbor)

Harbor 是一个开源的企业级容器镜像仓库,提供了镜像管理、访问控制、安全扫描等功能。搭建私有仓库可以方便团队内部的镜像下载和同步,同时也需要配置相应的代理以加速访问。

FastGPT47 chatglm3-6b m3e 本地部署

· 阅读需 8 分钟
素明诚
Full stack development

安装 Docker 和 docker-compose

Windows 可以使用 wsl

# 安装 Docker
curl -fsSL https://get.docker.com | bash -s docker --mirror Aliyun
systemctl enable --now docker
# 安装 docker-compose
curl -L https://github.com/docker/compose/releases/download/v2.20.3/docker-compose-`uname -s`-`uname -m` -o /usr/local/bin/docker-compose
chmod +x /usr/local/bin/docker-compose
# 验证安装
docker -v
docker-compose -v

安装并启动

docker-compose.yml 和 config.json 这两个文件很重要,安装后不要删掉~

mkdir fastgpt
cd fastgpt
curl -O https://raw.githubusercontent.com/labring/FastGPT/main/files/deploy/fastgpt/docker-compose.yml
curl -O https://raw.githubusercontent.com/labring/FastGPT/main/projects/app/data/config.json

注意启动的时候要在 fastgpt 这个目录

# 启动容器
docker-compose up -d
# 等待10s,OneAPI第一次总是要重启几次才能连上Mysql
sleep 10
# 重启一次oneapi(由于OneAPI的默认Key有点问题,不重启的话会提示找不到渠道,临时手动重启一次解决,等待作者修复)
docker restart oneapi

5b6581830daeef63ab9688aa590e002a

安装成功后可以看到红框里的容器,m3e 后续再说

服务说明

OneAPI:提供标准的 API 格式,后续 M3E 和 GLM 都要对接到这里面

FastGPT:NextJS 做的,提供前端和后端的服务

mongo:聊天记录、历史对话等数据

pg:存向量

MySQL:_

M3E:向量化模型

运行 chatglm3-6b

https://github.com/THUDM/ChatGLM3

克隆项目后,我们安装依赖,记得装 CUDA 驱动。

我们需要的就是 openai_api_demo 里面的https://github.com/THUDM/ChatGLM3/blob/main/openai_api_demo/api_server.py

这里需要注意的是,你可以选择提前下载好模型,或者是运行的时候再下载,提前下载可能会快很多~

glm3-6b,这个比较好运行,网上教程非常多,只要跑起来就可以,这里不赘述了

ed8272ae3599f0d00c32bc9b6f989a08

注意是 GLM3 的不要 下载错了

api_server.py

这里给出来,修改了一下前面的路径,因为我已经提前下载好了模型

import os
import time
import tiktoken
import torch
import uvicorn

from fastapi import FastAPI, HTTPException, Response
from fastapi.middleware.cors import CORSMiddleware

from contextlib import asynccontextmanager
from typing import List, Literal, Optional, Union
from loguru import logger
from pydantic import BaseModel, Field
from transformers import AutoTokenizer, AutoModel
from utils import process_response, generate_chatglm3, generate_stream_chatglm3
from sentence_transformers import SentenceTransformer

from sse_starlette.sse import EventSourceResponse


print(torch.__version__)
print(torch.cuda.is_available())

# Set up limit request time
EventSourceResponse.DEFAULT_PING_INTERVAL = 1000

# 获取当前脚本的目录
current_dir = os.path.dirname(os.path.abspath(__file__))

# 构建相对于当前脚本的模型目录路径
model_dir = os.path.abspath(os.path.join(current_dir, "..", "..", "chatglm3-6b"))

print(f"model_dir: {model_dir}")

# 设置环境变量
os.environ['MODEL_PATH'] = model_dir
os.environ['TOKENIZER_PATH'] = model_dir

print("MODEL_PATH:", os.getenv('MODEL_PATH'))
print("TOKENIZER_PATH:", os.getenv('TOKENIZER_PATH'))

# set LLM path
MODEL_PATH = os.environ.get('MODEL_PATH', 'THUDM/chatglm3-6b')
TOKENIZER_PATH = os.environ.get("TOKENIZER_PATH", MODEL_PATH)

# set Embedding Model path
EMBEDDING_PATH = os.environ.get('EMBEDDING_PATH', 'BAAI/bge-m3')


@asynccontextmanager
async def lifespan(app: FastAPI):
yield
if torch.cuda.is_available():
torch.cuda.empty_cache()
torch.cuda.ipc_collect()


app = FastAPI(lifespan=lifespan)

app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)


class ModelCard(BaseModel):
id: str
object: str = "model"
created: int = Field(default_factory=lambda: int(time.time()))
owned_by: str = "owner"
root: Optional[str] = None
parent: Optional[str] = None
permission: Optional[list] = None


class ModelList(BaseModel):
object: str = "list"
data: List[ModelCard] = []


class FunctionCallResponse(BaseModel):
name: Optional[str] = None
arguments: Optional[str] = None


class ChatMessage(BaseModel):
role: Literal["user", "assistant", "system", "function"]
content: str = None
name: Optional[str] = None
function_call: Optional[FunctionCallResponse] = None


class DeltaMessage(BaseModel):
role: Optional[Literal["user", "assistant", "system"]] = None
content: Optional[str] = None
function_call: Optional[FunctionCallResponse] = None


## for Embedding
class EmbeddingRequest(BaseModel):
input: Union[List[str], str]
model: str


class CompletionUsage(BaseModel):
prompt_tokens: int
completion_tokens: int
total_tokens: int


class EmbeddingResponse(BaseModel):
data: list
model: str
object: str
usage: CompletionUsage


# for ChatCompletionRequest

class UsageInfo(BaseModel):
prompt_tokens: int = 0
total_tokens: int = 0
completion_tokens: Optional[int] = 0


class ChatCompletionRequest(BaseModel):
model: str
messages: List[ChatMessage]
temperature: Optional[float] = 0.8
top_p: Optional[float] = 0.8
max_tokens: Optional[int] = None
stream: Optional[bool] = False
tools: Optional[Union[dict, List[dict]]] = None
repetition_penalty: Optional[float] = 1.1


class ChatCompletionResponseChoice(BaseModel):
index: int
message: ChatMessage
finish_reason: Literal["stop", "length", "function_call"]


class ChatCompletionResponseStreamChoice(BaseModel):
delta: DeltaMessage
finish_reason: Optional[Literal["stop", "length", "function_call"]]
index: int


class ChatCompletionResponse(BaseModel):
model: str
id: str
object: Literal["chat.completion", "chat.completion.chunk"]
choices: List[Union[ChatCompletionResponseChoice, ChatCompletionResponseStreamChoice]]
created: Optional[int] = Field(default_factory=lambda: int(time.time()))
usage: Optional[UsageInfo] = None


@app.get("/health")
async def health() -> Response:
"""Health check."""
return Response(status_code=200)


@app.post("/v1/embeddings", response_model=EmbeddingResponse)
async def get_embeddings(request: EmbeddingRequest):
if isinstance(request.input, str):
embeddings = [embedding_model.encode(request.input)]
else:
embeddings = [embedding_model.encode(text) for text in request.input]
embeddings = [embedding.tolist() for embedding in embeddings]

def num_tokens_from_string(string: str) -> int:
"""
Returns the number of tokens in a text string.
use cl100k_base tokenizer
"""
encoding = tiktoken.get_encoding('cl100k_base')
num_tokens = len(encoding.encode(string))
return num_tokens

response = {
"data": [
{
"object": "embedding",
"embedding": embedding,
"index": index
}
for index, embedding in enumerate(embeddings)
],
"model": request.model,
"object": "list",
"usage": CompletionUsage(
prompt_tokens=sum(len(text.split()) for text in request.input),
completion_tokens=0,
total_tokens=sum(num_tokens_from_string(text) for text in request.input),
)
}
return response


@app.get("/v1/models", response_model=ModelList)
async def list_models():
model_card = ModelCard(
id="chatglm3-6b"
)
return ModelList(
data=[model_card]
)


@app.post("/v1/chat/completions", response_model=ChatCompletionResponse)
async def create_chat_completion(request: ChatCompletionRequest):
global model, tokenizer

if len(request.messages) < 1 or request.messages[-1].role == "assistant":
raise HTTPException(status_code=400, detail="Invalid request")

gen_params = dict(
messages=request.messages,
temperature=request.temperature,
top_p=request.top_p,
max_tokens=request.max_tokens or 1024,
echo=False,
stream=request.stream,
repetition_penalty=request.repetition_penalty,
tools=request.tools,
)
logger.debug(f"==== request ====\n{gen_params}")

if request.stream:

# Use the stream mode to read the first few characters, if it is not a function call, direct stram output
predict_stream_generator = predict_stream(request.model, gen_params)
output = next(predict_stream_generator)
if not contains_custom_function(output):
return EventSourceResponse(predict_stream_generator, media_type="text/event-stream")

# Obtain the result directly at one time and determine whether tools needs to be called.
logger.debug(f"First result output:\n{output}")

function_call = None
if output and request.tools:
try:
function_call = process_response(output, use_tool=True)
except:
logger.warning("Failed to parse tool call")

# CallFunction
if isinstance(function_call, dict):
function_call = FunctionCallResponse(**function_call)

"""
In this demo, we did not register any tools.
You can use the tools that have been implemented in our `tools_using_demo` and implement your own streaming tool implementation here.
Similar to the following method:
function_args = json.loads(function_call.arguments)
tool_response = dispatch_tool(tool_name: str, tool_params: dict)
"""
tool_response = ""

if not gen_params.get("messages"):
gen_params["messages"] = []

gen_params["messages"].append(ChatMessage(
role="assistant",
content=output,
))
gen_params["messages"].append(ChatMessage(
role="function",
name=function_call.name,
content=tool_response,
))

# Streaming output of results after function calls
generate = predict(request.model, gen_params)
return EventSourceResponse(generate, media_type="text/event-stream")

else:
# Handled to avoid exceptions in the above parsing function process.
generate = parse_output_text(request.model, output)
return EventSourceResponse(generate, media_type="text/event-stream")

# Here is the handling of stream = False
response = generate_chatglm3(model, tokenizer, gen_params)

# Remove the first newline character
if response["text"].startswith("\n"):
response["text"] = response["text"][1:]
response["text"] = response["text"].strip()

usage = UsageInfo()
function_call, finish_reason = None, "stop"
if request.tools:
try:
function_call = process_response(response["text"], use_tool=True)
except:
logger.warning("Failed to parse tool call, maybe the response is not a tool call or have been answered.")

if isinstance(function_call, dict):
finish_reason = "function_call"
function_call = FunctionCallResponse(**function_call)

message = ChatMessage(
role="assistant",
content=response["text"],
function_call=function_call if isinstance(function_call, FunctionCallResponse) else None,
)

logger.debug(f"==== message ====\n{message}")

choice_data = ChatCompletionResponseChoice(
index=0,
message=message,
finish_reason=finish_reason,
)
task_usage = UsageInfo.model_validate(response["usage"])
for usage_key, usage_value in task_usage.model_dump().items():
setattr(usage, usage_key, getattr(usage, usage_key) + usage_value)

return ChatCompletionResponse(
model=request.model,
id="", # for open_source model, id is empty
choices=[choice_data],
object="chat.completion",
usage=usage
)


async def predict(model_id: str, params: dict):
global model, tokenizer

choice_data = ChatCompletionResponseStreamChoice(
index=0,
delta=DeltaMessage(role="assistant"),
finish_reason=None
)
chunk = ChatCompletionResponse(model=model_id, id="", choices=[choice_data], object="chat.completion.chunk")
yield "{}".format(chunk.model_dump_json(exclude_unset=True))

previous_text = ""
for new_response in generate_stream_chatglm3(model, tokenizer, params):
decoded_unicode = new_response["text"]
delta_text = decoded_unicode[len(previous_text):]
previous_text = decoded_unicode

finish_reason = new_response["finish_reason"]
if len(delta_text) == 0 and finish_reason != "function_call":
continue

function_call = None
if finish_reason == "function_call":
try:
function_call = process_response(decoded_unicode, use_tool=True)
except:
logger.warning(
"Failed to parse tool call, maybe the response is not a tool call or have been answered.")

if isinstance(function_call, dict):
function_call = FunctionCallResponse(**function_call)

delta = DeltaMessage(
content=delta_text,
role="assistant",
function_call=function_call if isinstance(function_call, FunctionCallResponse) else None,
)

choice_data = ChatCompletionResponseStreamChoice(
index=0,
delta=delta,
finish_reason=finish_reason
)
chunk = ChatCompletionResponse(
model=model_id,
id="",
choices=[choice_data],
object="chat.completion.chunk"
)
yield "{}".format(chunk.model_dump_json(exclude_unset=True))

choice_data = ChatCompletionResponseStreamChoice(
index=0,
delta=DeltaMessage(),
finish_reason="stop"
)
chunk = ChatCompletionResponse(
model=model_id,
id="",
choices=[choice_data],
object="chat.completion.chunk"
)
yield "{}".format(chunk.model_dump_json(exclude_unset=True))
yield '[DONE]'


def predict_stream(model_id, gen_params):
"""
The function call is compatible with stream mode output.
The first seven characters are determined.
If not a function call, the stream output is directly generated.
Otherwise, the complete character content of the function call is returned.
:param model_id:
:param gen_params:
:return:
"""
output = ""
is_function_call = False
has_send_first_chunk = False
for new_response in generate_stream_chatglm3(model, tokenizer, gen_params):
decoded_unicode = new_response["text"]
delta_text = decoded_unicode[len(output):]
output = decoded_unicode

# When it is not a function call and the character length is> 7,
# try to judge whether it is a function call according to the special function prefix
if not is_function_call and len(output) > 7:

# Determine whether a function is called
is_function_call = contains_custom_function(output)
if is_function_call:
continue

# Non-function call, direct stream output
finish_reason = new_response["finish_reason"]

# Send an empty string first to avoid truncation by subsequent next() operations.
if not has_send_first_chunk:
message = DeltaMessage(
content="",
role="assistant",
function_call=None,
)
choice_data = ChatCompletionResponseStreamChoice(
index=0,
delta=message,
finish_reason=finish_reason
)
chunk = ChatCompletionResponse(
model=model_id,
id="",
choices=[choice_data],
created=int(time.time()),
object="chat.completion.chunk"
)
yield "{}".format(chunk.model_dump_json(exclude_unset=True))

send_msg = delta_text if has_send_first_chunk else output
has_send_first_chunk = True
message = DeltaMessage(
content=send_msg,
role="assistant",
function_call=None,
)
choice_data = ChatCompletionResponseStreamChoice(
index=0,
delta=message,
finish_reason=finish_reason
)
chunk = ChatCompletionResponse(
model=model_id,
id="",
choices=[choice_data],
created=int(time.time()),
object="chat.completion.chunk"
)
yield "{}".format(chunk.model_dump_json(exclude_unset=True))

if is_function_call:
yield output
else:
yield '[DONE]'


async def parse_output_text(model_id: str, value: str):
"""
Directly output the text content of value
:param model_id:
:param value:
:return:
"""
choice_data = ChatCompletionResponseStreamChoice(
index=0,
delta=DeltaMessage(role="assistant", content=value),
finish_reason=None
)
chunk = ChatCompletionResponse(model=model_id, id="", choices=[choice_data], object="chat.completion.chunk")
yield "{}".format(chunk.model_dump_json(exclude_unset=True))

choice_data = ChatCompletionResponseStreamChoice(
index=0,
delta=DeltaMessage(),
finish_reason="stop"
)
chunk = ChatCompletionResponse(model=model_id, id="", choices=[choice_data], object="chat.completion.chunk")
yield "{}".format(chunk.model_dump_json(exclude_unset=True))
yield '[DONE]'


def contains_custom_function(value: str) -> bool:
"""
Determine whether 'function_call' according to a special function prefix.
For example, the functions defined in "tools_using_demo/tool_register.py" are all "get_xxx" and start with "get_"
[Note] This is not a rigorous judgment method, only for reference.
:param value:
:return:
"""
return value and 'get_' in value


if __name__ == "__main__":
# Load LLM
tokenizer = AutoTokenizer.from_pretrained(os.getenv('TOKENIZER_PATH'), trust_remote_code=True)
model = AutoModel.from_pretrained(os.getenv('MODEL_PATH'), trust_remote_code=True, device_map="auto").eval()

# load Embedding
embedding_model = SentenceTransformer(EMBEDDING_PATH, device="cuda")
uvicorn.run(app, host='0.0.0.0', port=8888, workers=1)

运行成功后

85e97e5d6213b8e74cd7faa2622a355b### 安装 M3E

可以参考官方给出的镜像,这个自己搞容易出问题,直接使用镜像最方便,https://doc.fastai.site/docs/development/custom-models/m3e/

配置 OneAPI

重点来了,OneAPI 目前这个地方有个坑,你参考这篇文章的时候,https://doc.fastai.site/docs/development/custom-models/m3e/#接入-one-api,记得 Base URL 一定要填写具体的 IP 地址。

我的 M3E 在 WSL 里面

2090aa22ad0e2333c3349df4bcff350e### 我的 ChatGLM API 在本机本地运行
c04de4dba58351b4174778516b263eb1### 测试

如果响应时间特别长,就是有问题

ed0d8bf11e9e4b189ad5dbcde10b5bb7### 增加 config.json 配置

这里我直接给出我增加的内容,配置文件说明请看 https://doc.fastai.site/docs/development/configuration/

{
"feConfigs": {
"lafEnv": "https://laf.dev"
},
"systemEnv": {
"openapiPrefix": "fastgpt",
"vectorMaxProcess": 15,
"qaMaxProcess": 15,
"pgHNSWEfSearch": 100
},
"llmModels": [
{
"model": "gpt-3.5-turbo",
"name": "gpt-3.5-turbo",
"maxContext": 16000,
"avatar": "/imgs/model/openai.svg",
"maxResponse": 4000,
"quoteMaxToken": 13000,
"maxTemperature": 1.2,
"charsPointsPrice": 0,
"censor": false,
"vision": false,
"datasetProcess": true,
"usedInClassify": true,
"usedInExtractFields": true,
"usedInToolCall": true,
"usedInQueryExtension": true,
"toolChoice": true,
"functionCall": true,
"customCQPrompt": "",
"customExtractPrompt": "",
"defaultSystemChatPrompt": "",
"defaultConfig": {}
},
{
"model": "gpt-4-0125-preview",
"name": "gpt-4-turbo",
"avatar": "/imgs/model/openai.svg",
"maxContext": 125000,
"maxResponse": 4000,
"quoteMaxToken": 100000,
"maxTemperature": 1.2,
"charsPointsPrice": 0,
"censor": false,
"vision": false,
"datasetProcess": false,
"usedInClassify": true,
"usedInExtractFields": true,
"usedInToolCall": true,
"usedInQueryExtension": true,
"toolChoice": true,
"functionCall": false,
"customCQPrompt": "",
"customExtractPrompt": "",
"defaultSystemChatPrompt": "",
"defaultConfig": {}
},
{
"model": "gpt-4-vision-preview",
"name": "gpt-4-vision",
"avatar": "/imgs/model/openai.svg",
"maxContext": 128000,
"maxResponse": 4000,
"quoteMaxToken": 100000,
"maxTemperature": 1.2,
"charsPointsPrice": 0,
"censor": false,
"vision": true,
"datasetProcess": false,
"usedInClassify": false,
"usedInExtractFields": false,
"usedInToolCall": false,
"usedInQueryExtension": false,
"toolChoice": true,
"functionCall": false,
"customCQPrompt": "",
"customExtractPrompt": "",
"defaultSystemChatPrompt": "",
"defaultConfig": {}
},
{
"model": "chatglm3-6b",
"name": "chatglm3-6b",
"maxContext": 8000,
"maxResponse": 8000,
"quoteMaxToken": 2000,
"maxTemperature": 1,
"vision": false,
"defaultSystemChatPrompt": ""
}
],
"vectorModels": [
{
"model": "m3e",
"name": "M3E(测试使用)",
"price": 0.1,
"defaultToken": 500,
"maxToken": 1800
}
],
"reRankModels": [],
"audioSpeechModels": [
{
"model": "tts-1",
"name": "OpenAI TTS1",
"charsPointsPrice": 0,
"voices": [
{
"label": "Alloy",
"value": "alloy",
"bufferId": "openai-Alloy"
},
{
"label": "Echo",
"value": "echo",
"bufferId": "openai-Echo"
},
{
"label": "Fable",
"value": "fable",
"bufferId": "openai-Fable"
},
{
"label": "Onyx",
"value": "onyx",
"bufferId": "openai-Onyx"
},
{
"label": "Nova",
"value": "nova",
"bufferId": "openai-Nova"
},
{
"label": "Shimmer",
"value": "shimmer",
"bufferId": "openai-Shimmer"
}
]
}
],
"whisperModel": {
"model": "whisper-1",
"name": "Whisper1",
"charsPointsPrice": 0
}
}

常见问题

  • 使用WSL最好电脑内存大点,WSL这玩意很吃内存
  • 能用docker就用docker,很多都是因为配置的问题出错
  • 显存不够可以量化,不行就用API

参考链接

https://doc.fastai.site/docs/development/docker/

https://github.com/THUDM/ChatGLM3/tree/main/openai_api_demo

https://huggingface.co/THUDM/chatglm3-6b

https://www.modelscope.cn/models/xrunda/m3e-base/summary

https://github.com/labring/FastGPT

https://github.com/labring/sealos

https://github.com/songquanpeng/one-api

如果您喜欢这篇文章,不妨给它点个赞并收藏,感谢您的支持!

单元测试和集成测试的区别

· 阅读需 3 分钟
素明诚
Full stack development

单元测试(Unit Testing)

单元测试是对软件中的最小可测试部分,通常是单个函数或方法进行测试,以确保它们能够正确执行预定的任务。可以说单元测试是针对每一个函数或方法都是一块砖,单元测试就是确保每块砖本身没有缺陷。

为什么要进行单元测试

确保质量 通过单元测试,开发者可以确认每个基本部件都按照预期工作。

快速识别问题 如果出现问题,很容易定位到具体的函数或方法,因为测试是独立进行的。

单元测试的特点

独立性 每个测试独立于其他测试,确保测试结果的准确性。

无外部依赖 避免使用数据库或网络服务等外部系统,简单点说就是,只是针对特定的组件进行测试,不连接缓存或者是数据库,也不会请求其他的模块~

集成测试(Integration Testing)

集成测试是在单元测试之后进行的,目的是检查多个单元(比如几个函数或模块)组合在一起时是否能够协同工作。如果说单元测试是检查每块砖,那么集成测试就是检查砖与砖之间的粘合是否牢固。

为什么要进行集成测试

接口和数据流 确保不同部分之间的接口正确无误,数据能够正确传递。

发现协同问题 单元测试可能漏掉的问题,在集成测试中往往可以被发现,一般来说只要你对其他业务接口的数据理解没问题,集成测试也基本没问题

集成测试的特点

逐步集成 从测试最小的组合开始,逐步增加直到整个系统被测试完毕。很多人理解集成测试就是一下子都笼起来,全部测一下。其实也是可以一部分一部分测的

模拟真实环境 尽可能在接近生产环境的设置中进行测试,以模拟真实操作。

总结

单元测试就是为了测试某个具体的功能是否能通过,有 bug 就改完再测,不行就再来亿遍。集成测试你的代码基本跑通,接下来要跟第三方进行集成测试。

Windows10 安装 Miniconda

· 阅读需 1 分钟
素明诚
Full stack development

下载 miniconda

Installing Miniconda## 添加环境变量

手动添加 Miniconda 到 PATH

1. 打开环境变量设置

按下 Win + X,选择“系统”,然后点击“高级系统设置”。

或者通过“控制面板” > “系统和安全” > “系统” > “高级系统设置”进入。

2. 编辑环境变量

点击“环境变量”按钮。

在“用户变量”或“系统变量”中,找到并选中 Path,然后点击“编辑”。

3. 添加 Miniconda 路径

点击“新建”,添加以下路径(根据您的实际安装路径):

C:\Users\<username>\Miniconda3

C:\Users\<username>\Miniconda3\Scripts

C:\Users\<username>\Miniconda3\Library\bin

注意:请将 <username> 替换为您的实际用户名。

4. 保存更改

  • 点击“确定”保存环境变量设置。
  • 关闭所有对话框。

5. 重启 PowerShell

  • 关闭并重新打开 PowerShell,以使环境变量更改生效。

验证

conda --version

systemd 常用命令

· 阅读需 1 分钟
素明诚
Full stack development

systemctl 是用于控制 systemd 系统和服务管理器的命令行工具。systemd 是现代 Linux 发行版中常用的初始化系统和服务管理器,负责在系统启动时启动和管理系统服务。

常用命令

启动服务

sudo systemctl start <服务名>

停止服务

sudo systemctl stop <服务名>

重启服务

sudo systemctl restart <服务名>

查看服务状态

sudo systemctl status <服务名>

启用服务(开机自启)

sudo systemctl enable <服务名>

禁用服务(取消开机自启)

sudo systemctl disable <服务名>

查看所有正在运行的服务

systemctl list-units --type=service --state=running

查看所有服务(包括未运行的)

systemctl list-unit-files --type=service

示例

启动 Apache 服务

sudo systemctl start apache2

查看 Apache 服务状态

sudo systemctl status apache2

设置 Apache 开机自启

sudo systemctl enable apache2

停止 Apache 服务

sudo systemctl stop apache2

常用的 journalctl 命令总结

· 阅读需 2 分钟
素明诚
Full stack development

journalctl 是一个用于查看由 systemd 收集的系统和服务日志的工具。

1. 查看所有日志

journalctl

2. 实时查看日志(类似于 tail -f)

journalctl -f

3. 按服务查看日志

查看特定服务的日志

journalctl -u <服务名>

例如,查看 nginx 服务的日志

journalctl -u nginx

实时查看特定服务的日志

journalctl -u <服务名> -f

4. 按时间过滤日志

查看自某时间以来的日志

journalctl --since "YYYY-MM-DD HH:MM:SS"

例如,查看今天的日志

journalctl --since today

查看某时间范围内的日志

journalctl --since "YYYY-MM-DD" --until "YYYY-MM-DD"

5. 按引导(启动)次数查看日志

查看当前引导(启动)以来的日志

journalctl -b

查看上一次引导的日志

journalctl -b -1

列出所有引导记录

journalctl --list-boots

6. 按优先级(日志级别)过滤

仅显示错误级别及以上的日志

journalctl -p err

指定优先级范围

journalctl -p 0..3

优先级等级(从高到低)

优先级级别含义
0emerg紧急
1alert警报
2crit严重
3err错误
4warning警告
5notice通知
6info信息
7debug调试

7. 搜索日志内容

按关键词搜索日志

journalctl | grep "关键词"

使用 -g 选项搜索

journalctl -g "关键词"

8. 按进程或用户过滤

按进程 ID(PID)过滤

journalctl _PID=1234

按用户 ID(UID)过滤

journalctl _UID=1000

9. 查看内核日志

journalctl -k

10. 反向排序日志(最新的在前)

journalctl -r

11. 查看磁盘日志占用情况

journalctl --disk-usage

12. 清理日志

保留最近 7 天的日志

sudo journalctl --vacuum-time=7d

将日志总大小限制为 1GB

sudo journalctl --vacuum-size=1G

13. 不使用分页器显示

journalctl --no-pager

14. 指定输出格式

详细模式

journalctl -o verbose

JSON 格式

journalctl -o json-pretty

15. 导出和导入日志

导出日志

journalctl --since "YYYY-MM-DD" --until "YYYY-MM-DD" --output=export > logs.bin

导入日志

journalctl --file=logs.bin

什么是控制反转IoC

· 阅读需 3 分钟
素明诚
Full stack development

控制反转(IoC,Inversion of Control)

控制反转是一种软件设计原则,用于减少程序中组件间的耦合。在这种模式下,组件的依赖关系不由组件本身管理,而是交由外部容器或框架控制。IoC 提升了代码的模块性、可维护性和可测试性。

IoC 的工作原理

在传统编程模式中,组件通常自行创建和管理其依赖对象。例如,UserHandler 类可能会直接实例化 UserService 来处理用户业务逻辑,导致两者之间紧密耦合,难以修改或测试。

使用 IoC 时,UserHandler 通过构造函数注入或其他方式获得 UserService 的实例,而不是自行创建。这个实例由 IoC 容器在运行时动态提供。UserHandler 只需声明其对 UserService 的需求,无需关心其具体实现,从而降低了耦合。

IoC 的优点

降低耦合度:组件不直接负责依赖的创建和管理,降低了相互依赖性。

增强模块性:组件化更明确,系统各部分更易于理解和修改。

提高可维护性:降低的耦合度意味着修改一个组件对其他组件的影响较小。

增强可测试性:依赖注入使得替换组件进行单元测试变得简单,可以使用模拟对象代替真实服务。

使用 IoC 时的注意事项

虽然 IoC 带来多种优势,但过度依赖 IoC 可能导致代码复杂化,错误追踪困难。应避免过度设计,合理利用 IoC 以解决特定问题,同时保持代码的简洁和清晰

我将展示一个简单的 Go 示例,其中UserHandler依赖于一个UserService接口来处理用户相关的业务逻辑,而具体的服务实现则在运行时注入。

UserService.go

首先,定义一个UserService接口,它声明了必须由任何实现该接口的服务提供的方法。

package main

// UserService 定义了用户服务需要实现的接口
type UserService interface {
ProcessUser()
}

UserServiceImpl.go

接下来,定义一个实现了UserService接口的UserServiceImpl类。

package main

import "fmt"

// UserServiceImpl 是UserService的具体实现
type UserServiceImpl struct{}

// ProcessUser 是UserService接口的实现,处理用户业务
func (s *UserServiceImpl) ProcessUser() {
fmt.Println("Processing user...")
}

UserHandler.go

然后,定义UserHandler,它有一个UserService类型的字段。UserHandler不会直接创建UserService的实例,而是在构造时通过依赖注入接收。

package main

// UserHandler 负责用户操作,依赖UserService
type UserHandler struct {
userService UserService // 使用接口,而非具体类
}

// NewUserHandler 创建一个新的UserHandler实例,需要注入UserService
func NewUserHandler(service UserService) *UserHandler {
return &UserHandler{userService: service}
}

// HandleUser 使用注入的服务处理用户
func (h *UserHandler) HandleUser() {
h.userService.ProcessUser()
fmt.Println("User handled.")
}

main.go

最后,在main函数中创建UserServiceImpl的实例,并将其注入到UserHandler中。

package main

func main() {
userService := &UserServiceImpl{} // 创建UserService的实例
userHandler := NewUserHandler(userService) // 注入UserService到UserHandler
userHandler.HandleUser() // 调用UserHandler的方法
}

补充

依赖查找也是 IoC 的一种实现方式,但是 Go 没有内置支持依赖查找,但你可以通过一些设计模式或第三方库来实现它。