170"""
Milestone 3 CLI - process a dataset into training samples.
"""
import argparse
import json
from pathlib import Path

from .sample_processor import SampleProcessor


def process_command(args):
    """Process a dataset into training samples."""
    processor = SampleProcessor()
    print(f"Loading dataset from {args.input}")
    processor.load_dataset(args.input)
    print(f"Processing sequences with min context length: {args.min_context}")
    samples = processor.process_sequences(min_context_length=args.min_context)
    print(f"Generated {len(samples)} training samples")
    stats = processor.get_statistics()
    print("\nStatistics:")
    for key, value in stats.items():
        if isinstance(value, float):
            print(f"  {key}: {value:.2f}")
        else:
            print(f"  {key}: {value}")
    if args.output:
        processor.save_samples(args.output)
    if args.verbose:
        # Show the context length distribution
        print("\nContext length distribution:")
        dist = processor.analyze_context_distribution()
        for length, count in list(dist.items())[:10]:
            print(f"  Length {length}: {count} samples")
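# A toy illustration of the expansion performed by process_sequences (an
# assumption inferred from the sample fields consumed elsewhere in this file):
# a sequence [3, 7, 2] with min_context_length=1 would yield two samples,
#   context [3]    -> target 7
#   context [3, 7] -> target 2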


def analyze_command(args):
    """Analyze training samples."""
    processor = SampleProcessor()
    print(f"Loading samples from {args.input}")
    processor.load_samples(args.input)
    stats = processor.get_statistics()
    print("\nDataset Statistics:")
    for key, value in stats.items():
        if isinstance(value, float):
            print(f"  {key}: {value:.2f}")
        else:
            print(f"  {key}: {value}")
    # Context length distribution
    print("\nContext Length Distribution (top 10):")
    context_dist = processor.analyze_context_distribution()
    for length, count in list(context_dist.items())[:10]:
        percentage = (count / len(processor.samples)) * 100
        print(f"  Length {length}: {count} samples ({percentage:.1f}%)")
    # Target distribution
    print("\nTarget Distribution (top 10):")
    target_dist = processor.analyze_target_distribution()
    for target_id, count in list(target_dist.items())[:10]:
        percentage = (count / len(processor.samples)) * 100
        node_type = processor.dataset.type_mapper.id_to_node_type.get(
            target_id, f"unknown_{target_id}")
        print(f"  {node_type}: {count} samples ({percentage:.1f}%)")
    if args.output:
        report = {
            'statistics': stats,
            'context_distribution': context_dist,
            'target_distribution': target_dist,
        }
        with open(args.output, 'w') as f:
            json.dump(report, f, indent=2)
        print(f"\nReport saved to {args.output}")
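# Reading the report back (a minimal sketch): json.dump stores the integer
# keys of the distribution dicts as strings, so convert them on load:
#   with open('report.json') as f:
#       report = json.load(f)
#   context_dist = {int(k): v for k, v in report['context_distribution'].items()}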


def export_command(args):
    """Export samples to different formats."""
    processor = SampleProcessor()
    print(f"Loading samples from {args.input}")
    processor.load_samples(args.input)
    if args.format == 'csv':
        import csv
        output_path = Path(args.output)
        output_path.parent.mkdir(parents=True, exist_ok=True)
        with open(output_path, 'w', newline='') as f:
            writer = csv.writer(f)
            writer.writerow(['context', 'target', 'source_id'])
            for sample in processor.samples:
                context_str = ','.join(map(str, sample.context))
                writer.writerow([context_str, sample.target, sample.source_sequence_id])
        print(f"Exported {len(processor.samples)} samples to CSV: {output_path}")
    elif args.format == 'numpy':
        import numpy as np
        # Find the maximum context length so every row can be padded to it
        max_length = max(len(s.context) for s in processor.samples)
        contexts = []
        targets = []
        for sample in processor.samples:
            # Right-pad each context with zeros up to max_length
            padded = sample.context + [0] * (max_length - len(sample.context))
            contexts.append(padded)
            targets.append(sample.target)
        X = np.array(contexts)
        y = np.array(targets)
        output_path = Path(args.output)
        output_path.parent.mkdir(parents=True, exist_ok=True)
        # Note: np.savez appends '.npz' to the path if it has no extension
        np.savez(output_path, X=X, y=y, max_length=max_length)
        print(f"Exported to NumPy format: {output_path}")
        print(f"  X shape: {X.shape}")
        print(f"  y shape: {y.shape}")
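# Loading the exported arrays back (a minimal sketch; np.savez stores the
# scalar max_length as a 0-d array, hence the int() conversion):
#   import numpy as np
#   data = np.load('samples.npz')
#   X, y = data['X'], data['y']
#   max_length = int(data['max_length'])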


def main():
    parser = argparse.ArgumentParser(description="Milestone 3: Process Training Samples")
    subparsers = parser.add_subparsers(dest='command', help='Available commands')

    # Process command
    process_parser = subparsers.add_parser('process', help='Process dataset into training samples')
    process_parser.add_argument('input', help='Path to input dataset (from milestone2)')
    process_parser.add_argument('--output', '-o', help='Path to save processed samples')
    process_parser.add_argument('--min-context', type=int, default=1,
                                help='Minimum context length (default: 1)')
    process_parser.add_argument('--verbose', '-v', action='store_true',
                                help='Show detailed statistics')

    # Analyze command
    analyze_parser = subparsers.add_parser('analyze', help='Analyze training samples')
    analyze_parser.add_argument('input', help='Path to training samples')
    analyze_parser.add_argument('--output', '-o', help='Save analysis report to JSON')

    # Export command
    export_parser = subparsers.add_parser('export', help='Export samples to different formats')
    export_parser.add_argument('input', help='Path to training samples')
    export_parser.add_argument('output', help='Output file path')
    export_parser.add_argument('--format', choices=['csv', 'numpy'], default='csv',
                               help='Export format (default: csv)')

    args = parser.parse_args()
    if args.command == 'process':
        process_command(args)
    elif args.command == 'analyze':
        analyze_command(args)
    elif args.command == 'export':
        export_command(args)
    else:
        parser.print_help()


if __name__ == '__main__':
    main()