원영's
life is short
[Stream Process as a Platform] Netflix의 실시간 스트림 처리 플랫폼 Keystone 소개

아래 포스트는 Keystone Real-time Streaming Processing Platform(medium)을 읽고 정리한 글입니다.

원글 글쓴이 : Zhenzhong Xu in Real-time Data Infrastructure team 


Keystone Stream Processing Platform은 넷플릭스의 Data-driven culture을 가능케한 Data backbone 이자 infrastructure의 필수적인 부분을 뜻한다. Netflix Keystone은 2015년 기준, 8,000,000tps의 데이터를 처리하고 있으며 약 1PB 양의 데이터를 처리하고 있다.



오늘날 Keystone Platform은 두가지의 핵심 서비스를 제공한다.

1) Data Pipeline

Routing service, Messaging service를 통해 Data의 producing, collecting, processing, aggregating 역할을 하며, microservice event를 실시간으로 전달해주기도 한다.


2) Stream Processing as a Service(SPaaS)

사용자들이 Stream processing application을 빌드하고 커스텀하게 운영할 수 있도록 제공한다. 또한 플랫폼에서 Data scale, operation, domain expertise를 제공하여 사용자들은 Application logic에 집중하도록 도와준다.


그림. 넷플릭스 키스톤 파이프라인


이제, 상기 그림과 같은 Keystone platform을 설계하기 위해 직면했던 도전과제(challenges)설계원칙(Design principle)을 살펴보자.



Challenges

Netflix Keystone과 같이 대규모 데이터 플랫폼을 운영하기 위해서는 많은 장애물이 발생하고 이를 극복해나가야 한다. Netflix에서 keystone을 운영하기 위한 도전과제를 아래와 같이 정의하였다.

1. Scale

넷플릭스 서비스는 약 190개국의 1억 3천만 구독자가 사용한다. 데이터 스트리밍 플랫폼은 하루에 1조번 이상의 event와 페타바이트(petabyte)단위의 데이터를 처리하게 된다. 그러므로 구독자가 늘어남에 따라 Keystone platform은 scale out되는 구조를 가져야 한다.

2. Diverse Use-cases

Keystone Routing Service : 이 서비스는 사용자의 요구에 따라 입수되는 이벤트를 라우팅하는 역할을 한다. 각 라우팅된 이벤트들은 병렬로 스트리밍 프로세싱 처리 된다. 사용자는 선택적으로 해당 이벤트들에 대해 filter 혹은 aggregation 가능하다. 입수된 데이터들은 batch/stream processing을 위해 storage에 저장된다.


Stream Processing as a Service: SPaaS 플랫폼은 2017년부터 넷플릭스에서 사용되었다. 아직 성숙되지 않은 플랫폼이지만 아래와 같이 몇가지 고려해야할 사항이 있다.(Job state, Job Complexity, Traffic pattern, Failure recovery 등)

3. Multi-tenancy

Keystone은 수천개의 스트리밍 작업을 지원하면서 데이터 전달, 분석을 수행하고 microservice architecture pattern을 돌아가게 한다. 이러한 스트리밍 작업의 다양한 특성에도 안정적으로 데이터플랫폼을 운영하려면, 사용자들에게 의미있는 서비스 수준을 보장하기 위해 infrastructure level에서 실행&운영의 격리가 필요하며, 공유 자원의 overhead를 최소화 해야한다.

4. Elasticity

보통 스트림의 트래픽 패턴은 고정적이지만, 예상치 못한 급격한 트래픽 변화에 따른 시스템 자동 조정 및 대응이 필요하다.

5. Cloud Native Resiliency

넷플릭스의 마이크로서비스들은 모두 cloud service에 올라가 있다. Cloud service로 인한 네트워크 장애, 인스턴스 장애, regional disaster failure등 모든 단계에서 장애를 모니터링 탐지 할 수 있는 시스템을 설계해야한다.

6. Operation overhead

Keystone은 수천개의 라우팅 작업과 스트리밍 애플리케이션을 서비스한다. 모든 스트림을 수동으로 관리하게 되면 플랫폼팀에 의존하게 되므로, 운영하는데 비용이 증가하게된다. 그러므로, 사용자는 스트리밍 서비스의 각 job에 대해 lifecycle을 명시해야하며, 인프라는 가능한한 자동화 해야한다.

7. Agility

넷플릭스는 변경사항에 대해 하루에도 여러번 신속하게 개발하고 배포 할 수 있기를 원한다. 또한 사용자도 또한 동일한 수준으로 서비스를 자신있게 사용 할 수 있도록 플랫폼을 제공해주고자 한다.


Platform Mindset & Design Principle

Netflix Keystone 플랫폼의 주요 목표 중 하나는 다른 팀이 비즈니스 로직에 집중할 수 있께 하여 실험, 구현 및 스트림 처리 작업을 쉽게 수행할 수 있도록 도와주는 것이다. 또한 실패에 대해 생각을 해야하며, user과 플랫폼의 관계, 그리고 스트리밍 프로세스를 운영함에 있어 resource(CPU, Network, Memory)에 대한 효율적인 운용방법에 대해 고민해야한다.




Netflix's Approach

앞서 언급한 문제점들과 설계원칙을 고려하여 Key stone을 설계하였고 각 세부 설계 내용은 아래와 같다.

1. Declarative Reconciliation

Declarative Reconciliation protocol(선언적 조화?)는 전체 아키텍쳐에서 사용된다. 'Source of Truth(진실의 근원)'는 AWS RDS를 사용하여 저장한다. 이것은 Keystone의 전체시스템의 Single source of truth이다. 만약 Kafka cluster가 ZK의 실패로 인해 날라간다면, 넷플릭스는 항상 'source of truth'을 기반으로 전체 클러스터를 재창조 할 수 있다.


글쓴이 : 아마도 넷플릭스 키스톤의 모든 플랫폼, 모든 job들에 대해 source로서 선언을 해서 사용한다는 뜻을 풀어 쓴것 같다. Infra 단위에서는 Infrastructure as a Code(IaaC)를 말한 것 같고, Data Job단위로는 각 data에 대한 정보를 특정 저장공간에 저장하여 cloud에 존재하느 모든 cluster가 실패하여 동작을 멈추더라도 재빨리 재시작 가능하다는 점을 말하고 싶은 것으로 보인다.


2. Deployment Orchestration

Netflix 자체 Continuous deployment tool인 스피나커를 활용하여 application을 배포한다. Streaming dataflow 엔진을 기반으로 한 스트리밍 & 배치 프로세싱 플랫폼인 Flink의 효율적인 운영에도 spinnaker을 사용한다.

3. Self-service Tooling

For routing jobs : 사용자들은 자신이 원하는 event에 대해 filtering을 걸거나 elasticsearch, hive 혹은 기타 real-time consuming downstream으로 sink되도록 설정 가능하다. UI를 통해 사용자는 간편하게 input output을 설정할 수 있다.

스크린샷. Keystone platform | 사용자가 직접 데이터 입수부터 저장까지 경로를 설정 할 수 있다.



For custome SPaaS jobs : 넷플릭스는 사용자들에게 flink template code를 generate하고 CI를 제공해주는 command line tool을 제공한다. 사용자가 code를 수정하게 되면, CI automation으로 자동적으로 docker image로 생성, 등록, configuration이 유연하게 진행된다.

스크린샷. Keystone self service | Spinnaker과 비슷하게 생긴 Flink code CI tool


4. Stream Processing Engines

넷플릭스는 현재 Apache Flink를 활용하고 Keystone 분석 사용 사례를위한 생태계를 구축하는 데 중점을두고 있다. 넷플릭스는 운영 use case를 위해 Mantis 스트림 처리 엔진을 통합하고 확장 할 계획을 가지고 있다.

5. Connectors, Managed Operators and Application Abstraction

넷플릭스는 Kafka, Elasticsearch, Hive 등의 관리되는 커넥터를 제공한다. 커넥터는 사용자가 여러 동작, 일괄처리, 직렬화 등을 커스터마이징 할 수 있으며, 이를 통해 processing DAG(Directed Acyclic Graph)를 쉽게 만들 수 있다. 

글쓴이 : 유향 비순환 그래프란 방향 순환이 없는 무한 유향 그래프를 뜻하는데, 데이터의 입수가 된 이후의 데이터 처리동작들을 뜻하는 것으로 보인다.

그림. Directed Acyclic Graph


6. Configuration & Immutable Deployment

소프트웨어 멀티 테넌시 구성의 관리는 쉽지 않다. 넷플릭스는 동적으로 구성할수 있도록 만듦으로서, 쉽게 관리하고자 하였다. 기본 관리는 모두 파일에 저장되며, 구성을 환경 변수로서 사용 가능하다. 이 모든것들이 UI로 제공된다.

스크린샷. Keystone self service | kafka의 option들에 대해 환경변수를 설정 할 수 있다.


7. Self-healing

분산시스템에서 Failure은 불가피하다. 넷플릭스는 언제든지 failure이 날 수 있다고 예상하고 시스템이 자동적으로 감지, 장애 복구할 수 있도록 설계하였다. 구조적으로도 Keystone 플랫폼의 구성요소가 장애가 나더라도 영향이 커지지 않도록 격리된 아키텍쳐로 설계하였다.

8. Backfill & Rewind

한번더 말하지만, 분산시스템에서 Failure은 불가피하다. 그렇기에 가끔 사용자들에게 processing중인 job들에 대해 backfill 혹은 rewind를 요구하기도 한다. Failure상황에서 사용자들이 코드를 다시 짜는 등 불필요한 작업을 막기 위해서 UI를 사용하여 플랫폼에서 job을 재시작 할 수 있도록 설계, 제공하고 있다. 다만, 이러한 배경에는 stateless한 job들에 대해서만 제공된다.

9. Monitoring & Alerting

모든 개별 스트리밍 작업에는 개인화 된 모니터링 및 경고 대시보드를 제공한다. 이를 통해 넷플릭스의 데이터 플랫폼, 데이터 인프라 팀, application을 개발하는 팀 모두 같이 문제를 진단하고 모니터랑이 가능하다.


스크린샷. 넷플릭스 데이터 플랫폼 모니터링 웹페이지


10. Reliability & Testing

플랫폼 및 기본 인프라 서비스가 새로운 기능과 개선사항을 사용자에게 제공하기 위해 변경사항을 신속하게 채택해야한 하는 부담은 구조적으로 Bottom-up으로 올라간다. 그리고 Application이 개발되고 사용에 배포됨에 따라 신뢰성에 대한 압박은 Top-down 방식으로 내려가게 된다. 이렇게 양방향으로 들어오는 압력에서 넷플릭스 데이터 플랫폼팀이 신뢰를 얻기 위해서는 전체 스택을 효율적으로 테스트 할 수 있어야 한다. 이에 따라, 넷플릭스의 데이터 플랫폼 팀은 스트림 처리 패러다임에서 unit test, integration test, canary test 등을 모든 사용자가 이용할 수 있고  쉽게 적용 가능하게 하도록 만들게 하고 있으며, 이러한 노력은 진전을 이루고 있다.



Now and Future

위에서 설명한것과 같이 넷플릭스의 Keystone 플랫폼은 지난 18개월동안 수조의 데이터를 처리할 수 있는 것을 증명하였다. 넷플릭스의 여러팀들이 streaming use case들을 만들었고, 상용으로 배포되었다. 또한 높은 수준의 플랫폼을 구축할 수 있다는 것을 보여줄 수 있었다. 그러나 아직 나아가야할일이 많이 남아있다. 아래는 미래의 넷플릭스 데이터팀이 고민해야할 문제들이다.

