Implementing paged queries on HBase in Scala, wrapped in a PageInfo object


With relational databases such as MySQL or Oracle, paging is easy: you just add LIMIT to the SQL statement. Recently I have been working on a big-data project that needs to show results in a web page, so I had to query HBase, and I could not display too many rows at once, which meant paging. Recent HBase versions do offer a limit option in the shell, but it can only return the first N rows; it cannot return an arbitrary interval. So I implemented paging based on the rowkeys.
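The idea in miniature looks like the sketch below: scan the table in rowkey order and keep only the rowkeys whose position falls inside the requested page window. This is a minimal illustration only; the names PagingSketch and pageRowKeys are made up for this example, and it uses the same older HTablePool-style client API as the full code later in the article.

import org.apache.hadoop.hbase.client.{HTableInterface, ResultScanner, Scan}
import org.apache.hadoop.hbase.util.Bytes
import scala.collection.JavaConversions._
import scala.collection.mutable.ListBuffer

object PagingSketch {
  // Keep only the rowkeys whose index falls inside the requested page window.
  // `table`, `pageSize` and `currentPage` are assumed to be supplied by the caller.
  def pageRowKeys(table: HTableInterface, pageSize: Int, currentPage: Int): Seq[String] = {
    val first = (currentPage - 1) * pageSize   // index of the first row on the page
    val last  = first + pageSize               // exclusive upper bound
    val scanner: ResultScanner = table.getScanner(new Scan())
    val keys = ListBuffer[String]()
    var i = 0
    try {
      for (result <- scanner) {
        if (i >= first && i < last) keys += Bytes.toString(result.getRow)
        i += 1
      }
    } finally {
      scanner.close()
    }
    keys
  }
}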

Below is the code for PageInfo:

import java.util
import java.util.{LinkedHashMap, Map}
import org.apache.hadoop.hbase.client.{Result, ResultScanner}
import org.apache.hadoop.hbase.filter.{SingleColumnValueFilter, Filter, CompareFilter}
import org.apache.hadoop.hbase.util.Bytes

/**
 * Created by zhaocd on 2015-10-19.
 */
class PageInfo {
  var currentPage: Int = 1                        // current page number
  var pageSize: Int = 20                          // rows per page
  var totalPage: Int = 0                          // total number of pages
  var totalCount: Int = 0                         // total number of rows
  var resultList: Object = new util.LinkedList()  // collection holding the result set

  def getCurrentPage: Int = currentPage
  def setCurrentPage(currentPage: Int) { this.currentPage = currentPage }
  def getPageSize: Int = pageSize
  def setPageSize(pageSize: Int) { this.pageSize = pageSize }
  def getResultList: Object = resultList
  def setResultList(resultList: Object) { this.resultList = resultList }
  def getTotalPage: Int = totalPage
  def setTotalPage(totalPage: Int) { this.totalPage = totalPage }
  def getTotalCount: Int = totalCount
  def setTotalCount(totalCount: Int) { this.totalCount = totalCount }

  // compute the number of pages needed for totalCount rows at pageSize rows per page
  def getTotalPage(pageSize: Int, totalCount: Int): Int = {
    val n: Int = totalCount / pageSize
    if (totalCount % pageSize == 0) n else n + 1
  }

  def newFilter(f: Array[Byte], c: Array[Byte], op: CompareFilter.CompareOp, v: Array[Byte]): Filter =
    new SingleColumnValueFilter(f, c, op, v)

  def closeScanner(scanner: ResultScanner) {
    if (scanner != null) scanner.close()
  }

  def toStr(bt: Array[Byte]): String = Bytes.toString(bt)

  def getBytes(str: String): Array[Byte] = {
    // Bytes.toBytes(null) would throw, so fall back to an empty string
    Bytes.toBytes(if (str == null) "" else str)
  }
}
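As a quick check of the paging arithmetic in getTotalPage (the numbers are purely illustrative):

val info = new PageInfo()
info.setPageSize(20)
info.setTotalCount(105)
// 105 rows at 20 per page -> 5 full pages plus one partial page = 6
info.setTotalPage(info.getTotalPage(20, 105))
println(info.getTotalPage)   // prints 6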

Below is the code for the test class:

import java.io.IOException
import java.util.{LinkedList, Map, List}
import org.apache.commons.lang.StringUtils
import org.apache.hadoop.hbase.KeyValue
import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp
import org.apache.hadoop.hbase.filter.{SubstringComparator, SingleColumnValueFilter, FilterList}
import org.apache.hadoop.hbase.util.Bytes

object TestDao {
  var tp: HTablePool = null

  def query(vo: TestPo, pageSize: Int, currentPage: Int): PageInfo = {
    val tableName = "test1"
    println("enter into query...")
    val conf = SparkUtils.initMethod()
    val pageInfo = new PageInfo()
    tp = new HTablePool(conf, 10)
    val mapList = new LinkedList[TestPo]          // collection holding the result rows
    var scanner: ResultScanner = null
    var tbData: PageInfo = null
    try {
      // fall back to defaults when the paging parameters are missing or zero
      val size = if (pageSize <= 0) 20 else pageSize
      val page = if (currentPage <= 0) 1 else currentPage
      val firstPage = (page - 1) * size           // index of the first row on the requested page
      val endPage = firstPage + size              // exclusive upper bound
      val table: HTableInterface = getTable(tableName)

      // optional filter on CREATE_TIME
      val filterList = new FilterList()
      if (!vo.createTime.equals("")) {
        val filter1 = new SingleColumnValueFilter(Bytes.toBytes("line"), Bytes.toBytes("CREATE_TIME"),
          CompareOp.EQUAL, new SubstringComparator(vo.createTime))
        filterList.addFilter(filter1)
      }

      val scan: Scan = new Scan
      if (filterList.getFilters.size() > 0) {
        scan.setFilter(filterList)
      }
      scan.setCaching(1000)        // scanner caching
      scan.setCacheBlocks(false)
      scanner = table.getScanner(scan)

      // walk the rows and remember the rowkeys that fall inside the page window
      var i: Int = 0
      val rowList: List[Array[Byte]] = new LinkedList[Array[Byte]]()
      import scala.collection.JavaConversions._
      for (result <- scanner) {
        val row: String = pageInfo.toStr(result.getRow)
        println("enter into scan... result is " + row)
        if (i >= firstPage && i < endPage) {
          rowList.add(getBytes(row))
        }
        i += 1
      }

      // fetch the selected rows and map them onto TestPo objects
      val results: Array[Result] = table.get(getList(rowList))
      for (r <- results) {
        val po = new TestPo
        po.id = new String(r.getRow)
        for (v <- r.raw()) {
          if ("CPU".equals(new String(v.getQualifier)))       { po.cpu = new String(v.getValue) }
          if ("MEM".equals(new String(v.getQualifier)))       { po.mem = new String(v.getValue) }
          if ("USED_MEM".equals(new String(v.getQualifier)))  { po.usedMem = new String(v.getValue) }
          if ("TIME_COST".equals(new String(v.getQualifier))) { po.timeCost = new String(v.getValue) }
          // note: the qualifier here is "CTRATE_TIME", while the filter above uses "CREATE_TIME"
          if ("CTRATE_TIME".equals(new String(v.getQualifier))) { po.createTime = new String(v.getValue) }
          if ("OPT_TYPE".equals(new String(v.getQualifier)))  { po.optType = new String(v.getValue) }
        }
        mapList.add(po)
      }

      tbData = new PageInfo
      tbData.setCurrentPage(page)
      tbData.setPageSize(size)
      tbData.setTotalCount(i)
      tbData.setTotalPage(pageInfo.getTotalPage(size, i))
      tbData.setResultList(mapList)
    } catch {
      case e: IOException => e.printStackTrace()
    } finally {
      if (scanner != null) scanner.close()
    }
    tbData    // finally, return the PageInfo object
  }

  def getTable(tableName: String): HTableInterface = {
    if (StringUtils.isEmpty(tableName)) return null
    tp.getTable(getBytes(tableName))
  }

  // build the Get requests for the selected rowkeys
  def getList(rowList: List[Array[Byte]]): List[Get] = {
    val list: List[Get] = new LinkedList[Get]
    import scala.collection.JavaConversions._
    for (row <- rowList) {
      val get: Get = new Get(row)
      get.addColumn(getBytes("line"), getBytes("MEM"))
      get.addColumn(getBytes("line"), getBytes("USED_MEM"))
      get.addColumn(getBytes("line"), getBytes("TIME_COST"))
      get.addColumn(getBytes("line"), getBytes("OPT_TYPE"))
      get.addColumn(getBytes("line"), getBytes("CTRATE_TIME"))
      get.addColumn(getBytes("line"), getBytes("CPU"))
      list.add(get)
    }
    list
  }

  def getBytes(str: String): Array[Byte] = {
    // Bytes.toBytes(null) would throw, so fall back to an empty string
    Bytes.toBytes(if (str == null) "" else str)
  }
}



Below is the utility method for connecting to the remote HBase cluster:

def initMethod(): Configuration = {
  val conf = HBaseConfiguration.create()
  conf.set("hbase.zookeeper.quorum", "hbase-server-ip")        // address of the HBase ZooKeeper quorum
  conf.set("hbase.zookeeper.property.clientPort", "3351")
  println("connect success...")
  conf
}
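TestDao calls this as SparkUtils.initMethod(), so the method presumably lives in a utility object of that name. A minimal self-contained sketch of such a wrapper (the object name is taken from that call; everything else mirrors the snippet above):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.HBaseConfiguration

object SparkUtils {
  def initMethod(): Configuration = {
    val conf = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.quorum", "hbase-server-ip")      // fill in your HBase ZooKeeper quorum
    conf.set("hbase.zookeeper.property.clientPort", "3351")    // client port used in this article
    conf
  }
}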

Finally, the TestPo class:

class TestPo {
  var id: String = ""
  var timeCost: String = ""
  var cpu: String = ""
  var mem: String = ""
  var usedMem: String = ""
  var optType: String = ""
  var createTime: String = ""
}
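Putting the pieces together, a call site might look like the sketch below. This is an illustration only: the createTime value, the page numbers, and the TestPageQuery object name are made up.

import scala.collection.JavaConversions._

object TestPageQuery {
  def main(args: Array[String]): Unit = {
    val vo = new TestPo
    vo.createTime = "2015-10"                      // substring match on CREATE_TIME; "" disables the filter
    val page: PageInfo = TestDao.query(vo, 20, 1)  // 20 rows per page, first page
    println(s"total rows: ${page.getTotalCount}, total pages: ${page.getTotalPage}")
    val rows = page.getResultList.asInstanceOf[java.util.LinkedList[TestPo]]
    for (po <- rows) {
      println(s"${po.id}  cpu=${po.cpu}  mem=${po.mem}  createTime=${po.createTime}")
    }
  }
}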


Summary: HBase is not really meant for real-time page display. In production its tables typically hold millions or tens of millions of rows and run on a distributed cluster; here, however, we only have a test table with a few hundred rows, so this approach works well enough.


