Spark机器学习的数据类型(Python版)

    xiaoxiao2021-03-25  68

    数据类型: 1、局部向量 2、标签点 3、局部矩阵 4、分布矩阵 4.1 RowMatrix 4.2 IndexedRowMatrix 4.3 CoordinateMatrix 4.4 BlockMatrix MLlib支持存储在单个机器上的局部向量和矩阵,以及由一个或多个RDD支持的分布式矩阵。局部向量和局部矩阵是用作公共接口的简单数据模型。 基本线性代数运算由Breeze提供。在监督学习中使用的训练示例在MLlib中被称为“标记点”。 """ """ 1、局部向量: 局部向量具有整数类型和基于0的索引和双重类型值,存储在单个机器上。MLlib支持两种类型的局部向量:密集和稀疏。密集向量由表示其条目 值的双数组支持,而稀疏向量由两个并行数组支持:索引和值。例如,向量(1.0,0.0,3.0)可以以密集格式表示为[1.0,0.0,3.0]或以稀疏格式表示 为(3,[0,2],[1.0,3.0]),其中3是向量的大小。 MLlib将以下类型识别为密集向量: 1、NumPy的数组 2、Python的列表,例如[1,2,3] 和以下作为稀疏矢量: 1、MLlib的SparseVector。 2、单列的SciPy的csc_matrix 我们建议在列表上使用NumPy数组来提高效率,并使用在Vectors中实现的工厂方法来创建稀疏向量。 """ import numpy as np import scipy.sparse as sps from pyspark.mllib.linalg import Vectors # 使用NumPy数组作为密集向量 dv1 = np.array([1.0, 0.0, 3.0]) dv1 """ 输出: [ 1. 0. 3.] """ # 使用Python list 作为密集向量 dv2 = [1.0, 0.0, 3.0] dv2 """ 输出: [1.0, 0.0, 3.0] """ # 使用SparseVector创建一个稀疏向量 sv1 = Vectors.sparse(3, [0, 2], [1.0, 3.0]) sv1 """ 输出: (3,[0,2],[1.0,3.0]) """ # 使用单列SciPy csc_matrix 作为一个稀疏向量 sv2 sv2 = sps.csc_matrix((np.array([1.0, 3.0]), np.array([0, 2]), np.array([0, 2])), shape=(3, 1)) """ 输出(坐标--值): (0, 0) 1.0 (2, 0) 3.0 """ """ 2、标签点 标记点是与标记/响应相关联的稠密或稀疏的局部向量。 在MLlib中,在监督学习算法中使用标记点。 我们使用double来存储标签, 因此我们可以在回归和分类中使用标记点。对于二进制分类,标签应为0(负)或1(正)。 对于多类分类,标签应该是从零开始的类索引:0,1,2,.... """ from pyspark.mllib.linalg import SparseVector from pyspark.mllib.regression import LabeledPoint # 创建一个正的和密集向量的标签点 pos = LabeledPoint(1.0, [1.0, 0.0, 3.0]) # 创建一个负的和稀疏向量的标签点 neg = LabeledPoint(0.0, SparseVector(3, [0, 2], [1.0, 3.0])) """ 稀疏数据: 在实践中非常常见的是具有稀疏的训练数据。MLlib支持阅读以LIBSVM格式存储的训练示例,LIBSVM格式是LIBSVM和LIBLINEAR使用的默认格式。 它是一种文本格式,其中每行代表使用以下格式的带标签的稀疏特征向量: label index1:value1 index2:value2 ... """ """ 3、局部矩阵 局部矩阵具有整数类型的行和列索引和双重类型值,存储在单个机器上。 MLlib支持稠密矩阵,其条目值以列为主的顺序存储在单个双阵列中, 稀疏矩阵的非零条目值以列为主的顺序存储在压缩稀疏列(CSC)格式中。 例如,以下稠密矩阵: ⎛1.0 2.0⎞ ⎜3.0 4.0⎟ ⎝5.0 6.0⎠ 被存储在具有矩阵大小(3,2)的一维数组[1.0,3.0,5.0,2.0,4.0,6.0]中。 本地矩阵的基类是Matrix,我们提供两个实现:DenseMatrix和SparseMatrix。 我们建议使用在Matrices中实现的工厂方法来创建局部矩阵。 记住,MLlib中的局部矩阵以列为主的顺序存储。 """ from pyspark.mllib.linalg import Matrix, Matrices # 创建一个密集矩阵((1.0, 2.0), (3.0, 4.0), (5.0, 6.0)) dm2 = Matrices.dense(3, 2, [1, 2, 3, 4, 5, 6]) dm2 """ 输出: DenseMatrix(3, 2, [1.0, 2.0, 3.0, 4.0, 5.0, 6.0], False) """ # 创建一个稀疏矩阵((9.0, 0.0), (0.0, 8.0), (0.0, 6.0)) #sparse(行数, 列数, colPtrs, 行索引, 值) 其中colPtrs (0,1,3)->(1-0,3-1)=(1,2)(每列非零元素的个数) sm = Matrices.sparse(3, 2, [0, 1, 3], [0, 2, 1], [9, 6, 8]) sm """ 输出: SparseMatrix(3, 2, [0, 1, 3], [0, 2, 1], [9.0, 6.0, 8.0], False) """ sm.toArray() """ 输出: array([[ 9., 0.], [ 0., 8.], [ 0., 6.]]) 计算: 根据colPtrs计算出每列非零的个数,然后根据每列的个数从行索依次得出对应的值。 如:colPtrs(0,1,3)->(1-0,3-1)=(1,2)第一列有一个非零值,从行索[0, 2, 1]读取第一列中那非零值的行是0,表示第一列第一行为(0,0)9.0。 然后看第二列有2个非零元素,从行索[0, 2, 1]读取的所2和1行,则表示第二列第三行为(1,2)6.0,第二列第二行为(1,1)8.0。 """ """ 4、分布矩阵 分布式矩阵具有长类型的行和列索引和双重类型值,分布式存储在一个或多个RDD中。选择正确的格式来存储大型和分布式矩阵非常重要。将分布式 矩阵转换为不同的格式可能需要全局混洗,这是相当昂贵的。到目前为止,已经实现了四种类型的分布式矩阵。基本类型称为RowMatrix。 RowMatrix 是没有意义的行索引的面向行的分布式矩阵,例如特征向量的集合。它由行的RDD支持,其中每行是局部向量。我们假设RowMatrix的列数不是巨大, 以便单个局部向量可以合理地传递给驱动程序,并且也可以使用单个节点存储/操作。 IndexedRowMatrix类似于RowMatrix,但具有行索引,可用于 标识行和执行联接。 CoordinateMatrix是一个以坐标列表(COO)格式存储的分布式矩阵,由其条目的RDD作为后盾。 BlockMatrix是由MatrixBlock 的RDD支持的分布式矩阵,它是(Int,Int,Matrix)的元组。 注意:分布式矩阵的基础RDD必须是确定性的,因为我们缓存矩阵大小。通常,使用非确定性RDD可能导致错误。 4.1 RowMatrix RowMatrix是一个面向行的分布式矩阵,没有有意义的行索引,由其行的RDD支持,其中每行都是局部向量。 由于每一行由局部向量表示,列数受限于 整数范围,但实际上应小得多。 """ from pyspark.mllib.linalg.distributed import RowMatrix # 创建一个RDD向量. rows = sc.parallelize([[1, 2, 3], [4, 5, 6], [7, 8, 9], [10, 11, 12]]) # 从RDD向量创建一个RowMatrix mat = RowMatrix(rows) # 获得行和列数 m = mat.numRows() # 4 n = mat.numCols() # 3 # 再一次获得行的RDD rowsRDD = mat.rows """ 4.2 IndexedRowMatrix IndexedRowMatrix类似于RowMatrix,但具有有意义的行索引。 它由索引行的RDD支持,以便每行由其索引(长类型)和本地向量表示。 """ from pyspark.mllib.linalg.distributed import IndexedRow, IndexedRowMatrix #创建索引行的RDD # - 这个可以使用IndexedRow 类实现: indexedRows = sc.parallelize([IndexedRow(0, [1, 2, 3]), IndexedRow(1, [4, 5, 6]), IndexedRow(2, [7, 8, 9]), IndexedRow(3, [10, 11, 12])]) # - 或则使用(long, vector) 元组: indexedRows = sc.parallelize([(0, [1, 2, 3]), (1, [4, 5, 6]), (2, [7, 8, 9]), (3, [10, 11, 12])]) # 从一个IndexedRows的RDD 创建 IndexedRowMatrix . mat = IndexedRowMatrix(indexedRows) # 获得行和列数 m = mat.numRows() # 4 n = mat.numCols() # 3 # 获得一个IndexedRows的RDD行 rowsRDD = mat.rows # 通过去掉行索引转成 RowMatrix rowMat = mat.toRowMatrix() """ 4.3 CoordinateMatrix CoordinateMatrix是由其条目的RDD支持的分布式矩阵。 每个条目是(i:Long,j:Long,value:Double)的元组,其中i是行索引,j是列索引, 值是条目值。 只有当矩阵的两个维度都很大并且矩阵非常稀疏时,才应使用CoordinateMatrix。 """ from pyspark.mllib.linalg.distributed import CoordinateMatrix, MatrixEntry #创建坐标项的RDD。 # - 这个可以通过MatrixEntry类实现: entries = sc.parallelize([MatrixEntry(0, 0, 1.2), MatrixEntry(1, 0, 2.1), MatrixEntry(6, 1, 3.7)]) # - 或者使用元组(long, long, float) : entries = sc.parallelize([(0, 0, 1.2), (1, 0, 2.1), (2, 1, 3.7)]) # 从MatrixEntries的RDD创建一个CoordinateMatrix。 mat = CoordinateMatrix(entries) # 获得它的行和列数 m = mat.numRows() # 3 n = mat.numCols() # 2 # 获取条目作为MatrixEntries的RDD entriesRDD = mat.entries # 转换为RowMatrix rowMat = mat.toRowMatrix() # 转换为IndexedRowMatrix indexedRowMat = mat.toIndexedRowMatrix() # 转换为BlockMatrix blockMat = mat.toBlockMatrix() """ 4.4 BlockMatrix BlockMatrix是由MatrixBlocks的RDD支持的分布式矩阵,其中MatrixBlock是((Int,Int),Matrix的元组),其中(Int,Int)是块的索引, 矩阵在给定的索引,大小为rowsPerBlock x colsPerBlock。BlockMatrix支持诸如添加和乘法与另一个BlockMatrix的方法。 BlockMatrix还有一个 帮助函数验证,可用于检查BlockMatrix是否设置正确。 """ from pyspark.mllib.linalg import Matrices from pyspark.mllib.linalg.distributed import BlockMatrix #创建子矩阵块的RDD blocks = sc.parallelize([((0, 0), Matrices.dense(3, 2, [1, 2, 3, 4, 5, 6])), ((1, 0), Matrices.dense(3, 2, [7, 8, 9, 10, 11, 12]))]) #从子矩阵块的RDD创建一个BlockMatrix mat = BlockMatrix(blocks, 3, 2) # 获得它的行和列数 m = mat.numRows() # 6 n = mat.numCols() # 2 #获取块作为子矩阵块的RDD blocksRDD = mat.blocks #转换为LocalMatrix localMat = mat.toLocalMatrix() #转换为IndexedRowMatrix indexedRowMat = mat.toIndexedRowMatrix() #转换为CoordinateMatrix coordinateMat = mat.toCoordinateMatrix()
    转载请注明原文地址: https://ju.6miu.com/read-37234.html

    最新回复(0)