001/* 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache license, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the license for the specific language governing permissions and 015 * limitations under the license. 016 */ 017package org.apache.logging.log4j.core.async; 018 019import java.util.Collection; 020import java.util.concurrent.BlockingQueue; 021import java.util.concurrent.TimeUnit; 022import java.util.concurrent.locks.LockSupport; 023 024import org.apache.logging.log4j.core.config.Node; 025import org.apache.logging.log4j.core.config.plugins.Plugin; 026import org.apache.logging.log4j.core.config.plugins.PluginAttribute; 027import org.apache.logging.log4j.core.config.plugins.PluginFactory; 028import org.jctools.queues.MpscArrayQueue; 029 030/** 031 * Factory for creating instances of BlockingQueues backed by JCTools {@link MpscArrayQueue}. 032 * 033 * @since 2.7 034 */ 035@Plugin(name = "JCToolsBlockingQueue", category = Node.CATEGORY, elementType = BlockingQueueFactory.ELEMENT_TYPE) 036public class JCToolsBlockingQueueFactory<E> implements BlockingQueueFactory<E> { 037 038 private final WaitStrategy waitStrategy; 039 040 private JCToolsBlockingQueueFactory(final WaitStrategy waitStrategy) { 041 this.waitStrategy = waitStrategy; 042 } 043 044 @Override 045 public BlockingQueue<E> create(final int capacity) { 046 return new MpscBlockingQueue<>(capacity, waitStrategy); 047 } 048 049 @PluginFactory 050 public static <E> JCToolsBlockingQueueFactory<E> createFactory( 051 @PluginAttribute(value = "WaitStrategy", defaultString = "PARK") final WaitStrategy waitStrategy) { 052 return new JCToolsBlockingQueueFactory<>(waitStrategy); 053 } 054 055 /** 056 * BlockingQueue wrapper for JCTools multiple producer single consumer array queue. 057 */ 058 private static final class MpscBlockingQueue<E> extends MpscArrayQueue<E> implements BlockingQueue<E> { 059 060 private final JCToolsBlockingQueueFactory.WaitStrategy waitStrategy; 061 062 MpscBlockingQueue(final int capacity, final JCToolsBlockingQueueFactory.WaitStrategy waitStrategy) { 063 super(capacity); 064 this.waitStrategy = waitStrategy; 065 } 066 067 @Override 068 public int drainTo(final Collection<? super E> c) { 069 return drainTo(c, capacity()); 070 } 071 072 @Override 073 public int drainTo(final Collection<? super E> c, final int maxElements) { 074 return drain(new Consumer<E>() { 075 @Override 076 public void accept(final E e) { 077 c.add(e); 078 } 079 }, maxElements); 080 } 081 082 @Override 083 public boolean offer(final E e, final long timeout, final TimeUnit unit) throws InterruptedException { 084 int idleCounter = 0; 085 final long timeoutNanos = System.nanoTime() + unit.toNanos(timeout); 086 do { 087 if (offer(e)) { 088 return true; 089 } else if (System.nanoTime() - timeoutNanos > 0) { 090 return false; 091 } 092 idleCounter = waitStrategy.idle(idleCounter); 093 } while (!Thread.interrupted()); //clear interrupted flag 094 throw new InterruptedException(); 095 } 096 097 @Override 098 public E poll(final long timeout, final TimeUnit unit) throws InterruptedException { 099 int idleCounter = 0; 100 final long timeoutNanos = System.nanoTime() + unit.toNanos(timeout); 101 do { 102 final E result = poll(); 103 if (result != null) { 104 return result; 105 } else if (System.nanoTime() - timeoutNanos > 0) { 106 return null; 107 } 108 idleCounter = waitStrategy.idle(idleCounter); 109 } while (!Thread.interrupted()); //clear interrupted flag 110 throw new InterruptedException(); 111 } 112 113 @Override 114 public void put(final E e) throws InterruptedException { 115 int idleCounter = 0; 116 do { 117 if (offer(e)) { 118 return; 119 } 120 idleCounter = waitStrategy.idle(idleCounter); 121 } while (!Thread.interrupted()); //clear interrupted flag 122 throw new InterruptedException(); 123 } 124 125 @Override 126 public boolean offer(final E e) { 127 //keep 2 cache lines empty to avoid false sharing that will slow the consumer thread when queue is full. 128 return offerIfBelowThreshold(e, capacity() - 32); 129 } 130 131 @Override 132 public int remainingCapacity() { 133 return capacity() - size(); 134 } 135 136 @Override 137 public E take() throws InterruptedException { 138 int idleCounter = 100; 139 do { 140 final E result = relaxedPoll(); 141 if (result != null) { 142 return result; 143 } 144 idleCounter = waitStrategy.idle(idleCounter); 145 } while (!Thread.interrupted()); //clear interrupted flag 146 throw new InterruptedException(); 147 } 148 } 149 150 public enum WaitStrategy { 151 SPIN(new Idle() { 152 @Override 153 public int idle(final int idleCounter) { 154 return idleCounter + 1; 155 } 156 }), 157 YIELD(new Idle() { 158 @Override 159 public int idle(final int idleCounter) { 160 Thread.yield(); 161 return idleCounter + 1; 162 } 163 }), 164 PARK(new Idle() { 165 @Override 166 public int idle(final int idleCounter) { 167 LockSupport.parkNanos(1L); 168 return idleCounter + 1; 169 } 170 }), 171 PROGRESSIVE(new Idle() { 172 @Override 173 public int idle(final int idleCounter) { 174 if (idleCounter > 200) { 175 LockSupport.parkNanos(1L); 176 } else if (idleCounter > 100) { 177 Thread.yield(); 178 } 179 return idleCounter + 1; 180 } 181 }); 182 183 private final Idle idle; 184 185 private int idle(final int idleCounter) { 186 return idle.idle(idleCounter); 187 } 188 189 WaitStrategy(final Idle idle) { 190 this.idle = idle; 191 } 192 } 193 194 private interface Idle { 195 int idle(int idleCounter); 196 } 197 198}