命名空间(namespace): 默认为default命名空间
list_namespace
create_namespace 'my_ns'
create 'my_ns:my_table','fam'
exists 'my_ns:my_table'
list
list_namespace
disable 'my_ns:my_table'
drop 'my_ns:my_table'
drop_namespace 'my_ns'
rowkey与family: rowkey行键唯一,family列簇,每一个数据项都有时间戳
put 'my_ns:my_table','rowkey1','fam:f1','r1f1'
put 'my_ns:my_table','rowkey1','fam:f2','r1f2'
put 'my_ns:my_table','rowkey1','fam:f3','r1f3'
put 'my_ns:my_table','rowkey2','fam:f1','r2f1'
put 'my_ns:my_table','rowkey2','fam:f2','r2f2'
put 'my_ns:my_table','rowkey2','fam:f3','r2f3'
put 'my_ns:my_table','rowkey3','fam:f1','r3f1'
put 'my_ns:my_table','rowkey3','fam:f2','r3f2'
put 'my_ns:my_table','rowkey3','fam:f3','r3f3'
list 'my_ns:my_table'
scan 'my_ns:my_table'
get 'my_ns:my_table','rowkey1'
get 'my_ns:my_table','rowkey1','fam'
get 'my_ns:my_table','rowkey1','fam:f1'
delete 'my_ns:my_table','rowkey1','fam:f1'
delete 'my_ns:my_table','rowkey2','fam:f2'
delete 'my_ns:my_table','rowkey3','fam:f3'
scan 'my_ns:my_table'
hbase java api:
官方API文档:http://hbase.apache.org/devapidocs/index.html
步骤一:关闭hbase机器上的防火墙,确保在本地能与远程hbase建立tcp连接:service iptables stop
步骤二: maven项目中添加依赖
<dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase-client</artifactId> <version>1.3.0</version> </dependency>注意:有个小问题,jdk1.6的tools.jar自动下载不到,得自己手动去下个包把依赖写上,hbase-client里面依赖了这个包,如下
<dependency> <groupId>jdk.tools</groupId> <artifactId>jdk.tools</artifactId> <version>1.6</version> <scope>system</scope> <systemPath>E:/downloads/jar/tools.jar</systemPath> </dependency>步骤三:本地机器需要加上hbase服务器hostname的ip映射,我在hosts文件里添加了如下一行: 192.168.137.10 centos001
步骤四: 编写HBaseClient类与测试(hbase单机版安装:http://blog.csdn.net/wyh9459/article/details/69053599)
import java.io.IOException; import java.util.ArrayList; import java.util.List; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.NamespaceDescriptor; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hbase.filter.FilterList; import org.apache.hadoop.hbase.filter.SingleColumnValueFilter; import org.apache.hadoop.hbase.util.Bytes; public class HBaseClient { private Configuration configuration; private Connection connection; private Admin admin; private static HBaseClient instance; private HBaseClient() throws IOException{ configuration = HBaseConfiguration.create(); configuration.set("hbase.zookeeper.quorum", "192.168.137.10:2181,192.168.137.10:2182,192.168.137.10:2183"); connection = ConnectionFactory.createConnection(configuration); admin = connection.getAdmin(); }; public static HBaseClient getInstance() throws IOException{ if(instance==null){ synchronized (HBaseClient.class) { if(instance==null){ instance = new HBaseClient(); } } } return instance; } public void close() throws IOException{ if(admin!=null){ admin.close(); } if(connection!=null){ connection.close(); } } /** * 判断命名空间是否存在 * @param strNamespace 命名空间 * @return true-存在,false-不存在 * @throws IOException */ 
public boolean isExistsNamespace(String strNamespace) throws IOException{ NamespaceDescriptor[] namespaces = admin.listNamespaceDescriptors(); for(int i=0; i<namespaces.length; i++){ if(strNamespace.equals(namespaces[i].getName())){ return true; } } return false; } /** * 创建命名空间 * @param strNamespace 命名空间 * @return true-创建成功,false-存在该namespace * @throws IOException */ public boolean createNamespace(String strNamespace) throws IOException{ if(isExistsNamespace(strNamespace)){ return false; }else{ admin.createNamespace(NamespaceDescriptor.create(strNamespace).build()); return true; } } /** * 判断表是否存在 * @param strTableName 表名 * @return true-存在,false-不存在 * @throws IOException */ public boolean isExistsTable(String strTableName) throws IOException{ TableName tableName = TableName.valueOf(strTableName); return admin.tableExists(tableName); } /** * 创建表,存在同名表时不删除同名表也不新建表 * @param strTableName 表名 * @param strFamily 列簇名 * @return true-成功创建,false-已存在同名表,未新建表 * @throws IOException */ public boolean createTable(String strTableName, String strFamily) throws IOException { TableName tableName = TableName.valueOf(strTableName); if (admin.tableExists(tableName)) { return false; } HTableDescriptor hTableDescriptor = new HTableDescriptor(tableName); HColumnDescriptor family = new HColumnDescriptor(strFamily); hTableDescriptor.addFamily(family); admin.createTable(hTableDescriptor); return true; } /** * 创建表,存在同名表时删除同名表然后新建表 * @param strTableName 表名 * @param strFamily 列簇名 * @return 表创建完成后返回true * @throws IOException */ public boolean createTableForced(String strTableName, String strFamily) throws IOException { TableName tableName = TableName.valueOf(strTableName); if (admin.tableExists(tableName)) { deleteTable(strTableName); } HTableDescriptor hTableDescriptor = new HTableDescriptor(tableName); HColumnDescriptor family = new HColumnDescriptor(strFamily); hTableDescriptor.addFamily(family); admin.createTable(hTableDescriptor); return true; } /** * 插入数据 * @param strTableName 表名 * @param put 
插入数据对象 * @throws IOException */ public void insertData(String strTableName, Put put) throws IOException { Table table = connection.getTable(TableName.valueOf(strTableName)); table.put(put); } /** * 插入数据 * @param strTableName 表名 * @param putList 插入数据对象集合 * @throws IOException */ public void insertData(String strTableName, List<Put> putList) throws IOException { Table table = connection.getTable(TableName.valueOf(strTableName)); table.put(putList); } /** * 扫描表 * @param strTableName 表名 * @return ResultScanner 扫描结果对象 * @throws IOException */ public ResultScanner queryTable(String strTableName) throws IOException { Table table = connection.getTable(TableName.valueOf(strTableName)); ResultScanner scanner = table.getScanner(new Scan()); return scanner; } /** * 获取指定行的数据 * @param strTableName 表名 * @param get 查询条件对象 * @return 查询结果对象 * @throws IOException */ public Result queryTableByRowKey(String strTableName, Get get) throws IOException { Table table = connection.getTable(TableName.valueOf(strTableName)); Result result = table.get(get); return result; } /** * 根据Filter条件对象扫描表 * @param strTableName 表名 * @param filter Filter条件对象 * @return 扫描结果对象 * @throws IOException */ public ResultScanner queryTableByFilter(String strTableName, Filter filter) throws IOException { Table table = connection.getTable(TableName.valueOf(strTableName)); Scan scan = new Scan(); scan.setFilter(filter); ResultScanner scanner = table.getScanner(scan); return scanner; } /** * 根据Filter条件对象列表扫描表 * @param strTableName 表名 * @param filters Filter条件对象集合 * @return 扫描结果集合 * @throws IOException */ public ResultScanner queryTableByFilters(String strTableName, List<Filter> filters) throws IOException { Table table = connection.getTable(TableName.valueOf(strTableName)); FilterList filterList = new FilterList(filters); Scan scan = new Scan(); scan.setFilter(filterList); ResultScanner scanner = table.getScanner(scan); return scanner; } /** * 添加列 * @param strTableName 表名 * @param strColumn 列簇名 * @throws IOException */ 
public void addColumn(String strTableName, String strColumn) throws IOException { TableName tableName = TableName.valueOf(strTableName); HColumnDescriptor columnDescriptor = new HColumnDescriptor(strColumn); admin.addColumn(tableName, columnDescriptor); } /** * 删除列 * @param strTableName 表名 * @param strColumn 列簇名 * @throws IOException */ public void deleteColumn(String strTableName, String strColumn) throws IOException { TableName tableName = TableName.valueOf(strTableName); admin.deleteColumn(tableName, strColumn.getBytes()); } /** * 根据rowkey删除行 * @param strTableName 表名 * @param rowkey 行名 * @throws IOException */ public void deleteByRowKey(String strTableName, String rowkey) throws IOException { Table table = connection.getTable(TableName.valueOf(strTableName)); Delete delete = new Delete(Bytes.toBytes(rowkey)); table.delete(delete); } /** * 删除行 * @param strTableName 表名 * @param list 删除数据集合 * @throws IOException */ public void deleteRow(String strTableName, List<Delete> list) throws IOException { Table table = connection.getTable(TableName.valueOf(strTableName)); table.delete(list); } /** * 根据Filter条件对象删除行 * @param strTableName 表名 * @param filter Filter条件对象 * @throws IOException */ public void deleteByFilter(String strTableName, Filter filter) throws IOException { ResultScanner scanner = queryTableByFilter(strTableName, filter); List<Delete> list = new ArrayList<Delete>(); for (Result result : scanner) { Delete delete = new Delete(result.getRow()); list.add(delete); } deleteRow(strTableName, list); scanner.close(); } /** * 截断表 * @param strTableName 表名 * @throws IOException */ public void truncateTable(String strTableName) throws IOException { TableName tableName = TableName.valueOf(strTableName); admin.disableTable(tableName); admin.truncateTable(tableName, true); } /** * 删除表 * @param strTableName 表名 * @throws IOException */ public void deleteTable(String strTableName) throws IOException { TableName tableName = TableName.valueOf(strTableName); 
admin.disableTable(tableName); admin.deleteTable(tableName); } public static void main(String[] args) throws IOException { HBaseClient test = HBaseClient.getInstance(); test.testCreateNamespace(); test.testCreateTable(); test.testInsertData(); test.testQuery(); test.testQueryByRow(); test.testQueryByFilter(); test.testQueryByFilters(); test.testAddColumn(); test.testDeleteColumn(); test.testDeleteRow(); test.testDeleteTable(); test.close(); } public void testCreateNamespace() throws IOException{ createNamespace("ns1"); } public void testCreateTable() throws IOException{ createTableForced("ns1:t1", "cf"); } public void testInsertData() throws IOException { List<Put> putList = new ArrayList<Put>(); Put put = null; for (int i = 0; i < 10; i++) { put = new Put(Bytes.toBytes("row" + i)); put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("c1"), Bytes.toBytes("r" + i + "c1")); put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("c2"), Bytes.toBytes("r" + i + "c2")); putList.add(put); } insertData("ns1:t1", putList); } public void testQuery() throws IOException{ ResultScanner scanner = queryTable("ns1:t1"); for (Result result : scanner) { byte[] row = result.getRow(); System.out.println("row key is:" + new String(row)); List<Cell> listCells = result.listCells(); for (Cell cell : listCells) { byte[] familyArray = cell.getFamilyArray(); byte[] qualifierArray = cell.getQualifierArray(); byte[] valueArray = cell.getValueArray(); System.out.println("row value is:" + new String(familyArray) + new String(qualifierArray) + new String(valueArray)); } } scanner.close(); } public void testQueryByRow() throws IOException{ Get get = new Get("row5".getBytes()); Result result = queryTableByRowKey("ns1:t1", get); byte[] row = result.getRow(); System.out.println("row key is:" + new String(row)); List<Cell> listCells = result.listCells(); for (Cell cell : listCells) { byte[] familyArray = cell.getFamilyArray(); byte[] qualifierArray = cell.getQualifierArray(); byte[] valueArray = 
cell.getValueArray(); System.out.println("row value is:" + new String(familyArray) + new String(qualifierArray) + new String(valueArray)); } } public void testQueryByFilter() throws IOException{ Filter filter = new SingleColumnValueFilter( Bytes.toBytes("cf"), Bytes.toBytes("c1"), CompareOp.EQUAL, Bytes.toBytes("r3c1")); ResultScanner scanner = queryTableByFilter("ns1:t1", filter); for (Result result : scanner) { byte[] row = result.getRow(); System.out.println("row key is:" + new String(row)); List<Cell> listCells = result.listCells(); for (Cell cell : listCells) { byte[] familyArray = cell.getFamilyArray(); byte[] qualifierArray = cell.getQualifierArray(); byte[] valueArray = cell.getValueArray(); System.out.println("row value is:" + new String(familyArray) + new String(qualifierArray) + new String(valueArray)); } } scanner.close(); } public void testQueryByFilters() throws IOException{ List<Filter> filters = new ArrayList<Filter>(); Filter filter1 = new SingleColumnValueFilter( Bytes.toBytes("cf"), Bytes.toBytes("c1"), CompareOp.EQUAL, Bytes.toBytes("r5c1")); Filter filter2 = new SingleColumnValueFilter( Bytes.toBytes("cf"), Bytes.toBytes("c2"), CompareOp.EQUAL, Bytes.toBytes("r5c2")); filters.add(filter1); filters.add(filter2); ResultScanner scanner = queryTableByFilters("ns1:t1", filters); for (Result result : scanner) { byte[] row = result.getRow(); System.out.println("row key is:" + new String(row)); List<Cell> listCells = result.listCells(); for (Cell cell : listCells) { byte[] familyArray = cell.getFamilyArray(); byte[] qualifierArray = cell.getQualifierArray(); byte[] valueArray = cell.getValueArray(); System.out.println("row value is:" + new String(familyArray) + new String(qualifierArray, "UTF-8") + new String(valueArray, "UTF-8")); } } scanner.close(); } public void testAddColumn() throws IOException{ addColumn("ns1:t1", "fam"); testQueryByRow(); } public void testDeleteColumn() throws IOException{ deleteColumn("ns1:t1", "fam"); testQueryByRow(); } 
public void testDeleteRow() throws IOException{ List<Delete> list = new ArrayList<Delete>(); Delete delete = new Delete(Bytes.toBytes("row4")); list.add(delete); deleteRow("ns1:t1",list); testQuery(); } public void testDeleteTable() throws IOException{ truncateTable("ns1:t1"); testQuery(); deleteTable("ns1:t1"); System.out.println(admin.tableExists(TableName.valueOf("ns1:t1"))); } }