This article is about 5,500 characters long; expect roughly 18 minutes of reading time.
In elasticsearch-hadoop, a Spark write runs through the following call chain:

EsSpark -> EsRDDWriter -> RestService -> RestRepository -> RestClient -> NetworkClient -> CommonsHttpTransport
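For reference, this is roughly how that chain gets triggered from user code; it is a minimal sketch, and the node address, index name, and document contents below are placeholders rather than anything from the original setup:

import org.apache.spark.{SparkConf, SparkContext}
import org.elasticsearch.spark.rdd.EsSpark

object WritePathDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("es-write-path-demo")
      .set("es.nodes", "127.0.0.1:9200") // placeholder ES node
    val sc = new SparkContext(conf)

    val docs = sc.makeRDD(Seq(Map("id" -> "1", "body" -> "hello")))
    // EsSpark.saveToEs is the public entry point; for each partition it
    // creates an EsRDDWriter, which goes through RestService/RestRepository
    // down to the HTTP transport.
    EsSpark.saveToEs(docs, "my-index/doc") // placeholder index/type resource
    sc.stop()
  }
}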
RestRepository repository = (iformat.hasPattern()
        ? initMultiIndices(settings, currentSplit, resource, log)
        : initSingleIndex(settings, currentSplit, resource, log));

Here we only walk through the single-index path; the corresponding initSingleIndex method contains the following code:
int bucket = currentInstance % targetShards.size(); // round-robin over the primary shards
Shard chosenShard = orderedShards.get(bucket);      // shard assigned to this task
Node targetNode = targetShards.get(chosenShard);    // node hosting that primary shard
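To make the effect of this round-robin choice concrete, here is a minimal sketch (the shard and task counts are made up): each writing task pins all of its bulk requests to one primary shard's node, and tasks wrap around once there are more tasks than shards.

val shardCount = 3 // assume an index with 3 primary shards
(0 until 5).foreach { currentInstance => // assume 5 writing tasks
  val bucket = currentInstance % shardCount
  println(s"task $currentInstance -> primary shard $bucket")
}
// task 0 -> shard 0, task 1 -> shard 1, task 2 -> shard 2,
// task 3 -> shard 0, task 4 -> shard 1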
package org.elasticsearch.spark

import ....

class ESShardPartitioner(settings: String) extends Partitioner {
  protected val log = LogFactory.getLog(this.getClass())
  protected var _numPartitions = -1

  override def numPartitions: Int = {
    // Ask the cluster how many primary shards the target index has and use
    // that as the number of Spark partitions.
    val newSettings = new PropertiesSettings().load(settings)
    val repository = new RestRepository(newSettings)
    val targetShards = repository.getWriteTargetPrimaryShards(newSettings.getNodesClientOnly())
    repository.close()
    _numPartitions = targetShards.size()
    _numPartitions
  }

  override def getPartition(key: Any): Int = {
    // Hash the document id the same way Elasticsearch routes documents,
    // so each Spark partition lines up with one ES shard.
    val shardId = ShardAlg.shard(key.toString(), _numPartitions)
    shardId
  }
}

public class ShardAlg {
    public static int shard(String id, int shardNum) {
        int hash = Murmur3HashFunction.hash(id);
        return mod(hash, shardNum);
    }

    public static int mod(int v, int m) {
        int r = v % m;
        if (r < 0) {
            r += m;
        }
        return r;
    }
}
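The whole scheme rests on ShardAlg agreeing with Elasticsearch's own routing (Murmur3 hash of the routing value, modulo the number of primary shards). A quick sanity-check sketch, with made-up ids and a made-up shard count of 5:

// If ShardAlg ever diverges from ES routing (for example on indices that
// override the routing shard count), documents still land correctly, but the
// partition-to-shard alignment this partitioner aims for is lost.
val numShards = 5
Seq("doc-1", "doc-2", "doc-3").foreach { id =>
  println(s"$id -> shard ${ShardAlg.shard(id, numShards)}")
}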
......
partitionBy(new ESShardPartitioner(settings)).foreachPartition { iter =>
  try {
    val newSettings = new PropertiesSettings().load(settings)
    // create the EsRDDWriter
    val writer = EsRDDCreator.createWriter(newSettings.save())
    writer.write(TaskContext.get(), iter.map(f => f._2))
  }

Note that the settings string is re-parsed inside each partition: shipping the saved string and rebuilding a PropertiesSettings per task avoids serializing the Settings object from the driver.
Add the following code to RestService.initSingleIndex:

if (settings.getProperty(ConfigurationOptions.ES_BULK_SHARDID) != null) {
    targetNode = targetShards.get(orderedShards.get(
            Integer.parseInt(settings.getProperty(ConfigurationOptions.ES_BULK_SHARDID))));
}
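ES_BULK_SHARDID is a constant introduced by this patch; it is not part of stock elasticsearch-hadoop. A sketch of how the writing side could pass the shard id through the settings, assuming the constant maps to a custom key such as "es.bulk.shardId":

// Hypothetical wiring inside a partition, before the writer is created:
val newSettings = new PropertiesSettings().load(settings)
newSettings.setProperty("es.bulk.shardId", shardId.toString) // key name assumed
val writer = EsRDDCreator.createWriter(newSettings.save())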
// val settings = new SparkSettings(conf).save()
.partitionBy(new ESShardPartitioner(settings)).mapPartitionsWithIndex { (partitionIndex, iter) =>
  try {
    val writer = EsSpark.createEsRDDWriter[Map[String, String]](settings, resource)
    // shardToPartitions Spark partitions map onto one ES shard
    val shardId = ESShardPartitioner.shardIdFromPartitionId(partitionIndex, shardToPartitions)
    // force this partition's documents into that specific primary shard
    val stats = writer.writeToSpecificPrimaryShard(TaskContext.get(), shardId, iter.map(f => f._2))
    List(NewStats(stats.bulkTotalTime, stats.docsSent)).iterator
  } catch {
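shardIdFromPartitionId is not shown in the original; a plausible sketch, assuming Spark partitions are assigned to shards in contiguous groups of shardToPartitions (which requires getPartition to have produced numShards * shardToPartitions partitions):

object ESShardPartitioner {
  // Hypothetical helper: partitions [0, shardToPartitions) -> shard 0,
  // [shardToPartitions, 2 * shardToPartitions) -> shard 1, and so on.
  def shardIdFromPartitionId(partitionId: Int, shardToPartitions: Int): Int =
    partitionId / shardToPartitions
}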