본문 바로가기

빅데이터/Kafka

카프카 커넥트/커넥터 내부 살펴보기 - 2.8기준, sinkTask 위주로

카프카 커넥트는 data sink/source를 위한 파이프라인을 운영하기 위해 만들어진 모듈입니다. 크게 두가지 Connect 타입을 지원하고 있습니다. standalone부터 distributed까지 코드를 보면서 내부 구조를 살펴보겠습니다.

 

우선 살펴봐야할 것은 cli입니다. ConnectDistributed.java 또는 ConnectStandalone.java에서 시작합니다.

커넥트를 실행할 때는 다음과 같은 명령어로 실행하기 때문에 위 자바 파일이 시작점이라 볼 수 있습니다.

// standalone일 경우
$ bin/connect-standalone worker.properties connector1.properties [connector2.properties connector3.properties ...]
// distributed일 경우
$ bin/connect-distributed worker.properties

ConnectStandalone

standalone모드 커넥트는 내부적으로 태스크/커넥터가 실행되는 것은 동일하지만 단일 워커로 실행된다는 점이 다릅니다. 

/**
 * <p>
 * Command line utility that runs Kafka Connect as a standalone process. In this mode, work is not
 * distributed. Instead, all the normal Connect machinery works within a single process. This is
 * useful for ad hoc, small, or experimental jobs.
 * </p>
 * <p>
 * By default, no job configs or offset data is persistent. You can make jobs persistent and
 * fault tolerant by overriding the settings to use file storage for both.
 * </p>
 */
public class ConnectStandalone {
    private static final Logger log = LoggerFactory.getLogger(ConnectStandalone.class);

