Skip to content

LlamaCoder:开源版 Claude Artifacts 的全链路流程解析

LlamaCoder 是一个由 Nutlope 开发的开源项目,旨在利用 Llama 3.1 等大语言模型实现类似 Claude Artifacts 的代码生成与实时预览功能。本文将深入剖析 LlamaCoder 的核心工作流程,包括会话创建、多模型协作以及代码生成的具体实现逻辑,帮助读者理解其背后的技术架构与设计思路。

功能概述: 初始化一个新的 AI 编码对话会话

核心流程:

用户提交 prompt
1. 创建 Chat 记录(数据库)
2. 并行执行两个任务:
├── fetchTitle() - 生成聊天标题(3-5词)
└── fetchTopExample() - 匹配相似示例(landing page/blog app等)
3. 如果有截图 → 调用视觉模型分析截图
4. 如果 quality === "high" → 调用架构模型生成项目计划
5. 保存初始消息到数据库:
├── System 消息(编码指令)
└── User 消息(prompt 或项目计划)
6. 返回 { chatId, lastMessageId }

使用的模型:

  • meta-llama/Meta-Llama-3.1-8B-Instruct-Turbo - 标题生成、示例匹配
  • Qwen/Qwen3-VL-32B-Instruct - 截图分析
  • Qwen/Qwen3-Next-80B-A3B-Instruct - 项目计划

2. /api/get-next-completion-stream-promise - 流式生成代码

Section titled “2. /api/get-next-completion-stream-promise - 流式生成代码”

功能概述: 根据对话历史流式生成 AI 响应

核心流程:

接收 messageId, model
1. 查询消息及历史记录(position <= 当前消息)
2. Token 优化:
├── optimizeMessagesForTokens() - 移除早期 assistant 消息中的代码块
└── 消息长度限制 - 超过10条时保留 [前3条 + 最后7条]
3. 调用 LLM(stream: true)
4. 直接返回流式响应

Token 优化策略:

  1. 代码块剥离 - 保留最后 2 条 assistant 消息的完整内容,早期消息删除代码块
  2. 消息裁剪 - 最多保留 10 条消息:[0, 1, 2] + 最后7条
[page.tsx:132-167]
用户提交表单
POST /api/create-chat
(创建会话、生成标题、项目计划)
返回 { chatId, lastMessageId }
POST /api/get-next-completion-stream-promise
(核心:这里调用 LLM stream: true)
获取流式响应 → 跳转到 /chats/{chatId}
(页面边接收边显示 AI 生成的代码)

简单总结:

接口作用返回
/api/create-chat初始化对话,生成标题、项目计划chatIdlastMessageId
/api/get-next-completion-stream-promise流式生成 AI 代码响应可读流

页面跳转到 /chats/{chatId} 时的完整流程

Section titled “页面跳转到 /chats/{chatId} 时的完整流程”
[首页] 用户提交表单
POST /api/create-chat
→ 创建 Chat 记录
→ 生成初始消息(system + user)
→ 返回 { chatId, lastMessageId }
POST /api/get-next-completion-stream-promise
→ 获取流式响应 Promise
router.push(`/chats/${chatId}`)
┌─────────────────────────────────────────────────┐
│ [chats/[id]/page.tsx] 服务端渲染 │
├─────────────────────────────────────────────────┤
│ 1. getChatById(id) │
│ → 从数据库加载 Chat 和消息 │
│ 2. 生成 HTML + 初始数据 │
│ 3. 返回给客户端 │
└─────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────┐
│ [page.client.tsx] 客户端激活 │
├─────────────────────────────────────────────────┤
│ 1. 接收服务端传来的 chat 数据 │
│ 2. 从 Context 获取 streamPromise │
│ 3. useEffect 触发流处理 │
│ 4. 开始接收 AI 流式响应 │
│ 5. 逐字显示代码 │
│ 6. 流结束后保存消息到数据库 │
└─────────────────────────────────────────────────┘
// 用户访问 /chats/abc123
export default async function Page({ params }) {
const id = (await params).id;
// 从数据库加载聊天数据
const chat = await getChatById(id);
// {
// id: "abc123",
// title: "Todo App",
// messages: [
// { position: 0, role: "system", content: "..." },
// { position: 1, role: "user", content: "Build me a todo app" }
// ]
// }
return <PageClient chat={chat} />;
}

