1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.apache.logging.log4j.cassandra;
18
19 import java.io.Serializable;
20 import java.net.InetSocketAddress;
21 import java.util.ArrayList;
22 import java.util.Date;
23 import java.util.List;
24
25 import com.datastax.driver.core.BatchStatement;
26 import com.datastax.driver.core.BoundStatement;
27 import com.datastax.driver.core.Cluster;
28 import com.datastax.driver.core.PreparedStatement;
29 import com.datastax.driver.core.Session;
30 import org.apache.logging.log4j.core.LogEvent;
31 import org.apache.logging.log4j.core.appender.ManagerFactory;
32 import org.apache.logging.log4j.core.appender.db.AbstractDatabaseManager;
33 import org.apache.logging.log4j.core.appender.db.ColumnMapping;
34 import org.apache.logging.log4j.core.config.plugins.convert.DateTypeConverter;
35 import org.apache.logging.log4j.core.config.plugins.convert.TypeConverters;
36 import org.apache.logging.log4j.core.net.SocketAddress;
37 import org.apache.logging.log4j.spi.ThreadContextMap;
38 import org.apache.logging.log4j.spi.ThreadContextStack;
39 import org.apache.logging.log4j.util.ReadOnlyStringMap;
40 import org.apache.logging.log4j.util.Strings;
41
42
43
44
45 public class CassandraManager extends AbstractDatabaseManager {
46
47 private static final int DEFAULT_PORT = 9042;
48
49 private final Cluster cluster;
50 private final String keyspace;
51 private final String insertQueryTemplate;
52 private final List<ColumnMapping> columnMappings;
53 private final BatchStatement batchStatement;
54
55 private final Object[] values;
56
57 private Session session;
58 private PreparedStatement preparedStatement;
59
60 private CassandraManager(final String name, final int bufferSize, final Cluster cluster,
61 final String keyspace, final String insertQueryTemplate,
62 final List<ColumnMapping> columnMappings, final BatchStatement batchStatement) {
63 super(name, bufferSize);
64 this.cluster = cluster;
65 this.keyspace = keyspace;
66 this.insertQueryTemplate = insertQueryTemplate;
67 this.columnMappings = columnMappings;
68 this.batchStatement = batchStatement;
69 this.values = new Object[columnMappings.size()];
70 }
71
72 @Override
73 protected void startupInternal() throws Exception {
74 session = cluster.connect(keyspace);
75 preparedStatement = session.prepare(insertQueryTemplate);
76 }
77
78 @Override
79 protected boolean shutdownInternal() throws Exception {
80 session.close();
81 cluster.close();
82 return true;
83 }
84
85 @Override
86 protected void connectAndStart() {
87
88 }
89
90 @Override
91 protected void writeInternal(final LogEvent event, final Serializable serializable) {
92 for (int i = 0; i < columnMappings.size(); i++) {
93 final ColumnMapping columnMapping = columnMappings.get(i);
94 if (ThreadContextMap.class.isAssignableFrom(columnMapping.getType())
95 || ReadOnlyStringMap.class.isAssignableFrom(columnMapping.getType())) {
96 values[i] = event.getContextData().toMap();
97 } else if (ThreadContextStack.class.isAssignableFrom(columnMapping.getType())) {
98 values[i] = event.getContextStack().asList();
99 } else if (Date.class.isAssignableFrom(columnMapping.getType())) {
100 values[i] = DateTypeConverter.fromMillis(event.getTimeMillis(), columnMapping.getType().asSubclass(Date.class));
101 } else {
102 values[i] = TypeConverters.convert(columnMapping.getLayout().toSerializable(event),
103 columnMapping.getType(), null);
104 }
105 }
106 final BoundStatement boundStatement = preparedStatement.bind(values);
107 if (batchStatement == null) {
108 session.execute(boundStatement);
109 } else {
110 batchStatement.add(boundStatement);
111 }
112 }
113
114 @Override
115 protected boolean commitAndClose() {
116 if (batchStatement != null) {
117 session.execute(batchStatement);
118 }
119 return true;
120 }
121
122 public static CassandraManager getManager(final String name, final SocketAddress[] contactPoints,
123 final ColumnMapping[] columns, final boolean useTls,
124 final String clusterName, final String keyspace, final String table,
125 final String username, final String password,
126 final boolean useClockForTimestampGenerator, final int bufferSize,
127 final boolean batched, final BatchStatement.Type batchType) {
128 return getManager(name,
129 new FactoryData(contactPoints, columns, useTls, clusterName, keyspace, table, username, password,
130 useClockForTimestampGenerator, bufferSize, batched, batchType), CassandraManagerFactory.INSTANCE);
131 }
132
133 private static class CassandraManagerFactory implements ManagerFactory<CassandraManager, FactoryData> {
134
135 private static final CassandraManagerFactory INSTANCE = new CassandraManagerFactory();
136
137 @Override
138 public CassandraManager createManager(final String name, final FactoryData data) {
139 final Cluster.Builder builder = Cluster.builder()
140 .addContactPointsWithPorts(data.contactPoints)
141 .withClusterName(data.clusterName);
142 if (data.useTls) {
143 builder.withSSL();
144 }
145 if (Strings.isNotBlank(data.username)) {
146 builder.withCredentials(data.username, data.password);
147 }
148 if (data.useClockForTimestampGenerator) {
149 builder.withTimestampGenerator(new ClockTimestampGenerator());
150 }
151 final Cluster cluster = builder.build();
152
153 final StringBuilder sb = new StringBuilder("INSERT INTO ").append(data.table).append(" (");
154 for (final ColumnMapping column : data.columns) {
155 sb.append(column.getName()).append(',');
156 }
157 sb.setCharAt(sb.length() - 1, ')');
158 sb.append(" VALUES (");
159 final List<ColumnMapping> columnMappings = new ArrayList<>(data.columns.length);
160 for (final ColumnMapping column : data.columns) {
161 if (Strings.isNotEmpty(column.getLiteralValue())) {
162 sb.append(column.getLiteralValue());
163 } else {
164 sb.append('?');
165 columnMappings.add(column);
166 }
167 sb.append(',');
168 }
169 sb.setCharAt(sb.length() - 1, ')');
170 final String insertQueryTemplate = sb.toString();
171 LOGGER.debug("Using CQL for appender {}: {}", name, insertQueryTemplate);
172 return new CassandraManager(name, data.getBufferSize(), cluster, data.keyspace, insertQueryTemplate,
173 columnMappings, data.batched ? new BatchStatement(data.batchType) : null);
174 }
175 }
176
177 private static class FactoryData extends AbstractFactoryData {
178 private final InetSocketAddress[] contactPoints;
179 private final ColumnMapping[] columns;
180 private final boolean useTls;
181 private final String clusterName;
182 private final String keyspace;
183 private final String table;
184 private final String username;
185 private final String password;
186 private final boolean useClockForTimestampGenerator;
187 private final boolean batched;
188 private final BatchStatement.Type batchType;
189
190 private FactoryData(final SocketAddress[] contactPoints, final ColumnMapping[] columns, final boolean useTls,
191 final String clusterName, final String keyspace, final String table, final String username,
192 final String password, final boolean useClockForTimestampGenerator, final int bufferSize,
193 final boolean batched, final BatchStatement.Type batchType) {
194 super(bufferSize, null);
195 this.contactPoints = convertAndAddDefaultPorts(contactPoints);
196 this.columns = columns;
197 this.useTls = useTls;
198 this.clusterName = clusterName;
199 this.keyspace = keyspace;
200 this.table = table;
201 this.username = username;
202 this.password = password;
203 this.useClockForTimestampGenerator = useClockForTimestampGenerator;
204 this.batched = batched;
205 this.batchType = batchType;
206 }
207
208 private static InetSocketAddress[] convertAndAddDefaultPorts(final SocketAddress... socketAddresses) {
209 final InetSocketAddress[] inetSocketAddresses = new InetSocketAddress[socketAddresses.length];
210 for (int i = 0; i < inetSocketAddresses.length; i++) {
211 final SocketAddress socketAddress = socketAddresses[i];
212 inetSocketAddresses[i] = socketAddress.getPort() == 0
213 ? new InetSocketAddress(socketAddress.getAddress(), DEFAULT_PORT)
214 : socketAddress.getSocketAddress();
215 }
216 return inetSocketAddresses;
217 }
218 }
219 }