View Javadoc
1   package net.sumaris.server.service.technical;
2   
3   /*-
4    * #%L
5    * SUMARiS:: Server
6    * %%
7    * Copyright (C) 2018 SUMARiS Consortium
8    * %%
9    * This program is free software: you can redistribute it and/or modify
10   * it under the terms of the GNU General Public License as
11   * published by the Free Software Foundation, either version 3 of the
12   * License, or (at your option) any later version.
13   *
14   * This program is distributed in the hope that it will be useful,
15   * but WITHOUT ANY WARRANTY; without even the implied warranty of
16   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17   * GNU General Public License for more details.
18   *
19   * You should have received a copy of the GNU General Public
20   * License along with this program.  If not, see
21   * <http://www.gnu.org/licenses/gpl-3.0.html>.
22   * #L%
23   */
24  
25  import com.google.common.base.Preconditions;
26  import io.reactivex.BackpressureStrategy;
27  import io.reactivex.Observable;
28  import io.reactivex.schedulers.Schedulers;
29  import net.sumaris.core.dao.technical.model.IUpdateDateEntityBean;
30  import net.sumaris.core.exception.DataNotFoundException;
31  import net.sumaris.core.exception.SumarisTechnicalException;
32  import net.sumaris.server.dao.technical.EntityDao;
33  import org.nuiton.i18n.I18n;
34  import org.reactivestreams.Publisher;
35  import org.slf4j.Logger;
36  import org.slf4j.LoggerFactory;
37  import org.springframework.beans.factory.annotation.Autowired;
38  import org.springframework.core.convert.ConversionService;
39  import org.springframework.stereotype.Service;
40  
41  import java.io.Serializable;
42  import java.util.Calendar;
43  import java.util.Date;
44  import java.util.concurrent.TimeUnit;
45  import java.util.concurrent.atomic.AtomicLong;
46  
47  @Service("changesPublisherService")
48  public class ChangesPublisherServiceImpl implements ChangesPublisherService {
49  
50      /**
51       * Logger.
52       */
53      private static final Logger log =
54              LoggerFactory.getLogger(ChangesPublisherServiceImpl.class);
55  
56      private final AtomicLong publicherCount = new AtomicLong(0);
57  
58      @Autowired
59      private EntityDao dataChangeDao;
60  
61      @Autowired
62      private ConversionService conversionService;
63  
64      @Autowired
65      private ChangesPublisherService self; // Loop back to be able to force transaction handling
66  
67      @Override
68      public <K extends Serializable, D extends Date, T extends IUpdateDateEntityBean<K, D>, V extends IUpdateDateEntityBean<K, D>> Publisher<V>
69      getPublisher(final Class<T> entityClass,
70                   final Class<V> targetClass,
71                   final K id,
72                   Integer minIntervalInSecond,
73                   boolean startWithActualValue) {
74  
75          Preconditions.checkArgument(minIntervalInSecond == null || minIntervalInSecond.intValue() >= 10, "minimum interval value should be >= 10 seconds");
76          if (minIntervalInSecond == null) minIntervalInSecond = 30;
77  
78          // Check conversion is possible
79          if (!conversionService.canConvert(entityClass, targetClass)) {
80              throw new SumarisTechnicalException(I18n.t("sumaris.error.missingConverter", entityClass.getSimpleName()));
81          }
82  
83          // Make sure the entity exists
84          T initialEntity = dataChangeDao.get(entityClass, id);
85          if (initialEntity == null) {
86              throw new DataNotFoundException(I18n.t("sumaris.error.notFound", entityClass.getSimpleName(), id));
87          }
88  
89          log.info(String.format("Checking changes on %s #%s every %s sec. (total publishers: %s)", entityClass.getSimpleName(), id, minIntervalInSecond, publicherCount.incrementAndGet()));
90  
91  
92          final Calendar lastUpdateDate = Calendar.getInstance();
93          if (initialEntity.getUpdateDate() != null) {
94              lastUpdateDate.setTime(initialEntity.getUpdateDate());
95          } else {
96              lastUpdateDate.setTime(new Date());
97          }
98  
99          // Create stop event, after a too long delay (to be sure old publisher are closed)
100         Observable stop = Observable.just(Boolean.TRUE).delay(1, TimeUnit.HOURS);
101         stop.subscribe(o -> log.debug(String.format("Closing publisher on %s #%s: max time reached. (total publishers: %s)", entityClass.getSimpleName(), id, publicherCount.decrementAndGet())));
102 
103         Observable<V> observable = Observable
104                 .interval(minIntervalInSecond, TimeUnit.SECONDS)
105                 .takeUntil(stop)
106                 .observeOn(Schedulers.io())
107                 .flatMap(n -> {
108                     // Try to get a newer bean
109                     V newerVOOrNull = self.getIfNewer(entityClass, targetClass, id, lastUpdateDate.getTime());
110 
111                     // Update the date used for comparision
112                     if (newerVOOrNull != null) {
113                         lastUpdateDate.setTime(newerVOOrNull.getUpdateDate());
114                         return Observable.just(newerVOOrNull);
115                     }
116                     return Observable.empty();
117                 });
118 
119         // Sending the initial value when starting
120         if (startWithActualValue) {
121             // Convert the entity into VO
122             V initialVO = conversionService.convert(initialEntity, targetClass);
123             if (initialVO == null) {
124                 throw new DataNotFoundException(I18n.t("sumaris.error.notFound", entityClass.getSimpleName(), id));
125             }
126             observable = observable.startWith(initialVO);
127         }
128 
129 
130         return observable.toFlowable(BackpressureStrategy.LATEST);
131     }
132 
133     @Override
134     public <K extends Serializable, D extends Date, T extends IUpdateDateEntityBean<K, D>, V extends IUpdateDateEntityBean<K, D>> V
135     getIfNewer(Class<T> entityClass,
136                Class<V> targetClass,
137                K id,
138                Date lastUpdateDate) {
139 
140         T entity = dataChangeDao.get(entityClass, id);
141         // Entity has been deleted
142         if (entity == null) {
143             throw new DataNotFoundException(I18n.t("sumaris.error.notFound", entityClass.getSimpleName(), id));
144         }
145 
146         // Entity is newer than last update date
147         if (entity.getUpdateDate() != null && entity.getUpdateDate().after(lastUpdateDate)) {
148             if (!conversionService.canConvert(entityClass, targetClass)) {
149                 throw new SumarisTechnicalException(I18n.t("sumaris.error.missingConverter", entityClass.getSimpleName()));
150             }
151             return conversionService.convert(entity, targetClass);
152         }
153         return null;
154     }
155 
156     /* -- -- */
157 }