아파치 플링크는 2.0 버전부터는 더이상 scala API를 지원하지 않습니다.
https://cwiki.apache.org/confluence/display/FLINK/FLIP-265+Deprecate+and+remove+Scala+API+support
플링크1.15버전까지는 스칼라와 강결합되어 있는 코드를 제공했습니다. 그러나 점차 스칼라 관련 기여가 적어지고 있는 상황에서 더 이상 스칼라에 대한 직접적인 api 지원이 의미 없다고 생각되어서 더 이상 지원을 하지 않는 방향의 의견이 나왔습니다.
이에 따라 플링크 PMC(Project Management Committee)의 투표를 받았고, 거의 만장일치로 승인이 떨어졌습니다. vote 관련 링크는 다음에서 볼 수 있습니다. https://lists.apache.org/thread/qfz4opcbc2p59fhmymncxyzxb70cn098
플링크 프로젝트 운영 규칙(Flink Bylaws)에 따라 PMC는 이런 사안에 대해 투표를 해야만 합니다. 투표에 대한 상세한 사항은 https://cwiki.apache.org/confluence/display/FLINK/Flink+Bylaws 에서 확인할 수 있습니다.
무엇이 바뀌나?
가장 크게 바뀌는 부분은 직접적인 scala api가 없어진다는 부분입니다. scala는 tuple, 간결한 문법, collection 등 다양한 장점을 가진 언어이고 이를 지원함으로써 스칼라를 사용하는 유저들은 플링크를 더욱 scala 친화적으로 사용할 수 있었습니다. 동일한 jvm 라이브러리라고 하더라도 언어가 다르면 지저분해지기 때문에 이런 부분은 스칼라 유저들에게 친숙하게 다가왔습니다. 스파크를 사용하던 스칼라 유저는 이런 점이 반가웠으리라 생각됩니다.
Flink Scala API를 활용한 예제
import org.apache.flink.api.scala._
object WordCountScala {
case class WordWithCount(word: String, count: Int)
def main(args: Array[String]): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val text = env.fromElements(
"Who's there?",
"I think I hear them. Stand, ho! Who's there?"
)
val counts = text
.flatMap(_.toLowerCase.split("\\W+"))
.filter(_.nonEmpty)
.map(word => WordWithCount(word, 1))
.groupBy(_.word)
.reduce { (a, b) => WordWithCount(a.word, a.count + b.count) }
counts.print()
}
}
위와 같이 scala flink로 사용한 구문은 무척이나 간결합니다. 그럼 자바 라이브러리로 구현하면 어떻게 될까요?
// Java Tuple2를 사용하는 예제
import org.apache.flink.api.common.functions.FlatMapFunction
import org.apache.flink.api.java.tuple.Tuple2
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.util.Collector
object JavaStyleWordCount {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val text = env.socketTextStream("localhost", 9999)
val counts = text
.flatMap(new FlatMapFunction[String, Tuple2[String, Integer]] {
override def flatMap(value: String, out: Collector[Tuple2[String, Integer]]): Unit = {
for (word <- value.toLowerCase.split("\\W+")) {
if (word.nonEmpty) out.collect(new Tuple2(word, 1))
}
}
})
.keyBy(0)
.sum(1)
counts.print()
env.execute("Java Style Word Count in Scala")
}
}
JAVA api를 그대로 사용하면 코드의 구조 자체가 자바에 의존적이기 때문에 상대적으로 읽기 어렵고 장황해집니다. 예를 들자면, Java의 Tuple 클래스를 사용하거나, Collector를 사용한 사용자 정의 함수 정의가 필요할 때 코드가 더 복잡해집니다. 그리고 Java POJO를 사용하면 타입 정보가 감소되고 간결한 코드 작성이 어려워집니다. Scala에서는 케이스 클래스를 사용하여 더욱 타입 안전하고 간결한 코드를 작성할 수 있습니다.