Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public interface SyncQueueItemDao extends GenericDao<SyncQueueItemVO, Long> {

public List<SyncQueueItemVO> getActiveQueueItems(Long msid, boolean exclusive);

public List<SyncQueueItemVO> getBlockedQueueItems(long thresholdMs, boolean exclusive);
public List<SyncQueueItemVO> getBlockedQueueItems(long thresholdMs, long snapshotThresholdMs, String jobCmd, boolean exclusive);

public Long getQueueItemIdByContentIdAndType(long contentId, String contentType);
}
Original file line number Diff line number Diff line change
Expand Up @@ -141,23 +141,46 @@ public List<SyncQueueItemVO> getActiveQueueItems(Long msid, boolean exclusive) {
}

@Override
public List<SyncQueueItemVO> getBlockedQueueItems(long thresholdMs, boolean exclusive) {
public List<SyncQueueItemVO> getBlockedQueueItems(long thresholdMs, long snapshotThresholdMs, String jobCmd, boolean exclusive) {
Date cutTime = DateUtil.currentGMTTime();
List<SyncQueueItemVO> l = new ArrayList<SyncQueueItemVO>();

SearchBuilder<SyncQueueItemVO> sbItem = createSearchBuilder();
sbItem.and("lastProcessMsid", sbItem.entity().getLastProcessMsid(), SearchCriteria.Op.NNULL);
sbItem.and("lastProcessNumber", sbItem.entity().getLastProcessNumber(), SearchCriteria.Op.NNULL);
sbItem.and("lastProcessTime", sbItem.entity().getLastProcessTime(), SearchCriteria.Op.NNULL);
sbItem.and("lastProcessTime2", sbItem.entity().getLastProcessTime(), SearchCriteria.Op.LT);

sbItem.done();
Date date1 = new Date(cutTime.getTime() - thresholdMs);
String dateString1 = DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), date1);

SearchCriteria<SyncQueueItemVO> sc = sbItem.create();
sc.setParameters("lastProcessTime2", new Date(cutTime.getTime() - thresholdMs));
Date date2 = new Date(cutTime.getTime() - snapshotThresholdMs);
String dateString2 = DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), date2);

if(exclusive)
return lockRows(sc, null, true);
return listBy(sc, null);
String sql = "SELECT s.id, s.queue_id, s.content_type, s.content_id, s.queue_proc_msid, s.queue_proc_number, s.queue_proc_time, s.created FROM sync_queue_item AS s " +
"JOIN async_job as a ON s.content_id = a.id WHERE s.queue_proc_msid IS NOT NULL AND s.queue_proc_number IS NOT NULL AND s.queue_proc_time IS NOT NULL" +
" AND ((a.job_cmd NOT LIKE ? AND s.queue_proc_time < ? ) OR (a.job_cmd LIKE ? AND s.queue_proc_time < ?))";
TransactionLegacy txn = TransactionLegacy.currentTxn();
PreparedStatement pstmt = null;
try {
pstmt = txn.prepareAutoCloseStatement(sql);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please use try-with-resource

pstmt.setString(1, "%" + jobCmd + "%");
pstmt.setString(2, dateString1);
pstmt.setString(3, "%" + jobCmd + "%");
pstmt.setString(4, dateString2);
ResultSet rs = pstmt.executeQuery();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please use try-with-resource

while(rs.next()) {
SyncQueueItemVO item = new SyncQueueItemVO();
item.setId(rs.getLong(1));
item.setQueueId(rs.getLong(2));
item.setContentType(rs.getString(3));
item.setContentId(rs.getLong(4));
item.setLastProcessMsid(rs.getLong(5));
item.setLastProcessNumber(rs.getLong(6));
item.setLastProcessTime(rs.getDate(7));
item.setCreated(rs.getDate(8));
l.add(item);
}
} catch (SQLException e) {
s_logger.error("Unexpected sql excetpion, ", e);
} catch (Throwable e) {
s_logger.error("Unexpected excetpion, ", e);
}
return l;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,8 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
private static final ConfigKey<Integer> VmJobLockTimeout = new ConfigKey<Integer>("Advanced",
Integer.class, "vm.job.lock.timeout", "1800",
"Time in seconds to wait in acquiring lock to submit a vm worker job", false);
private static final ConfigKey<Long> VolumeSnapshotJobCancelThreshold = new ConfigKey<Long>("Advanced", Long.class, "volume.snapshot.job.cancel.threshold", "60",
"Time (in minutes) for volume snapshot async-job to be forcefully cancelled if it has been in process for long", true, ConfigKey.Scope.Global);

private static final Logger s_logger = Logger.getLogger(AsyncJobManagerImpl.class);

