Skip to content

Commit

Permalink
[AMORO-3346] Extract the implementation of OptimizerManager
Browse files Browse the repository at this point in the history
  • Loading branch information
mansonliwh committed Feb 12, 2025
1 parent 9bfe609 commit 1541d64
Show file tree
Hide file tree
Showing 11 changed files with 559 additions and 471 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.apache.amoro.server.persistence.HttpSessionHandlerFactory;
import org.apache.amoro.server.persistence.SqlSessionFactoryProvider;
import org.apache.amoro.server.resource.ContainerMetadata;
import org.apache.amoro.server.resource.DefaultOptimizerManager;
import org.apache.amoro.server.resource.OptimizerManager;
import org.apache.amoro.server.resource.ResourceContainers;
import org.apache.amoro.server.table.DefaultTableManager;
Expand Down Expand Up @@ -100,7 +101,7 @@ public class AmoroServiceContainer {
private CatalogManager catalogManager;
private TableManager tableManager;
private TableService tableService;
private DefaultOptimizingService optimizingService;
private DefaultOptimizerManager optimizerManager;
private TerminalManager terminalManager;
private Configurations serviceConfig;
private TServer tableManagementServer;
Expand Down Expand Up @@ -157,12 +158,10 @@ public void startService() throws Exception {
tableManager = new DefaultTableManager(serviceConfig, catalogManager);

tableService = new DefaultTableService(serviceConfig, catalogManager);
optimizingService =
new DefaultOptimizingService(serviceConfig, catalogManager, tableManager, tableService);

optimizerManager = new DefaultOptimizerManager(serviceConfig, catalogManager, tableManager);
LOG.info("Setting up AMS table executors...");
AsyncTableExecutors.getInstance().setup(tableService, serviceConfig);
addHandlerChain(optimizingService.getTableRuntimeHandler());
addHandlerChain(optimizerManager.getTableRuntimeHandler());
addHandlerChain(AsyncTableExecutors.getInstance().getDataExpiringExecutor());
addHandlerChain(AsyncTableExecutors.getInstance().getSnapshotsExpiringExecutor());
addHandlerChain(AsyncTableExecutors.getInstance().getOrphanFilesCleaningExecutor());
Expand Down Expand Up @@ -219,10 +218,11 @@ public void dispose() {
terminalManager.dispose();
terminalManager = null;
}
if (optimizingService != null) {
LOG.info("Stopping optimizing service...");
optimizingService.dispose();
optimizingService = null;

if (optimizerManager != null) {
LOG.info("Stopping optimizing manager...");
optimizerManager.dispose();
optimizerManager = null;
}

if (amsServiceMetrics != null) {
Expand Down Expand Up @@ -253,7 +253,7 @@ private void startThriftServer(TServer server, String threadName) {
private void initHttpService() {
DashboardServer dashboardServer =
new DashboardServer(
serviceConfig, catalogManager, tableManager, optimizingService, terminalManager);
serviceConfig, catalogManager, tableManager, optimizerManager, terminalManager);
RestCatalogService restCatalogService = new RestCatalogService(catalogManager, tableManager);

httpServer =
Expand Down Expand Up @@ -364,7 +364,7 @@ workerThreads, getThriftThreadFactory(Constants.THRIFT_TABLE_SERVICE_NAME)),
new OptimizingService.Processor<>(
ThriftServiceProxy.createProxy(
OptimizingService.Iface.class,
optimizingService,
new DefaultOptimizingService(serviceConfig, optimizerManager, tableService),
AmoroRuntimeException::normalize));
optimizingServiceServer =
createThriftServer(
Expand Down Expand Up @@ -559,6 +559,6 @@ public CatalogManager getCatalogManager() {

@VisibleForTesting
public OptimizerManager getOptimizingService() {
return this.optimizingService;
return this.optimizerManager;
}
}
Loading

0 comments on commit 1541d64

Please sign in to comment.