# Schema
# Service layer to enable more flexible platform interaction
# Provide streaming SQL and other higher level abstractions to unlock values for different audiences
# Analytics & Machine Learning use cases
# Microservices event sourcing architectural pattern



결론

이번 medium글을 읽으면서 머리속에 박혀있던 Data-driven기업이란 정의가 다시금 재정의 되게 되었다. 이전에는 단순히 고객들의 Data를 모아서 Data를 분석하여, 의미 있는 수치, 자료들을 찾아서 다시금 고객들에게 효과적인 서비스를 제공하는 것에 머물러 있었다. 하지만 이것은 1차원적인 생각에 지나지 않았다. 

이미 Netflix는 2012년부터 Data-driven culture을 수행하고 있었다. (House of Card의 성공 비결 기사, 2013년) 여기서 더 나아가 Netflix는 데이터 기반의 플랫폼을 확산시키고 application개발자들도 data-driven development를 할 수 있는 Platform인 Keystone platform을 운영하기까지 이르렀다. 플랫폼을 신규 개발하고 운영하는 것은 resource도 많이들고 도전하기에 어려운일임에 틀림없다. 그럼에도 불구하고 Netflix가 Keystone이라는 플랫폼을 개발 시도했던 이유는 바로 Data-driven culture에서 data를 모든 개발자가 효과적이고 간편하게 적용, 사용할 수 있게 하는 것이 Data기반 기업으로서 성장하는데 필수불가결하기 때문이지 않을까? 라는 생각이 든다.

End of Document.




1  Comments,   0  Trackbacks
댓글 쓰기
매초마다 반복되는 shell script 한줄 구문

Getting started

보통 tail을 통해 file 내부의 변화를 확인하고 log를 확인하곤 한다.

가끔 linux file system을 통한 개발을 하다보면 반복적으로 command를 실행시키고 싶을때는 어떻게 하면 될까?


$ while true; do 명령문 sleep 시간; done

상기와 같이 수행하면 된다. 시간(seconds)마다 명령문을 입력한다.


Example

5초 마다 ls 명령어 호출

$ while true; do ls /app/home/; sleep 5; done


1초 마다 echo

$ while true; do echo hi; sleep 1; done


10초 마다 hdfs 를 통해 ls 명령어 호출

$ while true; do ./hdfs dfs -ls /2018/11/12/; sleep 10; done




0  Comments,   0  Trackbacks
댓글 쓰기
효과적인 소프트웨어 문서를 적는 방법

『Documenting Software Architectures』를 읽고 정리, 추가 한 포스트입니다.


호랑이는 죽어서 가죽을 남기고, 개발자는 문서를 남긴다. 


Why? 왜 문서화 해야하나?

개발자로서 협업을 하다 보면 항상 마주치는 현실이 있다. 


이건 왜 이렇게 짜셨나요? 

이렇게 설계한 이유는 무엇인가요? 

commit log를 보세요. 

히스토리에 대해 알고 싶어요. 

어디서 볼수 있나요? 

wiki를 보세요. 

wiki에 어디로 가야하나요? 

XXX page에 맨 아래쪽에 나와 있어요. 

여기 wiki내용은 code에 반영된 것과 다른데요? 

wiki 작성할 시간이 없어요. 

바빠서 작성을 못했네요.


인수인계를 할 때뿐만아니라 일상 실무에서도 많이 일어나는 개발자들 간의 커뮤니케이션 내용이다. 많은 개발자들이 히스토리나 아키텍쳐에 대해 알고 싶어하지만 없거나 정확하지 않은 내용이 있기 부지기수이다. 그러다보니 전래동화나 구전처럼 내려져 오는 이야기들이 문서를 대체하곤 한다.


왜 XXX기능은 YYY에서만 동작하죠?

그건 철수님께 물어보시면 아실거에요.

ABC기능이 왜 configuration이 8로 되어 있을까요?

그건 3은 적합하지 않아서 8로 설정했어요.

3이 적합하지 않다는 것에 대한 문서가 있나요?

그건 제가 테스트 해본 내용이에요. 문서는 작성하지 않았어요.


끔찍한 일임에 틀림없다.


What? 어떤것을 문서화 해야하나?

소프트웨어를 개발 혹은 논의하면서 주장했거나 상상했거나 혹은 칠판에 초안을 적어놓은 모든 것(이상적인 방향)까지 문서화 해야한다. 이런것들이 모이면 아키텍쳐가 설계되는데 정한 여러가지 결정적인 요소들이 기록되게 되고, 왜 아키텍쳐가 이렇게 되었는지에 대한 해답이 된다.


매우 간단한 소프트웨어 시스템이 아닌 다음에야, 소프트웨어 아키텍쳐(프레임워크의 아키텍쳐든 라이브러리의 아키텍쳐든)는 한마디로 설명하기 어렵기 마련이다. 또한, 매우 훌륭한 소프트웨어 아키텍쳐가 있다고 하더라도 이해하기 어렵거나 의미를 전달하기 어려울 때, 즉 문서화가 잘 안되어 있다면 해당 소프트웨어를 사용하는 사람들이나 개발하는 사람은 방향성을 잃기 마련이다.


특히 Kubernetes, Spinnaker, Kafka 등 Opensource software 생태계에서는 Document의 중요성은 두말할 필요가 없을 것이다. Github의 해당 software의 readme.md와 공식홈페이지의 Document를 읽어보고 소프트웨어가 나아가고자 하는 방향성에 대해 공감하고 이해를 하지 못한다면 opensource의 contributing은 나침판을 잃은 사막의 여행자와 다름없다.


How? 어떻게 문서화 해야하나?

1. 읽기 쉬운 문서를 작성한다.

읽는 사람 관점에서 쉬운 문서를 작성해야 한다. 예를 들어 외부인이 보는 문서에 불필요한 내부 전문용어와 같은 것들은 문서를 읽고 빠르게 이해하는데 큰 장애물이 된다.  그리고 작성자가 문서를 조급하게 쓴다면 머릿속에 떠오른 순서대로 문서를 작성하게 된다. 이는 독자가 파악하기 쉬운 구조가 아니다. 보통은 소프트웨어의 실행의 흐름에 따라 작성했을 때 읽기 수월 할 것이다.


2. 불필요한 반복을 피한다.

정보는 종류별로 반드시 한군데에만 기록해야한다. 그래야 문서를 활용하기 편하고 무엇보다 유지보수에 드는 노력이 급감한다. 정보를 확인할때 반복(duplicate)된 정보가 있을 경우, 특히 두 정보가 상이할 경우 독자는 패닉에 빠지게 된다.


3. 모호함을 피한다.

소프트웨어 문서를 읽었을 때, 여러 의도로 해석될 수 있는 상황이라면 문서가 잘못된 것이다. 예를 들어 동그라미 1개에서 나온 선이 네모 1개를 가리키고 있는데 그 선은 삼각형으로 되어 있다고 하자. 과연 그 선은 무엇을 뜻하고? 동그라미는 무엇인가? 객체인지? 클래스인지? 프로세스인지? 선은 상속을 뜻하는지 동기화인지? 알수가 없다. 그러므로, 가능한 독자가 표기법의 의미를 파악하기 쉽게 해주기 위해 기호에 대한 범례를 표시하여 한가지 뜻으로만 이해할 수 있도록 해야한다.


4. 표준체계 문서양식을 따른다.

표준 문서 체계를 정하고, 그 체계에 따라 독자들이 읽도록 유도해야한다. 표준을 따르면 독자는 특정정보를 빠르고 정확하게 찾을 수 있다. 또한 정보의 완결성 규칙도 준수 할수 있기 때문에, 문서를 적는사람 입장에서도 한결 수월해 진다.


5. 근거를 남긴다.

의사 결정의 결과를 문서화 할때는 결과 뿐만 아니라 결과를 선택하는 과정에 대해서도 명확히 기록하여 왜 그런 근거가 되었는지 기록한다.


6. 최신내용을 담되 앞서나가지 않는다.

소프트웨어를 설계하는 과정을 거치다 보면 의사결정 과정에서 다시 생각해보거나 수정하는 일이 빈번하다. 곧 뒤집힐 결정을 반영하여 문서에 적는 것은 낭비이다.


7. 목적에 맞게 작성됐는지 검토한다.

문서에 담고자 했던 정보가 정상적으로 들어갔는지 확인 해보고, 실제 독자에게도 문서를 소개시켜서 양식을 준수하고 있는지 확인 해야한다.


어떤 문서가 좋은 소프트웨어 문서일까?

그렇다면 어떤 문서가 좋은 문서이고 그러한 문서는 어디서 찾아 볼 수 있을까? 
우리가 흔히들 개발을 하면서 보는 여러 문서들이 있다. Pivotal의 Spring boot document, Boot strap의 component document 등. 우리가 개발하고 있는 가까운 곳에 훌륭한 문서들이 존재하고 있다. 


Apache http server 2.4 Document의 목차


상기 스크린샷은 apache http server 2.4 버젼의 document이다. 상기 document를 읽는 독자(개발자)는 대부분 기존에 apache server을 사용하던 사람일 것이고, 그들은 2.4 이전 version 을 사용 중일 것이다. 이 때 필요한 document는 새로운 version에는 어떤 기능이 들어갔는지, 어떻게 업그레이드하는지가 가장 궁금할 것이기에 가장 앞에 목차를 두었다고 볼 수 있다.


Apache http server 2.4 Compiling and Installing 페이지


아파치 서버 설치내용에 대한 절차와 목차도 명확하다. 우측에는 각 설치단계별 수행방법에 대해서 절차대로 카테고리화 하였고, 좌측에는 각 단계별 설치사항에 대해 SAD(Software Architecture Document) 형식에 맞게 작성하였다.

그리고, 마지막으로 검은색 상자의 글씨는 shell command라는 것은 그 어느 개발자가 보아도 알 수 있을 정도로 명확하다.


의견

일상 업무가 바쁘고 일정에 쫓기다보면 항상 document를 최신버젼으로 update를 못할 때도 있다. 나도 그렇다. 하지만 항상 document를 적는것에 대해 자각하고 있어야 한다고 생각한다. 틀린 길을 가고 있다는 것을 자각하지 못하는 것 만큼 위험한 일은 없기 때문이다. 

Document 작성은 개발자에게 개발과 같이 흥미로운 일은 절대 아니다. 그러나, 우리는 하나의 software를 함께 개발하고 함께 사용한다. 이에 따라, 개발자가 누구에게나 쉽고 명확하게 읽을 수 있는 Document를 적는 것도 하나의 업무이자 미래이자 올바른 방향임에 틀림없다고 생각한다.



0  Comments,   0  Trackbacks
댓글 쓰기
Java 8 에서 사라진 maxPermSize, PermSize을 대체하는 옵션?

아래 포스팅은 HotSpot JVM을 기준으로 설명합니다.


JVM architecture(오라클 GC설명페이지)


Java 7에서 있던 -XX:MaxPermSize=size, -XX:PermSize=size option들은 이제 Java 8에서는 Deprecated되었다. 더 정확히는 metaSpaceSize로 대체(superseded)되었다고 볼 수 있다. 그래서 많은 개발자들이 java7에서 8으로 업그레이드 한 이후에 아래와 같은 warning 메시지를 본적이 있을 것이다.


Java HotSpot(TM) Server VM warning: ignoring option PermSize=32m; support 
was removed in 8.0
Java HotSpot(TM) Server VM warning: ignoring option MaxPermSize=128m; support 
was removed in 8.0


