Skip to content
11 changes: 11 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,17 @@
<version>0.1.1</version>
</dependency>

<dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-core</artifactId>
<version>1.3</version>
</dependency>
<dependency>
<groupId>net.sourceforge.htmlcleaner</groupId>
<artifactId>htmlcleaner</artifactId>
<version>2.16</version>
</dependency>

</dependencies>
</dependencyManagement>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -380,14 +380,13 @@ class GrpcPuppet(puppetOptions: PuppetOptions) : Puppet(puppetOptions) {

return CompletableFuture.supplyAsync {
val response = grpcClient!!.contactPayload(request)
val payload = ContactPayload()
val payload = ContactPayload(response.id)
payload.address = response.address
payload.alias = response.alias
payload.avatar = response.avatar
payload.city = response.city
payload.friend = response.friend
payload.gender = ContractGender.getByCode(response.gender.number)
payload.id = response.id
payload.name = response.name
payload.province = response.province
payload.signature = response.signature
Expand Down Expand Up @@ -835,11 +834,10 @@ class GrpcPuppet(puppetOptions: PuppetOptions) : Puppet(puppetOptions) {

return CompletableFuture.supplyAsync {
val response = grpcClient!!.roomPayload(request)
val payload = RoomPayload()
val payload = RoomPayload(response.id)

payload.adminIdList = response.adminIdsList
payload.avatar = response.avatar
payload.id = response.id
payload.memberIdList = response.memberIdsList
payload.ownerId = response.ownerId
payload.topic = response.topic
Expand Down
212 changes: 187 additions & 25 deletions wechaty-puppet/src/main/kotlin/Puppet.kt
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import java.util.concurrent.Executors
import java.util.concurrent.Future
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicLong
import java.util.stream.Collector
import java.util.stream.Collectors


val PUPPET_COUNT = AtomicLong()
Expand Down Expand Up @@ -68,7 +70,7 @@ abstract class Puppet : EventEmitter {

on("heartbeat", object : PuppetHeartbeatListener {
override fun handler(payload: EventHeartbeatPayload) {
log.info("heartbeat -> ${payload.data}")
log.debug("heartbeat -> ${payload.data}")
val watchdogFood = WatchdogFood(1000 * timeOut)
watchdogFood.data = payload.data
watchDog.feed(watchdogFood);
Expand All @@ -89,8 +91,10 @@ abstract class Puppet : EventEmitter {

on("reset", object : PuppetResetListener {
override fun handler(payload: EventResetPayload) {
log.info("get a reset message")
if (rateLimiter.tryAcquire()) {

log.debug("get a reset message")
if(rateLimiter.tryAcquire()){

reset(payload.data)
}
}
Expand Down Expand Up @@ -120,15 +124,15 @@ abstract class Puppet : EventEmitter {
executorService.scheduleAtFixedRate({
if (state == StateEnum.ON) {
val incrementAndGet = HEARTBEAT_COUNTER.incrementAndGet()
log.info("HEARTBEAT_COUNTER #{}", incrementAndGet)
log.debug("HEARTBEAT_COUNTER #{}", incrementAndGet)
ding("`recover CPR #${incrementAndGet}")
}
}, HOSTIE_KEEPALIVE_TIMEOUT, HOSTIE_KEEPALIVE_TIMEOUT, TimeUnit.MILLISECONDS)

// heartbeatTimerId = vertx.setPeriodic(HOSTIE_KEEPALIVE_TIMEOUT) { id ->
// if(state == StateEnum.ON) {
// val incrementAndGet = HEARTBEAT_COUNTER.incrementAndGet()
// log.info("HEARTBEAT_COUNTER #{}", incrementAndGet)
// log.debug("HEARTBEAT_COUNTER #{}", incrementAndGet)
// ding("`recover CPR #${incrementAndGet}")
// }
// }
Expand All @@ -144,7 +148,9 @@ abstract class Puppet : EventEmitter {
super.on(event, object : Listener {
override fun handler(vararg any: Any) {

log.info("class Type is {}", any[0].javaClass.name)

log.debug("class Type is {}",any[0].javaClass.name)


listener.handler(any[0] as EventDongPayload)
}
Expand Down Expand Up @@ -213,7 +219,9 @@ abstract class Puppet : EventEmitter {
super.on(event, object : Listener {
override fun handler(vararg any: Any) {

log.info("class Type is {}", any[0].javaClass.name)

log.debug("class Type is {}",any[0].javaClass.name)


listener.handler(any[0] as EventScanPayload)
}
Expand Down Expand Up @@ -249,7 +257,9 @@ abstract class Puppet : EventEmitter {
super.on(event, object : Listener {
override fun handler(vararg any: Any) {

log.info("class Type is {}", any[0].javaClass.name)

log.debug("class Type is {}",any[0].javaClass.name)


listener.handler(any[0] as EventHeartbeatPayload)
}
Expand Down Expand Up @@ -284,7 +294,7 @@ abstract class Puppet : EventEmitter {
val future = CompletableFuture<Void>()

if (state == StateEnum.OFF) {
log.info("Puppet reset state is off")
log.debug("Puppet reset state is off")
future.complete(null)
return future
}
Expand All @@ -297,7 +307,7 @@ abstract class Puppet : EventEmitter {


protected fun login(userId: String): Future<Void> {
log.info("Puppet login in ({})", userId)
log.debug("Puppet login in ({})", userId)
return CompletableFuture.runAsync {
if (StringUtils.isNotBlank(userId)) {
throw RuntimeException("must logout first before login again!")
Expand Down Expand Up @@ -365,7 +375,7 @@ abstract class Puppet : EventEmitter {
protected abstract fun contactRawPayloadParser(rawPayload: ContactPayload): Future<ContactPayload>

open fun contactRoomList(contactId: String): Future<List<String?>>? {
log.info("contractId is {}", contactId)
log.debug("contractId is {}", contactId)
val roomList = roomList().get()
val roomPayloadFuture: List<CompletableFuture<RoomPayload>> = roomList
.map { roomId: String ->
Expand All @@ -390,9 +400,9 @@ abstract class Puppet : EventEmitter {
return CompletableFuture.completedFuture(null)
}

fun contactSearch(query: ContactQueryFilter?, searchIdList: List<String>?): Future<List<String>> {
fun contactSearch(query: ContactQueryFilter?, searchIdList: List<String>? = null): Future<List<String>> {

log.info("query {},{} ", query, searchIdList)
log.debug("query {},{} ", query, searchIdList)

return CompletableFuture.supplyAsync {

Expand All @@ -406,22 +416,86 @@ abstract class Puppet : EventEmitter {
return@supplyAsync list
}

return@supplyAsync list!!.filter {
val payload = contactPayload(it).get()
return@filter StringUtils.equals(query.name, payload.name)
val stream = list?.stream()?.map{contactPayload(it).get()}
if(StringUtils.isNotBlank(query.name)){
stream?.filter {
StringUtils.equals(query.name, it.name)
}
}

if(StringUtils.isNotBlank(query.alias)){
stream?.filter {
StringUtils.equals(query.alias, it.alias)
}
}

if(StringUtils.isNotBlank(query.id)){
stream?.filter {
StringUtils.equals(query.alias, it.alias)
}
}

if(StringUtils.isNotBlank(query.weixin)){
stream?.filter {
StringUtils.equals(query.alias, it.alias)
}
}

if(query.nameReg != null){
stream?.filter{
query.nameReg!!.matches(it.name ?: "")
}
}

if(query.aliasReg != null){
stream?.filter{
query.aliasReg!!.matches(it.alias ?: "")
}
}

val collect = stream?.map {
it.id
}?.collect(Collectors.toList())

return@supplyAsync collect

}
}

fun ContactPayloadFilterFactory(query:ContactQueryFilter):ContactPayloadFilterFunction{

val clz = query::class.java
val fields = clz.fields
val list = fields.map {
it.name to it.get(query)
}

val filterKv = list.get(0)

val filterFunction = { payload: ContactPayload ->
Boolean
val clazz = payload::class.java
val field = clazz.getField(filterKv.first)
val toString = field.get(payload).toString()
StringUtils.equals(toString, filterKv.second.toString())
}

return filterFunction
}




protected fun contactPayloadCache(contactId: String): ContactPayload? {

val contactPayload = cacheContactPayload.getIfPresent(contactId)

log.info("contactPayload is {} by id {}", contactPayload, contactId)
log.debug("contactPayload is {} by id {}", contactPayload,contactId)

return contactPayload
}

public fun contactPayload(contactId: String): Future<ContactPayload> {
fun contactPayload(contactId: String): Future<ContactPayload> {

val future = CompletableFuture<ContactPayload>()

Expand Down Expand Up @@ -455,7 +529,7 @@ abstract class Puppet : EventEmitter {
abstract fun friendshipRawPayload(friendshipId: String): Future<FriendshipPayload>
abstract fun friendshipRawPayloadParser(rawPayload: FriendshipPayload): Future<FriendshipPayload>
fun friendshipSearch(condition: FriendshipSearchCondition): Future<String?> {
log.info("friendshipSearch{}", condition)
log.debug("friendshipSearch{}", condition)
Preconditions.checkNotNull(condition)

return if (StringUtils.isNotEmpty(condition.phone)) {
Expand All @@ -466,7 +540,7 @@ abstract class Puppet : EventEmitter {
}

protected fun friendshipPayloadCache(friendshipId: String): FriendshipPayload? {
log.info("friendshipId is {}", friendshipId)
log.debug("friendshipId is {}", friendshipId)
return cacheFriendshipPayload.getIfPresent(friendshipId)
}

Expand Down Expand Up @@ -554,8 +628,66 @@ abstract class Puppet : EventEmitter {
return Lists.newArrayList(keys)
}

fun messageSearch(query: MessageQueryFilter): Future<List<String>?> {
TODO("TODO")
fun messageSearch(query: MessageQueryFilter): Future<List<String>> {

return CompletableFuture.supplyAsync {

log.debug("messageSearch {}", query)

val allMessageIdList = messageList()

val messagePayloadList = allMessageIdList.map {
messagePayload(it).get()
}

val stream = messagePayloadList.stream()

if (StringUtils.isNotEmpty(query.fromId)) {
stream.filter {
StringUtils.equals(it.fromId, query.fromId)
}
}

if (StringUtils.isNotEmpty(query.id)) {
stream.filter {
StringUtils.equals(it.id, query.id)
}
}

if (StringUtils.isNotEmpty(query.roomId)) {
stream.filter {
StringUtils.equals(it.roomId, query.roomId)
}
}

if (StringUtils.isNotEmpty(query.toId)) {
stream.filter {
StringUtils.equals(it.toId, query.toId)
}
}

if (StringUtils.isNotEmpty(query.text)) {
stream.filter {
StringUtils.equals(it.text, query.text)
}
}

if (query.textReg != null) {
stream.filter {
query.textReg!!.matches(it.text ?: "")
}
}

if (query.type != null) {
stream.filter {
query.type == it.type
}
}

return@supplyAsync stream.map { it.id }.collect(Collectors.toList())
}


}

protected fun messageQueryFilterFactory(query: MessageQueryFilter) {
Expand Down Expand Up @@ -719,12 +851,12 @@ abstract class Puppet : EventEmitter {

if (StringUtils.isNotBlank(query.topic)) {
roomPayloads = roomPayloads.filter { t ->
log.info("t.topic is {} and topic is {}", t.topic, query.topic)
log.debug("t.topic is {} and topic is {}", t.topic, query.topic)
val equals = StringUtils.equals(t.topic, query.topic)
log.info("equals is {}", equals)
log.debug("equals is {}", equals)
equals
}
log.info("roomPayloads is {}", roomPayloads)
log.debug("roomPayloads is {}", roomPayloads)
}

if (CollectionUtils.isNotEmpty(roomPayloads)) {
Expand Down Expand Up @@ -777,3 +909,33 @@ abstract class Puppet : EventEmitter {
}

}
//
//fun main() {
//
// val contactQueryFilter = ContactQueryFilter()
//
// contactQueryFilter.name = "111"
//
// ContactPayloadFilterFactory(contactQueryFilter)
//
//}
//fun ContactPayloadFilterFactory(query:ContactQueryFilter):ContactPayloadFilterFunction{
//
// val clz = query::class.java
// val fields = clz.fields
// val list = fields.map {
// it.name to it.get(query)
// }
//
// val filterKv = list.get(0)
//
// val filterFunction = { payload: ContactPayload ->
// Boolean
// val clazz = payload::class.java
// val field = clazz.getField(filterKv.first)
// val toString = field.get(payload).toString()
// StringUtils.equals(toString, filterKv.second.toString())
// }
//
// return filterFunction
//}
Loading