feat(workflow): Implement Worker Runtime with Context Shell

- Implemented `ContextShell` trait providing `tree`, `find`, `grep`, `cat`, `patch`, `wc` primitives.
- Added `WorkerContext` struct acting as the shell interface for LLM workers.
- Implemented `get_tool_definitions` to export OpenAI-compatible tool schemas.
- Added `patch` command for atomic, exact-match text replacement.
- Added comprehensive tests covering happy paths and edge cases (ambiguity, unicode, regex errors).
- Updated design documentation.
Lv, Qi 2025-11-27 01:50:48 +08:00
parent fcadb1ff6a
commit efd2c42775
5 changed files with 704 additions and 0 deletions


@@ -13,6 +13,7 @@ thiserror = "1.0"
hex = "0.4"
walkdir = "2.3"
regex = "1.10"
+globset = "0.4.18"

[dev-dependencies]
tempfile = "3.8"


@@ -2,8 +2,10 @@ pub mod types;
pub mod traits;
pub mod vgcs;
pub mod docos;
+pub mod worker_runtime;

pub use types::*;
pub use traits::*;
pub use vgcs::Vgcs;
pub use docos::{DocOS, DocManager};
+pub use worker_runtime::{WorkerContext, ContextShell, OutputFormat, FindOptions, NodeMetadata, GrepMatch, FileStats};


@@ -0,0 +1,378 @@
use std::path::Path;
use std::sync::Arc;
use std::env;
use anyhow::{Result, Context, anyhow};
use serde::{Serialize, Deserialize};
use serde::de::DeserializeOwned;
use globset::Glob;
use regex::Regex;
use crate::{DocOS, DocManager, Vgcs, DocNodeKind};
// --- Data Structures ---
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum OutputFormat {
    Text,
    Json,
}

#[derive(Debug, Serialize, Deserialize, Clone, PartialEq)]
pub struct NodeMetadata {
    pub path: String,
    pub kind: String, // "File" or "Dir"
    pub size: u64,
    // pub modified: bool, // TODO: Implement diff check against base
}

#[derive(Debug, Default, Clone)]
pub struct FindOptions {
    pub recursive: bool,
    pub max_depth: Option<usize>,
    pub type_filter: Option<String>, // "File" or "Dir"
    pub min_size: Option<u64>,
    pub max_size: Option<u64>,
}

#[derive(Debug, Serialize, Deserialize)]
pub struct GrepMatch {
    pub path: String,
    pub line_number: usize,
    pub content: String,
}

#[derive(Debug, Serialize, Deserialize)]
pub struct FileStats {
    pub path: String,
    pub lines: usize,
    pub bytes: usize,
}

// --- Trait Definition ---
pub trait ContextShell {
    fn tree(&self, path: &str, depth: Option<usize>, format: OutputFormat) -> Result<String>;
    fn find(&self, name_pattern: &str, options: FindOptions) -> Result<Vec<NodeMetadata>>;
    fn grep(&self, pattern: &str, paths: Option<Vec<String>>) -> Result<Vec<GrepMatch>>;
    fn cat(&self, paths: &[String]) -> Result<String>;
    fn wc(&self, paths: &[String]) -> Result<Vec<FileStats>>;
    fn patch(&mut self, path: &str, original: &str, replacement: &str) -> Result<()>;
}
// --- WorkerContext Implementation ---
pub struct WorkerContext {
    doc: DocOS<Vgcs>,
}

impl WorkerContext {
    pub fn from_env() -> Result<Self> {
        let req_id = env::var("WORKFLOW_REQ_ID").context("Missing WORKFLOW_REQ_ID")?;
        let commit = env::var("WORKFLOW_BASE_COMMIT").context("Missing WORKFLOW_BASE_COMMIT")?;
        let data_path = env::var("WORKFLOW_DATA_PATH").context("Missing WORKFLOW_DATA_PATH")?;
        let vgcs = Vgcs::new(&data_path);
        let doc = DocOS::new(Arc::new(vgcs), &req_id, &commit);
        Ok(Self { doc })
    }

    pub fn new(data_path: &str, req_id: &str, commit: &str) -> Self {
        let vgcs = Vgcs::new(data_path);
        let doc = DocOS::new(Arc::new(vgcs), req_id, commit);
        Self { doc }
    }

    pub fn read_json<T: DeserializeOwned>(&self, path: impl AsRef<Path>) -> Result<T> {
        let path_str = path.as_ref().to_string_lossy();
        let content = self.doc.read_content(&path_str)?;
        let data = serde_json::from_str(&content)
            .with_context(|| format!("Failed to parse JSON from {}", path_str))?;
        Ok(data)
    }

    pub fn read_text(&self, path: impl AsRef<Path>) -> Result<String> {
        let path_str = path.as_ref().to_string_lossy();
        self.doc.read_content(&path_str)
    }

    pub fn write_file(&mut self, path: impl AsRef<Path>, content: &str) -> Result<()> {
        let path_str = path.as_ref().to_string_lossy();
        self.doc.write_content(&path_str, content)
    }

    pub fn attach_subsection(&mut self, parent: impl AsRef<Path>, name: &str, content: &str) -> Result<()> {
        let parent_str = parent.as_ref().to_string_lossy();
        self.doc.insert_subsection(&parent_str, name, content)
    }

    pub fn commit(&mut self, message: &str) -> Result<String> {
        self.doc.save(message)
    }

    pub fn get_tool_definitions() -> serde_json::Value {
        serde_json::json!([
            {
                "type": "function",
                "function": {
                    "name": "tree",
                    "description": "List directory structure to understand the file layout.",
                    "parameters": {
                        "type": "object",
                        "properties": {
                            "path": { "type": "string", "description": "Root path to list (default: root)" },
                            "depth": { "type": "integer", "description": "Recursion depth" },
                            "format": { "type": "string", "enum": ["Text", "Json"], "default": "Text" }
                        }
                    }
                }
            },
            {
                "type": "function",
                "function": {
                    "name": "find",
                    "description": "Find files by name pattern (glob). Fast metadata search.",
                    "parameters": {
                        "type": "object",
                        "required": ["pattern"],
                        "properties": {
                            "pattern": { "type": "string", "description": "Glob pattern (e.g. **/*.rs)" },
                            "recursive": { "type": "boolean", "default": true }
                        }
                    }
                }
            },
            {
                "type": "function",
                "function": {
                    "name": "grep",
                    "description": "Search for content within files using regex.",
                    "parameters": {
                        "type": "object",
                        "required": ["pattern"],
                        "properties": {
                            "pattern": { "type": "string", "description": "Regex pattern" },
                            "paths": { "type": "array", "items": { "type": "string" }, "description": "Limit search to these paths" }
                        }
                    }
                }
            },
            {
                "type": "function",
                "function": {
                    "name": "cat",
                    "description": "Read and assemble content of multiple files.",
                    "parameters": {
                        "type": "object",
                        "required": ["paths"],
                        "properties": {
                            "paths": { "type": "array", "items": { "type": "string" } }
                        }
                    }
                }
            },
            {
                "type": "function",
                "function": {
                    "name": "patch",
                    "description": "Replace a specific text block in a file. Use this for small corrections.",
                    "parameters": {
                        "type": "object",
                        "required": ["path", "original", "replacement"],
                        "properties": {
                            "path": { "type": "string" },
                            "original": { "type": "string", "description": "Exact text to look for. Must be unique in file." },
                            "replacement": { "type": "string", "description": "New text to insert." }
                        }
                    }
                }
            }
        ])
    }
}
impl ContextShell for WorkerContext {
    fn tree(&self, path: &str, depth: Option<usize>, format: OutputFormat) -> Result<String> {
        let root_node = self.doc.get_outline()?;
        let target_node = if path == "/" || path == "." {
            Some(&root_node)
        } else {
            fn find_node<'a>(node: &'a crate::DocNode, path: &str) -> Option<&'a crate::DocNode> {
                if node.path == path {
                    return Some(node);
                }
                for child in &node.children {
                    if let Some(found) = find_node(child, path) {
                        return Some(found);
                    }
                }
                None
            }
            find_node(&root_node, path)
        };
        let node = target_node.ok_or_else(|| anyhow!("Path not found: {}", path))?;
        match format {
            OutputFormat::Json => Ok(serde_json::to_string_pretty(node)?),
            OutputFormat::Text => {
                let mut output = String::new();
                fn print_tree(node: &crate::DocNode, prefix: &str, is_last: bool, depth: usize, max_depth: Option<usize>, output: &mut String) {
                    if let Some(max) = max_depth {
                        if depth > max { return; }
                    }
                    let name = if node.path == "/" { "." } else { &node.name };
                    if depth > 0 {
                        let connector = if is_last { "└── " } else { "├── " };
                        output.push_str(&format!("{}{}{}\n", prefix, connector, name));
                    } else {
                        output.push_str(&format!("{}\n", name));
                    }
                    // Extend the prefix: blank spacing under a last child, a pipe otherwise.
                    let child_prefix = if depth > 0 {
                        if is_last { format!("{}    ", prefix) } else { format!("{}│   ", prefix) }
                    } else {
                        "".to_string()
                    };
                    for (i, child) in node.children.iter().enumerate() {
                        print_tree(child, &child_prefix, i == node.children.len() - 1, depth + 1, max_depth, output);
                    }
                }
                print_tree(node, "", true, 0, depth, &mut output);
                Ok(output)
            }
        }
    }

    fn find(&self, name_pattern: &str, options: FindOptions) -> Result<Vec<NodeMetadata>> {
        let root = self.doc.get_outline()?;
        let mut results = Vec::new();
        let glob = Glob::new(name_pattern)?.compile_matcher();
        fn traverse(node: &crate::DocNode, glob: &globset::GlobMatcher, opts: &FindOptions, depth: usize, results: &mut Vec<NodeMetadata>) {
            if let Some(max) = opts.max_depth {
                if depth > max { return; }
            }
            let match_name = glob.is_match(&node.name) || glob.is_match(&node.path);
            let kind_str = match node.kind {
                DocNodeKind::Composite => "Dir",
                DocNodeKind::Leaf => "File",
                DocNodeKind::Section => "Section",
            };
            let type_match = match &opts.type_filter {
                Some(t) => t.eq_ignore_ascii_case(kind_str),
                None => true,
            };
            if depth > 0 && match_name && type_match {
                results.push(NodeMetadata {
                    path: node.path.clone(),
                    kind: kind_str.to_string(),
                    size: 0, // size not yet tracked in the outline metadata
                });
            }
            if opts.recursive || depth == 0 {
                for child in &node.children {
                    traverse(child, glob, opts, depth + 1, results);
                }
            }
        }
        traverse(&root, &glob, &options, 0, &mut results);
        Ok(results)
    }

    fn grep(&self, pattern: &str, paths: Option<Vec<String>>) -> Result<Vec<GrepMatch>> {
        let re = Regex::new(pattern).context("Invalid regex pattern")?;
        let target_paths = match paths {
            Some(p) => p,
            None => {
                let all_nodes = self.find("**/*", FindOptions {
                    recursive: true,
                    type_filter: Some("File".to_string()),
                    ..Default::default()
                })?;
                all_nodes.into_iter().map(|n| n.path).collect()
            }
        };
        let mut matches = Vec::new();
        for path in target_paths {
            if let Ok(content) = self.read_text(&path) {
                for (i, line) in content.lines().enumerate() {
                    if re.is_match(line) {
                        matches.push(GrepMatch {
                            path: path.clone(),
                            line_number: i + 1,
                            content: line.trim().to_string(),
                        });
                    }
                }
            }
        }
        Ok(matches)
    }

    fn cat(&self, paths: &[String]) -> Result<String> {
        let mut output = String::new();
        for path in paths {
            match self.read_text(path) {
                Ok(content) => {
                    output.push_str(&format!("<file path=\"{}\">\n", path));
                    output.push_str(&content);
                    if !content.ends_with('\n') {
                        output.push('\n');
                    }
                    output.push_str("</file>\n\n");
                }
                Err(e) => {
                    output.push_str(&format!("<!-- Failed to read {}: {} -->\n", path, e));
                }
            }
        }
        Ok(output)
    }

    fn wc(&self, paths: &[String]) -> Result<Vec<FileStats>> {
        let mut stats = Vec::new();
        for path in paths {
            if let Ok(content) = self.read_text(path) {
                stats.push(FileStats {
                    path: path.clone(),
                    lines: content.lines().count(),
                    bytes: content.len(),
                });
            }
        }
        Ok(stats)
    }

    fn patch(&mut self, path: &str, original: &str, replacement: &str) -> Result<()> {
        let content = self.read_text(path)?;
        let matches: Vec<_> = content.match_indices(original).collect();
        match matches.len() {
            0 => Err(anyhow!("Original text not found in {}", path)),
            1 => {
                let new_content = content.replace(original, replacement);
                self.write_file(path, &new_content)?;
                Ok(())
            }
            _ => Err(anyhow!("Ambiguous match: original text found {} times", matches.len())),
        }
    }
}


@@ -0,0 +1,142 @@
use workflow_context::{WorkerContext, ContextShell, OutputFormat, FindOptions, Vgcs, ContextStore};
use tempfile::TempDir;

const ZERO_OID: &str = "0000000000000000000000000000000000000000";

fn setup_env() -> (TempDir, String, String) {
    let temp_dir = TempDir::new().unwrap();
    let data_path = temp_dir.path().to_str().unwrap().to_string();
    let req_id = "req-shell-test".to_string();
    // Init repo
    let vgcs = Vgcs::new(&data_path);
    vgcs.init_repo(&req_id).unwrap();
    (temp_dir, data_path, req_id)
}

#[test]
fn test_shell_comprehensive() -> anyhow::Result<()> {
    let (_tmp, data_path, req_id) = setup_env();

    // 1. Set up initial context
    let mut ctx = WorkerContext::new(&data_path, &req_id, ZERO_OID);
    ctx.write_file("README.md", "Project Root\n\nIntroduction here.")?;
    ctx.write_file("src/main.rs", "fn main() {\n println!(\"Hello\");\n println!(\"Hello\");\n}")?; // Double Hello for ambiguity test
    ctx.write_file("src/util.rs", "pub fn util() -> i32 { 42 }")?;
    ctx.write_file("data/config.json", "{\n \"key\": \"value\",\n \"retries\": 3\n}")?;
    ctx.write_file("文档/说明.txt", "这是一个中文文件。")?; // Unicode path & content
    let commit_1 = ctx.commit("Init")?;
    let mut ctx = WorkerContext::new(&data_path, &req_id, &commit_1);

    // --- Find Tests ---
    println!("Testing Find...");

    // Test: Recursive vs non-recursive
    // Note: includes directories (src, data, 文档) + files (5) = 8
    let all_nodes = ctx.find("**/*", FindOptions { recursive: true, ..Default::default() })?;
    assert_eq!(all_nodes.len(), 8);

    // Test: Only files
    let only_files = ctx.find("**/*", FindOptions {
        recursive: true,
        type_filter: Some("File".to_string()),
        ..Default::default()
    })?;
    assert_eq!(only_files.len(), 5);

    // Test: Non-recursive (top level)
    let root_nodes = ctx.find("*", FindOptions { recursive: false, ..Default::default() })?;
    // Expect README.md, src (dir), data (dir), 文档 (dir)
    assert!(root_nodes.iter().any(|f| f.path == "README.md"));
    assert!(root_nodes.iter().any(|f| f.path == "src"));

    // Test: Type filter (Dir)
    let dirs = ctx.find("**/*", FindOptions {
        recursive: true,
        type_filter: Some("Dir".to_string()),
        ..Default::default()
    })?;
    assert!(dirs.iter().any(|d| d.path == "src"));
    assert!(dirs.iter().any(|d| d.path == "data"));
    assert!(dirs.iter().any(|d| d.path == "文档"));
    assert!(!dirs.iter().any(|d| d.path == "README.md"));

    // --- Grep Tests ---
    println!("Testing Grep...");

    // Test: Regex match
    let matches = ctx.grep(r"fn \w+\(\)", None)?;
    assert_eq!(matches.len(), 2); // main() and util()

    // Test: Unicode content
    let zh_matches = ctx.grep("中文", None)?;
    assert_eq!(zh_matches.len(), 1);
    assert_eq!(zh_matches[0].path, "文档/说明.txt");

    // Test: Invalid regex
    let bad_regex = ctx.grep("(", None);
    assert!(bad_regex.is_err());

    // --- Patch Tests ---
    println!("Testing Patch...");

    // Test: Ambiguous match (safety check)
    // src/main.rs has two `println!("Hello");`
    let res = ctx.patch("src/main.rs", "println!(\"Hello\");", "println!(\"World\");");
    assert!(res.is_err(), "Should fail on ambiguous match");
    let err_msg = res.unwrap_err().to_string();
    assert!(err_msg.contains("Ambiguous match"), "Error message mismatch: {}", err_msg);

    // Test: Unique match
    // Patch "Introduction here." to "Intro v2." in README.md
    ctx.patch("README.md", "Introduction here.", "Intro v2.")?;
    ctx.commit("Patch 1")?; // Must commit to verify via read (if read uses committed state)

    // Verify
    let readme = ctx.read_text("README.md")?;
    assert!(readme.contains("Intro v2."));

    // Test: Special characters (literal match)
    // Patch JSON, which contains braces and quotes
    ctx.patch("data/config.json", "\"retries\": 3", "\"retries\": 5")?;
    ctx.commit("Patch 2")?;
    let config = ctx.read_text("data/config.json")?;
    assert!(config.contains("\"retries\": 5"));

    // Test: Cross-line patch
    // Replace the whole function body in util.rs
    let old_block = "pub fn util() -> i32 { 42 }";
    let new_block = "pub fn util() -> i32 {\n return 100;\n}";
    ctx.patch("src/util.rs", old_block, new_block)?;
    ctx.commit("Patch 3")?;
    let util = ctx.read_text("src/util.rs")?;
    assert!(util.contains("return 100;"));

    // Test: Patch non-existent file
    let res = ctx.patch("ghost.txt", "foo", "bar");
    assert!(res.is_err());

    Ok(())
}

#[test]
fn test_tool_schema_validity() {
    let defs = WorkerContext::get_tool_definitions();
    assert!(defs.is_array());
    let arr = defs.as_array().unwrap();
    // Verify critical fields exist for OpenAI
    for tool in arr {
        let obj = tool.as_object().unwrap();
        assert_eq!(obj["type"], "function");
        let func = obj["function"].as_object().unwrap();
        assert!(func.contains_key("name"));
        assert!(func.contains_key("description"));
        assert!(func.contains_key("parameters"));
    }
}


@@ -0,0 +1,181 @@
# Design Proposal 3: Worker Runtime (Context Shell & Utilities)

## 1. Positioning and Goals

The Worker Runtime is the bridge between the underlying document system (DocOS) and the business logic above it (LLM workers).

If VGCS is the hard disk and DocOS is the file system, then the Worker Runtime is the **shell (command-line toolset)**.

Its core task: **efficiently retrieve, filter, and assemble context to prepare the model's input (prompt context).**

## 2. Core Design: Context Shell

Borrowing from Linux coreutils and modern Rust CLI tools (such as `fd`, `ripgrep`, and `exa`), we provide a set of efficient, strongly typed primitives with structured output.
### 2.1 Interface Definition (Rust Trait)

```rust
pub enum OutputFormat {
    Text, // Human/LLM-readable text (e.g. an ASCII tree)
    Json, // Machine-readable structured data (jq-style input)
}

pub trait ContextShell {
    /// [tree]: Render a directory tree view.
    /// depth: recursion depth
    /// format: Text (ASCII tree) | Json (nested objects)
    fn tree(&self, path: &str, depth: Option<usize>, format: OutputFormat) -> Result<String>;

    /// [find]: Fast metadata-based lookup (does not read content).
    /// name_pattern: glob (e.g. "*.rs")
    /// min_size/max_size: size filters
    fn find(&self, name_pattern: &str, options: FindOptions) -> Result<Vec<NodeMetadata>>;

    /// [grep]: Full-text search (reads content).
    /// pattern: regex
    /// paths: restrict the search to these files (typically fed from find results)
    fn grep(&self, pattern: &str, paths: Option<Vec<String>>) -> Result<Vec<GrepMatch>>;

    /// [cat]: Read and assemble content.
    /// Handles blob loading, concatenates multiple files, and adds headers.
    fn cat(&self, paths: &[String]) -> Result<String>;

    /// [wc]: Metadata statistics (line count, byte count).
    fn wc(&self, paths: &[String]) -> Result<Vec<FileStats>>;

    /// [patch]: Local patching (new).
    /// Replacement based on exact text matching, avoiding a full read-write round trip.
    /// original: a source snippet that must occur exactly once in the file
    /// replacement: the new text to substitute
    fn patch(&mut self, path: &str, original: &str, replacement: &str) -> Result<()>;
}
```
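
For reference, a construction sketch grounded in the `WorkerContext` implementation in this commit; the data path and request id below are illustrative placeholders:

```rust
// Inside a spawned worker process: construct from environment variables
// (WORKFLOW_REQ_ID, WORKFLOW_BASE_COMMIT, WORKFLOW_DATA_PATH).
let mut ctx = WorkerContext::from_env()?;

// Or explicitly, e.g. in tests. All three arguments are placeholders;
// `base_commit_oid` is the id of an existing commit.
let mut ctx = WorkerContext::new("/tmp/workflow-data", "req-001", base_commit_oid);
let overview = ctx.tree("/", Some(2), OutputFormat::Text)?;
```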
### 2.2 Data Structures (Serializable)

```rust
#[derive(Serialize, Deserialize)]
pub struct NodeMetadata {
    pub path: String,
    pub kind: String, // "File" or "Dir"
    pub size: u64,
    // pub modified: bool, // TODO: Implement diff check against base
}

#[derive(Serialize, Deserialize)]
pub struct FindOptions {
    pub recursive: bool,
    pub max_depth: Option<usize>,
    pub type_filter: Option<String>, // "File" or "Dir"
    pub min_size: Option<u64>,
    pub max_size: Option<u64>,
}

#[derive(Debug, Serialize, Deserialize)]
pub struct GrepMatch {
    pub path: String,
    pub line_number: usize,
    pub content: String,
}

#[derive(Debug, Serialize, Deserialize)]
pub struct FileStats {
    pub path: String,
    pub lines: usize,
    pub bytes: usize,
}
```
## 3. Detailed Feature Design

### 3.1 tree (Structure Awareness)

* **Scenario**: the LLM needs to look at the "map" first to know where to dig for treasure.
* **Text mode**: renders the classic ASCII tree, pasted directly into the prompt.
* **Json mode**: nested JSON objects for programmatic structure analysis (see the sketch below).
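
A minimal usage sketch of the two output modes (`ctx` is a `WorkerContext`; the path and depth are illustrative):

```rust
// Text mode: paste the ASCII tree straight into the prompt.
let ascii_tree = ctx.tree("/", Some(2), OutputFormat::Text)?;

// Json mode: parse the nested objects and walk them in code.
let raw = ctx.tree("/", None, OutputFormat::Json)?;
let outline: serde_json::Value = serde_json::from_str(&raw)?;
```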
### 3.2 find (Fast Filter)

* **Scenario**: efficiency. Don't start by grepping full file contents.
* **Principle**: traverses only the Git tree objects (metadata) and **never decompresses blobs**, so it is extremely fast (usage sketch below).
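
A usage sketch mirroring the integration tests in this commit:

```rust
// Metadata-only: every Rust file anywhere in the tree...
let rs_files = ctx.find("**/*.rs", FindOptions {
    recursive: true,
    type_filter: Some("File".to_string()),
    ..Default::default()
})?;

// ...or only the directories, without touching any file content.
let dirs = ctx.find("**/*", FindOptions {
    recursive: true,
    type_filter: Some("Dir".to_string()),
    ..Default::default()
})?;
```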
### 3.3 grep (Content Search)

* **Scenario**: the retrieval (RAG) step.
* **Optimization**: accepts the output of `find` as the `paths` argument to avoid scanning everything.
* **Parallelism**: use Rayon to read and match blobs in parallel (sketched below).
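
The committed implementation still iterates files sequentially; the parallel variant would look roughly like this sketch, assuming a `rayon` dependency and a `Sync` `WorkerContext` (neither is part of this commit):

```rust
use rayon::prelude::*;

// Sketch only: fan the per-file work out across Rayon's thread pool.
fn grep_parallel(ctx: &WorkerContext, re: &regex::Regex, paths: Vec<String>) -> Vec<GrepMatch> {
    paths
        .par_iter()
        .flat_map_iter(|path| {
            let content = ctx.read_text(path).unwrap_or_default();
            content
                .lines()
                .enumerate()
                .filter(|(_, line)| re.is_match(line))
                .map(|(i, line)| GrepMatch {
                    path: path.clone(),
                    line_number: i + 1,
                    content: line.trim().to_string(),
                })
                .collect::<Vec<_>>() // collect per file so nothing borrows `content`
        })
        .collect()
}
```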
### 3.4 cat (Assemble)

* **Scenario**: assembling the prompt context.
* **Format**: wraps each file in XML tags so file boundaries are explicit (example below).
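
The wrapper format produced by the current `cat` implementation (paths illustrative):

```rust
let context = ctx.cat(&["README.md".to_string(), "src/main.rs".to_string()])?;
// `context` now looks like:
//
// <file path="README.md">
// ...content...
// </file>
//
// <file path="src/main.rs">
// ...content...
// </file>
```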
### 3.5 patch (Atomic Update)

* **Scenario**: correcting typos or updating a small piece of data.
* **Logic** (core check excerpted below):
  1. Read the file content.
  2. Search for the `original` string.
  3. **Fail fast**: if it is not found, or is found more than once (ambiguous), return an error immediately to avoid patching the wrong place.
  4. Perform the replacement, build the new blob in memory, and update the index.
* **Advantage**: unlike `write_file`, the worker does not have to send back the entire file, saving network transfer and tokens.
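
The core of the fail-fast check, condensed from `WorkerContext::patch` in this commit:

```rust
let content = self.read_text(path)?;
match content.match_indices(original).count() {
    0 => Err(anyhow!("Original text not found in {}", path)),
    1 => {
        // Exactly one occurrence: safe to replace and rewrite the file.
        let new_content = content.replace(original, replacement);
        self.write_file(path, &new_content)
    }
    n => Err(anyhow!("Ambiguous match: original text found {} times", n)),
}
```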
## 4. Tool Definition Schema (AI Configuration)

So that the LLM can call these tools directly, the runtime exports a standard, OpenAI-compatible JSON Schema:
```rust
impl WorkerContext {
    pub fn get_tool_definitions() -> serde_json::Value {
        serde_json::json!([
            {
                "type": "function",
                "function": {
                    "name": "tree",
                    "description": "List directory structure. Use this first to understand the file layout.",
                    "parameters": {
                        "type": "object",
                        "properties": {
                            "path": { "type": "string", "description": "Root path to list (default: root)" },
                            "depth": { "type": "integer", "description": "Recursion depth" }
                        }
                    }
                }
            },
            {
                "type": "function",
                "function": {
                    "name": "patch",
                    "description": "Replace a specific text block in a file. Use this for small corrections.",
                    "parameters": {
                        "type": "object",
                        "required": ["path", "original", "replacement"],
                        "properties": {
                            "path": { "type": "string" },
                            "original": { "type": "string", "description": "Exact text to look for. Must be unique in file." },
                            "replacement": { "type": "string", "description": "New text to insert." }
                        }
                    }
                }
            }
            // ... defined for grep, cat, find, etc.
        ])
    }
}
```
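
Routing a model's tool call back onto the shell is outside the scope of this commit; a hypothetical dispatcher could look like the sketch below (the function name, argument handling, and error strings are all illustrative, and `ContextShell` must be in scope for the method calls):

```rust
fn dispatch_tool_call(ctx: &mut WorkerContext, name: &str, args: &serde_json::Value) -> anyhow::Result<String> {
    match name {
        "grep" => {
            let pattern = args["pattern"].as_str().unwrap_or_default();
            // Optional path list; absent means "search everything".
            let paths = args["paths"].as_array().map(|a| {
                a.iter().filter_map(|v| v.as_str().map(String::from)).collect::<Vec<_>>()
            });
            Ok(serde_json::to_string(&ctx.grep(pattern, paths)?)?)
        }
        "patch" => {
            ctx.patch(
                args["path"].as_str().unwrap_or_default(),
                args["original"].as_str().unwrap_or_default(),
                args["replacement"].as_str().unwrap_or_default(),
            )?;
            Ok("ok".to_string())
        }
        other => Err(anyhow::anyhow!("Unknown tool: {}", other)),
    }
}
```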
## 5. Efficient Workflow Example (The Funnel Pattern)

Filtering proceeds like a funnel, narrowing step by step so each stage feeds the next and the expensive operations stay small.

```rust
// 1. Global awareness (tree)
let structure = ctx.tree("/", Some(2), OutputFormat::Text)?;

// 2. Fast location (find)
let json_files = ctx.find("**/*.json", FindOptions { recursive: true, ..Default::default() })?;

// 3. Precise retrieval (grep)
let json_paths: Vec<String> = json_files.into_iter().map(|f| f.path).collect();
let matches = ctx.grep("NetProfit", Some(json_paths))?;

// 4. Local correction (patch)
// Suppose the matches reveal a typo "NetProft"
ctx.patch("data/financials.json", "\"NetProft\":", "\"NetProfit\":")?;

// 5. Assemble for reading (cat)
let mut distinct_files: Vec<String> = matches.into_iter().map(|m| m.path).collect();
distinct_files.sort();
distinct_files.dedup();
let context = ctx.cat(&distinct_files)?;
```