Ebatis是什么
ebatis是一个声明式Elasticsearch ORM框架。只需要定义接口,便可轻松访问Elasticsearch。ebatis优雅地帮你隔离业务对Elasticserach底层驱动接口的直接调用,你不再需要自己手动去构建繁琐DSL 语句。同时,当升级Elastisearch版本的时候,业务可以完全不用关心底层接口的变动,平滑升级。
目前,支持Elastisearch 6.5.1与7.5.1版本。
总体流程
Cluster集群
为了保证高ES集群的高可用,同时支持对集群的负载均衡,ebatis没有直接使用elasticsearch提供的RestClient和RestHighLevelClient接口来访问集群,而是抽象出一个Cluster。一个Cluster代表一个ES集群,
示例
@Configuration
public class ESClusterConfig {
@Lazy
@Bean("recommendationRouteShipperCluster")
public ESCluster recommendationRouteShipperCluster(EsLionConfig esLionConfig) {
Credentials credentials = Credentials.basicCredentials(esLionConfig.getRecommendationRouteShipperClusterUsername(),
esLionConfig.getRecommendationRouteShipperClusterPassword());
return ESCluster.generateEsCluster(esLionConfig.getRecommendationRouteShipperHost(),
esLionConfig.getRecommendationRouteShipperHttpPort(),
esLionConfig.getRecommendationRouteShipperClusterName(),
credentials,
esLionConfig.getEsMaxConnTotal());
}
}
ClusterRouter
如果系统需要连接多个集群,则通过ClusterRouter和ClusterLoadBalancer来实现,多集群的路由和负载均衡。
通过 @EnableEsMapper(basePackages = "com.xxxx.xxx", clusterRouter = "xxxxxxxClusterRouter")
扫描指定包com.xxxx.xxx
下的所有Mapper ,并将其绑定到指定的集群路由中。
通过 @Bean(destroyMethod = "close", name = "xxxxxClusterRouter"),新建一个集群路由,将其命名为xxxxxclusterRouter。
示例:
@Configuration
@EnableEsMapper(basePackages = "com.ymm.driver.index.biz.es.repository.driverrecommendation",clusterRouter = "recommendationRouteShipperClusterRouter")
public class RecommendationRouteShipperConfig {
@Lazy
@Autowired
@Qualifier("recommendationRouteShipperCluster")
private ESCluster recommendationRouteShipperCluster;
@Bean(destroyMethod = "close", name = "recommendationRouteShipperClusterRouter")
public ClusterRouter clusterRouter() {
// 随机负载均衡,当输入多个集群时有效,当前输入一个集群,等价于single方法。
return ClusterRouter.random(recommendationRouteShipperCluster);
}
}
Mapper
通过@EsMapper注解, index = "xxxxx" 绑定索引名;
(此处的clusterRouter = "xxxxclusterRouter" 指定的集群路由应当作用不大,可作为额外参考,可以省略)
再通过注解@Update @Bulk @DeleteByQuery 等等,定义对ES集群的操作。
@EsMapper(index = "recommendation_shipper_v2", clusterRouter = "recommendationRouteShipperClusterRouter")
public interface RecommendationShipperNewMapper {
@Update(docAsUpsert = true, retryOnConflict = 3, id = "id")
CompletableFuture<UpdateResponse> upsert(RecommendationShipperModel model);
@Bulk(bulkType = BulkType.UPDATE, update = @Update(id = "userId", docAsUpsert = true, retryOnConflict = 5))
BulkResponse bulkUpsert(RecommendationShipperModel[] models);
@DeleteByQuery(batchSize = 10000)
BulkByScrollResponse deleteByQuery(OfflineShipperCondition condition);
}
Mapper依赖的类
文档结构类(返回结果映射、更新文档语句的输入结构)
一般命名为 xxxModel
示例:
@Data
public class RecommendationShipperModel {
/**
* docId格式
*/
private static String DOC_ID_PATTERN = "%s:%s:%s";
@Id
private String id;
/**
* 司机ID
*/
private Long driverId;
/**
* 货主ID
*/
private Long shipperId;
/**
* 北斗定位
*/
private Integer sourceType;
/**
* 关系创建时间
*/
private Long createTime;
/**
* 文档ID生成
* @param sourceType
* @param driverId
* @param shipperId
* @return
*/
public String genereateDocId(Integer sourceType, Long driverId, Long shipperId) {
String input = String.format(DOC_ID_PATTERN, String.valueOf(sourceType),
String.valueOf(driverId), String.valueOf(shipperId));
return SecureUtil.md5(input);
}
}
查询条件类(映射查询语句)
一般命名为 xxxCondition。
基本条件定义
查询条件统一定义一个POJO对象,对象的属性名即为Mapping字段名称,如果属性名称和Mapping字段名称不一致,通过@Field注解来映射;属性分为基本类型和对象类型,对象类型会再次递归定义查询条件。
序号 | 注解 | 说明 |
---|---|---|
1 | @Must | 必须满足的条件 |
2 | @MustNot | 必须排除的条件 |
3 | @Should | 可选条件 |
4 | @Filter | 过滤条件 |
5 | @Ignore | 忽略的条件,不参与语句拼装 |
6 | @Exists | 字段是否存在 |
如果属性不加注解,默认就是@Must条件,语句支持嵌套属性
@Data
@ToString
@AllArgsConstructor
@NoArgsConstructor
public class OfflineShipperCondition {
@Terms
@Field("sourceType")
private List<Integer> sourceTypes;
@Terms
@Field("driverId")
private List<Long> driverIds;
@Must
@Field("createTime")
private Range createTimeRange;
}