Skip to content
Projects
Groups
Snippets
Help
This project
Loading...
Sign in / Register
Toggle navigation
A
apollo
Project
Overview
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
openSource
apollo
Commits
52e88f6e
Commit
52e88f6e
authored
Jun 01, 2016
by
Jason Song
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
Add notification id for long poll
parent
cdcd35a7
Hide whitespace changes
Inline
Side-by-side
Showing
16 changed files
with
283 additions
and
68 deletions
+283
-68
ReleaseMessage.java
...com/ctrip/framework/apollo/biz/entity/ReleaseMessage.java
+12
-0
ReleaseMessageListener.java
.../framework/apollo/biz/message/ReleaseMessageListener.java
+4
-2
ReleaseMessageScanner.java
...p/framework/apollo/biz/message/ReleaseMessageScanner.java
+4
-4
ReleaseMessageRepository.java
...ework/apollo/biz/repository/ReleaseMessageRepository.java
+3
-0
ReleaseMessageService.java
...p/framework/apollo/biz/service/ReleaseMessageService.java
+22
-0
ReleaseMessageScannerTest.java
...amework/apollo/biz/message/ReleaseMessageScannerTest.java
+10
-8
RemoteConfigRepository.java
...ip/framework/apollo/internals/RemoteConfigRepository.java
+15
-3
ConfigIntegrationTest.java
...p/framework/apollo/integration/ConfigIntegrationTest.java
+2
-1
NotificationController.java
...ollo/configservice/controller/NotificationController.java
+50
-31
NotificationControllerTest.java
.../configservice/controller/NotificationControllerTest.java
+81
-9
NotificationControllerIntegrationTest.java
...ce/integration/NotificationControllerIntegrationTest.java
+54
-1
cleanup.sql
...igservice/src/test/resources/integration-test/cleanup.sql
+1
-0
test-release-message.sql
.../test/resources/integration-test/test-release-message.sql
+6
-0
ApolloConfigNotification.java
...p/framework/apollo/core/dto/ApolloConfigNotification.java
+13
-3
ClassLoaderUtil.java
...om/ctrip/framework/apollo/core/utils/ClassLoaderUtil.java
+1
-1
ResourceUtils.java
.../com/ctrip/framework/apollo/core/utils/ResourceUtils.java
+5
-5
No files found.
apollo-biz/src/main/java/com/ctrip/framework/apollo/biz/entity/ReleaseMessage.java
View file @
52e88f6e
package
com
.
ctrip
.
framework
.
apollo
.
biz
.
entity
;
import
com.google.common.base.MoreObjects
;
import
java.util.Date
;
import
javax.persistence.Column
;
...
...
@@ -55,4 +57,14 @@ public class ReleaseMessage {
public
void
setMessage
(
String
message
)
{
this
.
message
=
message
;
}
@Override
public
String
toString
()
{
return
MoreObjects
.
toStringHelper
(
this
)
.
omitNullValues
()
.
add
(
"id"
,
id
)
.
add
(
"message"
,
message
)
.
add
(
"dataChangeLastModifiedTime"
,
dataChangeLastModifiedTime
)
.
toString
();
}
}
apollo-biz/src/main/java/com/ctrip/framework/apollo/biz/message/MessageListener.java
→
apollo-biz/src/main/java/com/ctrip/framework/apollo/biz/message/
Release
MessageListener.java
View file @
52e88f6e
package
com
.
ctrip
.
framework
.
apollo
.
biz
.
message
;
import
com.ctrip.framework.apollo.biz.entity.ReleaseMessage
;
/**
* @author Jason Song(song_s@ctrip.com)
*/
public
interface
MessageListener
{
void
handleMessage
(
String
message
,
String
channel
);
public
interface
Release
MessageListener
{
void
handleMessage
(
ReleaseMessage
message
,
String
channel
);
}
apollo-biz/src/main/java/com/ctrip/framework/apollo/biz/message/ReleaseMessageScanner.java
View file @
52e88f6e
...
...
@@ -33,7 +33,7 @@ public class ReleaseMessageScanner implements InitializingBean {
@Autowired
private
ReleaseMessageRepository
releaseMessageRepository
;
private
int
databaseScanInterval
;
private
List
<
MessageListener
>
listeners
;
private
List
<
Release
MessageListener
>
listeners
;
private
ScheduledExecutorService
executorService
;
private
long
maxIdScanned
;
...
...
@@ -66,7 +66,7 @@ public class ReleaseMessageScanner implements InitializingBean {
* add message listeners for release message
* @param listener
*/
public
void
addMessageListener
(
MessageListener
listener
)
{
public
void
addMessageListener
(
Release
MessageListener
listener
)
{
if
(!
listeners
.
contains
(
listener
))
{
listeners
.
add
(
listener
);
}
...
...
@@ -115,9 +115,9 @@ public class ReleaseMessageScanner implements InitializingBean {
*/
private
void
fireMessageScanned
(
List
<
ReleaseMessage
>
messages
)
{
for
(
ReleaseMessage
message
:
messages
)
{
for
(
MessageListener
listener
:
listeners
)
{
for
(
Release
MessageListener
listener
:
listeners
)
{
try
{
listener
.
handleMessage
(
message
.
getMessage
()
,
Topics
.
APOLLO_RELEASE_TOPIC
);
listener
.
handleMessage
(
message
,
Topics
.
APOLLO_RELEASE_TOPIC
);
}
catch
(
Throwable
ex
)
{
Cat
.
logError
(
ex
);
logger
.
error
(
"Failed to invoke message listener {}"
,
listener
.
getClass
(),
ex
);
...
...
apollo-biz/src/main/java/com/ctrip/framework/apollo/biz/repository/ReleaseMessageRepository.java
View file @
52e88f6e
...
...
@@ -4,6 +4,7 @@ import com.ctrip.framework.apollo.biz.entity.ReleaseMessage;
import
org.springframework.data.repository.PagingAndSortingRepository
;
import
java.util.Collection
;
import
java.util.List
;
/**
...
...
@@ -13,4 +14,6 @@ public interface ReleaseMessageRepository extends PagingAndSortingRepository<Rel
List
<
ReleaseMessage
>
findFirst500ByIdGreaterThanOrderByIdAsc
(
Long
id
);
ReleaseMessage
findTopByOrderByIdDesc
();
ReleaseMessage
findTopByMessageInOrderByIdDesc
(
Collection
<
String
>
messages
);
}
apollo-biz/src/main/java/com/ctrip/framework/apollo/biz/service/ReleaseMessageService.java
0 → 100644
View file @
52e88f6e
package
com
.
ctrip
.
framework
.
apollo
.
biz
.
service
;
import
com.ctrip.framework.apollo.biz.entity.ReleaseMessage
;
import
com.ctrip.framework.apollo.biz.repository.ReleaseMessageRepository
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.stereotype.Service
;
import
java.util.Collection
;
/**
* @author Jason Song(song_s@ctrip.com)
*/
@Service
public
class
ReleaseMessageService
{
@Autowired
private
ReleaseMessageRepository
releaseMessageRepository
;
public
ReleaseMessage
findLatestReleaseMessageForMessages
(
Collection
<
String
>
messages
)
{
return
releaseMessageRepository
.
findTopByMessageInOrderByIdDesc
(
messages
);
}
}
apollo-biz/src/test/java/com/ctrip/framework/apollo/biz/message/ReleaseMessageScannerTest.java
View file @
52e88f6e
...
...
@@ -44,8 +44,8 @@ public class ReleaseMessageScannerTest {
@Test
public
void
testScanMessageAndNotifyMessageListener
()
throws
Exception
{
SettableFuture
<
String
>
someListenerFuture
=
SettableFuture
.
create
();
MessageListener
someListener
=
(
message
,
channel
)
->
someListenerFuture
.
set
(
message
);
SettableFuture
<
ReleaseMessage
>
someListenerFuture
=
SettableFuture
.
create
();
Release
MessageListener
someListener
=
(
message
,
channel
)
->
someListenerFuture
.
set
(
message
);
releaseMessageScanner
.
addMessageListener
(
someListener
);
String
someMessage
=
"someMessage"
;
...
...
@@ -55,13 +55,14 @@ public class ReleaseMessageScannerTest {
when
(
releaseMessageRepository
.
findFirst500ByIdGreaterThanOrderByIdAsc
(
0L
)).
thenReturn
(
Lists
.
newArrayList
(
someReleaseMessage
));
String
someListenerMessage
=
ReleaseMessage
someListenerMessage
=
someListenerFuture
.
get
(
5000
,
TimeUnit
.
MILLISECONDS
);
assertEquals
(
someMessage
,
someListenerMessage
);
assertEquals
(
someMessage
,
someListenerMessage
.
getMessage
());
assertEquals
(
someId
,
someListenerMessage
.
getId
());
SettableFuture
<
String
>
anotherListenerFuture
=
SettableFuture
.
create
();
MessageListener
anotherListener
=
(
message
,
channel
)
->
anotherListenerFuture
.
set
(
message
);
SettableFuture
<
ReleaseMessage
>
anotherListenerFuture
=
SettableFuture
.
create
();
Release
MessageListener
anotherListener
=
(
message
,
channel
)
->
anotherListenerFuture
.
set
(
message
);
releaseMessageScanner
.
addMessageListener
(
anotherListener
);
String
anotherMessage
=
"anotherMessage"
;
...
...
@@ -71,10 +72,11 @@ public class ReleaseMessageScannerTest {
when
(
releaseMessageRepository
.
findFirst500ByIdGreaterThanOrderByIdAsc
(
someId
)).
thenReturn
(
Lists
.
newArrayList
(
anotherReleaseMessage
));
String
anotherListenerMessage
=
ReleaseMessage
anotherListenerMessage
=
anotherListenerFuture
.
get
(
5000
,
TimeUnit
.
MILLISECONDS
);
assertEquals
(
anotherMessage
,
anotherListenerMessage
);
assertEquals
(
anotherMessage
,
anotherListenerMessage
.
getMessage
());
assertEquals
(
anotherId
,
anotherListenerMessage
.
getId
());
}
...
...
apollo-client/src/main/java/com/ctrip/framework/apollo/internals/RemoteConfigRepository.java
View file @
52e88f6e
...
...
@@ -59,6 +59,7 @@ public class RemoteConfigRepository extends AbstractConfigRepository {
private
final
AtomicBoolean
m_longPollingStopped
;
private
SchedulePolicy
m_longPollSchedulePolicy
;
private
AtomicReference
<
ServiceDTO
>
m_longPollServiceDto
;
private
AtomicReference
<
ApolloConfigNotification
>
m_longPollResult
;
/**
* Constructor.
...
...
@@ -82,8 +83,9 @@ public class RemoteConfigRepository extends AbstractConfigRepository {
m_executorService
=
Executors
.
newScheduledThreadPool
(
1
,
ApolloThreadFactory
.
create
(
"RemoteConfigRepository"
,
true
));
m_longPollingService
=
Executors
.
newFixedThreadPool
(
2
,
ApolloThreadFactory
.
create
(
"RemoteConfigRepository-LongPolling"
,
true
));
ApolloThreadFactory
.
create
(
"RemoteConfigRepository-LongPolling"
,
true
));
m_longPollServiceDto
=
new
AtomicReference
<>();
m_longPollResult
=
new
AtomicReference
<>();
this
.
trySync
();
this
.
schedulePeriodicRefresh
();
this
.
scheduleLongPollingRefresh
();
...
...
@@ -270,7 +272,7 @@ public class RemoteConfigRepository extends AbstractConfigRepository {
String
url
=
assembleLongPollRefreshUrl
(
lastServiceDto
.
getHomepageUrl
(),
appId
,
cluster
,
m_namespace
,
dataCenter
);
m_namespace
,
dataCenter
,
m_longPollResult
.
get
()
);
logger
.
debug
(
"Long polling from {}"
,
url
);
HttpRequest
request
=
new
HttpRequest
(
url
);
...
...
@@ -286,6 +288,10 @@ public class RemoteConfigRepository extends AbstractConfigRepository {
logger
.
debug
(
"Long polling response: {}, url: {}"
,
response
.
getStatusCode
(),
url
);
if
(
response
.
getStatusCode
()
==
200
)
{
m_longPollServiceDto
.
set
(
lastServiceDto
);
if
(
response
.
getBody
()
!=
null
)
{
m_longPollResult
.
set
(
response
.
getBody
());
transaction
.
addData
(
"Result"
,
response
.
getBody
().
toString
());
}
longPollingService
.
submit
(
new
Runnable
()
{
@Override
public
void
run
()
{
...
...
@@ -320,7 +326,8 @@ public class RemoteConfigRepository extends AbstractConfigRepository {
}
private
String
assembleLongPollRefreshUrl
(
String
uri
,
String
appId
,
String
cluster
,
String
namespace
,
String
dataCenter
)
{
String
namespace
,
String
dataCenter
,
ApolloConfigNotification
previousResult
)
{
Escaper
escaper
=
UrlEscapers
.
urlPathSegmentEscaper
();
Map
<
String
,
String
>
queryParams
=
Maps
.
newHashMap
();
queryParams
.
put
(
"appId"
,
escaper
.
escape
(
appId
));
...
...
@@ -337,6 +344,11 @@ public class RemoteConfigRepository extends AbstractConfigRepository {
queryParams
.
put
(
"ip"
,
escaper
.
escape
(
localIp
));
}
if
(
previousResult
!=
null
)
{
//number doesn't need encode
queryParams
.
put
(
"notificationId"
,
String
.
valueOf
(
previousResult
.
getNotificationId
()));
}
String
params
=
MAP_JOINER
.
join
(
queryParams
);
if
(!
uri
.
endsWith
(
"/"
))
{
uri
+=
"/"
;
...
...
apollo-client/src/test/java/com/ctrip/framework/apollo/integration/ConfigIntegrationTest.java
View file @
52e88f6e
...
...
@@ -234,6 +234,7 @@ public class ConfigIntegrationTest extends BaseIntegrationTest {
final
String
someKey
=
"someKey"
;
final
String
someValue
=
"someValue"
;
final
String
anotherValue
=
"anotherValue"
;
long
someNotificationId
=
1
;
long
pollTimeoutInMS
=
50
;
Map
<
String
,
String
>
configurations
=
Maps
.
newHashMap
();
...
...
@@ -242,7 +243,7 @@ public class ConfigIntegrationTest extends BaseIntegrationTest {
ContextHandler
configHandler
=
mockConfigServerHandler
(
HttpServletResponse
.
SC_OK
,
apolloConfig
);
ContextHandler
pollHandler
=
mockPollNotificationHandler
(
pollTimeoutInMS
,
HttpServletResponse
.
SC_OK
,
new
ApolloConfigNotification
(
apolloConfig
.
getNamespaceName
()),
false
);
new
ApolloConfigNotification
(
apolloConfig
.
getNamespaceName
()
,
someNotificationId
),
false
);
startServerWithHandlers
(
configHandler
,
pollHandler
);
...
...
apollo-configservice/src/main/java/com/ctrip/framework/apollo/configservice/controller/NotificationController.java
View file @
52e88f6e
...
...
@@ -10,9 +10,11 @@ import com.google.common.collect.Multimaps;
import
com.google.common.collect.Sets
;
import
com.ctrip.framework.apollo.biz.entity.AppNamespace
;
import
com.ctrip.framework.apollo.biz.message.MessageListener
;
import
com.ctrip.framework.apollo.biz.entity.ReleaseMessage
;
import
com.ctrip.framework.apollo.biz.message.ReleaseMessageListener
;
import
com.ctrip.framework.apollo.biz.message.Topics
;
import
com.ctrip.framework.apollo.biz.service.AppNamespaceService
;
import
com.ctrip.framework.apollo.biz.service.ReleaseMessageService
;
import
com.ctrip.framework.apollo.biz.utils.EntityManagerUtil
;
import
com.ctrip.framework.apollo.core.ConfigConsts
;
import
com.ctrip.framework.apollo.core.dto.ApolloConfigNotification
;
...
...
@@ -38,7 +40,7 @@ import java.util.Set;
*/
@RestController
@RequestMapping
(
"/notifications"
)
public
class
NotificationController
implements
MessageListener
{
public
class
NotificationController
implements
Release
MessageListener
{
private
static
final
Logger
logger
=
LoggerFactory
.
getLogger
(
NotificationController
.
class
);
private
static
final
long
TIMEOUT
=
30
*
1000
;
//30 seconds
private
final
Multimap
<
String
,
DeferredResult
<
ResponseEntity
<
ApolloConfigNotification
>>>
...
...
@@ -53,6 +55,9 @@ public class NotificationController implements MessageListener {
private
AppNamespaceService
appNamespaceService
;
@Autowired
private
ReleaseMessageService
releaseMessageService
;
@Autowired
private
EntityManagerUtil
entityManagerUtil
;
@RequestMapping
(
method
=
RequestMethod
.
GET
)
...
...
@@ -61,6 +66,7 @@ public class NotificationController implements MessageListener {
@RequestParam
(
value
=
"cluster"
)
String
cluster
,
@RequestParam
(
value
=
"namespace"
,
defaultValue
=
ConfigConsts
.
NAMESPACE_DEFAULT
)
String
namespace
,
@RequestParam
(
value
=
"dataCenter"
,
required
=
false
)
String
dataCenter
,
@RequestParam
(
value
=
"notificationId"
,
defaultValue
=
"-1"
)
long
notificationId
,
@RequestParam
(
value
=
"ip"
,
required
=
false
)
String
clientIp
)
{
Set
<
String
>
watchedKeys
=
assembleWatchKeys
(
appId
,
cluster
,
namespace
,
dataCenter
);
...
...
@@ -72,24 +78,42 @@ public class NotificationController implements MessageListener {
DeferredResult
<
ResponseEntity
<
ApolloConfigNotification
>>
deferredResult
=
new
DeferredResult
<>(
TIMEOUT
,
NOT_MODIFIED_RESPONSE
);
//register all keys
for
(
String
key
:
watchedKeys
)
{
this
.
deferredResults
.
put
(
key
,
deferredResult
);
}
//check whether client is out-dated
ReleaseMessage
latest
=
releaseMessageService
.
findLatestReleaseMessageForMessages
(
watchedKeys
);
deferredResult
.
onTimeout
(()
->
logWatchedKeysToCat
(
watchedKeys
,
"Apollo.LongPoll.TimeOutKeys"
));
/**
* Manually close the entity manager.
* Since for async request, Spring won't do so until the request is finished,
* which is unacceptable since we are doing long polling - means the db connection would be hold
* for a very long time
*/
entityManagerUtil
.
closeEntityManager
();
deferredResult
.
onCompletion
(()
->
{
//unregister all keys
if
(
latest
!=
null
&&
latest
.
getId
()
!=
notificationId
)
{
deferredResult
.
setResult
(
new
ResponseEntity
<>(
new
ApolloConfigNotification
(
namespace
,
latest
.
getId
()),
HttpStatus
.
OK
));
}
else
{
//register all keys
for
(
String
key
:
watchedKeys
)
{
deferredResults
.
remove
(
key
,
deferredResult
);
this
.
deferredResults
.
put
(
key
,
deferredResult
);
}
logWatchedKeysToCat
(
watchedKeys
,
"Apollo.LongPoll.CompletedKeys"
);
});
logWatchedKeysToCat
(
watchedKeys
,
"Apollo.LongPoll.RegisteredKeys"
);
logger
.
info
(
"Listening {} from appId: {}, cluster: {}, namespace: {}, datacenter: {}"
,
watchedKeys
,
appId
,
cluster
,
namespace
,
dataCenter
);
deferredResult
.
onTimeout
(()
->
logWatchedKeysToCat
(
watchedKeys
,
"Apollo.LongPoll.TimeOutKeys"
));
deferredResult
.
onCompletion
(()
->
{
//unregister all keys
for
(
String
key
:
watchedKeys
)
{
deferredResults
.
remove
(
key
,
deferredResult
);
}
logWatchedKeysToCat
(
watchedKeys
,
"Apollo.LongPoll.CompletedKeys"
);
});
logWatchedKeysToCat
(
watchedKeys
,
"Apollo.LongPoll.RegisteredKeys"
);
logger
.
info
(
"Listening {} from appId: {}, cluster: {}, namespace: {}, datacenter: {}"
,
watchedKeys
,
appId
,
cluster
,
namespace
,
dataCenter
);
}
return
deferredResult
;
}
...
...
@@ -101,13 +125,6 @@ public class NotificationController implements MessageListener {
String
namespace
,
String
dataCenter
)
{
AppNamespace
appNamespace
=
appNamespaceService
.
findByNamespaceName
(
namespace
);
/**
* Manually close the entity manager.
* Since for async request, Spring won't do so until the request is finished,
* which is unacceptable since we are doing long polling - means the db connection would be hold
* for a very long time
*/
entityManagerUtil
.
closeEntityManager
();
//check whether the namespace's appId equals to current one
if
(
Objects
.
isNull
(
appNamespace
)
||
Objects
.
equals
(
applicationId
,
appNamespace
.
getAppId
()))
{
...
...
@@ -140,27 +157,29 @@ public class NotificationController implements MessageListener {
}
@Override
public
void
handleMessage
(
String
message
,
String
channel
)
{
public
void
handleMessage
(
ReleaseMessage
message
,
String
channel
)
{
logger
.
info
(
"message received - channel: {}, message: {}"
,
channel
,
message
);
Cat
.
logEvent
(
"Apollo.LongPoll.Messages"
,
message
);
if
(!
Topics
.
APOLLO_RELEASE_TOPIC
.
equals
(
channel
)
||
Strings
.
isNullOrEmpty
(
message
))
{
String
content
=
message
.
getMessage
();
Cat
.
logEvent
(
"Apollo.LongPoll.Messages"
,
content
);
if
(!
Topics
.
APOLLO_RELEASE_TOPIC
.
equals
(
channel
)
||
Strings
.
isNullOrEmpty
(
content
))
{
return
;
}
List
<
String
>
keys
=
STRING_SPLITTER
.
splitToList
(
message
);
//message should be appId
|cluster|
namespace
List
<
String
>
keys
=
STRING_SPLITTER
.
splitToList
(
content
);
//message should be appId
+cluster+
namespace
if
(
keys
.
size
()
!=
3
)
{
logger
.
error
(
"message format invalid - {}"
,
message
);
logger
.
error
(
"message format invalid - {}"
,
content
);
return
;
}
ResponseEntity
<
ApolloConfigNotification
>
notification
=
new
ResponseEntity
<>(
new
ApolloConfigNotification
(
keys
.
get
(
2
)),
HttpStatus
.
OK
);
new
ApolloConfigNotification
(
keys
.
get
(
2
)
,
message
.
getId
()
),
HttpStatus
.
OK
);
//create a new list to avoid ConcurrentModificationException
List
<
DeferredResult
<
ResponseEntity
<
ApolloConfigNotification
>>>
results
=
Lists
.
newArrayList
(
deferredResults
.
get
(
message
));
logger
.
info
(
"Notify {} clients for key {}"
,
results
.
size
(),
message
);
Lists
.
newArrayList
(
deferredResults
.
get
(
content
));
logger
.
info
(
"Notify {} clients for key {}"
,
results
.
size
(),
content
);
for
(
DeferredResult
<
ResponseEntity
<
ApolloConfigNotification
>>
result
:
results
)
{
result
.
setResult
(
notification
);
...
...
apollo-configservice/src/test/java/com/ctrip/framework/apollo/configservice/controller/NotificationControllerTest.java
View file @
52e88f6e
...
...
@@ -5,8 +5,10 @@ import com.google.common.collect.Lists;
import
com.google.common.collect.Multimap
;
import
com.ctrip.framework.apollo.biz.entity.AppNamespace
;
import
com.ctrip.framework.apollo.biz.entity.ReleaseMessage
;
import
com.ctrip.framework.apollo.biz.message.Topics
;
import
com.ctrip.framework.apollo.biz.service.AppNamespaceService
;
import
com.ctrip.framework.apollo.biz.service.ReleaseMessageService
;
import
com.ctrip.framework.apollo.biz.utils.EntityManagerUtil
;
import
com.ctrip.framework.apollo.core.ConfigConsts
;
import
com.ctrip.framework.apollo.core.dto.ApolloConfigNotification
;
...
...
@@ -25,6 +27,8 @@ import java.util.List;
import
static
org
.
junit
.
Assert
.
assertEquals
;
import
static
org
.
junit
.
Assert
.
assertTrue
;
import
static
org
.
mockito
.
Matchers
.
anyCollectionOf
;
import
static
org
.
mockito
.
Mockito
.
mock
;
import
static
org
.
mockito
.
Mockito
.
when
;
/**
...
...
@@ -39,10 +43,13 @@ public class NotificationControllerTest {
private
String
defaultNamespace
;
private
String
somePublicNamespace
;
private
String
someDataCenter
;
private
long
someNotificationId
;
private
String
someClientIp
;
@Mock
private
AppNamespaceService
appNamespaceService
;
@Mock
private
ReleaseMessageService
releaseMessageService
;
@Mock
private
EntityManagerUtil
entityManagerUtil
;
private
Multimap
<
String
,
DeferredResult
<
ResponseEntity
<
ApolloConfigNotification
>>>
deferredResults
;
...
...
@@ -51,6 +58,7 @@ public class NotificationControllerTest {
public
void
setUp
()
throws
Exception
{
controller
=
new
NotificationController
();
ReflectionTestUtils
.
setField
(
controller
,
"appNamespaceService"
,
appNamespaceService
);
ReflectionTestUtils
.
setField
(
controller
,
"releaseMessageService"
,
releaseMessageService
);
ReflectionTestUtils
.
setField
(
controller
,
"entityManagerUtil"
,
entityManagerUtil
);
someAppId
=
"someAppId"
;
...
...
@@ -59,6 +67,7 @@ public class NotificationControllerTest {
defaultNamespace
=
ConfigConsts
.
NAMESPACE_DEFAULT
;
somePublicNamespace
=
"somePublicNamespace"
;
someDataCenter
=
"someDC"
;
someNotificationId
=
1
;
someClientIp
=
"someClientIp"
;
deferredResults
=
...
...
@@ -70,7 +79,8 @@ public class NotificationControllerTest {
public
void
testPollNotificationWithDefaultNamespace
()
throws
Exception
{
DeferredResult
<
ResponseEntity
<
ApolloConfigNotification
>>
deferredResult
=
controller
.
pollNotification
(
someAppId
,
someCluster
,
defaultNamespace
,
someDataCenter
,
someClientIp
);
.
pollNotification
(
someAppId
,
someCluster
,
defaultNamespace
,
someDataCenter
,
someNotificationId
,
someClientIp
);
List
<
String
>
clusters
=
Lists
.
newArrayList
(
someCluster
,
someDataCenter
,
ConfigConsts
.
CLUSTER_NAME_DEFAULT
);
...
...
@@ -86,11 +96,34 @@ public class NotificationControllerTest {
}
@Test
public
void
testPollNotificationWithDefaultNamespaceWithNotificationIdOutDated
()
throws
Exception
{
long
notificationId
=
someNotificationId
+
1
;
ReleaseMessage
someReleaseMessage
=
mock
(
ReleaseMessage
.
class
);
when
(
someReleaseMessage
.
getId
()).
thenReturn
(
notificationId
);
when
(
releaseMessageService
.
findLatestReleaseMessageForMessages
(
anyCollectionOf
(
String
.
class
)))
.
thenReturn
(
someReleaseMessage
);
DeferredResult
<
ResponseEntity
<
ApolloConfigNotification
>>
deferredResult
=
controller
.
pollNotification
(
someAppId
,
someCluster
,
defaultNamespace
,
someDataCenter
,
someNotificationId
,
someClientIp
);
ResponseEntity
<
ApolloConfigNotification
>
result
=
(
ResponseEntity
<
ApolloConfigNotification
>)
deferredResult
.
getResult
();
assertEquals
(
HttpStatus
.
OK
,
result
.
getStatusCode
());
assertEquals
(
defaultNamespace
,
result
.
getBody
().
getNamespaceName
());
assertEquals
(
notificationId
,
result
.
getBody
().
getNotificationId
());
}
@Test
public
void
testPollNotificationWithDefaultNamespaceWithDefaultClusterWithDataCenter
()
throws
Exception
{
DeferredResult
<
ResponseEntity
<
ApolloConfigNotification
>>
deferredResult
=
controller
.
pollNotification
(
someAppId
,
defaultCluster
,
defaultNamespace
,
someDataCenter
,
someClientIp
);
.
pollNotification
(
someAppId
,
defaultCluster
,
defaultNamespace
,
someDataCenter
,
someNotificationId
,
someClientIp
);
List
<
String
>
clusters
=
Lists
.
newArrayList
(
someDataCenter
,
defaultCluster
);
...
...
@@ -110,7 +143,7 @@ public class NotificationControllerTest {
throws
Exception
{
DeferredResult
<
ResponseEntity
<
ApolloConfigNotification
>>
deferredResult
=
controller
.
pollNotification
(
someAppId
,
defaultCluster
,
defaultNamespace
,
null
,
someClientIp
);
.
pollNotification
(
someAppId
,
defaultCluster
,
defaultNamespace
,
null
,
some
NotificationId
,
some
ClientIp
);
List
<
String
>
clusters
=
Lists
.
newArrayList
(
defaultCluster
);
...
...
@@ -137,7 +170,8 @@ public class NotificationControllerTest {
DeferredResult
<
ResponseEntity
<
ApolloConfigNotification
>>
deferredResult
=
controller
.
pollNotification
(
someAppId
,
someCluster
,
somePublicNamespace
,
someDataCenter
,
someClientIp
);
.
pollNotification
(
someAppId
,
someCluster
,
somePublicNamespace
,
someDataCenter
,
someNotificationId
,
someClientIp
);
List
<
String
>
clusters
=
Lists
.
newArrayList
(
someCluster
,
someDataCenter
,
ConfigConsts
.
CLUSTER_NAME_DEFAULT
);
...
...
@@ -160,16 +194,49 @@ public class NotificationControllerTest {
}
@Test
public
void
testPollNotificationWithPublicNamespaceWithNotificationIdOutDated
()
throws
Exception
{
long
notificationId
=
someNotificationId
+
1
;
ReleaseMessage
someReleaseMessage
=
mock
(
ReleaseMessage
.
class
);
when
(
someReleaseMessage
.
getId
()).
thenReturn
(
notificationId
);
when
(
releaseMessageService
.
findLatestReleaseMessageForMessages
(
anyCollectionOf
(
String
.
class
)))
.
thenReturn
(
someReleaseMessage
);
String
somePublicAppId
=
"somePublicAppId"
;
AppNamespace
somePublicAppNamespace
=
assmbleAppNamespace
(
somePublicAppId
,
somePublicNamespace
);
when
(
appNamespaceService
.
findByNamespaceName
(
somePublicNamespace
))
.
thenReturn
(
somePublicAppNamespace
);
DeferredResult
<
ResponseEntity
<
ApolloConfigNotification
>>
deferredResult
=
controller
.
pollNotification
(
someAppId
,
someCluster
,
somePublicNamespace
,
someDataCenter
,
someNotificationId
,
someClientIp
);
ResponseEntity
<
ApolloConfigNotification
>
result
=
(
ResponseEntity
<
ApolloConfigNotification
>)
deferredResult
.
getResult
();
assertEquals
(
HttpStatus
.
OK
,
result
.
getStatusCode
());
assertEquals
(
somePublicNamespace
,
result
.
getBody
().
getNamespaceName
());
assertEquals
(
notificationId
,
result
.
getBody
().
getNotificationId
());
}
@Test
public
void
testPollNotificationWithDefaultNamespaceAndHandleMessage
()
throws
Exception
{
DeferredResult
<
ResponseEntity
<
ApolloConfigNotification
>>
deferredResult
=
controller
.
pollNotification
(
someAppId
,
someCluster
,
defaultNamespace
,
someDataCenter
,
someClientIp
);
.
pollNotification
(
someAppId
,
someCluster
,
defaultNamespace
,
someDataCenter
,
someNotificationId
,
someClientIp
);
String
key
=
Joiner
.
on
(
ConfigConsts
.
CLUSTER_NAMESPACE_SEPARATOR
)
.
join
(
someAppId
,
someCluster
,
defaultNamespace
);
long
someId
=
1
;
ReleaseMessage
someReleaseMessage
=
new
ReleaseMessage
(
key
);
someReleaseMessage
.
setId
(
someId
);
controller
.
handleMessage
(
key
,
Topics
.
APOLLO_RELEASE_TOPIC
);
controller
.
handleMessage
(
someReleaseMessage
,
Topics
.
APOLLO_RELEASE_TOPIC
);
ResponseEntity
<
ApolloConfigNotification
>
response
=
(
ResponseEntity
<
ApolloConfigNotification
>)
deferredResult
.
getResult
();
...
...
@@ -177,6 +244,7 @@ public class NotificationControllerTest {
assertEquals
(
HttpStatus
.
OK
,
response
.
getStatusCode
());
assertEquals
(
defaultNamespace
,
notification
.
getNamespaceName
());
assertEquals
(
someId
,
notification
.
getNotificationId
());
}
@Test
...
...
@@ -190,13 +258,17 @@ public class NotificationControllerTest {
DeferredResult
<
ResponseEntity
<
ApolloConfigNotification
>>
deferredResult
=
controller
.
pollNotification
(
someAppId
,
someCluster
,
somePublicNamespace
,
someDataCenter
,
someClientIp
);
.
pollNotification
(
someAppId
,
someCluster
,
somePublicNamespace
,
someDataCenter
,
someNotificationId
,
someClientIp
);
String
key
=
Joiner
.
on
(
ConfigConsts
.
CLUSTER_NAMESPACE_SEPARATOR
)
.
join
(
somePublicAppId
,
someDataCenter
,
somePublicNamespace
);
long
someId
=
1
;
ReleaseMessage
someReleaseMessage
=
new
ReleaseMessage
(
key
);
someReleaseMessage
.
setId
(
someId
);
controller
.
handleMessage
(
key
,
Topics
.
APOLLO_RELEASE_TOPIC
);
controller
.
handleMessage
(
someReleaseMessage
,
Topics
.
APOLLO_RELEASE_TOPIC
);
ResponseEntity
<
ApolloConfigNotification
>
response
=
(
ResponseEntity
<
ApolloConfigNotification
>)
deferredResult
.
getResult
();
...
...
@@ -204,7 +276,7 @@ public class NotificationControllerTest {
assertEquals
(
HttpStatus
.
OK
,
response
.
getStatusCode
());
assertEquals
(
somePublicNamespace
,
notification
.
getNamespaceName
());
assertEquals
(
someId
,
notification
.
getNotificationId
());
}
private
AppNamespace
assmbleAppNamespace
(
String
appId
,
String
namespace
)
{
...
...
apollo-configservice/src/test/java/com/ctrip/framework/apollo/configservice/integration/NotificationControllerIntegrationTest.java
View file @
52e88f6e
...
...
@@ -21,6 +21,7 @@ import java.util.concurrent.TimeUnit;
import
java.util.concurrent.atomic.AtomicBoolean
;
import
static
org
.
junit
.
Assert
.
assertEquals
;
import
static
org
.
junit
.
Assert
.
assertNotEquals
;
/**
* @author Jason Song(song_s@ctrip.com)
...
...
@@ -45,7 +46,7 @@ public class NotificationControllerIntegrationTest extends AbstractBaseIntegrati
executorService
=
Executors
.
newSingleThreadExecutor
();
}
@Test
@Test
(
timeout
=
5000L
)
public
void
testPollNotificationWithDefaultNamespace
()
throws
Exception
{
AtomicBoolean
stop
=
new
AtomicBoolean
();
perodicSendMessage
(
assembleKey
(
someAppId
,
someCluster
,
defaultNamespace
),
stop
);
...
...
@@ -60,6 +61,39 @@ public class NotificationControllerIntegrationTest extends AbstractBaseIntegrati
ApolloConfigNotification
notification
=
result
.
getBody
();
assertEquals
(
HttpStatus
.
OK
,
result
.
getStatusCode
());
assertEquals
(
defaultNamespace
,
notification
.
getNamespaceName
());
assertNotEquals
(
0
,
notification
.
getNotificationId
());
}
@Test
(
timeout
=
5000L
)
@Sql
(
scripts
=
"/integration-test/test-release-message.sql"
,
executionPhase
=
Sql
.
ExecutionPhase
.
BEFORE_TEST_METHOD
)
@Sql
(
scripts
=
"/integration-test/cleanup.sql"
,
executionPhase
=
Sql
.
ExecutionPhase
.
AFTER_TEST_METHOD
)
public
void
testPollNotificationWithDefaultNamespaceWithNotificationIdNull
()
throws
Exception
{
ResponseEntity
<
ApolloConfigNotification
>
result
=
restTemplate
.
getForEntity
(
"{baseurl}/notifications?appId={appId}&cluster={clusterName}&namespace={namespace}"
,
ApolloConfigNotification
.
class
,
getHostUrl
(),
someAppId
,
someCluster
,
defaultNamespace
);
ApolloConfigNotification
notification
=
result
.
getBody
();
assertEquals
(
HttpStatus
.
OK
,
result
.
getStatusCode
());
assertEquals
(
defaultNamespace
,
notification
.
getNamespaceName
());
assertEquals
(
10
,
notification
.
getNotificationId
());
}
@Test
(
timeout
=
5000L
)
@Sql
(
scripts
=
"/integration-test/test-release.sql"
,
executionPhase
=
Sql
.
ExecutionPhase
.
BEFORE_TEST_METHOD
)
@Sql
(
scripts
=
"/integration-test/test-release-message.sql"
,
executionPhase
=
Sql
.
ExecutionPhase
.
BEFORE_TEST_METHOD
)
@Sql
(
scripts
=
"/integration-test/cleanup.sql"
,
executionPhase
=
Sql
.
ExecutionPhase
.
AFTER_TEST_METHOD
)
public
void
testPollNotificationWithDefaultNamespaceWithNotificationIdOutDated
()
throws
Exception
{
long
someOutDatedNotificationId
=
1
;
ResponseEntity
<
ApolloConfigNotification
>
result
=
restTemplate
.
getForEntity
(
"{baseurl}/notifications?appId={appId}&cluster={clusterName}&namespace={namespace}¬ificationId={notificationId}"
,
ApolloConfigNotification
.
class
,
getHostUrl
(),
someAppId
,
someCluster
,
defaultNamespace
,
someOutDatedNotificationId
);
ApolloConfigNotification
notification
=
result
.
getBody
();
assertEquals
(
HttpStatus
.
OK
,
result
.
getStatusCode
());
assertEquals
(
defaultNamespace
,
notification
.
getNamespaceName
());
assertEquals
(
10
,
notification
.
getNotificationId
());
}
@Test
(
timeout
=
5000L
)
...
...
@@ -82,6 +116,7 @@ public class NotificationControllerIntegrationTest extends AbstractBaseIntegrati
ApolloConfigNotification
notification
=
result
.
getBody
();
assertEquals
(
HttpStatus
.
OK
,
result
.
getStatusCode
());
assertEquals
(
somePublicNamespace
,
notification
.
getNamespaceName
());
assertNotEquals
(
0
,
notification
.
getNotificationId
());
}
@Test
(
timeout
=
5000L
)
...
...
@@ -105,6 +140,24 @@ public class NotificationControllerIntegrationTest extends AbstractBaseIntegrati
ApolloConfigNotification
notification
=
result
.
getBody
();
assertEquals
(
HttpStatus
.
OK
,
result
.
getStatusCode
());
assertEquals
(
somePublicNamespace
,
notification
.
getNamespaceName
());
assertNotEquals
(
0
,
notification
.
getNotificationId
());
}
@Test
(
timeout
=
5000L
)
@Sql
(
scripts
=
"/integration-test/test-release.sql"
,
executionPhase
=
Sql
.
ExecutionPhase
.
BEFORE_TEST_METHOD
)
@Sql
(
scripts
=
"/integration-test/test-release-message.sql"
,
executionPhase
=
Sql
.
ExecutionPhase
.
BEFORE_TEST_METHOD
)
@Sql
(
scripts
=
"/integration-test/cleanup.sql"
,
executionPhase
=
Sql
.
ExecutionPhase
.
AFTER_TEST_METHOD
)
public
void
testPollNotificationWithPublicNamespaceWithNotificationIdOutDated
()
throws
Exception
{
long
someOutDatedNotificationId
=
1
;
ResponseEntity
<
ApolloConfigNotification
>
result
=
restTemplate
.
getForEntity
(
"{baseurl}/notifications?appId={appId}&cluster={clusterName}&namespace={namespace}¬ificationId={notificationId}"
,
ApolloConfigNotification
.
class
,
getHostUrl
(),
someAppId
,
someCluster
,
somePublicNamespace
,
someOutDatedNotificationId
);
ApolloConfigNotification
notification
=
result
.
getBody
();
assertEquals
(
HttpStatus
.
OK
,
result
.
getStatusCode
());
assertEquals
(
somePublicNamespace
,
notification
.
getNamespaceName
());
assertEquals
(
20
,
notification
.
getNotificationId
());
}
private
String
assembleKey
(
String
appId
,
String
cluster
,
String
namespace
)
{
...
...
apollo-configservice/src/test/resources/integration-test/cleanup.sql
View file @
52e88f6e
...
...
@@ -3,4 +3,5 @@ DELETE FROM Namespace;
DELETE
FROM
AppNamespace
;
DELETE
FROM
Cluster
;
DELETE
FROM
App
;
DELETE
FROM
ReleaseMessage
;
apollo-configservice/src/test/resources/integration-test/test-release-message.sql
0 → 100644
View file @
52e88f6e
INSERT
INTO
`releasemessage`
(
`Id`
,
`Message`
)
VALUES
(
10
,
'someAppId+default+application'
);
INSERT
INTO
`releasemessage`
(
`Id`
,
`Message`
)
VALUES
(
20
,
'somePublicAppId+default+somePublicNamespace'
);
apollo-core/src/main/java/com/ctrip/framework/apollo/core/dto/ApolloConfigNotification.java
View file @
52e88f6e
...
...
@@ -5,20 +5,30 @@ package com.ctrip.framework.apollo.core.dto;
*/
public
class
ApolloConfigNotification
{
private
String
namespaceName
;
private
long
notificationId
;
//for json converter
public
ApolloConfigNotification
()
{
}
public
ApolloConfigNotification
(
String
namespaceName
)
{
public
ApolloConfigNotification
(
String
namespaceName
,
long
notificationId
)
{
this
.
namespaceName
=
namespaceName
;
this
.
notificationId
=
notificationId
;
}
public
String
getNamespaceName
()
{
return
namespaceName
;
}
public
void
setNamespaceName
(
String
namespaceName
)
{
this
.
namespaceName
=
namespaceName
;
public
long
getNotificationId
()
{
return
notificationId
;
}
@Override
public
String
toString
()
{
return
"ApolloConfigNotification{"
+
"namespaceName='"
+
namespaceName
+
'\''
+
", notificationId="
+
notificationId
+
'}'
;
}
}
apollo-core/src/main/java/com/ctrip/framework/apollo/core/utils/ClassLoaderUtil.java
View file @
52e88f6e
...
...
@@ -18,7 +18,7 @@ public class ClassLoaderUtil {
static
{
if
(
loader
==
null
)
{
logger
.
info
(
"Using system class loader"
);
logger
.
warn
(
"Using system class loader"
);
loader
=
ClassLoader
.
getSystemClassLoader
();
}
...
...
apollo-core/src/main/java/com/ctrip/framework/apollo/core/utils/ResourceUtils.java
View file @
52e88f6e
...
...
@@ -19,7 +19,7 @@ public class ResourceUtils {
@SuppressWarnings
(
"unchecked"
)
public
static
Properties
readConfigFile
(
String
configPath
,
Properties
defaults
)
{
InputStream
in
=
ClassLoaderUtil
.
getLoader
().
getResourceAsStream
(
configPath
);
logger
.
info
(
"Reading config from resource {}"
,
configPath
);
logger
.
debug
(
"Reading config from resource {}"
,
configPath
);
Properties
props
=
new
Properties
();
try
{
if
(
in
==
null
)
{
...
...
@@ -27,9 +27,9 @@ public class ResourceUtils {
Path
path
=
new
File
(
System
.
getProperty
(
"user.dir"
)
+
configPath
).
toPath
();
if
(
Files
.
isReadable
(
path
))
{
in
=
new
FileInputStream
(
path
.
toFile
());
logger
.
info
(
"Reading config from file {} "
,
path
);
logger
.
debug
(
"Reading config from file {} "
,
path
);
}
else
{
logger
.
info
(
"Could not find available config file"
);
logger
.
warn
(
"Could not find available config file"
);
}
}
if
(
defaults
!=
null
)
{
...
...
@@ -59,9 +59,9 @@ public class ResourceUtils {
sb
.
append
(
key
).
append
(
'='
).
append
(
val
).
append
(
'\n'
);
}
if
(
sb
.
length
()
>
0
)
{
logger
.
info
(
"Reading properties: \n"
+
sb
.
toString
());
logger
.
debug
(
"Reading properties: \n"
+
sb
.
toString
());
}
else
{
logger
.
info
(
"No available properties"
);
logger
.
warn
(
"No available properties"
);
}
return
props
;
}
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment