#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ 正确的DWG线条提取脚本 基于调试结果,使用正确的API提取线条数据 """ import os import sys import csv import json from pathlib import Path import math def print_separator(title=""): """打印分隔线""" print("=" * 60) if title: print(f" {title} ") print("=" * 60) def extract_lines_correctly(dwg_path): """正确提取DWG文件中的线条""" print_separator("正确提取DWG文件中的线条") if not os.path.exists(dwg_path): print(f"文件不存在: {dwg_path}") return None try: import aspose.cad as cad from aspose.cad.fileformats.cad import CadImage from aspose.cad.fileformats.cad.cadobjects import CadLine # 获取文件基本信息 file_size = os.path.getsize(dwg_path) print(f"文件路径: {dwg_path}") print(f"文件大小: {file_size:,} 字节 ({file_size/1024/1024:.2f} MB)") # 使用CadImage.load加载文件 print("正在使用CadImage.load加载DWG文件...") cad_image = CadImage.load(dwg_path) print(f"成功加载DWG文件") print(f"图像类型: {type(cad_image).__name__}") print(f"图像尺寸: {cad_image.width} x {cad_image.height}") lines_data = [] total_lines = 0 # 方法1:尝试从cad_image的blocks获取 print("\n方法1:尝试从blocks获取实体") try: if hasattr(cad_image, 'blocks'): blocks = cad_image.blocks print(f"找到blocks属性: {type(blocks).__name__}") if hasattr(blocks, '__len__'): print(f"块数量: {len(blocks)}") for block_index, block in enumerate(blocks): print(f" 分析块 {block_index}: {type(block).__name__}") if hasattr(block, 'entities'): block_entities = block.entities if hasattr(block_entities, '__len__'): print(f" 块中实体数量: {len(block_entities)}") for entity_index, entity in enumerate(block_entities): entity_type = type(entity).__name__ print(f" 实体 {entity_index}: {entity_type}") if entity_type == 'CadLine': line_info = extract_cad_line_info(entity, total_lines, block_index, entity_index) if line_info: lines_data.append(line_info) total_lines += 1 else: print("cad_image没有blocks属性") except Exception as e: print(f"从blocks获取实体失败: {e}") # 方法2:尝试从cad_image的entities获取 print("\n方法2:尝试从entities获取实体") try: if hasattr(cad_image, 'entities'): entities = cad_image.entities print(f"找到entities属性: {type(entities).__name__}") if hasattr(entities, '__len__'): print(f"实体数量: {len(entities)}") for entity_index, entity in enumerate(entities): entity_type = type(entity).__name__ print(f" 实体 {entity_index}: {entity_type}") if entity_type == 'CadLine': line_info = extract_cad_line_info(entity, total_lines, -1, entity_index) if line_info: lines_data.append(line_info) total_lines += 1 else: print("cad_image没有entities属性") except Exception as e: print(f"从entities获取实体失败: {e}") # 方法3:尝试从cad_image的layers获取 print("\n方法3:尝试从layers获取实体") try: if hasattr(cad_image, 'layers'): layers = cad_image.layers print(f"找到layers属性: {type(layers).__name__}") if hasattr(layers, '__len__'): print(f"图层数量: {len(layers)}") for layer_index, layer in enumerate(layers): print(f" 分析图层 {layer_index}: {type(layer).__name__}") if hasattr(layer, 'entities'): layer_entities = layer.entities if hasattr(layer_entities, '__len__'): print(f" 图层中实体数量: {len(layer_entities)}") for entity_index, entity in enumerate(layer_entities): entity_type = type(entity).__name__ print(f" 实体 {entity_index}: {entity_type}") if entity_type == 'CadLine': line_info = extract_cad_line_info(entity, total_lines, layer_index, entity_index) if line_info: lines_data.append(line_info) total_lines += 1 else: print("cad_image没有layers属性") except Exception as e: print(f"从layers获取实体失败: {e}") # 方法4:尝试遍历cad_image的所有属性 print("\n方法4:遍历cad_image的所有属性") try: all_attrs = [attr for attr in dir(cad_image) if not attr.startswith('_')] print(f"cad_image共有 {len(all_attrs)} 个属性") # 查找可能包含实体的属性 entity_attrs = [attr for attr in all_attrs if 'entit' in attr.lower() or 'object' in attr.lower() or 'item' in attr.lower()] print(f"可能的实体属性: {entity_attrs}") for attr_name in entity_attrs: try: attr_value = getattr(cad_image, attr_name) if attr_value and hasattr(attr_value, '__len__') and len(attr_value) > 0: print(f" 找到属性 {attr_name}: {type(attr_value).__name__} (长度: {len(attr_value)})") # 尝试遍历这个属性 for i, item in enumerate(attr_value): item_type = type(item).__name__ print(f" {attr_name}[{i}]: {item_type}") if item_type == 'CadLine': line_info = extract_cad_line_info(item, total_lines, -1, i) if line_info: lines_data.append(line_info) total_lines += 1 if i >= 4: # 只显示前5个 break except Exception as e: print(f" 访问属性 {attr_name} 失败: {e}") except Exception as e: print(f"遍历属性失败: {e}") print(f"\n成功提取 {total_lines} 条线条") return lines_data except Exception as e: print(f"提取线条失败: {e}") import traceback traceback.print_exc() return None def extract_cad_line_info(entity, line_index, parent_index, entity_index): """提取CadLine实体的详细信息""" try: line_info = { 'line_index': line_index, 'parent_index': parent_index, 'entity_index': entity_index, 'entity_type': 'CadLine', 'layer_name': 'Unknown', 'color': 'Unknown', 'line_type': 'Unknown', 'thickness': 0, 'visible': True, } # 尝试获取基本属性 try: if hasattr(entity, 'layer_name'): line_info['layer_name'] = str(getattr(entity, 'layer_name', 'Unknown')) if hasattr(entity, 'color_name'): line_info['color'] = str(getattr(entity, 'color_name', 'Unknown')) if hasattr(entity, 'line_type_name'): line_info['line_type'] = str(getattr(entity, 'line_type_name', 'Unknown')) if hasattr(entity, 'thickness'): line_info['thickness'] = float(getattr(entity, 'thickness', 0)) if hasattr(entity, 'visible'): line_info['visible'] = bool(getattr(entity, 'visible', True)) except Exception as e: print(f" 获取基本属性失败: {e}") # 尝试获取坐标信息 try: if hasattr(entity, 'first_point'): first_point = entity.first_point line_info['start_x'] = float(getattr(first_point, 'x', 0)) line_info['start_y'] = float(getattr(first_point, 'y', 0)) line_info['start_z'] = float(getattr(first_point, 'z', 0)) if hasattr(entity, 'second_point'): second_point = entity.second_point line_info['end_x'] = float(getattr(second_point, 'x', 0)) line_info['end_y'] = float(getattr(second_point, 'y', 0)) line_info['end_z'] = float(getattr(second_point, 'z', 0)) # 计算线条长度 if 'start_x' in line_info and 'end_x' in line_info: dx = line_info['end_x'] - line_info['start_x'] dy = line_info['end_y'] - line_info['start_y'] dz = line_info['end_z'] - line_info['start_z'] length = math.sqrt(dx*dx + dy*dy + dz*dz) line_info['length'] = round(length, 4) # 计算角度 if 'start_x' in line_info and 'end_x' in line_info: dx = line_info['end_x'] - line_info['start_x'] dy = line_info['end_y'] - line_info['start_y'] if dx != 0 or dy != 0: angle = math.atan2(dy, dx) * 180 / math.pi line_info['angle_degrees'] = round(angle, 2) except Exception as e: print(f" 获取坐标信息失败: {e}") # 尝试获取其他属性 try: if hasattr(entity, 'object_handle'): line_info['handle'] = str(getattr(entity, 'object_handle', '')) if hasattr(entity, 'id'): line_info['id'] = str(getattr(entity, 'id', '')) if hasattr(entity, 'length'): line_info['cad_length'] = float(getattr(entity, 'length', 0)) except Exception as e: print(f" 获取其他属性失败: {e}") return line_info except Exception as e: print(f"提取线条 {line_index} 信息时出错: {e}") return None def save_lines_to_csv(lines_data, output_path): """将线条数据保存为CSV文件""" if not lines_data: print("没有线条数据可保存") return False try: with open(output_path, 'w', newline='', encoding='utf-8-sig') as csvfile: # 获取所有可能的字段名 all_fields = set() for line in lines_data: all_fields.update(line.keys()) fieldnames = sorted(list(all_fields)) writer = csv.DictWriter(csvfile, fieldnames=fieldnames) writer.writeheader() for line in lines_data: # 确保所有字段都有值 row = {} for field in fieldnames: row[field] = line.get(field, '') writer.writerow(row) print(f"线条数据已保存到: {output_path}") return True except Exception as e: print(f"保存CSV文件失败: {e}") return False def save_lines_to_json(lines_data, output_path): """将线条数据保存为JSON文件""" if not lines_data: print("没有线条数据可保存") return False try: with open(output_path, 'w', encoding='utf-8') as jsonfile: json.dump(lines_data, jsonfile, ensure_ascii=False, indent=2) print(f"线条数据已保存到: {output_path}") return True except Exception as e: print(f"保存JSON文件失败: {e}") return False def create_summary_report(lines_data, output_path): """创建线条统计报告""" if not lines_data: return False try: with open(output_path, 'w', encoding='utf-8') as report: report.write("DWG文件线条提取报告\n") report.write("=" * 50 + "\n\n") # 基本统计 total_lines = len(lines_data) report.write(f"总线条数量: {total_lines}\n\n") # 按类型统计 type_count = {} layer_count = {} color_count = {} for line in lines_data: # 类型统计 entity_type = line.get('entity_type', 'Unknown') type_count[entity_type] = type_count.get(entity_type, 0) + 1 # 图层统计 layer = line.get('layer_name', 'Unknown') layer_count[layer] = layer_count.get(layer, 0) + 1 # 颜色统计 color = line.get('color', 'Unknown') color_count[color] = color_count.get(color, 0) + 1 # 写入统计信息 report.write("按类型统计:\n") for entity_type, count in sorted(type_count.items()): report.write(f" {entity_type}: {count}\n") report.write("\n按图层统计:\n") for layer, count in sorted(layer_count.items()): report.write(f" {layer}: {count}\n") report.write("\n按颜色统计:\n") for color, count in sorted(color_count.items()): report.write(f" {color}: {count}\n") # 长度统计 lengths = [line.get('length', 0) for line in lines_data if 'length' in line and line['length'] > 0] if lengths: report.write(f"\n长度统计:\n") report.write(f" 最短线条: {min(lengths):.4f}\n") report.write(f" 最长线条: {max(lengths):.4f}\n") report.write(f" 平均长度: {sum(lengths)/len(lengths):.4f}\n") # 详细数据 report.write(f"\n详细线条数据:\n") for i, line in enumerate(lines_data): report.write(f"\n线条 {i+1}:\n") for key, value in line.items(): report.write(f" {key}: {value}\n") print(f"统计报告已保存到: {output_path}") return True except Exception as e: print(f"创建统计报告失败: {e}") return False def main(): """主函数""" print_separator("正确的DWG线条提取工具") print("基于调试结果,使用正确的API提取线条数据") print() # 查找DWG文件 dwg_files = list(Path('.').glob('*.dwg')) if not dwg_files: print("当前目录下未找到DWG文件") print("请确保DWG文件在当前目录中") return print(f"找到 {len(dwg_files)} 个DWG文件:") for i, dwg_file in enumerate(dwg_files, 1): print(f" {i}. {dwg_file}") # 使用第一个DWG文件进行测试 test_file = str(dwg_files[0]) print(f"\n使用文件进行提取: {test_file}") # 提取线条数据 lines_data = extract_lines_correctly(test_file) if lines_data is None: print("线条提取失败") return # 保存数据 base_name = test_file.replace('.dwg', '') # 保存为CSV csv_path = f"{base_name}_lines_correct.csv" csv_success = save_lines_to_csv(lines_data, csv_path) # 保存为JSON json_path = f"{base_name}_lines_correct.json" json_success = save_lines_to_json(lines_data, json_path) # 创建统计报告 report_path = f"{base_name}_lines_correct_report.txt" report_success = create_summary_report(lines_data, report_path) # 总结 print_separator("提取总结") print(f"提取的线条数量: {len(lines_data)}") print(f"CSV文件保存: {'成功' if csv_success else '失败'}") print(f"JSON文件保存: {'成功' if json_success else '失败'}") print(f"统计报告生成: {'成功' if report_success else '失败'}") if csv_success or json_success: print(f"\n线条提取完成!文件已保存到当前目录") print(f" - CSV文件: {csv_path}") print(f" - JSON文件: {json_path}") print(f" - 统计报告: {report_path}") else: print("\n所有保存操作都失败了") if __name__ == "__main__": try: main() except KeyboardInterrupt: print("\n\n用户中断了程序执行") except Exception as e: print(f"\n\n程序执行出错: {e}") import traceback traceback.print_exc()