此时 AI 还没有生成代码,只有初始的 system 和 user 消息。


export default function PageClient({ chat }: { chat: Chat }) {
const context = use(Context);
// streamPromise 是从首页传过来的!
const [streamPromise, setStreamPromise] = useState(
context.streamPromise // ← 这是已经开始的流
);

useEffect(() => {
async function f() {
const stream = await streamPromise; // 获取可读流
ChatCompletionStream.fromReadableStream(stream)
.on("content", (delta, content) => {
// 每收到一个 token 触发
setStreamText((text) => text + delta);
// "im" -> "imp" -> "impor" -> "import" ...
// 检测到代码时显示代码查看器
if (content.includes("```")) {
setIsShowingCodeViewer(true);
}
})
.on("finalContent", async (finalText) => {
// 流结束后保存到数据库
const message = await createMessage(
chat.id,
finalText, // 完整的 AI 响应
"assistant",
allFiles, // 提取的文件
);
router.refresh(); // 刷新服务端数据
});
}
f();
}, [streamPromise]);

时间轴:
T+0s: 页面加载,显示 ChatLog(只有初始消息)
┌─────────────────────────┐
│ system: You are expert │
│ user: Build me a todo │
└─────────────────────────┘
T+1s: 开始接收流
┌─────────────────────────┐
│ system: You are expert │
│ user: Build me a todo │
│ assistant: Here's a │ ← 实时显示
└─────────────────────────┘
T+2s: 继续接收
│ assistant: Here's a React│
T+5s: 检测到代码块,打开 CodeViewer
┌────────────┬─────────────┐
│ ChatLog │ CodeViewer │
│ ... │ import React│
│ │ export ... │
└────────────┴─────────────┘
T+30s: 流结束,保存到数据库
router.refresh() → 服务端数据更新

阶段位置发生了什么
首页page.tsx调用 /api/create-chat → 创建 Chat
首页page.tsx调用 /api/get-next-completion-stream-promise → 获取流
跳转router.push导航到 /chats/{id},携带 streamPromise
服务端[id]/page.tsxgetChatById() → 加载数据库数据(此时无 AI 响应)
客户端page.client.tsxuseEffect → 处理流,逐字显示
结束page.client.tsxcreateMessage() → 保存 AI 响应到数据库

关键理解: 页面跳转时,AI 已经在生成代码了(流已经开始),页面只是负责接收和显示这个流。

这是一个服务端数据获取函数,用于加载聊天会话的完整数据。

const getChatById = cache(async (id: string) => {
// 1. 查询 Chat 基本信息
// 2. 查询消息(分批加载策略)
// 3. 返回组合数据
});
1. 查询 Chat 记录
2. 统计总消息数 (totalMessages)
3. 分批加载消息:
├── position 0,1 (必须加载:system prompt + 初始用户消息)
└── position ≥2 的最近 100 条
4. 计算版本计数器 (assistantMessagesCountBefore)
5. 返回组合数据
假设数据库有 200 条消息:
position 0: [system] 编码指令
position 1: [user] 初始 prompt
position 2-101: [对话...]
position 102-200: [最近对话...]
加载结果:
✓ position 0 (必须)
✓ position 1 (必须)
✓ position 102-200 (最近 100 条)
✗ position 2-101 (跳过,节省数据传输)
最终按 position 排序返回
const getChatById = cache(async (id: string) => {

React Server Component 的缓存机制:

同一个请求中,getChatById 可能被调用多次:
├── generateMetadata() 需要 chat.title
└── Page() 需要 chat 数据
使用 cache() 确保只查询一次数据库,
第二次调用直接返回缓存结果
{
id: "abc123",
title: "Budgeting App",
model: "deepseek-v3-2-251201",
quality: "high",
prompt: "Build me a budgeting app",
messages: [
{ position: 0, role: "system", content: "..." },
{ position: 1, role: "user", content: "..." },
{ position: 102, role: "assistant", content: "..." },
// ... 最多 103 条消息
],
totalMessages: 200, // 总消息数(用于显示"加载更多")
assistantMessagesCountBefore: 50 // 用于版本控制
}
  1. 性能 - 不加载所有历史消息,只加载必要的 + 最近 100 条
  2. 成本 - 减少数据库查询和数据传输
  3. 用户体验 - 用户主要关心最近的对话内容

流式响应中提取代码到多个文件的完整解析

Section titled “流式响应中提取代码到多个文件的完整解析”

AI 被要求以特定的 Markdown 格式输出多个文件:

Here's your todo app:
```tsx{path=src/App.tsx}
import React from 'react';
export default function App() { ... }
body { margin: 0; }
{
"name": "todo-app"
}
### 2. 代码块提取 (`lib/utils.ts`)
#### `parseReplySegments()` - 实时解析流式内容
```typescript
// 逐行解析,支持流式(未闭合的代码块标记为 partial)
export function parseReplySegments(markdown: string): ReplySegment[] {
const lines = markdown.split("\n");
const fenceRegex = /^```([^\n]*)$/;
let openTag: string | null = null; // 当前代码块是否打开
let codeBuffer: string[] = []; // 代码内容缓冲区
for (const line of lines) {
const match = line.match(fenceRegex);
if (match && !openTag) {
// 开始代码块: tsx{path=src/App.tsx}
openTag = match[1] || "";
} else if (match && openTag) {
// 结束代码块: ```
segments.push({
type: "file",
code: codeBuffer.join("\n"),
language: "tsx",
path: "src/App.tsx",
isPartial: false, // ← 已完成
});
openTag = null;
} else if (openTag) {
// 代码块内的行
codeBuffer.push(line);
}
}
// 如果流结束时代码块未闭合
if (openTag) {
segments.push({
type: "file",
code: codeBuffer.join("\n"),
language: "tsx",
path: "src/App.tsx",
isPartial: true, // ← 正在生成中
});
}
}

extractAllCodeBlocks() - 提取所有已完成的代码块

Section titled “extractAllCodeBlocks() - 提取所有已完成的代码块”
export function extractAllCodeBlocks(input: string) {
const codeBlockRegex = /```([^\n]*)\n([\s\S]*?)\n```/g;
const files = [];
let match;
while ((match = codeBlockRegex.exec(input)) !== null) {
const fenceTag = match[1]; // "tsx{path=src/App.tsx}"
const code = match[2]; // 代码内容
// 解析标签
const { language, path } = parseFenceTag(fenceTag);
// language = "tsx"
// path = "src/App.tsx"
files.push({ code, language, path });
}
return files;
}
function parseFenceTag(tag: string) {
// 输入: "tsx{path=src/App.tsx}"
const langMatch = tag.match(/^([A-Za-z0-9]+)/);
const language = langMatch ? langMatch[1] : "text";
// language = "tsx"
const pathMatch = tag.match(/path\s*=\s*([^}\s]+)/);
const path = pathMatch ? pathMatch[1] : `file.${getExtensionForLanguage(language)}`;
// path = "src/App.tsx"
return { language, path };
}
ChatCompletionStream.fromReadableStream(stream)
.on("content", (delta, content) => {
// 每收到一个 token
setStreamText((text) => text + delta);
// 解析当前内容
const segments = parseReplySegments(content);
// [
// { type: "text", content: "Here's your app:\n\n" },
// { type: "file", code: "import...", language: "tsx", path: "src/App.tsx", isPartial: true }
// ]
// 检测到文件时打开代码查看器
if (segments.some((seg) => seg.type === "file")) {
setIsShowingCodeViewer(true);
}
// 第一个完整文件时显示预览
if (segments.some((seg) => seg.type === "file" && !seg.isPartial)) {
setActiveTab("preview");
}
})
.on("finalContent", async (finalText) => {
// 流结束后提取所有文件
const currentFiles = extractAllCodeBlocks(finalText);
// [
// { code: "import React...", language: "tsx", path: "src/App.tsx" },
// { code: "body { margin: 0 }", language: "css", path: "src/styles.css" },
// { code: "{ \"name\": \"todo\" }", language: "json", path: "package.json" }
// ]
// 合并之前的文件(同一 path 的文件会被覆盖)
const fileMap = new Map();
previousFiles.forEach((f) => fileMap.set(f.path, f));
currentFiles.forEach((f) => fileMap.set(f.path, f));
const allFiles = Array.from(fileMap.values());
// 保存到数据库
await createMessage(chat.id, finalText, "assistant", allFiles);
});
// 合并流中的文件
const streamAllFiles = extractAllCodeBlocks(streamText); // 已完成的
const latestStreamBlock = extractLatestStreamBlock(streamText); // 正在生成的
// 合并:同一 path 的文件,新的覆盖旧的
let mergedStreamFiles = [...streamAllFiles];
if (latestStreamBlock) {
const existingIdx = mergedStreamFiles.findIndex(
(f) => f.path === latestStreamBlock.path,
);
if (existingIdx !== -1) {
mergedStreamFiles[existingIdx] = latestStreamBlock; // 更新正在生成的文件
} else {
mergedStreamFiles.push(latestStreamBlock); // 添加新文件
}
}
// 与之前消息的文件合并
const baseFiles = lastMessage ? getFilesFromMessage(lastMessage) : [];
const files = mergeFiles(baseFiles, mergedStreamFiles);
AI 流式输出:
"Here's your app:\n\n```tsx{path=App.tsx}\nimpor" (t+1s)
"Here's your app:\n\n```tsx{path=App.tsx}\nimport" (t+2s)
"Here's your app:\n\n```tsx{path=App.tsx}\nimport React" (t+3s)
...
"Here's your app:\n\n```tsx{path=App.tsx}\nexport default" (t+10s)
"Here's your app:\n\n```tsx{path=App.tsx}\nexport default\n```\n\n" (t+11s - 文件1完成)
"```css{path=styles.css}\nbody" (t+12s - 开始文件2)
...
实时解析:
t+1s: [{ type: "file", path: "App.tsx", isPartial: true, code: "impor" }]
t+10s: [{ type: "file", path: "App.tsx", isPartial: true, code: "export default" }]
t+11s: [{ type: "file", path: "App.tsx", isPartial: false, code: "export default" }] ← 完成
t+12s: [{ type: "file", path: "App.tsx", isPartial: false },
{ type: "file", path: "styles.css", isPartial: true }]
UI 显示:
┌─────────────────────────────────┐
│ 📁 App.tsx │
│ import React; │
│ export default function App() { │ ← 实时更新
│ return <div>Hello</div>; │
│ } │
├─────────────────────────────────┤
│ 📁 styles.css (generating...) │
│ body │
└─────────────────────────────────┘
函数作用时机
parseReplySegments()实时解析,包括 partial 文件每次收到 token
extractAllCodeBlocks()提取所有已完成的代码块流结束后/查看历史
parseFenceTag()解析 tsx{path=src/App.tsx}解析代码块标签
extractLatestStreamBlock()获取当前正在生成的代码块实时显示

