View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements. See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache license, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License. You may obtain a copy of the License at
8    *
9    *      http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the license for the specific language governing permissions and
15   * limitations under the license.
16   */
17  package org.apache.logging.log4j.core.async;
18  
19  import java.util.Collection;
20  import java.util.concurrent.BlockingQueue;
21  import java.util.concurrent.TimeUnit;
22  import java.util.concurrent.locks.LockSupport;
23  
24  import org.apache.logging.log4j.core.config.Node;
25  import org.apache.logging.log4j.core.config.plugins.Plugin;
26  import org.apache.logging.log4j.core.config.plugins.PluginAttribute;
27  import org.apache.logging.log4j.core.config.plugins.PluginFactory;
28  import org.jctools.queues.MpscArrayQueue;
29  
30  /**
31   * Factory for creating instances of BlockingQueues backed by JCTools {@link MpscArrayQueue}.
32   *
33   * @since 2.7
34   */
35  @Plugin(name = "JCToolsBlockingQueue", category = Node.CATEGORY, elementType = BlockingQueueFactory.ELEMENT_TYPE)
36  public class JCToolsBlockingQueueFactory<E> implements BlockingQueueFactory<E> {
37  
38      private final WaitStrategy waitStrategy;
39  
40      private JCToolsBlockingQueueFactory(final WaitStrategy waitStrategy) {
41          this.waitStrategy = waitStrategy;
42      }
43  
44      @Override
45      public BlockingQueue<E> create(final int capacity) {
46          return new MpscBlockingQueue<>(capacity, waitStrategy);
47      }
48  
49      @PluginFactory
50      public static <E> JCToolsBlockingQueueFactory<E> createFactory(
51          @PluginAttribute(value = "WaitStrategy", defaultString = "PARK") final WaitStrategy waitStrategy) {
52          return new JCToolsBlockingQueueFactory<>(waitStrategy);
53      }
54  
55      /**
56       * BlockingQueue wrapper for JCTools multiple producer single consumer array queue.
57       */
58      private static final class MpscBlockingQueue<E> extends MpscArrayQueue<E> implements BlockingQueue<E> {
59  
60          private final JCToolsBlockingQueueFactory.WaitStrategy waitStrategy;
61  
62          MpscBlockingQueue(final int capacity, final JCToolsBlockingQueueFactory.WaitStrategy waitStrategy) {
63              super(capacity);
64              this.waitStrategy = waitStrategy;
65          }
66  
67          @Override
68          public int drainTo(final Collection<? super E> c) {
69              return drainTo(c, capacity());
70          }
71  
72          @Override
73          public int drainTo(final Collection<? super E> c, final int maxElements) {
74              return drain(new Consumer<E>() {
75                  @Override
76                  public void accept(final E e) {
77                      c.add(e);
78                  }
79              }, maxElements);
80          }
81  
82          @Override
83          public boolean offer(final E e, final long timeout, final TimeUnit unit) throws InterruptedException {
84              int idleCounter = 0;
85              final long timeoutNanos = System.nanoTime() + unit.toNanos(timeout);
86              do {
87                  if (offer(e)) {
88                      return true;
89                  } else if (System.nanoTime() - timeoutNanos > 0) {
90                      return false;
91                  }
92                  idleCounter = waitStrategy.idle(idleCounter);
93              } while (!Thread.interrupted()); //clear interrupted flag
94              throw new InterruptedException();
95          }
96  
97          @Override
98          public E poll(final long timeout, final TimeUnit unit) throws InterruptedException {
99              int idleCounter = 0;
100             final long timeoutNanos = System.nanoTime() + unit.toNanos(timeout);
101             do {
102                 final E result = poll();
103                 if (result != null) {
104                     return result;
105                 } else if (System.nanoTime() - timeoutNanos > 0) {
106                     return null;
107                 }
108                 idleCounter = waitStrategy.idle(idleCounter);
109             } while (!Thread.interrupted()); //clear interrupted flag
110             throw new InterruptedException();
111         }
112 
113         @Override
114         public void put(final E e) throws InterruptedException {
115             int idleCounter = 0;
116             do {
117                 if (offer(e)) {
118                     return;
119                 }
120                 idleCounter = waitStrategy.idle(idleCounter);
121             } while (!Thread.interrupted()); //clear interrupted flag
122             throw new InterruptedException();
123         }
124 
125         @Override
126         public boolean offer(final E e) {
127             //keep 2 cache lines empty to avoid false sharing that will slow the consumer thread when queue is full.
128             return offerIfBelowThreshold(e, capacity() - 32);
129         }
130 
131         @Override
132         public int remainingCapacity() {
133             return capacity() - size();
134         }
135 
136         @Override
137         public E take() throws InterruptedException {
138             int idleCounter = 100;
139             do {
140                 final E result = relaxedPoll();
141                 if (result != null) {
142                     return result;
143                 }
144                 idleCounter = waitStrategy.idle(idleCounter);
145             } while (!Thread.interrupted()); //clear interrupted flag
146             throw new InterruptedException();
147         }
148     }
149 
150     public enum WaitStrategy {
151         SPIN(new Idle() {
152             @Override
153             public int idle(final int idleCounter) {
154                 return idleCounter + 1;
155             }
156         }),
157         YIELD(new Idle() {
158             @Override
159             public int idle(final int idleCounter) {
160                 Thread.yield();
161                 return idleCounter + 1;
162             }
163         }),
164         PARK(new Idle() {
165             @Override
166             public int idle(final int idleCounter) {
167                 LockSupport.parkNanos(1L);
168                 return idleCounter + 1;
169             }
170         }),
171         PROGRESSIVE(new Idle() {
172             @Override
173             public int idle(final int idleCounter) {
174                 if (idleCounter > 200) {
175                     LockSupport.parkNanos(1L);
176                 } else if (idleCounter > 100) {
177                     Thread.yield();
178                 }
179                 return idleCounter + 1;
180             }
181         });
182 
183         private final Idle idle;
184 
185         private int idle(final int idleCounter) {
186             return idle.idle(idleCounter);
187         }
188 
189         WaitStrategy(final Idle idle) {
190             this.idle = idle;
191         }
192     }
193 
194     private interface Idle {
195         int idle(int idleCounter);
196     }
197 
198 }