SignaLR案例:导入英汉词典

需求

  1. 英汉词典ECDICT中导入单词到数据库。
  2. T_Worditems:Id(主键)、Word(单词)、Phonetic(音标)、Definition(英文解释)、Translation(中文翻译)


下载地址:https://github.com/skywind3000/ECDICT

CSV(Comma-Separated Values)是一种常用的文本文件格式,用于存储和交换表格数据。以下是关于 CSV 格式的详细介绍:

特点

简单性:CSV 格式非常简单,它以纯文本形式存储数据,每行代表一条记录,字段之间用逗号分隔。这种简单的结构使得 CSV 文件易于阅读、编写和解析,几乎所有的文本编辑器和电子表格软件都能处理 CSV 文件。
通用性:CSV 是一种通用的格式,可被多种应用程序识别和导入,如电子表格软件(如 Microsoft Excel、Google Sheets)、数据库管理系统、数据分析工具等。这使得它成为不同系统之间数据交换的常用格式。

数据表示

记录:CSV 文件中的每一行都是一条记录,代表一个数据实体。例如,在一个学生信息表中,每一行可能代表一个学生的信息。
字段:每行中的字段用逗号分隔。例如,在学生信息表中,可能有姓名、年龄、性别、成绩等字段,它们之间用逗号隔开。如果字段值本身包含逗号或其他特殊字符,通常会用双引号将字段值括起来。

示例

以下是一个简单的 CSV 文件示例,包含了三个学生的信息:

1
2
3
4
姓名,年龄,性别,成绩
张三,20,男,85
李四,21,女,90
王五,20,男,88

应用场景

数据导出和导入:常用于从数据库中导出数据或将数据导入到数据库中。也可用于在不同的软件应用程序之间交换数据,例如将电子表格中的数据分享给其他用户或导入到数据分析工具中进行处理。
数据存储:对于一些简单的数据集,CSV 格式是一种方便的存储方式。它占用的空间相对较小,且易于管理和备份。

日志记录:一些应用程序会将日志信息以 CSV 格式保存,以便后续分析和处理。例如,网站服务器可能会将访问日志记录为 CSV 文件,包含访问时间、访问者 IP 地址、请求页面等信息。

开始

  1. ImportExector中注入IHubContext<ImportDictHub>等服务。
  2. 暂时用字符串Split解析CSV,或者用更专业的库。
  3. 用SqlBulkCopy进行分批快速导入。

第一步,创建ASP.NET Core Web API。创建一个执行数据导入的类ImportExeccutor