Perm영역은 Class의 Meta 정보, Method의 Meta 정보, Static변수와 상수 정보 저장되는 공간. 흔히 메타데이터 저장 영역이라고 한다.

이 영역은 Java 8 부터는 Native memory 영역으로 이동하여 Metaspace영역으로 명명되었으며, 다만 static object는 Heap영역으로 옮겨져서 GC의 대상이 최대한 될 수 있도록 하였다.



Java 7 HotSpot JVM 구조



Java  8 HotSpot JVM 구조


상기 Java 8의 memory 구조를 보면 알 수 있듯이 Metaspace가 Native Memory를 이용함으로서 java application이 필요한 만큼 계속해서 늘어나는 특징이 있다. 


이는 java metaspace 초기치, 기대치 확인을 통해 정확히 알 수 있다.

$ java -XX:+PrintFlagsFinal -version -server | grep MetaspaceSize
    uintx InitialBootClassLoaderMetaspaceSize       = 4194304                             {product}
    uintx MaxMetaspaceSize                          = 18446744073709547520                    {product}
    uintx MetaspaceSize                             = 21807104                            {pd product}
java version "1.8.0_131"
Java(TM) SE Runtime Environment (build 1.8.0_131-b11)
Java HotSpot(TM) 64-Bit Server VM (build 25.131-b11, mixed mode)


상기 옵션을 통해 알 수 있듯이, Java 8의 MaxMetaspaceSize는 18,446,744,073,709,547,520Byte(약 16ExaByte, 64bit 프로세서 최고 메모리 상한치)라는 것을 알 수 있다.

그러므로, Java 8의 -XX:MaxMetaspaceSize를 두지 않는 이상, Native memory 자원을 최대한 사용한다.

※ Default jvm MaxMetaspaceSize =  None


결국 Metaspace는 필요에 따라 자동적으로 증가하기 때문에 일반적으로 크게 주의를 가지고 설정할 필요는 없을 것으로 보인다.



0  Comments,   0  Trackbacks
댓글 쓰기
Apache Kafka에서 topic의 partition 개수 줄이는 방법?


아파치 카프카에서 topic의 partition 개수 줄이는 방법?

정답 : 없다(~2.1 version)

Apache Kafka는 partition number을 줄이는 것을 지원하지 않는다. Topic에 대한 data 처리량을 늘리기 위해 partition을 쉽게 늘릴 수 있지만, partition number을 줄이는 것 즉 partition 삭제에 대한 정책은 지원하지 않는다.


Partition을 삭제하게되면 그 즉시 파티션에 존재하는 data가 삭제되기 때문이다. 이 때문에 partition의 삭제가 가능하기 위해서는 kafka에서 능동적으로 partition삭제에 대한 처리(data rebalancing)을 해야하는데 이는 아직(현재 version 2.1) 개발되지 않았다.


LinkedIn의 Senior Software Engineer이자 Confluent의 Software Engineer인 Guozhang Wang(이력 보러 가기)씨는 2015년에 아래와 같이 답변을 남긴 적이 있다.


[Kafka-users] Decrease the number of partitions for topic


상기와 같은 글을 보면 알 수 있듯이 Kafka 내부에서도 partition decrease에 대한 관심이 있었던 것으로 보인다. 


하지만 아직(2018년)까지도 구현되지 않고 있는 것을 보아, kafka내부적으로 처리하는데 이슈가 있거나 혹은 낮은 우선순위로 작업이 진행되고 있는 것으로 보인다.



0  Comments,   0  Trackbacks
댓글 쓰기
안드로이드에서 외부 jar file import하는 방법

이번포스팅에서는 Android studio에서 jar file을 import하는 방법에 대해 알아보겠습니다.


#1 Android studio open





#2 Android studio 우측에 있는 structure에서 Project 클릭





#3 libs folder 안에 추가하고자 하는 jar file copy&paste





#4 붙여넣은 jar file을 우클릭하여 아래쪽에 있는 Add As Library 클릭





#5 library로 추가하고자 하는 module 선택 및 OK





#6 cmd+; 단축키를 눌러 Project Structure을 확인하면 Library import 완료된 것을 확인 할 수 있다.




0  Comments,   0  Trackbacks
댓글 쓰기
아파치 Kafka Consumer의 데이터 처리 내부 architecture 설명 및 튜닝포인트

지난 포스트에서 Kafka producer의 데이터 처리 내부 architecture에 대해서 알아보았다.


아파치 Kafka Producer architecture 설명 포스팅


이번 포스트에서는 kafka architecture의 Consumer 내부 데이터 흐름에 대해 알아보려고 한다.


Kafka Consumer 데이터 내부 처리 순서


#1 : poll(record 취득 api) 호출

#2 : 가져오고자 하는 record가 Fetcher queue에 없는 경우, Fetch request를 발동하여 broker에서부터 record를 가져온다.

#3 : record batch를 Fetcher queue에 저장

#4 : 어디까지 읽었는지에 대한 offset을 consumer측에서 보관

#5 : record batch의 압축을 풀고, record를 user application thread에 반환 


Kafka Consumer 튜닝 파라미터


#A : Fetch Request

   - fetch.min.bytes=1byte

   - max.partition.fetch.bytes=1MB

   - fetch.max.bytes=50MB

   - fetch.wait.max.ms=500ms

   - receive.buffer.bytes=64KB

#B : Offset auto commit

   - enable.auto.commit=true

   - auto.commit.interval.ms=5seconds

#C : Consumer group의 consumer 수 조정(application scale up)



첨부파일 :  apache kafka consumer architecture.key



0  Comments,   0  Trackbacks
댓글 쓰기
아파치 Kafka Producer의 데이터 처리 내부 architecture 설명 및 튜닝포인트

지난 포스트에서 Kafka architecture 및 개요에 대해 알아보았다.


빅 데이터 처리를 위한 아파치 Kafka 개요 및 설명 포스팅


이번 포스트에서는 kafka architecture의 Producer 내부 데이터 흐름에 대해 알아보려고 한다.


Kafka Producer 데이터 내부 처리 순서


#1 : User application thread에서 Record 추가

#2 : Record Batch단위로 Record를 압축

#3 : 복수의 Record Batch를 묶어 Broker로 보냄

#4 : Record Batch를 각 Partition에 저장

#5 : 지정시간에 request 에 대한 완료(ack)를 회신

   - acks=0 : ack 응답없음(속도가장빠름, 데이터유실확률 가장 높음)

   - acks=1  : Leader Replica writer 완료시  회신, follower는 확인하지 않음(속도 중간, 데이터유실확률 중간)

   - acks=all(-1) : 최소 ISR(In-sync replicas)수까지 복제 완료시 회신(속도 가장 낮음, 데이터 유실확률 아주 낮음)


Kafka Producer 튜닝 파라미터


#A : 메모리사용량

   - buffer.memory=64MB 

#B : Record Batch size & 압축

   - batch.size=16KB

   - compression.type=none ('gzip', 'snappy', 'lz4')

#C : Request size

   - max.request.size=1MB

   - acks=1

#D : Request connection

   - linger.ms=0ms(데이터 축적 최대 대기 시간)

   - max.in.flight.requests.per.connection=5

   - send.buffer.bytes=128kb


첨부파일 :  apache kafka producer architecture.key



0  Comments,   0  Trackbacks
댓글 쓰기
빅 데이터 처리를 위한 아파치 Kafka 개요 및 설명

Apache Kafka

LinkedIn에서 최초로 만들고 opensource화 한 확장성이 뛰어난 분산 메시지 큐(FIFO : First In First Out)


→ 분산 아키텍쳐 구성, Fault-tolerance한 architecture(with zookeeper), 데이터 유실 방지를 위한 구성이 잘되어 있음

→ AMQP, JMS API를 사용하지 않은 TCP기반 프로토콜 사용

→ Pub / Sub 메시징 모델을 채용

→ 읽기 / 쓰기 성능을 중시

→ Producer가 Batch형태로 broker로 메시지 전송이 가능하여 속도 개선

→ 파일 시스템에 메시지를 저장하므로, 데이터의 영속성 보장

→ Consume된 메시지를 곧바로 삭제하지 않고 offset을 통한 consumer-group별 개별 consume가능


# Kafka website url http://kafka.apache.org/

# Github urlhttps://github.com/apache/kafka

# Kafka contributors https://github.com/apache/kafka/graphs/contributors


Kafka 사용 주요 사례

LinkedIn : activity streams, operational metrics, data bus(400 nodes, 18k topics, 220B msg/day in May 2014)

Netflix : real-time monitoring and event processing

Twitter : as part of their Storm real-time data pipelines

Spotify : log delivery, Hadoop

11번가 : 카프카를 이용한 비동기 주문시스템(카프카 컨슈머 애플리케이션 배포 전략 medium post)


Kafka Architecture


Broker : Kafka를 구성하는 각 서버 1대 = 1 broker

Topic : Data가 저장되는 곳

Producer : Broker에 data를 write하는 역할

Consumer : Broker에서 data를 read하는 역할

Consumer-Group : 메세지 소비자 묶음 단위(n consumers)

Zookeeper : Kafka를 운용하기 위한 Coordination service(zookeeper 소개)

Partition : topic이 복사(replicated)되어 나뉘어지는 단위


Kafka 데이터 쓰기, 복제, 저장

Producer는 1개이상의 partition에 나뉘어 데이터를 write한다.
상기 partition에 적힌 번호는 각 partition의 offset번호임.


각 Topic의 partition은 1개의 Leader Replica + 0개 이상의 follower Replica로 구성

→ Leader Replica에 데이터를 write, 다른 broker에 follower replica로 복제

→ Topic의 데이터(log) 중 replica 데이터는 log segment라는 파일(disk)에 기록

→ 메모리가 남아 있으면 페이지 캐시 사용


Kafka 데이터 읽기


Consumer는 Partition단위로 데이터를 병렬로 읽을 수 있음
→ 복수의 Consumer로 이루어진 Consumer group을 구성하여 1 topic의 데이터를 분산하여 처리 가능
Topic partition number >= Consumer Group number 일 경우만 가능
   (Topic partition number < Consumer Group number일 경우 1개 이상의 consumer는 유휴 상태가 됨)


첨부파일 :  apache kafka architecture.key



0  Comments,   0  Trackbacks
댓글 쓰기
Hadoop에서 hadoop job은 어떻게 각 data node에서 job을 수행할까?

Hadoop에서 가장 많이 쓰이는 명령어로 아래와 같이 job을 수행하는 것이 있다.

/bin/hadoop jar [jar file] [arguments..]
상기와 같이 jar를 실행하게되면 hadoop은 어떤 node로 어떤 job을 보내게 되는걸까?




  1. Client 혹은 interface server에서 hadoop jar와 함께 command를 입력
  2. Client는 실행에 대한 신규 application Id를 발급받. 그리고 jar file은 HDFS의 job resource로 복사(by default 10 on large clusters)
  3. Cluster(각 데이터노드)에서 실행됨.
    (다만 hadoop config에 local 설정을 하게 되면 해당 호스트에서 실행됨)




0  Comments,   0  Trackbacks
댓글 쓰기
모든 것을 측정하는 방법 - Bigdata시대에 부족한 data로 예측하기

이 포스트는 모든 것을 측정하는 방법을 읽고 정리한 글입니다.

해당 서적 : How to Maesure Anything(amazon.com)



근사값 밖에 구할 수 없는 상황에서는 완벽한 정확성을 추구하는 것보다 가능한 수준의 정밀도로 만족하는 것이 지식인에게 필요한 태도이다. - 아리스토텔레스(BC. 384 - BC. 322)


왜 측정이 필요한가?

