在Java中使用play framewok和akka进行映射诊断上下文记录

前端之家收集整理的这篇文章主要介绍了在Java中使用play framewok和akka进行映射诊断上下文记录前端之家小编觉得挺不错的,现在分享给大家,也给大家做个参考。
我正在尝试mdc日志记录在 java中的所有请求我在这个教程中遵循 Scala并尝试转换为java http://yanns.github.io/blog/2014/05/04/slf4j-mapped-diagnostic-context-mdc-with-play-framework/

但仍然mdc不传播到所有执行上下文.
我使用这个dispathcher作为默认调度程序,但它有很多执行上下文.我需要mdc传播到所有执行上下文

下面是我的java代码

  1. import java.util.Map;
  2.  
  3. import org.slf4j.MDC;
  4.  
  5. import scala.concurrent.ExecutionContext;
  6. import scala.concurrent.duration.Duration;
  7. import scala.concurrent.duration.FiniteDuration;
  8. import akka.dispatch.Dispatcher;
  9. import akka.dispatch.ExecutorServiceFactoryProvider;
  10. import akka.dispatch.MessageDispatcherConfigurator;
  11.  
  12. public class MDCPropagatingDispatcher extends Dispatcher {
  13. public MDCPropagatingDispatcher(
  14. MessageDispatcherConfigurator _configurator,String id,int throughput,Duration throughputDeadlineTime,ExecutorServiceFactoryProvider executorServiceFactoryProvider,FiniteDuration shutdownTimeout) {
  15. super(_configurator,id,throughput,throughputDeadlineTime,executorServiceFactoryProvider,shutdownTimeout);
  16.  
  17. }
  18.  
  19. @Override
  20. public ExecutionContext prepare() {
  21. final Map<String,String> mdcContext = MDC.getCopyOfContextMap();
  22. return new ExecutionContext() {
  23.  
  24. @Override
  25. public void execute(Runnable r) {
  26. Map<String,String> oldMDCContext = MDC.getCopyOfContextMap();
  27. setContextMap(mdcContext);
  28. try {
  29. r.run();
  30. } finally {
  31. setContextMap(oldMDCContext);
  32. }
  33. }
  34.  
  35. @Override
  36. public ExecutionContext prepare() {
  37. return this;
  38. }
  39.  
  40. @Override
  41. public void reportFailure(Throwable t) {
  42. play.Logger.info("error occured in dispatcher");
  43. }
  44.  
  45. };
  46. }
  47.  
  48. private void setContextMap(Map<String,String> context) {
  49. if (context == null) {
  50. MDC.clear();
  51. } else {
  52. play.Logger.info("set context "+ context.toString());
  53. MDC.setContextMap(context);
  54. }
  55. }
  56. }
  57.  
  58.  
  59.  
  60. import java.util.concurrent.TimeUnit;
  61.  
  62. import scala.concurrent.duration.Duration;
  63. import scala.concurrent.duration.FiniteDuration;
  64.  
  65. import com.typesafe.config.Config;
  66.  
  67. import akka.dispatch.DispatcherPrerequisites;
  68. import akka.dispatch.MessageDispatcher;
  69. import akka.dispatch.MessageDispatcherConfigurator;
  70.  
  71. public class MDCPropagatingDispatcherConfigurator extends
  72. MessageDispatcherConfigurator {
  73. private MessageDispatcher instance;
  74.  
  75. public MDCPropagatingDispatcherConfigurator(Config config,DispatcherPrerequisites prerequisites) {
  76. super(config,prerequisites);
  77. Duration throughputDeadlineTime = new FiniteDuration(-1,TimeUnit.MILLISECONDS);
  78. FiniteDuration shutDownDuration = new FiniteDuration(1,TimeUnit.MILLISECONDS);
  79. instance = new MDCPropagatingDispatcher(this,"play.akka.actor.contexts.play-filter-context",100,configureExecutor(),shutDownDuration);
  80. }
  81.  
  82. public MessageDispatcher dispatcher() {
  83. return instance;
  84. }
  85.  
  86. }

