在对数据库中进行数据处理的时候,可能会有并发问题。因为一般数据库的 UPDATE、INSERT 等简单语句并不是原子的,当大量数据库连接同时进行数据更新或插入的时候,可能会发生冲突。
在实际开发中,应该将并发问题在客户端或服务端解决,尽量不要将并发问题引入数据库层面。但如果真的要对数据库进行并发控制,在 EFCore 也是有办法的。EFCore 的并发控制分为悲观、乐观两种策略,分别对应悲观锁和乐观锁。
悲观并发控制即对数据加锁,主要有行锁、表锁等。加锁确保同一时间只有一个连接可以对数据资源进行处理。不同数据库支持在 SQL 脚本中增加锁,且不同数据库的语法不一样,EFCore 本身没有封装数据库悲观锁的使用,所以只能通过程序员手动编写 SQL 脚本来实现。
以 MySQL 为例,通过 SELECT FOR UPDATE 语法可以在选择一行数据的时候加上行锁,Book 的 Sold 属性表示本书是否已经出售:
using(MyDbContext ctx = new MyDbContext()) using(var ts = ctx.Database.BeginTransaction()) { Console.WriteLine("WAITING " + DateTime.Now); var book = ctx.Books.FromSqlInterpolated<Book> (@$"SELECT * FROM T_Books WHERE Id = 1 FOR UPDATE").First(); if (book.Sold) { Console.WriteLine("!!!"); Console.ReadKey(); return; } Console.WriteLine("SELECTED " + DateTime.Now); Thread.Sleep(10000); book.Sold = true; ctx.SaveChanges(); Console.WriteLine("SAVED " + DateTime.Now); ts.Commit(); Console.ReadKey(); }
由于锁需要使用到事务,所以需要用 DbContext.Database.BeginTransaction 方法创建一个事务,然后在所有操作结束后使用 Commit 方法提交事务,这里注意必须在 SaveChanges 后提交,否则锁是无效的。
加上悲观锁后,每一个程序实例运行到 FromSqlInterpolated 这一行时都要先看看锁是否占用,如果有那么则一直等待,直到锁释放。这样的悲观策略对于并发性能的影响是很大的,如果请求很多,那么就需要一直排队,效率就非常低了,并且还可能有死锁问题。
一个更推荐的策略是乐观锁,乐观锁使用 CAS(Compare And Swap)方法,CAS 即通过并发令牌(Concurrency Token)来判断数据是否已经被修过:当程序查询数据时,会通过记录并发令牌来记录当前数据的状态,在程序写入数据时,会将记录的状态与当前的状态对比,如果相同则说明中途没有其他程序修改过数据,那么写入是安全的,如果不同则说明发生了并发冲突。
从这里可以看出,必须保证并发令牌可以唯一的表示数据状态,在 EFCore 中,使用 PropertyBuilder.IsConcurrencyToken 方法来配置并发令牌:
public void Configure(EntityTypeBuilder<Book> builder) { builder.ToTable("T_Books"); builder.Property(e => e.Title).HasMaxLength(50).IsRequired(); builder.HasOne<Author>(e => e.Author).WithMany(e => e.Books).IsRequired(); builder.Property(e => e.Sold).IsConcurrencyToken(); }
之后就可以正常进行查询了,如果出现并发冲突会抛出DbUpdateConcurrencyException 异常:
using(MyDbContext ctx = new MyDbContext()) { Console.WriteLine("WAITING " + DateTime.Now); var book = ctx.Books.First(); if (book.Sold) { Console.WriteLine("!!!"); Console.ReadKey(); return; } Console.WriteLine("SELECTED " + DateTime.Now); Thread.Sleep(10000); book.Sold = true; ctx.SaveChanges(); Console.WriteLine("SAVED " + DateTime.Now); Console.ReadKey(); }
这里需要注意的是,乐观并发控制在查询时是不等待的,只有在写入时才会检查是否发生并发冲突。这样的好处在于对于并发性能的影响比较小,所以乐观锁一般比悲观锁更好。但如果冲突发生的概率太大的话,乐观锁就会不断地重试,此时处理成本可能会比悲观锁还高。
上面是将一个现有属性设置为并发令牌,这样做可能会有 ABA 问题,或有时你无法给出一个属性可以作为并发令牌的话,就需要引入一个新的属性。
在 EFCore,可以借用属性的 IsRowVersion 配置,如果一个属性被配置为 RowVersion,那么 EFCore 会自动将它设置为并发令牌,并且在对应的数据库分配特殊的时间戳类型给它(在 SQLServer,该类型是 rowversion 或 timestamp),每次插入或更新该行数据时,EFCore 会自动为 RowVersion 列生成一个新的值。
以 SQLServer 为例,在 C# 代码中只需将此属性设置为 byte[] 类型,然后通过 PropertyBuilder.IsRowVersion 方法配置即可:
builder.Property(e => e.Version).IsRowVersion();
不过,取决于数据库,RowVersion 映射的数据类型是不同的。在 SQLServer 中,可以确保 rowversion 或 timestamp 类型的精度足够以应付高并发情况,但在 MySQL 中,可能会出现 timestamp 类型精度不足,导致在高并发下依旧会发生冲突。
本质上 RowVersion 和 ConcurrencyToken 是一样的,都是乐观锁用于判断数据状态的列而已。区别在于 ConcurrencyToken 一般可以使用任何类型的属性,包括现有的属性,它不会自动更新,只能被手动修改,并且可能为空。而 RowVersion 一般只能使用时间戳类型,它会在数据变更的时候自动更新自身,不需要手动修改,并且一般永不为空。
如果 RowVersion 不能满足需求(例如精度不够的情况下),那么也可以使用 Guid 类型的 ConcurrencyToken,只要在每次修改数据的时候手动生成一个新的 Guid 即可。
在某些时候我们可能需要一个 RowVersion,却不想将它作为并发令牌,这个时候就可以这么配置:
builder.Property(e => e.Version).IsRowVersion().IsConcurrencyToken(false);
这样的话,虽然每次修改都会自动更新 RowVersion 的值,但不会在查询的时候进行并发对比,此时的 RowVersion 仅仅是一列会自动更新的普通字段而已。
接下来的问题是,使用乐观锁时,在发生并发冲突后,EFCore 会抛出 DbUpdateConcurrencyException 异常,这个时候我们可能需要进行冲突处理:
using(MyDbContext ctx = new MyDbContext()) { bool SaveFailed; do { SaveFailed = false; Console.WriteLine("WAITING " + DateTime.Now); var book = ctx.Books.First(); if (book.Sold) { Console.WriteLine("!!! " + book.Price); Console.ReadKey(); return; } Console.WriteLine("SELECTED " + DateTime.Now); Thread.Sleep(10000); //便于调试 book.Sold = true; try { ctx.SaveChanges(); } catch (DbUpdateConcurrencyException ex) { Console.WriteLine("ThrowException " + DateTime.Now); SaveFailed = true; ex.Entries.Single().Reload(); } } while (SaveFailed); Console.WriteLine("SAVED " + DateTime.Now); Console.ReadKey(); }
在上面的代码中,使用 try-catch 捕获到异常之后,我们通过 DbUpdateConcurrencyException.Entries 获取发生冲突异常的这些实体对象,然后通过 Reload 方法从数据库重新读取它们的值,这样才能在下次重试中使用新的状态。由于这里发生异常的实体对象只有一个,所以使用了 Entries.Single 方法,如果更新多个实体对象的时候发生了异常冲突,我们可能需要对 Entries 的每一个实体对象都执行 Reload 方法。
注意到上面处理异常的方式是从数据库中读取新的值(Database Wins)。有时候我们可能需要使用用户输入的值来重写(Client Wins),那么应该执行 Entry.OriginalValues.SetValues 方法:
catch (DbUpdateConcurrencyException ex) { Console.WriteLine("ThrowException " + DateTime.Now); SaveFailed = true; foreach(var e in ex.Entries) { e.OriginalValues.SetValues(e.CurrentValues); } }
这个时候,下次重试时,实体对象储存的值就是上一次用户输入的保存失败的值。无论是 Database Wins 还是 Client Wins,最终影响的只是实体对象储存的值,并不会影响数据库的数据,在重试的时候还是得通过 SaveChanges 来更新数据库。