모든 것은 측정 가능하다. 대체로 측정 불가능하다고 여겨지는 많은 문제들이 간단한 측정 방법을 통해 해결될 수 있다. 특히 사업 경영에서 흔히 접하는 '보이지 않는' 것들에 대해 측정이 가능하다. 다음과 같은 것들이 일반적으로 생각되는 보이지 않는 것들의 예이다.


# IT 프로젝트가 실패할 위험

# 수집한 정보의 가치

# 품질

# 대중적 이미지


위의 예들은 사업의 의사결정에 많은 영향을 미친다. 회사나 정부에서 많은 비용을 투자하는 새로운 계획에 가장 중요한 요소 중 하나일 수 있다. 하지만 많은 곳에서 이런 '보이지 않는' 항목을 측정할 수 없다고 믿고 이에 대한 중요 정보 없이 의사결정이 이루어지고 있다. 끔찍한 상황이 아닐 수 없다.


'보이지 않는' 것들을 측정의 문제를 정의하고 해결하기 위해 아래 세가지 사실을 가정한다.


1. 측정은 불확실성을 포함한 의사 결정에 영향을 미치기 때문에 중요하다.

2. 어떤 의사 결정이든 불확실성에 관한 많은 측정 대상과 이를 위한 다양한 측정방법을 갖고 있지만, 이를 완벽하게 제거하기는  현실적으로 불가능하다.

3. 따라서 의사결정에 있어서 불확실성은 없애기보다는 줄일 방법이 필요하다.


즉, 측정은 불확실성을 줄이기 위한 최적화 문제로 생각해야 한다. 만약 결정해야하는 문제가 매우 불확실하고, 잘못된 의사 결정이 매우 중대한 결과를 가져올 수 있는 상황이라면, 그 문제에 대한 불확실성을 줄일 수 있는 측정 방법은 매우 큰 가치를 지니게 된다.


측정이란 무엇인가?

무언가를 측정 불가능하다고 믿는 사람에게 측정의 개념은 다소 다르다. 만약 '측정'이 현실에서 거의 도달할 수 없는 확실함을 요구하는 것이라고 정의한다면, 세상에는 측정 가능한 것이 거의 없을 것이다.


여기서 '측정'이란 하나 이상의 관찰을 통해 정량적으로 표현된 불확실성의 감소로 정의한다. 실제로 과학자들은 측정을 불확실성을 정량적으로 줄여줄 수 있는 관찰의 결과로 생각한다. 일부 오류는 피할 수 없겟지만, 그 오류가 기존 지식에 대한 개선의 여지가 된다는 점이 실험, 조사, 기타 과학적인 측정을 수행하는 방법의 핵심이다. 이러한 측정의 정의는 탄탄한 수학적 기반을 바탕으로 하고 있다. 이 분야는 1940년대 클로드 셰논(Claude Shannon, wiki바로가기)에 의해 시작되었고 이를 정보이론이라고 불린다.

측정 예시

측정의 한가지 방법으로 확률 보정이 있다. 현재 얼마나 알고 있나 라는 질문에서 시작한다. 먼저, 불확실한 숫자를 표현하는 한 방법으로 가능한 값의 범위로 생각하는 것이다. 통계에서는 이렇게 미리 정해진 확률로 정답을 포함하는 범위를 '신뢰구간(Confidence interval)'이라고 부른다. 90% 신뢰구간은 90%의 확률로 정답을 포함할 범위를 말한다. 예를 들어 지금 거래를 시작할 것 같은 잠재 고객 가운데 다음 분기에 얼마나 많은 고객이 거래를 시작할지 정확하게 알지 못할때, 적어도 3명이 거래를 할 것 같고, 아무래도 7명 보다는 많은 사람이 거래를 하지 않을 것이라고 하자. 이 경우, 실제 거래를 시작하는 고객의 수가 이 구간 사이에 있을 것이라고 90% 확신하다면, 다음 분기 고객수에 대한 90%의 신뢰구간이 3과 7 사이가 되는 것이다.


확률 보정 Example - 747 비행기의 날개는 몇 피트 인가요?

B(실험자) : 100에서 120피트 사이일 것 같아요

A(전문가) : 정말 값이 100에서 120일거라고 90% 확신하시나요?

B : 모르겠어요 그냥 찍은 거에요.

A : 하지만 100에서 120사이라는 범위를 저에게 말했을때 적어도 그 값에 대한 신뢰할만한 아이디어가 있지 않았나요?

B : 하지만 확신이 없어요.

A : 그럼 진짜 90$ 신뢰구간은 넓어야 되겠네요. 날개길이가 20피트는 될 수 있을까요?

B : 아니요 그건 불가능해요.

A : 그럼 50피트보단 짧을 순 있을까요?

B : 그건 아닐거같은데.. 그값을 최소라고 보죠.

A : 점점 좋아지고 있네요 그럼 날개길이가 500피트보다 길 수 있나요?

B : ... 아니요. 그렇게 긴 날개는 없어요.

A : 좋습니다. 그럼 축구 경기장 보다 클 수 있을까요? 300피트?

B : 그럼 제 생각에 날개 길이 최대값은 250피트라고 말할 수 있어요.

A : 그럼 747 비행기의 날기 길이가 50피트 보다 길고 250피트보다 작다고 90% 확신 하시나요?

B(실험자) : 네

A(전문가) : 그럼 90% 신뢰구간은 100에서 120피트가 아니라 50에서 250피트네요.


위 실험에서 처럼 알고있는 정보가 정확하지 않다면, 확률이나 범위를 통해 그것을 표현할 수 있다. 만약 좁은 범위의 예측값이 맞을지 '모르겠다'고 한다면, 여러분이 알고 있는 무언가가 반영될 때 까지 범위를 넓히면 된다.


정교하게 틀리는 것보다 대략 맞는게 낫다. - 워렌 버핏


또 한가지 방법으로 '중간값(median)'이 있다. 직장에서 재택근무를 확대하는 방안으로 출퇴근 시간을 조사한다고 하자. 모든 직원들을 대상으로 설문조사를 진행하여 답을 얻을 수도 있겟지만 시간과 비용을 필요로하고, 그를 통해 얻게될 값은 필요이상으로 정확한 값이될 것이다. 이런 방식 대신 임의로 다섯명의 직원을 선정한다고 생각해보자. 각각의 다섯 직원은 30분, 60분, 45분, 80분, 60분 이라고 대답했다. 샘플에서 가장 큰 값과 가장 작은 값은 30과 80이다. 그렇다면 전체 직원들에 대해 조사했을 때 그 중간값(median)이 이들 30분과 80분 사이에 있을 확률은 93.75%이다. 이를 다섯의 법칙(Rule of five, R을 사용하여 증명한 포스팅)이라고 부른다.


# 다섯의 법칙 : 전체 집단(median)이 그 집단에서 임의로 추출한 다섯 개의 표본의 최소값과 최대값 사이에 존재할 확률은 93.75% 이다.


맺음말

본 책에서는 사업과 정책에 있어 의사결정을 할 때를 대비하여 측정이 필요하다고 말하지만, 사실 우리내 삶의 매 순간이 의사 결정이 필요한 상황이라고도 볼 수 있다. 이러한 관점에서 본다면 측정을 하는 것은 중요하다고 볼 수 있다. 불확실한 미래에서 내가 고민하고 측정을 해야하는 것들은 아래와 같은 것들이 있을 것같다.


# 출퇴근시간을 줄임으로서 얻을 수 있는 체력적 효과

# 포스팅을 하고 블로그를 운영함으로서 얻을 수 있는 미래가치

# 내 github 계정의 가치


측정 방법이 쉽진 않겠지만 측정함으로서 불확실한 미래의 가치에 더 도움이 되는 방향으로 갈 수 있지 않을까 생각된다.




0  Comments,   0  Trackbacks
댓글 쓰기
Custom listview의 각 item에 animation 적용하기


개요

안드로이드 custom listview를 사용하면서 parent item view에 각종 animation effect를 주고 싶을때 사용


제공 애니메이션

# Alpha with translate animation : 좌측으로 이동 + 투명도가 변하면서 스르륵 나타나는 애니메이션
# Alpha animation : 투명도가 변하면서 스르륵 나타나는 애니메이션
# Scale animation : 점점 커지는 애니메이션


주요 코드

ListViewAdapter.java
custom list view를 사용하기 위해 BaseAdapter를 상속받은 ListViewAdapter에서 parent view에 animation을 적용하면 각 item에 animation이 적용된다.

@Override
public View getView(int position, View convertView, ViewGroup parent) {
    Context context = parent.getContext();
    if (convertView == null) {
        LayoutInflater inflater = (LayoutInflater) context.getSystemService(Context.LAYOUT_INFLATER_SERVICE);
        convertView = inflater.inflate(R.layout.customlist_item, parent, false);
    }
    
    //1) Animation 정의
    Animation ...
    
    //2) CustomListview의 parent item에 animation 정의
    convertView.setAnimation(animation);

    return convertView;
}

코드 다운로드




0  Comments,   0  Trackbacks
댓글 쓰기
Hdfs dfs 명령어 정리 및 설명(ls, cat, du, count, copyFromLocal 등)

<HDFS architecutre>


Hadoop을 적절히 사용하기 위해서는 hdfs 명령어를 알아야 한다. 아래에 정리를 해 보았다.


※ 2.6.0 version을 기준으로 정리함


Hadoop 명령어

Hadoop filesystem command를 사용하기 위함.

Usage : hdfs dfs [GENERIC_OPTIONS] [COMMAND_OPTIONS]


HDFS 명령어

File System(FS) shell은 Hadoop Distributed File System(HDFS)를 여타 파일시스템처럼 관리하기 위한 목적으로 command를 사용할 수 있다. Hadoop-2.6.0에서는 총 33개의 HDFS 명령어를 지원한다.


1) appendToFile

Local 파일들을 hdfs에 append 저장하기 위한 목적

Usage: hdfs dfs -appendToFile {localsrc} ... {dst}


2) cat

해당 파일을 stdout으로 찍어서 보여준다. (linux 명령어 cat과 동일)

Usage: hdfs dfs -cat URI [URI ...]


3) chgrp

해당 파일의 오너이거나 슈퍼오너라면, 해당 파일의 그룹 권한을 변경가능하다.

Usage: hdfs dfs -chgrp [-R] GROUP URI [URI ...]


4) chmod

해당 파일의 오너이거나 슈퍼오너라면, 특정 파일의 permission 수정. -R 옵션과 함께라면 예하 파일들에 대해서 동일하게 permission 적용 가능

Usage: hdfs dfs -chmod [-R] {MODE[,MODE]... | OCTALMODE} URI [URI ...]


5) chown

슈퍼오너일 경우 해당 파일의 owner를 변경. 상세 Permission guide(바로가기) 참고

Usage: hdfs dfs -chown [-R] [OWNER][:[GROUP]] URI [URI ]


6) copyFromLocal

Local 파일을 hdfs에 업로드. put명령어와 유사

Usage: hdfs dfs -copyFromLocal {localsrc} URI


7) copyToLocal

Hdfs에 있는 파일을 local directory에 다운로드, get 명령어와 유사

Usage: hdfs dfs -copyToLocal [-ignorecrc] [-crc] URI {localdst}


8) count

Directory 개수, file 개수 등을 카운트하여 숫자로 보여줌. 

Usage: hdfs dfs -count [-q] [-h] 

-count : DIR_COUNT, FILE_COUNT, CONTENT_SIZE FILE_NAME 을 보여줌

