Implementing a big data project with Hazelcast IMDG and Spark 2 — tomask79

19-10-15 banq
                   

Putting data from HBase into Hazelcast IMDG and accessing it from Spark as an RDD is a very widely used solution.

Prerequisites

  • Spring Boot demo application (Spring Boot starter 1.5.9) with its data stored in a Hazelcast IMap (the hazelcast-app folder)
  • Good old Hortonworks Sandbox 2.6.5 (because you have it installed and ready to use)
  • Spark 2 job packaged as a fat jar that accesses the Hazelcast IMap as an RDD (an sbt project in the spark-hazelcast folder)
  • As the connector from Spark 2 to Hazelcast I will use Greg Luck's Spark connector for Hazelcast
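
The post doesn't show how the connector is wired into the sbt build. A minimal build.sbt fragment might look like the following; the artifact coordinates and version numbers here are assumptions, so verify them against the connector's README and Maven Central before use:

```scala
// build.sbt (fragment) -- coordinates/versions are guesses, verify before use
libraryDependencies ++= Seq(
  // Greg Luck's Spark connector for Hazelcast
  "com.hazelcast" % "hazelcast-spark" % "0.2",
  // Hazelcast client 3.7.x or newer, as required by the connector
  "com.hazelcast" % "hazelcast-client" % "3.7.8"
)
```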

Spring Boot demo application

The core of hazelcast-app is a LifecycleListener that initializes the IMap with data as soon as the Hazelcast cluster starts.

package com.example;

import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.IMap;
import com.hazelcast.core.LifecycleEvent;
import com.hazelcast.core.LifecycleListener;

/**
 * @author tomask79
 */
public class NodeLifecycleListener implements LifecycleListener {

    private String hazelcastInstanceName;

    /**
     * @param instanceName
     */
    public NodeLifecycleListener(final String instanceName) {
        this.hazelcastInstanceName = instanceName;
    }

    @Override
    public void stateChanged(LifecycleEvent event) {
        switch (event.getState()) {
            case STARTED: {
                System.out.println("Cluster is started, putting test data into distributed map!");
                preloadHZMapOnClusterStart();
                break;
            }
            default: {
                System.out.println(event.toString());
            }
        }
    }

    private void preloadHZMapOnClusterStart() {
        // Fetch the distributed map once instead of looking it up for every put.
        final IMap<String, Double> payments =
                getHazelcastInstance(hazelcastInstanceName).getMap(HZArtifactAPI.PAYMENTS_MAP);
        payments.put("1234HZC", 100.0);
        payments.put("5344HZC", 1500.0);
        payments.put("7662HZC", 1300.0);
        payments.put("8626HZC", 1400.0);
        payments.put("7277HZC", 1500.0);
        payments.put("6636HZC", 1500.0);
    }

    private HazelcastInstance getHazelcastInstance(final String name) {
        return Hazelcast.getHazelcastInstanceByName(name);
    }
}

To make sure you don't miss Hazelcast's STARTED event, register the listener directly in the configuration:

@Bean
public Config config() {
    Config config = new Config();

    config.setInstanceName(HZArtifactAPI.HAZELCAST_INSTANCE);
    config.setProperty("hazelcast.wait.seconds.before.join", "10");

    config.getGroupConfig().setName("mygroup");
    config.getGroupConfig().setPassword("mypassword");

    config.getNetworkConfig().setPortAutoIncrement(true);
    config.getNetworkConfig().setPort(10555);
    config.getNetworkConfig().getJoin().getMulticastConfig().setEnabled(true);

    config.addListenerConfig(
            new ListenerConfig(new NodeLifecycleListener(HZArtifactAPI.HAZELCAST_INSTANCE)));

    SSLConfig sslConfig = new SSLConfig();
    sslConfig.setEnabled(false);
    config.getNetworkConfig().setSSLConfig(sslConfig);

    return config;
}
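
For context on how the Spark job will later reach this node: the connector connects as a Hazelcast client against the port and group credentials configured above. A standalone client check could be sketched as follows (assuming hazelcast-client on the classpath; the object name `ClientCheck` and the literal map name are made up for this sketch, the app itself uses the HZArtifactAPI constants):

```scala
import com.hazelcast.client.HazelcastClient
import com.hazelcast.client.config.ClientConfig

object ClientCheck {
  def main(args: Array[String]): Unit = {
    // Same group name/password and port as in the @Bean config above.
    val cc = new ClientConfig()
    cc.getGroupConfig.setName("mygroup").setPassword("mypassword")
    cc.getNetworkConfig.addAddress("127.0.0.1:10555")

    val client = HazelcastClient.newHazelcastClient(cc)
    // Read one of the preloaded payments back.
    println(client.getMap[String, java.lang.Double]("payments_map").get("1234HZC"))
    client.shutdown()
  }
}
```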

Okay, now let's build and run the Spring Boot application and test the Hazelcast IMap:

tomask79:hazelcast-app tomask79$ pwd
/Users/tomask79/workspace/spark-hazelcast-integration/hazelcast-app
tomask79:hazelcast-app tomask79$ mvn clean install
tomask79:hazelcast-app tomask79$ java -jar spring-microservice-service1/target/service1-0.0.1-SNAPSHOT.war

To verify that the IMap works and exposes the data, I added a simple REST controller; run this in another terminal:

tomask79:hazelcast-app tomask79$ curl http://localhost:8082/payments/7662HZC
1300.0
tomask79:hazelcast-app tomask79$ curl http://localhost:8082/payments/1234HZC
100.0

The Spring Boot demo application is ready!

Spark 2 job accessing the Hazelcast IMap as an RDD

When using the Spark Connector for Hazelcast, make sure you use Hazelcast 3.7.x or newer, as stated in its prerequisites. For a while I couldn't connect the Spark 2 job to Hazelcast at all because I was using 3.6.4. As a demo, I will obtain an RDD over the Hazelcast IMap and run an RDD collect on it:

package com.example

import org.apache.spark.{SparkConf, SparkContext}

import com.hazelcast.spark.connector.toSparkContextFunctions

object HazelcastConnectorTest {

  def runCode() = {
    val conf = new SparkConf()
        .set("hazelcast.server.addresses", "127.0.0.1:10555")
        .set("hazelcast.server.groupName", "mygroup")
        .set("hazelcast.server.groupPass", "mypassword")
        .set("hazelcast.spark.valueBatchingEnabled", "true")
        .set("hazelcast.spark.readBatchSize", "5000")
        .set("hazelcast.spark.writeBatchSize", "5000")

    val sc = new SparkContext(conf)

    val rddFromMap = sc.fromHazelcastMap("payments_map")

    rddFromMap.collect().foreach(println)
  }
}

The IMap name "payments_map" equals the HZArtifactAPI.PAYMENTS_MAP constant from the NodeLifecycleListener mentioned earlier.
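
Reading is only half of what the connector offers: according to its documentation it can also persist a pair RDD back into Hazelcast via `saveToHazelcastMap`. A hedged sketch under that assumption (`HazelcastWriteBack` and the target map name `payments_doubled` are invented for this example):

```scala
import com.hazelcast.spark.connector.{toSparkContextFunctions, toHazelcastRDDFunctions}
import org.apache.spark.SparkContext

object HazelcastWriteBack {
  // Assumes a SparkContext built with the same hazelcast.server.* settings as above.
  def runCode(sc: SparkContext): Unit = {
    // Read the payments map and double every amount...
    val doubled = sc.fromHazelcastMap[String, Double]("payments_map").mapValues(_ * 2)
    // ...then store the result in a second IMap ("payments_doubled" is a made-up name).
    doubled.saveToHazelcastMap("payments_doubled")
  }
}
```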

Testing everything

The repository has two folders: hazelcast-app, a Maven project, and the Spark Scala sbt project:

Build hazelcast-app:

tomask79:hazelcast-app tomask79$ pwd
/Users/tomask79/workspace/spark-hazelcast-integration/hazelcast-app
tomask79:hazelcast-app tomask79$ mvn clean install

Build the Spark sbt project:

tomask79:spark-hazelcast tomask79$ pwd
/Users/tomask79/workspace/spark-hazelcast-integration/spark-hazelcast
tomask79:spark-hazelcast tomask79$ sbt assembly

Now let's start the Hortonworks Sandbox and upload the Spark fat jar into it:

tomask79:scala-2.11 tomask79$ pwd
/Users/tomask79/workspace/spark-hazelcast-integration/spark-hazelcast/target/scala-2.11
tomask79:scala-2.11 tomask79$ scp -P 2222 apache-spark-2-scala-starter-template-assembly-1.0.jar root@localhost:/root
root@localhost's password:
apache-spark-2-scala-starter-template-assembly-1.0.jar

Now, if you run the Sandbox in VirtualBox and want to reach a Hazelcast application running outside of it, you have to set up port forwarding. I didn't want to mess with that, so I uploaded the hazelcast-app WAR file into the Sandbox as well... it's up to you.

Start hazelcast-app:

[root@sandbox-hdp ~]# java -jar service1-0.0.1-SNAPSHOT.war

Let's test it again in another terminal:

[root@sandbox-hdp ~]# curl http://localhost:8082/payments/7662HZC
1300.0

Looks good...

Launch the Spark 2 job to access Hazelcast IMDG:

[root@sandbox-hdp ~]# spark-submit --class com.example.Main --master yarn-client apache-spark-2-scala-starter-template-assembly-1.0.jar
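
The com.example.Main class passed to spark-submit isn't shown in the post; presumably it is just a thin entry point that delegates to the HazelcastConnectorTest object above. A minimal sketch under that assumption:

```scala
package com.example

// Hypothetical entry point matching --class com.example.Main above:
// it only delegates to the connector test shown earlier.
object Main {
  def main(args: Array[String]): Unit = {
    HazelcastConnectorTest.runCode()
  }
}
```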

And in the output you should see the content of the IMap "payments_map" visible in Spark 2:

19/09/01 20:10:16 INFO YarnScheduler: Removed TaskSet 0.0, whose tasks have all completed, from pool
19/09/01 20:10:16 INFO DAGScheduler: ResultStage 0 (collect at HazelcastConnectorTest.scala:25) finished in 12.410 s
19/09/01 20:10:16 INFO DAGScheduler: Job 0 finished: collect at HazelcastConnectorTest.scala:25, took 12.682620 s
(7277HZC,1500.0)
(8626HZC,1400.0)
(5344HZC,1500.0)
(1234HZC,100.0)
(7662HZC,1300.0)
(6636HZC,1500.0)

The Spark connector for Hazelcast seems to be working. Great stuff: if you have some data in Hazelcast IMDG and need to use it in big data Hadoop warehouse processing, then I think this connector is very useful.

Click the title to get the source code.

                   