    public static void main(String[] args) {

        if (args.length < 2 || Arrays.asList(args).contains("--help")) {
            log.info("Usage: ConnectStandalone worker.properties connector1.properties [connector2.properties ...]");
            Exit.exit(1);
        }
        ....

주석에 적힌것 처럼 싱글 프로세스로 실행되기 때문에 ad hoc, small, 실험적 기능에 적합하다고 적혀있습니다. 기본적으로 실행을 위해 옵션은 파일로 받는 것을 확인할 수 있습니다. 위 코드 아래로는 플러그인 로딩, RestServer 초기화, 클러스터ID 가져오기, 커넥터 클라이언트 컨피그 오버라이드, 워커(Worker) 초기화를 수행합니다.

 

여기서 워커는 커넥트 프로세스 하나를 뜻합니다. 워커 내부는 다음과 같습니다.

/**
 * <p>
 * Worker runs a (dynamic) set of tasks in a set of threads, doing the work of actually moving
 * data to/from Kafka.
 * </p>
 * <p>
 * Since each task has a dedicated thread, this is mainly just a container for them.
 * </p>
 */
public class Worker {

    public static final long CONNECTOR_GRACEFUL_SHUTDOWN_TIMEOUT_MS = TimeUnit.SECONDS.toMillis(5);

    private static final Logger log = LoggerFactory.getLogger(Worker.class);

    protected Herder herder;
    private final ExecutorService executor;
    private final Time time;
    private final String workerId;
    //kafka cluster id
    private final String kafkaClusterId;
    private final Plugins plugins;
    ...

워커는 여러개의 태스크를 다이나믹하게 실행시키는 역할을 수행합니다. 더 아래로 내려가보면 Worker에서 startTask를 통해 태스크를 셋팅하고 Executor로 실행시키는 것을 볼 수 있습니다.

/**
 * Start a task managed by this worker.
 *
 * @param id the task ID.
 * @param connProps the connector properties.
 * @param taskProps the tasks properties.
 * @param statusListener a listener for the runtime status transitions of the task.
 * @param initialState the initial state of the connector.
 * @return true if the task started successfully.
 */
public boolean startTask(
        ConnectorTaskId id,
        ClusterConfigState configState,
        Map<String, String> connProps,
        Map<String, String> taskProps,
        TaskStatus.Listener statusListener,
        TargetState initialState
) {
    final WorkerTask workerTask;
    final TaskStatus.Listener taskStatusListener = workerMetricsGroup.wrapStatusListener(statusListener);
    try (LoggingContext loggingContext = LoggingContext.forTask(id)) {
        log.info("Creating task {}", id);

        if (tasks.containsKey(id))
            throw new ConnectException("Task already exists in this worker: " + id);

        connectorStatusMetricsGroup.recordTaskAdded(id);
        ClassLoader savedLoader = plugins.currentThreadLoader();
        try {
            String connType = connProps.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG);
            ClassLoader connectorLoader = plugins.delegatingLoader().connectorLoader(connType);
            savedLoader = Plugins.compareAndSwapLoaders(connectorLoader);
            final ConnectorConfig connConfig = new ConnectorConfig(plugins, connProps);
            final TaskConfig taskConfig = new TaskConfig(taskProps);
            final Class<? extends Task> taskClass = taskConfig.getClass(TaskConfig.TASK_CLASS_CONFIG).asSubclass(Task.class);
            final Task task = plugins.newTask(taskClass);
            log.info("Instantiated task {} with version {} of type {}", id, task.version(), taskClass.getName());

            // By maintaining connector's specific class loader for this thread here, we first
            // search for converters within the connector dependencies.
            // If any of these aren't found, that means the connector didn't configure specific converters,
            // so we should instantiate based upon the worker configuration

            ... // 메시지키,메시지값, 헤더 컨버터 코드

            workerTask = buildWorkerTask(configState, connConfig, id, task, taskStatusListener,
                    initialState, keyConverter, valueConverter, headerConverter, connectorLoader);
            workerTask.initialize(taskConfig);
            Plugins.compareAndSwapLoaders(savedLoader);
        } catch (Throwable t) {
            log.error("Failed to start task {}", id, t);
            // Can't be put in a finally block because it needs to be swapped before the call on
            // statusListener
            Plugins.compareAndSwapLoaders(savedLoader);
            connectorStatusMetricsGroup.recordTaskRemoved(id);
            taskStatusListener.onFailure(id, t);
            return false;
        }

        WorkerTask existing = tasks.putIfAbsent(id, workerTask);
        if (existing != null)
            throw new ConnectException("Task already exists in this worker: " + id);

        executor.submit(workerTask);
        if (workerTask instanceof WorkerSourceTask) {
            sourceTaskOffsetCommitter.schedule(id, (WorkerSourceTask) workerTask);
        }
        return true;
    }
}

초기화가 완료된 태스크는 executor.submit()을 통해 실행됩니다. 여기서 사용되는 executor는 newCachedThreadPool()로 자동 초기화 됩니다. newCachedThreadPool()은 필요에 따라 사용하는 스레드로서 기존에 사용되는 스레드가 있으면 그대로 사용하고, 만약 더이상 용하지 않는다면 해당 태스크는 종료된다는 특징을 가집니다. 결과적으로 태스크는 newCachedThreadPool() 내부 생명주기에 종속되는 스레드로 운영된다고 볼 수 있습니다. 

 

다시 ConnectStandalone로 돌아와서 코드를 살펴보겠습니다. 

Herder herder = new StandaloneHerder(worker, kafkaClusterId, connectorClientConfigOverridePolicy);
final Connect connect = new Connect(herder, rest);
log.info("Kafka Connect standalone worker initialization took {}ms", time.hiResClockMs() - initStart);

try {
    connect.start();
    for (final String connectorPropsFile : Arrays.copyOfRange(args, 1, args.length)) {
        Map<String, String> connectorProps = Utils.propsToStringMap(Utils.loadProps(connectorPropsFile));
        FutureCallback<Herder.Created<ConnectorInfo>> cb = new FutureCallback<>((error, info) -> {
            if (error != null)
                log.error("Failed to create job for {}", connectorPropsFile);
            else
                log.info("Created connector {}", info.result().name());
        });
        herder.putConnectorConfig(
                connectorProps.get(ConnectorConfig.NAME_CONFIG),
                connectorProps, false, cb);
        cb.get();
    }
} catch (Throwable t) {
    log.error("Stopping after connector error", t);
    connect.stop();
    Exit.exit(3);
}

// Shutdown will be triggered by Ctrl-C or via HTTP shutdown request
connect.awaitStop();

양치기라는 뜻의 Herder 클래스의 인스턴스가 만들어집니다. 이때 Herder는 두가지 종류가 있습니다. StandaloneHerder과 DistributedHerder입니다. StandaloneHerder는 단일 프로세스환경에서 인메모리 Herder로 운영됩니다.

/**
 * Single process, in-memory "herder". Useful for a standalone Kafka Connect process.
 */
public class StandaloneHerder extends AbstractHerder {
    private static final Logger log = LoggerFactory.getLogger(StandaloneHerder.class);

    private final AtomicLong requestSeqNum = new AtomicLong();
    private final ScheduledExecutorService requestExecutorService;

Herder에 대한 자세한 사항은 Herder Interface의 주석에서 살펴볼 수 있습니다.

/**
 * <p>
 * The herder interface tracks and manages workers and connectors. It is the main interface for external components
 * to make changes to the state of the cluster. For example, in distributed mode, an implementation of this class
 * knows how to accept a connector configuration, may need to route it to the current leader worker for the cluster so
 * the config can be written to persistent storage, and then ensures the new connector is correctly instantiated on one
 * of the workers.
 * </p>
 * <p>
 * This class must implement all the actions that can be taken on the cluster (add/remove connectors, pause/resume tasks,
 * get state of connectors and tasks, etc). The non-Java interfaces to the cluster (REST API and CLI) are very simple
 * wrappers of the functionality provided by this interface.
 * </p>
 * <p>
 * In standalone mode, this implementation of this class will be trivial because no coordination is needed. In that case,
 * the implementation will mainly be delegating tasks directly to other components. For example, when creating a new
 * connector in standalone mode, there is no need to persist the config and the connector and its tasks must run in the
 * same process, so the standalone herder implementation can immediately instantiate and start the connector and its
 * tasks.
 * </p>
 */
public interface Herder {

    void start();

    void stop();

    boolean isRunning();

주석을 읽어보면 알겠지만, 이 양치기라고 불리는 Herder클래스는 워커(프로세스)들과 커넥터들을 관리하고 트래킹하기 위한 목적으로 사용됩니다. 클러스터에 워커가 들어왔는지 빠졌는지 커넥터의 상태는 어떤지 모두 관리하는 역할인 것입니다. Herder는 커넥트와 커넥터의 핵심 로직이라고 볼 수 있고 분산모드와 단일모드를 나누는 큰 차이점이라 볼 수 있습니다.

 

다시 돌아가서 StandaloneHerder을 보면 putConnectorConfig()를 호출하고 대기상태(connect.awaitStop)로 들어가는 것을 볼 수 있습니다. putConnectorConfig()내부에서 무언가 일어나고 있다는 것을 알 수 있습니다.

@Override
public synchronized void putConnectorConfig(String connName,
                                            final Map<String, String> config,
                                            boolean allowReplace,
                                            final Callback<Created<ConnectorInfo>> callback) {
    try {
        validateConnectorConfig(config, (error, configInfos) -> {
            if (error != null) {
                callback.onCompletion(error, null);
                return;
            }

            requestExecutorService.submit(
                () -> putConnectorConfig(connName, config, allowReplace, callback, configInfos)
            );
        });
    } catch (Throwable t) {
        callback.onCompletion(t, null);
    }
}

StandaloneHerder의 putConnectorConfig는 requestExecutorService에 커넥터 configuration을 입력하여 스레드를 새로 만들어 운영하는 것을 알 수 있습니다. requestExecutorService는 newSingleThreadScheduledExecutor()로 동작합니다. 아마도 커넥터는 태스크가 아니므로 하나만 있어도 되기 때문에 task와 다른 Executor로 동작한다고 볼 수 있겠습니다.

 

submit()으로 다시 호출된 putConnectorConfig()에서는 updateConnectorTasks()를 통해 태스크를 실행시킵니다.

private void updateConnectorTasks(String connName) {
    if (!worker.isRunning(connName)) {
        log.info("Skipping update of connector {} since it is not running", connName);
        return;
    }

    List<Map<String, String>> newTaskConfigs = recomputeTaskConfigs(connName);
    List<Map<String, String>> oldTaskConfigs = configState.allTaskConfigs(connName);

    if (!newTaskConfigs.equals(oldTaskConfigs)) {
        removeConnectorTasks(connName);
        List<Map<String, String>> rawTaskConfigs = reverseTransform(connName, configState, newTaskConfigs);
        configBackingStore.putTaskConfigs(connName, rawTaskConfigs);
        createConnectorTasks(connName, configState.targetState(connName));
    }
}

기존 태스크 컨피그가 신규 컨피그의 차이점을 확인하고 동일하지 않다면 신규 태스크를 실행시킵니다. 즉, 기존에 컨피그가 없다면 신규이고 만약 기존에 있었다면 새로운 컨피그로 업어쳐서 커넥터 태스크를 새로만드는 격인 것이죠.

private void createConnectorTasks(String connName, TargetState initialState) {
    Map<String, String> connConfigs = configState.connectorConfig(connName);

    for (ConnectorTaskId taskId : configState.tasks(connName)) {
        Map<String, String> taskConfigMap = configState.taskConfig(taskId);
        worker.startTask(taskId, configState, connConfigs, taskConfigMap, this, initialState);
    }
}

최종적으로 앞서 살펴봤던데로 worker의 startTask가 실행되는 것을 볼 수 있습니다. startTask()를 통해 실행되는 것은 WokerTask입니다. 햇갈리실 수도 있지만 WorkerTask는 태스크 스레드를 뜻합니다. 당연하게도 기존에 말하던 워커는 현재 설명하는 WorkerTask와는 다릅니다.

/**
 * Handles processing for an individual task. This interface only provides the basic methods
 * used by {@link Worker} to manage the tasks. Implementations combine a user-specified Task with
 * Kafka to create a data flow.
 *
 * Note on locking: since the task runs in its own thread, special care must be taken to ensure
 * that state transitions are reported correctly, in particular since some state transitions are
 * asynchronous (e.g. pause/resume). For example, changing the state to paused could cause a race
 * if the task fails at the same time. To protect from these cases, we synchronize status updates
 * using the WorkerTask's monitor.
 */
abstract class WorkerTask implements Runnable {
    private static final Logger log = LoggerFactory.getLogger(WorkerTask.class);
    private static final String THREAD_NAME_PREFIX = "task-thread-";

    protected final ConnectorTaskId id;
    private final TaskStatus.Listener statusListener;
    protected final ClassLoader loader;
    protected final StatusBackingStore statusBackingStore;
    protected final Time time;
    private final CountDownLatch shutdownLatch = new CountDownLatch(1);
    private final TaskMetricsGroup taskMetricsGroup;
    private volatile TargetState targetState;
    private volatile boolean stopping;   // indicates whether the Worker has asked the task to stop
    private volatile boolean cancelled;  // indicates whether the Worker has cancelled the task (e.g. because of slow shutdown)

WorkerTask는 개별 태스크 스레드를 뜻하고 그 자체로 스레드를 공유하지 않고 자체적으로 돌아간다는 특징을 가집니다. 이전에 살펴봤다싶이 이 WorkerTask는 태스크별로 따로 newCachedThreadPool()로 실행됩니다. 그리고 내부적으로는 사용자가 작성한(user-specified) 태스크로 실행됩니다. 우리가 만들었던 SinkTask, SourceTask의 클래스 내부동작은 이 WorkerTask()가 호출함으로서 실행된다고 볼 수 있습니다.

 

여기서 주의사항을 볼 수 있는데 바로 locking에 관련된 것입니다. 태스크는 하나의 고유한 스레드에서 운영되기 때문에 자체적으로 상태를 유지하고 운영하는 것이 정말 중요합니다. 특히 pause/resume과 같은 비동기 동작에서 주의해야 합니다. 예를 들어 태스크의 상태를 두개 이상의 명령에 이해 중복처리 된다면 태스크 장애가 발생할 수 있습니다. 이런 케이스를 막기 위해서 내부적으로 status는 동기로 관리하는 것을 볼 수 있습니다. 이 부분은 사용자의 로직이 아닌 WorkerTask의 특징이라 볼 수 있습니다.

 

우리는 커넥터의 종류가 sink와 source가 있는 것을 알고 있습니다. 여기까지 보면 알 수 있다시피 커네터는 그 자체로서 sink와 source로 나뉘지 않습니다. task에서야 sink동작과 source동작이 나뉩니다. WorkerTask는 Runner 인터페이스이고 실질적으로 실행되는 것은 WorkerSinkTask또는 WorkerSouceTask입니다. 여기서는 WorkerSinkTask를 살펴봅시다.

/**
 * WorkerTask that uses a SinkTask to export data from Kafka.
 */
class WorkerSinkTask extends WorkerTask {
    private static final Logger log = LoggerFactory.getLogger(WorkerSinkTask.class);

    private final WorkerConfig workerConfig;
    private final SinkTask task;
    private final ClusterConfigState configState;

sinkTask에서 사용자의 로직에 의해 주요하게 동작하는 것은 initialize, put, flush, stop입니다. 그러므로 WorkerSinkTask에서 각 메서드가 어디서 호출되고 어떻게 동작하는지 살펴보면 좋겠습니다. 사용자가 작성한 SinkTask는 WorkerSinkTask가 생성될 때 정의됩니다. 여기서는 initialize와 put의 동작만 살펴보겠습니다.

1) initialize

/**
 * Initializes and starts the SinkTask.
 */
@Override
protected void initializeAndStart() {
    SinkConnectorConfig.validate(taskConfig);

    if (SinkConnectorConfig.hasTopicsConfig(taskConfig)) {
        List<String> topics = SinkConnectorConfig.parseTopicsList(taskConfig);
        consumer.subscribe(topics, new HandleRebalance());
        log.debug("{} Initializing and starting task for topics {}", this, Utils.join(topics, ", "));
    } else {
        String topicsRegexStr = taskConfig.get(SinkTask.TOPICS_REGEX_CONFIG);
        Pattern pattern = Pattern.compile(topicsRegexStr);
        consumer.subscribe(pattern, new HandleRebalance());
        log.debug("{} Initializing and starting task for topics regex {}", this, topicsRegexStr);
    }

    task.initialize(context);
    task.start(taskConfig);
    log.info("{} Sink task finished initialization and start", this);
}

configuration을 썰정하고 정의하는 것입니다. 별 다른 특징은 없고 task.start()를 통해 사용자의 리소스를 초기화하는 코드를 실행시킵니다.

2) put

사실 SinkTask의 핵심이라 볼 수 있습니다. Consumer로 동작하는 SinkTask가 어떻게 poll()부터 데이터를 처리하는지 살펴보겠습니다. 우선 봐야할 부분은 WorkerSinkTask의 poll()입니다. 이 메서드가 어떻게 매번 Record를 가져오는지 보겠습니다. 우선 poll()이 호출하는 것은 iteration() 메서드입니다. 이 메서드는 execute()이후에 매번 반복하면서 실행됩니다. 우리가 무한 for구문에서 consumer.poll()하는 것과 동일합니다. 최고 윗 단위부터 차례대로 살펴보겠습니다.

@Override
public void execute() {
    log.info("{} Executing sink task", this);
    // Make sure any uncommitted data has been committed and the task has
    // a chance to clean up its state
    try (UncheckedCloseable suppressible = this::closePartitions) {
        while (!isStopping())
            iteration();
    } catch (WakeupException e) {
        log.trace("Consumer woken up during initial offset commit attempt, " 
            + "but succeeded during a later attempt");
    }
}

!isStopping() 이라면 계속해서 iteration()을 호출합니다.

protected void iteration() {
    final long offsetCommitIntervalMs = workerConfig.getLong(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG);

    try {
        long now = time.milliseconds();

        // Maybe commit
        if (!committing && (context.isCommitRequested() || now >= nextCommit)) {
            commitOffsets(now, false);
            nextCommit = now + offsetCommitIntervalMs;
            context.clearCommitRequest();
        }

        final long commitTimeoutMs = commitStarted + workerConfig.getLong(WorkerConfig.OFFSET_COMMIT_TIMEOUT_MS_CONFIG);

        // Check for timed out commits
        if (committing && now >= commitTimeoutMs) {
            log.warn("{} Commit of offsets timed out", this);
            commitFailures++;
            committing = false;
        }

        // And process messages
        long timeoutMs = Math.max(nextCommit - now, 0);
        poll(timeoutMs);
    } catch (WakeupException we) {
        log.trace("{} Consumer woken up", this);

        if (isStopping())
            return;

        if (shouldPause()) {
            pauseAll();
            onPause();
            context.requestCommit();
        } else if (!pausedForRedelivery) {
            resumeAll();
            onResume();
        }
    }
}

흥미로운 부분입니다. iteration()은 매번 호출됩니다. 커밋이 필요하다면 커밋을 수행하고 커밋관련 로직이 완전히 수행되고난 이후에 프로세스를 실행하기 위해 poll()메서드를 호출합니다. 당연하게도 컨슈머의 안전한 종료를 위해 WakeupException이 적용된 것도 볼 수 있습니다. 그렇다면 poll()에는 kafkaConsumer가 있음을 짐작할 수 있습니다.

/**
 * Poll for new messages with the given timeout. Should only be invoked by the worker thread.
 */
protected void poll(long timeoutMs) {
    rewind();
    long retryTimeout = context.timeout();
    if (retryTimeout > 0) {
        timeoutMs = Math.min(timeoutMs, retryTimeout);
        context.timeout(-1L);
    }

    log.trace("{} Polling consumer with timeout {} ms", this, timeoutMs);
    ConsumerRecords<byte[], byte[]> msgs = pollConsumer(timeoutMs);
    assert messageBatch.isEmpty() || msgs.isEmpty();
    log.trace("{} Polling returned {} messages", this, msgs.count());

    convertMessages(msgs);
    deliverMessages();
}

매번 호출되는 poll()는 rewind -> pollConsumer -> convert -> deliver 순서로 동작합니다. rewind()메서드는 레코드의 처리 지연이나 장애가 발생했을 경우 consumer.seek()를 통해 다시 오프셋 위치를 변경시키는 작업입니다. 이 로직에 대한 상세 내용은 https://issues.apache.org/jira/browse/KAFKA-2480 에서 확인할 수 있습니다.

 

[KAFKA-2480] Handle non-CopycatExceptions from SinkTasks - ASF JIRA

Currently we catch Throwable in WorkerSinkTask, but we just log the exception. This can lead to data loss because it indicates the messages in the put(records) call probably were not handled properly. We need to decide what the policy for handling these ty

issues.apache.org

Currently we catch Throwable in WorkerSinkTask, but we just log the exception. This can lead to data loss because it indicates the messages in the put(records) call probably were not handled properly. We need to decide what the policy for handling these types of exceptions should be – try repeating the same records again, risking duplication? or skip them, risking loss? or kill the task immediately and require intervention since it's unclear what happened?

이후에는 pollConsumer()메서드를 호출하여 레코드들을 가져옵니다.

private ConsumerRecords<byte[], byte[]> pollConsumer(long timeoutMs) {
    ConsumerRecords<byte[], byte[]> msgs = consumer.poll(Duration.ofMillis(timeoutMs));

    // Exceptions raised from the task during a rebalance should be rethrown to stop the worker
    if (rebalanceException != null) {
        RuntimeException e = rebalanceException;
        rebalanceException = null;
        throw e;
    }

    sinkTaskMetricsGroup.recordRead(msgs.count());
    return msgs;
}

드디어 consumer.poll()이 보입니다. poll() 이후에 sensor라고 불리는 클래스에 msg.count를 기록하는 점이 독특합니다. 

/**
 * A sensor applies a continuous sequence of numerical values to a set of associated metrics. For example a sensor on
 * message size would record a sequence of message sizes using the {@link #record(double)} api and would maintain a set
 * of metrics about request sizes such as the average or max.
 */
public final class Sensor {

    private final Metrics registry;
    private final String name;
    private final Sensor[] parents;
    private final List<StatAndConfig> stats;

Sensor는 메트릭을 기록하는데 활용되는데요. 어떤 용도로 사용되는지는 따로 살펴봐야할것 같습니다.

 

이후에 deliverMessage()를 통해 어떤 로직이 동작하는지 살펴봅시다.

private void deliverMessages() {
    // Finally, deliver this batch to the sink
    try {
        // Since we reuse the messageBatch buffer, ensure we give the task its own copy
        log.trace("{} Delivering batch of {} messages to task", this, messageBatch.size());
        long start = time.milliseconds();
        task.put(new ArrayList<>(messageBatch));
        // if errors raised from the operator were swallowed by the task implementation, an
        // exception needs to be thrown to kill the task indicating the tolerance was exceeded
        if (retryWithToleranceOperator.failed() && !retryWithToleranceOperator.withinToleranceLimits()) {
            throw new ConnectException("Tolerance exceeded in error handler",
                retryWithToleranceOperator.error());
        }
        recordBatch(messageBatch.size());
        sinkTaskMetricsGroup.recordPut(time.milliseconds() - start);
        currentOffsets.putAll(origOffsets);
        messageBatch.clear();
        // If we had paused all consumer topic partitions to try to redeliver data, then we should resume any that
        // the task had not explicitly paused
        if (pausedForRedelivery) {
            if (!shouldPause())
                resumeAll();
            pausedForRedelivery = false;
        }
    } catch (RetriableException e) {
        log.error("{} RetriableException from SinkTask:", this, e);
        // If we're retrying a previous batch, make sure we've paused all topic partitions so we don't get new data,
        // but will still be able to poll in order to handle user-requested timeouts, keep group membership, etc.
        pausedForRedelivery = true;
        pauseAll();
        // Let this exit normally, the batch will be reprocessed on the next loop.
    } catch (Throwable t) {
        log.error("{} Task threw an uncaught and unrecoverable exception. Task is being killed and will not "
                + "recover until manually restarted. Error: {}", this, t.getMessage(), t);
        throw new ConnectException("Exiting WorkerSinkTask due to unrecoverable exception.", t);
    }
}

주석에도 나와있다싶이 결국!!!! 여기서 배치가 sink됩니다. 긴 여정이였습니다. 그런데 deliverMessages()에는 message(ConsumerRecord)가 없습니다. 왜냐면 convert에서 공용으로 사용하는 messageBatch에 데이터를 넣고 재사용하기 때문입니다. 그렇기 때문에 task 스레드에서 사용하는 messageBatch를 ArrayList에 넣어 활용하게 됩니다. task.put()로 전달하는 것이죠. 이후에 데이터가 처리완료되었을 경우 messageBatch.clear()하는 방식으로 버퍼를 비웁니다.

ConnectDistributed

distributed모드 커넥트는 복수 워커로 실행된다는 점이 다릅니다. 

/**
 * <p>
 * Command line utility that runs Kafka Connect in distributed mode. In this mode, the process joints a group of other workers
 * and work is distributed among them. This is useful for running Connect as a service, where connectors can be
 * submitted to the cluster to be automatically executed in a scalable, distributed fashion. This also allows you to
 * easily scale out horizontally, elastically adding or removing capacity simply by starting or stopping worker
 * instances.
 * </p>
 */
public class ConnectDistributed {
    private static final Logger log = LoggerFactory.getLogger(ConnectDistributed.class);

ConnectDistributed의 내부를보면 가장 다른 점이 DistributedHerder로 실행한다는 점입니다.

// Pass the shared admin to the distributed herder as an additional AutoCloseable object that should be closed when the
// herder is stopped. This is easier than having to track and own the lifecycle ourselves.
DistributedHerder herder = new DistributedHerder(config, time, worker,
        kafkaClusterId, statusBackingStore, configBackingStore,
        advertisedUrl.toString(), connectorClientConfigOverridePolicy, sharedAdmin);

final Connect connect = new Connect(herder, rest);
log.info("Kafka Connect distributed worker initialization took {}ms", time.hiResClockMs() - initStart);
try {
    connect.start();
} catch (Exception e) {
    log.error("Failed to start Connect", e);
    connect.stop();
    Exit.exit(3);
}

DistributedHerder는 standalone보다 훨씬 복잡한 형태입니다.

/**
 * <p>
 *     Distributed "herder" that coordinates with other workers to spread work across multiple processes.
 * </p>
 * <p>
 *     Under the hood, this is implemented as a group managed by Kafka's group membership facilities (i.e. the generalized
 *     group/consumer coordinator). Each instance of DistributedHerder joins the group and indicates what it's current
 *     configuration state is (where it is in the configuration log). The group coordinator selects one member to take
 *     this information and assign each instance a subset of the active connectors & tasks to execute. This assignment
 *     is currently performed in a simple round-robin fashion, but this is not guaranteed -- the herder may also choose
 *     to, e.g., use a sticky assignment to avoid the usual start/stop costs associated with connectors and tasks. Once
 *     an assignment is received, the DistributedHerder simply runs its assigned connectors and tasks in a Worker.
 * </p>
 * <p>
 *     In addition to distributing work, the DistributedHerder uses the leader determined during the work assignment
 *     to select a leader for this generation of the group who is responsible for other tasks that can only be performed
 *     by a single node at a time. Most importantly, this includes writing updated configurations for connectors and tasks,
 *     (and therefore, also for creating, destroy, and scaling up/down connectors).
 * </p>
 * <p>
 *     The DistributedHerder uses a single thread for most of its processing. This includes processing
 *     config changes, handling task rebalances and serving requests from the HTTP layer. The latter are pushed
 *     into a queue until the thread has time to handle them. A consequence of this is that requests can get blocked
 *     behind a worker rebalance. When the herder knows that a rebalance is expected, it typically returns an error
 *     immediately to the request, but this is not always possible (in particular when another worker has requested
 *     the rebalance). Similar to handling HTTP requests, config changes which are observed asynchronously by polling
 *     the config log are batched for handling in the work thread.
 * </p>
 */
public class DistributedHerder extends AbstractHerder implements Runnable {
    private static final AtomicInteger CONNECT_CLIENT_ID_SEQUENCE = new AtomicInteger(1);
    private final Logger log;

여러 워커들에 적절히 분산하여 태스크를 나누어주고 state를 관리한다는 점이 가장 어려운 점 중 하나 일것입니다. 태스크 분배로직에 대한 설명은 https://voidmainvoid.tistory.com/473?category=698302 를 참고하세요.

 

카프카 커넥트의 태스크 밸런싱 로직, DistributedHerder(양치기) 그리고 IncrementalCooperativeAssignor 내부

Herder; 명사 1. 양치기, 목부 카프카 커넥트는 워커, 커넥터, 태스크로 이루어져 있습니다. 워커는 카프카 커넥트 프로세스를 뜻하며 커넥터와 태스크를 실행시키기 위한 프로세스입니다. 커넥

blog.voidmainvoid.net

정리

커넥트는 분산 아키텍처에 알맞게 잘 동작하도록 만들어져 있습니다. 그에 따라 내부 로직도 상당히 어렵다는 것을 알 수 있습니다. 내부적으로 엄청나게 많은 고민이 쌓여있습니다. 내부 동작 방식을 아는 것은 운영을 잘 하는 것 만큼 중요합니다. 지금까지 알아본 아키텍처를 그림으로 그려보면 다음과 같습니다.

 

반응형