-count -q : QUOTA, REMAINING_QUATA, SPACE_QUOTA, REMAINING_SPACE_QUOTA, DIR_COUNT, FILE_COUNT, CONTENT_SIZE, FILE_NAME 을 보여줌

-h : Show sizes human readable format 


9) cp

Hdfs내부에서 파일을 복붙함. 만약 복사하고자 하는 대상이 여러개라면 붙여넣는 곳은 반드시 Directory여야 한다.

Usage: hdfs dfs -cp [-f] [-p | -p[topax]] URI [URI ...] {dest}

-f : Overwrite the destination if it already exist

-p : 파일 속성(timestamps, ownership, permission, ACL, XAttr)을 유지하고 복붙 수행


10) du

Hdfs내부의 특정 file이나 directory의 size를 보여줌

Usage: hdfs dfs -du [-s] [-h] URI [URI ...]

-s : 각각의 파일(혹은 directory) size의 sum 값을 보여줌

-h : Show human-readable format


11) dus

특정 file의 length를 보여줌.

Usage: hdfs dfs -dus {args}


12) expunge

휴지통 비우기(완전 삭제)

Usage: hdfs dfs -expunge


13) get

Hdfs의 파일을 local directory로 다운로드

Usage: hdfs dfs -get [-ignorecrc] [-crc] {src} {localdst}


14) getfacl

Hdfs의 특정 파일 혹은 디렉토리의 ACLs(Access Control Lists)정보를 보여줌

Usage: hdfs dfs -getfacl [-R] {path}


15) getfattr

Hdfs의 특정 파일 혹은 디렉토리의 속성 정보들을 나열, 보여줌

Usage: hdfs dfs -getfattr [-R] -n name | -d [-e en] {path}

-R : 파일 혹은 디렉토리 이하의 폴더들에 대한 정보 보여줌


16) getmerge

Hdfs내부의 source file을 local file에 append하여 붙여 다운로드

Usage: hdfs dfs -getmerge {src} {localdst} [addnl]


17) ls

특정 디렉토리의 파일 혹은 디렉토리를 보여줌

Usage: hdfs dfs -ls [-R] {args}

-R : 특정 디렉토리 이하에 대해서 정보를 보여줌


18) lsr

ls -R 과 동일하게 작동

Usage: hdfs dfs -lsr {args}


19) mkdir

특정 path에 directory 생성

Usage: hdfs dfs -mkdir [-p] {paths}


20) movefromLocal

Local의 파일을 hdfs에 저장. put과 비슷하지만 저장 이후 local file은 삭제

Usage: hdfs dfs -moveFromLocal {localsrc} {dst}


21) moveToLocal

Hdfs의 파일을 local에 저장. get과 비슷하지만 저장 이후 hdfs file은 삭제

Usage: hdfs dfs -moveToLocal [-crc] {src} {dst}


22) mv

Hdfs내부에서 파일을 옮김

Usage: hdfs dfs -mv URI [URI ...] {dest}


23) put

Local의 파일들을 hdfs에 저장

Usage: hdfs dfs -put {localsrc} ... {dst}


24) rm

Hdfs의 특정 폴더 혹은 파일을 삭제

Usage: hdfs dfs -rm [-f] [-r|-R] [-skipTrash] URI [URI ...]

-R : 특정 디렉토리 이하의 폴더 모두 제거

-r : -R과 동일

-skipTrash : 즉시 완전 삭제


25) rmr

rm -r과 동일한 명령어

Usage: hdfs dfs -rmr [-skipTrash] URI [URI ...]


26) setfacl

Hdfs의 특정 폴더 혹은 파일에 대해 Access Control Lists(ACLs)를 set

Usage: hdfs dfs -setfacl [-R] [-b|-k -m|-x {acl_spec} {path}]|[--set {acl_spec} {path}]


27) setfattr

Hdfs의 특정 폴더 혹은 파일에 대해 속성을 set

Usage: hdfs dfs -setfattr -n name [-v value] | -x name {path}


28) setrep

Hdfs의 특정 파일에 대해 replication factor을 수정

Usage: hdfs dfs -setrep [-R] [-w] {numReplicas} {path}


29) stat

Hdfs의 특정 디렉토리의 stat information 확인

Usage: hdfs dfs -stat URI [URI ...]


30) tail

특정 file에 대해 마지막 kilobyte을 stdout으로 보여줌

Usage: hdfs dfs -tail [-f] URI


31) test

옵션과 함께 파일 혹은 디렉토리의 이상 유무를 체크

Usage: hdfs dfs -test -[ezd] URI

-e : file exist, return 0

-z : file is zero length, return 0

-d : path is diretory, return 0


32) text

Hdfs의 특정 파일을 text format으로 확인

Usage: hdfs dfs -text {src}


33) touchz

Zero length인 file을 생성

Usage: hdfs dfs -touchz URI [URI ...]


출처 : apache 2.6.0 filesystem commands(바로가기)



0  Comments,   0  Trackbacks
댓글 쓰기
지난 2년(17, 18년) 동안의 개발 블로그 운영 회고

<블로그 대문을 분석한 word cloud>


개발 블로그의 시작

처음 시작은 Effective Java 스터디를 시작하면서 공부했던 내용을 정리하자는 차원에서 시작하였다. 


# 생성자 인자가 많을 때는 Builder 패턴 적용을 고려하라.

# 예외는 예외적 상황에만 사용하라

# 불필요한 점검지정 예외 사용은 피하라

# 표준 예외를 사용하라

# 메서드에서 던져지는 모든 예외에 대해 문서를 남겨라

# 어떤 오류인지를 드러내는 정보를 상세한 메시지에 담으라

# 실패 원자성 달성을 위해 노력하라

# 예외를 무시하지 마라

# int상수 대신 enum을 사용하라

# 작명 패턴 대신 어노테이션을 사용하라

# Override 어노테이션은 일관되게 사용하라

# 자료형을 정의할 때 표식 인터페이스를 사용하라


처음에는 에버노트에 할지, 네이버 블로그, google blogger, 티스토리 등 따져보았으나 가장 확장성이 높은 티스토리를 선택하고 2월 20일에 최초로 글을 쓰기 시작했다.


1포스팅/4.2일

2월 20일에 시작하고난 뒤 오늘 652일 째 되는날이다. 총 155개의 개발관련글을 포스팅했으니 역산하면 4.2일당 1포스팅, 일주일에 1개 이상 글을 썼다. 관심가고 재미있는 것 위주로 하다보니 주제가 다양해져서 카테고리만 27개나 된다.


시기에 따라 회사가 바쁠때는 한달을 통으로 블로그를 못할 때도 있었고, 한참 불타오를 때는 한달에 18개(2일당 1포스팅!!!)를 올렸을 때도 있다는 점이 재미있다.


<시기별 포스팅 개수 그래프>


인기 글

Effective java에 대해서, kubernetes에 대해서 정성을 다해 많은 글을 적어도 유입 숫자는 쉽게 늘지 않았다. 그런데 작은 단위로 문제를 해결해나가던 이슈들에 대해 적은 글들(jquery, spring boot 기초)에 오히려 더 유입이 늘었다는 것이 흥미로웠다. 아무래도 좀더 관심 pool이 넓은 문제를 해결하면 유입이 많이 늘어나는 것으로 보인다.


<지난 한달(18년 11월)간 인기글 순위>


마무리

나름대로 귀차니즘도 많고 쉽게 질리는 성격이라 생각했지만 지금까지 꾸준히 포스팅 한 것이 놀랍고 스스로 대견스럽다. 딱히 목적을 가지고 한 것도 아니 였다. 일도하고 주말에 공부하면서 정리하면서 글을 쓰는 것이 습관이자 취미가 된 것 같다. 처음에는 잘쓰기 위해 많이 노력했는데 글을 쓰면 쓸수록 요령이 생겨 편한 글을 쓰려고 하는 점이 보이는데 이점은 고쳐야할 것 같다. 


앞으로도 꾸준히 포스팅을 해서 비슷한 이슈로 인해 고민하거나 비슷한 주제에 대해 논의하는 사람들에게 내 블로그를 통해 지식을 많이 얻어갔으면 좋겠다.


그럼 이만!!



0  Comments,   0  Trackbacks
댓글 쓰기
Java 시큐어 코딩 - 공격 종류 및 해결방안

1. SQL injection


취약점 발생 원인

외부입력값을 동적으로 SQL실행문을 만들어서 사용할때 주로 나타남.

/* SQL injection에 취약한 코드*/
String userId=request.getParameter("userId");
String password=request.getParameter("password");
...
Statement stmt = conn.createStatement();
ResultSet result =
   stmt.executeQuery("select count(*) as count from student where userid='"+userId+"' and password='"+password+"'");


userid에 admin' OR '1'='1 을 입력하면 그대로 아래와 같이 실행된다.

select count(*) from member where userid='admin' OR '1'='1' and password = 'a'


혹은 악의적으로 drop table, drop database도 가능하다.


해결방안

- 유저에게 받은 값을 그대로 넘기지 않는다.

- 클라이언트측에서 값을 입력받을 때 regex 등으로 검증하여 입력받는다.

- 서버측에서 클라이언트에서 넘어온 값을 regex 등으로 검증하여 입력받는다.


Spring boot JPA에서는?

ORM으로 주로 쓰이는 Spring boot JPA에서도 injection공격이 통할까?

stackOverFlow에 따르면 입력받은 값이 그 자체로 JPA에서는 value로 사용되기 때문에 injection이 불가하다고 한다.


Are SQL injection attacks possible in JPA?(답변 바로가기)


그러나 JPA native query는 어떨까? Native query를 사용하니 그대로 전달되지 않을까?

@Query(query="select count(*) as count from student where userid=:userId", nativeQuery = true) 
int findUser( @Param("userId") String userId);


userId value로 injection code를 심어서 수행한 결과를 debug log로 찍었다. 결과는 아래와 같다.

[main] hello.Application                        : InjectionTest:
[main] org.hibernate.SQL                        : select count(*) as count from student where userid = ?
Hibernate: select count(*) as count from student where userid  = ?
[main] o.h.type.descriptor.sql.BasicBinder      : binding parameter [1] as [VARCHAR] - [admin' OR '1'='1]


내부적으로 PreparedStatement 처럼 동작하여 injection으로 동작하는 것이 아닌 value로 값이 들어가게 되어 안전한 상태임을 확인 할 수 있다.


PreparedStatement를 사용하는 경우 파라미터로 들어가는 값을 바인딩하여 사용한다. 바인딩데이터는 SQL문법이 아닌 컴파일언어로 처리하기 때문에 문법적 의미를 가질 수 없으므로, 바인딩 변수에 SQL injection query를 넣더라도 의미있는 쿼리로 동작하지 않는다.



2. 인증 탈취

인터넷 서비스에서 인증(Authentication)과 권한(Authorization) 관리를 철저히 해야하는건 당연지사이다. 인증이나 세션 관리와 관련된 애플리케이션의 기능이 올바르게 구현되지 않은 경우 공격자는 쉽게 다른 사용자로 가장할 수 있다.


취약점 발생원인

- 세션 ID 추측 : 추측가능한 ID를 무작위 대입하는 경우, 리버스 엔지니어링이 쉬운 알고리즘으로 생성된 세션ID값의 경우

- 세션 ID 훔치기 : XSS 취약점을 가진 사이트에 올려진 게시물을 클릭할 경우 자바스크립트를 통해 세션정보를 공격자에게 전달

- 세션 ID 고정 : 로그인 후에 저장된 동일 세션을 복사하여 사용

