Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -614,6 +614,11 @@ private boolean restore(PageStoreDir pageStoreDir) {
LOG.error("Failed to restore PageStore", e);
return false;
}
if (mInitService.isPresent()) {
try (LockResource r = new LockResource(mPageMetaStore.getLock().readLock())) {
mPageMetaStore.reportBlocks(pageStoreDir);
}
}
LOG.info("PageStore ({}) restored with {} pages ({} bytes), "
+ "discarded {} pages ({} bytes)",
pageStoreDir.getRootPath(), mPageMetaStore.numPages(), mPageMetaStore.bytes(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,13 @@ static PageMetaStore create(CacheManagerOptions options) throws IOException {
*/
void addPage(PageId pageId, PageInfo pageInfo);

/**
* Report all block locations in a pageStoreDir to master using BlockHeartbeat.
*
* @param pageStoreDir the page store dir targeted to report
*/
default void reportBlocks(PageStoreDir pageStoreDir) {}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing a Javadoc comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The page cache implementation is shared by the client side local cache, as well as the worker side paged block store. So PageMetaStore is also used by the client side local cache, adding a method specifically for doing worker-master sync is not appropriate here.


/**
* Adds a new temp page to the cache.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -404,6 +404,21 @@ public PagedBlockStoreMeta getStoreMetaFull() {
usedBytesOnDirs.build(), blockOnDirs.build());
}

@Override
@GuardedBy("getLock().readLock()")
public void reportBlocks(PageStoreDir pageStoreDir) {
final PagedBlockStoreDir pagedBlockStoreDir = downcast(pageStoreDir);
Set<PagedBlockMeta> blockMetas = mBlocks.getByField(INDEX_STORE_DIR, pagedBlockStoreDir);
for (PagedBlockMeta blockMeta : blockMetas) {
for (BlockStoreEventListener listener : mBlockStoreEventListeners) {
synchronized (listener) {
listener.onMoveBlockByWorker(blockMeta.getBlockId(),
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a tricky way to report the blocks generated by pageStore to master by Incrementally, but it need a full block report during register stage.

blockMeta.getBlockLocation(), blockMeta.getBlockLocation());
}
}
}
}

private static PagedBlockStoreDir downcast(PageStoreDir pageStoreDir) {
if (pageStoreDir instanceof PagedBlockStoreDir) {
return (PagedBlockStoreDir) pageStoreDir;
Expand Down