本文通过 Elasticsearch(ES)提供的 RESTful API 获取索引中最新的一条数据,以此简单监控该索引的数据延时情况。
#!/bin/sh
# Fetch the newest document of an Elasticsearch index (sorted by its time
# field, descending) and hand the raw JSON response to parseRes, which decides
# whether the index is lagging.
#
# `set -e` is deliberately not used: if curl fails, the (empty) response is
# still passed on so that the parser can report it as an error.
export LANG=en_US.UTF-8

index="app_logs"
timeField="@timestamp"

# match_all + size 1 + descending sort on the time field => the latest document.
query=$(printf '{ "query": { "match_all": {}}, "size": 1, "sort": [{ "%s": {"order": "desc" }} ]}' "${timeField}")

# The URL is quoted because `?` is a glob character. The Content-Type header is
# required by Elasticsearch 6.0+ for requests that carry a body.
jsonRes=$(curl -sS -XGET "http://localhost:9200/${index}/_search?pretty" \
  -H 'Content-Type: application/json' \
  -d "${query}")

# NOTE(review): parseRes is not defined in this script; presumably it is a
# command wrapping the Python parseRes function shown further down - confirm
# how it is actually invoked.
parseRes "${jsonRes}" "${timeField}" "${index}"
获得结果数据格式如下:
{
  "took": 65,
  "timed_out": false,
  "_shards": { "total": 8, "successful": 8, "failed": 0 },
  "hits": {
    "total": 109697,
    "max_score": null,
    "hits": [{
      "_index": "app_logs",
      "_type": "blog",
      "_id": "indicator.147-1487951940",
      "_score": null,
      "_source": { "log_data": "" },
      "sort": [ 1487951940 ]
    }]
  }
}
parseRes 函数解析上述结果,延时超过 5 分钟则报警:
# Maximum tolerated age, in seconds, of the newest document in an index.
LATENCY_SECONDS = 5 * 60


def _parseHitTime(timeVal):
    """Convert a time value taken from a search hit into a naive UTC datetime.

    Accepts either an epoch number (seconds; values of 1e11 or more are taken
    to be milliseconds) or a string starting with 'YYYY-MM-DDTHH:MM:SS'.
    Anything after the seconds (fraction, 'Z') is ignored.

    NOTE(review): string timestamps are assumed to be UTC; an explicit offset
    such as '+08:00' is not applied - confirm against the index mapping.

    Returns None when the value cannot be interpreted.
    """
    if isinstance(timeVal, bool):
        return None
    try:
        epoch = float(timeVal)
    except (TypeError, ValueError):
        epoch = None
    if epoch is not None:
        if abs(epoch) >= 1e11:
            epoch = epoch / 1000.0
        try:
            return datetime.datetime(1970, 1, 1) + datetime.timedelta(seconds=epoch)
        except (OverflowError, ValueError):
            return None
    try:
        # Slice by position instead of searching for '.', so timestamps
        # without a fractional part are not truncated.
        return datetime.datetime.strptime(timeVal[:19], '%Y-%m-%dT%H:%M:%S')
    except (TypeError, ValueError, KeyError):
        return None


def parseRes(jsonStr, timeField, index):
    """Check how old the newest document of an index is.

    Args:
        jsonStr: raw JSON text of a search response whose first hit is the
            newest document.
        timeField: name of the time field inside the hit's ``_source``.
        index: index name, used only in messages.

    Returns:
        An empty string when the newest document is at most LATENCY_SECONDS
        old; otherwise an error message that ends with the raw response.
        Malformed responses, empty results and unreadable timestamps are also
        reported as messages rather than raised.
    """
    errMsg = ""
    try:
        qjson = json.loads(jsonStr.strip())
    except Exception:
        errMsg = "json load error : [" + index + "] \n" + jsonStr
        return errMsg
    if not isinstance(qjson, dict):
        errMsg = "json load error : [" + index + "] \n" + jsonStr
        return errMsg

    hitsObj = qjson.get("hits", {})
    if not isinstance(hitsObj, dict):
        hitsObj = {}
    total = hitsObj.get("total", "")
    if isinstance(total, dict):
        # Elasticsearch 7+ reports {"value": N, "relation": "eq"}.
        total = total.get("value", "")
    hitList = hitsObj.get("hits", [])
    if total == "" or total == 0 or not isinstance(hitList, list) or not hitList:
        errMsg = "hits total is 0 : [" + index + "] \n" + jsonStr
        return errMsg

    firstHit = hitList[0]
    source = firstHit.get("_source", {}) if isinstance(firstHit, dict) else {}
    timeStr = source.get(timeField, "") if isinstance(source, dict) else ""
    if timeStr == "" or timeStr is None:
        errMsg = "hists time is empty : [" + index + "] \n" + jsonStr
        return errMsg

    hitTime = _parseHitTime(timeStr)
    if hitTime is None:
        errMsg = "hits time format error : [" + index + "] \n" + jsonStr
        return errMsg

    # Compare in UTC so the result does not depend on the host's time zone
    # (the previous code subtracted a fixed 8 hours from local time).
    curTime = datetime.datetime.utcnow()
    print("%s %s %s" % (index, hitTime, curTime))
    # total_seconds() includes whole days; .seconds alone would wrap at 24h.
    secondsDiff = (curTime - hitTime).total_seconds()
    if secondsDiff > LATENCY_SECONDS:
        errMsg = "The latency is more than 5 minutes : ["+index+"] \n" + jsonStr
        return errMsg
    return errMsg