过滤拦截

  1. public class MdcLogFilter implements EssentialFilter {
  2. @Override
  3. public EssentialAction apply(final EssentialAction next) {
  4. return new MdcLogAction() {
  5. @Override
  6. public Iteratee<byte[],SimpleResult> apply(
  7. final RequestHeader requestHeader) {
  8. final String uuid = Utils.generateRandomUUID();
  9. MDC.put("uuid",uuid);
  10. play.Logger.info("request started"+uuid);
  11. final ExecutionContext playFilterContext = Akka.system()
  12. .dispatchers()
  13. .lookup("play.akka.actor.contexts.play-custom-filter-context");
  14. return next.apply(requestHeader).map(
  15. new AbstractFunction1<SimpleResult,SimpleResult>() {
  16. @Override
  17. public SimpleResult apply(SimpleResult simpleResult) {
  18. play.Logger.info("request ended"+uuid);
  19. MDC.remove("uuid");
  20. return simpleResult;
  21. }
  22. },playFilterContext);
  23.  
  24. }
  25.  
  26. @Override
  27. public EssentialAction apply() {
  28. return next.apply();
  29. }
  30. };
  31. }

}

解决方法

以下是我的解决方案,在现实生活中证明.它在Scala,而不是Play,而是Scalatra,但其基本概念是一样的.希望您能够找出如何将其移植到Java.
  1. import org.slf4j.MDC
  2. import java.util.{Map => JMap}
  3. import scala.concurrent.{ExecutionContextExecutor,ExecutionContext}
  4.  
  5. object MDCHttpExecutionContext {
  6.  
  7. def fromExecutionContextWithCurrentMDC(delegate: ExecutionContext): ExecutionContextExecutor =
  8. new MDCHttpExecutionContext(MDC.getCopyOfContextMap(),delegate)
  9. }
  10.  
  11. class MDCHttpExecutionContext(mdcContext: JMap[String,String],delegate: ExecutionContext)
  12. extends ExecutionContextExecutor {
  13.  
  14. def execute(runnable: Runnable): Unit = {
  15. val callingThreadMDC = MDC.getCopyOfContextMap()
  16. delegate.execute(new Runnable {
  17. def run() {
  18. val currentThreadMDC = MDC.getCopyOfContextMap()
  19. setContextMap(callingThreadMDC)
  20. try {
  21. runnable.run()
  22. } finally {
  23. setContextMap(currentThreadMDC)
  24. }
  25. }
  26. })
  27. }
  28.  
  29. private[this] def setContextMap(context: JMap[String,String]): Unit = {
  30. Option(context) match {
  31. case Some(ctx) => {
  32. MDC.setContextMap(context)
  33. }
  34. case None => {
  35. MDC.clear()
  36. }
  37. }
  38. }
  39.  
  40. def reportFailure(t: Throwable): Unit = delegate.reportFailure(t)
  41. }

您必须确保在所有异步调用中使用此ExecutionContext.我通过依赖注入来实现这一点,但有不同的方法.这就是我用subcut做的:

  1. bind[ExecutionContext] idBy BindingIds.GlobalExecutionContext toSingle {
  2. MDCHttpExecutionContext.fromExecutionContextWithCurrentMDC(
  3. ExecutionContext.fromExecutorService(
  4. Executors.newFixedThreadPool(globalThreadPoolSize)
  5. )
  6. )
  7. }

这种做法背后的想法如下. MDC使用线程本地存储来获取属性及其值.如果您的单个请求可以在多个线程上运行,那么您需要确保您启动的新线程使用正确的MDC.为此,您创建一个自定义执行程序,确保在开始执行您分配给它的任务之前将MDC值正确复制到新线程中.您还必须确保当线程完成任务并继续执行其他操作时,将旧值放入其MDC,因为来自池的线程可以在不同请求之间切换.

猜你在找的Java相关文章