View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements. See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License. You may obtain a copy of the License at
8    *
9    *      http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  package org.apache.logging.log4j.cassandra;
18  
19  import java.io.Serializable;
20  import java.net.InetSocketAddress;
21  import java.util.ArrayList;
22  import java.util.Date;
23  import java.util.List;
24  
25  import com.datastax.driver.core.BatchStatement;
26  import com.datastax.driver.core.BoundStatement;
27  import com.datastax.driver.core.Cluster;
28  import com.datastax.driver.core.PreparedStatement;
29  import com.datastax.driver.core.Session;
30  import org.apache.logging.log4j.core.LogEvent;
31  import org.apache.logging.log4j.core.appender.ManagerFactory;
32  import org.apache.logging.log4j.core.appender.db.AbstractDatabaseManager;
33  import org.apache.logging.log4j.core.appender.db.ColumnMapping;
34  import org.apache.logging.log4j.core.config.plugins.convert.DateTypeConverter;
35  import org.apache.logging.log4j.core.config.plugins.convert.TypeConverters;
36  import org.apache.logging.log4j.core.net.SocketAddress;
37  import org.apache.logging.log4j.spi.ThreadContextMap;
38  import org.apache.logging.log4j.spi.ThreadContextStack;
39  import org.apache.logging.log4j.util.ReadOnlyStringMap;
40  import org.apache.logging.log4j.util.Strings;
41  
42  /**
43   * Manager for a Cassandra appender instance.
44   */
45  public class CassandraManager extends AbstractDatabaseManager {
46  
47      private static final int DEFAULT_PORT = 9042;
48  
49      private final Cluster cluster;
50      private final String keyspace;
51      private final String insertQueryTemplate;
52      private final List<ColumnMapping> columnMappings;
53      private final BatchStatement batchStatement;
54      // re-usable argument binding array
55      private final Object[] values;
56  
57      private Session session;
58      private PreparedStatement preparedStatement;
59  
60      private CassandraManager(final String name, final int bufferSize, final Cluster cluster,
61                               final String keyspace, final String insertQueryTemplate,
62                               final List<ColumnMapping> columnMappings, final BatchStatement batchStatement) {
63          super(name, bufferSize);
64          this.cluster = cluster;
65          this.keyspace = keyspace;
66          this.insertQueryTemplate = insertQueryTemplate;
67          this.columnMappings = columnMappings;
68          this.batchStatement = batchStatement;
69          this.values = new Object[columnMappings.size()];
70      }
71  
72      @Override
73      protected void startupInternal() throws Exception {
74          session = cluster.connect(keyspace);
75          preparedStatement = session.prepare(insertQueryTemplate);
76      }
77  
78      @Override
79      protected boolean shutdownInternal() throws Exception {
80          session.close();
81          cluster.close();
82          return true;
83      }
84  
85      @Override
86      protected void connectAndStart() {
87          // a Session automatically manages connections for us
88      }
89  
90      @Override
91      protected void writeInternal(final LogEvent event, final Serializable serializable) {
92          for (int i = 0; i < columnMappings.size(); i++) {
93              final ColumnMapping columnMapping = columnMappings.get(i);
94              if (ThreadContextMap.class.isAssignableFrom(columnMapping.getType())
95                  || ReadOnlyStringMap.class.isAssignableFrom(columnMapping.getType())) {
96                  values[i] = event.getContextData().toMap();
97              } else if (ThreadContextStack.class.isAssignableFrom(columnMapping.getType())) {
98                  values[i] = event.getContextStack().asList();
99              } else if (Date.class.isAssignableFrom(columnMapping.getType())) {
100                 values[i] = DateTypeConverter.fromMillis(event.getTimeMillis(), columnMapping.getType().asSubclass(Date.class));
101             } else {
102                 values[i] = TypeConverters.convert(columnMapping.getLayout().toSerializable(event),
103                     columnMapping.getType(), null);
104             }
105         }
106         final BoundStatement boundStatement = preparedStatement.bind(values);
107         if (batchStatement == null) {
108             session.execute(boundStatement);
109         } else {
110             batchStatement.add(boundStatement);
111         }
112     }
113 
114     @Override
115     protected boolean commitAndClose() {
116         if (batchStatement != null) {
117             session.execute(batchStatement);
118         }
119         return true;
120     }
121 
122     public static CassandraManager getManager(final String name, final SocketAddress[] contactPoints,
123                                               final ColumnMapping[] columns, final boolean useTls,
124                                               final String clusterName, final String keyspace, final String table,
125                                               final String username, final String password,
126                                               final boolean useClockForTimestampGenerator, final int bufferSize,
127                                               final boolean batched, final BatchStatement.Type batchType) {
128         return getManager(name,
129             new FactoryData(contactPoints, columns, useTls, clusterName, keyspace, table, username, password,
130                 useClockForTimestampGenerator, bufferSize, batched, batchType), CassandraManagerFactory.INSTANCE);
131     }
132 
133     private static class CassandraManagerFactory implements ManagerFactory<CassandraManager, FactoryData> {
134 
135         private static final CassandraManagerFactory INSTANCE = new CassandraManagerFactory();
136 
137         @Override
138         public CassandraManager createManager(final String name, final FactoryData data) {
139             final Cluster.Builder builder = Cluster.builder()
140                 .addContactPointsWithPorts(data.contactPoints)
141                 .withClusterName(data.clusterName);
142             if (data.useTls) {
143                 builder.withSSL();
144             }
145             if (Strings.isNotBlank(data.username)) {
146                 builder.withCredentials(data.username, data.password);
147             }
148             if (data.useClockForTimestampGenerator) {
149                 builder.withTimestampGenerator(new ClockTimestampGenerator());
150             }
151             final Cluster cluster = builder.build();
152 
153             final StringBuilder sb = new StringBuilder("INSERT INTO ").append(data.table).append(" (");
154             for (final ColumnMapping column : data.columns) {
155                 sb.append(column.getName()).append(',');
156             }
157             sb.setCharAt(sb.length() - 1, ')');
158             sb.append(" VALUES (");
159             final List<ColumnMapping> columnMappings = new ArrayList<>(data.columns.length);
160             for (final ColumnMapping column : data.columns) {
161                 if (Strings.isNotEmpty(column.getLiteralValue())) {
162                     sb.append(column.getLiteralValue());
163                 } else {
164                     sb.append('?');
165                     columnMappings.add(column);
166                 }
167                 sb.append(',');
168             }
169             sb.setCharAt(sb.length() - 1, ')');
170             final String insertQueryTemplate = sb.toString();
171             LOGGER.debug("Using CQL for appender {}: {}", name, insertQueryTemplate);
172             return new CassandraManager(name, data.getBufferSize(), cluster, data.keyspace, insertQueryTemplate,
173                 columnMappings, data.batched ? new BatchStatement(data.batchType) : null);
174         }
175     }
176 
177     private static class FactoryData extends AbstractFactoryData {
178         private final InetSocketAddress[] contactPoints;
179         private final ColumnMapping[] columns;
180         private final boolean useTls;
181         private final String clusterName;
182         private final String keyspace;
183         private final String table;
184         private final String username;
185         private final String password;
186         private final boolean useClockForTimestampGenerator;
187         private final boolean batched;
188         private final BatchStatement.Type batchType;
189 
190         private FactoryData(final SocketAddress[] contactPoints, final ColumnMapping[] columns, final boolean useTls,
191                             final String clusterName, final String keyspace, final String table, final String username,
192                             final String password, final boolean useClockForTimestampGenerator, final int bufferSize,
193                             final boolean batched, final BatchStatement.Type batchType) {
194             super(bufferSize, null);
195             this.contactPoints = convertAndAddDefaultPorts(contactPoints);
196             this.columns = columns;
197             this.useTls = useTls;
198             this.clusterName = clusterName;
199             this.keyspace = keyspace;
200             this.table = table;
201             this.username = username;
202             this.password = password;
203             this.useClockForTimestampGenerator = useClockForTimestampGenerator;
204             this.batched = batched;
205             this.batchType = batchType;
206         }
207 
208         private static InetSocketAddress[] convertAndAddDefaultPorts(final SocketAddress... socketAddresses) {
209             final InetSocketAddress[] inetSocketAddresses = new InetSocketAddress[socketAddresses.length];
210             for (int i = 0; i < inetSocketAddresses.length; i++) {
211                 final SocketAddress socketAddress = socketAddresses[i];
212                 inetSocketAddresses[i] = socketAddress.getPort() == 0
213                     ? new InetSocketAddress(socketAddress.getAddress(), DEFAULT_PORT)
214                     : socketAddress.getSocketAddress();
215             }
216             return inetSocketAddresses;
217         }
218     }
219 }