Skip to content

Commit dfd3284

Browse files
authored
Optimize timeseries metadata read and fix table size accounting (#840)
1 parent 719f80a commit dfd3284

4 files changed

Lines changed: 181 additions & 41 deletions

File tree

java/tsfile/src/main/java/org/apache/tsfile/read/TsFileSequenceReader.java

Lines changed: 108 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -732,22 +732,23 @@ public TimeseriesMetadata readTimeseriesMetadata(
732732
boolean ignoreNotExistDevice,
733733
LongConsumer ioSizeConsumer)
734734
throws IOException {
735-
readFileMetadata(ioSizeConsumer);
736-
MetadataIndexNode deviceMetadataIndexNode =
737-
tsFileMetaData.getTableMetadataIndexNode(device.getTableName());
738-
Pair<IMetadataIndexEntry, Long> metadataIndexPair =
739-
getMetadataAndEndOffsetOfDeviceNode(deviceMetadataIndexNode, device, true, ioSizeConsumer);
740-
if (metadataIndexPair == null) {
741-
if (ignoreNotExistDevice) {
742-
return null;
743-
}
744-
throw new IOException(
745-
Messages.format("error.read.device_not_in_metadata_file", device, file));
746-
}
747-
ByteBuffer buffer =
748-
readData(metadataIndexPair.left.getOffset(), metadataIndexPair.right, ioSizeConsumer);
749-
MetadataIndexNode metadataIndexNode = deviceMetadataIndexNode;
750-
if (!metadataIndexNode.getNodeType().equals(MetadataIndexNodeType.LEAF_MEASUREMENT)) {
735+
return readTimeseriesMetadata(device, null, measurement, ignoreNotExistDevice, ioSizeConsumer);
736+
}
737+
738+
public TimeseriesMetadata readTimeseriesMetadata(
739+
IDeviceID device,
740+
long[] deviceMetadataIndexNodeOffset,
741+
String measurement,
742+
boolean ignoreNotExistDevice,
743+
LongConsumer ioSizeConsumer)
744+
throws IOException {
745+
Pair<IMetadataIndexEntry, Long> metadataIndexPair;
746+
MetadataIndexNode metadataIndexNode;
747+
ByteBuffer buffer;
748+
if (deviceMetadataIndexNodeOffset != null) {
749+
buffer =
750+
readData(
751+
deviceMetadataIndexNodeOffset[0], deviceMetadataIndexNodeOffset[1], ioSizeConsumer);
751752
try {
752753
metadataIndexNode =
753754
deserializeConfig.measurementMetadataIndexNodeBufferDeserializer.deserialize(
@@ -759,6 +760,36 @@ public TimeseriesMetadata readTimeseriesMetadata(
759760
metadataIndexPair =
760761
getMetadataAndEndOffsetOfMeasurementNode(
761762
metadataIndexNode, measurement, false, ioSizeConsumer);
763+
} else {
764+
readFileMetadata(ioSizeConsumer);
765+
MetadataIndexNode deviceMetadataIndexNode =
766+
tsFileMetaData.getTableMetadataIndexNode(device.getTableName());
767+
metadataIndexPair =
768+
getMetadataAndEndOffsetOfDeviceNode(
769+
deviceMetadataIndexNode, device, true, ioSizeConsumer);
770+
if (metadataIndexPair == null) {
771+
if (ignoreNotExistDevice) {
772+
return null;
773+
}
774+
throw new IOException(
775+
Messages.format("error.read.device_not_in_metadata_file", device, file));
776+
}
777+
buffer =
778+
readData(metadataIndexPair.left.getOffset(), metadataIndexPair.right, ioSizeConsumer);
779+
metadataIndexNode = deviceMetadataIndexNode;
780+
if (!metadataIndexNode.getNodeType().equals(MetadataIndexNodeType.LEAF_MEASUREMENT)) {
781+
try {
782+
metadataIndexNode =
783+
deserializeConfig.measurementMetadataIndexNodeBufferDeserializer.deserialize(
784+
buffer, deserializeConfig);
785+
} catch (Exception e) {
786+
logger.error(METADATA_INDEX_NODE_DESERIALIZE_ERROR, file);
787+
throw e;
788+
}
789+
metadataIndexPair =
790+
getMetadataAndEndOffsetOfMeasurementNode(
791+
metadataIndexNode, measurement, false, ioSizeConsumer);
792+
}
762793
}
763794
if (metadataIndexPair == null) {
764795
return null;
@@ -781,13 +812,15 @@ public TimeseriesMetadata readTimeseriesMetadata(
781812
}
782813
// when the buffer length is over than Integer.MAX_VALUE,
783814
// using tsFileInput to get timeseriesMetadataList
784-
tsFileInput.position(metadataIndexPair.left.getOffset());
785-
while (tsFileInput.position() < metadataIndexPair.right) {
786-
try {
787-
timeseriesMetadataList.add(TimeseriesMetadata.deserializeFrom(tsFileInput, true));
788-
} catch (Exception e1) {
789-
logger.error(Messages.get("log.read.sequence_reader_tsm_deserialize_error"), file);
790-
throw e1;
815+
synchronized (this) {
816+
tsFileInput.position(metadataIndexPair.left.getOffset());
817+
while (tsFileInput.position() < metadataIndexPair.right) {
818+
try {
819+
timeseriesMetadataList.add(TimeseriesMetadata.deserializeFrom(tsFileInput, true));
820+
} catch (Exception e1) {
821+
logger.error(Messages.get("log.read.sequence_reader_tsm_deserialize_error"), file);
822+
throw e1;
823+
}
791824
}
792825
}
793826
}
@@ -872,8 +905,21 @@ public List<TimeseriesMetadata> readTimeseriesMetadata(
872905
boolean ignoreNotExistDevice,
873906
LongConsumer ioSizeRecorder)
874907
throws IOException {
908+
return readTimeseriesMetadata(
909+
device, null, measurement, allSensors, ignoreNotExistDevice, ioSizeRecorder);
910+
}
911+
912+
public List<TimeseriesMetadata> readTimeseriesMetadata(
913+
IDeviceID device,
914+
long[] deviceMetadataIndexNodeOffset,
915+
String measurement,
916+
Set<String> allSensors,
917+
boolean ignoreNotExistDevice,
918+
LongConsumer ioSizeRecorder)
919+
throws IOException {
875920
Pair<IMetadataIndexEntry, Long> metadataIndexPair =
876-
getLeafMetadataIndexPair(device, measurement, ioSizeRecorder);
921+
getLeafMetadataIndexPair(
922+
device, deviceMetadataIndexNodeOffset, measurement, ioSizeRecorder);
877923
if (metadataIndexPair == null) {
878924
if (ignoreNotExistDevice) {
879925
return Collections.emptyList();
@@ -927,19 +973,18 @@ public List<TimeseriesMetadata> readTimeseriesMetadata(
927973

928974
/* Get leaf MetadataIndexPair which contains path */
929975
private Pair<IMetadataIndexEntry, Long> getLeafMetadataIndexPair(
930-
IDeviceID device, String measurement, LongConsumer ioSizeRecorder) throws IOException {
931-
readFileMetadata(ioSizeRecorder);
932-
MetadataIndexNode deviceMetadataIndexNode =
933-
tsFileMetaData.getTableMetadataIndexNode(device.getTableName());
934-
Pair<IMetadataIndexEntry, Long> metadataIndexPair =
935-
getMetadataAndEndOffsetOfDeviceNode(deviceMetadataIndexNode, device, true, ioSizeRecorder);
936-
if (metadataIndexPair == null) {
937-
return null;
938-
}
939-
ByteBuffer buffer =
940-
readData(metadataIndexPair.left.getOffset(), metadataIndexPair.right, ioSizeRecorder);
941-
MetadataIndexNode metadataIndexNode = deviceMetadataIndexNode;
942-
if (!metadataIndexNode.getNodeType().equals(MetadataIndexNodeType.LEAF_MEASUREMENT)) {
976+
IDeviceID device,
977+
long[] deviceMetadataIndexNodeOffset,
978+
String measurement,
979+
LongConsumer ioSizeRecorder)
980+
throws IOException {
981+
Pair<IMetadataIndexEntry, Long> metadataIndexPair;
982+
MetadataIndexNode metadataIndexNode;
983+
ByteBuffer buffer;
984+
if (deviceMetadataIndexNodeOffset != null) {
985+
buffer =
986+
readData(
987+
deviceMetadataIndexNodeOffset[0], deviceMetadataIndexNodeOffset[1], ioSizeRecorder);
943988
try {
944989
metadataIndexNode =
945990
deserializeConfig.measurementMetadataIndexNodeBufferDeserializer.deserialize(
@@ -948,10 +993,33 @@ private Pair<IMetadataIndexEntry, Long> getLeafMetadataIndexPair(
948993
logger.error(METADATA_INDEX_NODE_DESERIALIZE_ERROR, file);
949994
throw e;
950995
}
996+
} else {
997+
readFileMetadata(ioSizeRecorder);
998+
MetadataIndexNode deviceMetadataIndexNode =
999+
tsFileMetaData.getTableMetadataIndexNode(device.getTableName());
9511000
metadataIndexPair =
952-
getMetadataAndEndOffsetOfMeasurementNode(
953-
metadataIndexNode, measurement, false, ioSizeRecorder);
1001+
getMetadataAndEndOffsetOfDeviceNode(
1002+
deviceMetadataIndexNode, device, true, ioSizeRecorder);
1003+
if (metadataIndexPair == null) {
1004+
return null;
1005+
}
1006+
buffer =
1007+
readData(metadataIndexPair.left.getOffset(), metadataIndexPair.right, ioSizeRecorder);
1008+
metadataIndexNode = deviceMetadataIndexNode;
1009+
if (!metadataIndexNode.getNodeType().equals(MetadataIndexNodeType.LEAF_MEASUREMENT)) {
1010+
try {
1011+
metadataIndexNode =
1012+
deserializeConfig.measurementMetadataIndexNodeBufferDeserializer.deserialize(
1013+
buffer, deserializeConfig);
1014+
} catch (Exception e) {
1015+
logger.error(METADATA_INDEX_NODE_DESERIALIZE_ERROR, file);
1016+
throw e;
1017+
}
1018+
}
9541019
}
1020+
metadataIndexPair =
1021+
getMetadataAndEndOffsetOfMeasurementNode(
1022+
metadataIndexNode, measurement, false, ioSizeRecorder);
9551023
return metadataIndexPair;
9561024
}
9571025

@@ -1646,7 +1714,7 @@ private void generateMetadataIndexUsingTsFileInput(
16461714
needChunkMetadata);
16471715
}
16481716

1649-
private void generateMetadataIndexUsingTsFileInput(
1717+
private synchronized void generateMetadataIndexUsingTsFileInput(
16501718
IMetadataIndexEntry metadataIndex,
16511719
long start,
16521720
long end,

java/tsfile/src/main/java/org/apache/tsfile/write/writer/TsFileIOWriter.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -507,6 +507,10 @@ private void readChunkMetadataAndConstructIndexTree() throws IOException {
507507
// construct the index tree node for the series
508508
currentDevice = currentPath.getIDeviceID();
509509
boolean isTableModel = schema.getTableSchemaMap().containsKey(currentDevice.getTableName());
510+
String currentTableName = isTableModel ? currentDevice.getTableName() : null;
511+
if (prevDevice == null && currentTableName != null) {
512+
prevTableName = currentTableName;
513+
}
510514
if (!currentDevice.equals(prevDevice)) {
511515
if (prevDevice != null) {
512516
addCurrentIndexNodeToQueue(currentIndexNode, measurementMetadataIndexQueue, out);
@@ -515,7 +519,6 @@ private void readChunkMetadataAndConstructIndexTree() throws IOException {
515519
generateRootNode(
516520
measurementMetadataIndexQueue, out, MetadataIndexNodeType.INTERNAL_MEASUREMENT));
517521
currentIndexNode = new MetadataIndexNode(MetadataIndexNodeType.LEAF_MEASUREMENT);
518-
String currentTableName = isTableModel ? currentDevice.getTableName() : null;
519522
if (!Objects.equals(currentTableName, prevTableName)) {
520523
if (prevTableName != null) {
521524
long currentTableSize = out.getPosition() - prevTableMetadataStartOffset;

java/tsfile/src/test/java/org/apache/tsfile/read/TsFileDeviceIteratorTest.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.apache.tsfile.enums.TSDataType;
2525
import org.apache.tsfile.file.metadata.IDeviceID;
2626
import org.apache.tsfile.file.metadata.TableSchema;
27+
import org.apache.tsfile.file.metadata.TimeseriesMetadata;
2728
import org.apache.tsfile.file.metadata.enums.CompressionType;
2829
import org.apache.tsfile.file.metadata.enums.TSEncoding;
2930
import org.apache.tsfile.read.common.TimeRange;
@@ -102,6 +103,33 @@ public void test() throws IOException {
102103
}
103104
}
104105

106+
@Test
107+
public void testReadTimeseriesMetadataWithDeviceMetadataIndexNodeOffset() throws IOException {
108+
try (TsFileIOWriter writer = new TsFileIOWriter(new File(FILE_PATH))) {
109+
registerTableSchema(writer, "table1");
110+
generateDevice(writer, "table1", 10);
111+
writer.endFile();
112+
}
113+
114+
try (TsFileSequenceReader reader = new TsFileSequenceReader(FILE_PATH)) {
115+
TsFileDeviceIterator deviceIterator =
116+
reader.getTableDevicesIteratorWithIsAligned("table1", null);
117+
Assert.assertTrue(deviceIterator.hasNext());
118+
Pair<IDeviceID, Boolean> currentDevice = deviceIterator.next();
119+
long[] deviceMetadataIndexNodeOffset = deviceIterator.getCurrentDeviceMeasurementNodeOffset();
120+
121+
TimeseriesMetadata metadataWithoutOffset =
122+
reader.readTimeseriesMetadata(currentDevice.getLeft(), "s1", false);
123+
TimeseriesMetadata metadataWithOffset =
124+
reader.readTimeseriesMetadata(
125+
currentDevice.getLeft(), deviceMetadataIndexNodeOffset, "s1", false, null);
126+
127+
Assert.assertEquals("s1", metadataWithoutOffset.getMeasurementId());
128+
Assert.assertEquals(
129+
metadataWithoutOffset.getMeasurementId(), metadataWithOffset.getMeasurementId());
130+
}
131+
}
132+
105133
private void registerTableSchema(TsFileIOWriter writer, String tableName) {
106134
List<IMeasurementSchema> schemas =
107135
Arrays.asList(

java/tsfile/src/test/java/org/apache/tsfile/write/TsFileWriteApiTest.java

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1280,6 +1280,47 @@ public void calculateTableSize() throws IOException, WriteProcessException {
12801280
Assert.assertTrue(tableSizeMap.get("table2") >= 1024 * 1024);
12811281
}
12821282

1283+
@Test
1284+
public void calculateTableSize2() throws IOException, WriteProcessException {
1285+
TableSchema tableSchema1 =
1286+
new TableSchema(
1287+
"table1",
1288+
Arrays.asList(
1289+
new ColumnSchema("device", TSDataType.STRING, ColumnCategory.TAG),
1290+
new ColumnSchema("s1", TSDataType.BLOB, ColumnCategory.FIELD)));
1291+
TableSchema tableSchema2 =
1292+
new TableSchema(
1293+
"table2",
1294+
Arrays.asList(
1295+
new ColumnSchema("device", TSDataType.STRING, ColumnCategory.TAG),
1296+
new ColumnSchema("s1", TSDataType.BLOB, ColumnCategory.FIELD)));
1297+
Tablet tablet1 =
1298+
new Tablet(
1299+
"table1",
1300+
IMeasurementSchema.getMeasurementNameList(tableSchema1.getColumnSchemas()),
1301+
IMeasurementSchema.getDataTypeList(tableSchema1.getColumnSchemas()),
1302+
tableSchema1.getColumnTypes());
1303+
tablet1.addTimestamp(0, 0);
1304+
tablet1.addValue(0, 0, new byte[1024]);
1305+
Tablet tablet2 =
1306+
new Tablet(
1307+
"table2",
1308+
IMeasurementSchema.getMeasurementNameList(tableSchema2.getColumnSchemas()),
1309+
IMeasurementSchema.getDataTypeList(tableSchema2.getColumnSchemas()),
1310+
tableSchema2.getColumnTypes());
1311+
tablet2.addTimestamp(0, 0);
1312+
tablet2.addValue(0, 0, new byte[1024 * 1024]);
1313+
Map<String, Long> tableSizeMap = null;
1314+
try (TsFileWriter writer = new TsFileWriter(f)) {
1315+
writer.registerTableSchema(tableSchema1);
1316+
writer.registerTableSchema(tableSchema2);
1317+
writer.writeTable(tablet1);
1318+
writer.writeTable(tablet2);
1319+
tableSizeMap = writer.getIOWriter().getTableSizeMap();
1320+
}
1321+
Assert.assertTrue(tableSizeMap.get("table1") > 1024);
1322+
}
1323+
12831324
@Test
12841325
public void writeRecord() throws IOException, WriteProcessException, ReadProcessException {
12851326
setEnv(100 * 1024 * 1024, 10 * 1024);

0 commit comments

Comments
 (0)