StickToBottom 包裹 SyntaxHighlighter 的原因

Section titled “StickToBottom 包裹 SyntaxHighlighter 的原因”

StickToBottom 是一个组件,用于在内容动态增长时自动滚动到底部。这对于流式代码生成非常重要。

AI 正在生成代码(流式输出):
第1秒: import React from 'react';
第2秒: import React from 'react';
export default function App() {
第3秒: import React from 'react';
export default function App() {
return <div>Hello
第4秒: import React from 'react';
export default function App() {
return <div>Hello World</div>;
}

如果不自动滚动:

初始视图(可见第1-20行):
┌─────────────────────────────┐
│ import React from 'react'; │ ← 第1行(可见)
│ ... │
│ const data = [ │ ← 第20行(可见)
└─────────────────────────────┘
第5秒后(AI生成了50行代码):
┌─────────────────────────────┐
│ import React from 'react'; │ ← 第1行(仍然可见)
│ ... │
│ const data = [ │ ← 第20行(仍然可见)
│ │
│ ↑ 用户看不到正在生成的代码! │
└─────────────────────────────┘
新增的第21-50行在视野外

使用 StickToBottom 后:

第5秒后(自动滚动到底部):
┌─────────────────────────────┐
│ ... │
│ const data = [ │
│ { id: 1, name: 'A' }, │
│ ]; │
│ │
│ export default function App()│ ← 新生成的代码(可见)
│ return <div>Hello │ ← 用户看到实时生成!
└─────────────────────────────┘
自动滚动到最后
code-viewer.tsx
<StickToBottom
className="relative grow overflow-hidden *:!h-[inherit]"
resize="smooth"
initial={false}
>
<StickToBottom.Content>
<SyntaxHighlighter
files={files.map((f) => ({
path: f.path,
content: f.code,
language: f.language,
}))}
activePath={
streamText
? latestStreamBlock?.path || files.at(-1)?.path
: undefined
}
disableSelection={!!streamText}
isStreaming={!!streamText}
/>
</StickToBottom.Content>
</StickToBottom>

StickToBottom 的工作机制:

  1. 监听内容高度变化
  2. 当内容增长时,自动滚动到底部
  3. resize="smooth" - 平滑滚动动画
  4. initial={false} - 初始不滚动(只有内容变化时才滚动)
// SyntaxHighlighter 内部也有自动滚动逻辑
useEffect(() => {
if (!isStreaming || !editorRef.current) return;
const editor = editorRef.current;
const lineCount = model?.getLineCount?.() || 1;
// 滚动到最后
editor.revealLine?.(lineCount);
editor.setScrollTop?.(scrollHeight);
}, [file?.content, activeFile, isStreaming]);
层级作用范围
StickToBottom滚动整个代码查看器容器外层容器滚动
Monaco Editor滚动编辑器内容到最后一行编辑器内部滚动
┌─────────────────────────────────────────┐
│ StickToBottom (外层容器) │
│ ┌─────────────────────────────────────┐ │
│ │ Monaco Editor (内层编辑器) │ │
│ │ │ │
│ │ import React; │ │
│ │ ... │ │
│ │ export default... (生成中) │ │ ← 两者协同确保这里可见
│ │ │ │
│ └─────────────────────────────────────┘ │
└─────────────────────────────────────────┘
无 StickToBottom:
[ 代码生成中... ]
│ 第1行 │
│ 第2行 │
│ 第3行 │ ← 用户手动滚动
...
│ 第50行 │ ← 用户需要手动滚动到这里才能看到新代码
有 StickToBottom:
[ 代码生成中... ]
...
│ 第48行 │ ← 自动跟随
│ 第49行 │ ← 自动跟随
│ 第50行 │ ← 自动跟随,用户始终看到最新生成的代码

StickToBottom 确保在 AI 流式生成代码时:

  1. 用户始终看到最新代码 - 自动滚动到正在生成的位置
  2. 无需手动滚动 - 提供更好的观看体验
  3. 平滑动画 - resize="smooth" 提供流畅的视觉效果
  4. 只在流式时启用 - isStreaming 控制是否启用