- 세션 관리 정책 미비 : 유요기간이 잘 관리되지 않은 세션ID의 경우 쿠키 스니핑, 프록시 서버의 로그취득을 통해 공격자에게 정보 전달


해결방안

- 브라우저의 XSS필터 정책을 활성화하여 세션정보 탈취 방어

- 서버 설정을 통한 세션 ID쿠키값을 httponly로 설정하여 스크립트에서 읽지 못하도록 방어

- 세션ID가 url에 포함되지 않도록(노출되지 않도록)


Spring security ACL(Access Control List)을 사용한 인증체크

웹 서비스에 있어서 user별 권한 체크는 필수불가결하다. 이 때 Spring security에서 제공하는 기술 중 인증, ROLE기반 접근제어 방식인 ACL을 활용하면 더욱 효과적으로 적용 가능하다.


먼저 필요한 정의는 롤(ROLE)이다. 사용자에게 롤을 정의하고 어떤 메소드(Where)를 어떻게(What)할 것인지 까지 정의 가능하다.

// Role define example
// QnA 게시판은 누구나(any) 읽기(read), 쓰기(write) 가능
// 자료실 게시판은 누구나(any) 일기(read) 가능. 관리자(admin)만 쓰기(write) 가능


Spring security의 acl사용을 위해 아래와 같은 라이브러리 추가가 필요하다.

<dependency>
    <groupid>org.springframework.security</groupid>
    <artifactid>spring-security-core</artifactid>
    <version>4.1.3.RELEASE</version>
    <scope>compile</scope>
</dependency>  
<dependency>
    <groupid>org.springframework.security</groupid>
    <artifactid>spring-security-web</artifactid>
    <version>4.1.3.RELEASE</version>
    <scope>compile</scope>
</dependency>
<dependency>
    <groupid>org.springframework.security</groupid>
    <artifactid>spring-security-config</artifactid>
    <version>4.1.3.RELEASE</version>
</dependency>
<dependency>
    <groupid>org.springframework.security</groupid>
    <artifactid>spring-security-acl</artifactid>
    <version>4.1.3.RELEASE</version>
    <scope>test</scope>  
</dependency>


스크립트를 통해 아래와 같은 table이 생성된다.


# ACL_SID : 롤, 사용자에 대한 키 정보가 저장

# ACL_CLASS : 도메인 객체 클래스에 대한 정보 저장

# ACL_OBJECT_IDENTITY : 도메인 객체 인스턴스 정보 저장

# ACL_ENTRY : 접근 권한 데이터 저장 테이블


기타 ACL을 위한 configuration을 선언하고 나면 아래와 같이 접근에 대한 정의를 method별로 선언 가능하다.

@PostFilter("hasPermission(filterObject, 'READ')")
List<NoticeMessage> findAll();
     
@PostAuthorize("hasPermission(returnObject, 'READ')")
NoticeMessage findById(Integer id);
     
@PreAuthorize("hasPermission(#noticeMessage, 'WRITE')")
NoticeMessage save(@Param("noticeMessage")NoticeMessage noticeMessage);


자세한 내용은 Introduction to Spring Security ACL(바로가기)에서 확인 가능




3. XSS(크로스 사이트 스크립팅)

외부 입력값이 충분한 검증 없이 동적으로 생성되는 응답 페이지에 사용 되는 경우 해당 페이지를 사용하는 다른 클라이언트들이 공격자로 부터 공격 받을 수 있다.


XSS 취약점은 공격 유형에 따라 Reflective XSS, Stored XSS, DOM XSS로 구분가능하다.


취약점 발생 원인

- Reflective XSS : 공격자가 악성 스크립트가 포함된 URL을 클라이언트에 노출시키게 하여 클릭을 유도하여 정보탈취, 피싱사이트 리다이렉트 등의 공격을 함.

- Stored XSS : 악성 스크립트를 DB에 저장해 해당 DB정보를 이용하는 애플리케이션을 통해 시스템을 사용하는 모든 사용자들이 해당 스크립트를 실행하게함.

- DOM XSS : AJAX프로그램에서 사용되는 자바스크립트를 이용해 브라우저에 수신된 데이터를 다시 잘라내서 Document에 write하는 작업을 수행할 경우 XSS 공격이 가능하게 한다.


해결방안

- 입/출력 값에 대해 필터링 적용

  - Black list 방식 : 간단함. 모든 방어에 대해 준비가 불가능함.

  - White list 방식 : 지속적으로 list를 추가해줘야함. 강력함.

  - 오픈소스 라이브러리 사용 : 네이버 개발자 그룹에서 개발한 자바기반의 lucy-xss Filter(바로가기)을 사용하여 Spring boot MVC의 Controller 컴포넌트에 적용 가능. 화이트리스트 정책 사용



4. 파일 업/다운로드

취약점 발생 원인

웹서버에 업로드 된 파일에 악성코드가 포함된 경우. 웹서버에 암호화되지 않은 정보(주민등록증, 여권 등)가 웹서버에 저장되는 경우 공격자가 파일 경로를 조작하여 확인절차 없이 다운로드를 허가될 수도 있다.


해결방안

- 파일을 굳이 웹서버에 올려야하는지 정책을 다시 확립한다.

- 반드시 업로드 해야 한다면 업로드되는 파일의 타입을 제한하고, 외부에서 직접 접근 가능하지 않은 경로에 파일을 저장한다.

- 업로드되는 파일 개수, 크기에 대해 제약한다.


Object storage에서는?

<기존 파일시스템과 object storage를 잘 비교한 그림>


Public cloud 서비스에서 주로 쓰이는 object storage는 안전할까? 


Object storage로는 유명한 aws의 s3(Simple Storage Service), ibm의 COS(Cloud Object Storage)가 있다. Instance(서버)를 scalable하게 사용하는 cloud public 특성상 서버에 데이터를 저장하는 경우에는 file system에 의존하기 힘든 경우가 생기는데, 이런 경우 파일 저장소로 object storage를 많이 쓴다. 


Object storage도 보안에 취약하긴 마찬가지이다. 권한과 접근에 대해 정확한 인지 없이 사용하게 된다면 공격자에게 공격당할 수 있다. 


대부분의 사례는 버킷 단위로 public하게 객체 권한을 열어주는데서 생기는데 이와 같은 경우 공격자는 public으로 설정된 모든 파일(object)들에 접근가능하다. 그러므로 각 public cloud별 object storage에서 제공하는 권한방법에 대해서 확인하고 서비스에 맞는 정책을 세워 불필요한 데이터는 외부로 노출되지 않게 하는 것이 최고의 방어 방법이다.



End of Document.



0  Comments,   0  Trackbacks
댓글 쓰기
Java 시큐어 코딩 - 개론 및 보안사례


개요 및 단어 설명

시큐어 코딩에 앞서 단어설명을 한다.

보안약점(weakness)

 - 개발자가 개발을 하는 단계, 구현을 하는 단계에서 나타나는 bug

 - 코드 리뷰를 통해 보안약점을 찾겠다. 

   - But, LOC(Line of code)가 너무 많으므로 정적분석도구를 사용한다.

   - ex) findBug, findSecurity plugin 

 - Secure coding이 필요한 단계


보안취약점(vulnerability)

 - 서비스가 나가고 난 뒤 침해관련 사고

 - 실제 침해사고로 일어난 사례

 - 공격자(해커)가 의도하는 공격(해킹)이 일어나는 곳


Secure coding(보안 약점 구간)에서 이슈를 알아 낼 수 없는 경우도 있다. 예를 들어 논리적인 & 구조적인 이슈(ex. sms를 통한 인증)로 인해 보안 취약점에서 들어나는 경우도 있다. 이와 같은 경우에는 분석/설계단계에서 해결해야 한다.


소프트웨어 개발보안(Compliance)이란?

안전한 SW개발을 위해 보안을 고려하여 기능을 설계, 구현 등 모든 일련의 활동을 뜻한다.

요구사항분석 

설계 

구현 

테스트 

유지보수 

- 요구사항 중 보안항목 식별

- 요구사항 명세서

- 위험원 도출을 위한 위협모델링

- 보안통제수립 

- 표준 코딩 정의서 및 SW개발보안 가이드를 준수해 개발

- 소스코드 보안약점 진단 및 개선 

- 시큐어 코딩 단계

- 모의침투 테스트

- 동적분석

- 지속적인 개선

- 보안패치


요구사항 및 보안항목의 기준은 법적기준(KISA, 개인정보보호법 등)에 따른다.

# 행정 안전부 보안 가이드(바로가기)

# KISA 보안 가이드(바로가기)


이슈 발생 시점에 따른 보안취약점 개선비용 table

the economic impacts of inadequate infrastructure for software testing 논문 -  software testing report.pdf



<Preliminary Estimates of Relative Cost Factors of Correcting Errors as a Function of Where Errors Are Introduced and Found>


보안 사고 사례를 통한 시큐어 코딩 이해

이미 일어났던 사고사례를 통해 사전에 대처 가능하다.

1) SQL 인젝션 사례

여기어때 해킹사건, 왜 '과징금 3억원' 경징계 나왔나 매출액 3%까지 부과 가능…법상으론 높은 징계


97만여명의 숙박업소 이용 이력 정보가 유출되면서 피해자들이 협박 문자까지 받았던 여기어때 해킹 사건에 대해 방송통신위원회가 서비스 운영사인 위드이노베이션에 과징금 3억100만원, 책임자 징계 권고 등 행정처분을 내렸다.
...
위드이노베이션은 ▲개인정보처리시스템 다운로드 등의 접근권한이 있는 개인정보취급자의 컴퓨터를 외부 인터넷망과 업무망으로 분리하지 않은 점 ▲적절한 규모의 침입차단탐지시스템을 설치하고 개인정보처리시스템에 접속한 IP 등을 재분석해 불법적인 개인정보 유출 시도를 탐지하지 않은 점 ▲해킹을 당한 마케팅센터 웹페이지에 대해서 웹페이지 취약점 점검을 수행하지 않은 점 ▲고객상담사 등에게 파일 다운로드 권한이 있는 관리자페이지 접근권한을 부여하는 등 접근권한을 과하게 부여한 점 ▲인사이동 시 취급자의 접근권한을 지체 없이 변경하지 않아 해커가 이를 악용해 개인정보 파일을 다운로드 한 점 등 정보통신망법 제28조제1항에 따른 접근통제 조치 전반을 소홀히 한 점이 확인됐다.

2) URL 파라미터 조작 사례

주문 번호 쳤는데 신상 '탈탈'…책 사려다가 봉변 

국내 최대 인터넷서점인 YES 24에 고객정보가 무방비로 노출됐습니다. 주문 번호만 치면 다른 사람에 개인정보까지 다 볼 수 있었습니다. 
...
장 씨가 했던 것처럼 취재진도 YES 24 사이트에서 임의로 주문 번호를 쳐봤습니다. 해당 번호를 받은 다른 주문자의 개인 정보를 확인할 수 있었습니다. 이렇게 스무 명의 개인정보를 입수해 직접 확인해 봤는데, 모두 YES 24 고객들이었습니다.
...
YES 24 측은 보안 점검 과정의 실수로 지난달 초부터 3주 동안 개인정보가 노출된 사실이 있다고 인정했습니다.


3) 무작위 대입공격

비번역할 ‘CVC’ 수없이 틀려도 차단장치 없었다

