代码编织梦想

Es通过脚本的方式计算每条数据
go demo

	fsq := elastic.NewFunctionScoreQuery().BoostMode("replace")
	script := `if(doc['device_count'].size() != 0 && doc['device_count'].value != 0 )
							{
							double price = 0.000001;
								if (doc['discount_price'].size() != 0 && doc['discount_price'].value != 0)
								{
									price = doc['discount_price'].value;
								}
							return doc['device_count'].value * price
							}`

	fsq = fsq.AddScoreFunc(elastic.NewScriptFunction(elastic.NewScript(script)))
	
	boolQuery := elastic.NewBoolQuery()
	boolQuery.Filter(elastic.NewTermsQuery("city.code", "130200"))
	
	search := elastic.NewSearchService(es.EsClient).Index(esMediaResourceIndex).Size(10)

	search.Query(fsq.Query(boolQuery))
	searchResult, err := search.
		Pretty(true).
		Do(context.Background())

	if err != nil {
		panic(err)
	}

求最大值,最小,平均,求和

	boolQuery := elastic.NewBoolQuery()
	boolQuery.Filter(elastic.NewTermsQuery("city.code", "130200"))

	deviceSumAggs := elastic.NewSumAggregation().Field("device_count")
	deviceAvgAggs := elastic.NewAvgAggregation().Field("device_count")
	deviceMinAggs := elastic.NewMinAggregation().Field("device_count")
	deviceMaxAggs := elastic.NewMaxAggregation().Field("device_count")

	search := elastic.NewSearchService(es.EsClient).Index(esMediaResourceIndex).Size(10)
	search.Query(boolQuery)
	searchResult, err := search.
		Aggregation("device_sum", deviceSumAggs).
		Aggregation("device_avg", deviceAvgAggs).
		Aggregation("device_min", deviceMinAggs).
		Aggregation("device_max", deviceMaxAggs).
		Pretty(true).
		Do(context.Background())

	if err != nil {
		panic(err)
	}

	if deviceSumAgg, found := searchResult.Aggregations.Sum("device_sum"); found {
	    if deviceSumAgg.Value != nil {
	    	fmt.Println(*deviceSumAgg.Value)
		}
	}
	if deviceAvgAgg, found := searchResult.Aggregations.Sum("device_avg"); found {
		if deviceAvgAgg.Value != nil {
			fmt.Println(*deviceAvgAgg.Value)
		}
	}

es分桶统计计数
demo 统计不同媒体类型的设备数

	boolQuery := elastic.NewBoolQuery()
	boolQuery.Filter(elastic.NewTermsQuery("city.code", "130200"))

	mediaCodeTermAggs := elastic.NewTermsAggregation().Field("media_type.code")
	mediaCodeTermAggs.SubAggregation("device_sum", elastic.NewSumAggregation().Field("device_count"))

	search := elastic.NewSearchService(es.EsClient).Index(esMediaResourceIndex).Size(10)
	search.Query(boolQuery)
	searchResult, err := search.
		Aggregation("media_type_sum", mediaCodeTermAggs).
		Pretty(true).
		Do(context.Background())

	if err != nil {
		panic(err)
	}

	if mediaCodeTermAgg, found := searchResult.Aggregations.Terms("media_type_sum"); found {
	    for _, buckt := range mediaCodeTermAgg.Buckets {
			fmt.Println(buckt.Key)
	    	if agg, ok := buckt.Aggregations.Sum("device_sum"); ok{
	    		fmt.Println(*agg.Value)
			}
		}
	}

es 围栏的方式查询数据

package main

import (
	"context"
	"fmt"
	"github.com/olivere/elastic/v7"
	"wtp-media-house/internal/app/es"
	"wtp-media-house/internal/app/media_resource/domain/params"
	"wtp-media-house/internal/pkg/logger"
)

var (
	esIndex = "unidata_heat_with_tag_202201"
	esMediaResourceIndex = "wtp_media_resource"
)




