본문 바로가기

빅데이터/하둡

하둡 맵리듀스 접근법

맵리듀스는 일괄질의처리기이다. 합리적인 시간 내에 결과를 보여주는 능력을 가지고 있음. 질의를 실행한 이후 수초내에 결과를 받는 시스템과는 다름. 하둡은 기존 병렬프로그래밍과 다른 분산 프로그래밍 관점에서 바라봐야한다. MPI와 같은 병렬프로그래밍은 사용자로 하여금 직접적인 알고리즘을 짜도록 도와주지만 하둡은 최상위 수준에서만 동작한다. 개발자는 데이터의 흐름을 신경쓰지 않아도 되고 데이터 모델 관점에서만 생각하면된다.

 

맵리듀스와 같은 분산프로그래밍에서 어려운 점은 원격 프로세스의 실패/성공 여부이다. 프로세스(태스크)가 실패했을 경우를 감지하여 장애가 없는 머신에서 다시 돌려야한다. 하둡의 맵리듀스는 태스크간의 의존성이 없는 아키텍처이기 때문에 실패에 크게 고민하지 않아도 된다.  맵리듀스는 맵의 실패보다 리듀스의 실패에 더 주의한다. 리듀스가 집중하면 맵의 출력을 반드시 다시 추출해야하기 때문이다.

 

맵리듀스 작업의 예시(word count)

public class WordCount {

// 맵
    public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable>{

      private final static IntWritable one = new IntWritable(1);
      private Text word = new Text();

      public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
          StringTokenizer itr = new StringTokenizer(value.toString());
          while (itr.hasMoreTokens()) {
            word.set(itr.nextToken());
            context.write(word, one); // key/value
          }
      }
  }

// 리듀서
  public static class IntSumReducer extends Reducer<Text,IntWritable,Text,IntWritable> {
      private IntWritable result = new IntWritable();

      public void reduce(Text key, Iterable<IntWritable> values,) throws IOException, InterruptedException {
          int sum = 0;
          for (IntWritable val : values) {
          	sum += val.get();
          }
          result.set(sum);  // key/value
          context.write(key, result);
      }
  }

// main 수행 애플리케이션
  public static void main(String[] args) throws Exception {
      Configuration conf = new Configuration();
      String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
      if (otherArgs.length != 2) {
          System.err.println("Usage: wordcount <in> <out>");
          System.exit(2);
      }
      Job job = new Job(conf, "word count");
      job.setJarByClass(WordCount.class);
      job.setMapperClass(TokenizerMapper.class);
      job.setCombinerClass(IntSumReducer.class);
      job.setReducerClass(IntSumReducer.class);
      job.setOutputKeyClass(Text.class);
      job.setOutputValueClass(IntWritable.class);
      FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
      FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
      System.exit(job.waitForCompletion(true) ? 0 : 1);
      }
  } 
}

 

하둡은 YARN을 통해 자원을 관리한다. 맵리듀스의 잡(job)은 클라이언트가 수행하는 작업의 기본단위이다. 잡은 입력데이터, 맵리듀스 프로그램, 설정 정보로 구성된다.  하둡은 맵 태스크와 리듀스 태스크로 나뉘어 실행한다.

 

하둡의 맵리듀스 잡은 스플릿으로 나누어 수행한다. 전체 데이터를 한번에 처리하는 것보다 나누어서 수행하는 것이 더 효과적이기 때무이다. 스플릿이 세분화 될수록 데이터는 더 잘게 쪼개진다. 스플릿의 크기가 너무 작으면 스플릿관리와 맵 태스크 생성의 오버헤드가 커지기 때문에 스플릿의 크기는 보통 HDFS에서 사용하는 기본 크기(128MB)로 설정한다. 

 

하둡의 HDFS는 각 데이터노드마다 데이터가 존재하는데, 각 데이터가 존재하는 데이터노드에서 맵 태스크를 실행할 때 가장 빠르게 동작한다. 이를 데이터 로컬리티라고 부른다. 그러나 항상 데이터 로컬리티 최적화가 이루어진다는 보장이 없으므로 데이터 로컬리티가 없는 데이터는 블록 복제본이 저장된 다른 데이터 노드의 데이터를 사용한다. 최악의 경우에는 저장된 데이터가 없는 데이터 노드에 네트워크 전송을 하여 데이터를 처리한다.

 

HDFS클러스터는 네임노드와 데이터 노드로 구성되어 있다. 네임노드는 파일 시스템의 네임스페이스를 관리하며 파일과 디렉토리에 대한 메타데이터를 가지고 있다. 즉 네임노드는 데이터가 어느 데이터 노드에 있는지 알 수 있는 것이다. HDFS클라이언트는 POSIX 와 유사한 파일시스템이기 때문에 ls, mkdir, rm 과 같은 익숙한 명령어들을 제공한다. 데이터 노드는 파일을 가지고 있는 실질적인 서버이다. 네임노드의 요청에 따라 블록을 저장, 탐색한다.

 

맵 태스크의 결과는 HDFS가 아닌 로컬 디스크에 저장된다. 맵의 결과는 리듀스가 사용하기 위한 결과물이고 맵의 결과 데이터는 최종 데이터를 만들기 위한 중간단계이기 때문이다. 

 

리듀스 태스크는 일반적으로 모든 맵 태스크의 결과물을 입력으로 받기 때문에 데이터 로컬리티장점이 없다. 리듀스의 결과는 최종결과이고 HDFS에 최종 적재된다. 

입력데이터 -> 맵 -> 셔플 -> 리듀스 -> 출력

만약 리듀스 태스크가 2개 잇아이라면 맵 태스크는 리듀스 수만큼 파티션을 생성한다. 각 파티션은 파티셔닝 알고리즘에 의해 여러 파티션에 저장되게 된다. 카프카와 유사하게 기본 파티셔닝 알고리즘은 해시 알고리즘이다. 리듀스 태스크는 파티션을 확인하고 데이터를 리듀스한다.


맵리듀스 잡이 사용하는 네트워크 리소스는 한계가 있다. 맵과 리듀스 태스크 사이 데이터 전송을 최소화 하기 위해 맵 결과를 처리하는 컴바이너(Combiner)함수가 제공된다. 컴바이너 함수는 분산된맵 결과를 반복해서 연산하지 않도록 도와준다.

 

예를 들어 각 년도별 온도를 측정하는 맵리듀스가 있다고 가정하자.

1번 데이터 노드
1995, 25...
1995, 24...
1995, 27...
1995, 22...
2005, 17...
....

2번 데이터 노드
1995, 17...
1995, 12...
2004, 17...

컴바이너를 사용하기 전에는 아래와 같은 연산 과정을 거쳐야 한다.

# 맵 프로세스
1번 데이터 노드
(1995, 25)
(1995, 24)
(1995, 27)
(1995, 22)
2번 데이터 노드
(1995, 17)
(1995, 12)

# 리듀스 프로세스
(1995, [25,24,27,22,17,12])

# 결과
(1995, 27)

반면 컴바이너를 사용하면 다음과 같이 연산과정이 간소화 된다.

# 맵 프로세스(with Combiner)
1번 데이터 노드
(1995, 27)
2번 데이터 노드
(1995, 17)

# 리듀스 프로세스
(1995, [27,17])

# 결과
(1995, 27)

훌륭하게 동작하지만 모든 함수에서 컴바이너를 사용할수는 없다. 예를 들어 mean, avg 와 같은 연산은 컴바이너로 인해 결과가 예상한대로 나오지 않을 수도 있기 때문이다. 컴바이너 적용 여부를 검토하고 만약 사용할 수 있다면 컴바이너를 도입하여 성능향상을 꾀할 수 있다.

 


맵리듀스가 하둡 클러스터 위에서 실행될 때는 YARN(Yet Another Resource Negotiator)를 통해 제어된다. YARN은 클러스터의 자우너을 요청하고 사용하기 위한 API를 제공한다. YARN은 리소스 매니저와 노드 매니저를 통해 서비스를 제공한다. 리소스 매니저는 클러스터 전체 자원의 사용량을 관리한다. 노드 매니저는 모든 서버에서 실행됙 컨테이너를 구동, 모니터링하는 역할을 한다. 맵리듀스를 YARN을 통해 실행을 요청하면 리소스 매니저에 접속하여 애플리케이션 마스터 프로세스 구동을 요청한다. 그러면 리소스 매니저는 컨테이너에서 애플리케이션 마스터를 시작할 수 있는 노드 매니저를 찾는 방식으로 구동된다.

 

 

 

참고서적 : www.yes24.com/Product/Goods/36151445

 

 

 

 

 

반응형