中조직, 3억대 기프트카드 정보 빼가 서울 강서구에 사는 A 씨는 지난해 12월 중순 50만 원짜리 기프트카드 8장을 샀다. 불과 며칠 뒤, A 씨는 이 기프트카드로 결제를 하려다 잔액이 ‘0원’이라는 사실을 확인했다. 
...
총 16자리인 카드번호 가운데 일부 숫자만 바꾸면 유효기간이 같은 새로운 카드번호가 생성된다는 점을 이용한 것으로 경찰은 추정하고 있다. 한 카드사 관계자는 “카드번호는 자릿수마다 특정한 정보를 담고 있으며 일정한 패턴이 있다”면서 “카드번호 생성 알고리즘을 잘 알고 있는 사람의 소행으로 보인다”고 말했다.


End of Document.



0  Comments,   0  Trackbacks
댓글 쓰기
JVM에서 MapReduce를 간편하게 쓸수 있는 오픈소스 라이브러리 Cascading

Cascading

Cascading은 opensource library로서 JVM에서 쉽게 bigdata처리를 가능캐 한다.

오픈소스이며 apache license를 보유하고있다.


# Website : https://www.cascading.org/

# Github : https://github.com/Cascading/cascading

# twitter : https://twitter.com/Cascading


<그림. pipeline 처럼 물흐르듯이 data를 처리하는 cascading library>


Source-pipe-sink 패러다임을 채용하여 객체지향 프로그래밍 언어에서도 data처리를 직관적으로 구현할 수 있다.


기존에 많이 사용하던 MapReduce code와 비교해보자.


Old MapReduce code

Bigdata를 다룰 때 맵리듀스(MapReduce)는 흩어져 있는 데이터를 수직화 하여 종류별로 모으고(MAP), Filtering과 Sorting을 통해 데이터를 뽑아(Reduce)낸다.


가장 유명한 MapReduce문제는 wordCount이다. 데이터의 word의 개수를 count하여 각각의 word가 얼마나 노출되었는지 확인해주는데, 코드는 아래와 같다.

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.*

public class WordCount {

  public static class TokenizerMapper
       extends Mapper<bject, Text, Text, IntWritable>{

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(Object key, Text value, Context context
                    ) throws IOException, InterruptedException {
      StringTokenizer itr = new StringTokenizer(value.toString());
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        context.write(word, one);
      }
    }
  }

  public static class IntSumReducer
       extends Reducer<text,intwritable,text,intwritable> {
    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<intwritable> values,
                       Context context
                       ) throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      result.set(sum);
      context.write(key, result);
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "word count");
    job.setJarByClass(WordCount.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

그리 길지않지만, 직관적이지 않아서 읽기 어렵다. 하지만 cascading이라면?


WordCount program with Cascading library

아래와 같이 간단하고 명료하게 작성 가능하다. 

cascading의 여러 기능을 통해 pipeline을 구성하여 replace, count, aggregate 등등 강력한 기능을 local / hadoop 할 것없이 유동적으로 작성 가능하다
package cascading.project;

import java.util.Properties;

import cascading.*

public class MainLocal {
    public static void main(String[] args) {
        String inPath = "~/Documents/github/ProjectTemplate/testWord";
        String outPath = "~/Documents/github/ProjectTemplate/test2";

        LocalFlowConnector flowConnector = new LocalFlowConnector(new Properties());

        // source and result field 정의
        Tap srctap = new FileTap(new TextLine(new Fields("line")), inPath);
        Tap sinkTap = new FileTap(new TextLine(new Fields("word", "count")),
                outPath, SinkMode.REPLACE);

        // source에서 구분 기준을 Regex로 정의
        Pipe words = new Each("start", new RegexSplitGenerator("\\s+"));
        Pipe wordCount = new Every(new GroupBy(words), new Count());

        FlowDef flowDef = FlowDef.flowDef()
                .addSource(wordCount, srctap) // connect pipe and src
                .addTailSink(wordCount, sinkTap); // connect pipe and sink

        // run the flow
        flowConnector.connect(flowDef).complete();
    }
}

Cascading library를 사용한 MapReduce 결과물

Source file

아파치 하둡은 대량의 자료를 처리할 수 있는 큰 컴퓨터 클러스터에서 동작하는 분산 응용 프로그램을 지원하는 프리웨어 자바 소프부 프로젝트이다. 분산처리 시스템인 구글 파일 시스템을 대체할 수 있는 하둡 분산 파일 시스템과 맵리듀스를 구현한 것이다.

Result file

개발된  1
것으로, 1
것이다. 1
구글    1
구현한  1
너치의  1
대량의  1
대체할  1
동작하는        1
루씬의  1
맵리듀스를      1
분산    3
분산처리        1
소프트웨어      1
수      2
시스템과        1
시스템을        1
시스템인        1
아파치  2
원래    1
위해    1
응용    1
있는    2
자료를  1
자바    1
지원하기        1
지원하는        1
처리를  1
처리할  1
컴퓨터  1
큰      1
클러스터에서    1
파일    2
프레임워크이다. 1
프로그램을      1
프로젝트이다.   1
프리웨어        1
하둡    1
하둡은  1
하부    1


End of Document



0  Comments,   0  Trackbacks
댓글 쓰기
Spring boot에 AWS Elasticbeanstalk의 ebextensions 적용하기

Elasticbeanstalk란?

Elasticbeanstalk는 Java, .NET, PHP, Node.js, Python, Ruby, Go, Docker를 사용하여 Apache, Nginx, Passenger, IIS와 같은 친숙한 서버에서 개발된 웹 애플리케이션 및 서비스를 간편하게 배포하고 조정할 수 있는 서비스이다.


Tomcat 배포시 web server configuration?

Spring boot MVC를 war package하여 Elasticbeanstalk(tomcat)을 통해 배포하면 자동으로 WAS, WEB 서버가 프로비져닝되어 배포된다. 아주 간편하고 쉬운 배포지만 was control 뿐만아니라 web server을 어떻게 컨트롤 할지 의문이든다. 


이 때를 위하여 Elasticbeanstalk는 .ebextensions를 사용하여 고급 환경 사용자 설정을 유도하고 있다. .ebextensions를 통해 배포시 apache설정, JVM 설정등을 포함하여 배포하면 배포시 자동적용된다.



이번 포스팅에서는 Tomcat was와 Nginx web server을 사용할때를 가정한다. 

http 접속을 요청할 때 https로 auto redirect되도록 하는 configuration을 셋팅하는 예제를 아래에서 보여줄 것이다.


Spring boot + maven 사용시 ebextensions 설정

1) pom.xml

src/main/ebextensions의 내용을 war로 package시에 .ebextensions folder을 만들어 root directory에 저장하도록 한다.

<build>
  <plugins>
    <!-- Package as an executable jar/war -->
    <plugin>
      <groupid>org.springframework.boot</groupid>
      <artifactid>spring-boot-maven-plugin</artifactid>
    </plugin>
    <plugin>
      <artifactid>maven-war-plugin</artifactid>
      <version>2.6</version>
      <configuration>
        <webresources>
          <resource>
            <directory>src/main/ebextensions</directory>
            <targetpath>.ebextensions</targetpath>
            <filtering>true</filtering>
          </resource>
        </webresources>
      </configuration>
    </plugin>
  </plugins>
</build>

2) 경로

src/main/ebextensions 폴더에 configuration에 필요한 내용들을 넣는다.
| pom.xml
+ src/main
|    + ebextensions/nginx/conf.d/elasticbeanstalk/00_application.conf
|    + resources
|    + java/com.company/HelloworldApplication.java
|    + webapp/WEB-INF/jsp

3) configuration

nginx의 셋팅에 필요한 configuration을 작성한다. 
github의 awsdocs에서도 examples(바로가기)를 제공한다.
location / {
  set $redirect 0;
  if ($http_x_forwarded_proto != "https") {
    set $redirect 1;
  }
  if ($http_user_agent ~* "ELB-HealthChecker") {
    set $redirect 0;
  }
  if ($redirect = 1) {
    return 301 https://$host$request_uri;
  }

  proxy_pass          http://127.0.0.1:8080;
    proxy_http_version  1.1;

  proxy_set_header    Connection          $connection_upgrade;
  proxy_set_header    Upgrade             $http_upgrade;
  proxy_set_header    Host                $host;
  proxy_set_header    X-Real-IP           $remote_addr;
  proxy_set_header    X-Forwarded-For     $proxy_add_x_forwarded_for;
}


결과물

Elasticbeanstalk에 war를 배포하면 아래와 같이 configuration이 완료되었음을 동시에 알리면서 배포가 완료된다.



맺음말

Elasticbeanstalk와 ebextension이 함께라면 ec2에서 수행해야할 어려운 configuration들을 code화 하여 간편하게 배포할 뿐만아니라 history관리까지 가능하다. Infra에 대한 걱정은 적게 생각하고 비즈니스에 집중할 수 있게 도와주는 훌륭한 aws 제품인 Elasticbeanstalk는 1인 개발, 스타트업, 학생들에게 유용한 도구임에 틀림없다. 


End of Document

'개발이야기 > AWS' 카테고리의 다른 글

Spring boot에 AWS Elasticbeanstalk의 ebextensions 적용하기  (0) 2018.11.19


0  Comments,   0  Trackbacks
댓글 쓰기
Spring boot에서 kafka 사용시 application.yaml 설정








