1 package net.sumaris.server.service.technical;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25 import com.google.common.base.Preconditions;
26 import io.reactivex.BackpressureStrategy;
27 import io.reactivex.Observable;
28 import io.reactivex.schedulers.Schedulers;
29 import net.sumaris.core.dao.technical.model.IUpdateDateEntityBean;
30 import net.sumaris.core.exception.DataNotFoundException;
31 import net.sumaris.core.exception.SumarisTechnicalException;
32 import net.sumaris.server.dao.technical.EntityDao;
33 import org.nuiton.i18n.I18n;
34 import org.reactivestreams.Publisher;
35 import org.slf4j.Logger;
36 import org.slf4j.LoggerFactory;
37 import org.springframework.beans.factory.annotation.Autowired;
38 import org.springframework.core.convert.ConversionService;
39 import org.springframework.stereotype.Service;
40
41 import java.io.Serializable;
42 import java.util.Calendar;
43 import java.util.Date;
44 import java.util.concurrent.TimeUnit;
45 import java.util.concurrent.atomic.AtomicLong;
46
47 @Service("changesPublisherService")
48 public class ChangesPublisherServiceImpl implements ChangesPublisherService {
49
50
51
52
53 private static final Logger log =
54 LoggerFactory.getLogger(ChangesPublisherServiceImpl.class);
55
56 private final AtomicLong publicherCount = new AtomicLong(0);
57
58 @Autowired
59 private EntityDao dataChangeDao;
60
61 @Autowired
62 private ConversionService conversionService;
63
64 @Autowired
65 private ChangesPublisherService self;
66
67 @Override
68 public <K extends Serializable, D extends Date, T extends IUpdateDateEntityBean<K, D>, V extends IUpdateDateEntityBean<K, D>> Publisher<V>
69 getPublisher(final Class<T> entityClass,
70 final Class<V> targetClass,
71 final K id,
72 Integer minIntervalInSecond,
73 boolean startWithActualValue) {
74
75 Preconditions.checkArgument(minIntervalInSecond == null || minIntervalInSecond.intValue() >= 10, "minimum interval value should be >= 10 seconds");
76 if (minIntervalInSecond == null) minIntervalInSecond = 30;
77
78
79 if (!conversionService.canConvert(entityClass, targetClass)) {
80 throw new SumarisTechnicalException(I18n.t("sumaris.error.missingConverter", entityClass.getSimpleName()));
81 }
82
83
84 T initialEntity = dataChangeDao.get(entityClass, id);
85 if (initialEntity == null) {
86 throw new DataNotFoundException(I18n.t("sumaris.error.notFound", entityClass.getSimpleName(), id));
87 }
88
89 log.info(String.format("Checking changes on %s #%s every %s sec. (total publishers: %s)", entityClass.getSimpleName(), id, minIntervalInSecond, publicherCount.incrementAndGet()));
90
91
92 final Calendar lastUpdateDate = Calendar.getInstance();
93 if (initialEntity.getUpdateDate() != null) {
94 lastUpdateDate.setTime(initialEntity.getUpdateDate());
95 } else {
96 lastUpdateDate.setTime(new Date());
97 }
98
99
100 Observable stop = Observable.just(Boolean.TRUE).delay(1, TimeUnit.HOURS);
101 stop.subscribe(o -> log.debug(String.format("Closing publisher on %s #%s: max time reached. (total publishers: %s)", entityClass.getSimpleName(), id, publicherCount.decrementAndGet())));
102
103 Observable<V> observable = Observable
104 .interval(minIntervalInSecond, TimeUnit.SECONDS)
105 .takeUntil(stop)
106 .observeOn(Schedulers.io())
107 .flatMap(n -> {
108
109 V newerVOOrNull = self.getIfNewer(entityClass, targetClass, id, lastUpdateDate.getTime());
110
111
112 if (newerVOOrNull != null) {
113 lastUpdateDate.setTime(newerVOOrNull.getUpdateDate());
114 return Observable.just(newerVOOrNull);
115 }
116 return Observable.empty();
117 });
118
119
120 if (startWithActualValue) {
121
122 V initialVO = conversionService.convert(initialEntity, targetClass);
123 if (initialVO == null) {
124 throw new DataNotFoundException(I18n.t("sumaris.error.notFound", entityClass.getSimpleName(), id));
125 }
126 observable = observable.startWith(initialVO);
127 }
128
129
130 return observable.toFlowable(BackpressureStrategy.LATEST);
131 }
132
133 @Override
134 public <K extends Serializable, D extends Date, T extends IUpdateDateEntityBean<K, D>, V extends IUpdateDateEntityBean<K, D>> V
135 getIfNewer(Class<T> entityClass,
136 Class<V> targetClass,
137 K id,
138 Date lastUpdateDate) {
139
140 T entity = dataChangeDao.get(entityClass, id);
141
142 if (entity == null) {
143 throw new DataNotFoundException(I18n.t("sumaris.error.notFound", entityClass.getSimpleName(), id));
144 }
145
146
147 if (entity.getUpdateDate() != null && entity.getUpdateDate().after(lastUpdateDate)) {
148 if (!conversionService.canConvert(entityClass, targetClass)) {
149 throw new SumarisTechnicalException(I18n.t("sumaris.error.missingConverter", entityClass.getSimpleName()));
150 }
151 return conversionService.convert(entity, targetClass);
152 }
153 return null;
154 }
155
156
157 }