From 606847e0aa764039db3f128ac6a2307d7188977d Mon Sep 17 00:00:00 2001 From: jlf <1251489546@qq.com> Date: Mon, 8 Jun 2026 16:27:25 +0800 Subject: [PATCH] KYLIN-6090 Add project permission verification to the job information retrieval API --- .../apache/kylin/job/common/SegmentUtil.java | 6 +- .../org/apache/kylin/job/dao/JobInfoDao.java | 35 +- .../job/execution/ExecutableManager.java | 37 +- .../kylin/job/rest/JobMapperFilter.java | 28 +- .../kylin/job/runners/JobCheckRunner.java | 4 +- .../kylin/job/scheduler/JdbcJobScheduler.java | 8 +- .../mybatis-mapper/JobInfoMapper.xml | 10 + .../apache/kylin/job/dao/JobInfoDaoTest.java | 108 ++++++ .../job/scheduler/JdbcJobSchedulerTest.java | 2 +- .../kylin/rest/controller/JobController.java | 6 +- .../rest/controller/JobControllerTest.java | 2 + .../kylin/rest/service/JobInfoService.java | 16 +- .../rest/service/JobResourceService.java | 6 +- .../apache/kylin/rest/service/JobService.java | 3 +- .../apache/kylin/rest/util/JobFilterUtil.java | 22 +- .../rest/service/JobInfoServiceTest.java | 325 ++++++++++++------ .../kylin/rest/util/JobFilterUtilTest.java | 115 +++++++ 17 files changed, 559 insertions(+), 174 deletions(-) create mode 100644 src/core-job/src/test/java/org/apache/kylin/job/dao/JobInfoDaoTest.java create mode 100644 src/data-loading-service/src/test/java/org/apache/kylin/rest/util/JobFilterUtilTest.java diff --git a/src/core-job/src/main/java/org/apache/kylin/job/common/SegmentUtil.java b/src/core-job/src/main/java/org/apache/kylin/job/common/SegmentUtil.java index bdaa6c364d6..3bfb90a5355 100644 --- a/src/core-job/src/main/java/org/apache/kylin/job/common/SegmentUtil.java +++ b/src/core-job/src/main/java/org/apache/kylin/job/common/SegmentUtil.java @@ -180,10 +180,8 @@ private static boolean isAnyPartitionRefreshing(T segment) public static Segments getValidSegments(String modelId, String project) { val df = NDataflowManager.getInstance(KylinConfig.getInstanceFromEnv(), project).getDataflow(modelId); - JobMapperFilter jobMapperFilter = new JobMapperFilter(); - jobMapperFilter.setProject(project); - jobMapperFilter.setModelIds(Lists.newArrayList(modelId)); - jobMapperFilter.setStatuses(ExecutableState.getNotFinalStates()); + JobMapperFilter jobMapperFilter = JobMapperFilter.builder().project(project) + .modelIds(Lists.newArrayList(modelId)).statuses(ExecutableState.getNotFinalStates()).build(); List runningJobInfoList = JobContextUtil.getJobInfoDao(KylinConfig.getInstanceFromEnv()) .getJobInfoListByFilter(jobMapperFilter); ExecutableManager executableManager = ExecutableManager.getInstance(KylinConfig.getInstanceFromEnv(), project); diff --git a/src/core-job/src/main/java/org/apache/kylin/job/dao/JobInfoDao.java b/src/core-job/src/main/java/org/apache/kylin/job/dao/JobInfoDao.java index 638ce3c2975..c2762c5227e 100644 --- a/src/core-job/src/main/java/org/apache/kylin/job/dao/JobInfoDao.java +++ b/src/core-job/src/main/java/org/apache/kylin/job/dao/JobInfoDao.java @@ -25,17 +25,20 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.Locale; import java.util.function.Predicate; import java.util.stream.Collectors; -import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.collections4.CollectionUtils; +import org.apache.commons.lang3.StringUtils; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.persistence.metadata.jdbc.JdbcUtil; import org.apache.kylin.common.util.CompressionUtils; import org.apache.kylin.common.util.JsonUtil; import org.apache.kylin.guava30.shaded.common.base.Preconditions; +import org.apache.kylin.guava30.shaded.common.collect.Lists; import org.apache.kylin.job.domain.JobInfo; import org.apache.kylin.job.domain.JobLock; import org.apache.kylin.job.exception.ExecuteRuntimeException; @@ -73,9 +76,16 @@ public class JobInfoDao { @Setter private JobLockMapper jobLockMapper; + public List getJobInfoListByProjectFilter(final JobMapperFilter jobMapperFilter) { + if (StringUtils.isBlank(jobMapperFilter.getProject()) + && CollectionUtils.isEmpty(jobMapperFilter.getProjects())) { + return Collections.emptyList(); + } + return getJobInfoListByFilter(jobMapperFilter); + } + public List getJobInfoListByFilter(final JobMapperFilter jobMapperFilter) { - List jobInfoList = jobInfoMapper.selectByJobFilter(jobMapperFilter); - return jobInfoList; + return jobInfoMapper.selectByJobFilter(jobMapperFilter); } public long countByFilter(JobMapperFilter jobMapperFilter) { @@ -83,8 +93,7 @@ public long countByFilter(JobMapperFilter jobMapperFilter) { } public List getJobs(String project) { - JobMapperFilter filter = new JobMapperFilter(); - filter.setProject(project); + JobMapperFilter filter = JobMapperFilter.builder().project(project).build(); return jobInfoMapper.selectByJobFilter(filter).stream().map(JobInfoUtil::deserializeExecutablePO) .collect(Collectors.toList()); } @@ -136,12 +145,20 @@ public ExecutablePO getExecutablePOByUuid(String uuid) { return null; } + public ExecutablePO getExecutablePoByUuidWithProject(String project, String jobId) { + JobMapperFilter jobMapperFilter = JobMapperFilter.builder().project(project).jobIds(Lists.newArrayList(jobId)) + .build(); + List jobInfoList = jobInfoMapper.selectByJobFilter(jobMapperFilter); + if (CollectionUtils.isEmpty(jobInfoList) || jobInfoList.size() != 1) { + return null; + } + return JobInfoUtil.deserializeExecutablePO(jobInfoList.get(0)); + } + public List getExecutablePoByStatus(String project, List jobIds, List filterStatuses) { - JobMapperFilter jobMapperFilter = new JobMapperFilter(); - jobMapperFilter.setProject(project); - jobMapperFilter.setStatuses(filterStatuses); - jobMapperFilter.setJobIds(jobIds); + JobMapperFilter jobMapperFilter = JobMapperFilter.builder().project(project).statuses(filterStatuses) + .jobIds(jobIds).build(); List jobInfoList = jobInfoMapper.selectByJobFilter(jobMapperFilter); if (CollectionUtils.isEmpty(jobInfoList)) { return new ArrayList<>(); diff --git a/src/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java b/src/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java index 874f36722a0..c5fcf4d6ec3 100644 --- a/src/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java +++ b/src/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java @@ -1103,8 +1103,7 @@ public Map getWaiteTime(ExecutablePO executablePO, AbstractExecu if (task instanceof ChainedStageExecutable) { final ChainedStageExecutable stageExecutable = (ChainedStageExecutable) task; Map> stageMap = Optional - .ofNullable(stageExecutable.getStagesMap()) - .orElse(Maps.newHashMap()); + .ofNullable(stageExecutable.getStagesMap()).orElse(Maps.newHashMap()); val taskStartTime = task.getStartTime(); for (Map.Entry> entry : stageMap.entrySet()) { final String segmentId = entry.getKey(); @@ -1287,8 +1286,7 @@ public List getPartialExecutables(String modelId) { } private List getExecutablePOByModelId(String project, String modelId) { - JobMapperFilter jobMapperFilter = new JobMapperFilter(); - jobMapperFilter.setProject(project); + JobMapperFilter jobMapperFilter = JobMapperFilter.builder().project(project).build(); if (null != modelId) { jobMapperFilter.setModelIds(Lists.newArrayList(modelId)); } @@ -1385,8 +1383,7 @@ public List listMultiPartitionModelExec(String model, Predic } public List getExecutablePOsByStatus(List jobIds, List executableStates) { - JobMapperFilter jobMapperFilter = new JobMapperFilter(); - jobMapperFilter.setProject(project); + JobMapperFilter jobMapperFilter = JobMapperFilter.builder().project(project).build(); if (CollectionUtils.isNotEmpty(jobIds)) { jobMapperFilter.setJobIds(jobIds); } @@ -1411,16 +1408,6 @@ public List getExecutablesByStatus(ExecutableState status) { return getExecutablesByStatus(null, Lists.newArrayList(status)); } - public List getExecutablesByJobType(Set RELATED_JOBS) { - List jobTypeNames = RELATED_JOBS.stream().map(jobTypeEnum -> jobTypeEnum.name()) - .collect(Collectors.toList()); - JobMapperFilter jobMapperFilter = new JobMapperFilter(); - jobMapperFilter.setJobNames(jobTypeNames); - List jobInfoList = jobInfoDao.getJobInfoListByFilter(jobMapperFilter); - return jobInfoList.stream().map(jobInfo -> JobInfoUtil.deserializeExecutablePO(jobInfo)).map(this::fromPO) - .collect(Collectors.toList()); - } - public ExecutablePO getExecutablePO(String jobId) { return jobInfoDao.getExecutablePOByUuid(jobId); } @@ -1672,15 +1659,14 @@ public List getRunningExecutables(String project, String mod if (StringUtils.isNotBlank(model)) { return listExecByModelAndStatus(model, ExecutableState::isRunning, null); } else { - JobMapperFilter jobMapperFilter = new JobMapperFilter(); List runningStates = Lists.newArrayList(); for (ExecutableState executableState : ExecutableState.values()) { if (executableState.isRunning()) { runningStates.add(executableState); } } - jobMapperFilter.setStatuses(runningStates); - jobMapperFilter.setProject(project); + JobMapperFilter jobMapperFilter = JobMapperFilter.builder().project(project).statuses(runningStates) + .build(); return jobInfoDao.getJobInfoListByFilter(jobMapperFilter).stream() .map(jobInfo -> JobInfoUtil.deserializeExecutablePO(jobInfo)).map(this::fromPO) .collect(Collectors.toList()); @@ -1752,9 +1738,8 @@ public void suicideRunningJobByJobType(String project, String targetModelId, Lis } public void checkSuicideJobOfModel(String project, String modelId) { - JobMapperFilter jobMapperFilter = new JobMapperFilter(); - jobMapperFilter.setProject(project); - jobMapperFilter.setModelIds(Lists.newArrayList(modelId)); + JobMapperFilter jobMapperFilter = JobMapperFilter.builder().project(project) + .modelIds(Lists.newArrayList(modelId)).build(); jobMapperFilter.setStatuses(ExecutableState.ERROR, ExecutableState.PAUSED); List errorJobInfoList = ExecutableManager.getInstance(KylinConfig.getInstanceFromEnv(), project) .fetchJobsByFilter(jobMapperFilter); @@ -2013,12 +1998,10 @@ public List fetchJobsByTypesAndStates(String project, List jobT } public List getNotFinalExecutablesByType(List jobTypeEnums) { - JobMapperFilter jobMapperFilter = new JobMapperFilter(); - jobMapperFilter.setProject(project); - jobMapperFilter.setStatuses(ExecutableState.getNotFinalStates()); + JobMapperFilter jobMapperFilter = JobMapperFilter.builder().project(project) + .statuses(ExecutableState.getNotFinalStates()).build(); if (CollectionUtils.isNotEmpty(jobTypeEnums)) { - jobMapperFilter.setJobNames( - jobTypeEnums.stream().map(jobTypeEnum -> jobTypeEnum.name()).collect(Collectors.toList())); + jobMapperFilter.setJobNames(jobTypeEnums.stream().map(Enum::name).collect(Collectors.toList())); } List jobInfoList = jobInfoDao.getJobInfoListByFilter(jobMapperFilter); return jobInfoList.stream().map(jobInfo -> fromPO(JobInfoUtil.deserializeExecutablePO(jobInfo))) diff --git a/src/core-job/src/main/java/org/apache/kylin/job/rest/JobMapperFilter.java b/src/core-job/src/main/java/org/apache/kylin/job/rest/JobMapperFilter.java index b907a5a0705..e0db3d8ae3f 100644 --- a/src/core-job/src/main/java/org/apache/kylin/job/rest/JobMapperFilter.java +++ b/src/core-job/src/main/java/org/apache/kylin/job/rest/JobMapperFilter.java @@ -58,6 +58,8 @@ public class JobMapperFilter { private String project; + private List projects; + private String orderByFiled; private String orderType; @@ -76,8 +78,32 @@ public class JobMapperFilter { public void setStatuses(List stateList) { statuses = stateList; } - + public void setStatuses(ExecutableState... states) { statuses = Lists.newArrayList(states); } + + public void setProjects(List projects) { + this.project = null; + this.projects = projects; + } + + public void setProject(String project) { + this.project = project; + this.projects = null; + } + + public static class JobMapperFilterBuilder { + public JobMapperFilterBuilder projects(List projects) { + this.project = null; + this.projects = projects; + return this; + } + + public JobMapperFilterBuilder project(String project) { + this.project = project; + this.projects = null; + return this; + } + } } diff --git a/src/core-job/src/main/java/org/apache/kylin/job/runners/JobCheckRunner.java b/src/core-job/src/main/java/org/apache/kylin/job/runners/JobCheckRunner.java index a97f34f49c9..ec9e494e357 100644 --- a/src/core-job/src/main/java/org/apache/kylin/job/runners/JobCheckRunner.java +++ b/src/core-job/src/main/java/org/apache/kylin/job/runners/JobCheckRunner.java @@ -124,8 +124,8 @@ private static boolean markSuicideJobWithTransaction(AbstractExecutable jobExecu } private void markSuicideForErrorOrPausedJobs() { - JobMapperFilter jobMapperFilter = new JobMapperFilter(); - jobMapperFilter.setStatuses(Lists.newArrayList(ExecutableState.ERROR, ExecutableState.PAUSED)); + JobMapperFilter jobMapperFilter = JobMapperFilter.builder() + .statuses(Lists.newArrayList(ExecutableState.ERROR, ExecutableState.PAUSED)).build(); jobMapperFilter.setLimit(10); jobMapperFilter.setOffset(0); List jobInfoList = jobContext.getJobInfoMapper().selectByJobFilter(jobMapperFilter); diff --git a/src/core-job/src/main/java/org/apache/kylin/job/scheduler/JdbcJobScheduler.java b/src/core-job/src/main/java/org/apache/kylin/job/scheduler/JdbcJobScheduler.java index 57d7903b4f5..8414513bfc8 100644 --- a/src/core-job/src/main/java/org/apache/kylin/job/scheduler/JdbcJobScheduler.java +++ b/src/core-job/src/main/java/org/apache/kylin/job/scheduler/JdbcJobScheduler.java @@ -360,10 +360,9 @@ private static boolean markSuicideJobWithTransaction(JobInfo jobInfo) { } private List getProcessingJobInfoWithOrder() { - JobMapperFilter jobMapperFilter = new JobMapperFilter(); + JobMapperFilter jobMapperFilter = JobMapperFilter.builder().orderByFiled("priority,create_time") + .orderType("ASC").build(); jobMapperFilter.setStatuses(ExecutableState.READY, ExecutableState.PENDING, ExecutableState.RUNNING); - jobMapperFilter.setOrderByFiled("priority,create_time"); - jobMapperFilter.setOrderType("ASC"); return jobContext.getJobInfoMapper().selectByJobFilter(jobMapperFilter); } @@ -407,12 +406,11 @@ private Map getProjectProduceCount(Map project private void releaseExpiredLock() { int batchSize = jobContext.getKylinConfig().getJobSchedulerMasterPollBatchSize(); - JobMapperFilter filter = new JobMapperFilter(); List jobIds = jobContext.getJobLockMapper().findExpiredORNonLockIdList(batchSize); if (jobIds.isEmpty()) { return; } - filter.setJobIds(jobIds); + JobMapperFilter filter = JobMapperFilter.builder().jobIds(jobIds).build(); List jobs = jobContext.getJobInfoMapper().selectByJobFilter(filter); List jobInfoIds = jobs.stream().map(JobInfo::getJobId).collect(Collectors.toList()); List toRemoveLocks = Lists.newArrayList(jobIds).stream().filter(jobId -> !jobInfoIds.contains(jobId)) diff --git a/src/core-job/src/main/resources/mybatis-mapper/JobInfoMapper.xml b/src/core-job/src/main/resources/mybatis-mapper/JobInfoMapper.xml index a51f449e0f6..ad6e056c111 100644 --- a/src/core-job/src/main/resources/mybatis-mapper/JobInfoMapper.xml +++ b/src/core-job/src/main/resources/mybatis-mapper/JobInfoMapper.xml @@ -214,6 +214,11 @@ project = #{project} + + + #{item} + + AND update_time >= #{queryStartTime} @@ -275,6 +280,11 @@ project = #{project} + + + #{item} + + AND update_time >= #{queryStartTime} diff --git a/src/core-job/src/test/java/org/apache/kylin/job/dao/JobInfoDaoTest.java b/src/core-job/src/test/java/org/apache/kylin/job/dao/JobInfoDaoTest.java new file mode 100644 index 00000000000..3accdcc1b1b --- /dev/null +++ b/src/core-job/src/test/java/org/apache/kylin/job/dao/JobInfoDaoTest.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.job.dao; + +import java.util.List; + +import org.apache.kylin.guava30.shaded.common.collect.Lists; +import org.apache.kylin.job.domain.JobInfo; +import org.apache.kylin.job.mapper.JobInfoMapper; +import org.apache.kylin.job.rest.JobMapperFilter; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mockito.ArgumentMatchers; +import org.mockito.Mockito; +import org.springframework.test.util.ReflectionTestUtils; + +public class JobInfoDaoTest { + + private JobInfoMapper jobInfoMapper; + private JobInfoDao jobInfoDao; + + @Before + public void setUp() { + jobInfoMapper = Mockito.mock(JobInfoMapper.class); + jobInfoDao = new JobInfoDao(); + ReflectionTestUtils.setField(jobInfoDao, "jobInfoMapper", jobInfoMapper); + } + + private JobInfo mockJobInfo(String jobId) { + JobInfo jobInfo = new JobInfo(); + jobInfo.setJobId(jobId); + return jobInfo; + } + + @Test + public void testGetJobInfoListByProjectFilterReturnsEmptyWhenNoProjectScope() { + JobMapperFilter filter = JobMapperFilter.builder().build(); + + List result = jobInfoDao.getJobInfoListByProjectFilter(filter); + + Assert.assertTrue(result.isEmpty()); + // must short-circuit without hitting the mapper to avoid a cross-project scan + Mockito.verify(jobInfoMapper, Mockito.never()).selectByJobFilter(ArgumentMatchers.any()); + } + + @Test + public void testGetJobInfoListByProjectFilterReturnsEmptyWhenProjectIsBlank() { + JobMapperFilter filter = JobMapperFilter.builder().project(" ").build(); + + List result = jobInfoDao.getJobInfoListByProjectFilter(filter); + + Assert.assertTrue(result.isEmpty()); + Mockito.verify(jobInfoMapper, Mockito.never()).selectByJobFilter(ArgumentMatchers.any()); + } + + @Test + public void testGetJobInfoListByProjectFilterDelegatesWhenSingleProjectPresent() { + JobMapperFilter filter = JobMapperFilter.builder().project("default").build(); + List expected = Lists.newArrayList(mockJobInfo("job-1")); + Mockito.when(jobInfoMapper.selectByJobFilter(filter)).thenReturn(expected); + + List result = jobInfoDao.getJobInfoListByProjectFilter(filter); + + Assert.assertSame(expected, result); + Mockito.verify(jobInfoMapper, Mockito.times(1)).selectByJobFilter(filter); + } + + @Test + public void testGetJobInfoListByProjectFilterDelegatesWhenProjectsPresent() { + JobMapperFilter filter = JobMapperFilter.builder().projects(Lists.newArrayList("p1", "p2")).build(); + List expected = Lists.newArrayList(mockJobInfo("job-1"), mockJobInfo("job-2")); + Mockito.when(jobInfoMapper.selectByJobFilter(filter)).thenReturn(expected); + + List result = jobInfoDao.getJobInfoListByProjectFilter(filter); + + Assert.assertEquals(2, result.size()); + Mockito.verify(jobInfoMapper, Mockito.times(1)).selectByJobFilter(filter); + } + + @Test + public void testGetJobInfoListByFilterAlwaysDelegates() { + JobMapperFilter filter = JobMapperFilter.builder().build(); + List expected = Lists.newArrayList(mockJobInfo("job-1")); + Mockito.when(jobInfoMapper.selectByJobFilter(filter)).thenReturn(expected); + + List result = jobInfoDao.getJobInfoListByFilter(filter); + + Assert.assertSame(expected, result); + Mockito.verify(jobInfoMapper, Mockito.times(1)).selectByJobFilter(filter); + } +} diff --git a/src/core-job/src/test/java/org/apache/kylin/job/scheduler/JdbcJobSchedulerTest.java b/src/core-job/src/test/java/org/apache/kylin/job/scheduler/JdbcJobSchedulerTest.java index c83bac84dd1..c10f2de403b 100644 --- a/src/core-job/src/test/java/org/apache/kylin/job/scheduler/JdbcJobSchedulerTest.java +++ b/src/core-job/src/test/java/org/apache/kylin/job/scheduler/JdbcJobSchedulerTest.java @@ -106,7 +106,7 @@ void JobsScheduledOnTwoNode() throws Exception { for (int i = 0; i < 3; i++) { mockJob(); } - JobMapperFilter filter = new JobMapperFilter(); + JobMapperFilter filter = JobMapperFilter.builder().build(); filter.setStatuses(ExecutableState.RUNNING); await().atMost(5, TimeUnit.SECONDS).until(() -> jobInfoDao.getJobInfoListByFilter(filter).size() == 3); Assertions.assertEquals(secondJobContext.getJobScheduler().getRunningJob().size() diff --git a/src/data-loading-server/src/main/java/org/apache/kylin/rest/controller/JobController.java b/src/data-loading-server/src/main/java/org/apache/kylin/rest/controller/JobController.java index 7080bc48802..08c8071f02a 100644 --- a/src/data-loading-server/src/main/java/org/apache/kylin/rest/controller/JobController.java +++ b/src/data-loading-server/src/main/java/org/apache/kylin/rest/controller/JobController.java @@ -210,9 +210,9 @@ private EnvelopeResponse remoteUpdateJobStatus(JobUpdateRequest jobUpdat @ResponseBody public EnvelopeResponse> getJobDetail(@PathVariable(value = "job_id") String jobId, @RequestParam(value = "project") String project) { - checkProjectName(project); + String projectName = checkProjectName(project); checkRequiredArg(JOB_ID_ARG_NAME, jobId); - return new EnvelopeResponse<>(KylinException.CODE_SUCCESS, jobInfoService.getJobDetail(project, jobId), ""); + return new EnvelopeResponse<>(KylinException.CODE_SUCCESS, jobInfoService.getJobDetail(projectName, jobId), ""); } @ApiOperation(value = "updateJobStatus", tags = { @@ -446,7 +446,7 @@ public EnvelopeResponse updateDumpedMetadata(@RequestBody ReplaceMetaReq } private Set getLogicalViewMetaDumpList(KylinConfig config, String project, String viewTable, - String modelId) { + String modelId) { Set dumpList = new LinkedHashSet<>(); if (!config.isDDLLogicalViewEnabled()) { return dumpList; diff --git a/src/data-loading-server/src/test/java/org/apache/kylin/rest/controller/JobControllerTest.java b/src/data-loading-server/src/test/java/org/apache/kylin/rest/controller/JobControllerTest.java index b276cc81cee..231c5e8a15e 100644 --- a/src/data-loading-server/src/test/java/org/apache/kylin/rest/controller/JobControllerTest.java +++ b/src/data-loading-server/src/test/java/org/apache/kylin/rest/controller/JobControllerTest.java @@ -257,6 +257,8 @@ public void testGetJobDetail() throws Exception { .andExpect(MockMvcResultMatchers.status().isOk()).andReturn(); Mockito.verify(jobController).getJobDetail("e1ad7bb0-522e-456a-859d-2eab1df448de", "default"); + // the resolved (normalized) project name from checkProjectName is forwarded to the service + Mockito.verify(jobInfoService).getJobDetail("default", "e1ad7bb0-522e-456a-859d-2eab1df448de"); } private List mockStepsResponse() { diff --git a/src/data-loading-service/src/main/java/org/apache/kylin/rest/service/JobInfoService.java b/src/data-loading-service/src/main/java/org/apache/kylin/rest/service/JobInfoService.java index 4b45be394ea..071f1f00135 100644 --- a/src/data-loading-service/src/main/java/org/apache/kylin/rest/service/JobInfoService.java +++ b/src/data-loading-service/src/main/java/org/apache/kylin/rest/service/JobInfoService.java @@ -250,13 +250,12 @@ public List listJobs(final JobFilter jobFilter) { // TODO model == null || !model.isFusionModel(); public List listJobs(final JobFilter jobFilter, int offset, int limit) { - // TODO check permission when 'project' is empty if (StringUtils.isNotEmpty(jobFilter.getProject())) { aclEvaluate.checkProjectOperationPermission(jobFilter.getProject()); } JobMapperFilter jobMapperFilter = JobFilterUtil.getJobMapperFilter(jobFilter, offset, limit, modelService, tableExtService, projectService); - List jobInfoList = jobInfoDao.getJobInfoListByFilter(jobMapperFilter); + List jobInfoList = jobInfoDao.getJobInfoListByProjectFilter(jobMapperFilter); List result = jobInfoList.stream().map(JobInfoUtil::deserializeExecutablePO) .map(executablePO -> { AbstractExecutable executable = getManager(ExecutableManager.class, executablePO.getProject()) @@ -264,6 +263,7 @@ public List listJobs(final JobFilter jobFilter, int offset, val convert = this.convert(executable, executablePO); return convert; }).collect(Collectors.toList()); + sortByDurationIfNeed(result, jobFilter.getSortBy(), jobMapperFilter.getOrderType()); return result; } @@ -287,7 +287,6 @@ public void sortByDurationIfNeed(List list, final String ord } public long countJobs(final JobFilter jobFilter) { - // TODO check permission when 'project' is empty if (StringUtils.isNotEmpty(jobFilter.getProject())) { aclEvaluate.checkProjectOperationPermission(jobFilter.getProject()); } @@ -298,7 +297,7 @@ public long countJobs(final JobFilter jobFilter) { public List getJobDetail(String project, String jobId) { aclEvaluate.checkProjectOperationPermission(project); - ExecutablePO executablePO = jobInfoDao.getExecutablePOByUuid(jobId); + ExecutablePO executablePO = jobInfoDao.getExecutablePoByUuidWithProject(project, jobId); if (executablePO == null) { throw new KylinException(JOB_NOT_EXIST, jobId); } @@ -373,8 +372,7 @@ public List getJobDetail(String project, String jobId) { // table sampling and snapshot table don't have some segment if (!StringUtils.equals(task.getId(), segmentId)) { setSegmentSubStageParams(project, targetSubject, task, segmentId, segmentSubStages, - stageExecutables, - stageResponses, waiteTimeMap, output.getState(), executablePO); + stageExecutables, stageResponses, waiteTimeMap, output.getState(), executablePO); stringSubStageMap.put(segmentId, segmentSubStages); } } @@ -474,6 +472,7 @@ public ExecutableResponse manageJob(String project, ExecutableResponse job, Stri updateJobStatus(job.getId(), executablePO, project, action); return getJobInstance(job.getId()); } + private void jobActionValidate(String action) { JobActionEnum.validateValue(action.toUpperCase(Locale.ROOT)); } @@ -995,9 +994,7 @@ public List getBatchModelJobIdsOfFusionModel(String project, List ignoreStates = Lists.newArrayList(ExecutableState.SUCCEED, ExecutableState.ERROR, ExecutableState.DISCARDED, ExecutableState.SUICIDAL); List states = Arrays.stream(ExecutableState.values()) @@ -1007,6 +1004,7 @@ public List getBatchModelJobIdsOfFusionModel(String project, List jobInfo.getJobId()).collect(Collectors.toList()); } + @Override @Transaction(project = 0) public void stopBatchJob(String project, TableDesc tableDesc) { List fusionModelIds = getFusionModelsByTableDesc(project, tableDesc); diff --git a/src/data-loading-service/src/main/java/org/apache/kylin/rest/service/JobResourceService.java b/src/data-loading-service/src/main/java/org/apache/kylin/rest/service/JobResourceService.java index 2c831af7ee5..9e4000ece72 100644 --- a/src/data-loading-service/src/main/java/org/apache/kylin/rest/service/JobResourceService.java +++ b/src/data-loading-service/src/main/java/org/apache/kylin/rest/service/JobResourceService.java @@ -96,10 +96,8 @@ public Set getQueueNames() { val jobInfoDao = JobContextUtil.getJobInfoDao(config); Set queues = Sets.newHashSet(); projects.forEach(projectInstance -> { - JobMapperFilter jobMapperFilter = new JobMapperFilter(); - jobMapperFilter.setProject(projectInstance.getName()); - jobMapperFilter.setStatuses(ExecutableState.SUCCEED); - jobMapperFilter.setLimit(10); + JobMapperFilter jobMapperFilter = JobMapperFilter.builder().project(projectInstance.getName()) + .statuses(Lists.newArrayList(ExecutableState.SUCCEED)).limit(10).build(); val jobs = jobInfoDao.getJobInfoListByFilter(jobMapperFilter); if (CollectionUtils.isNotEmpty(jobs)) { for (JobInfo jobInfo : jobs) { diff --git a/src/data-loading-service/src/main/java/org/apache/kylin/rest/service/JobService.java b/src/data-loading-service/src/main/java/org/apache/kylin/rest/service/JobService.java index 0e577dadf99..51da68a86c3 100644 --- a/src/data-loading-service/src/main/java/org/apache/kylin/rest/service/JobService.java +++ b/src/data-loading-service/src/main/java/org/apache/kylin/rest/service/JobService.java @@ -244,8 +244,7 @@ public Pair getProjectNameAndJobStepId(String yarnAppId) { } public String getProjectByJobId(String jobId) { - JobMapperFilter jobMapperFilter = new JobMapperFilter(); - jobMapperFilter.setJobId(jobId); + JobMapperFilter jobMapperFilter = JobMapperFilter.builder().jobId(jobId).build(); List jobInfoList = jobInfoDao.getJobInfoListByFilter(jobMapperFilter); if (CollectionUtils.isEmpty(jobInfoList)) { return null; diff --git a/src/data-loading-service/src/main/java/org/apache/kylin/rest/util/JobFilterUtil.java b/src/data-loading-service/src/main/java/org/apache/kylin/rest/util/JobFilterUtil.java index a744384089d..8e6d56432e3 100644 --- a/src/data-loading-service/src/main/java/org/apache/kylin/rest/util/JobFilterUtil.java +++ b/src/data-loading-service/src/main/java/org/apache/kylin/rest/util/JobFilterUtil.java @@ -20,6 +20,7 @@ import static org.apache.kylin.common.exception.ServerErrorCode.INVALID_PARAMETER; +import java.io.IOException; import java.util.ArrayList; import java.util.Calendar; import java.util.Date; @@ -48,8 +49,11 @@ import org.apache.kylin.rest.service.TableExtService; import org.sparkproject.guava.collect.Lists; +import lombok.SneakyThrows; + public class JobFilterUtil { + @SneakyThrows(IOException.class) public static JobMapperFilter getJobMapperFilter(final JobFilter jobFilter, int offset, int limit, ModelService modelService, TableExtService tableExtService, ProjectService projectService) { Date queryStartTime = getQueryStartTime(jobFilter.getTimeFilter()); @@ -90,9 +94,21 @@ public static JobMapperFilter getJobMapperFilter(final JobFilter jobFilter, int .forEach(jobStatus -> scheduleStates.addAll(JobStatusUtil.mapJobStatusToScheduleState(jobStatus))); } - return new JobMapperFilter(scheduleStates, jobFilter.getJobNames(), queryStartTime.getTime(), - Lists.newArrayList(subjects), null, jobId, null, jobFilter.getProject(), orderByField, orderType, - offset, limit, null, null); + JobMapperFilter.JobMapperFilterBuilder filterBuilder = JobMapperFilter.builder().statuses(scheduleStates) + .jobNames(jobFilter.getJobNames()).queryStartTime(queryStartTime.getTime()) + .subjects(Lists.newArrayList(subjects)).jobId(jobId).project(jobFilter.getProject()) + .orderByFiled(orderByField).orderType(orderType).offset(offset).limit(limit); + + if (StringUtils.isEmpty(jobFilter.getProject())) { + List projects = projectService + .getProjectsFilterByExactMatchAndPermissionWrapperUserPermission(null, false, + AclPermissionEnum.OPERATION) + .stream().map(projectAndPermission -> projectAndPermission.getProject().getName()) + .collect(Collectors.toList()); + return filterBuilder.projects(projects).build(); + } + + return filterBuilder.build(); } private static Date getQueryStartTime(int timeFilter) { diff --git a/src/data-loading-service/src/test/java/org/apache/kylin/rest/service/JobInfoServiceTest.java b/src/data-loading-service/src/test/java/org/apache/kylin/rest/service/JobInfoServiceTest.java index d921bc2a10f..bc5e46210dd 100644 --- a/src/data-loading-service/src/test/java/org/apache/kylin/rest/service/JobInfoServiceTest.java +++ b/src/data-loading-service/src/test/java/org/apache/kylin/rest/service/JobInfoServiceTest.java @@ -23,9 +23,14 @@ import static org.apache.kylin.common.exception.code.ErrorCodeServer.JOB_UPDATE_STATUS_FAILED; import static org.apache.kylin.job.constant.JobStatusEnum.PENDING; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; import java.io.BufferedReader; import java.io.File; @@ -48,7 +53,7 @@ import java.util.function.Supplier; import java.util.stream.Collectors; -import org.apache.commons.lang.StringUtils; +import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.exception.ExceptionUtils; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.exception.ErrorCode; @@ -64,6 +69,7 @@ import org.apache.kylin.engine.spark.job.NSparkExecutable; import org.apache.kylin.engine.spark.job.NTableSamplingJob; import org.apache.kylin.guava30.shaded.common.collect.Lists; +import org.apache.kylin.guava30.shaded.common.collect.Maps; import org.apache.kylin.job.constant.ExecutableConstants; import org.apache.kylin.job.constant.JobStatusEnum; import org.apache.kylin.job.dao.ExecutableOutputPO; @@ -90,16 +96,18 @@ import org.apache.kylin.metadata.model.NDataModelManager; import org.apache.kylin.metadata.model.NTableMetadataManager; import org.apache.kylin.metadata.model.TableDesc; +import org.apache.kylin.metadata.project.NProjectManager; import org.apache.kylin.metadata.project.ProjectInstance; import org.apache.kylin.rest.request.JobUpdateRequest; import org.apache.kylin.rest.response.ExecutableResponse; import org.apache.kylin.rest.response.ExecutableStepResponse; +import org.apache.kylin.rest.response.UserProjectPermissionResponse; +import org.apache.kylin.rest.security.AclPermissionEnum; import org.apache.kylin.rest.util.AclEvaluate; import org.apache.kylin.rest.util.AclUtil; import org.apache.spark.application.NoRetryException; import org.assertj.core.api.Assertions; import org.junit.After; -import org.junit.Assert; import org.junit.Before; import org.junit.Rule; import org.junit.Test; @@ -114,7 +122,7 @@ public class JobInfoServiceTest extends LogOutputTestCase { - String project = "default"; + private static final String DEFAULT_PROJECT = "default"; private JobInfoService jobInfoService = Mockito.spy(JobInfoService.class); @@ -168,21 +176,22 @@ public void testListJobs() throws Exception { getTestConfig().setProperty("kylin.streaming.enabled", "false"); // test size List jobNames = Lists.newArrayList(); - JobFilter jobFilter = new JobFilter(Lists.newArrayList(), jobNames, 4, "", "", false, "default", "", true); + JobFilter jobFilter = new JobFilter(Lists.newArrayList(), jobNames, 4, "", "", false, DEFAULT_PROJECT, "", + true); List jobs = jobInfoService.listJobs(jobFilter); - Assert.assertEquals(3, jobs.size()); + assertEquals(3, jobs.size()); jobInfoService.addOldParams(jobs); jobFilter.setSubject(""); jobFilter.setStatuses(Lists.newArrayList(JobStatusEnum.NEW)); jobFilter.setTimeFilter(1); List jobs4 = jobInfoService.listJobs(jobFilter); - Assert.assertEquals(2, jobs4.size()); + assertEquals(2, jobs4.size()); jobFilter.setSubject(""); jobFilter.setStatuses(Lists.newArrayList(JobStatusEnum.NEW, JobStatusEnum.FINISHED)); jobFilter.setTimeFilter(1); jobs4 = jobInfoService.listJobs(jobFilter); - Assert.assertEquals(3, jobs4.size()); + assertEquals(3, jobs4.size()); jobFilter.setStatuses(Lists.newArrayList()); jobFilter.setTimeFilter(3); @@ -197,26 +206,26 @@ public void testListJobs() throws Exception { maxDuration = duration; } } - Assert.assertTrue(jobs7.size() == 3 && jobs7.get(0).getDuration() == maxDuration); + assertTrue(jobs7.size() == 3 && jobs7.get(0).getDuration() == maxDuration); jobFilter.setSortBy("create_time"); jobFilter.setReverse(true); List jobs8 = jobInfoService.listJobs(jobFilter); - Assert.assertTrue(jobs8.size() == 3 && jobs8.get(0).getId().equals("sparkjob3")); + assertTrue(jobs8.size() == 3 && jobs8.get(0).getId().equals("sparkjob3")); jobFilter.setReverse(false); jobFilter.setStatuses(Lists.newArrayList()); jobFilter.setSortBy(""); List jobs10 = jobInfoService.listJobs(jobFilter); - Assert.assertEquals(3, jobs10.size()); + assertEquals(3, jobs10.size()); jobFilter.setSortBy("job_status"); List jobs11 = jobInfoService.listJobs(jobFilter); - Assert.assertTrue(jobs11.size() == 3 && jobs11.get(2).getId().equals("sparkjob1")); + assertTrue(jobs11.size() == 3 && jobs11.get(2).getId().equals("sparkjob1")); jobFilter.setSortBy("create_time"); List jobs12 = jobInfoService.listJobs(jobFilter); - Assert.assertTrue(jobs12.size() == 3 && jobs12.get(0).getId().equals("sparkjob1")); + assertTrue(jobs12.size() == 3 && jobs12.get(0).getId().equals("sparkjob1")); jobFilter.setSortBy("target_subject"); for (ExecutablePO job : mockJobs) { @@ -226,7 +235,7 @@ public void testListJobs() throws Exception { }); } List sortJobs2 = jobInfoService.listJobs(jobFilter); - Assert.assertTrue(sortJobs2.size() == 3 && sortJobs2.get(0).getId().equals("sparkjob1")); + assertTrue(sortJobs2.size() == 3 && sortJobs2.get(0).getId().equals("sparkjob1")); for (ExecutablePO job : mockJobs) { jobInfoDao.updateJob(job.getUuid(), jobUpdater -> { jobUpdater.setJobType(JobTypeEnum.TABLE_SAMPLING); @@ -234,7 +243,7 @@ public void testListJobs() throws Exception { }); } List sortJobs3 = jobInfoService.listJobs(jobFilter); - Assert.assertTrue(sortJobs3.size() == 3 && sortJobs3.get(0).getId().equals("sparkjob1")); + assertTrue(sortJobs3.size() == 3 && sortJobs3.get(0).getId().equals("sparkjob1")); for (ExecutablePO job : mockJobs) { jobInfoDao.updateJob(job.getUuid(), jobUpdater -> { jobUpdater.setJobType(JobTypeEnum.SNAPSHOT_BUILD); @@ -248,7 +257,7 @@ public void testListJobs() throws Exception { } jobFilter.setSortBy("job_status"); List sortJobs4 = jobInfoService.listJobs(jobFilter); - Assert.assertTrue(sortJobs4.size() == 3 && sortJobs4.get(2).getId().equals("sparkjob2")); + assertTrue(sortJobs4.size() == 3 && sortJobs4.get(2).getId().equals("sparkjob2")); for (ExecutablePO job : mockJobs) { jobInfoDao.updateJob(job.getUuid(), jobUpdater -> { jobUpdater.setJobType(JobTypeEnum.SNAPSHOT_REFRESH); @@ -261,7 +270,7 @@ public void testListJobs() throws Exception { }); } List sortJobs5 = jobInfoService.listJobs(jobFilter); - Assert.assertTrue(sortJobs5.size() == 3 && sortJobs5.get(0).getId().equals("sparkjob1")); + assertTrue(sortJobs5.size() == 3 && sortJobs5.get(0).getId().equals("sparkjob1")); jobFilter.setSortBy("total_time"); assertKylinExeption(() -> { @@ -270,20 +279,20 @@ public void testListJobs() throws Exception { jobFilter.setSortBy("create_time"); List jobs13 = jobInfoService.listJobs(jobFilter, 0, 10); - Assert.assertEquals(3, jobs13.size()); + assertEquals(3, jobs13.size()); String jobId = jobs13.get(0).getId(); for (ExecutablePO job : mockJobs) { job.setJobType(JobTypeEnum.TABLE_SAMPLING); } jobFilter.setKey(jobId); List jobs14 = jobInfoService.listJobs(jobFilter, 0, 10); - Assert.assertTrue(jobs14.size() == 1 && jobs14.get(0).getId().equals(jobId)); + assertTrue(jobs14.size() == 1 && jobs14.get(0).getId().equals(jobId)); jobFilter.setStatuses(Lists.newArrayList()); List jobs15 = jobInfoService.listJobs(jobFilter, 0, 10); assertEquals(1, jobs15.size()); jobFilter.setStatuses(Lists.newArrayList(JobStatusEnum.NEW)); List jobs16 = jobInfoService.listJobs(jobFilter, 0, 10); - assertEquals(0, jobs16.size()); + assertTrue(jobs16.isEmpty()); } private List mockDetailJobs(boolean random) throws Exception { @@ -304,10 +313,6 @@ private List mockDetailJobInfoList(boolean random) throws Exception { return jobs; } - private String getProject() { - return "default"; - } - private long getCreateTime(String name) { switch (name) { case "1": @@ -325,7 +330,7 @@ private ExecutablePO mockExecutablePO(boolean random, String name) { ExecutablePO mockJob = new ExecutablePO(); mockJob.setType("org.apache.kylin.job.execution.SucceedChainedTestExecutable"); mockJob.setJobType(JobTypeEnum.INDEX_BUILD); - mockJob.setProject(getProject()); + mockJob.setProject(DEFAULT_PROJECT); mockJob.setUuid("sparkjob" + name); mockJob.setTargetModel("model" + name); val jobOutput = mockJob.getOutput(); @@ -342,7 +347,7 @@ private ExecutablePO mockExecutablePO(boolean random, String name) { val childExecutable = new ExecutablePO(); childExecutable.setUuid(mockJob.getId() + "_0" + i); childExecutable.setType("org.apache.kylin.job.execution.SucceedSubTaskTestExecutable"); - childExecutable.setProject(getProject()); + childExecutable.setProject(DEFAULT_PROJECT); val jobChildOutput = childExecutable.getOutput(); mockOutputTime(random, lastEndTime, jobChildOutput, i); lastEndTime = jobChildOutput.getEndTime(); @@ -383,8 +388,8 @@ public void testFilterJob() throws Exception { .collect(Collectors.toList()); List copyDurationList = new ArrayList<>(totalDurationArrays); copyDurationList.sort(Collections.reverseOrder()); - Assert.assertEquals(3, copyDurationList.size()); - Assert.assertEquals(totalDurationArrays, copyDurationList); + assertEquals(3, copyDurationList.size()); + assertEquals(totalDurationArrays, copyDurationList); } for (JobInfo jobInfo : mockJobs) { @@ -397,7 +402,7 @@ public void testFilterJob() throws Exception { JobFilter jobFilter = new JobFilter(Lists.newArrayList(), jobNames, 0, "", "default", false, "default", "", false); List jobs = jobInfoService.listJobs(jobFilter); - Assert.assertEquals(0, jobs.size()); + assertEquals(0, jobs.size()); } @Test @@ -415,7 +420,7 @@ public void testGetJobCreateTime() { List jobNames = Lists.newArrayList(); JobFilter jobFilter = new JobFilter(Lists.newArrayList(), jobNames, 4, "", "", false, "default", "", true); List jobs = jobInfoService.listJobs(jobFilter); - Assert.assertTrue(jobs.get(0).getCreateTime() > 0); + assertTrue(jobs.get(0).getCreateTime() > 0); } private void addSegment(AbstractExecutable job) { @@ -428,13 +433,13 @@ private void addSegment(AbstractExecutable job) { public void testGetTargetSubjectAndJobType() { ExecutableManager manager = ExecutableManager.getInstance(jobInfoService.getConfig(), "default"); SucceedChainedTestExecutable job1 = new SucceedChainedTestExecutable(); - job1.setProject(getProject()); + job1.setProject(DEFAULT_PROJECT); job1.setName("mocked job"); job1.setTargetSubject("12345678"); job1.setJobType(JobTypeEnum.INDEX_BUILD); - final TableDesc tableDesc = NTableMetadataManager.getInstance(getTestConfig(), getProject()) + final TableDesc tableDesc = NTableMetadataManager.getInstance(getTestConfig(), DEFAULT_PROJECT) .getTableDesc("DEFAULT.TEST_KYLIN_FACT"); - NTableSamplingJob samplingJob = NTableSamplingJob.internalCreate(tableDesc, getProject(), "ADMIN", 20000); + NTableSamplingJob samplingJob = NTableSamplingJob.internalCreate(tableDesc, DEFAULT_PROJECT, "ADMIN", 20000); manager.addJob(job1); manager.addJob(samplingJob); List jobNames = Lists.newArrayList(); @@ -442,20 +447,20 @@ public void testGetTargetSubjectAndJobType() { jobFilter.setSortBy("create_time"); List jobs = jobInfoService.listJobs(jobFilter); - Assert.assertEquals("The model is deleted", jobs.get(0).getTargetSubject()); // no target model so it's null - Assert.assertEquals("mocked job", jobs.get(0).getJobName()); - Assert.assertEquals(tableDesc.getIdentity(), jobs.get(1).getTargetSubject()); - Assert.assertEquals("TABLE_SAMPLING", jobs.get(1).getJobName()); + assertEquals("The model is deleted", jobs.get(0).getTargetSubject()); // no target model so it's null + assertEquals("mocked job", jobs.get(0).getJobName()); + assertEquals(tableDesc.getIdentity(), jobs.get(1).getTargetSubject()); + assertEquals("TABLE_SAMPLING", jobs.get(1).getJobName()); } @Test public void testJobnameResponse() throws Exception { - ExecutableManager manager = Mockito.spy(ExecutableManager.getInstance(getTestConfig(), getProject())); + ExecutableManager manager = Mockito.spy(ExecutableManager.getInstance(getTestConfig(), DEFAULT_PROJECT)); ConcurrentHashMap> managersByPrjCache = NLocalFileMetadataTestCase .getInstanceByProject(); - managersByPrjCache.get(ExecutableManager.class).put(getProject(), manager); + managersByPrjCache.get(ExecutableManager.class).put(DEFAULT_PROJECT, manager); ExecutablePO job1 = Mockito.spy(ExecutablePO.class); - job1.setProject(getProject()); + job1.setProject(DEFAULT_PROJECT); job1.setUuid("sparkjob1"); job1.setTargetModel("model1"); job1.setJobType(JobTypeEnum.INC_BUILD); @@ -464,7 +469,7 @@ public void testJobnameResponse() throws Exception { subJob.setType("org.apache.kylin.job.execution.SucceedChainedTestExecutable"); subJob.setJobType(JobTypeEnum.INC_BUILD); subJob.getOutput().setStatus("SUCCEED"); - subJob.setProject(getProject()); + subJob.setProject(DEFAULT_PROJECT); subJob.setUuid(job1.getId() + "_00"); job1.setTasks(Lists.newArrayList(subJob)); manager.addJob(job1); @@ -477,11 +482,11 @@ public void testJobnameResponse() throws Exception { false); List jobs = jobInfoService.listJobs(jobFilter); - Assert.assertEquals(2, jobs.size()); + assertEquals(2, jobs.size()); ExecutableResponse executableResponse = jobs.get(0); - Assert.assertEquals("sparkjob1", executableResponse.getId()); + assertEquals("sparkjob1", executableResponse.getId()); } @@ -499,15 +504,15 @@ public void testFilterJobExactMatch() throws Exception { List jobNames = Lists.newArrayList(); JobFilter jobFilter = new JobFilter(Lists.newArrayList(), jobNames, 0, "", "def", false, "default", "", false); List jobs = jobInfoService.listJobs(jobFilter); - Assert.assertEquals(0, jobs.size()); + assertEquals(0, jobs.size()); JobFilter jobFilter2 = new JobFilter(Lists.newArrayList(), jobNames, 0, "", "def", true, "default", "", false); List jobs2 = jobInfoService.listJobs(jobFilter2); - Assert.assertEquals(0, jobs2.size()); + assertEquals(0, jobs2.size()); JobFilter jobFilter3 = new JobFilter(Lists.newArrayList(), jobNames, 0, "", null, true, "default", "", false); List jobs3 = jobInfoService.listJobs(jobFilter3); - Assert.assertEquals(3, jobs3.size()); + assertEquals(3, jobs3.size()); } @Test @@ -527,7 +532,7 @@ public void testUpdateStageOutput() { String segmentId2 = RandomUtil.randomUUIDStr(); String segmentIds = segmentId + "," + segmentId2; - ExecutableManager manager = ExecutableManager.getInstance(jobInfoService.getConfig(), project); + ExecutableManager manager = ExecutableManager.getInstance(jobInfoService.getConfig(), DEFAULT_PROJECT); SucceedChainedTestExecutable executable = new SucceedChainedTestExecutable(); executable.setId(RandomUtil.randomUUIDStr()); executable.setTargetSubject("89af4ee2-2cdb-4b07-b39e-4c29856309aa"); @@ -565,7 +570,7 @@ public void testUpdateStageOutput() { manager.updateStageStatus(logicStep.getId(), segmentId, ExecutableState.RUNNING, null, "test output"); - List jobDetail = jobInfoService.getJobDetail(project, executable.getId()); + List jobDetail = jobInfoService.getJobDetail(DEFAULT_PROJECT, executable.getId()); assertEquals(1, jobDetail.size()); ExecutableStepResponse executableStepResponse = jobDetail.get(0); checkResponse(executableStepResponse, sparkExecutable.getId(), null); @@ -587,7 +592,7 @@ public void testUpdateStageOutput() { manager.updateStageStatus(logicStep.getId(), null, ExecutableState.SUCCEED, null, "test output"); - jobDetail = jobInfoService.getJobDetail(project, executable.getId()); + jobDetail = jobInfoService.getJobDetail(DEFAULT_PROJECT, executable.getId()); assertEquals(1, jobDetail.size()); executableStepResponse = jobDetail.get(0); checkResponse(executableStepResponse, sparkExecutable.getId(), null); @@ -626,7 +631,118 @@ public void testGetJobDetail() { executable.setJobType(JobTypeEnum.INC_BUILD); manager.addJob(executable); List result = jobInfoService.getJobDetail("default", executable.getId()); - Assert.assertEquals(1, result.size()); + assertEquals(1, result.size()); + } + + @Test + public void testGetExecutablePoByUuid() throws Exception { + ExecutablePO po = jobInfoDao.addJob(mockExecutablePO(false, "1")); + + // found when both project and jobId match + ExecutablePO found = jobInfoDao.getExecutablePoByUuidWithProject(DEFAULT_PROJECT, po.getId()); + assertNotNull(found); + assertEquals(po.getId(), found.getId()); + + // not found when jobId does not exist + assertNull(jobInfoDao.getExecutablePoByUuidWithProject(DEFAULT_PROJECT, "not_exist_job_id")); + + // not found when the job belongs to another project (project scoping) + assertNull(jobInfoDao.getExecutablePoByUuidWithProject("not_exist_project", po.getId())); + } + + @Test + public void testGetJobDetailProjectScoped() throws Exception { + Mockito.doNothing().when(aclEvaluate).checkProjectOperationPermission(Mockito.anyString()); + ExecutablePO po = jobInfoDao.addJob(mockExecutablePO(false, "1")); + + // queried under the owning project -> succeeds + List result = jobInfoService.getJobDetail(DEFAULT_PROJECT, po.getId()); + assertFalse(result.isEmpty()); + + // same job queried under a different project -> JOB_NOT_EXIST + assertKylinExeption(() -> jobInfoService.getJobDetail("not_exist_project", po.getId()), po.getId()); + } + + @Test + public void testListJobsEmptyProjectPermissionFilter() throws Exception { + mockDetailJobs(true); + getTestConfig().setProperty("kylin.streaming.enabled", "false"); + ProjectInstance defaultProject = NProjectManager.getInstance(getTestConfig()).getProject(DEFAULT_PROJECT); + + // empty project + user has OPERATION permission on 'default' -> all 3 jobs are visible + Mockito.doReturn(Lists.newArrayList(new UserProjectPermissionResponse(defaultProject, "OPERATION"))) + .when(projectService).getProjectsFilterByExactMatchAndPermissionWrapperUserPermission(null, false, + AclPermissionEnum.OPERATION); + JobFilter jobFilter = new JobFilter(Lists.newArrayList(), Lists.newArrayList(), 4, "", "", false, null, "", + true); + List jobs = jobInfoService.listJobs(jobFilter, 0, 10); + assertEquals(3, jobs.size()); + + // empty project + user has no permitted project -> all jobs filtered out + Mockito.doReturn(Lists.newArrayList()).when(projectService) + .getProjectsFilterByExactMatchAndPermissionWrapperUserPermission(null, false, + AclPermissionEnum.OPERATION); + assertTrue(jobInfoService.listJobs(jobFilter, 0, 10).isEmpty()); + + // empty project + user has OPERATION permission on 'default' -> all 3 jobs are visible + jobFilter.setProject(StringUtils.EMPTY); + UserProjectPermissionResponse project1 = Mockito.mock(UserProjectPermissionResponse.class); + when(project1.getProject()) + .thenReturn(ProjectInstance.create(DEFAULT_PROJECT, "UT", StringUtils.EMPTY, Maps.newLinkedHashMap())); + UserProjectPermissionResponse project2 = Mockito.mock(UserProjectPermissionResponse.class); + when(project2.getProject()) + .thenReturn(ProjectInstance.create("default2", "UT", StringUtils.EMPTY, Maps.newLinkedHashMap())); + when(projectService.getProjectsFilterByExactMatchAndPermissionWrapperUserPermission(null, false, + AclPermissionEnum.OPERATION)).thenReturn(Lists.newArrayList(project1, project2)); + assertEquals(3, jobInfoService.listJobs(jobFilter, 0, 10).size()); + + // empty project + user has OPERATION permission on 'default3' -> all jobs filtered out + when(project1.getProject()) + .thenReturn(ProjectInstance.create("default3", "UT", StringUtils.EMPTY, Maps.newLinkedHashMap())); + assertTrue(jobInfoService.listJobs(jobFilter, 0, 10).isEmpty()); + } + + @Test + public void testCountJobs() throws Exception { + mockDetailJobs(false); + getTestConfig().setProperty("kylin.streaming.enabled", "false"); + + // count all jobs in project + JobFilter jobFilter = new JobFilter(Lists.newArrayList(), Lists.newArrayList(), 4, "", "", false, + DEFAULT_PROJECT, "", true); + assertEquals(3, jobInfoService.countJobs(jobFilter)); + Mockito.verify(aclEvaluate, Mockito.atLeastOnce()).checkProjectOperationPermission("default"); + + // count by status + jobFilter.setStatuses(Lists.newArrayList(JobStatusEnum.NEW)); + assertEquals(2, jobInfoService.countJobs(jobFilter)); + jobFilter.setStatuses(Lists.newArrayList(JobStatusEnum.NEW, JobStatusEnum.FINISHED)); + assertEquals(3, jobInfoService.countJobs(jobFilter)); + + // fuzzy count by job id when key can not be converted to subjects + jobFilter.setStatuses(Lists.newArrayList()); + jobFilter.setKey("sparkjob1"); + assertEquals(1, jobInfoService.countJobs(jobFilter)); + jobFilter.setKey("not_exist_job_id"); + assertEquals(0, jobInfoService.countJobs(jobFilter)); + + // empty project -> count jobs of projects with OPERATION permission only + jobFilter.setKey(StringUtils.EMPTY); + jobFilter.setProject(StringUtils.EMPTY); + ProjectInstance defaultProject = NProjectManager.getInstance(getTestConfig()).getProject(DEFAULT_PROJECT); + Mockito.doReturn(Lists.newArrayList(new UserProjectPermissionResponse(defaultProject, "OPERATION"))) + .when(projectService).getProjectsFilterByExactMatchAndPermissionWrapperUserPermission(null, false, + AclPermissionEnum.OPERATION); + assertEquals(3, jobInfoService.countJobs(jobFilter)); + + // project `default1` -> all jobs filtered out + UserProjectPermissionResponse project1 = Mockito.mock(UserProjectPermissionResponse.class); + when(project1.getProject()) + .thenReturn(ProjectInstance.create("default1", "UT", StringUtils.EMPTY, Maps.newLinkedHashMap())); + Mockito.doReturn(Lists.newArrayList(project1)).when(projectService) + .getProjectsFilterByExactMatchAndPermissionWrapperUserPermission(null, false, + AclPermissionEnum.OPERATION); + assertEquals(0, jobInfoService.countJobs(jobFilter)); } @Test @@ -638,7 +754,7 @@ public void testBasic() throws IOException { manager.addJob(executable); jobInfoService.batchUpdateJobStatus(Lists.newArrayList(executable.getId()), "default", "PAUSE", Lists.newArrayList()); - Assert.assertEquals(ExecutableState.PAUSED, manager.getJob(executable.getId()).getStatus()); + assertEquals(ExecutableState.PAUSED, manager.getJob(executable.getId()).getStatus()); UnitOfWork.doInTransactionWithRetry(() -> { jobInfoService.batchUpdateJobStatus(Lists.newArrayList(executable.getId()), "default", "RESUME", Lists.newArrayList()); @@ -646,24 +762,24 @@ public void testBasic() throws IOException { }, "default"); jobInfoService.batchUpdateJobStatus(Lists.newArrayList(executable.getId()), "default", "PAUSE", Lists.newArrayList()); - Assert.assertEquals(ExecutableState.PAUSED, manager.getJob(executable.getId()).getStatus()); + assertEquals(ExecutableState.PAUSED, manager.getJob(executable.getId()).getStatus()); UnitOfWork.doInTransactionWithRetry(() -> { jobInfoService.batchUpdateJobStatus(Lists.newArrayList(executable.getId()), "default", "RESUME", Lists.newArrayList("STOPPED")); return null; }, "default"); - Assert.assertEquals(ExecutableState.READY, manager.getJob(executable.getId()).getStatus()); + assertEquals(ExecutableState.READY, manager.getJob(executable.getId()).getStatus()); UnitOfWork.doInTransactionWithRetry(() -> { jobInfoService.batchUpdateJobStatus(Lists.newArrayList(executable.getId()), "default", "DISCARD", Lists.newArrayList()); return null; }, "default"); - Assert.assertEquals(ExecutableState.DISCARDED, manager.getJob(executable.getId()).getStatus()); - Assert.assertNull(dsMgr.getDataflow("89af4ee2-2cdb-4b07-b39e-4c29856309aa").getSegments().getFirstSegment()); + assertEquals(ExecutableState.DISCARDED, manager.getJob(executable.getId()).getStatus()); + assertNull(dsMgr.getDataflow("89af4ee2-2cdb-4b07-b39e-4c29856309aa").getSegments().getFirstSegment()); Mockito.doNothing().when(tableExtService).removeJobIdFromTableExt(executable.getId(), "default"); jobInfoService.batchDropJob("default", Lists.newArrayList(executable.getId()), Lists.newArrayList()); List executables = manager.getAllExecutables(); - Assert.assertFalse(executables.contains(executable)); + assertFalse(executables.contains(executable)); } @Test @@ -676,7 +792,7 @@ public void testDiscardJobException() throws IOException { manager.updateJobOutput(executable.getId(), ExecutableState.PENDING, null, null, null); manager.updateJobOutput(executable.getId(), ExecutableState.RUNNING, null, null, null); manager.updateJobOutput(executable.getId(), ExecutableState.SUCCEED, null, null, null); - Assert.assertEquals(ExecutableState.SUCCEED, executable.getStatus()); + assertEquals(ExecutableState.SUCCEED, executable.getStatus()); thrown.expect(KylinException.class); thrown.expectMessage(JOB_UPDATE_STATUS_FAILED.getMsg("DISCARD", executable.getId(), ExecutableState.SUCCEED)); jobInfoService.batchUpdateJobStatus(Lists.newArrayList(executable.getId()), "default", "DISCARD", @@ -698,27 +814,27 @@ public void testGlobalBasic() throws IOException { Mockito.when(projectService.getOwnedProjects()).thenReturn(Lists.newArrayList("default")); jobInfoService.batchUpdateJobStatus(Lists.newArrayList(executable.getId()), "default", "PAUSE", Lists.newArrayList()); - Assert.assertEquals(ExecutableState.PAUSED, manager.getJob(executable.getId()).getStatus()); + assertEquals(ExecutableState.PAUSED, manager.getJob(executable.getId()).getStatus()); jobInfoService.batchUpdateJobStatus(Lists.newArrayList(executable.getId()), "default", "RESUME", Lists.newArrayList()); jobInfoService.batchUpdateJobStatus(Lists.newArrayList(executable.getId()), "default", "PAUSE", Lists.newArrayList()); - Assert.assertEquals(ExecutableState.PAUSED, manager.getJob(executable.getId()).getStatus()); + assertEquals(ExecutableState.PAUSED, manager.getJob(executable.getId()).getStatus()); jobInfoService.batchUpdateJobStatus(Lists.newArrayList(executable.getId()), "default", "RESUME", Lists.newArrayList("STOPPED")); - Assert.assertEquals(ExecutableState.READY, manager.getJob(executable.getId()).getStatus()); + assertEquals(ExecutableState.READY, manager.getJob(executable.getId()).getStatus()); jobInfoService.batchUpdateJobStatus(Lists.newArrayList(executable.getId()), "default", "DISCARD", Lists.newArrayList()); - Assert.assertEquals(ExecutableState.DISCARDED, manager.getJob(executable.getId()).getStatus()); + assertEquals(ExecutableState.DISCARDED, manager.getJob(executable.getId()).getStatus()); - Assert.assertNull(dsMgr.getDataflow("89af4ee2-2cdb-4b07-b39e-4c29856309aa").getSegments().getFirstSegment()); + assertNull(dsMgr.getDataflow("89af4ee2-2cdb-4b07-b39e-4c29856309aa").getSegments().getFirstSegment()); Mockito.doNothing().when(tableExtService).removeJobIdFromTableExt(executable.getId(), "default"); jobInfoService.batchDropGlobalJob(Lists.newArrayList(executable.getId()), Lists.newArrayList()); - Assert.assertFalse(manager.getAllExecutables().contains(executable)); + assertFalse(manager.getAllExecutables().contains(executable)); } @Test @@ -770,7 +886,7 @@ public void testGetAllJobOutput() throws IOException { sampleLog = sampleData.toString(); } String[] actualLines = StringUtils.splitByWholeSeparatorPreserveAllTokens(sampleLog, "\n"); - Assert.assertTrue(Arrays.deepEquals(exceptLines, actualLines)); + assertTrue(Arrays.deepEquals(exceptLines, actualLines)); } public void testFusionModelStopBatchJob() { @@ -796,7 +912,7 @@ public void testFusionModelStopBatchJob() { return null; }, project); AbstractExecutable job = manager.getJob(executable.getId()); - Assert.assertEquals(ExecutableState.DISCARDED, job.getStatus()); + assertEquals(ExecutableState.DISCARDED, job.getStatus()); // test no fusion model String table2 = "SSB.DATES"; @@ -809,26 +925,26 @@ public void testFusionModelStopBatchJob() { @Test public void testKillExistApplication() { - ExecutableManager manager = ExecutableManager.getInstance(jobInfoService.getConfig(), getProject()); + ExecutableManager manager = ExecutableManager.getInstance(jobInfoService.getConfig(), DEFAULT_PROJECT); SucceedChainedTestExecutable executable = new SucceedChainedTestExecutable(); - executable.setProject(getProject()); + executable.setProject(DEFAULT_PROJECT); addSegment(executable); val task = new NSparkExecutable(); - task.setProject(getProject()); + task.setProject(DEFAULT_PROJECT); addSegment(task); executable.addTask(task); executable.setJobType(JobTypeEnum.INC_BUILD); manager.addJob(executable); jobInfoService.killExistApplication(executable); - jobInfoService.killExistApplication(getProject(), executable.getId()); + jobInfoService.killExistApplication(DEFAULT_PROJECT, executable.getId()); } @Test public void testSetExceptionResolveAndCode() { - val manager = ExecutableManager.getInstance(jobInfoService.getConfig(), project); + val manager = ExecutableManager.getInstance(jobInfoService.getConfig(), DEFAULT_PROJECT); val executable = new SucceedChainedTestExecutable(); - executable.setProject(project); + executable.setProject(DEFAULT_PROJECT); executable.setId(RandomUtil.randomUUIDStr()); executable.setTargetSubject("89af4ee2-2cdb-4b07-b39e-4c29856309aa"); executable.setJobType(JobTypeEnum.INC_BUILD); @@ -839,46 +955,46 @@ public void testSetExceptionResolveAndCode() { var failedSegmentId = RandomUtil.randomUUIDStr(); var failedStack = ExceptionUtils.getStackTrace(new NoRetryException("date format not match")); var failedReason = "date format not match"; - jobInfoService.updateJobError(project, jobId, failedStepId, failedSegmentId, failedStack, failedReason); + jobInfoService.updateJobError(DEFAULT_PROJECT, jobId, failedStepId, failedSegmentId, failedStack, failedReason); ExecutableStepResponse executableStepResponse = new ExecutableStepResponse(); jobInfoService.setExceptionResolveAndCodeAndReason(executable.getOutput(), executableStepResponse); - Assert.assertEquals(JobExceptionResolve.JOB_DATE_FORMAT_NOT_MATCH_ERROR.toExceptionResolve().getResolve(), + assertEquals(JobExceptionResolve.JOB_DATE_FORMAT_NOT_MATCH_ERROR.toExceptionResolve().getResolve(), executableStepResponse.getFailedResolve()); - Assert.assertEquals(JobErrorCode.JOB_DATE_FORMAT_NOT_MATCH_ERROR.toErrorCode().getLocalizedString(), + assertEquals(JobErrorCode.JOB_DATE_FORMAT_NOT_MATCH_ERROR.toErrorCode().getLocalizedString(), executableStepResponse.getFailedCode()); - Assert.assertEquals(JobExceptionReason.JOB_DATE_FORMAT_NOT_MATCH_ERROR.toExceptionReason().getReason(), + assertEquals(JobExceptionReason.JOB_DATE_FORMAT_NOT_MATCH_ERROR.toExceptionReason().getReason(), executableStepResponse.getFailedReason()); ErrorCode.setMsg("en"); ExceptionResolve.setLang("en"); jobInfoService.setExceptionResolveAndCodeAndReason(executable.getOutput(), executableStepResponse); - Assert.assertEquals(JobExceptionResolve.JOB_DATE_FORMAT_NOT_MATCH_ERROR.toExceptionResolve().getResolve(), + assertEquals(JobExceptionResolve.JOB_DATE_FORMAT_NOT_MATCH_ERROR.toExceptionResolve().getResolve(), executableStepResponse.getFailedResolve()); - Assert.assertEquals(JobErrorCode.JOB_DATE_FORMAT_NOT_MATCH_ERROR.toErrorCode().getLocalizedString(), + assertEquals(JobErrorCode.JOB_DATE_FORMAT_NOT_MATCH_ERROR.toErrorCode().getLocalizedString(), executableStepResponse.getFailedCode()); - Assert.assertEquals(JobExceptionReason.JOB_DATE_FORMAT_NOT_MATCH_ERROR.toExceptionReason().getReason(), + assertEquals(JobExceptionReason.JOB_DATE_FORMAT_NOT_MATCH_ERROR.toExceptionReason().getReason(), executableStepResponse.getFailedReason()); // test default reason / code / resolve manager.updateJobError(jobId, null, null, null, null); - jobInfoService.updateJobError(project, jobId, failedStepId, failedSegmentId, failedStack, "test"); + jobInfoService.updateJobError(DEFAULT_PROJECT, jobId, failedStepId, failedSegmentId, failedStack, "test"); jobInfoService.setExceptionResolveAndCodeAndReason(executable.getOutput(), executableStepResponse); - Assert.assertEquals(JobExceptionResolve.JOB_BUILDING_ERROR.toExceptionResolve().getResolve(), + assertEquals(JobExceptionResolve.JOB_BUILDING_ERROR.toExceptionResolve().getResolve(), executableStepResponse.getFailedResolve()); - Assert.assertEquals(JobErrorCode.JOB_BUILDING_ERROR.toErrorCode().getLocalizedString(), + assertEquals(JobErrorCode.JOB_BUILDING_ERROR.toErrorCode().getLocalizedString(), executableStepResponse.getFailedCode()); - Assert.assertEquals(JobExceptionReason.JOB_BUILDING_ERROR.toExceptionReason().getReason() + ": test", + assertEquals(JobExceptionReason.JOB_BUILDING_ERROR.toExceptionReason().getReason() + ": test", executableStepResponse.getFailedReason()); ErrorCode.setMsg("en"); ExceptionResolve.setLang("en"); jobInfoService.setExceptionResolveAndCodeAndReason(executable.getOutput(), executableStepResponse); - Assert.assertEquals(JobExceptionResolve.JOB_BUILDING_ERROR.toExceptionResolve().getResolve(), + assertEquals(JobExceptionResolve.JOB_BUILDING_ERROR.toExceptionResolve().getResolve(), executableStepResponse.getFailedResolve()); - Assert.assertEquals(JobErrorCode.JOB_BUILDING_ERROR.toErrorCode().getLocalizedString(), + assertEquals(JobErrorCode.JOB_BUILDING_ERROR.toErrorCode().getLocalizedString(), executableStepResponse.getFailedCode()); - Assert.assertEquals(JobExceptionReason.JOB_BUILDING_ERROR.toExceptionReason().getReason() + ": test", + assertEquals(JobExceptionReason.JOB_BUILDING_ERROR.toExceptionReason().getReason() + ": test", executableStepResponse.getFailedReason()); } @@ -904,20 +1020,21 @@ public void testHistoryTrackerUrl() { @Test public void testDiscardJobAndNotify() { - ExecutableManager manager = ExecutableManager.getInstance(getTestConfig(), project); + ExecutableManager manager = ExecutableManager.getInstance(getTestConfig(), DEFAULT_PROJECT); val job = new DefaultExecutable(); - job.setProject(project); + job.setProject(DEFAULT_PROJECT); job.setJobType(JobTypeEnum.INC_BUILD); manager.addJob(job); overwriteSystemProp("kylin.job.notification-enabled", "true"); UnitOfWork.doInTransactionWithRetry(() -> { - jobInfoService.updateJobStatus(job.getId(), ExecutableManager.toPO(job, project), project, "DISCARD"); + jobInfoService.updateJobStatus(job.getId(), ExecutableManager.toPO(job, DEFAULT_PROJECT), DEFAULT_PROJECT, + "DISCARD"); return null; - }, project); + }, DEFAULT_PROJECT); - Assert.assertTrue(containsLog("[Job Discarded] is not specified by user, not need to notify users.")); + assertTrue(containsLog("[Job Discarded] is not specified by user, not need to notify users.")); } @Test @@ -951,18 +1068,18 @@ public void testExecutableResponse() throws Exception { List jobs = jobInfoService.listJobs(jobFilter); List executableResponses = jobInfoService.addOldParams(jobs); ExecutableResponse executable = executableResponses.get(0); - Assert.assertEquals("", executable.getRelatedSegment()); - Assert.assertEquals(0, executable.getProgress(), 0); + assertEquals("", executable.getRelatedSegment()); + assertEquals(0, executable.getProgress(), 0); executable.getSteps().get(0).setStatus(JobStatusEnum.FINISHED); - Assert.assertEquals(33, executable.getProgress(), 1); + assertEquals(33, executable.getProgress(), 1); executable.setSteps(null); String uuid = UUID.randomUUID().toString(); executable.setTargetSegments(Lists.newArrayList(uuid)); - Assert.assertEquals(0.0, executable.getProgress(), 0); - Assert.assertEquals(uuid, executable.getRelatedSegment()); + assertEquals(0.0, executable.getProgress(), 0); + assertEquals(uuid, executable.getRelatedSegment()); executable.setTargetSegments(Collections.emptyList()); - Assert.assertEquals(0.0, executable.getProgress(), 0); - Assert.assertEquals("", executable.getRelatedSegment()); + assertEquals(0.0, executable.getProgress(), 0); + assertEquals("", executable.getRelatedSegment()); } @Test @@ -971,19 +1088,19 @@ public void testParseToExecutableStepWithStepOutputNull() { task.setProject("default"); ExecutableState jobState = ExecutableState.RUNNING; ExecutableStepResponse result = jobInfoService.parseToExecutableStep(task, null, new HashMap<>(), jobState); - Assert.assertSame(PENDING, result.getStatus()); + assertSame(PENDING, result.getStatus()); } @Test public void testJobDiscard() { ExecutableManager executableManager = Mockito.mock(ExecutableManager.class); - Mockito.when(jobInfoService.getManager(ExecutableManager.class, project)).thenReturn(executableManager); + Mockito.when(jobInfoService.getManager(ExecutableManager.class, DEFAULT_PROJECT)).thenReturn(executableManager); Mockito.doAnswer(invocation -> { // ensure unit of work transaction - Assert.assertNotNull(UnitOfWork.get()); + assertNotNull(UnitOfWork.get()); return null; }).when(executableManager).discardJob(Mockito.any()); - jobInfoService.discardJobs(project, Lists.newArrayList("job1", "job2")); + jobInfoService.discardJobs(DEFAULT_PROJECT, Lists.newArrayList("job1", "job2")); Mockito.verify(executableManager, Mockito.times(1)).discardJob("job1"); Mockito.verify(executableManager, Mockito.times(1)).discardJob("job2"); } @@ -991,13 +1108,13 @@ public void testJobDiscard() { @Test public void testSuicideJobOfModel() { ExecutableManager executableManager = Mockito.mock(ExecutableManager.class); - Mockito.when(jobInfoService.getManager(ExecutableManager.class, project)).thenReturn(executableManager); + Mockito.when(jobInfoService.getManager(ExecutableManager.class, DEFAULT_PROJECT)).thenReturn(executableManager); Mockito.doAnswer(invocation -> { // ensure unit of work transaction - Assert.assertNotNull(UnitOfWork.get()); + assertNotNull(UnitOfWork.get()); return null; }).when(executableManager).checkSuicideJobOfModel(Mockito.any(), Mockito.anyString()); - jobInfoService.checkSuicideJobOfModel(project, "test"); - Mockito.verify(executableManager, Mockito.times(1)).checkSuicideJobOfModel(project, "test"); + jobInfoService.checkSuicideJobOfModel(DEFAULT_PROJECT, "test"); + Mockito.verify(executableManager, Mockito.times(1)).checkSuicideJobOfModel(DEFAULT_PROJECT, "test"); } } diff --git a/src/data-loading-service/src/test/java/org/apache/kylin/rest/util/JobFilterUtilTest.java b/src/data-loading-service/src/test/java/org/apache/kylin/rest/util/JobFilterUtilTest.java new file mode 100644 index 00000000000..412be07cb0f --- /dev/null +++ b/src/data-loading-service/src/test/java/org/apache/kylin/rest/util/JobFilterUtilTest.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.rest.util; + +import org.apache.commons.lang3.StringUtils; +import org.apache.kylin.guava30.shaded.common.collect.Lists; +import org.apache.kylin.job.rest.JobFilter; +import org.apache.kylin.job.rest.JobMapperFilter; +import org.apache.kylin.metadata.project.ProjectInstance; +import org.apache.kylin.rest.response.UserProjectPermissionResponse; +import org.apache.kylin.rest.security.AclPermissionEnum; +import org.apache.kylin.rest.service.ModelService; +import org.apache.kylin.rest.service.ProjectService; +import org.apache.kylin.rest.service.TableExtService; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; + +public class JobFilterUtilTest { + + // JobTimeFilterEnum.ALL -> avoids relative-date computation in getQueryStartTime + private static final int TIME_FILTER_ALL = 4; + + private ModelService modelService; + private TableExtService tableExtService; + private ProjectService projectService; + + @Before + public void setUp() { + modelService = Mockito.mock(ModelService.class); + tableExtService = Mockito.mock(TableExtService.class); + projectService = Mockito.mock(ProjectService.class); + } + + private JobFilter newJobFilter(String project, boolean reverse) { + return new JobFilter(Lists.newArrayList(), Lists.newArrayList(), TIME_FILTER_ALL, "", "", false, project, "", + reverse); + } + + private UserProjectPermissionResponse permissionResponse(String projectName) { + ProjectInstance instance = new ProjectInstance(); + instance.setName(projectName); + return new UserProjectPermissionResponse(instance, AclPermissionEnum.OPERATION.name()); + } + + @Test + public void testGetJobMapperFilterEmptyProjectPopulatesProjects() throws Exception { + Mockito.when(projectService.getProjectsFilterByExactMatchAndPermissionWrapperUserPermission(null, false, + AclPermissionEnum.OPERATION)) + .thenReturn(Lists.newArrayList(permissionResponse("p1"), permissionResponse("p2"))); + + JobMapperFilter filter = JobFilterUtil.getJobMapperFilter(newJobFilter(null, true), 0, 10, modelService, + tableExtService, projectService); + + Assert.assertEquals(Lists.newArrayList("p1", "p2"), filter.getProjects()); + Assert.assertTrue(StringUtils.isEmpty(filter.getProject())); + Mockito.verify(projectService, Mockito.times(1)) + .getProjectsFilterByExactMatchAndPermissionWrapperUserPermission(null, false, + AclPermissionEnum.OPERATION); + } + + @Test + public void testGetJobMapperFilterEmptyProjectNoPermittedProjects() throws Exception { + Mockito.when(projectService.getProjectsFilterByExactMatchAndPermissionWrapperUserPermission(null, false, + AclPermissionEnum.OPERATION)).thenReturn(Lists.newArrayList()); + + JobMapperFilter filter = JobFilterUtil.getJobMapperFilter(newJobFilter("", true), 0, 10, modelService, + tableExtService, projectService); + + Assert.assertNotNull(filter.getProjects()); + Assert.assertTrue(filter.getProjects().isEmpty()); + } + + @Test + public void testGetJobMapperFilterWithProjectKeepsProjectsNull() throws Exception { + JobMapperFilter filter = JobFilterUtil.getJobMapperFilter(newJobFilter("default", true), 5, 20, modelService, + tableExtService, projectService); + + Assert.assertEquals("default", filter.getProject()); + // single-project branch must not populate the multi-project list + Assert.assertNull(filter.getProjects()); + Assert.assertEquals(5, filter.getOffset()); + Assert.assertEquals(20, filter.getLimit()); + Mockito.verify(projectService, Mockito.never()).getProjectsFilterByExactMatchAndPermissionWrapperUserPermission( + Mockito.any(), Mockito.anyBoolean(), Mockito.any()); + } + + @Test + public void testGetJobMapperFilterOrderType() throws Exception { + JobMapperFilter reversed = JobFilterUtil.getJobMapperFilter(newJobFilter("default", true), 0, 10, modelService, + tableExtService, projectService); + Assert.assertEquals("DESC", reversed.getOrderType()); + + JobMapperFilter ascending = JobFilterUtil.getJobMapperFilter(newJobFilter("default", false), 0, 10, + modelService, tableExtService, projectService); + Assert.assertEquals("ASC", ascending.getOrderType()); + } +}