# APACHE KAFKA (KafkaProperties)
spring.kafka.admin.client-id= # ID to pass to the server when making requests. Used for server-side logging.
spring.kafka.admin.fail-fast=false # Whether to fail fast if the broker is not available on startup.
spring.kafka.admin.properties.*= # Additional admin-specific properties used to configure the client.
spring.kafka.admin.ssl.key-password= # Password of the private key in the key store file.
spring.kafka.admin.ssl.key-store-location= # Location of the key store file.
spring.kafka.admin.ssl.key-store-password= # Store password for the key store file.
spring.kafka.admin.ssl.key-store-type= # Type of the key store.
spring.kafka.admin.ssl.protocol= # SSL protocol to use.
spring.kafka.admin.ssl.trust-store-location= # Location of the trust store file.
spring.kafka.admin.ssl.trust-store-password= # Store password for the trust store file.
spring.kafka.admin.ssl.trust-store-type= # Type of the trust store.
spring.kafka.bootstrap-servers= # Comma-delimited list of host:port pairs to use for establishing the initial connections to the Kafka cluster. Applies to all components unless overridden.
spring.kafka.client-id= # ID to pass to the server when making requests. Used for server-side logging.
spring.kafka.consumer.auto-commit-interval= # Frequency with which the consumer offsets are auto-committed to Kafka if 'enable.auto.commit' is set to true.
spring.kafka.consumer.auto-offset-reset= # What to do when there is no initial offset in Kafka or if the current offset no longer exists on the server.
spring.kafka.consumer.bootstrap-servers= # Comma-delimited list of host:port pairs to use for establishing the initial connections to the Kafka cluster. Overrides the global property, for consumers.
spring.kafka.consumer.client-id= # ID to pass to the server when making requests. Used for server-side logging.
spring.kafka.consumer.enable-auto-commit= # Whether the consumer's offset is periodically committed in the background.
spring.kafka.consumer.fetch-max-wait= # Maximum amount of time the server blocks before answering the fetch request if there isn't sufficient data to immediately satisfy the requirement given by "fetch-min-size".
spring.kafka.consumer.fetch-min-size= # Minimum amount of data the server should return for a fetch request.
spring.kafka.consumer.group-id= # Unique string that identifies the consumer group to which this consumer belongs.
spring.kafka.consumer.heartbeat-interval= # Expected time between heartbeats to the consumer coordinator.
spring.kafka.consumer.key-deserializer= # Deserializer class for keys.
spring.kafka.consumer.max-poll-records= # Maximum number of records returned in a single call to poll().
spring.kafka.consumer.properties.*= # Additional consumer-specific properties used to configure the client.
spring.kafka.consumer.ssl.key-password= # Password of the private key in the key store file.
spring.kafka.consumer.ssl.key-store-location= # Location of the key store file.
spring.kafka.consumer.ssl.key-store-password= # Store password for the key store file.
spring.kafka.consumer.ssl.key-store-type= # Type of the key store.
spring.kafka.consumer.ssl.protocol= # SSL protocol to use.
spring.kafka.consumer.ssl.trust-store-location= # Location of the trust store file.
spring.kafka.consumer.ssl.trust-store-password= # Store password for the trust store file.
spring.kafka.consumer.ssl.trust-store-type= # Type of the trust store.
spring.kafka.consumer.value-deserializer= # Deserializer class for values.
spring.kafka.jaas.control-flag=required # Control flag for login configuration.
spring.kafka.jaas.enabled=false # Whether to enable JAAS configuration.
spring.kafka.jaas.login-module=com.sun.security.auth.module.Krb5LoginModule # Login module.
spring.kafka.jaas.options= # Additional JAAS options.
spring.kafka.listener.ack-count= # Number of records between offset commits when ackMode is "COUNT" or "COUNT_TIME".
spring.kafka.listener.ack-mode= # Listener AckMode. See the spring-kafka documentation.
spring.kafka.listener.ack-time= # Time between offset commits when ackMode is "TIME" or "COUNT_TIME".
spring.kafka.listener.client-id= # Prefix for the listener's consumer client.id property.
spring.kafka.listener.concurrency= # Number of threads to run in the listener containers.
spring.kafka.listener.idle-event-interval= # Time between publishing idle consumer events (no data received).
spring.kafka.listener.log-container-config= # Whether to log the container configuration during initialization (INFO level).
spring.kafka.listener.monitor-interval= # Time between checks for non-responsive consumers. If a duration suffix is not specified, seconds will be used.
spring.kafka.listener.no-poll-threshold= # Multiplier applied to "pollTimeout" to determine if a consumer is non-responsive.
spring.kafka.listener.poll-timeout= # Timeout to use when polling the consumer.
spring.kafka.listener.type=single # Listener type.
spring.kafka.producer.acks= # Number of acknowledgments the producer requires the leader to have received before considering a request complete.
spring.kafka.producer.batch-size= # Default batch size.
spring.kafka.producer.bootstrap-servers= # Comma-delimited list of host:port pairs to use for establishing the initial connections to the Kafka cluster. Overrides the global property, for producers.
spring.kafka.producer.buffer-memory= # Total memory size the producer can use to buffer records waiting to be sent to the server.
spring.kafka.producer.client-id= # ID to pass to the server when making requests. Used for server-side logging.
spring.kafka.producer.compression-type= # Compression type for all data generated by the producer.
spring.kafka.producer.key-serializer= # Serializer class for keys.
spring.kafka.producer.properties.*= # Additional producer-specific properties used to configure the client.
spring.kafka.producer.retries= # When greater than zero, enables retrying of failed sends.
spring.kafka.producer.ssl.key-password= # Password of the private key in the key store file.
spring.kafka.producer.ssl.key-store-location= # Location of the key store file.
spring.kafka.producer.ssl.key-store-password= # Store password for the key store file.
spring.kafka.producer.ssl.key-store-type= # Type of the key store.
spring.kafka.producer.ssl.protocol= # SSL protocol to use.
spring.kafka.producer.ssl.trust-store-location= # Location of the trust store file.
spring.kafka.producer.ssl.trust-store-password= # Store password for the trust store file.
spring.kafka.producer.ssl.trust-store-type= # Type of the trust store.
spring.kafka.producer.transaction-id-prefix= # When non empty, enables transaction support for producer.
spring.kafka.producer.value-serializer= # Serializer class for values.
spring.kafka.properties.*= # Additional properties, common to producers and consumers, used to configure the client.
spring.kafka.ssl.key-password= # Password of the private key in the key store file.
spring.kafka.ssl.key-store-location= # Location of the key store file.
spring.kafka.ssl.key-store-password= # Store password for the key store file.
spring.kafka.ssl.key-store-type= # Type of the key store.
spring.kafka.ssl.protocol= # SSL protocol to use.
spring.kafka.ssl.trust-store-location= # Location of the trust store file.
spring.kafka.ssl.trust-store-password= # Store password for the trust store file.
spring.kafka.ssl.trust-store-type= # Type of the trust store.
spring.kafka.streams.application-id= # Kafka streams application.id property; default spring.application.name.
spring.kafka.streams.auto-startup=true # Whether or not to auto-start the streams factory bean.
spring.kafka.streams.bootstrap-servers= # Comma-delimited list of host:port pairs to use for establishing the initial connections to the Kafka cluster. Overrides the global property, for streams.
spring.kafka.streams.cache-max-size-buffering= # Maximum memory size to be used for buffering across all threads.
spring.kafka.streams.client-id= # ID to pass to the server when making requests. Used for server-side logging.
spring.kafka.streams.properties.*= # Additional Kafka properties used to configure the streams.
spring.kafka.streams.replication-factor= # The replication factor for change log topics and repartition topics created by the stream processing application.
spring.kafka.streams.ssl.key-password= # Password of the private key in the key store file.
spring.kafka.streams.ssl.key-store-location= # Location of the key store file.
spring.kafka.streams.ssl.key-store-password= # Store password for the key store file.
spring.kafka.streams.ssl.key-store-type= # Type of the key store.
spring.kafka.streams.ssl.protocol= # SSL protocol to use.
spring.kafka.streams.ssl.trust-store-location= # Location of the trust store file.
spring.kafka.streams.ssl.trust-store-password= # Store password for the trust store file.
spring.kafka.streams.ssl.trust-store-type= # Type of the trust store.
spring.kafka.streams.state-dir= # Directory location for the state store.
spring.kafka.template.default-topic= # Default topic to which messages are sent.


0  Comments,   0  Trackbacks
댓글 쓰기
Spring boot scheduler를 활용한 kafka producer/consumer 예제

Kafka는 분산 메시징 플랫폼으로 폭넓은 확장성과 우수한 성능을 가진다. Kafka의 간단한 사용을 위해 Spring boot를 사용하여 consumer, producer개념을 익힐 수 있다. Spring boot의 scheduler기능을 통해서 producer가 kafka에 topic을 내려 주면, subscribe하고 있는 consumer가 해당 메시지를 받는 형태로 만들 것이다.


Architecture

Spring boot scheduler와 kafka의 연동 구성도


Kafka 설치

Kafka의 설치과정은 아래 posting에서 확인할 수 있다.

Macbook에 Kafka 1분만에 설치하기(바로가기)



Project directory

프로젝트 directory는 intellij의 spring boot default 설정을 따라간다. 



build.gradle

이번 포스팅에서는 gradle을 사용하여 spring boot를 구성하고자 한다. gradle에 대한 자세한 설명은 Gradle build tool 4.0 가이드(바로가기) 포스팅에서 확인 가능하다.

buildscript {
    repositories {
        mavenCentral()
    }
    dependencies {
        classpath("org.springframework.boot:spring-boot-gradle-plugin:2.0.5.RELEASE")
    }
}

apply plugin: 'java'
apply plugin: 'eclipse'
apply plugin: 'idea'
apply plugin: 'org.springframework.boot'
apply plugin: 'io.spring.dependency-management'

bootJar {
    baseName = 'gs-scheduling-tasks'
    version =  '0.1.0'
}

repositories {
    mavenCentral()
}

sourceCompatibility = 1.8
targetCompatibility = 1.8

dependencies {
    compile("org.springframework.boot:spring-boot-starter")
    compile "org.springframework.kafka:spring-kafka:2.1.10.RELEASE"
    testCompile("org.springframework.boot:spring-boot-starter-test")
}


application.properties

spring-kafka 사용시 설정할 몇가지 부분에 대해서 셋팅한다. 여기서는 반드시 필요한 2가지 요소인 consumer.group-id와 server ip를 선언하였다.

application.properties 혹은 application.yaml을 통해서 spring에서 사용할 kafka설정을 자유자재로 설정이 가능한데 각각의 설정에 대한 설명은 Spring boot common-properties 공식사이트(바로가기) 에서 확인 가능하다.

spring.kafka.consumer.group-id=kafka-intro
spring.kafka.bootstrap-servers=localhost:9092


Application.java

scheduler처리를 위해 @EnableScheduling 을 추가.
package hello;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;

@SpringBootApplication
@EnableScheduling
public class Application {

    public static void main(String[] args) throws Exception {
        SpringApplication.run(Application.class);
    }
}



ScheduledTasks.java

kafka연동의 핵심적인 부분이다. 크게 두가지 역할을 하는 것으로 볼 수 있다.

1) 1000ms마다 producer는 "helloworld"+now Date() format의 데이터를 send.

2) consumer는 "test" topic에 들어오는 모든 메시지를 가져와서 log를 남김.

package hello;

import java.text.SimpleDateFormat;
import java.util.Date;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

@Component
public class ScheduledTasks {

    private static final Logger log = LoggerFactory.getLogger(ScheduledTasks.class);

    private static final SimpleDateFormat dateFormat = new SimpleDateFormat("HH:mm:ss");

    @Autowired
    private KafkaTemplate kafkaTemplate;

    public void send(String topic, String payload) {
        kafkaTemplate.send(topic, payload);
        log.info("Message: " + payload + " sent to topic: " + topic);
    }


    @Scheduled(fixedRate = 1000)
    public void reportCurrentTime() {
        send("test", "helloworld " + dateFormat.format(new Date()));
    }

    @KafkaListener(topics = "test")
    public void receiveTopic1(ConsumerRecord consumerRecord) {
        log.info("Receiver on topic1: "+consumerRecord.toString());
    }
}



결과물

Producer는 정상적으로 topic을 send하고, consumer도 정상적으로 topic을 receive하는 모습을 볼 수 있다.
  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::        (v2.0.5.RELEASE)

2018-11-06 20:37:52.755  INFO 58130 --- [           main] hello.Application                        : Starting Application on 1003855ui-MacBook-Pro.local with PID 58130
...
...
...
2018-11-06 20:37:53.925  INFO 58130 --- [pool-1-thread-1] hello.ScheduledTasks                     : Message: helloworld 20:37:53 sent to topic: test
2018-11-06 20:37:54.038  INFO 58130 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-1, groupId=kafka-intro] Successfully joined group with generation 1
2018-11-06 20:37:54.039  INFO 58130 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-1, groupId=kafka-intro] Setting newly assigned partitions [test-1, test-0, test-3, test-2, test-13, test-12, test-15, test-14, test-17, test-16, test-19, test-18, test-5, test-4, test-7, test-6, test-9, test-8, test-11, test-10]
2018-11-06 20:37:54.061  INFO 58130 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions assigned: [test-1, test-0, test-3, test-2, test-13, test-12, test-15, test-14, test-17, test-16, test-19, test-18, test-5, test-4, test-7, test-6, test-9, test-8, test-11, test-10]
2018-11-06 20:37:54.744  INFO 58130 --- [pool-1-thread-1] hello.ScheduledTasks                     : Message: helloworld 20:37:54 sent to topic: test
2018-11-06 20:37:54.769  INFO 58130 --- [ntainer#0-0-C-1] hello.ScheduledTasks                     : Receiver on topic1: ConsumerRecord(topic = test, partition = 5, offset = 4, CreateTime = 1541504274743, serialized key size = -1, serialized value size = 19, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = helloworld 20:37:54)















0  Comments,   0  Trackbacks
댓글 쓰기