본문 바로가기

빅데이터/Kafka

kafka connect 로그를 logstash로 수집하기 + grok 설정(multiline)

카프카 커넥트를 운영하다 보면 로그를 파일로 남기면서, 로그스태시를 통해 엘라스틱서치에도 적재하고 싶을 때가 있습니다. 이를 위해 아래와 같이 설정합니다.

 

 

1. connect-log4j.properties

# Directory the Connect log file is written to; referenced below as ${router.logs.dir}.
# NOTE(review): this is a relative path, so it presumably resolves against the
# working directory the Connect process is started from - confirm it matches the
# path configured in the Logstash file input.
router.logs.dir=./connect-logs

# Root logger: INFO and above, sent to the single file appender defined below.
log4j.rootLogger=INFO, connectAppender

# Restrict these loggers to ERROR and above.
log4j.logger.org.apache.zookeeper=ERROR
log4j.logger.org.I0Itec.zkclient=ERROR
log4j.logger.org.reflections=ERROR

# File appender that rolls connect.log over by date; rolled files get a
# ".yyyy-MM-dd" suffix as given by DatePattern.
log4j.appender.connectAppender=org.apache.log4j.DailyRollingFileAppender
log4j.appender.connectAppender.File=${router.logs.dir}/connect.log
log4j.appender.connectAppender.DatePattern='.'yyyy-MM-dd
log4j.appender.connectAppender.layout=org.apache.log4j.PatternLayout
# Line layout: level, date, thread, logger name shortened to its last two
# components, source line number in parentheses, " - ", message, newline.
# The Logstash grok pattern must mirror this layout field for field.
log4j.appender.connectAppender.layout.ConversionPattern=%p %d %t %c{2}(%L) - %m%n

 

2. connect-logstash.conf

# Logstash pipeline for the Kafka Connect log file:
#   input  - tail connect.log and stitch multi-line events (stack traces) together
#   filter - split each event into fields with grok, mirroring the log4j ConversionPattern
#   output - index the parsed events into Elasticsearch
input {
  file {
    type => "log4j"
    path => ["/Users/wonyoung/Documents/kafka_2.11-2.3.1/bin/connect-logs/connect.log"]
    mode => "tail"
    codec => multiline {
      # Every log4j event starts with its level (the ConversionPattern begins with %p).
      # A line that does NOT start with a level (e.g. a stack-trace line) is
      # appended to the previous event.
      pattern => "^%{LOGLEVEL}"
      negate => true
      what => "previous"
      # Without this, the most recent event stays buffered until the next line
      # starting with a log level is written; flush it after 5 idle seconds.
      auto_flush_interval => 5
    }
  }
}
filter {
  grok {
    # Field order follows the log4j layout: %p %d %t %c{2}(%L) - %m
    match => {
      "message" => "%{LOGLEVEL:loglevel} %{TIMESTAMP_ISO8601:logdate} %{DATA:thread} %{JAVACLASS:logclass}\(%{NUMBER:codeline}\) - %{GREEDYDATA:message}"
    }
    # Replace the raw line with just the parsed message part instead of
    # turning "message" into an array of both values.
    overwrite => [ "message" ]
  }
}
output {
  elasticsearch {
    # Elasticsearch's default HTTP port is 9200 (9092 is the Kafka broker default).
    hosts => ["localhost:9200"]
    # One index per month, derived from the event's @timestamp.
    index => "connect-log-%{+YYYY.MM}"
  }
}

log4j의 패턴과 grok의 패턴을 맞춰주면 아래와 같이 es에 파싱된 채로 적재됩니다.

{
  "_index": "connect-log-2020.09",
  "_type": "doc",
  "_id": "o5v2uXQB6vJEDLSUovyO",
  "_version": 1,
  "_score": null,
  "_source": {
    "host": "pineappleui-iMac.local",
    "type": "log4j",
    "@timestamp": "2020-09-23T07:56:22.431Z",
    "logclass": "errors.ConnectExceptionMapper",
    "tags": [
      "multiline"
    ],
    "path": "/Users/wonyoung/Documents/kafka_2.11-2.3.1/bin/connect-logs/connect.log",
    "codeline": "61",
    "message": "Uncaught exception in REST call to /connecto\njavax.ws.rs.NotFoundException: HTTP 404 Not Found\n\tat org.glassfish.jersey.server.ServerRuntime$1.run(ServerRuntime.java:250)\n\tat org.glassfish.jersey.internal.Errors$1.call(Errors.java:248)\n\tat org.glassfish.jersey.internal.Errors$1.call(Errors.java:244)\n\tat org.glassfish.jersey.internal.Errors.process(Errors.java:292)\n\tat org.glassfish.jersey.internal.Errors.process(Errors.java:274)\n\tat org.glassfish.jersey.internal.Errors.process(Errors.java:244)\n\tat org.glassfish.jersey.process.internal.RequestScope.runInScope(RequestScope.java:265)\n\tat org.glassfish.jersey.server.ServerRuntime.process(ServerRuntime.java:232)\n\tat org.glassfish.jersey.server.ApplicationHandler.handle(ApplicationHandler.java:679)\n\tat org.glassfish.jersey.servlet.WebComponent.serviceImpl(WebComponent.java:392)\n\tat org.glassfish.jersey.servlet.WebComponent.service(WebComponent.java:346)\n\tat org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:365)\n\tat org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:318)\n\tat org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:205)\n\tat org.eclipse.jetty.servlet.ServletHolder.handle(ServletHolder.java:873)\n\tat org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:542)\n\tat org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:255)\n\tat org.eclipse.jetty.server.session.SessionHandler.doHandle(SessionHandler.java:1700)\n\tat org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:255)\n\tat org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1345)\n\tat org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:203)\n\tat org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:480)\n\tat org.eclipse.jetty.server.session.SessionHandler.doScope(SessionHandler.java:1667)\n\tat org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:201)\n\tat org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1247)\n\tat org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:144)\n\tat org.eclipse.jetty.server.handler.ContextHandlerCollection.handle(ContextHandlerCollection.java:220)\n\tat org.eclipse.jetty.server.handler.StatisticsHandler.handle(StatisticsHandler.java:174)\n\tat org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:132)\n\tat org.eclipse.jetty.server.Server.handle(Server.java:505)\n\tat org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:370)\n\tat org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:267)\n\tat org.eclipse.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:305)\n\tat org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:103)\n\tat org.eclipse.jetty.io.ChannelEndPoint$2.run(ChannelEndPoint.java:117)\n\tat org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:698)\n\tat org.eclipse.jetty.util.thread.QueuedThreadPool$Runner.run(QueuedThreadPool.java:804)\n\tat java.lang.Thread.run(Thread.java:748)",
    "loglevel": "ERROR",
    "@version": "1",
    "logdate": "2020-09-23 16:56:21,667",
    "thread": "qtp107994825-24"
  },
  "fields": {
    "@timestamp": [
      "2020-09-23T07:56:22.431Z"
    ]
  },
  "sort": [
    1600847782431
  ]
}

 

Grok 디버깅 사이트 -> grokconstructor.appspot.com/do/match

 

Test grok patterns

 

grokconstructor.appspot.com