SseAlchemyHelper.cs 2.3 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768
  1. using Microsoft.AspNetCore.Http.Features;
  2. using Newtonsoft.Json;
  3. using Newtonsoft.Json.Serialization;
  4. /// <summary>
  5. /// SSE 流式数据助手类,提供初始化 SSE、发送数据包和结束流的方法
  6. /// </summary>
  7. public static class SseAlchemyHelper
  8. {
  9. private static readonly JsonSerializerSettings _jsonSettings = new()
  10. {
  11. ContractResolver = new CamelCasePropertyNamesContractResolver(),
  12. DateTimeZoneHandling = DateTimeZoneHandling.Local,
  13. NullValueHandling = NullValueHandling.Ignore
  14. };
  15. /// <summary>
  16. /// 初始化 SSE
  17. /// </summary>
  18. public static void InitializeSse(this HttpContext context)
  19. {
  20. var syncIOFeature = context.Features.Get<IHttpBodyControlFeature>();
  21. if (syncIOFeature != null) syncIOFeature.AllowSynchronousIO = true;
  22. var response = context.Response;
  23. response.Headers.Append("Content-Type", "text/event-stream");
  24. response.Headers.Append("Cache-Control", "no-cache");
  25. response.Headers.Append("Connection", "keep-alive");
  26. response.Headers.Append("X-Accel-Buffering", "no");
  27. }
  28. /// <summary>
  29. /// 发送流数据包
  30. /// </summary>
  31. public static async Task SendSseStepAsync(
  32. this HttpContext context,
  33. int progress,
  34. string message,
  35. object? data = null)
  36. {
  37. // 检查客户端是否已断开连接
  38. if (context.RequestAborted.IsCancellationRequested) return;
  39. var payload = JsonConvert.SerializeObject(new { progress, message, data }, _jsonSettings);
  40. string sseFormattedData = $"data: {payload}\n\n";
  41. byte[] bytes = Encoding.UTF8.GetBytes(sseFormattedData);
  42. try
  43. {
  44. // 写入并立即冲刷到客户端
  45. await context.Response.Body.WriteAsync(bytes, 0, bytes.Length, context.RequestAborted);
  46. await context.Response.Body.FlushAsync(context.RequestAborted);
  47. }
  48. catch (OperationCanceledException) { /* 客户端取消,优雅退出 */ }
  49. catch (Exception) { /* 忽略其他写入异常 */ }
  50. }
  51. /// <summary>
  52. /// 结束 SSE 流
  53. /// </summary>
  54. public static async Task FinalizeSseAsync(this HttpContext context)
  55. {
  56. if (!context.RequestAborted.IsCancellationRequested)
  57. {
  58. await context.Response.Body.FlushAsync();
  59. }
  60. }
  61. }