Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,12 @@
using Orleans.Runtime.MembershipService;
using System;
using System.Collections.Generic;
using System.Collections.Immutable;
using System.Globalization;
using System.Linq;
using System.Net;
using System.Text;
using System.Text.Json;
using System.Threading.Tasks;

#nullable disable
Expand Down Expand Up @@ -448,6 +450,11 @@ private static MembershipEntry Parse(SiloInstanceRecord tableEntry)
parse.IAmAliveTime = !string.IsNullOrEmpty(tableEntry.IAmAliveTime) ?
LogFormatter.ParseDate(tableEntry.IAmAliveTime) : default;

if (!string.IsNullOrEmpty(tableEntry.Metadata))
{
parse.Metadata = JsonSerializer.Deserialize<Dictionary<string, string>>(tableEntry.Metadata)?.ToImmutableDictionary();
}

var suspectingSilos = new List<SiloAddress>();
var suspectingTimes = new List<DateTime>();

Expand Down Expand Up @@ -490,6 +497,7 @@ private SiloInstanceRecord Convert(MembershipEntry memEntry, TableVersion tableV
SiloName = memEntry.SiloName,
StartTime = LogFormatter.PrintDate(memEntry.StartTime),
IAmAliveTime = LogFormatter.PrintDate(memEntry.IAmAliveTime),
Metadata = memEntry.Metadata is not null ? JsonSerializer.Serialize(memEntry.Metadata) : null,
SiloIdentity = SiloInstanceRecord.ConstructSiloIdentity(memEntry.SiloAddress),
MembershipVersion = tableVersion.Version
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ internal class SiloInstanceRecord
public const string SUSPECTING_TIMES_PROPERTY_NAME = "SuspectingTimes";
public const string START_TIME_PROPERTY_NAME = "StartTime";
public const string I_AM_ALIVE_TIME_PROPERTY_NAME = "IAmAliveTime";
public const string METADATA_PROPERTY_NAME = "MetadataJson";
internal const char Seperator = '-';
internal const string TABLE_VERSION_ROW = "VersionRow"; // Range key for version row.
public const string MEMBERSHIP_VERSION_PROPERTY_NAME = "MembershipVersion";
Expand All @@ -41,6 +42,7 @@ internal class SiloInstanceRecord
public string SuspectingTimes { get; set; }
public string StartTime { get; set; }
public string IAmAliveTime { get; set; }
public string Metadata { get; set; }
public int ETag { get; set; }

public int MembershipVersion { get; set; }
Expand Down Expand Up @@ -92,6 +94,9 @@ public SiloInstanceRecord(Dictionary<string, AttributeValue> fields)
if (fields.TryGetValue(I_AM_ALIVE_TIME_PROPERTY_NAME, out var aliveTime))
IAmAliveTime = aliveTime.S;

if (fields.TryGetValue(METADATA_PROPERTY_NAME, out var metadata))
Metadata = metadata.S;

if (fields.TryGetValue(ETAG_PROPERTY_NAME, out var sETag) &&
int.TryParse(sETag.N, out var etag))
ETag = etag;
Expand Down Expand Up @@ -195,6 +200,9 @@ public Dictionary<string, AttributeValue> GetFields(bool includeKeys = false)
if (!string.IsNullOrWhiteSpace(IAmAliveTime))
fields.Add(I_AM_ALIVE_TIME_PROPERTY_NAME, new AttributeValue(IAmAliveTime));

if (!string.IsNullOrWhiteSpace(Metadata))
fields.Add(METADATA_PROPERTY_NAME, new AttributeValue(Metadata));

fields.Add(MEMBERSHIP_VERSION_PROPERTY_NAME, new AttributeValue { N = MembershipVersion.ToString() });

fields.Add(ETAG_PROPERTY_NAME, new AttributeValue { N = ETag.ToString() });
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ public async Task<bool> InsertRow(MembershipEntry entry, TableVersion tableVersi

try
{
ClearMetadataIfUnavailable(entry);
return await orleansQueries.InsertMembershipRowAsync(this.clusterId, entry, tableVersion.VersionEtag);
}
catch (Exception ex)
Expand Down Expand Up @@ -132,6 +133,7 @@ public async Task<bool> UpdateRow(MembershipEntry entry, string etag, TableVersi

try
{
ClearMetadataIfUnavailable(entry);
return await orleansQueries.UpdateMembershipRowAsync(this.clusterId, entry, tableVersion.VersionEtag);
}
catch (Exception ex)
Expand Down Expand Up @@ -201,6 +203,14 @@ private async Task<bool> InitTableAsync()
}
}

private void ClearMetadataIfUnavailable(MembershipEntry entry)
{
if (!orleansQueries.SupportsMembershipMetadata)
{
entry.Metadata = null;
}
}

[LoggerMessage(
Level = LogLevel.Trace,
Message = $"{nameof(AdoNetClusteringTable)}.{nameof(InitializeMembershipTable)} called."
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
ALTER TABLE OrleansMembershipTable ADD COLUMN MetadataJson TEXT NULL;

DROP PROCEDURE IF EXISTS InsertMembershipKey;

DELIMITER $$

CREATE PROCEDURE InsertMembershipKey(
in _DeploymentId NVARCHAR(150),
in _Address VARCHAR(45),
in _Port INT,
in _Generation INT,
in _Version INT,
in _SiloName NVARCHAR(150),
in _HostName NVARCHAR(150),
in _Status INT,
in _ProxyPort INT,
in _StartTime DATETIME,
in _IAmAliveTime DATETIME,
in _MetadataJson TEXT
)
BEGIN
DECLARE _ROWCOUNT INT;
START TRANSACTION;
INSERT INTO OrleansMembershipTable
(
DeploymentId,
Address,
Port,
Generation,
SiloName,
HostName,
Status,
ProxyPort,
MetadataJson,
StartTime,
IAmAliveTime
)
SELECT * FROM ( SELECT
_DeploymentId,
_Address,
_Port,
_Generation,
_SiloName,
_HostName,
_Status,
_ProxyPort,
_MetadataJson,
_StartTime,
_IAmAliveTime) AS TMP
WHERE NOT EXISTS
(
SELECT 1
FROM
OrleansMembershipTable
WHERE
DeploymentId = _DeploymentId AND _DeploymentId IS NOT NULL
AND Address = _Address AND _Address IS NOT NULL
AND Port = _Port AND _Port IS NOT NULL
AND Generation = _Generation AND _Generation IS NOT NULL
);

UPDATE OrleansMembershipVersionTable
SET
Version = Version + 1
WHERE
DeploymentId = _DeploymentId AND _DeploymentId IS NOT NULL
AND Version = _Version AND _Version IS NOT NULL
AND ROW_COUNT() > 0;

SET _ROWCOUNT = ROW_COUNT();

IF _ROWCOUNT = 0
THEN
ROLLBACK;
ELSE
COMMIT;
END IF;
SELECT _ROWCOUNT;
END$$

DELIMITER ;

UPDATE OrleansQuery
SET QueryText = 'call InsertMembershipKey(@DeploymentId, @Address, @Port, @Generation,
@Version, @SiloName, @HostName, @Status, @ProxyPort, @StartTime, @IAmAliveTime, @MetadataJson);'
WHERE QueryKey = 'InsertMembershipKey';

UPDATE OrleansQuery
SET QueryText = 'START TRANSACTION;

UPDATE OrleansMembershipVersionTable
SET
Version = Version + 1
WHERE
DeploymentId = @DeploymentId AND @DeploymentId IS NOT NULL
AND Version = @Version AND @Version IS NOT NULL;

UPDATE OrleansMembershipTable
SET
Status = @Status,
SuspectTimes = @SuspectTimes,
MetadataJson = @MetadataJson,
IAmAliveTime = @IAmAliveTime
WHERE
DeploymentId = @DeploymentId AND @DeploymentId IS NOT NULL
AND Address = @Address AND @Address IS NOT NULL
AND Port = @Port AND @Port IS NOT NULL
AND Generation = @Generation AND @Generation IS NOT NULL
AND ROW_COUNT() > 0;

SELECT ROW_COUNT();
COMMIT;
'
WHERE QueryKey = 'UpdateMembershipKey';

UPDATE OrleansQuery
SET QueryText = 'SELECT
v.DeploymentId,
m.Address,
m.Port,
m.Generation,
m.SiloName,
m.HostName,
m.Status,
m.ProxyPort,
m.SuspectTimes,
m.MetadataJson,
m.StartTime,
m.IAmAliveTime,
v.Version
FROM
OrleansMembershipVersionTable v
LEFT OUTER JOIN OrleansMembershipTable m ON v.DeploymentId = m.DeploymentId
AND Address = @Address AND @Address IS NOT NULL
AND Port = @Port AND @Port IS NOT NULL
AND Generation = @Generation AND @Generation IS NOT NULL
WHERE
v.DeploymentId = @DeploymentId AND @DeploymentId IS NOT NULL;
'
WHERE QueryKey = 'MembershipReadRowKey';

UPDATE OrleansQuery
SET QueryText = 'SELECT
v.DeploymentId,
m.Address,
m.Port,
m.Generation,
m.SiloName,
m.HostName,
m.Status,
m.ProxyPort,
m.SuspectTimes,
m.MetadataJson,
m.StartTime,
m.IAmAliveTime,
v.Version
FROM
OrleansMembershipVersionTable v LEFT OUTER JOIN OrleansMembershipTable m
ON v.DeploymentId = m.DeploymentId
WHERE
v.DeploymentId = @DeploymentId AND @DeploymentId IS NOT NULL;
'
WHERE QueryKey = 'MembershipReadAllKey';
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
ALTER TABLE OrleansMembershipTable ADD MetadataJson NCLOB;
/

CREATE OR REPLACE FUNCTION InsertMembership(PARAM_DEPLOYMENTID IN NVARCHAR2, PARAM_IAMALIVETIME IN TIMESTAMP, PARAM_SILONAME IN NVARCHAR2, PARAM_HOSTNAME IN NVARCHAR2, PARAM_ADDRESS IN VARCHAR2,
PARAM_PORT IN NUMBER, PARAM_GENERATION IN NUMBER, PARAM_STARTTIME IN TIMESTAMP, PARAM_STATUS IN NUMBER, PARAM_PROXYPORT IN NUMBER, PARAM_METADATAJSON IN NCLOB, PARAM_VERSION IN NUMBER)
RETURN NUMBER IS
rowcount NUMBER;
PRAGMA AUTONOMOUS_TRANSACTION;
BEGIN
INSERT INTO OrleansMembershipTable
(
DeploymentId,
Address,
Port,
Generation,
SiloName,
HostName,
Status,
ProxyPort,
MetadataJson,
StartTime,
IAmAliveTime
)
SELECT
PARAM_DEPLOYMENTID,
PARAM_ADDRESS,
PARAM_PORT,
PARAM_GENERATION,
PARAM_SILONAME,
PARAM_HOSTNAME,
PARAM_STATUS,
PARAM_PROXYPORT,
PARAM_METADATAJSON,
PARAM_STARTTIME,
PARAM_IAMALIVETIME
FROM DUAL WHERE NOT EXISTS
(
SELECT 1 FROM OrleansMembershipTable WHERE
DeploymentId = PARAM_DEPLOYMENTID AND PARAM_DEPLOYMENTID IS NOT NULL
AND Address = PARAM_ADDRESS AND PARAM_ADDRESS IS NOT NULL
AND Port = PARAM_PORT AND PARAM_PORT IS NOT NULL
AND Generation = PARAM_GENERATION AND PARAM_GENERATION IS NOT NULL
);
rowcount := SQL%ROWCOUNT;
UPDATE OrleansMembershipVersionTable
SET Timestamp = sys_extract_utc(systimestamp),
Version = Version + 1
WHERE
DeploymentId = PARAM_DEPLOYMENTID AND PARAM_DEPLOYMENTID IS NOT NULL
AND Version = PARAM_VERSION AND PARAM_VERSION IS NOT NULL
AND rowcount > 0;
rowcount := SQL%ROWCOUNT;
IF rowcount = 0 THEN
ROLLBACK;
ELSE
COMMIT;
END IF;

IF rowcount > 0 THEN
RETURN(1);
ELSE
RETURN(0);
END IF;
END;
/

CREATE OR REPLACE FUNCTION UpdateMembership(PARAM_DEPLOYMENTID IN NVARCHAR2, PARAM_ADDRESS IN VARCHAR2, PARAM_PORT IN NUMBER, PARAM_GENERATION IN NUMBER,
PARAM_IAMALIVETIME IN TIMESTAMP, PARAM_STATUS IN NUMBER, PARAM_SUSPECTTIMES IN VARCHAR2, PARAM_METADATAJSON IN NCLOB, PARAM_VERSION IN NUMBER
)
RETURN NUMBER IS
rowcount NUMBER;
PRAGMA AUTONOMOUS_TRANSACTION;
BEGIN
UPDATE OrleansMembershipVersionTable
SET
Timestamp = sys_extract_utc(systimestamp),
Version = Version + 1
WHERE
DeploymentId = PARAM_DEPLOYMENTID AND PARAM_DEPLOYMENTID IS NOT NULL
AND Version = PARAM_VERSION AND PARAM_VERSION IS NOT NULL;
rowcount := SQL%ROWCOUNT;
UPDATE OrleansMembershipTable
SET
Status = PARAM_STATUS,
SuspectTimes = PARAM_SUSPECTTIMES,
MetadataJson = PARAM_METADATAJSON,
IAmAliveTime = PARAM_IAMALIVETIME
WHERE DeploymentId = PARAM_DEPLOYMENTID AND PARAM_DEPLOYMENTID IS NOT NULL
AND Address = PARAM_ADDRESS AND PARAM_ADDRESS IS NOT NULL
AND Port = PARAM_PORT AND PARAM_PORT IS NOT NULL
AND Generation = PARAM_GENERATION AND PARAM_GENERATION IS NOT NULL
AND rowcount > 0;
rowcount := SQL%ROWCOUNT;
COMMIT;
RETURN(rowcount);
END;
/

UPDATE OrleansQuery
SET QueryText = 'SELECT INSERTMEMBERSHIP(:DeploymentId,:IAmAliveTime,:SiloName,:Hostname,:Address,:Port,:Generation,:StartTime,:Status,:ProxyPort,:MetadataJson,:Version) FROM DUAL'
WHERE QueryKey = 'InsertMembershipKey';
/

UPDATE OrleansQuery
SET QueryText = 'SELECT UpdateMembership(:DeploymentId, :Address, :Port, :Generation, :IAmAliveTime, :Status, :SuspectTimes, :MetadataJson, :Version) AS RESULT FROM DUAL'
WHERE QueryKey = 'UpdateMembershipKey';
/

UPDATE OrleansQuery
SET QueryText = 'SELECT v.DeploymentId, m.Address, m.Port, m.Generation, m.SiloName, m.HostName,
m.Status, m.ProxyPort, m.SuspectTimes, m.MetadataJson, m.StartTime, m.IAmAliveTime, v.Version
FROM
OrleansMembershipVersionTable v
LEFT OUTER JOIN OrleansMembershipTable m ON v.DeploymentId = m.DeploymentId
AND Address = :Address AND :Address IS NOT NULL
AND Port = :Port AND :Port IS NOT NULL
AND Generation = :Generation AND :Generation IS NOT NULL
WHERE
v.DeploymentId = :DeploymentId AND :DeploymentId IS NOT NULL'
WHERE QueryKey = 'MembershipReadRowKey';
/

UPDATE OrleansQuery
SET QueryText = 'SELECT v.DeploymentId, m.Address, m.Port, m.Generation, m.SiloName, m.HostName, m.Status,
m.ProxyPort, m.SuspectTimes, m.MetadataJson, m.StartTime, m.IAmAliveTime, v.Version
FROM
OrleansMembershipVersionTable v
LEFT OUTER JOIN OrleansMembershipTable m ON v.DeploymentId = m.DeploymentId
WHERE
v.DeploymentId = :DeploymentId AND :DeploymentId IS NOT NULL'
WHERE QueryKey = 'MembershipReadAllKey';
/

COMMIT;
Loading
Loading