Skip to content

Commit

Permalink
fix input diagnosis failure metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
AntonEbel committed Feb 6, 2025
1 parent 8cd2250 commit caaf214
Show file tree
Hide file tree
Showing 12 changed files with 206 additions and 57 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,13 @@
import com.codahale.metrics.Gauge;
import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import org.graylog2.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;

import jakarta.inject.Inject;
import jakarta.inject.Singleton;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,14 @@
package org.graylog.failure;

import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import org.apache.commons.lang3.StringUtils;
import org.graylog2.indexer.messages.Indexable;
import org.graylog2.indexer.messages.IndexingError;
import org.graylog2.inputs.diagnosis.InputDiagnosisMetrics;
import org.graylog2.plugin.Message;
import org.graylog2.plugin.Tools;
import org.graylog2.shared.bindings.providers.ObjectMapperProvider;
Expand Down Expand Up @@ -53,19 +53,19 @@ public class FailureSubmissionService {
// Collaborators handed in through the @Inject constructor.
private final FailureSubmissionQueue failureSubmissionQueue;
private final FailureHandlingConfiguration failureHandlingConfiguration;

private final MetricRegistry metricRegistry;
// Receives the per-input failure counts via incCount(metricName).
private final InputDiagnosisMetrics inputDiagnosisMetrics;
// Obtained once from the ObjectMapperProvider in the constructor.
private final ObjectMapper objectMapper;
// Throwaway meter handed to Indexable#toElasticSearchObject, which requires a Meter argument;
// nothing in the visible code reads it back.
private final Meter dummyMeter = new Meter();

/**
 * Creates the service from its injected collaborators.
 *
 * @param failureSubmissionQueue       queue kept for use by this service
 * @param failureHandlingConfiguration failure handling configuration kept for use by this service
 * @param metricRegistry               metric registry kept for use by this service
 * @param inputDiagnosisMetrics        sink for per-input failure counts
 * @param objectMapperProvider         provider queried once for the {@link ObjectMapper} to keep
 */
@Inject
public FailureSubmissionService(
        final FailureSubmissionQueue failureSubmissionQueue,
        final FailureHandlingConfiguration failureHandlingConfiguration,
        final MetricRegistry metricRegistry,
        final InputDiagnosisMetrics inputDiagnosisMetrics,
        final ObjectMapperProvider objectMapperProvider) {
    // Failure handling collaborators.
    this.failureHandlingConfiguration = failureHandlingConfiguration;
    this.failureSubmissionQueue = failureSubmissionQueue;

    // Metrics collaborators.
    this.inputDiagnosisMetrics = inputDiagnosisMetrics;
    this.metricRegistry = metricRegistry;

    // Resolve the mapper once so later calls do not go through the provider again.
    this.objectMapper = objectMapperProvider.get();
}

Expand Down Expand Up @@ -203,17 +203,15 @@ private IndexingFailure fromIndexingError(IndexingError indexingError) {
/**
 * Counts one processing failure for the input that produced the given message.
 *
 * <p>The input id is read from the message's {@code FIELD_GL2_SOURCE_INPUT} field; when the field
 * is absent nothing is counted.
 *
 * @param message the message whose processing failed
 */
private void updateProcessingFailureMetric(Message message) {
    final Object inputId = message.getField(FIELD_GL2_SOURCE_INPUT);
    if (inputId == null) {
        return;
    }
    // Report through InputDiagnosisMetrics only: it publishes a Gauge under this exact metric name,
    // and a registry name can hold a single metric type, so the name must not also be marked
    // directly as a Meter (presumably both classes share the same injected MetricRegistry).
    inputDiagnosisMetrics.incCount(name("org.graylog2.inputs", inputId.toString(), "failures.processing"));
}

/**
 * Counts one indexing failure for the input that produced the given message.
 *
 * <p>The input id is looked up under {@code FIELD_GL2_SOURCE_INPUT} in the map returned by
 * {@code toElasticSearchObject}; when it is absent nothing is counted.
 *
 * @param message the message that could not be indexed
 */
private void updateIndexingFailureMetric(Indexable message) {
    // toElasticSearchObject requires a Meter argument; the dummy meter is never read afterwards.
    final Map<String, Object> searchObject = message.toElasticSearchObject(objectMapper, dummyMeter);
    final Object inputId = searchObject.get(FIELD_GL2_SOURCE_INPUT);
    if (inputId == null) {
        return;
    }
    // Report through InputDiagnosisMetrics only: it publishes a Gauge under this exact metric name,
    // and a registry name can hold a single metric type, so the name must not also be marked
    // directly as a Meter (presumably both classes share the same injected MetricRegistry).
    inputDiagnosisMetrics.incCount(name("org.graylog2.inputs", inputId.toString(), "failures.indexing"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.graylog2.events.ClusterEventCleanupPeriodical;
import org.graylog2.events.ClusterEventPeriodical;
import org.graylog2.indexer.fieldtypes.IndexFieldTypePollerPeriodical;
import org.graylog2.inputs.diagnosis.InputDiagnosisMetricsPeriodical;
import org.graylog2.periodical.ClusterHealthCheckThread;
import org.graylog2.periodical.ContentPackLoaderPeriodical;
import org.graylog2.periodical.DataNodeHousekeepingPeriodical;
Expand Down Expand Up @@ -66,5 +67,6 @@ protected void configure() {
periodicalBinder.addBinding().to(TelemetryClusterInfoPeriodical.class);
periodicalBinder.addBinding().to(GraylogCertificateProvisioningPeriodical.class);
periodicalBinder.addBinding().to(DataNodeHousekeepingPeriodical.class);
periodicalBinder.addBinding().to(InputDiagnosisMetricsPeriodical.class);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package org.graylog2.inputs.diagnosis;

import com.codahale.metrics.Gauge;
import com.codahale.metrics.MetricRegistry;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import org.apache.commons.collections4.queue.CircularFifoQueue;
import org.graylog2.shared.metrics.MetricUtils;

import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;

/**
 * Collects failure counts per metric name and publishes, for each name, a gauge holding the sum of
 * the most recent {@link #SIZE_FOR_15_MINUTES} samples.
 *
 * <p>Callers report events with {@link #incCount(String)}. Each call to {@link #update()} closes the
 * current sampling interval: the counts gathered since the previous call become one sample per
 * metric, and the oldest sample drops out once a metric holds {@link #SIZE_FOR_15_MINUTES} of them.
 */
@Singleton
public class InputDiagnosisMetrics {

    /** Samples kept per metric; covers 15 minutes when {@link #update()} runs once per minute. */
    private static final int SIZE_FOR_15_MINUTES = 15;

    /** Sample window per metric name; only touched from {@link #update()}. */
    private final Map<String, CircularFifoQueue<Long>> metrics = new HashMap<>();

    /** Scratch registry holding the counters of the interval currently being collected. */
    private final AtomicReference<MetricRegistry> localMetricRegistry = new AtomicReference<>(new MetricRegistry());

    /** Registry in which the summed gauges are published. */
    private final MetricRegistry metricRegistry;

    @Inject
    public InputDiagnosisMetrics(MetricRegistry metricRegistry) {
        this.metricRegistry = metricRegistry;
    }

    /**
     * Adds one event to the current sampling interval of the given metric.
     *
     * @param metricName name under which the summed gauge is published
     */
    public void incCount(String metricName) {
        // NOTE(review): an increment that races with the registry swap in update() can land on the
        // already-harvested registry and be dropped; accepted here as a best-effort diagnostic count.
        localMetricRegistry.get().counter(metricName).inc();
    }

    /**
     * Closes the current sampling interval and refreshes every metric's window.
     *
     * <p>Metrics that were incremented receive their interval count as a new sample. Metrics that
     * are already known but saw no increments receive a {@code 0} sample, so that old failures age
     * out of the window instead of being reported indefinitely.
     */
    void update() {
        final var counters = localMetricRegistry.getAndSet(new MetricRegistry()).getCounters();

        // Age the windows of idle metrics; without this their gauge would never decay.
        metrics.forEach((metric, queue) -> {
            if (!counters.containsKey(metric)) {
                addSample(queue, 0L);
            }
        });

        counters.forEach((metric, counter) ->
                metrics.compute(metric, (m, q) -> {
                    final CircularFifoQueue<Long> queue =
                            Objects.requireNonNullElseGet(q, () -> new CircularFifoQueue<>(SIZE_FOR_15_MINUTES));
                    addSample(queue, counter.getCount());
                    MetricUtils.safelyRegister(metricRegistry, metric, (Gauge<Long>) () -> sum(queue));
                    return queue;
                }));
    }

    /** Appends a sample; guarded because gauges may read the queue from other threads. */
    private static void addSample(CircularFifoQueue<Long> queue, long sample) {
        synchronized (queue) {
            queue.add(sample);
        }
    }

    /** Sums all samples currently in the window, under the same lock used for writes. */
    private static long sum(CircularFifoQueue<Long> queue) {
        synchronized (queue) {
            return queue.stream()
                    .mapToLong(Long::longValue)
                    .sum();
        }
    }

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package org.graylog2.inputs.diagnosis;

import jakarta.inject.Inject;
import org.graylog2.plugin.periodical.Periodical;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nonnull;

/**
 * Periodical that calls {@link InputDiagnosisMetrics#update()} on each run.
 *
 * <p>It requests an initial delay and a period of {@value #INTERVAL_SECONDS} seconds each.
 */
public class InputDiagnosisMetricsPeriodical extends Periodical {

    private static final Logger LOG = LoggerFactory.getLogger(InputDiagnosisMetricsPeriodical.class);

    /** Used for both the initial delay and the period between runs. */
    private static final int INTERVAL_SECONDS = 60;

    private final InputDiagnosisMetrics inputDiagnosisMetrics;

    @Inject
    public InputDiagnosisMetricsPeriodical(InputDiagnosisMetrics inputDiagnosisMetrics) {
        this.inputDiagnosisMetrics = inputDiagnosisMetrics;
    }

    // --- work ---------------------------------------------------------------------------------

    /** Delegates to {@link InputDiagnosisMetrics#update()}. */
    @Override
    public void doRun() {
        inputDiagnosisMetrics.update();
    }

    // --- scheduling ---------------------------------------------------------------------------

    @Override
    public int getInitialDelaySeconds() {
        return INTERVAL_SECONDS;
    }

    @Override
    public int getPeriodSeconds() {
        return INTERVAL_SECONDS;
    }

    @Override
    public boolean runsForever() {
        return false;
    }

    // --- lifecycle flags ----------------------------------------------------------------------

    @Override
    public boolean startOnThisNode() {
        return true;
    }

    @Override
    public boolean stopOnGracefulShutdown() {
        return true;
    }

    @Override
    public boolean isDaemon() {
        return true;
    }

    @Override
    protected @Nonnull Logger getLogger() {
        return LOG;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* along with this program. If not, see
* <http://www.mongodb.com/licensing/server-side-public-license>.
*/
package org.graylog2.inputs;
package org.graylog2.inputs.diagnosis;

import com.google.common.collect.ImmutableSet;
import jakarta.inject.Inject;
Expand All @@ -36,6 +36,7 @@
import org.graylog.plugins.views.search.searchtypes.pivot.buckets.Values;
import org.graylog.plugins.views.search.searchtypes.pivot.series.Count;
import org.graylog2.database.NotFoundException;
import org.graylog2.inputs.Input;
import org.graylog2.plugin.indexer.searches.timeranges.RelativeRange;
import org.graylog2.rest.models.system.inputs.responses.InputDiagnostics;
import org.graylog2.streams.StreamService;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@
import org.graylog2.audit.AuditEventTypes;
import org.graylog2.audit.jersey.AuditEvent;
import org.graylog2.inputs.Input;
import org.graylog2.inputs.InputDiagnosticService;
import org.graylog2.inputs.InputService;
import org.graylog2.inputs.diagnosis.InputDiagnosticService;
import org.graylog2.inputs.encryption.EncryptedInputConfigs;
import org.graylog2.plugin.configuration.ConfigurationException;
import org.graylog2.plugin.database.ValidationException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import com.google.inject.assistedinject.Assisted;
import com.google.inject.assistedinject.AssistedInject;
import com.lmax.disruptor.EventHandler;
import org.graylog2.inputs.diagnosis.InputDiagnosisMetrics;
import org.graylog2.plugin.GlobalMetricNames;
import org.graylog2.plugin.Message;
import org.graylog2.plugin.ResolvableInetSocketAddress;
Expand All @@ -35,19 +36,18 @@
import org.graylog2.plugin.inputs.codecs.MultiMessageCodec;
import org.graylog2.plugin.inputs.failure.InputProcessingException;
import org.graylog2.plugin.journal.RawMessage;
import org.graylog2.shared.journal.Journal;
import org.graylog2.shared.messageq.MessageQueueAcknowledger;
import org.graylog2.shared.utilities.ExceptionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

import static com.codahale.metrics.MetricRegistry.name;
Expand All @@ -65,22 +65,22 @@ public interface Factory {
private final Map<String, Codec.Factory<? extends Codec>> codecFactory;
private final ServerStatus serverStatus;
private final MetricRegistry metricRegistry;
private final Journal journal;
private final InputDiagnosisMetrics inputDiagnosisMetrics;
private final MessageQueueAcknowledger acknowledger;
private final Timer parseTime;

@AssistedInject
public DecodingProcessor(Map<String, Codec.Factory<? extends Codec>> codecFactory,
final ServerStatus serverStatus,
final MetricRegistry metricRegistry,
final Journal journal,
final InputDiagnosisMetrics inputDiagnosisMetrics,
MessageQueueAcknowledger acknowledger,
@Assisted("decodeTime") Timer decodeTime,
@Assisted("parseTime") Timer parseTime) {
this.codecFactory = codecFactory;
this.serverStatus = serverStatus;
this.metricRegistry = metricRegistry;
this.journal = journal;
this.inputDiagnosisMetrics = inputDiagnosisMetrics;
this.acknowledger = acknowledger;

// these metrics are global to all processors, thus they are passed in directly to avoid relying on the class name
Expand Down Expand Up @@ -158,16 +158,18 @@ private void processMessage(final MessageEvent event) {
message = codec.decodeSafe(raw);
}
} catch (InputProcessingException e) {
if(LOG.isTraceEnabled() && e.inputMessageString().isPresent()) {
if (LOG.isTraceEnabled() && e.inputMessageString().isPresent()) {
LOG.error("%s - input message: %s".formatted(e.getMessage(), e.inputMessageString().get()), e.getCause());
}else{
} else {
LOG.error(e.getMessage(), e.getCause());
}
metricRegistry.meter(name(baseMetricName, "failures")).mark();
inputDiagnosisMetrics.incCount(name("org.graylog2.inputs", inputIdOnCurrentNode, "failures.input"));
throw e;
} catch (RuntimeException e) {
LOG.error("Unable to decode raw message {} on input <{}>.", raw, inputIdOnCurrentNode);
metricRegistry.meter(name(baseMetricName, "failures")).mark();
inputDiagnosisMetrics.incCount(name("org.graylog2.inputs", inputIdOnCurrentNode, "failures.input"));
throw e;
} finally {
decodeTime = decodeTimeCtx.stop();
Expand All @@ -191,11 +193,12 @@ private void processMessage(final MessageEvent event) {
}

@Nullable
private Message postProcessMessage(RawMessage raw, Codec codec, String inputIdOnCurrentNode, String baseMetricName, Message message, long decodeTime) {
if (message == null) {
metricRegistry.meter(name(baseMetricName, "failures")).mark();
return null;
}
private Message postProcessMessage(RawMessage raw,
Codec codec,
String inputIdOnCurrentNode,
String baseMetricName,
@Nonnull Message message,
long decodeTime) {
if (!message.isComplete()) {
metricRegistry.meter(name(baseMetricName, "incomplete")).mark();
if (LOG.isDebugEnabled()) {
Expand Down
Loading

0 comments on commit caaf214

Please sign in to comment.