func main() {

	genoInfos := []params.GeoFence {
		{
			Name:         "",
			GeofenceType: 1,
			Lon:          116.424268,
			Lat:          39.978495,
			Radius:       200,
			Polygon:      nil,
		},
	}

	boolQuery := elastic.NewBoolQuery()
	subBoolQuery := elastic.NewBoolQuery()
	for _, geoInfo := range genoInfos {
		query := GeofenceUniDataQuery(geoInfo)
		if query != nil {
			subBoolQuery.Should(query)
		}
	}
	boolQuery.Filter(subBoolQuery)

	search := es.EsClient.Search().Index(esMediaResourceIndex)
	search.Size(10)
	search.Query(boolQuery)

	searchResult, err := search.
		Pretty(true).
		Do(context.Background())

	if err != nil {
		panic(err)
	}

	logger.WayzLog.Info(fmt.Sprintf("【Ta特征查询】 Query took %d milliseconds ", searchResult.TookInMillis))

}

func GeofenceUniDataQuery(in interface{}) elastic.Query {
	fence, ok := in.(params.GeoFence)
	if !ok {
		return nil
	}
	fenceType := fence.GeofenceType

	//圆形
	if fenceType == 1 && fence.Lon != 0 && fence.Lat != 0 {
		return elastic.NewGeoDistanceQuery("location").
			Lat(fence.Lat).
			Lon(fence.Lon).
			Distance(fmt.Sprintf("%fm", fence.Radius))
	}

	//多边形
	if fenceType == 2 && len(fence.Polygon) != 0 {
		geoQuery := elastic.NewGeoPolygonQuery("location")
		for _, polygon := range fence.Polygon {
			if len(polygon) == 2 {
				geoQuery.AddPoint(polygon[1], polygon[0])
			}
		}
		return geoQuery
	}

	//矩形(视窗)
	if fenceType == 3 && len(fence.Polygon) == 2 {
		point1 := fence.Polygon[0]
		point2 := fence.Polygon[1]
		if len(point1) == 2 && len(point2) == 2 {
			geoQuery := elastic.NewGeoBoundingBoxQuery("location").
				TopLeft(max(point1[1], point2[1]), min(point1[0], point2[0])).
				BottomRight(min(point1[1], point2[1]), max(point1[0], point2[0]))
			return geoQuery
		}
	}
	return nil
}

func max(a, b float64) float64 {
	if a < b {
		return b
	}
	return a
}

func min(a, b float64) float64 {
	if a > b {
		return b
	}
	return a
}

es 根据输入查询的参数分桶计算
demo 查询2个城市下的信息和uv

package main

import (
	"context"
	"encoding/json"
	"fmt"
	"github.com/olivere/elastic/v7"
	"wtp-media-house/internal/app/es"
	"wtp-media-house/internal/pkg/config"
)

func main() {
	cityCodes := []string{
		"510100", "130200",
	}

	fetch := elastic.NewFetchSourceContext(true).Include("city") // 只取部分数据

	aggs := elastic.NewFiltersAggregation()

	for _, city := range cityCodes{
		boolQuery := elastic.NewBoolQuery()
		boolQuery.Filter(elastic.NewTermsQuery("city.code", city))
		aggs.FilterWithName(city, boolQuery)
	}
	// 每个桶取2条数
	aggs.SubAggregation("city_info", elastic.NewTopHitsAggregation().FetchSourceContext(fetch).Size(2))
	// 统计每个桶的uv
	aggs.SubAggregation("city_uv", elastic.NewSumAggregation().Field("resident.uv"))
	
	search := elastic.NewSearchService(es.EsHeatClient).Index(config.ES_UNIDATA_HEAR_WITH_TAG)
	searchResult, err := search.
		Aggregation("city_code", aggs).
		Pretty(true).
		Do(context.Background())

	if err != nil {
		panic(err)
	}

	type CityInfo struct {
		City struct {
			Name string `json:"name"`
			Code string `json:"code"`
		} `json:"city"`
	}
	resultCityInfo := make(map[string][]string)
	resultCityUv := make(map[string]int)

	if agg, found := searchResult.Aggregations.Filters("city_code"); found {
		for key, bucket :=range agg.NamedBuckets {
			resultCityInfo[key] = []string{}
			if city, ok := bucket.Aggregations.TopHits("city_info"); ok {
				if city.Hits.TotalHits.Value > 0 {
				    for _, hit := range city.Hits.Hits {
				    	var cityName CityInfo
				    	json.Unmarshal(hit.Source, &cityName)
						resultCityInfo[key]= append(resultCityInfo[key], cityName.City.Name)
					}
				}
			}
			if cityUv, ok := bucket.Aggregations.Sum("city_uv"); ok {
				if cityUv.Value != nil {
					resultCityUv[key] = int(*cityUv.Value)
				}
			}
		}
	}
	fmt.Println(resultCityInfo)
	fmt.Println(resultCityUv)
}

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/qq_42988748/article/details/126975597

