Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@
/**
* A callable class to apply an operation to some entity (table/database) by running a Spark job.
* Takes care of the job lifecycle using /jobs API.
*
* <p>NOTE: Every implementation must implement a static {@code OPERATION_TYPE} field in order for
* the job scheduler to load the OperationTask.
*/
@Slf4j
@Getter
Expand Down Expand Up @@ -270,6 +273,20 @@ private void reportJobState(
AppConstants.JOB_DURATION,
System.currentTimeMillis() - startTime,
attributes);

// Granular attributes to publish entity level job metrics
Attributes granularAttributes =
Attributes.of(
AttributeKey.stringKey(AppConstants.ENTITY_NAME),
metadata.getEntityName(),
AttributeKey.stringKey(AppConstants.ENTITY_TYPE),
metadata.getClass().getSimpleName().replace("Metadata", ""),
AttributeKey.stringKey(AppConstants.JOB_TYPE),
getType().getValue(),
AttributeKey.stringKey(AppConstants.JOB_STATE),
state.name());

otelEmitter.count(METRICS_SCOPE, "maintenance_job_completed", 1, granularAttributes);
}

protected abstract boolean launchJob();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,15 @@
import com.linkedin.openhouse.jobs.client.TablesClient;
import com.linkedin.openhouse.jobs.client.model.JobConf;
import com.linkedin.openhouse.jobs.scheduler.JobsScheduler;
import com.linkedin.openhouse.jobs.util.AppConstants;
import com.linkedin.openhouse.jobs.util.DataLayoutUtil;
import com.linkedin.openhouse.jobs.util.DatabaseMetadata;
import com.linkedin.openhouse.jobs.util.DirectoryMetadata;
import com.linkedin.openhouse.jobs.util.Metadata;
import com.linkedin.openhouse.jobs.util.TableDataLayoutMetadata;
import com.linkedin.openhouse.jobs.util.TableMetadata;
import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.common.Attributes;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
Expand Down Expand Up @@ -168,6 +171,15 @@ private List<OperationTask<?>> processMetadataList(
if (optionalOperationTask.isPresent()) {
taskList.add(optionalOperationTask.get());
}

// Publish entity metrics for triggered tasks
Attributes taskAttributes =
Attributes.of(
AttributeKey.stringKey(AppConstants.ENTITY_NAME), metadata.getEntityName(),
AttributeKey.stringKey(AppConstants.ENTITY_TYPE),
metadata.getClass().getSimpleName().replace("Metadata", ""),
AttributeKey.stringKey(AppConstants.JOB_TYPE), jobType.getValue());
otelEmitter.count(METRICS_SCOPE, "maintenance_job_triggered", 1, taskAttributes);
}
return taskList;
}
Expand All @@ -183,6 +195,15 @@ public Optional<OperationTask<?>> processMetadata(
task.setOtelEmitter(otelEmitter);
if (!task.shouldRun()) {
log.info("Skipping task {}", task);

// Publish entity metrics for skipped tasks
Attributes taskAttributes =
Attributes.of(
AttributeKey.stringKey(AppConstants.ENTITY_NAME), metadata.getEntityName(),
AttributeKey.stringKey(AppConstants.ENTITY_TYPE),
metadata.getClass().getSimpleName().replace("Metadata", ""),
AttributeKey.stringKey(AppConstants.JOB_TYPE), task.getType().getValue());
otelEmitter.count(METRICS_SCOPE, "maintenance_job_skipped", 1, taskAttributes);
return Optional.empty();
} else {
if (OperationMode.SUBMIT.equals(operationMode)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@ public final class AppConstants {

// Spark App observability constants
public static final String TYPE = "type";
public static final String ENTITY_TYPE = "entity_type";
public static final String JOB_TYPE = "job_type";
public static final String JOB_STATE = "job_state";
public static final String ORPHAN_FILE_COUNT = "orphan_file_count";
public static final String STAGED_FILE_COUNT = "staged_file_count";
public static final String ORPHAN_DIRECTORY_COUNT = "orphan_directory_count";
Expand Down Expand Up @@ -49,6 +51,7 @@ public final class AppConstants {
public static final String JOB_ID = "job_id";
public static final String QUEUED_TIME = "queued_time";
public static final String DATABASE_NAME = "database_name";
public static final String ENTITY_NAME = "entity_name";

// Maintenance jobs table properties keys
public static final String BACKUP_ENABLED_KEY = "retention.backup.enabled";
Expand Down