001/*
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache license, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the license for the specific language governing permissions and
015 * limitations under the license.
016 */
017package org.apache.logging.log4j.core.async;
018
019import java.util.Collection;
020import java.util.concurrent.BlockingQueue;
021import java.util.concurrent.TimeUnit;
022import java.util.concurrent.locks.LockSupport;
023
024import org.apache.logging.log4j.core.config.Node;
025import org.apache.logging.log4j.core.config.plugins.Plugin;
026import org.apache.logging.log4j.core.config.plugins.PluginAttribute;
027import org.apache.logging.log4j.core.config.plugins.PluginFactory;
028import org.jctools.queues.MpscArrayQueue;
029
030/**
031 * Factory for creating instances of BlockingQueues backed by JCTools {@link MpscArrayQueue}.
032 *
033 * @since 2.7
034 */
035@Plugin(name = "JCToolsBlockingQueue", category = Node.CATEGORY, elementType = BlockingQueueFactory.ELEMENT_TYPE)
036public class JCToolsBlockingQueueFactory<E> implements BlockingQueueFactory<E> {
037
038    private final WaitStrategy waitStrategy;
039
040    private JCToolsBlockingQueueFactory(final WaitStrategy waitStrategy) {
041        this.waitStrategy = waitStrategy;
042    }
043
044    @Override
045    public BlockingQueue<E> create(final int capacity) {
046        return new MpscBlockingQueue<>(capacity, waitStrategy);
047    }
048
049    @PluginFactory
050    public static <E> JCToolsBlockingQueueFactory<E> createFactory(
051        @PluginAttribute(value = "WaitStrategy", defaultString = "PARK") final WaitStrategy waitStrategy) {
052        return new JCToolsBlockingQueueFactory<>(waitStrategy);
053    }
054
055    /**
056     * BlockingQueue wrapper for JCTools multiple producer single consumer array queue.
057     */
058    private static final class MpscBlockingQueue<E> extends MpscArrayQueue<E> implements BlockingQueue<E> {
059
060        private final JCToolsBlockingQueueFactory.WaitStrategy waitStrategy;
061
062        MpscBlockingQueue(final int capacity, final JCToolsBlockingQueueFactory.WaitStrategy waitStrategy) {
063            super(capacity);
064            this.waitStrategy = waitStrategy;
065        }
066
067        @Override
068        public int drainTo(final Collection<? super E> c) {
069            return drainTo(c, capacity());
070        }
071
072        @Override
073        public int drainTo(final Collection<? super E> c, final int maxElements) {
074            return drain(new Consumer<E>() {
075                @Override
076                public void accept(final E e) {
077                    c.add(e);
078                }
079            }, maxElements);
080        }
081
082        @Override
083        public boolean offer(final E e, final long timeout, final TimeUnit unit) throws InterruptedException {
084            int idleCounter = 0;
085            final long timeoutNanos = System.nanoTime() + unit.toNanos(timeout);
086            do {
087                if (offer(e)) {
088                    return true;
089                } else if (System.nanoTime() - timeoutNanos > 0) {
090                    return false;
091                }
092                idleCounter = waitStrategy.idle(idleCounter);
093            } while (!Thread.interrupted()); //clear interrupted flag
094            throw new InterruptedException();
095        }
096
097        @Override
098        public E poll(final long timeout, final TimeUnit unit) throws InterruptedException {
099            int idleCounter = 0;
100            final long timeoutNanos = System.nanoTime() + unit.toNanos(timeout);
101            do {
102                final E result = poll();
103                if (result != null) {
104                    return result;
105                } else if (System.nanoTime() - timeoutNanos > 0) {
106                    return null;
107                }
108                idleCounter = waitStrategy.idle(idleCounter);
109            } while (!Thread.interrupted()); //clear interrupted flag
110            throw new InterruptedException();
111        }
112
113        @Override
114        public void put(final E e) throws InterruptedException {
115            int idleCounter = 0;
116            do {
117                if (offer(e)) {
118                    return;
119                }
120                idleCounter = waitStrategy.idle(idleCounter);
121            } while (!Thread.interrupted()); //clear interrupted flag
122            throw new InterruptedException();
123        }
124
125        @Override
126        public boolean offer(final E e) {
127            //keep 2 cache lines empty to avoid false sharing that will slow the consumer thread when queue is full.
128            return offerIfBelowThreshold(e, capacity() - 32);
129        }
130
131        @Override
132        public int remainingCapacity() {
133            return capacity() - size();
134        }
135
136        @Override
137        public E take() throws InterruptedException {
138            int idleCounter = 100;
139            do {
140                final E result = relaxedPoll();
141                if (result != null) {
142                    return result;
143                }
144                idleCounter = waitStrategy.idle(idleCounter);
145            } while (!Thread.interrupted()); //clear interrupted flag
146            throw new InterruptedException();
147        }
148    }
149
150    public enum WaitStrategy {
151        SPIN(new Idle() {
152            @Override
153            public int idle(final int idleCounter) {
154                return idleCounter + 1;
155            }
156        }),
157        YIELD(new Idle() {
158            @Override
159            public int idle(final int idleCounter) {
160                Thread.yield();
161                return idleCounter + 1;
162            }
163        }),
164        PARK(new Idle() {
165            @Override
166            public int idle(final int idleCounter) {
167                LockSupport.parkNanos(1L);
168                return idleCounter + 1;
169            }
170        }),
171        PROGRESSIVE(new Idle() {
172            @Override
173            public int idle(final int idleCounter) {
174                if (idleCounter > 200) {
175                    LockSupport.parkNanos(1L);
176                } else if (idleCounter > 100) {
177                    Thread.yield();
178                }
179                return idleCounter + 1;
180            }
181        });
182
183        private final Idle idle;
184
185        private int idle(final int idleCounter) {
186            return idle.idle(idleCounter);
187        }
188
189        WaitStrategy(final Idle idle) {
190            this.idle = idle;
191        }
192    }
193
194    private interface Idle {
195        int idle(int idleCounter);
196    }
197
198}