Skip to content
Open
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
package edu.asu.diging.citesphere.core.service;

import java.util.concurrent.Future;

import edu.asu.diging.citesphere.core.exceptions.ZoteroHttpStatusException;
import edu.asu.diging.citesphere.user.IUser;

public interface IAsyncCitationProcessor {

void sync(IUser user, String groupId, long contentVersion, String collectionId) throws ZoteroHttpStatusException;
Future<String> sync(IUser user, String groupId, long contentVersion, String collectionId) throws ZoteroHttpStatusException;

}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@

public interface ICitationManager {

boolean cancel(String groupId);

List<ICitationGroup> getGroups(IUser user);

CitationResults getGroupItems(IUser user, String groupId, String collectionId, int page, String sortBy, List<String> conceptIds)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
Expand All @@ -17,6 +18,7 @@
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.AsyncResult;
import org.springframework.stereotype.Service;

import edu.asu.diging.citesphere.core.exceptions.ZoteroHttpStatusException;
Expand Down Expand Up @@ -83,14 +85,14 @@ public void init() {
*/
@Override
@Async
public void sync(IUser user, String groupId, long contentVersion, String collectionId) throws ZoteroHttpStatusException {
public Future<String> sync(IUser user, String groupId, long contentVersion, String collectionId) throws ZoteroHttpStatusException {
GroupSyncJob prevJob = jobManager.getMostRecentJob(groupId + "");
// it's un-intuitive to test for not inactive statuses here, but it's more likely we'll add
// more activate job statuses than inactive ones, so it's less error prone to use the list that
// more active job statuses than inactive ones, so it's less error prone to use the list that
// is less likely to change.
if (prevJob != null && !inactiveJobStatuses.contains(prevJob.getStatus())) {
// there is already a job running, let's not start another one
return;
return new AsyncResult<String>(null);
}

logger.info("Starting sync for " + groupId);
Expand All @@ -100,6 +102,11 @@ public void sync(IUser user, String groupId, long contentVersion, String collect
job.setStatus(JobStatus.PREPARED);
jobRepo.save(job);
jobManager.addJob(job);
Thread currentThread = Thread.currentThread();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you can just do this in checkIfThreadIsInterrupted and remove the first parameter of the method.


if(checkIfThreadIsInterrupted(currentThread, job, groupId)) {
return new AsyncResult<String>(job.getId());
}

// we'll retrieve the latest group version first in case there are more changes
// in between
Expand All @@ -119,24 +126,65 @@ public void sync(IUser user, String groupId, long contentVersion, String collect
jobRepo.save(job);

AtomicInteger counter = new AtomicInteger();

if(checkIfThreadIsInterrupted(currentThread, job, groupId)) {
return new AsyncResult<String>(job.getId());
}

syncCitations(user, groupId, job, versions, counter);

if(checkIfThreadIsInterrupted(currentThread, job, groupId)) {
return new AsyncResult<String>(job.getId());
}

syncCollections(user, groupId, job, collectionVersions, groupVersion, counter);

if(checkIfThreadIsInterrupted(currentThread, job, groupId)) {
return new AsyncResult<String>(job.getId());
}

removeDeletedItems(deletedElements, job);


if(checkIfThreadIsInterrupted(currentThread, job, groupId)) {
return new AsyncResult<String>(job.getId());
}

// while this thread has been running, the group might have been updated by another thread
// so, we have to make sure there is no group with the same group id but other object id
// or we'll end up with two groups with the same group id.
Optional<ICitationGroup> group = groupRepo.findFirstByGroupId(new Long(groupId));

if (group.isPresent()) {
group.get().setContentVersion(groupVersion);
groupRepo.save((CitationGroup) group.get());
}

if(checkIfThreadIsInterrupted(currentThread, job, groupId)) {
return new AsyncResult<String>(job.getId());
}

job.setStatus(JobStatus.DONE);
job.setFinishedOn(OffsetDateTime.now());
jobRepo.save(job);

Future<String> result = new AsyncResult<String>(job.getId());
return result;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

merge with line 170

}

private boolean checkIfThreadIsInterrupted(Thread thread, GroupSyncJob job, String groupId) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this does not only check but also set the task to cancelled


if(thread.isInterrupted()) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

merge with 173

setJobToCanceledState(job, groupId);
return true;
}
return false;
}

private void setJobToCanceledState(GroupSyncJob job, String groupId) {
logger.info("Aborting sync for " + groupId);
job.setStatus(JobStatus.CANCELED);
job.setFinishedOn(OffsetDateTime.now());
jobRepo.save(job);
}

private void syncCitations(IUser user, String groupId, GroupSyncJob job, Map<String, Long> versions,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.function.BiFunction;

import javax.annotation.PostConstruct;
Expand Down Expand Up @@ -86,6 +88,8 @@ public class CitationManager implements ICitationManager {

private Map<String, BiFunction<ICitation, ICitation, Integer>> sortFunctions;

Map<String, Future<String>> futureMap = new ConcurrentHashMap<>();

@PostConstruct
public void init() {
sortFunctions = new HashMap<>();
Expand Down Expand Up @@ -384,6 +388,12 @@ public CitationResults getGroupItems(IUser user, String groupId, String collecti
throw new GroupDoesNotExistException("There is no group with id " + groupId);
}

Future<String> existingFuture = futureMap.get(groupId);
if (existingFuture != null && !existingFuture.isDone()) {
existingFuture.cancel(true);
futureMap.remove(groupId);
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why is the job cancelled here?


boolean isModified = zoteroManager.isGroupModified(user, groupId, group.getContentVersion());
CitationResults results = new CitationResults();
if (isModified) {
Expand All @@ -400,7 +410,8 @@ public CitationResults getGroupItems(IUser user, String groupId, String collecti

// then update content
results.setNotModified(false);
asyncCitationProcessor.sync(user, group.getGroupId() + "", previousVersion, collectionId);
Future<String> future = asyncCitationProcessor.sync(user, group.getGroupId() + "", previousVersion, collectionId);
futureMap.put(groupId, future);
} else {
results.setNotModified(true);
}
Expand Down Expand Up @@ -430,6 +441,16 @@ public CitationResults getGroupItems(IUser user, String groupId, String collecti

}

@Override
public boolean cancel(String groupId) {
Future<String> future = futureMap.get(groupId);
if (future != null) {
futureMap.remove(groupId);
return future.cancel(true);
}
return false;
}

@Override
public void forceGroupItemsRefresh(IUser user, String groupId, String collectionId, int page, String sortBy) {
Optional<ICitationGroup> groupOptional = groupRepository.findFirstByGroupId(new Long(groupId));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
import org.springframework.data.domain.Pageable;
import org.springframework.stereotype.Service;

import edu.asu.diging.citesphere.core.model.jobs.IJob;
import edu.asu.diging.citesphere.core.model.jobs.JobStatus;
import edu.asu.diging.citesphere.core.model.jobs.impl.GroupSyncJob;
import edu.asu.diging.citesphere.core.repository.jobs.GroupSyncJobRepository;
Expand All @@ -37,7 +36,7 @@ public class SyncJobManager implements ISyncJobManager {
public void init() {
currentJobs = new ConcurrentHashMap<>();
}

/* (non-Javadoc)
* @see edu.asu.diging.citesphere.core.service.jobs.impl.ISyncJobManager#addJobId(edu.asu.diging.citesphere.core.model.jobs.impl.GroupSyncJob)
*/
Expand Down Expand Up @@ -79,14 +78,15 @@ public long getJobsCount(IUser user) {
@Override
public void cancelJob(String jobId) {
Optional<GroupSyncJob> jobOptional = jobRepo.findById(jobId);
String groupId;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

declare where needed

if (jobOptional.isPresent()) {
GroupSyncJob job = currentJobs.get(jobOptional.get().getGroupId());
if (job == null) {
job = jobOptional.get();
groupId = job.getGroupId();
if(citationManager.cancel(groupId)) {
job.setStatus(JobStatus.CANCELED);
job.setFinishedOn(OffsetDateTime.now());
jobRepo.save(job);
}
job.setStatus(JobStatus.CANCELED);
job.setFinishedOn(OffsetDateTime.now());
jobRepo.save(job);
}
}
}