elasticsearch入门——golang-爱代码爱编程

目录 基础概念什么是es倒排索引基本类型分词器选主写数据过程读数据过程搜索过程restful操作查看es节点信息设置副本索引文档查询分词analyzer api:golang操作连接es插入,删除,修改,数据通过id查询查询所有匹配检索精准查找筛选聚合分页 排序 包含其他操作别名alias索引模板滚动索引rollover分段合并 forcemerg

elasticsearch,golang客户端聚合查询-爱代码爱编程

目前在做的监控项目中有个对es的聚合查询的需求,需要用go语言实现, 需求就是查询某个IP在一个时间范围内,各个监控指标取时间单位内的平均值。有点拗口,如下是es的查询语句,可以很明显的看到是要聚合cpu和mem两个field。另外,时区必须要加上,否则少8小时,你懂的。 GET /monitor/v1/_search { "query"

go module常用命令-爱代码爱编程

Go Module常用命令@TOC Go Module常用命令 go mod init #初始化go.mod go mod tidy #更新依赖文件 go

通过一个例子演示golang调用c语言动态链接库中的函数-爱代码爱编程

本例提供了cgo调用C函数的示例,也演示了如何将C函数打印内容保存到golang的变量中 目录和源码 目录结构 admin@hpc-1:~/go/my_stdout$ tree . ├── include │

go 解压和压缩包-爱代码爱编程

將压缩包放在zippath="D:/xx/xx/xx"中,解压到pathto="D:/xx/xx1/xx"中 type UploaddeployLogic struct { logx.Logger ctx con

go 互斥锁的实现原理?-爱代码爱编程

Go sync包提供了两种锁类型:互斥锁sync.Mutex 和 读写互斥锁sync.RWMutex,都属于悲观锁。 概念 Mutex是互斥锁,当一个 goroutine 获得了锁后,其他 goroutine 不能获取锁

golang基础面试题1-爱代码爱编程

来源于fullstack,绿色为重点 1. Go语言是什么 Go 是一种通用编程语言,设计初衷是为了进行系统编程。它最初是由 Google 的 Robert Griesemer、Rob Pike 和 Ken Thomps

【golang】25、图片操作-爱代码爱编程

用 “github.com/fogleman/gg” 可以画线, 框 用 “github.com/disintegration/imaging” 可以变换颜色 一、渲染 1.1 框和字 import "github

elasticsearch 聚合性能优化 -爱代码爱编程

1. ES 聚合 Elasticsearch 查询条件中可以同时有多个条件聚合,但这个时候的多个聚合不是并行运行的。 例如:当前ES2.6亿的数据总量,使用下面聚合方式,耗时10s左右 GET /test_index/_

golang gin单独部署vue3.0前后端分离应用-爱代码爱编程

概述 因为公司最近的项目前端使用vue 3.0,后端api使用golang gin框架。测试通过后,博文记录,用于备忘。 步骤 npm run build,构建出前端项目的dist目录,dist目录的结构具体如下图 将

go并发模式之-爱代码爱编程

常见模式之四:工作池/协程池模式 定义 顾名思义,就是有固定数量的工人(协程),去执行批量的任务 使用场景 适用于需要限制并发执行任务数量的情况 创建一个固定大小的 goroutine 池,将任务分发给池中的 goroutine 并等待它们完成,使用带缓冲的通道来接收任务,以避免阻塞主线程 示例 有生产需求,建议使用大佬写的 a