概述
服务端的RPC处理器,可以在executor进程之外(独立于executor进程)提供shuffle block的读取服务。
该处理器负责注册executor,以及打开属于这些executor的shuffle block。shuffle block采用“一对一”(one-for-one)策略注册,即传输层的每一个chunk对应Spark应用层的一个shuffle block。
成员变量
// Resolver used by this handler both to register executors (registerExecutor) and to
// look up individual shuffle blocks (getBlockData).
final ExternalShuffleBlockResolver blockManager;
// Stream manager with which each OpenBlocks request registers its ManagedBufferIterator.
private final OneForOneStreamManager streamManager;
ManagedBufferIterator内部类
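客户端请求的shuffle block id格式为 "shuffle_" + shuffleId + "_" + mapId + "_" + reduceId;而ShuffleWriter输出到磁盘的文件名中reduceId固定为0,即数据文件 "shuffle_" + shuffleId + "_" + mapId + "_0.data" 和对应的索引文件 "shuffle_" + shuffleId + "_" + mapId + "_0.index"。读取时借助索引文件中reduceId对应的记录,定位数据文件中的偏移量和长度。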
在初始化ManagedBufferIterator时,只记录本次拉取请求的appId、execId,并解析请求中的block id集合(这些block id必须拥有相同的shuffleId),将shuffleId以及每个block的mapId/reduceId对保存到它的成员变量中。真正读取数据延迟到next()方法中,每调用一次next()读取一个block。
private class ManagedBufferIterator implements Iterator<ManagedBuffer> { private int index = 0; private final String appId; private final String execId; private final int shuffleId; // An array containing mapId and reduceId pairs. //记录mapId和reduceId pair对的数组 private final int[] mapIdAndReduceIds; /*@param String[] blockIds 请求拉取的shuffle文件名的集合,这些文件名都拥有相同的shuffleId */ ManagedBufferIterator(String appId, String execId, String[] blockIds) { this.appId = appId; this.execId = execId; String[] blockId0Parts = blockIds[0].split("_"); if (blockId0Parts.length != 4 || !blockId0Parts[0].equals("shuffle")) { throw new IllegalArgumentException("Unexpected shuffle block id format: " + blockIds[0]); } //将文件名的shuffleId设置到成员变量shuffleId this.shuffleId = Integer.parseInt(blockId0Parts[1]); mapIdAndReduceIds = new int[2 * blockIds.length]; for (int i = 0; i < blockIds.length; i++) { String[] blockIdParts = blockIds[i].split("_"); if (blockIdParts.length != 4 || !blockIdParts[0].equals("shuffle")) { throw new IllegalArgumentException("Unexpected shuffle block id format: " + blockIds[i]); } if (Integer.parseInt(blockIdParts[1]) != shuffleId) { throw new IllegalArgumentException("Expected shuffleId=" + shuffleId + ", got:" + blockIds[i]); } //将文件名的mapId和reduceId设置到成员变量mapId和ReduceIds mapIdAndReduceIds[2 * i] = Integer.parseInt(blockIdParts[2]); mapIdAndReduceIds[2 * i + 1] = Integer.parseInt(blockIdParts[3]); } } @Override public boolean hasNext() { return index < mapIdAndReduceIds.length; } @Override public ManagedBuffer next() { //ExternalShuffleBlockResolver#getBlockData()方法根据shuffleId、mapId、reduceId获取数据 final ManagedBuffer block = blockManager.getBlockData(appId, execId, shuffleId, mapIdAndReduceIds[index], mapIdAndReduceIds[index + 1]); index += 2; metrics.blockTransferRateBytes.mark(block != null ? block.size() : 0); return block; } }handleMessage()方法
protected void handleMessage( BlockTransferMessage msgObj, TransportClient client, RpcResponseCallback callback) { //如果是处理OpenBlocks消息类型 if (msgObj instanceof OpenBlocks) { final Timer.Context responseDelayContext = metrics.openBlockRequestLatencyMillis.time(); try { OpenBlocks msg = (OpenBlocks) msgObj; checkAuth(client, msg.appId); long streamId = streamManager.registerStream(client.getClientId(), new ManagedBufferIterator(msg.appId, msg.execId, msg.blockIds), client.getChannel()); if (logger.isTraceEnabled()) { logger.trace("Registered streamId {} with {} buffers for client {} from host {}", streamId, msg.blockIds.length, client.getClientId(), getRemoteAddress(client.getChannel())); } callback.onSuccess(new StreamHandle(streamId, msg.blockIds.length).toByteBuffer()); } finally { responseDelayContext.stop(); } //如果是处理registerExecutor消息类型 } else if (msgObj instanceof RegisterExecutor) { final Timer.Context responseDelayContext = metrics.registerExecutorRequestLatencyMillis.time(); try { RegisterExecutor msg = (RegisterExecutor) msgObj; checkAuth(client, msg.appId); blockManager.registerExecutor(msg.appId, msg.execId, msg.executorInfo); callback.onSuccess(ByteBuffer.wrap(new byte[0])); } finally { responseDelayContext.stop(); } } else { throw new UnsupportedOperationException("Unexpected message: " + msgObj); } }getBlockData()方法
/** * Obtains a FileSegmentManagedBuffer from (shuffleId, mapId, reduceId). We make assumptions * about how the hash and sort based shuffles store their data. */ public ManagedBuffer getBlockData( String appId, String execId, int shuffleId, int mapId, int reduceId) { ExecutorShuffleInfo executor = executors.get(new AppExecId(appId, execId)); if (executor == null) { throw new RuntimeException( String.format("Executor is not registered (appId=%s, execId=%s)", appId, execId)); } //返回FileSegmentManagedBuffer return getSortBasedShuffleBlockData(executor, shuffleId, mapId, reduceId); }getSortBasedShuffledData()方法
server端从存储shuffle file的本地目录,获取shuffle block数据。
/**
 * Reads one reduce partition out of a sort-based shuffle output.
 *
 * <p>A map task's output is the data file "shuffle_ShuffleId_MapId_0.data" plus its index
 * "shuffle_ShuffleId_MapId_0.index". The index record for {@code reduceId} gives the offset and
 * length of that partition inside the data file. This logic mirrors IndexShuffleBlockResolver,
 * and the file naming follows ShuffleDataBlockId / ShuffleIndexBlockId.
 *
 * @throws RuntimeException wrapping the ExecutionException raised while loading the index
 */
private ManagedBuffer getSortBasedShuffleBlockData(
    ExecutorShuffleInfo executor, int shuffleId, int mapId, int reduceId) {
  // Both file names share this stem; it always ends in "_0", whatever reduceId is requested.
  String fileStem = "shuffle_" + shuffleId + "_" + mapId + "_0";
  File indexFile = getFile(executor.localDirs, executor.subDirsPerLocalDir, fileStem + ".index");
  try {
    ShuffleIndexInformation indexInfo = shuffleIndexCache.get(indexFile);
    ShuffleIndexRecord segment = indexInfo.getIndex(reduceId);
    File dataFile = getFile(executor.localDirs, executor.subDirsPerLocalDir, fileStem + ".data");
    return new FileSegmentManagedBuffer(conf, dataFile, segment.getOffset(), segment.getLength());
  } catch (ExecutionException e) {
    throw new RuntimeException("Failed to open file: " + indexFile, e);
  }
}
概述
该StreamManager(OneForOneStreamManager)允许以迭代器的形式注册一组ManagedBuffer,每一个注册的buffer就是一个chunk,可以被客户端通过网络逐个拉取。
OneForOneStreamManager只能按顺序、一个接一个地读取一个流的数据块。它借助内部的StreamState类记录每个流的读取进度:要读取的块编号必须等于StreamState.curChunk,即上一次读取的块编号加1(首次读取时为0)。
成员变量
// Source of stream ids; registerStream() takes the current value and increments it.
private final AtomicLong nextStreamId;
// Registered streams keyed by stream id; getChunk() removes an entry once its last chunk is served.
private final ConcurrentHashMap<Long, StreamState> streams;
StreamState内部类
StreamState类中存放了appId(handleMessage注册流时传入的是客户端编号client.getClientId())、ManagedBuffer的迭代器、与该流关联的Channel,以及下一个待读取的块编号curChunk。
// App id passed to registerStream (handleMessage passes the requesting client's id here).
final String appId;
// Remaining buffers of the stream; getChunk() takes one buffer per call.
final Iterator<ManagedBuffer> buffers;
// The channel associated to the stream
final Channel associatedChannel;
// Used to keep track of the index of the buffer that the user has retrieved, just to ensure
// that the caller only requests each chunk one at a time, in order.
// getChunk() requires the requested index to equal this value and then increments it.
int curChunk = 0;
registerStream()方法
/** * Registers a stream of ManagedBuffers which are served as individual chunks one at a time to * callers. Each ManagedBuffer will be release()'d after it is transferred on the wire. If a * client connection is closed before the iterator is fully drained, then the remaining buffers * will all be release()'d. * * If an app ID is provided, only callers who've authenticated with the given app ID will be * allowed to fetch from this stream. * * This method also associates the stream with a single client connection, which is guaranteed * to be the only reader of the stream. Once the connection is closed, the stream will never * be used again, enabling cleanup by `connectionTerminated`. */ public long registerStream(String appId, Iterator<ManagedBuffer> buffers, Channel channel) { //生成下一个StreamId long myStreamId = nextStreamId.getAndIncrement(); //记录下一个StreamId对应的StreamState,并放入内部成员变量concurrentMap集合中 streams.put(myStreamId, new StreamState(appId, buffers, channel)); return myStreamId; }getChunk()方法
/**
 * Returns the next chunk of a registered stream and advances the stream by one buffer.
 *
 * <p>Chunks must be requested strictly in order, starting at 0. When the last buffer is handed
 * out, the stream's entry is removed from {@code streams}.
 *
 * @param streamId id returned by registerStream
 * @param chunkIndex index of the requested chunk; must equal the stream's current position
 * @return the next ManagedBuffer of the stream
 * @throws IllegalStateException if the stream is unknown or already closed, the index is out of
 *     order, or the stream has no more buffers
 */
@Override
public ManagedBuffer getChunk(long streamId, int chunkIndex) {
  StreamState state = streams.get(streamId);
  if (state == null) {
    // Entries are removed once fully drained (below), so a late, duplicate or unknown
    // request would otherwise fail with a bare NullPointerException on state.curChunk.
    throw new IllegalStateException(String.format(
      "Requested chunk not available since streamId %s is closed", streamId));
  } else if (chunkIndex != state.curChunk) {
    throw new IllegalStateException(String.format(
      "Received out-of-order chunk index %s (expected %s)", chunkIndex, state.curChunk));
  } else if (!state.buffers.hasNext()) {
    throw new IllegalStateException(String.format(
      "Requested chunk index beyond end %s", chunkIndex));
  }
  state.curChunk += 1;
  ManagedBuffer nextChunk = state.buffers.next();

  // Drop the stream as soon as its final chunk is handed out.
  if (!state.buffers.hasNext()) {
    logger.trace("Removing stream id {}", streamId);
    streams.remove(streamId);
  }

  return nextChunk;
}