通过connectionId来通知前端传输的进度。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
{
public class ImportExecutor
{
//IHubContext<ImportDictHub>:用于与客户端实时通信,更新导入进度
private readonly IHubContext<ImportDictHub> hubContext;
//ILogger<ImportExecutor>:记录日志,便于调试和监控
private readonly ILogger<ImportExecutor> logger;

public ImportExecutor(IHubContext<ImportDictHub> hubContext, ILogger<ImportExecutor> logger)
{
this.hubContext = hubContext;
this.logger = logger;
}

public async Task ExecuteAsync(string connectionId)
{
//异步读取 CSV 文件的所有行
string[] lines = await File.ReadAllLinesAsync(@"E:\temp\stardict.csv");
int totalCount = lines.Length - 1;//总行数:跳过表头
//硬编码,连接数据库
string connStr = "Server=localhost;Database=T_youxianyu;Trusted_Connection=True;MultipleActiveResultSets=true;TrustServerCertificate=True;Encrypt=True;";
//SqlBulkCopy:高效地将大量数据批量插入 SQL Serve
using SqlBulkCopy bulkCopy = new SqlBulkCopy(connStr);
//列映射:将 CSV 列映射到数据库表的列
bulkCopy.DestinationTableName = "T_Worditems";
bulkCopy.ColumnMappings.Add("Word", "Word");
bulkCopy.ColumnMappings.Add("Phonetic", "Phonetic");
bulkCopy.ColumnMappings.Add("Definition", "Definition");
bulkCopy.ColumnMappings.Add("Translation", "Translation");

int counter = 0;
//DataTable:临时存储 CSV 数据,用于批量导入
using DataTable dataTable = new DataTable();
dataTable.Columns.Add("Word");
dataTable.Columns.Add("Phonetic");
dataTable.Columns.Add("Definition");
dataTable.Columns.Add("Translation");
foreach(var line in lines)
{
string[] str = line.Split(",");
//数据验证:检查每行是否包含足够的字段
string word = str[0];
string? phonetic = str[1];
string? definition = str[2];
string? translation = str[3];
//创建 DataRow:将 CSV 数据转换为 DataTable 的行
DataRow row = dataTable.NewRow();
row["Word"] = word;
row["Phonetic"] = phonetic;
row["Definition"] = definition;
row["Translation"] = translation;
dataTable.Rows.Add(row);
counter++;
Console.WriteLine($"已加载{counter}");
if(dataTable.Rows.Count == 100)
{
await bulkCopy.WriteToServerAsync(dataTable);
//批量写入:当 DataTable 积累到 100 行时,执行一次批量导入
dataTable.Clear();
}
//进度通知:每处理 100 条记录或处理完所有记录时,通过 SignalR 通知客户端
await hubContext.Clients.Client(connectionId).SendAsync("ImportProgress",totalCount,counter);
}
//处理剩余数据:确保最后一批不足 100 条的数据也被导入
await bulkCopy.WriteToServerAsync(dataTable);
//最终进度通知:确保客户端收到 100% 完成的通知
await hubContext.Clients.Client(connectionId).SendAsync("ImportProgress", totalCount, counter);
}
}
}
/*public class ImportExecutor
{
private readonly IHubContext<ImportDictHub> hubContext;
private readonly ILogger<ImportExecutor> logger;
private readonly IConfiguration configuration;

public ImportExecutor(IHubContext<ImportDictHub> hubContext, ILogger<ImportExecutor> logger, IConfiguration configuration)
{
this.hubContext = hubContext;
this.logger = logger;
this.configuration = configuration;
}

public async Task ExecuteAsync(string connectionId)
{
try
{
string filePath = @"E:\temp\stardict.csv";
string[] lines = await File.ReadAllLinesAsync(filePath);

if (lines.Length <= 1) // 检查是否有数据行
{
await hubContext.Clients.Client(connectionId).SendAsync("ImportProgress", 0, 0);
return;
}

int totalCount = lines.Length - 1; // 总行数:跳过表头
string connStr = configuration.GetConnectionString("DefaultConnection");

using (var bulkCopy = new SqlBulkCopy(connStr))
{
bulkCopy.DestinationTableName = "T_Worditems";
bulkCopy.ColumnMappings.Add("Word", "Word");
bulkCopy.ColumnMappings.Add("Phonetic", "Phonetic");
bulkCopy.ColumnMappings.Add("Definition", "Definition");
bulkCopy.ColumnMappings.Add("Translation", "Translation");

using (DataTable dataTable = new DataTable())
{
dataTable.Columns.Add("Word");
dataTable.Columns.Add("Phonetic");
dataTable.Columns.Add("Definition");
dataTable.Columns.Add("Translation");

int counter = 0;
int progressCounter = 0;
const int BatchSize = 100;
const int ProgressUpdateInterval = 100; // 每处理100条记录更新一次进度

// 跳过表头行
for (int i = 1; i < lines.Length; i++)
{
string line = lines[i];
string[] str = line.Split(',');

if (str.Length < 4) // 确保数据行有足够的字段
{
logger.LogWarning($"第 {i} 行格式不正确: {line}");
continue;
}

string word = str[0];
string phonetic = str[1];
string definition = str[2];
string translation = str[3];

DataRow row = dataTable.NewRow();
row["Word"] = word;
row["Phonetic"] = phonetic;
row["Definition"] = definition;
row["Translation"] = translation;
dataTable.Rows.Add(row);
counter++;

if (dataTable.Rows.Count == BatchSize)
{
await bulkCopy.WriteToServerAsync(dataTable);
dataTable.Clear();
}

// 每处理ProgressUpdateInterval条记录发送一次进度更新
if (counter % ProgressUpdateInterval == 0 || counter == totalCount)
{
await hubContext.Clients.Client(connectionId).SendAsync("ImportProgress", totalCount, counter);
}
}

// 写入剩余的数据
if (dataTable.Rows.Count > 0)
{
await bulkCopy.WriteToServerAsync(dataTable);
}

// 确保发送最终进度
await hubContext.Clients.Client(connectionId).SendAsync("ImportProgress", totalCount, counter);
}
}
}
catch (Exception ex)
{
logger.LogError(ex, "导入过程中发生错误");
await hubContext.Clients.Client(connectionId).SendAsync("ImportError", ex.Message);
}
}
}*/
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class ImportDictHub:Hub
{
//依赖注入:通过构造函数注入 ImportExecutor,该服务负责实际的 CSV 导入操作
private readonly ImportExecutor importExecutor;

public ImportDictHub(ImportExecutor importExecutor)
{
this.importExecutor = importExecutor;
}
public Task ImportEcdict()
{
//启动字典数据的导入过程
/*this.Context.ConnectionId:获取当前客户端连接的唯一标识符
importExecutor.ExecuteAsync(...):调用导入执行器的异步方法开始导入
_ = ...:使用弃元 (discard) 忽略返回的任务,表示不等待导入完成就返回
return Task.CompletedTask:立即返回已完成的任务,让客户端知道请求已收到*/
_ = importExecutor.ExecuteAsync(this.Context.ConnectionId);
return Task.CompletedTask;
}
}
  1. 异步非阻塞设计