Expand Down Expand Up @@ -133,7 +135,7 @@ public String getConfigComponentName() {

@Override
public ConfigKey<?>[] getConfigKeys() {
return new ConfigKey<?>[] {JobExpireMinutes, JobCancelThresholdMinutes, VmJobLockTimeout};
return new ConfigKey<?>[] {JobExpireMinutes, JobCancelThresholdMinutes, VmJobLockTimeout, VolumeSnapshotJobCancelThreshold};
}

@Override
Expand Down Expand Up @@ -792,7 +794,8 @@ public void reallyRun() {
s_logger.info("Begin cleanup expired async-jobs");

// forcefully cancel blocking queue items if they've been staying there for too long
List<SyncQueueItemVO> blockItems = _queueMgr.getBlockedQueueItems(JobCancelThresholdMinutes.value() * 60000, false);
List<SyncQueueItemVO> blockItems = _queueMgr.getBlockedQueueItems(JobCancelThresholdMinutes.value() * 60000, VolumeSnapshotJobCancelThreshold.value() * 60000,
"VmWorkTakeVolumeSnapshot", false);
if (blockItems != null && blockItems.size() > 0) {
for (SyncQueueItemVO item : blockItems) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public interface SyncQueueManager extends Manager {

public List<SyncQueueItemVO> getActiveQueueItems(Long msid, boolean exclusive);

public List<SyncQueueItemVO> getBlockedQueueItems(long thresholdMs, boolean exclusive);
public List<SyncQueueItemVO> getBlockedQueueItems(long thresholdMs, long snapshotThresholdMs, String jobCmd, boolean exclusive);

void purgeAsyncJobQueueItemId(long asyncJobId);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -237,8 +237,8 @@ public List<SyncQueueItemVO> getActiveQueueItems(Long msid, boolean exclusive) {
}

@Override
public List<SyncQueueItemVO> getBlockedQueueItems(long thresholdMs, boolean exclusive) {
return _syncQueueItemDao.getBlockedQueueItems(thresholdMs, exclusive);
public List<SyncQueueItemVO> getBlockedQueueItems(long thresholdMs, long snapshotThresholdMs, String jobCmd, boolean exclusive) {
return _syncQueueItemDao.getBlockedQueueItems(thresholdMs, snapshotThresholdMs, jobCmd, exclusive);
}

private boolean queueReadyToProcess(SyncQueueVO queueVO) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -447,7 +447,19 @@ public Pair<Boolean, Long> getCommandHostDelegation(long hostId, Command cmd) {
_cmdExecLogDao.persist(execLog);
cmd.setContextParam("execid", String.valueOf(execLog.getId()));
cmd.setContextParam("noderuninfo", String.format("%d-%d", _clusterMgr.getManagementNodeId(), _clusterMgr.getCurrentRunId()));
cmd.setContextParam("vCenterSessionTimeout", String.valueOf(_vmwareMgr.getVcenterSessionTimeout()));
if (cmd instanceof CopyCommand) { // Snapshot backup
CopyCommand cpyCommand = (CopyCommand)cmd;
DataTO srcData = cpyCommand.getSrcTO();
DataTO destData = cpyCommand.getDestTO();
if (srcData.getObjectType() == DataObjectType.SNAPSHOT && destData.getObjectType() == DataObjectType.SNAPSHOT &&
srcData.getDataStore().getRole() == DataStoreRole.Primary) {
cmd.setContextParam("vCenterSessionTimeout", String.valueOf(_vmwareMgr.getSnapshotBackupSessionTimeout()));
} else {
cmd.setContextParam("vCenterSessionTimeout", String.valueOf(_vmwareMgr.getVcenterSessionTimeout()));
}
} else {
cmd.setContextParam("vCenterSessionTimeout", String.valueOf(_vmwareMgr.getVcenterSessionTimeout()));
}

if (cmd instanceof BackupSnapshotCommand || cmd instanceof CreatePrivateTemplateFromVolumeCommand ||
cmd instanceof CreatePrivateTemplateFromSnapshotCommand || cmd instanceof CopyVolumeCommand || cmd instanceof CopyCommand ||
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,8 @@ public VmwareServerDiscoverer() {

VmwareContext context = null;
try {
context = VmwareContextFactory.create(url.getHost(), username, password);
int sessionTimeout = _vmwareMgr.getVcenterSessionTimeout();
context = VmwareContextFactory.create(url.getHost(), username, password, sessionTimeout);
if (privateTrafficLabel != null)
context.registerStockObject("privateTrafficLabel", privateTrafficLabel);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,4 +79,7 @@ public interface VmwareManager {
boolean isLegacyZone(long dcId);

boolean hasNexusVSM(Long clusterId);

public int getSnapshotBackupSessionTimeout();

}
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,7 @@ public class VmwareManagerImpl extends ManagerBase implements VmwareManager, Vmw
private int _additionalPortRangeSize;
private int _routerExtraPublicNics = 2;
private int _vCenterSessionTimeout = 1200000; // Timeout in milliseconds
private int _snapshotBackupSessionTimeout = 1200000; // Timeout in milliseconds

private String _rootDiskController = DiskControllerType.ide.toString();

Expand Down Expand Up @@ -283,6 +284,9 @@ public boolean configure(String name, Map<String, Object> params) throws Configu
_vCenterSessionTimeout = NumbersUtil.parseInt(_configDao.getValue(Config.VmwareVcenterSessionTimeout.key()), 1200) * 1000;
s_logger.info("VmwareManagerImpl config - vmware.vcenter.session.timeout: " + _vCenterSessionTimeout);

_snapshotBackupSessionTimeout = NumbersUtil.parseInt(_configDao.getValue(Config.VmwareSnapshotBackupSessionTimeout.key()), 1200) * 1000;
s_logger.info("VmwareManagerImpl config - vmware.snapshot.backup.session.timeout: " + _snapshotBackupSessionTimeout);

_recycleHungWorker = _configDao.getValue(Config.VmwareRecycleHungWorker.key());
if (_recycleHungWorker == null || _recycleHungWorker.isEmpty()) {
_recycleHungWorker = "false";
Expand Down Expand Up @@ -487,6 +491,8 @@ public void setupResourceStartupParams(Map<String, Object> params) {
params.put("vmware.root.disk.controller", _rootDiskController);
params.put("vmware.recycle.hung.wokervm", _recycleHungWorker);
params.put("ports.per.dvportgroup", _portsPerDvPortGroup);
params.put("vmware.vcenter.session.timeout", _vCenterSessionTimeout);
params.put("vmware.snapshot.backup.session.timeout", _snapshotBackupSessionTimeout);
}

@Override
Expand Down Expand Up @@ -942,6 +948,11 @@ public int getVcenterSessionTimeout() {
return _vCenterSessionTimeout;
}

@Override
public int getSnapshotBackupSessionTimeout(){
return _snapshotBackupSessionTimeout;
}

@Override
public List<Class<?>> getCommands() {
List<Class<?>> cmdList = new ArrayList<Class<?>>();
Expand Down Expand Up @@ -1022,7 +1033,7 @@ public VmwareDatacenterVO addVmwareDatacenter(AddVmwareDcCmd cmd) throws Resourc
String guid;
ManagedObjectReference dcMor;
try {
context = VmwareContextFactory.create(vCenterHost, userName, password);
context = VmwareContextFactory.create(vCenterHost, userName, password, _vCenterSessionTimeout);

// Check if DC exists on vCenter
dcMo = new DatacenterMO(context, vmwareDcName);
Expand Down Expand Up @@ -1119,7 +1130,7 @@ public void doInTransactionWithoutResult(TransactionStatus status) {
// Construct context
VmwareContext context = null;
try {
context = VmwareContextFactory.create(vCenterHost, userName, password);
context = VmwareContextFactory.create(vCenterHost, userName, password, _vCenterSessionTimeout);

// Check if DC exists on vCenter
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ void init() {
s_clusterMgr = _clusterMgr;
}

public static VmwareContext create(String vCenterAddress, String vCenterUserName, String vCenterPassword) throws Exception {
public static VmwareContext create(String vCenterAddress, String vCenterUserName, String vCenterPassword, int sessionTimeout) throws Exception {
assert (vCenterAddress != null);
assert (vCenterUserName != null);
assert (vCenterPassword != null);
Expand All @@ -66,7 +66,7 @@ public static VmwareContext create(String vCenterAddress, String vCenterUserName
StringUtils.getMaskedPasswordForDisplay(vCenterPassword));

VmwareClient vimClient = new VmwareClient(vCenterAddress + "-" + s_seq++);
vimClient.setVcenterSessionTimeout(s_vmwareMgr.getVcenterSessionTimeout());
vimClient.setVcenterSessionTimeout(sessionTimeout);
vimClient.connect(serviceUrl, vCenterUserName, vCenterPassword);

VmwareContext context = new VmwareContext(vimClient, vCenterAddress);
Expand All @@ -76,22 +76,22 @@ public static VmwareContext create(String vCenterAddress, String vCenterUserName
context.registerStockObject("manageportgroup", s_vmwareMgr.getManagementPortGroupName());
context.registerStockObject("noderuninfo", String.format("%d-%d", s_clusterMgr.getManagementNodeId(), s_clusterMgr.getCurrentRunId()));

context.setPoolInfo(s_pool, VmwareContextPool.composePoolKey(vCenterAddress, vCenterUserName));
context.setPoolInfo(s_pool, VmwareContextPool.composePoolKey(vCenterAddress, vCenterUserName, sessionTimeout));
s_pool.registerOutstandingContext(context);

return context;
}

public static VmwareContext getContext(String vCenterAddress, String vCenterUserName, String vCenterPassword) throws Exception {
VmwareContext context = s_pool.getContext(vCenterAddress, vCenterUserName);
public static VmwareContext getContext(String vCenterAddress, String vCenterUserName, String vCenterPassword, int sessionTimeout) throws Exception {
VmwareContext context = s_pool.getContext(vCenterAddress, vCenterUserName, sessionTimeout);
if (context == null) {
context = create(vCenterAddress, vCenterUserName, vCenterPassword);
context = create(vCenterAddress, vCenterUserName, vCenterPassword, sessionTimeout);
} else {
// Validate current context and verify if vCenter session timeout value of the context matches the timeout value set by Admin
if (!context.validate() || (context.getVimClient().getVcenterSessionTimeout() != s_vmwareMgr.getVcenterSessionTimeout())) {
if (!context.validate()) {
s_logger.info("Validation of the context failed, dispose and create a new one");
context.close();
context = create(vCenterAddress, vCenterUserName, vCenterPassword);
context = create(vCenterAddress, vCenterUserName, vCenterPassword, sessionTimeout);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,7 @@ public class VmwareResource implements StoragePoolResource, ServerResource, Vmwa
protected int _portsPerDvPortGroup;
protected boolean _fullCloneFlag = false;
protected boolean _instanceNameFlag = false;

protected int _vCenterSessionTimeout = 1200000; // Timeout in milliseconds
protected boolean _recycleHungWorker = false;
protected DiskControllerType _rootDiskController = DiskControllerType.ide;

Expand Down Expand Up @@ -4983,6 +4983,10 @@ public boolean configure(String name, Map<String, Object> params) throws Configu
_instanceNameFlag = false;
}

Integer sessionTimeout = (Integer) params.get("vmware.vcenter.session.timeout");
if (sessionTimeout != null)
_vCenterSessionTimeout = sessionTimeout.intValue();

value = (String)params.get("scripts.timeout");
int timeout = NumbersUtil.parseInt(value, 1440) * 1000;
_storageProcessor = new VmwareStorageProcessor((VmwareHostService)this, _fullCloneFlag, (VmwareStorageMount)mgr, timeout, this, _shutdownWaitMs, null);
Expand Down Expand Up @@ -5035,9 +5039,16 @@ public VmwareHypervisorHost getHyperHost(VmwareContext context) {
@Override
public VmwareContext getServiceContext(Command cmd) {
VmwareContext context = null;
int vCenterSessionTimeout;
if (cmd != null && cmd.getContextParam("vCenterSessionTimeout") != null) {
vCenterSessionTimeout = NumbersUtil.parseInt(cmd.getContextParam("vCenterSessionTimeout"), 1200000);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why not use _vCenterSessionTimeout here?

} else {
vCenterSessionTimeout = _vCenterSessionTimeout;
}

if(s_serviceContext.get() != null) {
context = s_serviceContext.get();
String poolKey = VmwareContextPool.composePoolKey(_vCenterAddress, _username);
String poolKey = VmwareContextPool.composePoolKey(_vCenterAddress, _username, vCenterSessionTimeout);
// Before re-using the thread local context, ensure it corresponds to the right vCenter API session and that it is valid to make calls.
if(context.getPoolKey().equals(poolKey)) {
if (context.validate()) {
Expand All @@ -5051,11 +5062,11 @@ public VmwareContext getServiceContext(Command cmd) {
}
} else {
// Exisitng ThreadLocal context corresponds to a different vCenter API session. Why has it not been recycled?
s_logger.warn("ThreadLocal VMware context: " + poolKey + " doesn't correspond to the right vCenter. Expected VMware context: " + context.getPoolKey());
s_logger.warn("ThreadLocal VMware context: " + poolKey + ". Expected VMware context: " + context.getPoolKey());
}
}
try {
context = VmwareContextFactory.getContext(_vCenterAddress, _username, _password);
context = VmwareContextFactory.getContext(_vCenterAddress, _username, _password, vCenterSessionTimeout);
s_serviceContext.set(context);
} catch (Exception e) {
s_logger.error("Unable to connect to vSphere server: " + _vCenterAddress, e);
Expand Down
Loading