1
_ = importExecutor.ExecuteAsync(this.Context.ConnectionId);

为什么这样设计?
导入操作可能需要很长时间(几秒到几分钟不等)
如果等待导入完成再返回,会阻塞 SignalR 连接,影响其他客户端通信
采用 “触发并忘记”(fire-and-forget) 模式,让导入在后台执行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
builder.Services.AddSignalR();

string[] urls = new[] { "" };
builder.Services.AddCors(options =>
options.AddDefaultPolicy(builder => builder
.WithOrigins(urls)
.AllowAnyMethod()
.AllowAnyHeader()
.AllowCredentials()));

builder.Services.AddScoped<ImportExecutor>();

var app = builder.Build();

// Configure the HTTP request pipeline.
if (app.Environment.IsDevelopment())
{
app.UseSwagger();
app.UseSwaggerUI();
}

app.UseHttpsRedirection();

app.UseAuthorization();
app.MapHub<ImportDictHub>("/ImportDictHub");
app.MapControllers();

前端Vue

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
<template>
<div>
<input type="button" value="导入" v-on:click="importECDict"/>
<progress :value="state.current" :max="state.total"></progress>
</div>
</template>

<script>
import { reactive, onMounted } from 'vue';
import * as signalR from '@microsoft/signalr';
let connection;
export default {
name: 'Login',
setup() {
const state = reactive({ current: 0, total: 0 });
const importECDict = async function () {
await connection.invoke("ImportEcDict");
alert("启动导入");
};
onMounted(async function () {
var options = { skipNegotiation: true, transport: signalR.HttpTransportType.WebSockets };
connection = new signalR.HubConnectionBuilder()
.withUrl('https://localhost:7068/ImportDictHub', options)
.withAutomaticReconnect()
.build();
await connection.start();
connection.on('ImportProgress', (total, current) => {
state.total = total;
state.current = current;
});
});
return { state, importECDict };
}
}
</script>